Spark-Riak Connector Add-on (Riak TS)
Working With Spark Dataframes

Spark Dataframes With TS Table

To enable DataFrames functionality the first steps are:

val sc = new SparkContext()
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
ts_table_name = "test_table"
sc = pyspark.SparkContext(conf=conf)
sqlContext = pyspark.SQLContext(sc)
ts_table_name = "test_table"

To read data from the existing TS table test-table standard SQLContext means can be used by providing a org.apache.spark.sql.riak data format and using a Riak TS range query:

val df = sqlContext.read
  .option("spark.riak.connection.hosts","riak_host_ip:10017")
    .format("org.apache.spark.sql.riak")
    .load(ts_table_name)
  .select(“time”, “col1”, “col2”)
    .filter(s"time >= CAST($from AS TIMESTAMP) AND time <= CAST($to AS TIMESTAMP) AND  col1= $value1")
df = sqlContext.read /
  .option("spark.riak.connection.hosts","riak_host_ip:10017") /
    .format("org.apache.spark.sql.riak") /
    .load(ts_table_name) /
  .select(“time”, “col1”, “col2”) /
    .filter(s"time >= CAST($from AS TIMESTAMP) AND time <= CAST($to AS TIMESTAMP) AND  col1= $value1")

Schema may be provided using the .schema() method. If not provided, it will be inferred. Any of the connector options can be provided in .option() or .options().

Alternatively, org.apache.spark.sql.riak.RiakSQLContext can be created and then queried with range query using sql() method:

val riakSqlContext = new RiakSQLContext(sc, ts_table_name)
val alternativeDf = riakSqlContext.sql(s"SELECT time, col1 from $ts_table_name WHERE time >= CAST($from AS TIMESTAMP) AND time <= CAST($to AS TIMESTAMP) AND  col1= $value1")

A DataFrame, inputDF, that has the same schema as an existing TS table (column order and types) can be saved to Riak TS as follows:

inputDF.write
   .option("spark.riak.connection.hosts","riak_host_ip:10017")
   .format("org.apache.spark.sql.riak")
   .mode(SaveMode.Append)
   .save(ts_table_name)
inputDF.write /
   .option("spark.riak.connection.hosts","riak_host_ip:10017") /
   .format("org.apache.spark.sql.riak") /
   .mode(SaveMode.Append) /
   .save(ts_table_name)

SaveMode.Append is the only mode available. Any of the connector options can be provided in .option() or .options().