Replication
Data replication is a core feature of Riak’s basic architecture. Riak was designed to operate as a clustered system containing multiple Riak nodes, which allows data to live on multiple machines at once in case a node in the cluster goes down.
Replication is fundamental and automatic in Riak, providing assurance that your data will still be available if a node in your Riak cluster goes down. All data stored in Riak will be replicated to the number of nodes in the cluster specified by the N value (n_val) property set in a bucket’s bucket type.
Note: Replication across clusters
If you’re interested in replication not just within a cluster but across multiple clusters, we recommend checking out our documentation on Riak’s Multi-Datacenter Replication capabilities.
Selecting an N value (n_val)
By default, Riak sets n_val to 3. This means that data stored in any bucket will be replicated to 3 different nodes. For this replication to be effective, you need at least 3 nodes in your cluster.
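If you want to confirm the n_val a bucket is actually using, one option is to ask a node directly from an attached Erlang console. This is a quick sketch; the bucket name my_bucket is just an example, and riak_core_bucket:get_bucket/1 returns the bucket’s full property list:
(dev1@127.0.0.1)1> proplists:get_value(n_val, riak_core_bucket:get_bucket(<<"my_bucket">>)).
3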
The ideal value for N depends largely on your application and the shape of your data. If your data is highly transient and can be reconstructed easily by the application, choosing a lower N value will provide greater performance. However, if you need high assurance that data is available even after node failure, increasing the N value will help protect against loss. How many nodes do you expect will fail at any one time? Choose an N value larger than that and your data will still be accessible when they go down.
The N value also affects the behavior of read (GET) and write (PUT) requests. The tunable parameters you can submit with requests are bound by the N value. For example, if N=3, the maximum read quorum (known as “R”) you can request is also 3. If some nodes containing the data you are requesting are down, an R value larger than the number of available nodes with the data will cause the read to fail.
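As an illustration, here is how a read quorum might be passed with the Riak Erlang client (riakc). This is a minimal sketch, and the host, port, bucket, and key are example values:
%% Connect to a local node over protocol buffers (example address/port).
{ok, Pid} = riakc_pb_socket:start_link("127.0.0.1", 8087),
%% With N=3, r may be at most 3; r=3 requires all three replicas to respond.
{ok, Obj} = riakc_pb_socket:get(Pid, <<"my_bucket">>, <<"my_key">>, [{r, 3}]).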
Setting the N value (n_val)
To change the N value for a bucket, you need to create a bucket type with n_val set to your desired value and then make sure that the bucket bears that type.
In this example, we’ll set N to 2. First, we’ll create the bucket type, call it n_val_of_2, and then activate that type:
riak-admin bucket-type create n_val_of_2 '{"props":{"n_val":2}}'
riak-admin bucket-type activate n_val_of_2
Now, any bucket that bears the type n_val_of_2 will propagate objects to 2 nodes.
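To see the new type in action, you can write an object through the typed bucket. Here is a minimal sketch with the Riak Erlang client (riakc), assuming a local node and example bucket/key/value names; the {Type, Bucket} tuple is how riakc addresses a bucket that bears a type:
{ok, Pid} = riakc_pb_socket:start_link("127.0.0.1", 8087),
%% Objects stored via {<<"n_val_of_2">>, <<"my_bucket">>} inherit n_val=2.
Obj = riakc_obj:new({<<"n_val_of_2">>, <<"my_bucket">>}, <<"my_key">>, <<"hello">>),
ok = riakc_pb_socket:put(Pid, Obj).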
Note on changing the value of N
Changing the N value after a bucket has data in it is not recommended. If you do change the value, especially if you increase it, you might need to force read repair (more on that below). Overwritten objects and newly stored objects will automatically be replicated to the correct number of nodes.
Changing the N value (n_val)
While raising the value of N for a bucket or object shouldn’t cause problems, it’s important that you never lower N. If you do so, you can wind up with dead, i.e. unreachable, data. This can happen because objects’ preflists, i.e. the lists of vnodes responsible for the objects, can end up changing, leaving replicas on vnodes that are no longer in the preflist and thus are never consulted by requests.
Unreachable data is a problem because it can negatively impact coverage queries, e.g. secondary index and MapReduce queries. Lowering an object or bucket’s n_val will likely mean that objects you would expect to be returned from those queries will no longer be returned.
Active Anti-Entropy
Riak’s active anti-entropy (AAE) subsystem is a continuous background process that compares and repairs any divergent or missing object replicas. For more information, see the Riak documentation on active anti-entropy.
Read Repair
Read repair occurs when a read succeeds, i.e. when the target number of nodes has responded as determined by R, but not all replicas of the object agree on the value. There are two possibilities for the errant nodes:
- The node responded with a not found for the object, meaning that it doesn’t have a copy.
- The node responded with a vector clock that is an ancestor of the vector clock of the successful read.
When this situation occurs, Riak will force the errant nodes to update the object’s value based on the value of the successful read.
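In schematic terms, a replica needs repair if it is missing or strictly older than the winning value. The sketch below is an assumption for illustration, not Riak’s actual code path; vclock:descends/2 is riak_core’s vector clock comparison:
%% A replica answered notfound: it has no copy and needs the object.
needs_repair(notfound, _WinningClock) ->
    true;
%% A replica is an ancestor if the winning clock descends from it but not
%% vice versa (equal clocks descend each other, so they are excluded).
needs_repair(ReplicaClock, WinningClock) ->
    vclock:descends(WinningClock, ReplicaClock)
        andalso not vclock:descends(ReplicaClock, WinningClock).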
Forcing Read Repair
When you increase the n_val of a bucket, you may start to see failed read operations, especially if the R value you use is larger than the number of replicas that originally stored the object. Forcing read repair will solve this issue. Alternatively, if you have active anti-entropy enabled, your values will eventually be replicated as a background task.
For each object that fails the read (or for the whole bucket, if you like), read the object using an R value less than or equal to the original number of replicas. For example, if your original n_val was 3 and you increased it to 5, perform your read operations with R=3 or less. This will cause the nodes that do not yet have the object(s) to respond with not found, invoking read repair.
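Here is a sketch of what that might look like with the Riak Erlang client (riakc), assuming an original n_val of 3 and an example bucket name:
{ok, Pid} = riakc_pb_socket:start_link("127.0.0.1", 8087),
%% Listing keys walks the whole bucket; expensive, but acceptable for a
%% one-off maintenance task like this.
{ok, Keys} = riakc_pb_socket:list_keys(Pid, <<"my_bucket">>),
lists:foreach(
  fun(Key) ->
      %% The result is unimportant; the read itself triggers read repair
      %% on replicas that respond with not found.
      _ = riakc_pb_socket:get(Pid, <<"my_bucket">>, Key, [{r, 3}])
  end,
  Keys).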
So what does N=3 really mean?
N=3 simply means that three copies of each piece of data will be stored in the cluster. That is, three different partitions/vnodes will receive copies of the data. There are no guarantees that the three replicas will go to three separate physical nodes; however, the built-in functions for determining where replicas go attempt to distribute the data evenly.
As nodes are added and removed from the cluster, the ownership of partitions changes and may result in an uneven distribution of the data. On some rare occasions, Riak will also aggressively reshuffle ownership of the partitions to achieve a more even balance.
For cases where the number of nodes is less than the N value, data will likely be duplicated on some nodes. For example, with N=3 and 2 nodes in the cluster, one node will likely have one replica, and the other node will have two replicas.
Understanding replication by example
To better understand how data is replicated in Riak, let’s take a look at a put request for the bucket/key pair my_bucket/my_key. Specifically, we’ll focus on two parts of the request: routing an object to a set of partitions and storing an object on a partition.
Routing an object to a set of partitions
- Assume we have 3 nodes
- Assume we store 3 replicas per object (N=3)
- Assume we have 8 partitions in our ring (ring_creation_size=8)
Note: It is not recommended that you use such a small ring size. This is for demonstration purposes only.
With only 8 partitions, our ring will look approximately as follows (response from riak_core_ring_manager:get_my_ring/0 truncated for clarity):
(dev1@127.0.0.1)3> {ok,Ring} = riak_core_ring_manager:get_my_ring().
[{0,'dev1@127.0.0.1'},
{182687704666362864775460604089535377456991567872, 'dev2@127.0.0.1'},
{365375409332725729550921208179070754913983135744, 'dev3@127.0.0.1'},
{548063113999088594326381812268606132370974703616, 'dev1@127.0.0.1'},
{730750818665451459101842416358141509827966271488, 'dev2@127.0.0.1'},
{913438523331814323877303020447676887284957839360, 'dev3@127.0.0.1'},
{1096126227998177188652763624537212264741949407232, 'dev1@127.0.0.1'},
{1278813932664540053428224228626747642198940975104, 'dev2@127.0.0.1'}]
The node handling this request hashes the bucket/key combination:
(dev1@127.0.0.1)4> DocIdx = riak_core_util:chash_key({<<"my_bucket">>, <<"my_key">>}).
<<183,28,67,173,80,128,26,94,190,198,65,15,27,243,135,127,121,101,255,96>>
The DocIdx hash is a 160-bit integer:
(dev1@127.0.0.1)5> <<I:160/integer>> = DocIdx.
<<183,28,67,173,80,128,26,94,190,198,65,15,27,243,135,127,121,101,255,96>>
(dev1@127.0.0.1)6> I.
1045375627425331784151332358177649483819648417632
The node looks up the hashed key in the ring, which returns a list of preferred partitions for the given key.
(dev1@127.0.0.1)7> Preflist = riak_core_ring:preflist(DocIdx, Ring).
[{1096126227998177188652763624537212264741949407232, 'dev1@127.0.0.1'},
{1278813932664540053428224228626747642198940975104, 'dev2@127.0.0.1'},
{0, 'dev1@127.0.0.1'},
{182687704666362864775460604089535377456991567872, 'dev2@127.0.0.1'},
{365375409332725729550921208179070754913983135744, 'dev3@127.0.0.1'},
{548063113999088594326381812268606132370974703616, 'dev1@127.0.0.1'},
{730750818665451459101842416358141509827966271488, 'dev2@127.0.0.1'},
{913438523331814323877303020447676887284957839360, 'dev3@127.0.0.1'}]
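As a sanity check, the first entry in the preflist is exactly what the ring arithmetic predicts: with 8 evenly spaced partitions, the first preferred partition is the next partition boundary past the hashed key. Reusing the integer I from above:
(dev1@127.0.0.1)8> Inc = (1 bsl 160) div 8, ((I div Inc) + 1) * Inc.
1096126227998177188652763624537212264741949407232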
The node chooses the first N partitions from the list. The remaining partitions of the “preferred” list are retained as fallbacks to use if any of the target partitions are unavailable.
(dev1@127.0.0.1)9> N = 3.
3
(dev1@127.0.0.1)10> {Targets, Fallbacks} = lists:split(N, Preflist).
{[{1096126227998177188652763624537212264741949407232, 'dev1@127.0.0.1'},
{1278813932664540053428224228626747642198940975104, 'dev2@127.0.0.1'},
{0,'dev1@127.0.0.1'}],
[{182687704666362864775460604089535377456991567872, 'dev2@127.0.0.1'},
{365375409332725729550921208179070754913983135744, 'dev3@127.0.0.1'},
{548063113999088594326381812268606132370974703616, 'dev1@127.0.0.1'},
{730750818665451459101842416358141509827966271488, 'dev2@127.0.0.1'},
{913438523331814323877303020447676887284957839360, 'dev3@127.0.0.1'}]}
The partition information returned from the ring contains a partition identifier and the parent node of that partition:
{1096126227998177188652763624537212264741949407232, 'dev1@127.0.0.1'}
The requesting node sends a message to each parent node with the object and partition identifier (pseudocode for clarity):
'dev1@127.0.0.1' ! {put, Object, 1096126227998177188652763624537212264741949407232}
'dev2@127.0.0.1' ! {put, Object, 1278813932664540053428224228626747642198940975104}
'dev1@127.0.0.1' ! {put, Object, 0}
If any of the target partitions fail, the node sends the object to one of the fallbacks. When the message is sent to the fallback node, the message references the object and the original partition identifier. For example, if dev2@127.0.0.1 were unavailable, the requesting node would then try each of the fallbacks in order. The fallbacks in this example are:
{182687704666362864775460604089535377456991567872, 'dev2@127.0.0.1'}
{365375409332725729550921208179070754913983135744, 'dev3@127.0.0.1'}
{548063113999088594326381812268606132370974703616, 'dev1@127.0.0.1'}
The next available fallback node would be dev3@127.0.0.1. The requesting node would send a message to that node with the object and the original partition identifier:
'dev3@127.0.0.1' ! {put, Object, 1278813932664540053428224228626747642198940975104}
Note that the partition identifier in the message is the same one that was originally sent to dev2@127.0.0.1, only this time it is being sent to dev3@127.0.0.1. Even though dev3@127.0.0.1 is not the parent node of that partition, it is smart enough to hold on to the object until dev2@127.0.0.1 returns to the cluster.
Processing partition requests
Processing requests per partition is fairly simple. Each node runs a single process (riak_kv_vnode_master) that distributes requests to individual partition processes (riak_kv_vnode). The riak_kv_vnode_master process maintains a list of partition identifiers and corresponding partition processes. If a process does not exist for a given partition identifier, a new process is spawned to manage that partition.
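The dispatch logic can be pictured as a registry keyed by partition identifier that spawns vnode processes on demand. The following is a schematic sketch only, an assumption for illustration rather than riak_kv’s actual implementation:
%% Hypothetical state: a map from partition identifier to vnode pid.
-record(state, {vnodes = #{} :: #{non_neg_integer() => pid()}}).

%% Return the process for a partition, spawning one if it doesn't exist yet.
get_vnode(Partition, #state{vnodes = VNodes} = State) ->
    case maps:find(Partition, VNodes) of
        {ok, Pid} ->
            {Pid, State};
        error ->
            Pid = spawn_link(fun() -> vnode_loop(Partition) end),
            {Pid, State#state{vnodes = VNodes#{Partition => Pid}}}
    end.

%% Minimal stand-in for the per-partition server loop; a real vnode would
%% write to its storage backend here.
vnode_loop(Partition) ->
    receive
        {put, _Object, Partition} ->
            vnode_loop(Partition)
    end.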
The riak_kv_vnode_master process treats all requests the same and spawns partition processes as needed, even when nodes receive requests for partitions they do not own. When a partition’s parent node is unavailable, requests are sent to fallback nodes (handoff). The riak_kv_vnode_master process on the fallback node spawns a process to manage the partition even though the partition does not belong to the fallback node.
The individual partition processes perform hometests throughout the life of the process. The hometest checks whether the current node (node/0) matches the parent node of the partition as defined in the ring. If the process determines that the partition it is managing belongs on another node (the parent node), it will attempt to contact that node. If the parent node responds, the process will hand off any objects it has processed for that partition and shut down. If the parent node does not respond, the process will continue to manage that partition and check the parent node again after a delay. The hometest is also run by partition processes to account for changes in the ring, such as nodes being added to or removed from the cluster.
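A schematic version of the hometest, assuming the riak_core calls shown (riak_core_ring:index_owner/2 looks up a partition’s owner in the ring); this illustrates the logic described above rather than Riak’s actual source:
hometest(Partition) ->
    {ok, Ring} = riak_core_ring_manager:get_my_ring(),
    Owner = riak_core_ring:index_owner(Ring, Partition),
    case Owner =:= node() of
        %% We are the parent node: keep managing the partition.
        true  -> keep;
        false ->
            case net_adm:ping(Owner) of
                %% Parent is reachable: hand off our objects and shut down.
                pong -> start_handoff;
                %% Parent still down: keep managing, recheck after a delay.
                pang -> recheck_later
            end
    end.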