Next-Gen Replication
Next Generation Replication - How it Works
Replication Actors
Each node in riak_kv starts three processes that manage the inter-cluster replication: a tictac AAE full-sync manager, a replication queue source manager, and a replication queue sink manager. All processes are started by default (whether or not replication is enabled), but will only play an active role should replication be configured. Further details on the processes involved:
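As a rough illustration of this shape, a minimal supervision sketch for the three managers might look as follows. This is an assumption for illustration only: the start_link/0 arities and the real riak_kv supervision tree are not taken from the source.

```erlang
%% Illustrative sketch only: a one_for_one supervisor starting the three
%% replication managers. The real riak_kv supervision tree differs.
-module(repl_actors_sup).
-behaviour(supervisor).
-export([start_link/0, init/1]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
    Children =
        [#{id => riak_kv_ttaaefs_manager,
           start => {riak_kv_ttaaefs_manager, start_link, []}},
         #{id => riak_kv_replrtq_src,
           start => {riak_kv_replrtq_src, start_link, []}},
         #{id => riak_kv_replrtq_snk,
           start => {riak_kv_replrtq_snk, start_link, []}}],
    {ok, {#{strategy => one_for_one, intensity => 10, period => 10}, Children}}.
```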
Tictac AAE Full-Sync Manager - riak_kv_ttaaefs_manager
There is a single actor on each node that manages the full-sync reconciliation workload configured for that node.
Each node is configured with the details of a peer node at a remote cluster. Each manager is responsible for controlling cluster-wide hashtree exchanges between the local node and the peer node, and for prompting any repairs required across the cluster (not just on this node). The information is exchanged between the peers, but that information represents the data across the whole cluster. Necessary repairs are prompted through the replication queue source-side manager riak_kv_replrtq_src.

Each node is configured with a schedule to determine how frequently this manager will run its reconcile and repair operations.
It is an administrator responsibility to ensure the cluster AAE workload is distributed across nodes with sufficient diversity to ensure correct operation under failure. Work is not re-distributed between nodes in response to failure on either the local or remote cluster, so there must be other nodes already configured to share that workload if operation is to continue under failure conditions.
Each node can only full-sync with one other cluster (via the one peer node). If the cluster needs to full-sync with more than one cluster, then the administrator should ensure different nodes have the different configurations necessary to achieve this.
Scheduling of work to minimise concurrency of reconciliation operations is managed by this actor using a simple, coordination-free mechanism.
The administrator may at run-time suspend or resume the regular running of full-sync operations on any given node via the riak_kv_ttaaefs_manager.
Replication Queue Source-Side Manager - riak_kv_replrtq_src
There is a single actor on each node that manages the queueing of replication object references to be consumed by other clusters. This actor runs a configurable number of queues, which contain pointers to data which is required to be consumed by different remote clusters.
The general pattern is that each delta within a cluster will be published once via the riak_kv_replrtq_src on a node local to the discovery of the change. Each queue which is a source of updates will have multiple consumers spread across multiple sink nodes on the receiving cluster - where each sink-side node's consumers are being managed by a riak_kv_replrtq_snk process on that node.

Queues may have data filtering rules to restrict what changes are distributed via that queue. The filters can restrict replication to a specific bucket or bucket type, to a bucket name prefix, or allow any change to be published to that queue.
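As an illustration of how such filtering rules might be evaluated, the sketch below matches a filter term against a bucket. The filter atoms and the bucket representation are assumptions for illustration, not the exact terms used by riak_kv_replrtq_src.

```erlang
%% Illustrative sketch of queue filter matching for a real-time event.
-module(replq_filter).
-export([matches/2]).

-type bucket() :: binary() | {binary(), binary()}.   %% {Type, Name} for typed buckets
-type filter() :: any
                | block_rtq
                | {bucketname, binary()}
                | {bucketprefix, binary()}
                | {buckettype, binary()}.

-spec matches(filter(), bucket()) -> boolean().
matches(any, _Bucket) ->
    true;
matches(block_rtq, _Bucket) ->
    %% A block_rtq queue takes no real-time events, only full-sync/fold work.
    false;
matches({buckettype, Type}, {Type, _Name}) ->
    true;
matches({bucketname, Name}, Name) ->
    true;
matches({bucketname, Name}, {_Type, Name}) ->
    true;
matches({bucketprefix, Prefix}, Bucket) ->
    Name = bucket_name(Bucket),
    PrefixSize = byte_size(Prefix),
    case Name of
        <<P:PrefixSize/binary, _/binary>> when P =:= Prefix -> true;
        _ -> false
    end;
matches(_Filter, _Bucket) ->
    false.

bucket_name({_Type, Name}) -> Name;
bucket_name(Name) -> Name.
```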
Real-time replication changes (i.e. PUTs that have just been co-ordinated on this node within the cluster) are sent to the riak_kv_replrtq_src in one of the following formats:

{Bucket, Key, Clock, {tombstone, Object}};
{Bucket, Key, Clock, {object, Object}};
{Bucket, Key, Clock, to_fetch}.
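As a sketch, these three shapes can be written as Erlang type specs. The type names, and the references to the riak_object and vclock types, are illustrative assumptions rather than definitions taken from riak_kv.

```erlang
%% Sketch of the reference formats accepted by riak_kv_replrtq_src.
-type object_ref() :: {tombstone, riak_object:riak_object()}  %% whole tombstone object
                    | {object, riak_object:riak_object()}     %% whole object (small PUTs)
                    | to_fetch.                                %% reference only; fetch on demand
-type repl_entry() :: {Bucket :: binary() | {binary(), binary()},
                       Key :: binary(),
                       Clock :: vclock:vclock(),
                       object_ref()}.
```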
Real-time replicated objects are the highest priority items to be queued, and are placed on every queue whose data filtering rules are matched by the object. If the priority queue has grown beyond a limited number of items (the number being defined in riak_kv.replrtq_srcobjectlimit), then any {object, Object} reference is stripped and replaced with to_fetch. This is to help limit the memory consumed by the queue during failure conditions, i.e. when a sink has stopped consuming from the source queue.

Changes identified by AAE full-sync replication processes run by the riak_kv_ttaaefs manager on the local node are sent to the riak_kv_replrtq_src as references, and queued as the second highest priority. These changes are queued only on a single queue defined within the configuration of riak_kv_ttaaefs_manager. The changes queued are only references to the object (Bucket, Key and Clock), not the actual object.

Changes identified by AAE fold operations for administrator-initiated transition or repair operations (e.g. a fold over a bucket or key-range, or for a given range of modified dates) are sent to the riak_kv_replrtq_src to be queued as the lowest priority, onto a single queue defined by the administrator when initiating the AAE fold operation. The changes queued are only references to the object (Bucket, Key and Clock), not the actual object - and are only the changes discovered through the fold running on vnodes local to this node. Should the local node fail, all undelivered object references will be dropped.
Queues are bounded, with limits set separately for each priority. Items are consumed from the queue in strict priority order. So a backlog of non-real-time replication events cannot cause a backlog or failure in real-time events.
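The sketch below illustrates this behaviour in isolation - per-priority bounds with consumption in strict priority order - using plain queue terms rather than riak_core_priority_queue. The priority names and limits are assumptions for illustration.

```erlang
%% Illustrative sketch of per-priority bounds and strict-priority consumption.
-module(replq_sketch).
-export([new/0, enqueue/3, fetch/1]).

-define(LIMITS, #{rt => 300000, ttaaefs => 100000, aae_fold => 100000}).

new() ->
    #{rt => queue:new(), ttaaefs => queue:new(), aae_fold => queue:new()}.

%% Drop (rather than queue) new references once a priority hits its limit.
enqueue(Priority, Entry, Queues) ->
    Q = maps:get(Priority, Queues),
    case queue:len(Q) >= maps:get(Priority, ?LIMITS) of
        true -> Queues;               %% discarded; full-sync must catch up later
        false -> Queues#{Priority := queue:in(Entry, Q)}
    end.

%% Consume in strict priority order: real-time first, then full-sync, then folds.
fetch(Queues) ->
    fetch([rt, ttaaefs, aae_fold], Queues).

fetch([], _Queues) ->
    queue_empty;
fetch([P | Rest], Queues) ->
    case queue:out(maps:get(P, Queues)) of
        {{value, Entry}, Q1} -> {Entry, Queues#{P := Q1}};
        {empty, _} -> fetch(Rest, Queues)
    end.
```

Consuming in strict priority order is what gives the property described above: a long backlog of fold or full-sync references never delays real-time entries.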
The queues are provided using the existing riak_core_priority_queue module in Riak.

The administrator may at run-time suspend or resume the publishing of data to specific queues via the riak_kv_replrtq_src process.
Replication Queue Sink-Side Manager - riak_kv_replrtq_snk
There is a single actor on each node that manages the process of consuming from queues on the riak_kv_replrtq_src on remote clusters.

The riak_kv_replrtq_snk can be configured to consume from multiple queues, across an open-ended number of peers. For instance, if each node on Cluster A maintains a queue named cluster_c_full, and each node on Cluster B maintains a queue named cluster_c_partial, then riak_kv_replrtq_snk can be configured to consume from cluster_c_full on every node in Cluster A and from cluster_c_partial on every node in Cluster B.

The riak_kv_replrtq_snk manages a finite number of workers for consuming from remote peers. The riak_kv_replrtq_snk tracks the results of work in order to back off slightly from peers regularly not returning results to consume requests (in favour of those peers indicating a backlog by regularly returning results). The riak_kv_replrtq_snk also tracks the results of work in order to back off severely from those peers returning errors (so as not to lock too many workers consuming from unreachable nodes).

The administrator may at run-time suspend or resume the consuming of data from specific queues or peers via the riak_kv_replrtq_snk.
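A minimal sketch of such a back-off policy is shown below; the specific delay values and result atoms are assumptions rather than riak_kv settings.

```erlang
%% Illustrative sketch of the sink-side back-off policy described above.
-module(replsnk_backoff).
-export([next_delay/1]).

%% Given the outcome of the last consume attempt against a peer/queue,
%% return how long (ms) to wait before handing that peer to a worker again.
-spec next_delay(ok | queue_empty | error) -> non_neg_integer().
next_delay(ok) -> 0;              %% peer has a backlog - keep consuming
next_delay(queue_empty) -> 1000;  %% back off slightly from idle peers
next_delay(error) -> 30000.       %% back off severely from failing peers
```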
Real-time Replication - Step by Step
Previous replication implementations initiate replication through a post-commit hook. Post-commit hooks are fired from the riak_kv_put_fsm after "enough" responses have been received from other vnodes (based on the n, w, dw and pw values for the PUT). Without enough responses, the replication hook is not fired, although the client should receive an error and retry. This process of retrying may eventually fire the hook - although it is possible for a PUT to fail, the hook not to be fired, and yet a GET be locally successful (due to read-repair and anti-entropy), with no clue that the object has not been replicated.
In implementing the new replication solution, the point of firing off replication has been changed to the point that the co-ordinated PUT is completed. So the replication of the PUT to remote clusters may occur in parallel to the replication of the PUT to other nodes in the source cluster. This is the first opportunity where sufficient information is known (e.g. the updated vector clock), and it reduces the size of the time-window of inconsistency between the clusters, and also reduces the window of opportunity for a PUT to succeed but not have replication triggered.
Replication is fired within the riak_kv_vnode actual_put/8 function. On condition of the vnode being a co-ordinator of the PUT, and of riak_kv.replrtq_enablesrc being set to enabled (true), the following work is done:
- The object reference to be replicated is determined; this is the type of reference to be placed on the replication queue (a sketch of this decision follows this list).
  - If the object is now a tombstone, the whole object is used as the replication reference. The whole object is used due to the small size of the object, and the need to avoid race conditions with reaping activity if delete_mode is not keep - the cluster may not be able to fetch the tombstone to replicate in the future. The whole object must be kept on the queue and not be filtered by the riak_kv_replrtq_src to be replaced with a to_fetch reference.
  - If the object is below the riak_kv.replrtq_srcobjectsize (default 200KB) then the whole object will be sent to the riak_kv_replrtq_src, and it will be queued as a whole object as long as the current size of the priority real-time queue does not exceed the riak_kv.replrtq_srcobjectlimit (default 1000). If an object is over the size limit a to_fetch reference will be sent instead of the object, and if the queue is too large the riak_kv_replrtq_src will substitute a to_fetch reference before queueing.
- The {Bucket, Key, Clock, ObjectReference} is cast to the riak_kv_replrtq_src and placed by the riak_kv_replrtq_src on the priority queue.
- The queue has a configurable absolute limit, which is applied individually for each priority. The limit is configured via riak_kv.replrtq_srcqueuelimit and defaults to 300,000 references (5 minutes of traffic at 1,000 PUTs per second). When this limit is reached, new replication references are discarded on receipt rather than queued - these discarded references will need to eventually be re-replicated via full-sync.
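The following sketch summarises the reference-type decision described in the first item above. The 200KB and 1000-item defaults are taken from the text; the helper arguments, the use of erlang:external_size/1 as a size measure, and the folding of the queue-length check into one function are assumptions for illustration (in the text the riak_kv_replrtq_src performs that substitution).

```erlang
%% Sketch of choosing the replication reference type for a co-ordinated PUT.
-module(repl_ref_sketch).
-export([object_reference/3]).

-define(OBJECT_SIZE_LIMIT, 200 * 1024).   %% riak_kv.replrtq_srcobjectsize default
-define(OBJECT_COUNT_LIMIT, 1000).        %% riak_kv.replrtq_srcobjectlimit default

-spec object_reference(term(), boolean(), non_neg_integer()) ->
          {tombstone, term()} | {object, term()} | to_fetch.
object_reference(Object, IsTombstone, RTQueueLength) ->
    ObjectSize = erlang:external_size(Object),   %% approximation of object size
    if
        IsTombstone ->
            %% Tombstones are small and may be reaped before a later fetch,
            %% so the whole object always rides the queue.
            {tombstone, Object};
        ObjectSize =< ?OBJECT_SIZE_LIMIT,
        RTQueueLength =< ?OBJECT_COUNT_LIMIT ->
            {object, Object};
        true ->
            %% Too big, or the real-time queue is already long - queue a
            %% reference and let the sink fetch the object on demand.
            to_fetch
    end.
```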
The reference now needs to be handled by the riak_kv_replrtq_src. The task list for this process is:
- Assign a priority to the replication event depending on what prompted the replication (e.g. highest priority to real-time events received from co-ordinator vnodes).
- Add the reference to the tail of every matching queue based on priority (a sketch of this fan-out follows this list). Each queue is configured to either match any replication event, no real-time events (using the configuration block_rtq), or a subset of events (using either a buckettype filter or a bucket filter).
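A sketch of that fan-out step, assuming a hypothetical list of queue definitions and a pluggable filter-matching function (neither is a riak_kv API):

```erlang
%% Sketch of the fan-out step: produce an enqueue instruction for every queue
%% whose filter matches, at the priority assigned to the event.
-module(replq_route).
-export([route/4]).

%% QueueDefs :: [{QueueName :: atom(), Filter :: term()}]
%% MatchFun  :: fun((Filter :: term(), Bucket :: term()) -> boolean())
route(QueueDefs, MatchFun, Priority, {Bucket, _Key, _Clock, _Ref} = Entry) ->
    [{QueueName, Priority, Entry}
        || {QueueName, Filter} <- QueueDefs, MatchFun(Filter, Bucket)].
```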
In order to replicate the object, it must now be fetched from the queue by a sink. A sink-side cluster should have multiple consumers, on multiple nodes, consuming from each node in the source-side cluster. These workers are handed work items by the riak_kv_replrtq_snk, with a Riak client configured to communicate with the remote node, and the worker will initiate a fetch from that node.
On receipt of the fetch request the source node should:

- Initiate a riak_kv_get_fsm, passing {queuename, QueueName} in place of {Bucket, Key}.
- The GET FSM should go directly into the queue_fetch state, and try to fetch the next replication reference from the given queue name via the riak_kv_replrtq_src.
- If the fetch from the queue returns queue_empty, this is relayed back to the sink-side worker, and ultimately the riak_kv_replrtq_snk, which may then slow down the pace at which fetch requests are sent to this node/queue combination. To reduce the volume of individual requests when queues are mainly empty, the queue is only considered empty if it has reported empty 8 times from requests 4ms apart.
- If the fetch returns an actual object, this is relayed back to the sink worker.
- If the fetch returns a replication reference with the flag to_fetch, the riak_kv_get_fsm will continue down the standard path of states starting with prepare, and fetch the object, which will then be returned to the sink worker (the sketch after this list summarises these outcomes).
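The sketch below summarises the three possible outcomes of a source-side fetch. NextEntryFun and FetchObjectFun are hypothetical stand-ins for the riak_kv_replrtq_src dequeue and the standard GET path; they are not riak_kv functions.

```erlang
%% Sketch of handling a sink-driven fetch on the source node.
-module(queue_fetch_sketch).
-export([handle_fetch/3]).

handle_fetch(QueueName, NextEntryFun, FetchObjectFun) ->
    case NextEntryFun(QueueName) of
        queue_empty ->
            %% Relayed to the sink, which may slow its polling of this queue.
            queue_empty;
        {_Bucket, _Key, _Clock, {tombstone, Object}} ->
            {ok, Object};
        {_Bucket, _Key, _Clock, {object, Object}} ->
            {ok, Object};
        {Bucket, Key, _Clock, to_fetch} ->
            %% Fall back to the standard GET path to read the object locally.
            FetchObjectFun(Bucket, Key)
    end.
```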
If a successful fetch is relayed back to the sink worker it will replicate the PUT using a local riak_client:push/4. The push will complete a PUT of the object on the sink cluster - using a riak_kv_put_fsm with appropriate options (e.g. asis, disable-hooks).

- The code within the riak_client:push/4 follows the behaviour of the existing riak_repl on receipt of a replicated object.
If the fetch and push request fails, the sink worker will report this back to the riak_kv_replrtq_snk, which should delay further requests to that node/queue so as to avoid rapidly locking up sink workers communicating with a failing node.
Full-Sync Reconciliation and Repair - Step by Step
The riak_kv_ttaaefs_manager controls the full-sync replication activity of a node. Each node is configured with a single peer with which it is to run full-sync checks and repairs, assuming that, across the cluster, sufficient peers to sufficient clusters have been configured to complete the overall work necessary for that cluster. Ensuring there are sufficient peer relations is an administrator responsibility; there is no re-balancing or re-distribution of this work in failure scenarios.
The riak_kv_ttaaefs_manager is a source-side process. It will not attempt to repair any discovered discrepancies where the remote cluster is ahead of the local cluster - the job of the process is to ensure that a remote cluster is up-to-date with the changes which have occurred in the local cluster. For mutual full-sync replication, an equivalent configuration will be needed on the peer cluster.
The riak_kv_ttaaefs_manager has a schedule of work obtained from the configuration. The schedule has wants, the number of times per day that it is desired that this manager will:

- Reconcile changes across the whole cluster over all time;
- Skip work for a schedule slot and do nothing;
- Reconcile changes that have occurred in the past hour;
- Reconcile changes that have occurred in the past day.
On startup, the manager looks at these wants and provides a random distribution of work across slots. The day is divided evenly into slots, so there is a slot for each want in the schedule. It will run work for the slot at an offset from the start of the slot, based on the place this node has in the sorted list of currently active nodes. So if each node is configured with the same total number of wants, work will be synchronised to have limited overlap within the cluster.
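As an illustration of this coordination-free scheduling, the sketch below expands a set of wants into a day of slots and offsets this node's work by its position in the sorted node list. The want names and the exact offset rule are assumptions, not the riak_kv_ttaaefs_manager implementation.

```erlang
%% Sketch of turning configured wants into a randomised daily schedule,
%% with a per-node offset so peer nodes avoid running work concurrently.
-module(ttaaefs_schedule_sketch).
-export([build_schedule/2]).

%% Wants :: [{all_check | day_check | hour_check | no_check, Count}]
build_schedule(Wants, {ThisNode, AllNodes}) ->
    WorkItems = lists:append([lists:duplicate(N, W) || {W, N} <- Wants]),
    %% Shuffle the work items across the day's slots.
    Shuffled = [W || {_R, W} <- lists:sort([{rand:uniform(), W} || W <- WorkItems])],
    SlotCount = length(Shuffled),
    SlotSeconds = 86400 div max(1, SlotCount),
    %% Offset this node's work within each slot by its position in the
    %% sorted node list, so nodes with identical wants stagger their work.
    NodePos = length(lists:takewhile(fun(Node) -> Node =/= ThisNode end,
                                     lists:sort(AllNodes))),
    Offset = (SlotSeconds div max(1, length(AllNodes))) * NodePos,
    [{SlotIdx * SlotSeconds + Offset, Work}
        || {SlotIdx, Work} <- lists:zip(lists:seq(0, SlotCount - 1), Shuffled)].
```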
When, on a node, a scheduled piece of work comes due, the riak_kv_ttaaefs_manager will start an aae_exchange to run the work between the two clusters (using the peer configuration to reach the remote cluster). Once the work is finished, it will schedule the next piece of work - unless the start time for the next piece of work has already passed, in which case the next work is skipped. When all the work in the schedule is complete, a new schedule is calculated from the wants.
When starting an aae_exchange the riak_kv_ttaaefs_manager must pass in a repair function. This function will compare clocks from identified discrepancies, and where the source cluster is ahead of the sink, send the {Bucket, Key, Clock, to_fetch} tuple to a configured queue name on the riak_kv_replrtq_src. These queued entries will then be replicated through being fetched by the riak_kv_replrtq_snk workers, although this will only occur when there is no higher-priority work to replicate, i.e. real-time replication events prompted by locally co-ordinated PUTs.
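A sketch of such a repair function is shown below. EnqueueFun and DominatesFun are hypothetical stand-ins for the cast into riak_kv_replrtq_src and the vector-clock comparison, and the delta format is an assumption for illustration.

```erlang
%% Sketch of the repair behaviour: for each discrepancy, queue a to_fetch
%% reference only where the local (source) clock is ahead of the remote one.
-module(ttaaefs_repair_sketch).
-export([repair/4]).

%% Deltas :: [{Bucket, Key, {LocalClock, RemoteClock}}]
repair(Deltas, QueueName, EnqueueFun, DominatesFun) ->
    lists:foreach(
        fun({Bucket, Key, {LocalClock, RemoteClock}}) ->
            case DominatesFun(LocalClock, RemoteClock) of
                true ->
                    %% Source is ahead - queue a reference for the sink to fetch.
                    EnqueueFun(QueueName, {Bucket, Key, LocalClock, to_fetch});
                false ->
                    %% Remote is ahead or clocks conflict - leave for the remote
                    %% cluster's own full-sync to resolve.
                    ok
            end
        end,
        Deltas).
```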