Spark-Riak Connector Add-on (Riak TS)
Quick Start Guide
This guide will run you through a quick example that uses the Spark-Riak connector to read and write data using Java, Scala, and Python. We will assume you are running this guide on Mac OSX.
Prerequisites
- Update Homebrew with
brew update
. - Install Riak TS OSX build. Instruction can be found here
- Set open file limits for Riak by following the guide here.
- Install Spark with
brew install apache-spark
. - Download the Spark-Riak connector uber jar (containing all dependencies) from here: https://github.com/basho/spark-riak-connector/releases/latest.
Start Riak TS with riak start
.
Scroll down or click below to find the desired quick start guide: - Scala - Python - Java
Scala
In this quick start guide we will run you through an example usage of the Spark-Riak connector using the Spark Scala REPL.
Start Spark Scala REPL with:
path/to/spark-shell /
--conf spark.riak.connection.host=127.0.0.1:8087 /
--driver-class-path /path/to/spark-riak-connector-»VERSION«-uber.jar
Import the following:
import org.apache.spark.sql.SaveMode
import java.sql.Timestamp
import com.basho.riak.spark.rdd.connector.Riakconnector
Then set up Spark SQLContext
:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
Create an RDD with some data:
val testRDD = sc.parallelize(Seq(
(1, "f", Timestamp.valueOf("1980-1-1 10:00:00"), "v1"),
(1, "f", Timestamp.valueOf("1980-1-1 10:10:00"), "v2"),
(1, "f", Timestamp.valueOf("1980-1-1 10:20:00"), "v3")))
Convert the RDD to a Spark SQL DataFrame and look at the schema:
val df = testRDD.toDF("k", "family", "ts", "value")
df.printSchema()
Then, create a TS table with the same data format as the testRDD that we created:
val tableName = "ts_table_c"
val connector = Riakconnector(sc.getConf)
connector.withSessionDo(session =>{
val request = new com.basho.riak.client.api.commands.timeseries.Query.Builder(
s"""
| CREATE TABLE $tableName (
| k SINT64 not null,
| family VARCHAR not null,
| ts TIMESTAMP not null,
| value VARCHAR,
|
| primary key ((k, family, quantum(ts,1,h)), k, family, ts)
| )
|
""".stripMargin)
.build()
val response = session.execute(request)})
Now, write the Spark SQL DataFrame to the newly created Riak TS table:
df.write.format("org.apache.spark.sql.riak").mode(SaveMode.Append).save(tableName)
And, finally, check that the table was successfully written into the Riak TS by making a simple query and printing the result:
val test_query = "ts >= CAST('1980-1-1 10:00:00' AS TIMESTAMP) AND ts <= CAST('1980-1-1 10:30:00' AS TIMESTAMP) AND k = 1 AND family = 'f'"
val df2 = sqlContext.read.format("org.apache.spark.sql.riak").load(tableName).filter(test_query)
df2.show()
Python
In this quick start guide we will run through some examples usages of the Spark-Riak connector using the Spark Python REPL, pyspark
.
Start pyspark
with:
/path/to/bin/pyspark /
--conf spark.riak.connection.host=127.0.0.1:8087 /
--driver-class-path /path/to/spark-riak-connector-{{version}}-uber.jar
Make some imports
:
import riak, datetime, time, random
Set up Riak TS connection:
host='127.0.0.1'
pb_port = '8087'
hostAndPort = ":".join([host, pb_port])
client = riak.RiakClient(host=host, pb_port=pb_port)
table_name = 'pyspark-%d' % int(time.time())
table = client.table(table_name)
Create a TS table:
create_sql = """CREATE TABLE %(table_name)s (
site varchar not null,
species varchar not null,
measurementDate timestamp not null,
value double,
PRIMARY KEY ((site, species, quantum(measurementDate, 24, h)),
site, species, measurementDate))
""" % ({'table_name': table_name})
result = table.query(create_sql)
Print the schema:
schema = table.describe().rows
for r in schema:
print r
You should see:
['site', 'varchar', False, 1L, 1L]
['species', 'varchar', False, 2L, 2L]
['measurementDate', 'timestamp', False, 3L, 3L]
['value', 'double', True, None, None]
Generate and print some data:
site = 'AA'
species = 'fff'
start_date = int(time.time())
events = []
for i in range(9):
measurementDate = start_date + i
value = random.uniform(-20, 110)
events.append([site, species, measurementDate, value])
end_date = measurementDate
for e in events:
print e
You should see something like this:
['AA', 'fff', 1460147465, 84.2863373359625]
['AA', 'fff', 1460147466, 22.460677478919976]
['AA', 'fff', 1460147467, 99.44873894866066]
['AA', 'fff', 1460147468, 79.22655985587694]
['AA', 'fff', 1460147469, 20.37795468066598]
['AA', 'fff', 1460147470, 77.30363887094994]
['AA', 'fff', 1460147471, 77.48514266033274]
['AA', 'fff', 1460147472, 78.94730225284083]
['AA', 'fff', 1460147473, 29.09084815136098]
Create an RDD from the generated data:
testRDD = sc.parallelize(events)
Take this RDD and convert it to a DataFrame and rename the columns to match the Riak TS table:
df = testRDD.toDF(['site', 'species','measurementDate','value'])
df.show()
You should see something like this:
+----+-------+---------------+------------------+
|site|species|measurementDate| value|
+----+-------+---------------+------------------+
| AA| fff| 1460147465| 84.2863373359625|
| AA| fff| 1460147466|22.460677478919976|
| AA| fff| 1460147467| 99.44873894866066|
| AA| fff| 1460147468| 79.22655985587694|
| AA| fff| 1460147469| 20.37795468066598|
| AA| fff| 1460147470| 77.30363887094994|
| AA| fff| 1460147471| 77.48514266033274|
| AA| fff| 1460147472| 78.94730225284083|
| AA| fff| 1460147473| 29.09084815136098|
+----+-------+---------------+------------------+
Write the DataFrame to the TS table:
df.write /
.format('org.apache.spark.sql.riak') /
.option('spark.riak.connection.host', hostAndPort) /
.mode('Append') /
.save(table_name)
Let’s check that the write was successful by reading the TS table into a new DataFrame:
sqlContext = SQLContext(sc)
df2 = sqlContext.read/
.format("org.apache.spark.sql.riak")/
.option("spark.riak.connection.host", hostAndPort)/
.option("spark.riakts.bindings.timestamp", "useLong")/
.load(table_name)/
.filter("""measurementDate > %(start_date)s
AND measurementDate < %(end_date)s
AND site = '%(site)s'
AND species = '%(species)s'
""" % ({'start_date': start_date, 'end_date': end_date, 'site': site, 'species': species}))
Print the table schema:
df2.printSchema()
You should see:
root
|-- site: string (nullable = false)
|-- species: string (nullable = false)
|-- measurementDate: long (nullable = false)
|-- value: double (nullable = true)
Show the DataFrame:
df2.show()
You should see something like this:
+----+-------+---------------+------------------+
|site|species|measurementDate| value|
+----+-------+---------------+------------------+
| AA| fff| 1460147466|22.460677478919976|
| AA| fff| 1460147467| 99.44873894866066|
| AA| fff| 1460147468| 79.22655985587694|
| AA| fff| 1460147469| 20.37795468066598|
| AA| fff| 1460147470| 77.30363887094994|
| AA| fff| 1460147471| 77.48514266033274|
| AA| fff| 1460147472| 78.94730225284083|
+----+-------+---------------+------------------+
Register the DataFrame as a temp SQL table and run a SQL query to obtain the average of the “value” column:
df2.registerTempTable("pyspark_tmp")
sqlContext.sql("select avg(value) as average_value from pyspark_tmp").show()
You should see something similar to this:
+-----------------+
| average_value|
+-----------------+
|65.03571639260672|
+-----------------+
Java
Coming Soon!