Chronicle Map Replication (Enterprise Edition feature)

Enterprise Edition

Chronicle Map Replication is part of Chronicle Map (Enterprise Edition), a commercially supported version of our successful open-source Chronicle Map. Extended features include:

  • Replication to ensure real-time backup of all your map data.

  • Resilience support for robust fail-over and disaster recovery environments.

In addition, you will be fully supported by our technical experts.

For more information on Chronicle Map (Enterprise Edition), please contact [email protected].

To ensure that Chronicle Map’s multi-master replication works under extreme circumstances, we have built a stress test based on the principles introduced by Netflix with their "chaos monkey". For more information on the stress test, see CM_Replication_Stress_Test.

Replication

Chronicle Map Replication is a multi-master replicated data store. Chronicle Map supports both TCP and UDP replication.

TCP/IP Replication

TCP/UDP Background

TCP/IP is a reliable protocol. This means that, unless there is a network failure or hardware outage, the data is guaranteed to arrive. TCP/IP provides point-to-point connectivity. For example, if a message needs to reach 100 hosts, it must be sent 100 times.

With UDP, the message is only sent once. This is ideal if you have a large number of hosts, and you wish to broadcast the same data to each of them. However, one of the big drawbacks with UDP is that it is not a reliable protocol. This means, if the UDP message is broadcast onto the network, the hosts are not guaranteed to receive it; so they can miss data.

Some solutions attempt to build resilience into UDP, but arguably, this is in effect reinventing TCP/IP.

How to set up UDP Replication

On a good quality wired LAN, when using UDP, you will rarely miss messages. Nevertheless, this is a risk that we suggest you do not take. We suggest that whenever you use UDP replication, you use it in conjunction with throttled TCP replication. Therefore, if a host misses a message over UDP, it will pick it up later via TCP/IP.

TCP/IP Throttling

We are careful not to swamp your network with too much TCP/IP traffic. We do this by providing a throttled version of TCP replication. This works because Chronicle Map only broadcasts the latest update of each entry.

How Chronicle Map Replication works

Chronicle Map provides multi-master hash-map replication. This means that each remote map mirrors its changes over to another remote map. Neither map is considered to be the master store of data. Each map uses timestamps to reconcile changes.

We refer to an instance of a remote map as a node. Each node can be connected to up to 128 other nodes.

The data that is stored locally in each node becomes eventually consistent. So changes made to one node, for example by calling put(), will be replicated over to the other nodes.

To achieve a high level of performance and throughput, the call to put() will not block.

With ConcurrentHashMap, it is typical to check the return value of some methods to obtain the old value; for example, remove().

Due to the loose coupling, and lock-free nature of this multi-master implementation, the return value is only the old value on the node’s local data store. In other words, the nodes are only concurrent locally. Another node, performing exactly the same operation, may return a different value. However, reconciliation will ensure that all the maps will become eventually consistent.

Reconciliation

If two or more nodes receive a change to their maps for the same key, but with different values (say, by a user of the maps calling put(key, value)), then each node will initially update its local store, and each local store will hold a different value.

The aim of multi-master replication is to provide eventual consistency across the nodes. Using multi-master replication, whenever a node is changed it will notify the other nodes of its change; we refer to this notification as an event.

The event will hold a timestamp indicating the time that the change occurred. It will also hold the state transition; in this case, a put with a key and value.

Eventual consistency is achieved by looking at the timestamp from the remote node. If, for a given key, the remote node’s timestamp is newer than the local node’s timestamp, then the event from the remote node will be applied to the local node; otherwise, the event will be ignored.

Since none of the nodes is a primary, each node holds information about the other nodes. For a specific node, its own identifier is referred to as the 'localIdentifier'. The identifiers of other nodes are the 'remoteIdentifiers'.

On an update, or insert, of a key/value, the node pushes the information about the change to the remote nodes. The nodes use non-blocking Java NIO I/O, and all replication is done on a single thread.

However, there is a specific edge case. If two nodes update their map at precisely the same time with different values, we have to deterministically resolve which update wins. This is because eventual consistency mandates that both nodes should end up holding the same data locally.

Although it is rare that two remote nodes receive an update to their maps at exactly the same time, for the same key, we have to handle this edge case. We cannot therefore rely on timestamps alone to reconcile the updates.

Typically, the update with the newest timestamp should win, but in this example both timestamps are the same, and the decision made on one node must be identical to the decision made on the other. This dilemma is resolved by using a node identifier. The node identifier is a unique 'byte' value that is assigned to each node. When the timestamps are the same, the update originating from the node with the smaller identifier will be preferred.
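
In code terms, the rule is: apply a remote event if its timestamp is newer than that of the entry held locally, or, when the timestamps are equal, if the identifier of the node that originated the remote event is the smaller one. The sketch below is a minimal, self-contained illustration of that decision; the class and method names are purely illustrative and are not part of the Chronicle Map API.

public final class ReconciliationSketch {

    /**
     * Decide whether an incoming remote event should replace the entry held locally.
     * Illustrative only; not Chronicle Map API.
     */
    static boolean shouldApplyRemoteEvent(long localTimestamp, byte localOriginId,
                                          long remoteTimestamp, byte remoteOriginId) {
        if (remoteTimestamp != localTimestamp)
            return remoteTimestamp > localTimestamp; // the newer update wins
        return remoteOriginId < localOriginId;       // tie-break: the smaller node identifier wins
    }

    public static void main(String[] args) {
        // Same timestamp from nodes 1 and 2: both sides prefer the update that originated on node 1.
        System.out.println(shouldApplyRemoteEvent(1_000L, (byte) 2, 1_000L, (byte) 1)); // true
        System.out.println(shouldApplyRemoteEvent(1_000L, (byte) 1, 1_000L, (byte) 2)); // false
    }
}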

Multiple Processes on the same server with Replication

On a single server, if you have a number of Java processes, and within each Java process you create an instance of Chronicle Map which binds to the same underlying 'file', then these processes exchange data via shared memory, rather than by TCP or UDP replication.

If an instance of Chronicle Map which is not performing TCP replication is updated, then this update can be picked up by another instance of Chronicle Map. This other Chronicle Map instance could be TCP-replicated. In such an example, the TCP-replicated Chronicle Map instance would then push the update to the remote nodes.

Likewise, if the TCP replicated Chronicle Map instance received an update from a remote node, then this update would be immediately available to all the instances of Chronicle Map on the server.
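
Although the Enterprise wiring between a non-replicated instance and a TCP-replicated instance is not shown here, the shared-memory side is simply a persisted Chronicle Map: each process on the server opens the map against the same file. The following is a minimal sketch under that assumption, using the open-source Chronicle Map 3 builder API; the file name and sizing hints are illustrative, and in a replicated deployment the identifier described in the next section must also be set.

import net.openhft.chronicle.map.ChronicleMap;
import net.openhft.chronicle.map.ChronicleMapBuilder;

import java.io.File;
import java.io.IOException;

public class SharedFileMapSketch {
    public static void main(String[] args) throws IOException {
        // Hypothetical file; every process on this server opens the same file
        // and therefore shares the same off-heap data via memory mapping.
        File shared = new File(System.getProperty("java.io.tmpdir"), "shared-map.dat");

        try (ChronicleMap<Integer, CharSequence> map = ChronicleMapBuilder
                .of(Integer.class, CharSequence.class)
                .entries(10_000)
                .averageValue("an example value")
                .createPersistedTo(shared)) {

            // An update made here is visible to any other process that has the same file open.
            map.put(1, "updated by this process");
        }
    }
}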

Identifier for Replication

If you are only replicating your Chronicle Map instances on the same server, then you do not have to set up TCP or UDP replication. You also do not have to set the identifiers, as the identifiers are only used for the resolution of conflicts amongst remote servers.

If, however, you wish to replicate data between two or more servers, then all of the Chronicle Map instances, including those not actively participating in TCP or UDP replication, must have their identifiers set.

The identifier must be unique to each server, and each Chronicle Map instance on the same server must have the same identifier. The reason that all Chronicle Map instances must have the identifier set is that the memory is laid out slightly differently when using replication. Even if a map is not actively performing TCP or UDP replication itself, if it wishes to replicate with one that is, it must have its memory laid out in the same way to be compatible.

If the identifiers are not set up uniquely, then the updates will be ignored. For example, a Chronicle Map instance that is set up with the identifier equal to '1', will ignore all events which contain the remote identifier of '1'. In other words, Chronicle Map replication ignores updates which have originated from itself. This is to avoid the circularity of events.

When setting up the identifier, you can use values from 1 to 127.

The identifier is set up on the builder as follows:

TcpTransportAndNetworkConfig tcpConfig = ...
map = ChronicleMapBuilder
    .of(Integer.class, CharSequence.class)
    .replication(identifier, tcpConfig)
    .create();
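
For an instance on the same server that only shares data in-process and does not itself open a TCP transport, earlier Chronicle Map releases exposed a replication(identifier) overload that takes the identifier alone. Treat the following as an assumption-laden sketch and check that the overload exists in your version:

// Assumption: a replication(byte) overload without a transport config is available,
// as in earlier Chronicle Map releases; verify against your version.
byte identifier = 1; // must match every other Chronicle Map instance on this server
map = ChronicleMapBuilder
    .of(Integer.class, CharSequence.class)
    .replication(identifier)   // replication-compatible memory layout, no TCP transport
    .create();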

Configuration

Configuration of map nodes is done either programmatically, through configuration files, or by a combination of both.

Programmatically

The example below shows how to set up two map instances that use TCP to replicate entries between them:

// set up some socket aliases
TCPRegistry.createServerSocketChannelFor("host.port1", "host.port2");

// create replicated maps
final ReplicatedMap<Long, Long> replicaOne = createMap(1, "host.port1");
final ReplicatedMap<Long, Long> replicaTwo = createMap(2, "host.port2");

// start replication event loops
replicaOne.start();
replicaTwo.start();

// register maps for replication events
replicaOne.beginReplicationFromPeer("host.port2", (byte) 2);
replicaTwo.beginReplicationFromPeer("host.port1", (byte) 1);

// allow replication to establish
Jvm.pause(250L);

// access the underlying maps
final Map<Long, Long> mapOne = replicaOne.getMap();
final Map<Long, Long> mapTwo = replicaTwo.getMap();

// enter some data into mapOne
for (long i = 0; i < 100; i++) {
    mapOne.put(i, i * 2);
}

// enter some different data into mapTwo
for (long i = 100; i < 200; i++) {
    mapTwo.put(i, i * 7);
}

// wait for replication between nodes (conservative)
Jvm.pause(1000L);

System.out.printf("mapOne has %d entries, expected 200%n", mapOne.size());
System.out.printf("mapTwo has %d entries, expected 200%n", mapTwo.size());

Configuration file

The following example uses a basic YAML configuration file to define defaults for the map:

!ReplicatedMapCfg {
    hostId: 1, # host id must be unique within the cluster
    listenAddressHostPort: "localhost:9301", # address on which to listen for inbound replication requests
    entries: 10000, # entries hint suggesting the initial size of the map
    keyClass: !type long, # class of the key type
    valueClass: !type long, # class of the value type
    exampleKey: 17, # an example key (used to size the map)
    exampleValue: 37, # an example value (used to size the map)
    name: testLongToLongMap, # a name for the map (used to determine filename)
    locator: "csp://localhost:9301/map/test", # a locator, used for discovery
    mapFileDataDirectory: target/mapData, # directory where map file should be stored
    mapLogDirectory: target/mapReplicationLogs, # directory where replication events should be recorded
    keysAreConstantSize: true, # determines how keys are serialised
    valuesAreConstantSize: true # determines how values are serialised
}

After putting 100 entries into each distinct map, the example demonstrates that the entries have been replicated so that each map now contains 200 entries.

This example is available in the repository as MultiMasterMapReplicationExampleMain.java.
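
How the Enterprise tooling loads this file is not shown in this document. As a rough sketch, a configuration of this shape could be read with Chronicle Wire's Marshallable support, assuming ReplicatedMapCfg is a Marshallable class registered under the !ReplicatedMapCfg alias and the YAML above is saved as replicated-map.yaml:

// Sketch only: assumes ReplicatedMapCfg is a registered Marshallable class and the
// YAML shown above has been saved as replicated-map.yaml; the Enterprise loading
// mechanism may differ.
ReplicatedMapCfg cfg = net.openhft.chronicle.wire.Marshallable.fromFile("replicated-map.yaml");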

Replication event logging

Chronicle Map Enterprise can be configured to log all replication events to a Chronicle Queue for auditing purposes.

Currently, a map can be configured to log all outgoing events that it sends to remote peers.

The example below shows the message flow for a map with a single remote peer receiving replication events:

.. header omitted

targetHostId: 2 # message destination
registerForReplication: {
  receiverHostId: 1, # local peer id
  keyClassName: java.lang.Long, # key type
  valueClassName: java.lang.Long, # value type
  sendAllEventsAfter: 0 # bootstrap timestamp
}

..

# corresponding registration message from remote peer
targetHostId: 1
registerForReplication: {
  receiverHostId: 2,
  keyClassName: java.lang.Long,
  valueClassName: java.lang.Long,
  sendAllEventsAfter: 0
}

..

# accept replication connection
targetHostId: 2
acceptingReplicationRequest: {
  acceptingHostId: 1
}

..

# encoded replication update sent to remote peer
targetHostId: 2
replicatedEntry: !!binary AYChq5LqwKXqFAFOEwAAAAAAAAD/////////fw==

This output can be generated by running the example program, ReplicationEventLoggingExampleMain.java.
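
Since the replication log is written to a Chronicle Queue, it can also be inspected programmatically after a run. A minimal sketch, assuming the log is a standard Chronicle Queue stored under the mapLogDirectory (target/mapReplicationLogs) configured earlier:

import net.openhft.chronicle.queue.ChronicleQueue;

public class ReplicationLogDumpSketch {
    public static void main(String[] args) {
        // Directory assumed to match the mapLogDirectory configured earlier in this document.
        try (ChronicleQueue queue = ChronicleQueue.singleBuilder("target/mapReplicationLogs").build()) {
            // dump() renders every stored excerpt as text, similar to the message flow shown above.
            System.out.println(queue.dump());
        }
    }
}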


For the replication stress test, see CM_Replication_Stress_Test.