Using Secondary Indexes (2i)

Secondary indexes (2i) in Riak enable you to tag objects stored in Riak, at write time, with one or more queryable values. Those values can then be used to find multiple objects in Riak. If you’re storing user data, for example, you could tag each object associated with that user with a username or other unique marker. Once tagged, you could find all objects in a Riak bucket sharing that tag. Secondary index values can be either binaries (strings), such as sensor_1_data, admin_user, or click_event, or integers, such as 99 or 141121.

Please note that 2i can be used only with the LevelDB and Memory backends.

Features

  • Allows two types of secondary attributes: integers and strings (aka binaries)
  • Allows querying by exact match or range on one index
  • Allows pagination of results
  • Allows streaming of results
  • Query results can be used as input to a MapReduce query
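The following minimal Python sketch shows how paging over 2i results can work. It is a toy model, not a Riak client call: the `paginate` function and its `(term, key)` cursor are hypothetical. Riak itself takes a `max_results` limit and returns an opaque continuation value. This sketch approximates that value with the last entry of each page and assumes results are ordered by term, then key.

```python
from typing import List, Optional, Tuple

# One hypothetical 2i result entry: (index term, object key)
Entry = Tuple[str, str]


def paginate(entries: List[Entry], max_results: int,
             after: Optional[Entry] = None) -> Tuple[List[Entry], Optional[Entry]]:
    """Return one page of entries ordered by (term, key), plus a cursor.

    The cursor is the last entry on the page, or None when nothing is left.
    It stands in for the opaque continuation value that Riak returns.
    """
    if max_results < 1:
        raise ValueError("max_results must be at least 1")
    ordered = sorted(entries)
    if after is not None:
        # Resume strictly after the last entry the caller has already seen
        ordered = [entry for entry in ordered if entry > after]
    page = ordered[:max_results]
    cursor = page[-1] if len(ordered) > max_results else None
    return page, cursor


results = [("val3", "curly"), ("val1", "larry"),
           ("val2", "moe"), ("val4", "veronica")]
cursor = None
while True:
    page, cursor = paginate(results, 2, cursor)
    print(page)
    if cursor is None:
        break
```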

Note on 2i and strong consistency

Secondary indexes do not currently work with the strong consistency feature introduced in Riak version 2.0. If you store objects in strongly consistent buckets and attach secondary index metadata to those objects, you can still perform strongly consistent operations on those objects, but the secondary indexes will be ignored.

When to Use Secondary Indexes

Secondary indexes are useful when you want to find data on the basis of something other than objects’ bucket type, bucket, and key, i.e. when you want objects to be discoverable based on more than their location alone.

2i works best for objects whose value is stored in an opaque blob, like a binary file, because those objects don’t offer any clues that enable you to discover them later. Indexing enables you to tag those objects and find all objects with the same tag in a specified bucket later on.

2i is thus recommended when your use case requires an easy-to-use search mechanism that does not require a schema, together with a basic query interface, i.e. an interface that enables an application to tell Riak things like “fetch all objects tagged with the string Milwaukee_Bucks” or “fetch all objects tagged with numbers between 1500 and 1509.”

2i is also recommended if your use case requires anti-entropy. Since secondary indexes are just metadata attached to key/value objects, 2i piggybacks off of read-repair.

When Not to Use Secondary Indexes

  • If your ring size exceeds 512 partitions, 2i can cause performance issues in large clusters.
  • When you need more than the exact match and range searches that 2i supports. A query like last_name=zezeski AND state=MD would have to be split into two queries and the results merged (or it would need to involve MapReduce).
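The split-and-merge approach for a query like last_name=zezeski AND state=MD can be sketched as follows. This minimal Python example assumes you have already run two exact-match 2i queries, for instance with `bucket.get_index(...)` in the Python client. It shows only the merge step, and the key lists are made-up sample data.

```python
from typing import Iterable, List


def intersect_keys(*results: Iterable[str]) -> List[str]:
    """Return the keys present in every result list, sorted for stable output."""
    if not results:
        return []
    common = set(results[0])
    for keys in results[1:]:
        common &= set(keys)
    return sorted(common)


# Made-up keys standing in for two separate exact-match queries, e.g. on
# hypothetical last_name_bin and state_bin indexes
last_name_matches = ["user1", "user7", "user9"]
state_matches = ["user3", "user7", "user9"]
print(intersect_keys(last_name_matches, state_matches))  # ['user7', 'user9']
```

Because each query returns only keys, the intersection happens in the application. Fetching the matching objects is a separate step.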

Query Interfaces and Examples

Typically, the result set from a 2i query is a list of object keys from the specified bucket that include the index values in question. As we’ll see below, when executing range queries in Riak 1.4 or higher, it is possible to retrieve the index values along with the object keys.
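A small in-memory model can illustrate the shape of those results. `ToyIndex` below is a hypothetical class, not part of any Riak client. It stores `(term, key)` pairs for one index and answers exact-match and inclusive range queries with a list of keys. With `return_terms=True`, it also returns the matching terms alongside the keys, mirroring the option Riak exposes for range queries.

```python
from typing import List, Set, Tuple, Union

Term = Union[int, str]


class ToyIndex:
    """Hypothetical in-memory model of one secondary index (e.g. field2_int)."""

    def __init__(self) -> None:
        # A set of (term, key) pairs, so re-adding the same pair has no effect
        self._entries: Set[Tuple[Term, str]] = set()

    def add(self, term: Term, key: str) -> None:
        self._entries.add((term, key))

    def range(self, start: Term, end: Term, return_terms: bool = False):
        """Inclusive range query; keys only unless return_terms is True."""
        hits: List[Tuple[Term, str]] = sorted(
            entry for entry in self._entries if start <= entry[0] <= end)
        if return_terms:
            return hits
        return [key for _, key in hits]

    def match(self, term: Term) -> List[str]:
        """Exact-match query: a range whose start and end are the same term."""
        return self.range(term, term)


field2 = ToyIndex()
for term, key in [(1001, "larry"), (1002, "moe"), (1003, "curly"), (1004, "veronica")]:
    field2.add(term, key)

print(field2.match(1002))                           # ['moe']
print(field2.range(1002, 1004))                     # ['moe', 'curly', 'veronica']
print(field2.range(1002, 1003, return_terms=True))  # [(1002, 'moe'), (1003, 'curly')]
```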

Inserting Objects with Secondary Indexes

In this example, the key john_smith is used to store user data in the bucket users, which bears the default bucket type. Let’s say that an application would like to add a Twitter handle and an email address to this object as secondary indexes.

Location johnSmithKey = new Location(new Namespace("default", "users"), "john_smith");

// In the Java client (and all clients), if you do not specify a bucket type,
// the client will use the default type. And so the following location
// would be equivalent to the one above:
// Location johnSmithKey = new Location(new Namespace("users"), "john_smith");

RiakObject obj = new RiakObject()
        .setContentType("application/json")
        .setValue(BinaryValue.create("{\"user_data\":{ ... }}"));

obj.getIndexes().getIndex(StringBinIndex.named("twitter")).add("jsmith123");
obj.getIndexes().getIndex(StringBinIndex.named("email")).add("jsmith@riak.info");

StoreValue store = new StoreValue.Builder(obj)
        .withLocation(johnSmithKey)
        .build();
client.execute(store);
bucket = client.bucket_type('default').bucket('users')
obj = Riak::RObject.new(bucket, 'john_smith')
obj.content_type = 'application/json'
obj.raw_data = '{"user_data":{ ... }}'

# String/binary indexes must be set as an array of strings
obj.indexes['twitter_bin'] = %w{ jsmith123 }
obj.indexes['email_bin'] = %w{ jsmith@riak.info }
obj.store

# In the Ruby client (and all clients), if you do not specify a bucket
# type, the client will use the default type. And so the following set
# of commands would be equivalent to the one above:

bucket = client.bucket('users')
# repeat the same commands for building the object
obj.store
$object = (new \Basho\Riak\Object('{"user_data":{ ... }}', ['Content-type' => 'application/json']))
  ->addValueToIndex('twitter_bin', 'jsmith123')
  ->addValueToIndex('email_bin', 'jsmith@riak.info');

(new \Basho\Riak\Command\Builder\StoreObject($riak))
  ->withObject($object)
  ->buildLocation('john_smith', 'users', 'default')
  ->build()
  ->execute();
bucket = client.bucket_type('default').bucket('users')
# In the Python client (and all clients), if you do not specify a bucket type,
# the client will use the default type. And so the following store command
# would be equivalent to the one above:
bucket = client.bucket('users')

obj = RiakObject(client, bucket, 'john_smith')
obj.content_type = 'text/plain'
obj.data = '...user data...'
obj.add_index('twitter_bin', 'jsmith123')
obj.add_index('email_bin', 'jsmith@riak.info')
obj.store()
var id = new RiakObjectId("default", "users", "john_smith");
var obj = new RiakObject(id, "...user data...",
    RiakConstants.ContentTypes.TextPlain);
obj.BinIndex("twitter").Set("jsmith123");
obj.BinIndex("email").Set("jsmith@riak.info");
var rslt = client.Put(obj);
var riakObj = new Riak.Commands.KV.RiakObject();
riakObj.setContentType('text/plain');
riakObj.setBucket('users');
riakObj.setKey('john_smith');
riakObj.setValue('...user data...');
riakObj.addToIndex('twitter_bin', 'jsmith123');
riakObj.addToIndex('email_bin', 'jsmith@riak.info');
client.storeValue({ value: riakObj }, function (err, rslt) {
    if (err) {
        throw new Error(err);
    }
});
Obj = riakc_obj:new({<<"default">>, <<"users">>},
                    <<"john_smith">>,
                    <<"...user data...">>,
                    <<"text/plain">>),
%% In the Erlang client (and all clients), if you do not specify a bucket type,
%% the client will use the default type. And so the following object would be
%% equivalent to the one above:

%% Obj = riakc_obj:new(<<"users">>,
%%                     <<"john_smith">>,
%%                     <<"...user data...">>,
%%                     <<"text/plain">>),
MD1 = riakc_obj:get_update_metadata(Obj),
MD2 = riakc_obj:set_secondary_index(
    MD1,
    [{{binary_index, "twitter"}, [<<"jsmith123">>]},
     {{binary_index, "email"}, [<<"jsmith@riak.info">>]}]),
Obj2 = riakc_obj:update_metadata(Obj, MD2),
riakc_pb_socket:put(Pid, Obj2).
obj := &riak.Object{
    ContentType:     "text/plain",
    Charset:         "utf-8",
    ContentEncoding: "utf-8",
    BucketType:      "indexes",
    Bucket:          "users",
    Key:             "john_smith",
    Value:           []byte("…user data…"),
}

obj.AddToIndex("twitter_bin", "jsmith123")
obj.AddToIndex("email_bin", "jsmith@riak.info")

cmd, err := riak.NewStoreValueCommandBuilder().
    WithContent(obj).
    Build()
if err != nil {
    return err
}

if err := cluster.Execute(cmd); err != nil {
    return err
}
curl -XPOST localhost:8098/types/default/buckets/users/keys/john_smith \
  -H 'x-riak-index-twitter_bin: jsmith123' \
  -H 'x-riak-index-email_bin: jsmith@riak.info' \
  -H 'Content-Type: application/json' \
  -d '{"userData":"data"}'
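Over HTTP, each index travels as a request header named `x-riak-index-<field>`, as the curl example shows. The hypothetical helper below builds those header strings from a dictionary of index names and values. Joining several values with a comma is an assumption, based on the multi-value header used in the curl indexing examples further on.

```python
from typing import Dict, Iterable, List, Union


def index_headers(indexes: Dict[str, Iterable[Union[int, str]]]) -> List[str]:
    """Build 'x-riak-index-<name>: <values>' header strings for an HTTP write.

    Several values for one index are joined with ', ' in a single header.
    """
    headers = []
    for name, values in indexes.items():
        joined = ", ".join(str(value) for value in values)
        headers.append(f"x-riak-index-{name}: {joined}")
    return headers


for header in index_headers({"twitter_bin": ["jsmith123"],
                             "email_bin": ["jsmith@riak.info"]}):
    print(header)
```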

Getting started with Riak clients

If you are connecting to Riak using one of Basho’s official client libraries, you can find more information about getting started with your client in the Developing with Riak KV: Getting Started section.

This has accomplished the following:

  • The object has been stored with a primary bucket/key of users/john_smith
  • The object now has a secondary index called twitter_bin with a value of jsmith123
  • The object now has a secondary index called email_bin with a value of jsmith@riak.info

Querying Objects with Secondary Indexes

Let’s query the users bucket on the basis of Twitter handle to make sure that we can find our stored object:

Namespace usersBucket = new Namespace("users");
BinIndexQuery biq = new BinIndexQuery.Builder(usersBucket, "twitter", "jsmith123")
        .build();
BinIndexQuery.Response response = client.execute(biq);
List<BinIndexQuery.Response.Entry> entries = response.getEntries();
for (BinIndexQuery.Response.Entry entry : entries) {
    System.out.println(entry.getRiakObjectLocation().getKey());
}
bucket = client.bucket('users')
bucket.get_index('twitter_bin', 'jsmith123')

# This is equivalent to the following:
bucket = client.bucket_type('default').bucket('users')
bucket.get_index('twitter_bin', 'jsmith123')
$response = (new \Basho\Riak\Command\Builder\QueryIndex($riak))
  ->buildBucket('users')
  ->withIndexName('twitter_bin')
  ->withScalarValue('jsmith123')
  ->build()
  ->execute()
  ->getResults();
bucket = client.bucket('users') # equivalent to client.bucket_type('default').bucket('users')
bucket.get_index('twitter_bin', 'jsmith123').results
var idxId = new RiakIndexId("default", "users", "twitter");
var rslt = client.GetSecondaryIndex(idxId, "jsmith123");
var idxRslt = rslt.Value;
foreach (var keyTerm in idxRslt.IndexKeyTerms)
{
    Debug.WriteLine(keyTerm.Key);
}
var query_keys = [];
function query_cb(err, rslt) {
    if (err) {
        throw new Error(err);
    }

    if (rslt.done) {
        query_keys.forEach(function (key) {
            logger.info("2i query key: '%s'", key);
        });
    }

    if (rslt.values.length > 0) {
        Array.prototype.push.apply(query_keys,
            rslt.values.map(function (value) {
                return value.objectKey;
            }));
    }
}

var cmd = new Riak.Commands.KV.SecondaryIndexQuery.Builder()
    .withBucket('users')
    .withIndexName('twitter_bin')
    .withIndexKey('jsmith123')
    .withCallback(query_cb)
    .build();
client.execute(cmd);
{ok, Results} =
    riakc_pb_socket:get_index(Pid,
                              <<"users">>, %% bucket
                              {binary_index, "twitter"}, %% index name
                              <<"jsmith123">>). %% index
cmd, err := riak.NewSecondaryIndexQueryCommandBuilder().
    WithBucketType("indexes").
    WithBucket("users").
    WithIndexName("twitter_bin").
    WithIndexKey("jsmith123").
    Build()
if err != nil {
    return err
}

if err := cluster.Execute(cmd); err != nil {
    return err
}
curl localhost:8098/buckets/users/index/twitter_bin/jsmith123

The response:

john_smith
["john_smith"]
['john_smith']
['john_smith']
john_smith
john_smith
{ok,{index_results_v1,[<<"john_smith">>],
                      undefined,undefined}}.
john_smith
{
  "keys": [
    "john_smith"
  ]
}
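The curl query uses the HTTP path `/buckets/<bucket>/index/<index>/<value>`, which targets the default bucket type. The sketch below builds the bucket-type-aware form of the same URL, plus a range form that appends an end value. The helper `index_query_url` and its defaults (`localhost:8098`, bucket type `default`) are assumptions for illustration. It only builds strings and does not contact Riak.

```python
from typing import Optional, Union
from urllib.parse import quote


def index_query_url(bucket: str, index: str, start: Union[int, str],
                    end: Optional[Union[int, str]] = None,
                    bucket_type: str = "default",
                    host: str = "localhost:8098") -> str:
    """Build a 2i query URL: exact match when end is None, otherwise a range."""
    path = (f"/types/{quote(bucket_type)}/buckets/{quote(bucket)}"
            f"/index/{quote(index)}/{quote(str(start))}")
    if end is not None:
        # A range query appends the (inclusive) end of the range
        path += f"/{quote(str(end))}"
    return f"http://{host}{path}"


print(index_query_url("users", "twitter_bin", "jsmith123"))
print(index_query_url("people", "field2_int", 1002, 1004, bucket_type="indexes"))
```

Values are percent-encoded with `urllib.parse.quote`, so a term such as `jsmith@riak.info` becomes `jsmith%40riak.info` in the path.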

Examples

To run the following examples, make sure that Riak is configured to use an index-capable storage backend, such as LevelDB or Memory.

Indexing Objects

The following example indexes four different objects. Notice that we’re storing both integer and string (aka binary) fields, that field names are automatically lowercased, that some fields have multiple values, and that duplicate values are automatically removed:

Namespace peopleBucket = new Namespace("indexes", "people");

RiakObject larry = new RiakObject()
        .setValue(BinaryValue.create("My name is Larry"));
larry.getIndexes().getIndex(StringBinIndex.named("field1")).add("val1");
larry.getIndexes().getIndex(LongIntIndex.named("field2")).add(1001L);
StoreValue storeLarry = new StoreValue.Builder(larry)
        .withLocation(new Location(peopleBucket, "larry"))
        .build();
client.execute(storeLarry);

RiakObject moe = new RiakObject()
        .setValue(BinaryValue.create("My name is Moe"));
moe.getIndexes().getIndex(StringBinIndex.named("Field1")).add("val2");
moe.getIndexes().getIndex(LongIntIndex.named("Field2")).add(1002L);
StoreValue storeMoe = new StoreValue.Builder(moe)
        .withLocation(new Location(peopleBucket, "moe"))
        .build();
client.execute(storeMoe);

RiakObject curly = new RiakObject()
        .setValue(BinaryValue.create("My name is Curly"));
curly.getIndexes().getIndex(StringBinIndex.named("FIELD1")).add("val3");
curly.getIndexes().getIndex(LongIntIndex.named("FIELD2")).add(1003L);
StoreValue storeCurly = new StoreValue.Builder(curly)
        .withLocation(new Location(peopleBucket, "curly"))
        .build();
client.execute(storeCurly);

RiakObject veronica = new RiakObject()
        .setValue(BinaryValue.create("My name is Veronica"));
veronica.getIndexes().getIndex(StringBinIndex.named("field1"))
        .add("val4").add("val4").add("val4a").add("val4b");
veronica.getIndexes().getIndex(LongIntIndex.named("field2"))
        .add(1004L).add(1005L).add(1006L).add(1004L).add(1004L).add(1007L);
StoreValue storeVeronica = new StoreValue.Builder(veronica)
        .withLocation(new Location(peopleBucket, "veronica"))
        .build();
client.execute(storeVeronica);
bucket = client.bucket_type('indexes').bucket('people')

obj1 = Riak::RObject.new(bucket, 'larry')
obj1.content_type = 'text/plain'
obj1.raw_data = 'My name is Larry'
obj1.indexes['field1_bin'] = %w{ val1 }
# Like binary/string indexes, integer indexes must be set as an array,
# even if you wish to add only a single index
obj1.indexes['field2_int'] = [1001]
obj1.store

obj2 = Riak::RObject.new(bucket, 'moe')
obj2.content_type = 'text/plain'
obj2.raw_data = 'My name is Moe'
obj2.indexes['Field1_bin'] = %w{ val2 }
obj2.indexes['Field2_int'] = [1002]
obj2.store

obj3 = Riak::RObject.new(bucket, 'curly')
obj3.content_type = 'text/plain'
obj3.raw_data = 'My name is Curly'
obj3.indexes['FIELD1_BIN'] = %w{ val3 }
obj3.indexes['FIELD2_INT'] = [1003]
obj3.store

obj4 = Riak::RObject.new(bucket, 'veronica')
obj4.content_type = 'text/plain'
obj4.raw_data = 'My name is Veronica'
obj4.indexes['field1_bin'] = %w{ val4 val4 val4a val4b }
# Assigning to the same index again would replace its values, so set all
# integer values (duplicates included) in a single array
obj4.indexes['field2_int'] = [1004, 1004, 1005, 1006, 1004, 1004, 1004, 1007]
obj4.store
$bucket = new \Basho\Riak\Bucket('people', 'indexes');

$object = (new \Basho\Riak\Object('My name is Larry', ['Content-type' => 'text/plain']))
  ->addValueToIndex('field1_bin', 'val1')
  ->addValueToIndex('field2_int', 1001);

(new \Basho\Riak\Command\Builder\StoreObject($riak))
  ->withObject($object)
  ->withLocation(new \Basho\Riak\Location('larry', $bucket))
  ->build()
  ->execute();

$object = (new \Basho\Riak\Object('My name is Moe', ['Content-type' => 'text/plain']))
  ->addValueToIndex('Field1_bin', 'val2')
  ->addValueToIndex('Field2_int', 1002);

(new \Basho\Riak\Command\Builder\StoreObject($riak))
  ->withObject($object)
  ->withLocation(new \Basho\Riak\Location('moe', $bucket))
  ->build()
  ->execute();

$object = (new \Basho\Riak\Object('My name is Curly', ['Content-type' => 'text/plain']))
  ->addValueToIndex('FIELD1_BIN', 'val3')
  ->addValueToIndex('FIELD2_int', 1003);

(new \Basho\Riak\Command\Builder\StoreObject($riak))
  ->withObject($object)
  ->withLocation(new \Basho\Riak\Location('curly', $bucket))
  ->build()
  ->execute();

$object = (new \Basho\Riak\Object('My name is Veronica', ['Content-type' => 'text/plain']))
  ->addValueToIndex('field1_bin', 'val4')
  ->addValueToIndex('field1_bin', 'val4')
  ->addValueToIndex('field1_bin', 'val4a')
  ->addValueToIndex('field1_bin', 'val4b')
  ->addValueToIndex('field2_int', 1004)
  ->addValueToIndex('field2_int', 1005)
  ->addValueToIndex('field2_int', 1006)
  ->addValueToIndex('field2_int', 1004)
  ->addValueToIndex('field2_int', 1004)
  ->addValueToIndex('field2_int', 1007);

(new \Basho\Riak\Command\Builder\StoreObject($riak))
  ->withObject($object)
  ->withLocation(new \Basho\Riak\Location('veronica', $bucket))
  ->build()
  ->execute();
bucket = client.bucket_type('indexes').bucket('people')

obj1 = RiakObject(client, bucket, 'larry')
obj1.content_type = 'text/plain'
obj1.data = 'My name is Larry'
obj1.add_index('field1_bin', 'val1').add_index('field2_int', 1001)
obj1.store()

obj2 = RiakObject(client, bucket, 'moe')
obj2.content_type = 'text/plain'
obj2.data = 'My name is Moe'
obj2.add_index('Field1_bin', 'val2').add_index('Field2_int', 1002)
obj2.store()

obj3 = RiakObject(client, bucket, 'curly')
obj3.content_type = 'text/plain'
obj3.data = 'My name is Curly'
obj3.add_index('FIELD1_BIN', 'val3').add_index('FIELD2_INT', 1003)
obj3.store()

obj4 = RiakObject(client, bucket, 'veronica')
obj4.content_type = 'text/plain'
obj4.data = 'My name is Veronica'
# Add every value, duplicates included; Riak keeps only the distinct ones
for value in ['val4', 'val4', 'val4a', 'val4b']:
    obj4.add_index('field1_bin', value)
for value in [1004, 1004, 1005, 1006, 1004, 1004, 1004, 1007]:
    obj4.add_index('field2_int', value)
obj4.store()
var larryId = new RiakObjectId("indexes", "people", "larry");
var larry = new RiakObject(larryId, "My name is Larry",
    RiakConstants.ContentTypes.TextPlain);

larry.BinIndex("field1").Set("val1");
larry.IntIndex("field2").Set(1001);

client.Put(larry);

var moeId = new RiakObjectId("indexes", "people", "moe");
var moe = new RiakObject(moeId, "My name is Moe",
    RiakConstants.ContentTypes.TextPlain);

moe.BinIndex("Field1").Set("val2");
moe.IntIndex("Field2").Set(1002);

client.Put(moe);

var curlyId = new RiakObjectId("indexes", "people", "curly");
var curly = new RiakObject(curlyId, "My name is Curly",
    RiakConstants.ContentTypes.TextPlain);

curly.BinIndex("FIELD1").Set("val3");
curly.IntIndex("FIELD2").Set(1003);

client.Put(curly);

var veronicaId = new RiakObjectId("indexes", "people", "veronica");
var veronica = new RiakObject(veronicaId, "My name is Veronica",
    RiakConstants.ContentTypes.TextPlain);

veronica.BinIndex("FIELD1").Set(new string[] { "val4", "val4a", "val4b" });
veronica.IntIndex("FIELD2").Set(new BigInteger[] {
    1004, 1005, 1006, 1004, 1004, 1007
});

client.Put(veronica);
function store_cb(err, rslt, async_cb) {
    if (err) {
        throw new Error(err);
    }
    async_cb(null, rslt);
}

var storeFuncs = [
    function (async_cb) {
        var riakObj = new Riak.Commands.KV.RiakObject();
        riakObj.setContentType('text/plain');
        riakObj.setBucketType('indexes');
        riakObj.setBucket('people');
        riakObj.setKey('larry');
        riakObj.setValue('My name is Larry');
        riakObj.addToIndex('field1_bin', 'val1');
        riakObj.addToIndex('field2_int', 1001);
        client.storeValue({ value: riakObj }, function (err, rslt) {
            store_cb(err, rslt, async_cb);
        });
    },
    function (async_cb) {
        var riakObj = new Riak.Commands.KV.RiakObject();
        riakObj.setContentType('text/plain');
        riakObj.setBucketType('indexes');
        riakObj.setBucket('people');
        riakObj.setKey('moe');
        riakObj.setValue('My name is Moe');
        riakObj.addToIndex('Field1_bin', 'val2');
        riakObj.addToIndex('Field2_int', 1002);
        client.storeValue({ value: riakObj }, function (err, rslt) {
            store_cb(err, rslt, async_cb);
        });
    },
    function (async_cb) {
        var riakObj = new Riak.Commands.KV.RiakObject();
        riakObj.setContentType('text/plain');
        riakObj.setBucketType('indexes');
        riakObj.setBucket('people');
        riakObj.setKey('curly');
        riakObj.setValue('My name is Curly');
        riakObj.addToIndex('FIELD1_BIN', 'val3');
        riakObj.addToIndex('FIELD2_INT', 1003);
        client.storeValue({ value: riakObj }, function (err, rslt) {
            store_cb(err, rslt, async_cb);
        });
    },
    function (async_cb) {
        var riakObj = new Riak.Commands.KV.RiakObject();
        riakObj.setContentType('text/plain');
        riakObj.setBucketType('indexes');
        riakObj.setBucket('people');
        riakObj.setKey('veronica');
        riakObj.setValue('My name is Veronica');
        riakObj.addToIndex('FIELD1_bin', 'val4');
        riakObj.addToIndex('FIELD1_bin', 'val4');
        riakObj.addToIndex('FIELD1_bin', 'val4a');
        riakObj.addToIndex('FIELD1_bin', 'val4b');
        riakObj.addToIndex('FIELD2_int', 1004);
        riakObj.addToIndex('FIELD2_int', 1005);
        riakObj.addToIndex('FIELD2_int', 1006);
        riakObj.addToIndex('FIELD2_int', 1004);
        riakObj.addToIndex('FIELD2_int', 1004);
        riakObj.addToIndex('FIELD2_int', 1007);
        client.storeValue({ value: riakObj }, function (err, rslt) {
            store_cb(err, rslt, async_cb);
        });
    }
];
async.parallel(storeFuncs, function (err, rslts) {
    if (err) {
        throw new Error(err);
    }
});
Larry = riakc_obj:new(
    {<<"indexes">>, <<"people">>},
    <<"larry">>,
    <<"My name is Larry">>,
    <<"text/plain">>),
LarryMetadata = riakc_obj:get_update_metadata(Larry),
LarryIndexes = riakc_obj:set_secondary_index(
    LarryMetadata,
    [{{binary_index, "field1"}, [<<"val1">>]}, {{integer_index, "field2"}, [1001]}]
),
LarryWithIndexes = riakc_obj:update_metadata(Larry, LarryIndexes).

Moe = riakc_obj:new(
    {<<"indexes">>, <<"people">>},
    <<"moe">>,
    <<"My name is Moe">>,
    <<"text/plain">>),
MoeMetadata = riakc_obj:get_update_metadata(Moe),
MoeIndexes = riakc_obj:set_secondary_index(
    MoeMetadata,
    [{{binary_index, "Field1"}, [<<"val2">>]}, {{integer_index, "Field2"}, [1002]}]
),
MoeWithIndexes = riakc_obj:update_metadata(Moe, MoeIndexes).

Curly = riakc_obj:new(
    {<<"indexes">>, <<"people">>},
    <<"curly">>,
    <<"My name is Curly">>,
    <<"text/plain">>),
CurlyMetadata = riakc_obj:get_update_metadata(Curly),
CurlyIndexes = riakc_obj:set_secondary_index(
    CurlyMetadata,
    [{{binary_index, "FIELD1"}, [<<"val3">>]}, {{integer_index, "FIELD2"}, [1003]}]
),
CurlyWithIndexes = riakc_obj:update_metadata(Curly, CurlyIndexes).

Veronica = riakc_obj:new(
    {<<"indexes">>, <<"people">>},
    <<"veronica">>,
    <<"My name is Veronica">>,
    <<"text/plain">>),
VeronicaMetadata = riakc_obj:get_update_metadata(Veronica),
VeronicaIndexes = riakc_obj:set_secondary_index(
    VeronicaMetadata,
    %% set_secondary_index sets (replaces) the values of each field it is
    %% given, so each field is listed once with all of its values
    [{{binary_index, "field1"}, [<<"val4">>, <<"val4">>, <<"val4a">>, <<"val4b">>]},
     {{integer_index, "field2"}, [1004, 1004, 1005, 1006, 1004, 1004, 1007]}]
),
VeronicaWithIndexes = riakc_obj:update_metadata(Veronica, VeronicaIndexes).

%% Store the four indexed objects (Pid is an open riakc_pb_socket connection)
riakc_pb_socket:put(Pid, LarryWithIndexes).
riakc_pb_socket:put(Pid, MoeWithIndexes).
riakc_pb_socket:put(Pid, CurlyWithIndexes).
riakc_pb_socket:put(Pid, VeronicaWithIndexes).
o1 := &riak.Object{
    Key:   "larry",
    Value: []byte("My name is Larry"),
}
o1.AddToIndex("field1_bin", "val1")
o1.AddToIntIndex("field2_int", 1001)

o2 := &riak.Object{
    Key:   "moe",
    Value: []byte("My name is Moe"),
}
o2.AddToIndex("Field1_bin", "val2")
o2.AddToIntIndex("Field2_int", 1002)

o3 := &riak.Object{
    Key:   "curly",
    Value: []byte("My name is Curly"),
}
o3.AddToIndex("FIELD1_BIN", "val3")
o3.AddToIntIndex("FIELD2_INT", 1003)

o4 := &riak.Object{
    Key:   "veronica",
    Value: []byte("My name is Veronica"),
}
o4.AddToIndex("FIELD1_bin", "val4")
o4.AddToIndex("FIELD1_bin", "val4")
o4.AddToIndex("FIELD1_bin", "val4a")
o4.AddToIndex("FIELD1_bin", "val4b")
o4.AddToIntIndex("FIELD2_int", 1004)
o4.AddToIntIndex("FIELD2_int", 1005)
o4.AddToIntIndex("FIELD2_int", 1006)
o4.AddToIntIndex("FIELD2_int", 1004)
o4.AddToIntIndex("FIELD2_int", 1004)
o4.AddToIntIndex("FIELD2_int", 1007)

objs := [...]*riak.Object{o1, o2, o3, o4}

wg := &sync.WaitGroup{}
for _, obj := range objs {
    obj.ContentType = "text/plain"
    obj.Charset = "utf-8"
    obj.ContentEncoding = "utf-8"

    cmd, err := riak.NewStoreValueCommandBuilder().
        WithBucketType("indexes").
        WithBucket("people").
        WithContent(obj).
        Build()
    if err != nil {
        return err
    }

    args := &riak.Async{
        Command: cmd,
        Wait:    wg,
    }
    if err := cluster.ExecuteAsync(args); err != nil {
        return err
    }
}

wg.Wait()
curl -v -XPUT localhost:8098/types/indexes/buckets/people/keys/larry \
  -H "x-riak-index-field1_bin: val1" \
  -H "x-riak-index-field2_int: 1001" \
  -d 'My name is Larry'

curl -v -XPUT localhost:8098/types/indexes/buckets/people/keys/moe \
  -H "x-riak-index-Field1_bin: val2" \
  -H "x-riak-index-Field2_int: 1002" \
  -d 'My name is Moe'

curl -v -XPUT localhost:8098/types/indexes/buckets/people/keys/curly \
  -H "X-RIAK-INDEX-FIELD1_BIN: val3" \
  -H "X-RIAK-INDEX-FIELD2_INT: 1003" \
  -d 'My name is Curly'

curl -v -XPUT 127.0.0.1:8098/types/indexes/buckets/people/keys/veronica \
  -H "x-riak-index-field1_bin: val4, val4, val4a, val4b" \
  -H "x-riak-index-field2_int: 1004, 1004, 1005, 1006" \
  -H "x-riak-index-field2_int: 1004" \
  -H "x-riak-index-field2_int: 1004" \
  -H "x-riak-index-field2_int: 1004" \
  -H "x-riak-index-field2_int: 1007" \
  -d 'My name is Veronica'

The above objects will end up having the following secondary indexes, respectively:

  • Larry - Binary index field1_bin and integer index field2_int
  • Moe - Binary index field1_bin and integer index field2_int (note that the index names are set to lowercase by Riak)
  • Curly - Binary index field1_bin and integer index field2_int (note again that the index names are set to lowercase)
  • Veronica - Binary index field1_bin with the values val4, val4a, and val4b and integer index field2_int with the values 1004, 1005, 1006, and 1007 (note that redundancies have been removed)

As these examples show, there are safeguards in Riak that both normalize the names of indexes and prevent the accumulation of redundant indexes.
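The normalization rules described above can be modeled in a few lines. The following Python sketch is hypothetical and does not reproduce Riak's internal code. It simply applies the two behaviors the examples show: index names are lowercased, and repeated values for the same index collapse into one.

```python
from typing import Dict, Iterable, List, Set, Tuple, Union

Value = Union[int, str]


def normalize_indexes(pairs: Iterable[Tuple[str, Value]]) -> Dict[str, List[Value]]:
    """Lowercase index names and collapse duplicate values (toy model)."""
    merged: Dict[str, Set[Value]] = {}
    for name, value in pairs:
        merged.setdefault(name.lower(), set()).add(value)
    return {name: sorted(values) for name, values in merged.items()}


veronica = [("field1_bin", "val4"), ("field1_bin", "val4"),
            ("field1_bin", "val4a"), ("field1_bin", "val4b"),
            ("FIELD2_int", 1004), ("FIELD2_int", 1005), ("FIELD2_int", 1006),
            ("FIELD2_int", 1004), ("FIELD2_int", 1004), ("FIELD2_int", 1007)]
print(normalize_indexes(veronica))
# {'field1_bin': ['val4', 'val4a', 'val4b'], 'field2_int': [1004, 1005, 1006, 1007]}
```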

Invalid Field Names and Types

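The following examples demonstrate what happens when an index field is specified with an invalid field name or type. Over HTTP, Riak responds with 400 Bad Request and a description of the error. Several client libraries reject the invalid input themselves, either when the index is added or when the object is stored.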

Invalid field name:

// The Java client will not allow you to provide invalid index names,
// because you are not required to add "_bin" or "_int" to the end of
// those names
bucket = client.bucket_type('indexes').bucket('people')
obj = Riak::RObject.new(bucket, 'larry')
obj.indexes['field2_foo'] = [1001]

# The Ruby client will let you get away with this...at first. But when
# you attempt to store the object, you will get an error response such
# as this:

NoMethodError: undefined method 'map' for 1001:Fixnum
// throws \InvalidArgumentException
$object = (new \Basho\Riak\Object('{"user_data":{ ... }}', ['Content-type' => 'application/json']))
  ->addValueToIndex('twitter', 'jsmith123');
bucket = client.bucket_type('indexes').bucket('people')
obj = RiakObject(client, bucket, 'larry')
obj.add_index('field2_foo', 1001)

# Result:
riak.RiakError: "Riak 2i fields must end with either '_bin' or '_int'."
// The Riak .NET Client will not allow you to provide invalid index names,
// because you are not required to add "_bin" or "_int" to the end of
// those names
var cmd = new Riak.Commands.KV.SecondaryIndexQuery.Builder()
    .withBucketType('indexes')
    .withBucket('people')
    .withIndexName('field2_foo')
    .withIndexKey('jsmith123')
    .withCallback(query_cb)
    .build();
client.execute(cmd);

// Produces the following stack trace (truncated):
error: query_cb err: 'Error processing incoming message: error:function_clause:[{riak_api_pb_server,
    send_error,
    [{unknown_field_type,
        <<"field2_foo">>},
    {state,
        {gen_tcp,inet},
        #Port<0.68338>,
        undefined,
        ...
        ...
        ...
Obj = riakc_obj:new(
    {<<"indexes">>, <<"people">>},
    <<"larry">>,
    <<"some data">>,
    <<"text/plain">>
),
MD1 = riakc_obj:get_update_metadata(Obj),
MD2 = riakc_obj:set_secondary_index(MD1, [{{foo_index, "field2"}, [1001]}]).

%% The Erlang client will return an error message along these lines:
** exception error: no function clause matching
                    riakc_obj:set_secondary_index( ... ).
cmd, err := riak.NewSecondaryIndexQueryCommandBuilder().
    WithBucketType("indexes").
    WithBucket("people").
    WithIndexName("field2_foo").
    WithIndexKey("jsmith123").
    Build()
if err != nil {
    return err
}

if err := cluster.Execute(cmd); err != nil {
    fmt.Println("[DevUsing2i] field name error:", err)
} else {
    return errors.New("[DevUsing2i] expected an error!")
}

curl -XPUT 127.0.0.1:8098/types/indexes/buckets/people/keys/larry \
  -H "x-riak-index-field2_foo: 1001" \
  -d 'data1'

# Response
Unknown field type for field: 'field2_foo'.
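Riak infers an index's type from the suffix of its name, which is why `field2_foo` is rejected. The hypothetical check below applies that rule on the client side before anything is sent. The error text is copied from the HTTP response above. Treating names case-insensitively is an assumption based on the lowercasing behavior described earlier.

```python
VALID_SUFFIXES = ("_bin", "_int")


def index_field_type(name: str) -> str:
    """Return 'bin' or 'int' based on the index name's suffix, else raise."""
    lowered = name.lower()
    for suffix in VALID_SUFFIXES:
        if lowered.endswith(suffix):
            return suffix[1:]  # strip the leading underscore
    raise ValueError(f"Unknown field type for field: '{name}'.")


for candidate in ["twitter_bin", "FIELD2_INT", "field2_foo"]:
    try:
        print(candidate, "->", index_field_type(candidate))
    except ValueError as err:
        print(candidate, "->", err)
```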

Incorrect data type:

Location key = new Location(new Namespace("people"), "larry");
RiakObject obj = new RiakObject();
obj.getIndexes().getIndex(LongIntIndex.named("field2")).add("bar");

// The Java client will return a response indicating a type mismatch.
// The output may look something like this:

Error:(46, 68) java: no suitable method found for add(java.lang.String)
    method com.basho.riak.client.query.indexes.RiakIndex.add(java.lang.Long) is not applicable
      (argument mismatch; java.lang.String cannot be converted to java.lang.Long)
    method com.basho.riak.client.query.indexes.RiakIndex.add(java.util.Collection<java.lang.Long>) is not applicable
      (argument mismatch; java.lang.String cannot be converted to java.util.Collection<java.lang.Long>)
bucket = client.bucket_type('indexes').bucket('people')
obj = Riak::RObject.new(bucket, 'larry')
obj.indexes['field2_int'] = %w{ bar }

# The Ruby client will let you get away with this...at first. But when
# you attempt to store the object, you will get an error response such
# as this:

NoMethodError: undefined method 'map' for 1001:Fixnum
// throws \InvalidArgumentException
$object = (new \Basho\Riak\Object('{"user_data":{ ... }}', ['Content-type' => 'application/json']))
  ->addValueToIndex('twitter_int', 'not_an_int');

// throws \InvalidArgumentException
$object = (new \Basho\Riak\Object('{"user_data":{ ... }}', ['Content-type' => 'application/json']))
  ->addValueToIndex('twitter_int', ['arrays', 'are', 'not', 'strings']);

// does not throw an exception, it will just write ints as a string
// only requirement is that value is scalar (int, float, string, bool)
$object = (new \Basho\Riak\Object('{"user_data":{ ... }}', ['Content-type' => 'application/json']))
  ->addValueToIndex('twitter_bin', 12);
bucket = client.bucket_type('indexes').bucket('people')
obj = RiakObject(client, bucket, 'larry')
obj.add_index('field2_int', 'bar')

# The Python client will let you get away with this...at first. But when you
# attempt to store the object, you will get an error response such as this:
riak.RiakError: '{precommit_fail,[{field_parsing_failed,{<<"field2_int">>,<<"bar">>}}]}'
var id = new RiakObjectId("indexes", "people", "larry");
var obj = new RiakObject(id, "test value", "text/plain");
var intIdx = obj.IntIndex("test-int-idx");
intIdx.Add("invalid-value");

// The .NET client will throw a FormatException at this point
// The output may look something like this:

The value could not be parsed.
var riakObj = new Riak.Commands.KV.RiakObject();
riakObj.setContentType('text/plain');
riakObj.setBucketType('indexes');
riakObj.setBucket('people');
riakObj.setKey('larry');
riakObj.addToIndex('field2_int', 'bar');
try {
    client.storeValue({ value: riakObj }, function (err, rslt) {
        logger.error("incorrect_data_type err: '%s'", err);
    });
} catch (e) {
    logger.error("incorrect_data_type err: '%s'", e);
}

// Output:
buffer.js:67
    throw new TypeError('must start with number, buffer, array or string');
          ^
TypeError: must start with number, buffer, array or string
    at new Buffer (buffer.js:67:11)
Obj = riakc_obj:new(
    {<<"indexes">>, <<"people">>},
    <<"larry">>,
    <<"some data">>,
    <<"text/plain">>
),
MD1 = riakc_obj:get_update_metadata(Obj),
MD2 = riakc_obj:set_secondary_index(MD1, [{{integer_index, "field2"}, [<<"bar">>]}]).

%% The Erlang client will return an error message along these lines:
** exception error: bad argument
     in function  integer_to_list/1
        called as integer_to_list(<<"bar">>) ...
obj := &riak.Object{
    BucketType:      "indexes",
    Bucket:          "people",
    Key:             "larry",
    ContentType:     "text/plain",
    Charset:         "utf-8",
    ContentEncoding: "utf-8",
    Value:           []byte("My name is Larry"),
}
obj.AddToIndex("field2_int", "bar")

cmd, err := riak.NewStoreValueCommandBuilder().
    WithContent(obj).
    Build()
if err != nil {
    return err
}

if err := cluster.Execute(cmd); err != nil {
    fmt.Println("[DevUsing2i] index data type error:", err)
} else {
    return errors.New("[DevUsing2i] expected an error!")
}

// The riak.Error object will contain:
{precommit_fail,[{field_parsing_failed,{<<"field2_int">>,<<"bar">>}}]}
curl -XPUT 127.0.0.1:8098/types/indexes/buckets/people/keys/larry \
  -H "x-riak-index-field2_int: bar" \
  -d 'data1'

# Response
HTTP/1.1 400 Bad Request

Could not parse field 'field2_int', value 'bar'.
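All of the failures above come from the same rule: an index whose name ends in _int only accepts integer values, and every index name must end in _int or _bin. Catching this before a write saves a round trip to the cluster. The following is a minimal Python sketch of such a check; is_valid_index is a hypothetical helper, not part of any Riak client, and it only models the naming and type rules shown on this page.

```python
def is_valid_index(name, value):
    """Hypothetical pre-write check mirroring the 2i rules shown above.

    Not part of any Riak client library. Returns True if Riak should
    accept `value` for the index called `name`.
    """
    if name.endswith("_bin"):
        # Binary (string) indexes: accept text or raw bytes.
        return isinstance(value, (str, bytes))
    if name.endswith("_int"):
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool):
            return False
        if isinstance(value, int):
            return True
        if isinstance(value, str):
            # Over HTTP the value arrives as header text, e.g. "1001".
            try:
                int(value)
            except ValueError:
                return False
            return True
        return False
    # Any other suffix is not a valid secondary index name.
    return False


print(is_valid_index("field2_int", "bar"))  # False: the case rejected above
print(is_valid_index("field2_int", 1001))   # True
```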

Querying

Note on 2i queries and the R parameter

For all 2i queries, the R parameter is set to 1, which means that queries that are run while handoffs and related operations are underway may not return all keys as expected.

To avoid such incomplete results, the riak.conf file provides a participate_in_coverage option that controls whether a node takes part in 2i queries. Setting participate_in_coverage=disabled prevents the node in question from participating. The recommended use of this setting is to keep newly added nodes, which have not yet received all of their data, out of 2i queries so that they do not produce inconsistent results. Riak must be restarted on the node for a change to participate_in_coverage to take effect. The default setting is enabled.

Exact Match

The following examples perform an exact match index query.

Query a binary index:

Namespace myBucket = new Namespace("indexes", "people");
BinIndexQuery biq = new BinIndexQuery.Builder(myBucket, "field1", "val1").build();
BinIndexQuery.Response response = client.execute(biq);
bucket = client.bucket_type('indexes').bucket('people')
bucket.get_index('field1_bin', 'val1')
(new \Basho\Riak\Command\Builder\QueryIndex($riak))
  ->buildBucket('people', 'indexes')
  ->withIndexName('field1_bin')
  ->withScalarValue('val1')
  ->build()
  ->execute()
  ->getResults();
bucket = client.bucket_type('indexes').bucket('people')
bucket.get_index('field1_bin', 'val1')
var riakIndexId = new RiakIndexId("indexes", "people", "field1");
// Note: using a string argument indicates a binary index query:
var indexRiakResult = client.GetSecondaryIndex(riakIndexId, "val1");
var indexResult = indexRiakResult.Value;
var binIdxCmd = new Riak.Commands.KV.SecondaryIndexQuery.Builder()
    .withBucketType('indexes')
    .withBucket('people')
    .withIndexName('field1_bin')
    .withIndexKey('val1')
    .withCallback(query_cb)
    .build();
client.execute(binIdxCmd);
{ok, Results} = riakc_pb_socket:get_index(
    Pid,
    {<<"indexes">>, <<"people">>}, %% bucket type and bucket name
    {binary_index, "field1"},
    <<"val1">>
).
c1, err := riak.NewSecondaryIndexQueryCommandBuilder().
    WithBucketType("indexes").
    WithBucket("people").
    WithIndexName("field1_bin").
    WithIndexKey("val1").
    Build()
if err != nil {
    return err
}
curl localhost:8098/types/indexes/buckets/people/index/field1_bin/val1
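The curl example above shows the shape of an HTTP exact-match query: /types/<bucket type>/buckets/<bucket>/index/<index name>/<value>. If you build these URLs in application code, percent-encode each segment so that values containing spaces or slashes are not misread as extra path segments. A minimal Python sketch; exact_match_url is a hypothetical helper, and the host and port are assumptions matching the examples:

```python
from urllib.parse import quote


def exact_match_url(base, bucket_type, bucket, index, value):
    """Build an HTTP 2i exact-match URL (hypothetical helper).

    Each path segment is percent-encoded with no "safe" characters,
    so "/" inside a value becomes %2F instead of a path separator.
    """
    segments = ["types", bucket_type, "buckets", bucket, "index", index, str(value)]
    return base.rstrip("/") + "/" + "/".join(quote(s, safe="") for s in segments)


print(exact_match_url("http://localhost:8098", "indexes", "people", "field1_bin", "val1"))
# http://localhost:8098/types/indexes/buckets/people/index/field1_bin/val1
```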

Query an integer index:

Namespace myBucket = new Namespace("indexes", "people");
IntIndexQuery iiq = new IntIndexQuery.Builder(myBucket, "field2", 1001L)
        .build();
IntIndexQuery.Response response = client.execute(iiq);
bucket = client.bucket_type('indexes').bucket('people')
bucket.get_index('field2_int', 1001)
(new \Basho\Riak\Command\Builder\QueryIndex($riak))
  ->buildBucket('people', 'indexes')
  ->withIndexName('field2_int')
  ->withScalarValue(1001)
  ->build()
  ->execute()
  ->getResults();
bucket = client.bucket_type('indexes').bucket('people')
bucket.get_index('field2_int', 1001)
var riakIndexId = new RiakIndexId("indexes", "people", "field2");
// Note: using an integer argument indicates an int index query:
var indexRiakResult = client.GetSecondaryIndex(riakIndexId, 1001);
var indexResult = indexRiakResult.Value;
var intIdxCmd = new Riak.Commands.KV.SecondaryIndexQuery.Builder()
    .withBucketType('indexes')
    .withBucket('people')
    .withIndexName('field2_int')
    .withIndexKey(1001)
    .withCallback(query_cb)
    .build();
client.execute(intIdxCmd);
{ok, Results} = riakc_pb_socket:get_index(
    Pid,
    {<<"indexes">>, <<"people">>}, %% bucket type and bucket name
    {integer_index, "field2"},
    1001
).
cmd, err := riak.NewSecondaryIndexQueryCommandBuilder().
    WithBucketType("indexes").
    WithBucket("people").
    WithIndexName("field2_int").
    WithIntIndexKey(1001).
    Build()
if err != nil {
    return err
}
curl localhost:8098/types/indexes/buckets/people/index/field2_int/1001
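Over HTTP, an exact-match query without return_terms responds with a JSON object whose keys field lists the matching object keys, for example {"keys": ["larry"]}. A short Python sketch for pulling the keys out of such a body; keys_from_response is a hypothetical helper, and it assumes that response shape:

```python
import json


def keys_from_response(body):
    """Return the object keys from an HTTP 2i response body.

    Assumes the {"keys": [...]} shape used when return_terms is not set;
    a missing "keys" field is treated as no matches.
    """
    data = json.loads(body)
    return list(data.get("keys", []))


print(keys_from_response('{"keys": ["larry"]}'))  # ['larry']
```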

The following example performs an exact match query and pipes the results into a MapReduce job:

curl -XPOST localhost:8098/mapred \
  -H "Content-Type: application/json" \
  -d @-<<EOF
{
  "inputs": {
    "bucket": "people",
    "index": "field2_bin",
    "key":"val3"
  },
  "query": [
    {
      "reduce": {
        "language":"erlang",
        "module": "riak_kv_mapreduce",
        "function": "reduce_identity",
        "keep": true
      }
    }
  ]
}
EOF

Range

The following examples perform a range query.

Query a binary index…

Namespace myBucket = new Namespace("indexes", "people");
BinIndexQuery biq = new BinIndexQuery.Builder(myBucket, "field1", "val2", "val4")
        .build();
BinIndexQuery.Response response = client.execute(biq);
bucket = client.bucket_type('indexes').bucket('people')
bucket.get_index('field1_bin', 'val2'..'val4')
(new \Basho\Riak\Command\Builder\QueryIndex($riak))
  ->buildBucket('people', 'indexes')
  ->withIndexName('field1_bin')
  ->withRangeValue('val2', 'val4')
  ->build()
  ->execute()
  ->getResults();
bucket = client.bucket_type('indexes').bucket('people')
bucket.get_index('field1_bin', 'val2', 'val4')
var riakIndexId = new RiakIndexId("indexes", "people", "field1");
var indexRiakResult = client.GetSecondaryIndex(riakIndexId, "val2", "val4");
var indexResult = indexRiakResult.Value;
var binIdxCmd = new Riak.Commands.KV.SecondaryIndexQuery.Builder()
    .withBucketType('indexes')
    .withBucket('people')
    .withIndexName('field1_bin')
    .withRange('val2', 'val4')
    .withCallback(query_cb)
    .build();
client.execute(binIdxCmd);
{ok, Results} = riakc_pb_socket:get_index_range(
    Pid,
    {<<"indexes">>, <<"people">>}, %% bucket type and bucket name
    {binary_index, "field1"}, %% index name
    <<"val2">>, <<"val4">> %% range query for keys between "val2" and "val4"
).
c1, err := riak.NewSecondaryIndexQueryCommandBuilder().
    WithBucketType("indexes").
    WithBucket("people").
    WithIndexName("field1_bin").
    WithRange("val2", "val4").
    Build()
if err != nil {
    return err
}
curl localhost:8098/types/indexes/buckets/people/index/field1_bin/val2/val4
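Range queries are inclusive at both ends, and binary index values are compared byte by byte, so a value matches if it sorts between the two bounds lexicographically, even when it is longer than either bound. The following Python model (in_bin_range is a hypothetical helper, not a client API) shows which values the val2 to val4 range would match:

```python
def in_bin_range(term, start, end):
    """Model of a binary-index range match (hypothetical helper).

    Bounds are inclusive and values are compared as UTF-8 bytes.
    """
    t, s, e = (x.encode("utf-8") for x in (term, start, end))
    return s <= t <= e


candidates = ["val1", "val2", "val3", "val3zzz", "val4", "val4a"]
print([c for c in candidates if in_bin_range(c, "val2", "val4")])
# ['val2', 'val3', 'val3zzz', 'val4']
```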

Or query an integer index…

Namespace myBucket = new Namespace("indexes", "people");
IntIndexQuery iiq = new IntIndexQuery.Builder(myBucket, "field2", 1002L, 1004L)
        .build();
IntIndexQuery.Response response = client.execute(iiq);
bucket = client.bucket_type('indexes').bucket('people')
bucket.get_index('field2_int', 1002..1004)
(new \Basho\Riak\Command\Builder\QueryIndex($riak))
  ->buildBucket('people', 'indexes')
  ->withIndexName('field2_int')
  ->withRangeValue(1002, 1004)
  ->build()
  ->execute()
  ->getResults();
bucket = client.bucket_type('indexes').bucket('people')
bucket.get_index('field2_int', 1002, 1004)
var riakIndexId = new RiakIndexId("indexes", "people", "field2");
var indexRiakResult = client.GetSecondaryIndex(riakIndexId, 1002, 1004);
var indexResult = indexRiakResult.Value;
var intIdxCmd = new Riak.Commands.KV.SecondaryIndexQuery.Builder()
    .withBucketType('indexes')
    .withBucket('people')
    .withIndexName('field2_int')
    .withRange(1002, 1004)
    .withCallback(query_cb)
    .build();
client.execute(intIdxCmd);
{ok, Results} = riakc_pb_socket:get_index_range(
    Pid,
    {<<"indexes">>, <<"people">>}, %% bucket type and bucket name
    {integer_index, "field2"}, %% index name
    1002, 1004 %% range query for values between 1002 and 1004
).
cmd, err := riak.NewSecondaryIndexQueryCommandBuilder().
    WithBucketType("indexes").
    WithBucket("people").
    WithIndexName("field2_int").
    WithIntRange(1002, 1004).
    Build()
if err != nil {
    return err
}
curl localhost:8098/types/indexes/buckets/people/index/field2_int/1002/1004
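The index type matters when the values are numbers. Stored in an _int index, 1002 to 1004 is a numeric range; the same numbers stored as strings in a _bin index are compared lexicographically, which can pull in values such as 10030. A small Python illustration of the two orderings (int_range and bin_range are hypothetical helpers that model the comparison, not client calls):

```python
def int_range(values, start, end):
    """Integer-index semantics: numeric, inclusive comparison."""
    return sorted(v for v in values if start <= v <= end)


def bin_range(values, start, end):
    """Binary-index semantics: the same numbers as strings, compared lexicographically."""
    return sorted(s for s in map(str, values) if start <= s <= end)


values = [999, 1002, 1003, 1004, 10030]
print(int_range(values, 1002, 1004))      # [1002, 1003, 1004]
print(bin_range(values, "1002", "1004"))  # ['1002', '1003', '10030', '1004']
```

If you need numeric ranges, index the value with an _int index rather than a _bin index.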

The following example performs a range query and pipes the results into a MapReduce job:

curl -XPOST localhost:8098/mapred \
  -H "Content-Type: application/json" \
  -d @-<<EOF
{
  "inputs": {
    "bucket": "people",
    "index": "field2_bin",
    "start": "1002",
    "end": "1004"
  },
  "query": [
    {
      "reduce": {
        "language": "erlang",
        "module": "riak_kv_mapreduce",
        "function": "reduce_identity",
        "keep": true
      }
    }
  ]
}
EOF
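Both MapReduce examples send the same JSON structure and differ only in the inputs object: an exact match uses key, while a range uses start and end. A Python sketch that builds either body, assuming the field names shown in the curl examples above; mapred_2i_body is a hypothetical helper that only produces the JSON and does not send it:

```python
import json


def mapred_2i_body(bucket, index, key=None, start=None, end=None,
                   function="reduce_identity"):
    """Build a /mapred request body with 2i inputs (hypothetical helper).

    Pass `key` for an exact match, or both `start` and `end` for a range.
    """
    inputs = {"bucket": bucket, "index": index}
    if key is not None and start is None and end is None:
        inputs["key"] = key
    elif key is None and start is not None and end is not None:
        inputs["start"] = start
        inputs["end"] = end
    else:
        raise ValueError("pass either key, or both start and end")
    query = [{
        "reduce": {
            "language": "erlang",
            "module": "riak_kv_mapreduce",
            "function": function,
            "keep": True,
        }
    }]
    return json.dumps({"inputs": inputs, "query": query}, indent=2)


print(mapred_2i_body("people", "field2_bin", start="1002", end="1004"))
```

The printed body can be piped to curl with -d @- in place of the heredoc.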

Range with terms

When performing a range query, you can retrieve the matched index values alongside the Riak keys by setting return_terms=true. The following examples use a small sample of Twitter data with indexed hashtags:

Namespace tweetsBucket = new Namespace("indexes", "tweets");
BinIndexQuery biq = new BinIndexQuery.Builder(tweetsBucket, "hashtags", "rock", "rocl")
        .withKeyAndIndex(true)
        .build();
BinIndexQuery.Response response = client.execute(biq);
bucket = client.bucket_type('indexes').bucket('tweets')
bucket.get_index('hashtags_bin', 'rock'..'rocl', return_terms: true)
(new \Basho\Riak\Command\Builder\QueryIndex($riak))
  ->buildBucket('tweets', 'indexes')
  ->withIndexName('hashtags_bin')
  ->withRangeValue('rock', 'rocl')
  ->withReturnTerms()
  ->build()
  ->execute()
  ->getResults();
bucket = client.bucket_type('indexes').bucket('tweets')
bucket.get_index('hashtags_bin', 'rock', 'rocl', return_terms=True)
var riakIndexId = new RiakIndexId("indexes", "tweets", "hashtags");
var options = new RiakIndexGetOptions();
options.SetReturnTerms(true);
var indexRiakResult = client.GetSecondaryIndex(riakIndexId, "rock", "rocl", options);
var indexResult = indexRiakResult.Value;
var binIdxCmd = new Riak.Commands.KV.SecondaryIndexQuery.Builder()
    .withBucketType('indexes')
    .withBucket('tweets')
    .withIndexName('hashtags_bin')
    .withRange('rock', 'rocl')
    .withReturnKeyAndIndex(true)
    .withCallback(query_cb)
    .build();
client.execute(binIdxCmd);
{ok, Results} = riakc_pb_socket:get_index_range(
    Pid,
    {<<"indexes">>, <<"tweets">>}, %% bucket type and bucket name
    {binary_index, "hashtags"},     %% index name
    <<"rock">>, <<"rocl">>,         %% range query from "rock" to "rocl"
    [{return_terms, true}]          %% include the matched index terms
).
cmd, err := riak.NewSecondaryIndexQueryCommandBuilder().
    WithBucketType("indexes").
    WithBucket("tweets").
    WithIndexName("hashtags_bin").
    WithRange("rock", "rocl").
    WithReturnKeyAndIndex(true).
    Build()
if err != nil {
    return err
}

if err := cluster.Execute(cmd); err != nil {
    return err
}
curl 'localhost:8098/types/indexes/buckets/tweets/index/hashtags_bin/rock/rocl?return_terms=true'

Response:

{
  "results": [
    {
      "rock": "349224101224787968"
    },
    {
      "rocks": "349223639880699905"
    }
  ]
}
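Each entry in results is a one-item object mapping a matched index term to an object key. A list of pairs is used rather than a single object because one term can map to many keys (ripjake appears twice in the pagination example). A Python sketch that flattens such a response into (term, key) tuples; term_key_pairs is a hypothetical helper that assumes the response shape shown above:

```python
import json


def term_key_pairs(body):
    """Flatten a return_terms response into (term, key) tuples (hypothetical helper)."""
    pairs = []
    for entry in json.loads(body)["results"]:
        # Each entry is expected to hold a single {term: key} mapping.
        for term, key in entry.items():
            pairs.append((term, key))
    return pairs


response = '''{
  "results": [
    {"rock": "349224101224787968"},
    {"rocks": "349223639880699905"}
  ]
}'''
print(term_key_pairs(response))
# [('rock', '349224101224787968'), ('rocks', '349223639880699905')]
```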

Pagination

When requesting large result sets, it is often preferable to have the server return results in chunks rather than as a firehose. You can do so with max_results=<n>, where n is the number of results you’d like per page.

If more keys are available, the results include a continuation value that the client can use to request the next page.
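The client-side loop is the same in every language: request a page, keep the results, and repeat with the returned continuation until none comes back. A Python sketch of that loop; fetch_page stands in for whatever client call runs one 2i query and is hypothetical, as is the in-memory fake_fetcher used to exercise it:

```python
def fetch_all(fetch_page, max_results):
    """Collect every result by following continuation tokens.

    `fetch_page(max_results, continuation)` is a hypothetical callable that
    runs one paginated 2i query and returns (results, continuation or None).
    """
    results, continuation = [], None
    while True:
        page, continuation = fetch_page(max_results, continuation)
        results.extend(page)
        if not continuation:  # no continuation means this was the last page
            return results


def fake_fetcher(data):
    """In-memory stand-in for a Riak query; the continuation is just an offset."""
    def fetch_page(max_results, continuation):
        start = int(continuation) if continuation else 0
        end = start + max_results
        next_token = str(end) if end < len(data) else None
        return data[start:end], next_token
    return fetch_page


print(fetch_all(fake_fetcher(list(range(12))), 5))
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
```

The fake uses an offset only to keep the sketch self-contained; real Riak continuations are opaque and should be passed back unchanged.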

Here is an example of a paginated range query against the same Twitter data set; the HTTP example also sets return_terms, so its response includes the matched terms.

Namespace tweetsBucket = new Namespace("indexes", "tweets");
BinIndexQuery biq = new BinIndexQuery.Builder(tweetsBucket, "hashtags", "ri", "ru")
        .withMaxResults(5)
        .build();
BinIndexQuery.Response response = client.execute(biq);
bucket = client.bucket_type('indexes').bucket('tweets')
bucket.get_index('hashtags_bin', 'ri'..'ru', max_results: 5)
(new \Basho\Riak\Command\Builder\QueryIndex($riak))
  ->buildBucket('tweets', 'indexes')
  ->withIndexName('hashtags_bin')
  ->withRangeValue('ri', 'ru')
  ->withMaxResults(5)
  ->build()
  ->execute()
  ->getResults();
bucket = client.bucket_type('indexes').bucket('tweets')
bucket.get_index('hashtags_bin', 'ri', 'ru', max_results=5)
var idxId = new RiakIndexId("indexes", "tweets", "hashtags");
var options = new RiakIndexGetOptions();
options.SetMaxResults(5);
var rslt = client.GetSecondaryIndex(idxId, "ri", "ru", options);

options.SetContinuation(rslt.Continuation);
rslt = client.GetSecondaryIndex(idxId, "ri", "ru", options);
function do_query(continuation) {
    var binIdxCmdBuilder = new Riak.Commands.KV.SecondaryIndexQuery.Builder()
        .withBucketType('indexes')
        .withBucket('tweets')
        .withIndexName('hashtags_bin')
        .withRange('ri', 'ru')
        .withMaxResults(5)
        .withCallback(pagination_cb);

    if (continuation) {
        binIdxCmdBuilder.withContinuation(continuation);
    }

    client.execute(binIdxCmdBuilder.build());
}

var query_keys = [];
function pagination_cb(err, rslt) {
    if (err) {
        logger.error("query_cb err: '%s'", err);
        return;
    }

    // Collect keys from this chunk before checking whether the page is done
    if (rslt.values && rslt.values.length > 0) {
        Array.prototype.push.apply(query_keys,
            rslt.values.map(function (value) {
                return value.objectKey;
            }));
    }

    if (rslt.done) {
        query_keys.forEach(function (key) {
            logger.info("2i query key: '%s'", key);
        });
        query_keys = [];

        // Request the next page, if there is one
        if (rslt.continuation) {
            do_query(rslt.continuation);
        }
    }
}

do_query();
{ok, Results} = riakc_pb_socket:get_index_range(
    Pid,
    {<<"indexes">>, <<"tweets">>}, %% bucket type and bucket name
    {binary_index, "hashtags"}, %% index name
    <<"ri">>, <<"ru">>, %% range query from "ri" to "ru"
    [{max_results, 5}] %% options list
).
func doPaginatedQuery(cluster *riak.Cluster, continuation []byte) error {
  builder := riak.NewSecondaryIndexQueryCommandBuilder().
    WithBucketType("indexes").
    WithBucket("tweets").
    WithIndexName("hashtags_bin").
    WithRange("ri", "ru").
    WithMaxResults(5)

  if len(continuation) > 0 {
    builder.WithContinuation(continuation)
  }

  cmd, err := builder.Build()
  if err != nil {
    return err
  }

  if err := cluster.Execute(cmd); err != nil {
    return err
  }

  printIndexQueryResults(cmd)

  sciq := cmd.(*riak.SecondaryIndexQueryCommand)
  if sciq.Response == nil {
    return errors.New("[DevUsing2i] expected response but did not get one")
  }

  // A non-empty continuation means more results are available
  if rc := sciq.Response.Continuation; len(rc) > 0 {
    return doPaginatedQuery(cluster, rc)
  }

  return nil
}

func queryingPagination(cluster *riak.Cluster) error {
  return doPaginatedQuery(cluster, nil)
}
curl 'localhost:8098/types/indexes/buckets/tweets/index/hashtags_bin/ri/ru?max_results=5&return_terms=true'

Here is an example JSON response (your client-specific response may differ):

{
  "continuation": "g2gCbQAAAAdyaXBqYWtlbQAAABIzNDkyMjA2ODcwNTcxMjk0NzM=",
  "results": [
    { "rice": "349222574510710785" },
    { "rickross": "349222868095217664" },
    { "ridelife": "349221819552763905" },
    { "ripjake": "349220649341952001" },
    { "ripjake": "349220687057129473" }
  ]
}

Take the continuation value from the previous result set and feed it back into the query.

Namespace tweetsBucket = new Namespace("indexes", "tweets");
BinIndexQuery biq = new BinIndexQuery.Builder(tweetsBucket, "hashtags", "ri", "ru")
        .withContinuation(BinaryValue.create("g2gCbQAAAAdyaXBqYWtlbQAAABIzNDkyMjA2ODcwNTcxMjk0NzM="))
        .withMaxResults(5)
        .withKeyAndIndex(true)
        .build();
BinIndexQuery.Response response = client.execute(biq);
bucket = client.bucket_type('indexes').bucket('tweets')
bucket.get_index(
  'hashtags_bin',
  'ri'..'ru',
  continuation: 'g2gCbQAAAAdyaXBqYWtlbQAAABIzNDkyMjA2ODcwNTcxMjk0NzM=',
  max_results: 5,
  return_terms: true
)
(new \Basho\Riak\Command\Builder\QueryIndex($riak))
  ->buildBucket('tweets', 'indexes')
  ->withIndexName('hashtags_bin')
  ->withRangeValue('ri', 'ru')
  ->withMaxResults(5)
  ->withContinuation('g2gCbQAAAAdyaXBqYWtlbQAAABIzNDkyMjA2ODcwNTcxMjk0NzM=')
  ->build()
  ->execute()
  ->getResults();
bucket = client.bucket_type('indexes').bucket('tweets')
bucket.get_index(
    'hashtags_bin',
    'ri', 'ru',
    continuation='g2gCbQAAAAdyaXBqYWtlbQAAABIzNDkyMjA2ODcwNTcxMjk0NzM=',
    max_results=5,
    return_terms=True
)
// rslt is the previous 2i fetch result
var idxId = new RiakIndexId("indexes", "tweets", "hashtags");
var options = new RiakIndexGetOptions();
options.SetMaxResults(5);
options.SetContinuation(rslt.Continuation);
rslt = client.GetSecondaryIndex(idxId, "ri", "ru", options);
// See above example
{ok, Results} = riakc_pb_socket:get_index_range(
    Pid,
    {<<"indexes">>, <<"tweets">>}, %% bucket type and bucket name
    {binary_index, "hashtags"}, %% index name
    <<"ri">>, <<"ru">>, %% range query from "ri" to "ru"
    [
        {continuation, <<"g2gCbQAAAAdyaXBqYWtlbQAAABIzNDkyMjA2ODcwNTcxMjk0NzM=">>},
        {max_results, 5},
        {return_terms, true}
    ]
).
// See above example
curl 'localhost:8098/types/indexes/buckets/tweets/index/hashtags_bin/ri/ru?continuation=g2gCbQAAAAdyaXBqYWtlbQAAABIzNDkyMjA2ODcwNTcxMjk0NzM=&max_results=5&return_terms=true'

The result:

{
  "continuation": "g2gCbQAAAAlyb2Jhc2VyaWFtAAAAEjM0OTIyMzcwMjc2NTkxMjA2NQ==",
  "results": [
    {
      "ripjake": "349221198774808579"
    },
    {
      "ripped": "349224017347100672"
    },
    {
      "roadtrip": "349221207155032066"
    },
    {
      "roastietime": "349221370724491265"
    },
    {
      "robaseria": "349223702765912065"
    }
  ]
}
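When following a continuation over HTTP, keep in mind that the token is base64 text: it can end in = and may contain + or /. Percent-encoding it as a query-string value, and quoting the whole URL on a shell command line so that & and ? are not interpreted by the shell, keeps it intact. A Python sketch using urllib.parse.urlencode; next_page_url is a hypothetical helper, and the parameter names come from the curl examples above:

```python
from urllib.parse import quote, urlencode


def next_page_url(base, bucket_type, bucket, index, start, end,
                  continuation, max_results, return_terms=True):
    """Build the URL for the next page of an HTTP 2i range query (hypothetical helper)."""
    segments = ["types", bucket_type, "buckets", bucket, "index", index, start, end]
    path = "/".join(quote(s, safe="") for s in segments)
    params = {"max_results": max_results, "continuation": continuation}
    if return_terms:
        params["return_terms"] = "true"
    # urlencode percent-encodes "=", "+" and "/" inside the token.
    return f"{base.rstrip('/')}/{path}?{urlencode(params)}"


print(next_page_url("http://localhost:8098", "indexes", "tweets", "hashtags_bin",
                    "ri", "ru",
                    "g2gCbQAAAAdyaXBqYWtlbQAAABIzNDkyMjA2ODcwNTcxMjk0NzM=", 5))
```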

Streaming

It is also possible to stream results:

// Available in Riak Java Client 2.1.0 and later
int pollTimeoutMS = 200;
Namespace ns = new Namespace("indexes", "tweets");
String indexName = "hashtags";

BinIndexQuery indexQuery =
    new BinIndexQuery.Builder(ns, indexName, "ri", "ru").build();

final RiakFuture<BinIndexQuery.StreamingResponse, BinIndexQuery> streamingFuture =
    client.executeAsyncStreaming(indexQuery, pollTimeoutMS);

// For streaming commands, the future's value will be available before
// the future is complete, so you may begin to pull results from the
// provided iterator as soon as possible.
final BinIndexQuery.StreamingResponse streamingResponse = streamingFuture.get();

for (BinIndexQuery.Response.Entry e : streamingResponse)
{
    // Do something with key...
}

streamingFuture.await();
Assert.assertTrue(streamingFuture.isDone());
bucket = client.bucket_type('indexes').bucket('people')
bucket.get_index('myindex_bin', 'foo', stream: true)
/*
  It is not currently possible to stream results using the PHP client
*/
bucket = client.bucket_type('indexes').bucket('people')
keys = []
for key in bucket.stream_index('myindex_bin', 'foo'):
    keys.append(key)
var riakIndexId = new RiakIndexId("indexes", "tweets", "hashtags");
var indexRiakResult = client.StreamGetSecondaryIndex(riakIndexId, "ri", "ru");
var indexResult = indexRiakResult.Value;
foreach (var key in indexResult.IndexKeyTerms)
{
    // Do something with key...
}
var binIdxCmd = new Riak.Commands.KV.SecondaryIndexQuery.Builder()
    .withBucketType('indexes')
    .withBucket('tweets')
    .withIndexName('hashtags_bin')
    .withRange('ri', 'ru')
    .withStreaming(true)
    .withCallback(query_cb) // See query_cb in other examples
    .build();
client.execute(binIdxCmd);
{ok, KeyStream} = riakc_pb_socket:get_index_eq(
    Pid,
    {<<"indexes">>, <<"people">>}, %% bucket type and bucket name
    {binary_index, "myindex"}, %% index name and type
    <<"foo">>, %% value of the index
    [{stream, true}] %% enable streaming
).
cmd, err := riak.NewSecondaryIndexQueryCommandBuilder().
    WithBucketType("indexes").
    WithBucket("tweets").
    WithIndexName("hashtags_bin").
    WithRange("ri", "ru").
    WithStreaming(true).
    WithCallback(streamingCallback).
    Build()
if err != nil {
    return err
}

if err := cluster.Execute(cmd); err != nil {
    return err
}
curl 'localhost:8098/types/indexes/buckets/people/index/myindex_bin/foo?stream=true'

Streaming can also be combined with pagination and return_terms.

Sorting

As of Riak 1.4, the result set is sorted by index value (for range queries) and then by object key. In the pagination example above, the hashtags (the index values) are returned in ascending order, and the object keys (Twitter IDs) of the messages containing the ripjake hashtag are also returned in ascending order.
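This ordering can be reproduced on the client, for example when merging results from several queries: sort by index term first and object key second, comparing both as bytes. A Python sketch (riak_2i_order is a hypothetical helper) checked against the first page of the pagination example. Because keys are compared as bytes, numeric-looking keys of different lengths do not sort numerically; the Twitter IDs here happen to have the same length.

```python
def riak_2i_order(pairs):
    """Sort (term, key) pairs by term, then key, comparing UTF-8 bytes (hypothetical helper)."""
    return sorted(pairs, key=lambda p: (p[0].encode("utf-8"), p[1].encode("utf-8")))


page = [
    ("rice", "349222574510710785"),
    ("rickross", "349222868095217664"),
    ("ridelife", "349221819552763905"),
    ("ripjake", "349220649341952001"),
    ("ripjake", "349220687057129473"),
]
# Reversing the page and re-sorting restores the order Riak returned.
print(riak_2i_order(list(reversed(page))) == page)  # True
```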

Retrieve all Bucket Keys via the $bucket Index

The following example retrieves the keys for all objects stored in the bucket people using an exact match on the special $bucket index.

curl localhost:8098/types/indexes/buckets/people/index/\$bucket/_

Count Bucket Objects via $bucket Index

The following example performs an exact match on the $bucket index, as in the previous example, and pipes the results into a MapReduce job that counts the number of objects in the people bucket. To improve efficiency, the reduce phase batch size has been raised from the default of 20 to 1000.

curl -XPOST localhost:8098/mapred \
  -H "Content-Type: application/json" \
  -d @-<<EOF
{
  "inputs": {
    "bucket": "people",
    "index": "\$bucket",
    "key":"people"
  },
  "query": [
    {
      "reduce": {
        "language": "erlang",
        "module": "riak_kv_mapreduce",
        "function": "reduce_count_inputs",
        "arg": {
          "reduce_phase_batch_size":1000
        }
      }
    }
  ]
}
EOF
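A larger batch helps because the reduce function is called repeatedly, each time on a batch of new inputs together with its own previous output (a re-reduce). The Python model below assumes that reduce_count_inputs counts each bucket/key input as 1 and sums any integers left by an earlier call; it is a simplified single-process illustration that ignores how Riak distributes and schedules the work. With a hypothetical 2,500 objects, a batch size of 20 needs 125 reduce calls, while 1000 needs 3.

```python
def reduce_count_inputs(values):
    """Model of riak_kv_mapreduce:reduce_count_inputs (assumed behavior).

    Integers are earlier reduce outputs and are summed; anything else is
    a new input and counts as 1.
    """
    return [sum(v if isinstance(v, int) else 1 for v in values)]


def run_reduce_phase(inputs, batch_size):
    """Feed inputs batch_size at a time, re-reducing the previous output.

    Returns (count, number_of_reduce_calls).
    """
    acc, calls = [], 0
    for i in range(0, len(inputs), batch_size):
        acc = reduce_count_inputs(acc + inputs[i:i + batch_size])
        calls += 1
    return (acc[0] if acc else 0), calls


objects = [("people", "user%d" % n) for n in range(2500)]  # hypothetical bucket/key inputs
print(run_reduce_phase(objects, 20))    # (2500, 125)
print(run_reduce_phase(objects, 1000))  # (2500, 3)
```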