Writing Data to Riak TS
Now that you’ve planned and activated your Riak TS table, you are ready to write data to it.
Writing Data
Riak TS allows you to write multiple rows of data at a time. To demonstrate, we’ll use the example table from earlier:
CREATE TABLE GeoCheckin
(
myfamily varchar not null,
myseries varchar not null,
time timestamp not null,
weather varchar not null,
temperature double,
PRIMARY KEY (
(myfamily, myseries, quantum(time, 15, 'm')),
myfamily, myseries, time
)
)
To write data to your table, put the data in a list:
{ok, Pid} = riakc_pb_socket:start_link("myriakdb.host", 10017).
riakc_ts:put(Pid, "GeoCheckin", [[<<"family1">>, <<"series1">>, 1234567, <<"hot">>, 23.5], [<<"family2">>, <<"series99">>, 1234567, <<"windy">>, 19.8]]).
import java.net.UnknownHostException;
import java.util.concurrent.ExecutionException;
import com.basho.riak.client.api.RiakClient;
import com.basho.riak.client.api.commands.timeseries.Store;
import com.basho.riak.client.core.query.timeseries.*;
import java.util.*;
public class RiakTSInsert {
public static void main(String [] args)
throws UnknownHostException, ExecutionException, InterruptedException
{
// Riak Client with supplied IP and Port
RiakClient client = RiakClient.newClient(10017, "myriakdb.host");
List<Row> rows = Arrays.asList(
new Row(
new Cell("family1"),
new Cell("series1"),
Cell.newTimestamp(1234567),
new Cell("hot"),
new Cell(23.5)
),
new Row(
new Cell("family2"),
new Cell("series99"),
Cell.newTimestamp(1234567),
new Cell("windy"),
new Cell(19.8)
)
);
Store storeCmd = new Store.Builder("GeoCheckin").withRows(rows).build();
client.execute(storeCmd);
client.shutdown();
}
}
import datetime
from riak.client import RiakClient
# NB: modify 'host' and 'pb_port' to match your installation
client = RiakClient(host='myriakdb.host', pb_port=8087)
fiveMins = datetime.timedelta(0, 300)
ts0 = datetime.datetime(2015, 1, 1, 12, 0, 0)
ts1 = ts0 + fiveMins
table = client.table('GeoCheckin')
rows = [
['family1', 'series1', ts0, 'hot', 23.5],
['family1', 'series1', ts1, 'windy', 19.8]
]
ts_obj = table.new(rows)
print "Store result:", ts_obj.store()
client = Riak::Client.new 'myriakdb.host', pb_port: 10017
submission = Riak::TimeSeries::Submission.new client, "GeoCheckin"
submission.measurements = [["family1", "series1", 1234567, "hot", 23.5], ["family2", "series99", 1234567, "windy", 19.8]]
submission.write!
var Riak = require('basho-riak-client');
var hosts = [ 'riak-1:8087', 'riak-2:8087' ];
var client = new Riak.Client(hosts);
var columns = [
{ name: 'geohash', type: Riak.Commands.TS.ColumnType.Varchar },
{ name: 'user', type: Riak.Commands.TS.ColumnType.Varchar },
{ name: 'time', type: Riak.Commands.TS.ColumnType.Timestamp },
{ name: 'weather', type: Riak.Commands.TS.ColumnType.Varchar },
{ name: 'temperature', type: Riak.Commands.TS.ColumnType.Double }
];
var rows = [
[ 'hash1', 'user2', twentyMinsAgo, 'hurricane', 82.3 ],
[ 'hash1', 'user2', fifteenMinsAgo, 'rain', 79.0 ],
[ 'hash1', 'user2', fiveMinsAgo, 'wind', null ],
[ 'hash1', 'user2', now, 'snow', 20.1 ]
];
var cb = function (err, response) {
// NB: response will be true on success
};
var store = new Riak.Commands.TS.Store.Builder()
.withTable('GeoCheckin')
// NB: withColumns is optional
// TS column types will be inferred if it's omitted
.withColumns(columns)
.withRows(rows)
.withCallback(cb)
.build();
client.execute(store);
Note on validation:
Riak TS 1.1.0 validates all rows on the server side before writing occurs, checking the number of row elements and types. If any of the rows fails validation then none of the rows will be written. An error message will be returned with the index numbers of the invalid rows in the batch. The first item in the batch being index one.
Depending on your client, you will recieve different messages indicating whether or not your write was successful.
Successful responses:
- Erlang -
ok
- Java -
void
, not throwing an exception indicates a successful write - Python -
True
- Ruby -
void
, not raising an error indicates a successful write - Node.js -
true
Failure responses:
- Erlang -
RpbErrorResp
- Java - exceptions will be thrown
- Python - exceptions will be thrown
- Ruby -
RpbErrorResp
with errors - Node.js - The
err
callback parameter will have information, and theresponse
parameter will befalse
In the event that your write fails, you should check the error message to see which rows failed validation. For example:
RiakClient client = RiakClient.newClient(10017, "myriakdb.host");
List<Row> someRows = Arrays.asList(
// Good Row
new Row(new Cell("hash1"), new Cell("user1"), Cell.newTimestamp(1234567), new Cell("cloudy"), new Cell(79.0)),
// Bad Rows
new Row(new Cell("hash1"), Cell.newTimestamp(fiveMinsAgo)), // too short
new Row() // no data
);
Store store = new Store.Builder("GeoCheckin").withRows(someRows).build();
final RiakFuture<Void, String> storeFuture = client.executeAsync(store);
storeFuture.await();
assertFalse(storeFuture.isSuccess());
System.out.println(storeFuture.cause().detailMessage);
// Prints "Invalid data found at row index(es) 2, 3"
You could also try the original write again. Failures may be transitory when servers are temporarily unable to talk to each other.
Guidelines
- Batches should not be too large. In our testing, 100 rows per write is a sweet spot, but you should expect different results depending on your hardware and schema.
- Writes will assume that columns are in the same order they’ve been declared in the table.
- Timestamps should be in Unix epoch/UTC milliseconds.
Deleting Data
Below are examples of how to delete data by key in each of our time series supported clients.
riakc_ts:delete(Pid, <<"GeoCheckins">>, [<<"family1">>, <<"series1">>, 1420113600000]).
final List<Cell> keyCells = Arrays.asList(
new Cell("family1"), new Cell("series1"), Cell.newTimestamp(1420113600000));
Delete delete = new Delete.Builder("GeoCheckins", keyCells).build();
client.execute(delete);
client.ts_delete('GeoCheckin', ['family1', 'series1', datetime.datetime(2015, 1, 1, 12, 0, 0)])
delete_operation = Riak::TimeSeries::Deletion.new client, 'GeoCheckins'
delete_operation.key = ['family1', 'series1', 1420113600000]
delete_operation.delete!
var key = [ 'family1', 'series1', 1420113600000 ];
var cb = function (err, response) {
// NB: response will be true on success
};
var cmd = new Riak.Commands.TS.Delete.Builder()
.withTable('GeoCheckins')
.withKey(key)
.withCallback(cb)
.build();
client.execute(cmd);
Next Steps
Now that you’ve written data to your tables, you can query your data.