RAC: Reliable Asynchronous Clustering


Author: Bela Ban, Oct 2012 (original document: RAC)

(related to Cross-Site Replication)

Overview

Recently, code has been added to Infinispan to provide replication of data between sites (a site being a local cluster). (I use the term 'replication' although technically speaking I’m referring to 'distribution mode' in Infinispan).

Changes can be replicated asynchronously or synchronously, either when a transaction (TX) is committed or whenever a change happens. Asynchronous replication is faster than sync replication, as callers don’t have to wait for the acks of the receivers.

In this document, we’re only considering transactional updates, although similar arguments apply for non-transactional replication.

Async replication simply sends the updates collected in the TX to the remote site.

Sync replication runs a two-phase commit (2PC): when the TX is committed, a PREPARE is sent to the participating nodes, and - when all acks have been received - a COMMIT is sent, or a ROLLBACK when one or more nodes failed to apply the PREPARE, or when some nodes crashed or left (or were not reachable) in the process.

Sync replication across sites involves sending the PREPARE message also to the site master of the remote site, which in turn applies the PREPARE to its local cluster (forwarding the PREPARE to all nodes which have keys involved in the TX and waiting for their acks). When the site master has collected all PREPARE responses, it sends its ack back to the originator of the TX (in the originating site).

Problems with sync replication

There are 2 problems with sync replication:

  1. Latency

Usually, sites are connected via a high-throughput / high-latency link, e.g. with a latency of around 50-70 ms. Neglecting the cost of a local 2PC, a 2PC across 2 sites would cost (assuming 50 ms of latency) ca. 150-200 ms: a PREPARE call to the remote site plus the response (100 ms), then the same for the COMMIT or ROLLBACK (100 ms) == 200 ms. If the second phase is asynchronous, then we’d only incur 150 ms.

  2. More nodes == higher probability of failures

The more nodes we have participating, the higher the likelihood is that something goes wrong. Remember, a single node (either local or in the remote site) failing to apply the PREPARE means that the TX will be rolled back.

For example, if we have N participants in a TX, and each participant independently fails to apply a PREPARE with a probability of 2%, then the overall probability of the TX failing is 1 - 0.98^N, or roughly (N * 2)% for small N. With 5 participants, the failure probability is about 10% (9.6%); with 10 participants it is about 18%, and so on.

Compounding this is that the communication between the local and remote site includes multiple components, e.g. forwarding to a site master, sending via the bridge, and dispatching to the right target at the remote site. So N should probably not just include the participants, but also these components.
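The two estimates above can be checked with a few lines of Python. This is only a back-of-the-envelope sketch: the function names are made up for illustration, the latency model ignores the local 2PC as the text does, and the failure model assumes that participants fail independently of each other. Under that assumption the exact failure probability is 1 - (1 - p)^N, which the linear N * p estimate approximates for small N.

```python
def cross_site_2pc_latency_ms(one_way_ms, async_second_phase=False):
    """Wide-area cost of a 2PC across two sites, ignoring the local 2PC."""
    # PREPARE to the remote site plus its response: one full round trip.
    prepare_round_trip = 2 * one_way_ms
    # A synchronous COMMIT/ROLLBACK waits for another round trip; an
    # asynchronous one is only charged for the one-way send (as in the text).
    second_phase = one_way_ms if async_second_phase else 2 * one_way_ms
    return prepare_round_trip + second_phase


def tx_failure_probability(participants, p_fail):
    """Probability that at least one of N independent participants fails."""
    return 1.0 - (1.0 - p_fail) ** participants


print(cross_site_2pc_latency_ms(50))                           # 200
print(cross_site_2pc_latency_ms(50, async_second_phase=True))  # 150
print(round(tx_failure_probability(5, 0.02), 3))               # 0.096
print(round(tx_failure_probability(10, 0.02), 3))              # 0.183
```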

Goals

It would be ideal if we could have the hard reliability of sync replication, but without the downsides listed above. More specifically, RAC should have the following properties:

High reliability

No updates are ever lost unless the entire primary site crashes. In this case, updates applied within the configured lag period (see below) would be lost. Note that a loss of communication between primary and backup site(s) will not lead to data loss, but may trigger a resync of the backup site when connectivity is restored.

Correct ordering of updates

All updates should be applied at the backup site in the order in which they were sent.

Speed

It should take no more than a few milliseconds to commit a sync transaction.

Configurable lag

The backup site(s) should be no more than N milliseconds (configurable) behind the primary site.

In other words, we want the speed of async replication combined with the reliability of sync replication.

Approach

This is a high-level description only; see the implementation section below for details.

RAC targets sync replication but avoids sending the PREPARE message to the remote site(s) as part of the 2PC. This avoids problem #1 and reduces the number of participants, which mitigates problem #2.

Instead, RAC takes part in the 2PC but only stores the modified keys in 'stable storage'. An update process will then periodically grab the keys, fetch the associated values and send the updates to the remote site(s). This is all done in the background, so that sync TXs complete quickly.

As 'stable storage', we could have picked the disk, but the problem is that if a machine crashes, that disk will not be accessible anymore. A shared file system wouldn’t help either, as this means we would incur the cost of RPCs (e.g. with NFS) to sync to disk.

So we chose memory as 'stable storage'. A number of nodes of a site (say 3 out of 10) act as 'updaters'. An updater is chosen based on its rank in the JGroups view. E.g. if we have {A,B,C,D,E,F,G}, then the first 3 could act as updaters: A, B and C. When a new view is installed, every member knows whether it is an updater or not, based on the view and its rank. Because everyone gets the same sequence of views, the decision whether or not a node is an updater is purely local and doesn’t require an election process (involving messages being sent around).
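The rank-based choice of updaters can be sketched as follows. This is a minimal illustration, not JGroups code: a view is modelled as a plain ordered list of member names, and both function names are hypothetical.

```python
def updaters_for_view(view, num_updaters):
    """Return the first num_updaters members of the view, in rank order."""
    return list(view[:num_updaters])


def is_updater(view, member, num_updaters):
    """Purely local decision: is this member among the first ranks?"""
    return member in view[:num_updaters]


view = ["A", "B", "C", "D", "E", "F", "G"]
print(updaters_for_view(view, 3))   # ['A', 'B', 'C']
print(is_updater(view, "D", 3))     # False

# After A leaves, every member sees the same new view and reaches the same
# conclusion without exchanging any messages.
new_view = ["B", "C", "D", "E", "F", "G"]
print(updaters_for_view(new_view, 3))  # ['B', 'C', 'D']
```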

When a TX commits, as part of the PREPARE phase, the node which is committing the TX broadcasts an UPDATE message, including only the updated keys (not the values). Every key needs to be listed only once, i.e. if a TX modified K1 20 times, then K2 and K3, the list will be [K1,K2,K3]. Note that we don’t need to ship the type of operation (PUT, REMOVE, REPLACE) with the UPDATE message, as the updaters will fetch the associated value anyway later.
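Collapsing the modifications of a TX into the key list of an UPDATE message amounts to an order-preserving de-duplication. A small sketch, assuming the TX's modifications are available as a list of (operation, key) pairs; that representation and the function name are assumptions made for this example:

```python
def update_message_keys(modifications):
    """Keys touched by a TX, each listed once, in first-modification order.

    The operation type is dropped on purpose: the updater fetches the
    current value later, so it does not need to know what happened.
    """
    # dict preserves insertion order, so it doubles as an ordered set.
    return list(dict.fromkeys(key for _operation, key in modifications))


tx = [("PUT", "K1")] * 20 + [("PUT", "K2"), ("REMOVE", "K3"), ("REPLACE", "K1")]
print(update_message_keys(tx))  # ['K1', 'K2', 'K3']
```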

Everybody receives the UPDATE message, but only the updaters (A, B, C) store the keys. The updaters send an ack back to the sender and this completes the TX.

An updater 'owns' keys, i.e. it is responsible for replicating a certain range of keys to the backup site(s). The decision which updater owns which keys is made by using a consistent hash, mapping keys to updaters. E.g. if we have keys [1..10] updater A could process [1..3], B [4..7] and C [8..10].
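One possible way to map keys to updaters is a hash ring with virtual nodes, sketched below. Note that this is a swapped-in illustration: the example above partitions keys by range, whereas the ring spreads them by hash. The class name, the use of SHA-256 and the number of virtual nodes are all assumptions of this sketch, not part of the RAC design.

```python
import bisect
import hashlib


def _hash(value):
    """Stable 64-bit hash (Python's built-in hash() is randomized per run)."""
    digest = hashlib.sha256(value.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big")


class UpdaterRing:
    """Maps keys to updaters via a consistent-hash ring."""

    def __init__(self, updaters, virtual_nodes=64):
        # Each updater is placed on the ring at several pseudo-random points.
        self._ring = sorted(
            (_hash(f"{updater}#{i}"), updater)
            for updater in updaters
            for i in range(virtual_nodes)
        )
        self._points = [point for point, _ in self._ring]

    def owner(self, key):
        # The owner is the updater at the first ring point after the key's
        # hash, wrapping around at the end of the ring.
        idx = bisect.bisect_right(self._points, _hash(str(key)))
        return self._ring[idx % len(self._ring)][1]


ring = UpdaterRing(["A", "B", "C"])
for key in range(1, 11):
    print(key, "->", ring.owner(key))
```

The useful property is that when an updater disappears, only the keys it owned are reassigned; keys owned by the surviving updaters stay where they are, which limits the reordering risk during view changes.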

As an alternative, we could also have a set of nodes (N) which store modified keys in memory and a set of updaters (K, where K <= N) which ship the modifications to the backup sites. If K == 1, then we’d be assured that all modifications arrive at the backup sites in total order. If K > 1, then we would still have FIFO order (per key).

Coming back to 'stable storage', as long as no more than N-1 updaters crash at exactly the same time, the keys to be processed (replicated) are not lost. This provides somewhat lesser guarantees than a disk, but should be good enough in most cases, as N is configurable and if only a few updaters crash, new ones are going to be picked immediately and the keys will be rebalanced to make sure they’re not lost.

OK, so now that we’ve stored the keys to be replicated in 'stable storage', we need to periodically send them to the backup site(s). To this end, every updater has a task which runs when the queue exceeds a certain number of elements, or a timer expires.
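The "queue too large or timer expired" condition can be sketched as a small helper. The class and parameter names are hypothetical, time is passed in explicitly so the behaviour is easy to follow, and skipping the task when the queue is empty is an assumption of this sketch rather than something the design states.

```python
class FlushTrigger:
    """Decides when an updater should run its replication task."""

    def __init__(self, max_keys, max_delay_s, now=0.0):
        self.max_keys = max_keys        # size threshold for the key queue
        self.max_delay_s = max_delay_s  # upper bound on the lag, in seconds
        self.last_flush = now

    def should_flush(self, queued_keys, now):
        if queued_keys == 0:
            return False  # nothing to replicate (assumption, see lead-in)
        size_exceeded = queued_keys > self.max_keys
        timer_expired = (now - self.last_flush) >= self.max_delay_s
        return size_exceeded or timer_expired

    def flushed(self, now):
        """Record a completed replication run and restart the timer."""
        self.last_flush = now


trigger = FlushTrigger(max_keys=100, max_delay_s=0.5)
print(trigger.should_flush(queued_keys=10, now=0.1))   # False
print(trigger.should_flush(queued_keys=101, now=0.1))  # True (size)
print(trigger.should_flush(queued_keys=10, now=0.6))   # True (timer)
```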

The updater then sends a REPLICATE message to the site master(s), which in turn apply the updates in their local site(s). This is done using the site’s configuration, i.e. asynchronously, synchronously, with or without a TX. When done, the site master sends back an ack to the sender, and this signals to the sender the successful completion of the update task.

When a site goes down, or updates fail for an extended period of time, a site can be marked offline. This means that state transfer would have to kick in when that site comes up again.

Advantages of RAC

  • Because we have N updaters, this provides a certain degree of protection against node failures (a.k.a. 'stable storage').

  • N updaters shoulder all the updates, so the work is distributed.

  • The queues in the updaters only store a given key once. So if we have 1000 updates to a given key in a TX, the key will be updated only once in the backup sites and - more importantly - it will only be included once in the REPLICATE message which saves bandwidth and CPU cycles.

  • Cloud-friendly: because of the above point, we can better utilize either low bandwidth to the cloud, or costs associated with the bandwidth used to ship updates to a cloud.

  • Having to store only keys and not values in the updaters' queues means we don’t need a lot of memory.

  • Updaters can be stateless (beyond storing the keys), as they fetch the current value when triggering an update to a remote site (See above: an updater always fetches the current value).

  • A given key is always handled by the same updater, so different updates to the same key end up in the same updater queue and are sent in order. Caveat: view changes do affect this, but a good consistent hash to pick updaters should minimize the risk of unordered updates.

State transfer

This is needed when a site goes online and it needs to grab the state (sync) from an existing site.

Because RAC is rather simplistic, state transfer is also simple: all we need to do is to send an UPDATE for all the keys (in-memory and on disk, e.g. in cache loaders) in the primary site, and the mechanism already in place for replication will make sure that the updates are sent to the newly started site.

Conflict resolution (concurrent updates to the same data in different sites)

If we have updates to the same keys in different sites, we can use the following mechanism to prevent inconsistencies:

  • Every updater acquires (local) write locks (using the lock API) on the keys that are about to be replicated to the remote site

  • When done, the locks on the keys are released

Example

  • Updater A in LON and updater B in SFO, both want to replicate key K. A tries to set K=V1, B tries to set K=V2.

  • Updater A write-locks K in LON.

  • Updater B write-locks K in SFO and sends the REPLICATE(K,V2) message to LON

  • Updater A sends the REPLICATE(K,V1) message to SFO

  • B times out trying to acquire the lock for K in LON, releases the write-lock for K in SFO and sleeps (randomly) before retrying

  • A acquires the lock in SFO and sets K=V1 in SFO, then releases the remote lock in SFO and the local lock in LON

  • B wakes up and retries. It fetches the value for K and locks it locally in SFO. The value is now V1 (set by A)

  • B sends the REPLICATE(K,V2) to LON, acquires the lock in LON and sets K=V2 both in LON and SFO

  • B then releases the remote and local lock

K is now V2 in LON and SFO. This is essentially a last-updater-wins algorithm, but it guarantees that K will be consistent across all sites.

This algorithm should be off by default as one of the assumptions of cross-site replication is that updaters in different sites (in the active-active case) always update disjoint data sets. If this is not the case, the mechanism could be enabled, e.g. via configuration.
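The lock, replicate, back off and retry loop from the example can be sketched as below. In-process `threading.Lock` objects stand in for the per-key write locks in the two sites; a real implementation would acquire the remote lock via the REPLICATE message. The function name, the timeout and the retry limit are assumptions made for this sketch.

```python
import random
import threading
import time


def replicate_key(local_lock, remote_lock, apply_update,
                  timeout_s=0.05, max_attempts=5, sleep=time.sleep):
    """Try to replicate one key while holding both its local and remote lock.

    Returns True once apply_update() has run under both locks, or False if
    the remote lock could not be acquired within max_attempts tries.
    """
    for _ in range(max_attempts):
        with local_lock:  # write-lock the key in the local site
            if remote_lock.acquire(timeout=timeout_s):
                try:
                    apply_update()  # set the value in both sites
                    return True
                finally:
                    remote_lock.release()
        # Remote lock timed out: the local lock has been released by now.
        # Sleep for a random time so the competing updater can finish.
        sleep(random.uniform(0, timeout_s))
    return False


lon_lock, sfo_lock = threading.Lock(), threading.Lock()
sites = {"LON": {}, "SFO": {}}


def set_k_to_v1():
    sites["LON"]["K"] = "V1"
    sites["SFO"]["K"] = "V1"


print(replicate_key(lon_lock, sfo_lock, set_k_to_v1))  # True
print(sites)
```

Because the local lock is always released before sleeping, two updaters that collide on the same key cannot block each other forever; one of them gets through and the other retries afterwards.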

TBD

  • Do we broadcast the UPDATE on PREPARE or COMMIT? On PREPARE would mean we’d have to send a second COMMIT or ROLLBACK message. On COMMIT means we don’t block the TX at all, but the update might get lost when the node crashes before the COMMIT.

  • When an updater crashes, do we perform some sort of state transfer to the new updaters? Use case: we have 3 updaters. Now 2 of them crash at the same time. 2 new ones are picked, but they have no keys. If the only updater with keys crashes, too, its updates will be lost. So a rebalancing of the keys would be in order…​

  • State transfer: only update keys to the newly started site, not to an existing site!

  • Could RAC also be used for regular replication (within a local site only)?

  • Possibly use multiple site masters to mask transient failures of one site master. Requests could be load balanced to any site master, or we could always pick the first site master and failover if that one is down.

    • Problem: we need to change routing as there can only be one entry for each site.


Implementation

Each updater maintains three queues:

  • my-keys

  • all-keys

  • pending-keys

All 3 queues are FIFO-ordered and duplicate-free, i.e. each one is a FIFO-ordered set.

  • On reception of UPDATE(keys):

    • Check if current member is an updater. If not ⇒ discard message

    • If no update is in progress, add keys to all-keys, and add the keys I’m responsible for to my-keys

    • Else: add keys to pending-keys

  • Periodically (or when queue exceeds a certain size):

    • Block insertions into all-keys

    • Grab keys from my-keys and replicate them to the remote sites

    • If replication was successful: clear my-keys

    • Process pending-keys: add all to all-keys, add keys I should process to my-keys, clear pending-keys

    • Unblock insertions into all-keys

  • On view change V:

    • Determine which keys in all-keys need to be processed by me and add them to my-keys

  • Replication to a remote site (keys):

    • Create a REPLICATE message containing a map with keys and values

    • For each key K:

      • Fetch the value for K from the local site and add it to the map (if the value is null, mark the value as removed (tombstone?) in the map)

    • Send the REPLICATE message to the site master(s) of the remote site(s), wait for the ack

    • If successful: broadcast a REMOVE(keys) message

  • On REMOVE(keys):

    • (only when updater): remove keys from all-keys (not from pending-keys, as a key might have been updated again in the meantime!)
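The steps above can be put together in a single-threaded sketch. It is not the Infinispan implementation: plain dicts serve as the FIFO-ordered sets, a dict stands in for the local site's data, and `owner_of`, `send_replicate` and `broadcast_remove` are hypothetical callbacks supplied by the caller. The `TOMBSTONE` marker is likewise an assumption. The sketch also assumes the object only exists on nodes that are updaters.

```python
TOMBSTONE = object()  # hypothetical marker for "key was removed"


class Updater:
    def __init__(self, name, owner_of, cache, send_replicate, broadcast_remove):
        self.name = name
        self.owner_of = owner_of                  # key -> name of owning updater
        self.cache = cache                        # stand-in for the local site
        self.send_replicate = send_replicate      # map -> True if acked
        self.broadcast_remove = broadcast_remove  # keys -> None
        # dicts keep insertion order and reject duplicates: FIFO-ordered sets.
        self.my_keys, self.all_keys, self.pending_keys = {}, {}, {}
        self.update_in_progress = False

    def _enqueue(self, keys):
        for key in keys:
            self.all_keys.setdefault(key, None)
            if self.owner_of(key) == self.name:
                self.my_keys.setdefault(key, None)

    def on_update(self, keys):
        if self.update_in_progress:
            for key in keys:  # all-keys is blocked: park the keys for later
                self.pending_keys.setdefault(key, None)
        else:
            self._enqueue(keys)

    def run_update_task(self):
        self.update_in_progress = True  # block insertions into all-keys
        try:
            keys = list(self.my_keys)
            if keys and self._replicate(keys):
                self.my_keys.clear()
            self._enqueue(self.pending_keys)
            self.pending_keys.clear()
        finally:
            self.update_in_progress = False  # unblock insertions

    def _replicate(self, keys):
        payload = {}
        for key in keys:
            value = self.cache.get(key)
            payload[key] = TOMBSTONE if value is None else value
        acked = self.send_replicate(payload)
        if acked:
            self.broadcast_remove(keys)
        return acked

    def on_remove(self, keys):
        for key in keys:  # deliberately leaves pending-keys untouched
            self.all_keys.pop(key, None)

    def on_view_change(self, owner_of):
        self.owner_of = owner_of
        for key in self.all_keys:
            if owner_of(key) == self.name:
                self.my_keys.setdefault(key, None)
```

Keeping every key in all-keys on every updater is what makes the view-change step possible: when ownership shifts, the new owner already knows which keys are still outstanding.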

Notes from Meeting

Remove UPDATE message

The primary owner will be the updater for its own key and the 'stable storage' can be another Infinispan cache.

  • pros:

    • no special code when nodes join and leave (the state transfer will make sure that the update keys will not be lost)

    • no need to fetch the key remotely since the primary owner has the values locally

  • cons:

    • memory consumed by the additional cache.

Note: another alternative would be each owner (primary + backups) keeps a local set of updated keys.

New locking for RAC REPLICATE message

The idea of using a synchronizer (similar to the one currently used by L1) to detect and resolve possible conflicts between sites was also discussed. With this approach, each key has a synchronizer while a REPLICATE message for it is being sent to, or received from, a remote site.

On REPLICATE sent to remote site:

  • for each key, create if absent a new synchronizer

    • if synchronizer exists, resolve conflict (or ignore the update for that key, since it was just overwritten by the other site's value and we no longer have access to the old value)

    • else, send REPLICATE message

  • if synchronizer is still valid (see below), remove the key.

On REPLICATE from remote site:

  • for each key, create if absent a new synchronizer

    • if synchronizer exists, resolve conflict. Both sites sent the REPLICATE message so we need to decide which value will be used as final value.

  • remove the key if the synchronizer is still valid.

On key updated by a local transaction:

  • if a synchronizer exists, mark it as invalid (we have a new value to replicate to the other site)

  • add the key (if it does not exist already)
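The bookkeeping around the synchronizer's "valid" flag can be sketched as follows. This only illustrates the rules listed above and is not the L1 synchronizer: the class and method names are invented for the example, and how a detected conflict is actually resolved is left to the caller because the notes do not specify it.

```python
class Synchronizer:
    def __init__(self):
        self.valid = True  # becomes False if a local TX updates the key


class KeyTracker:
    def __init__(self):
        self.pending = {}        # FIFO-ordered set of keys still to replicate
        self.synchronizers = {}  # key -> Synchronizer for in-flight REPLICATEs

    def on_local_update(self, key):
        sync = self.synchronizers.get(key)
        if sync is not None:
            sync.valid = False  # a newer value now has to be replicated
        self.pending.setdefault(key, None)

    def begin_replicate(self, key):
        """Create the synchronizer if absent.

        Returns (synchronizer, created). created == False means that a
        REPLICATE for this key is already in flight, i.e. a conflict that
        the caller has to resolve.
        """
        existing = self.synchronizers.get(key)
        if existing is not None:
            return existing, False
        sync = Synchronizer()
        self.synchronizers[key] = sync
        return sync, True

    def end_replicate(self, key, sync):
        if self.synchronizers.get(key) is sync:
            del self.synchronizers[key]
        if sync.valid:
            # No local update happened in the meantime: the key is done.
            self.pending.pop(key, None)


tracker = KeyTracker()
tracker.on_local_update("K")
sync, created = tracker.begin_replicate("K")
tracker.on_local_update("K")       # local TX races with the REPLICATE
tracker.end_replicate("K", sync)
print(list(tracker.pending))       # ['K'] - must be replicated again
```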