Cluster Tutorial

Dmytro Vyazelenko edited this page Aug 22, 2024 · 36 revisions

Aeron Cluster Tutorial

Important
This tutorial is currently a work in progress; information may be missing or incomplete.

This tutorial assumes that the user already has a basic working knowledge of Aeron Messaging.

1. Introduction

Aeron Cluster is a framework for high-performance, in-memory, fault-tolerant services. It implements the Raft Consensus Algorithm to provide log replication, which allows multiple nodes to maintain the same state, and automated leader election, which ensures that there is a single leader within the cluster. Systems built using Aeron Cluster have a linearizable consistency model and can cope with partial system failure, including failure of the leader node, as long as a quorum of (n / 2) + 1 nodes remains available.
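To make the quorum arithmetic concrete, here is a minimal sketch. The class and method names are hypothetical and are not part of the Aeron API; the only thing taken from the text is the (n / 2) + 1 formula, evaluated with integer division.

```java
/** Quorum arithmetic for a cluster of n nodes (illustrative only, not an Aeron API). */
final class QuorumSketch
{
    /** Smallest majority of n nodes: (n / 2) + 1 using integer division. */
    static int quorum(final int nodeCount)
    {
        return (nodeCount / 2) + 1;
    }

    /** Number of nodes that can be lost while a quorum still remains available. */
    static int tolerableFailures(final int nodeCount)
    {
        return nodeCount - quorum(nodeCount);
    }

    public static void main(final String[] args)
    {
        for (int n = 1; n <= 5; n++)
        {
            System.out.println(
                n + " nodes: quorum=" + quorum(n) + ", tolerable failures=" + tolerableFailures(n));
        }
    }
}
```

A three-node cluster needs two nodes and tolerates one failure; a four-node cluster needs three nodes and still tolerates only one, which is why odd cluster sizes are the usual choice.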

1.1. Raft Basics

It is not the intention of this tutorial to give a full description of the Raft Consensus Algorithm. However, it is difficult to explain Aeron Cluster without a few definitions up front. The key concepts are (some are Aeron specific):

  • Node, a physical server, container instance, VM or group of processes that represents a logical server within a cluster.

  • Leader, a node within the cluster responsible for replicating messages to the other nodes and waiting for acknowledgements.

  • Follower, other nodes within the cluster that receive messages replicated from the leader.

  • Election, process by which the cluster agrees on a new leader.

  • Client, node external to the cluster.

  • Ingress, the messages from a client into the cluster.

  • Egress, response messages from the cluster back to a client.

  • Snapshot, serialised representation of the application logic’s state at a point in time.

1.2. Replication and Recovery

Within Aeron Cluster, replication (ensuring that a follower node has the same state as the leader) and recovery (restoring a stopped node to its previous state)[1] are the same problem, or at least similar enough to use the same mechanism. They rely on three things. Firstly, a known initial state: this could either be the state when first provisioned (a 'null' or empty state) or a snapshot of state at a point in time. Secondly, an ordered log of all input messages, which is handled by Cluster’s Raft implementation. Thirdly, deterministic application logic, i.e. it must derive all of its resulting state and output events from the initial state and the input messages, such that for the same inputs it will always get the same result. This can be trickier than it first seems, as it excludes some functionality that we may normally take for granted.
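The following sketch illustrates why an initial state, an ordered log, and deterministic logic are enough for both replication and recovery. It is plain Java with no Aeron dependency; the state (a running maximum), the log contents, and all names are assumptions chosen only for illustration.

```java
import java.util.function.LongBinaryOperator;

/** Illustrative replay of an ordered log onto a known initial state (not an Aeron API). */
final class ReplaySketch
{
    /** Deterministic logic: the new state depends only on the old state and the message. */
    static final LongBinaryOperator APPLY = (state, message) -> Math.max(state, message);

    /** Apply log entries [fromIndex, toIndex) in order, starting from the given state. */
    static long replay(final long initialState, final long[] log, final int fromIndex, final int toIndex)
    {
        long state = initialState;
        for (int i = fromIndex; i < toIndex; i++)
        {
            state = APPLY.applyAsLong(state, log[i]);
        }
        return state;
    }

    public static void main(final String[] args)
    {
        final long[] log = { 5, 3, 9, 7, 12 };

        // Recovery from the empty state by replaying the whole log.
        final long fullReplay = replay(0, log, 0, log.length);

        // State captured after the first three messages, standing in for a snapshot.
        final long snapshot = replay(0, log, 0, 3);

        // Recovery from the snapshot plus the remainder of the log.
        final long fromSnapshot = replay(snapshot, log, 3, log.length);

        System.out.println("states match: " + (fullReplay == fromSnapshot));
    }
}
```

Both paths arrive at the same state because nothing other than the previous state and the next message influences the result. Any hidden input, such as the wall clock, would break that equality.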

Tip

Be careful to ensure determinism in how you code a clustered service. Common pitfalls include:

  • Timestamps

  • Random Numbers

  • Reading from configuration files.

  • Iterating over some types of collections, e.g. HashMaps.

Time, as timestamps, is provided by Aeron Cluster (this will be shown later). Random numbers are best avoided; if they are needed, use a seed fixed in the initial state or supplied via a message. Configuration changes should be pushed into the application logic via messages. Iterating over collections should happen in a known order, either with a collection that preserves ordering (e.g. TreeMap, LinkedHashMap) or by sorting the values from the collection before sending them as messages. Alternatively, the Agrona collections can be used with their consistent iteration order and low allocation characteristics.
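As a small illustration of two of these remedies, the sketch below uses only the JDK. The class, method names, and sample data are hypothetical; it shows sorting keys before iterating and seeding a random number generator from a value that would, in a real service, come from replicated state or a message.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

/** Illustrative helpers for deterministic iteration and random numbers (not an Aeron API). */
final class DeterminismSketch
{
    /** Returns the keys in sorted order, regardless of the map implementation passed in. */
    static List<String> orderedKeys(final Map<String, Long> bids)
    {
        return new ArrayList<>(new TreeMap<>(bids).keySet());
    }

    /** A generator created from the same seed produces the same sequence on every node. */
    static long[] draws(final long seed, final int count)
    {
        final Random random = new Random(seed);
        final long[] values = new long[count];
        for (int i = 0; i < count; i++)
        {
            values[i] = random.nextLong();
        }
        return values;
    }

    public static void main(final String[] args)
    {
        final Map<String, Long> bids = new HashMap<>();
        bids.put("carol", 120L);
        bids.put("alice", 100L);
        bids.put("bob", 110L);

        System.out.println(orderedKeys(bids));
        System.out.println(Arrays.equals(draws(42L, 3), draws(42L, 3)));
    }
}
```

Copying into a TreeMap on every call allocates, so a service on a hot path would more likely hold its state in an ordered collection from the start.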

As it may be impractical to replay all data from the "beginning of time", the system should take snapshots periodically, e.g. daily or hourly. The frequency of snapshots should be determined by the volume of data into the system, the throughput of the business logic, and the desired mean time to recovery. It is not uncommon to have systems that may take an hour or two to recover from a day’s worth of messages; in those systems snapshotting every 30 minutes may be more appropriate.
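A rough way to reason about snapshot frequency is to estimate the worst-case replay time for a given interval. The sketch below is a back-of-the-envelope calculation, not something Aeron provides. It assumes messages arrive at a uniform rate, that replay time is proportional to the amount of log replayed, and it ignores the time needed to load the snapshot itself.

```java
/** Back-of-the-envelope replay time estimate (illustrative only, not an Aeron API). */
final class RecoveryEstimateSketch
{
    private static final double MINUTES_PER_DAY = 24 * 60;

    /**
     * Worst case: the node stops just before the next snapshot, so the messages from a
     * whole snapshot interval have to be replayed.
     */
    static double worstCaseReplayMinutes(
        final double snapshotIntervalMinutes, final double replayMinutesPerDayOfLog)
    {
        return snapshotIntervalMinutes * replayMinutesPerDayOfLog / MINUTES_PER_DAY;
    }

    public static void main(final String[] args)
    {
        // A system that needs two hours to replay one day of messages.
        System.out.println("daily snapshot: " + worstCaseReplayMinutes(MINUTES_PER_DAY, 120) + " min");
        System.out.println("30 min snapshot: " + worstCaseReplayMinutes(30, 120) + " min");
    }
}
```

Under these assumptions, a system that takes two hours to replay a day of messages would need about two and a half minutes of replay with 30 minute snapshots.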

1.3. Components of Aeron Cluster

One of the key design goals of Aeron is to build a system that is highly composable. For example, Aeron Archive provides a means to persist the Raft log created by Aeron Cluster, and Aeron Cluster sends messages via a Media Driver. Aeron Cluster is therefore an aggregation of a number of existing Aeron components and a few new ones. To successfully run a cluster node it is necessary to have one (or at least one in the case of ClusteredServiceContainer) of each of the Aeron components running. Because all communication between these components within a single node uses IPC, they can all be run in the same process, in separate processes, or in any arbitrary combination.

1.3.1. MediaDriver

The Media Driver is the means by which the cluster communicates. Aeron Cluster reuses the Publication, Subscription, and Counter functionality in Aeron to handle all distributed and inter-process communication.

1.3.2. Archive

Raft is primarily a log replication protocol, so Aeron Cluster uses Aeron Archive to persist its log.

1.3.3. ConsensusModule

The Consensus Module is the key component in Aeron Cluster and ensures the nodes have a consistent copy of the replicated log. The Consensus Module will coordinate with the Archive to persist messages, replicate/ack messages to/from other nodes and deliver messages through to the Clustered Services.

1.3.4. ClusteredServiceContainer

This is the service running the developer supplied application logic. There can be one or more clustered services per node. Aeron Cluster provides a container for the application logic. There is a ClusteredService interface that must be implemented by the application to receive the events and messages from Cluster.

2. Getting Started with Aeron Cluster

Install JDK 8, 11, or 17 and ensure that $JAVA_HOME is set and $JAVA_HOME/bin is on your $PATH.

The typical way to get started with Aeron in your own application is to use the aeron-all jar. However, for this tutorial we are going to recommend that you check out the source code from GitHub.

2.1. Using the Source From this Tutorial

In order to look at the examples and run the code from this tutorial you will need to check out the full Aeron source code, use the appropriate release version and build the projects.

> git clone https://github.com/real-logic/aeron.git
> cd aeron
> git checkout -b my_tutorial 1.45.0
> ./gradlew

The location on your computer where this has been checked out to will be referred to as <AERON_HOME>.

This tutorial will include snippets of code from the working example. The scripts and classes for this tutorial are:

  • <AERON_HOME>/aeron-samples/src/main/java/io/aeron/samples/cluster/tutorial/

    • BasicAuctionClusteredService.java

    • BasicAuctionClusteredServiceNode.java

    • BasicAuctionClusterClient.java

  • <AERON_HOME>/aeron-samples/scripts/cluster/

    • basic-auction-cluster

    • basic-auction-client

You may wish to open these up in your favourite IDE or editor while we proceed with the tutorial.

3. Implementing a Clustered Service

The first step to setting up a cluster is to implement the application logic. For this tutorial we are going to implement a simple auction service. It will have one auction and will track the best price and an id for the customer who bid that price. To properly demonstrate the state management and recovery features of Cluster it is important to have some functionality that is stateful, rather than something that is stateless like an echo service.
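The auction state itself is not listed in this tutorial, so the following is a hypothetical stand-in rather than the code from the samples. The method names are chosen to match the calls made on the auction object in the snippets in this tutorial; the initial values and the rule that a bid must be strictly higher than the current best price are assumptions.

```java
/** Hypothetical stand-in for the tutorial's auction state (not the code from the samples). */
final class AuctionSketch
{
    private long bestPrice = 0;                  // assumed starting price, in cents
    private long currentWinningCustomerId = -1;  // assumed marker for "no winner yet"

    /** Restore the state, e.g. from a snapshot. */
    void loadInitialState(final long price, final long customerId)
    {
        bestPrice = price;
        currentWinningCustomerId = customerId;
    }

    /** A bid succeeds only if it is strictly higher than the current best price. */
    boolean attemptBid(final long price, final long customerId)
    {
        if (price <= bestPrice)
        {
            return false;
        }

        bestPrice = price;
        currentWinningCustomerId = customerId;
        return true;
    }

    long getBestPrice()
    {
        return bestPrice;
    }

    long getCurrentWinningCustomerId()
    {
        return currentWinningCustomerId;
    }

    public static void main(final String[] args)
    {
        final AuctionSketch auction = new AuctionSketch();
        System.out.println(auction.attemptBid(100, 1));
        System.out.println(auction.attemptBid(100, 2));
        System.out.println(auction.getCurrentWinningCustomerId());
    }
}
```

The outcome of a bid depends only on the existing state and the bid itself, which is what keeps this logic deterministic.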

Note
To show the various features of Cluster clearly, we’ve kept the different aspects of the code (data serialisation, application logic, Cluster integration) very close together. It is unlikely you would do this in a real world application. You would probably want to have greater separation of concerns and perhaps use libraries to handle functionality like serialisation.

We must define the link between the application logic and Aeron Cluster. This is done by implementing the ClusteredService interface. The ClusteredService interface defines a number of callbacks which deliver messages and lifecycle events as they occur, as well as providing the hooks that our service can use to interact with the cluster (e.g. sending response messages and taking snapshots).

public class BasicAuctionClusteredService implements ClusteredService

3.1. Start Up

The clustered service container will notify the service it has started using the onStart callback. This will occur before input messages are received, either from log replay or live from a client. It is during this phase that we need to load the initial state of the service. Aeron Cluster passes in an Image that will contain the most recent valid snapshot of the service. The service should take care of deserialising the data from the image and initialising its state. We will come back to the details of how the snapshot is loaded after we’ve looked at how messages are handled and how a snapshot is stored.

public void onStart(final Cluster cluster, final Image snapshotImage)
{
    this.cluster = cluster;                      // (1)
    this.idleStrategy = cluster.idleStrategy();  // (2)

    if (null != snapshotImage)                   // (3)
    {
        loadSnapshot(cluster, snapshotImage);
    }
}
  1. Take a reference to the cluster, so that we can have access to features of the cluster within the other callbacks.

  2. Take a reference to the cluster’s idle strategy. This will be used anytime we enter a busy loop within the service.

  3. The snapshot can be null (this occurs the first time the service is started).

3.2. Handling Messages

The onSessionMessage callback is the main entry point for requests coming into the cluster. Messages reach here by being published to the Cluster’s ingress channel. This method will also be passed messages during log replay. It is from within this method we will interact with our application logic. Cluster also provides a reliable timestamp as a parameter to this method. As mentioned earlier this is one of the challenges of building deterministic systems. Use this value as the timestamp within your application state, so it will be consistent under replay.

private final Auction auction = new Auction();
public void onSessionMessage(
    final ClientSession session,
    final long timestamp,
    final DirectBuffer buffer,
    final int offset,
    final int length,
    final Header header)
{
    final long correlationId = buffer.getLong(offset + CORRELATION_ID_OFFSET);                   // (1)
    final long customerId = buffer.getLong(offset + CUSTOMER_ID_OFFSET);
    final long price = buffer.getLong(offset + PRICE_OFFSET);

    final boolean bidSucceeded = auction.attemptBid(price, customerId);                          // (2)

    if (null != session)                                                                         // (3)
    {
        egressMessageBuffer.putLong(CORRELATION_ID_OFFSET, correlationId);                       // (4)
        egressMessageBuffer.putLong(CUSTOMER_ID_OFFSET, auction.getCurrentWinningCustomerId());
        egressMessageBuffer.putLong(PRICE_OFFSET, auction.getBestPrice());
        egressMessageBuffer.putByte(BID_SUCCEEDED_OFFSET, bidSucceeded ? (byte)1 : (byte)0);

        idleStrategy.reset();
        while (session.offer(egressMessageBuffer, 0, EGRESS_MESSAGE_LENGTH) < 0)                 // (5)
        {
            idleStrategy.idle();                                                                 // (6)
        }
    }
}

For our input message we have 3 fields, a correlationId which is the customer supplied identifier for the message, customerId to identify the customer placing the bid, and a price (we’ve used a long for this, which represents the value in cents).

  1. Pull the data out of the message. This is similar to the pattern used in an Aeron Subscription’s onFragment callback when doing a poll.

  2. Execute the business logic, in our case this is applying the incoming bid to the auction to see if it is a winner.

  3. The ClientSession allows the service to get information about the calling client, and also provides a means to return responses back to the client.

  4. Serialise response message.

  5. Calling offer on the client session will send the response on the egress channel. Make sure to check the return value of offer as it is a non-blocking call and not guaranteed to succeed.

  6. When doing any busy loops within the clustered application, use the Cluster’s IdleStrategy::idle within the wait loop so that the service can wait while allowing other threads to progress. It will take care of handling thread interrupts and ensure the node fails correctly.
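The offset constants used in the callback are not listed in this tutorial. The sketch below shows one plausible layout, using java.nio.ByteBuffer as a stand-in for the Agrona buffers. The offset values, the little-endian byte order, and the helper names are all assumptions made for illustration and may differ from the sample source.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Assumed fixed-offset layout for the bid and result messages (illustrative only). */
final class BidMessageLayoutSketch
{
    // Assumption: three consecutive 8-byte longs, then one status byte on the egress message.
    static final int CORRELATION_ID_OFFSET = 0;
    static final int CUSTOMER_ID_OFFSET = CORRELATION_ID_OFFSET + Long.BYTES;
    static final int PRICE_OFFSET = CUSTOMER_ID_OFFSET + Long.BYTES;
    static final int BID_MESSAGE_LENGTH = PRICE_OFFSET + Long.BYTES;
    static final int BID_SUCCEEDED_OFFSET = BID_MESSAGE_LENGTH;
    static final int EGRESS_MESSAGE_LENGTH = BID_SUCCEEDED_OFFSET + Byte.BYTES;

    /** Encode an ingress bid: correlation id, customer id, and price in cents. */
    static ByteBuffer encodeBid(final long correlationId, final long customerId, final long price)
    {
        final ByteBuffer buffer = ByteBuffer.allocate(BID_MESSAGE_LENGTH).order(ByteOrder.LITTLE_ENDIAN);
        buffer.putLong(CORRELATION_ID_OFFSET, correlationId);
        buffer.putLong(CUSTOMER_ID_OFFSET, customerId);
        buffer.putLong(PRICE_OFFSET, price);
        return buffer;
    }

    /** Encode an egress result: the same three fields followed by a success flag. */
    static ByteBuffer encodeResult(
        final long correlationId, final long winningCustomerId, final long bestPrice, final boolean succeeded)
    {
        final ByteBuffer buffer = ByteBuffer.allocate(EGRESS_MESSAGE_LENGTH).order(ByteOrder.LITTLE_ENDIAN);
        buffer.putLong(CORRELATION_ID_OFFSET, correlationId);
        buffer.putLong(CUSTOMER_ID_OFFSET, winningCustomerId);
        buffer.putLong(PRICE_OFFSET, bestPrice);
        buffer.put(BID_SUCCEEDED_OFFSET, succeeded ? (byte)1 : (byte)0);
        return buffer;
    }

    public static void main(final String[] args)
    {
        final ByteBuffer bid = encodeBid(7L, 42L, 1050L);
        System.out.println("bid length=" + bid.capacity() + " price=" + bid.getLong(PRICE_OFFSET));
    }
}
```

Reusing the same offsets for the shared fields of the request and the response keeps the encoding and decoding code symmetrical.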

3.3. Storing State

As was mentioned earlier we need to regularly take snapshots of the service’s state in order to reduce the mean time to recovery and facilitate release migration. There is a callback onTakeSnapshot that will be called when it is time to snapshot the state of the service.

Aeron Cluster provides an ExclusivePublication to write a snapshot as the serialised representation of the application logic’s state. For real world applications snapshotting can become tricky. The two big concerns are ensuring that you write snapshots consistently and dealing with fragmentation of the application state across messages. For now, our state is so simple that neither of those will impact us.

public void onTakeSnapshot(final ExclusivePublication snapshotPublication)
{
    snapshotBuffer.putLong(SNAPSHOT_CUSTOMER_ID_OFFSET, auction.getCurrentWinningCustomerId());  // (1)
    snapshotBuffer.putLong(SNAPSHOT_PRICE_OFFSET, auction.getBestPrice());

    idleStrategy.reset();
    while (snapshotPublication.offer(snapshotBuffer, 0, SNAPSHOT_MESSAGE_LENGTH) < 0)            // (2)
    {
        idleStrategy.idle();
    }
}
  1. Write the persistent part of the application logic to a message buffer. In our case, the currently winning customer id and bid price.

  2. Write the message to the publication, again we need to check the return from the offer call and use the Cluster’s IdleStrategy::idle inside the busy wait loop.
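The retry pattern used around offer can be shown in isolation. This sketch replaces the publication with a LongSupplier and the idle strategy with Thread.yield(), so it runs without Aeron; both substitutions, and the simulated return values, are assumptions for illustration only.

```java
import java.util.function.LongSupplier;

/** Retry loop around a non-blocking offer (illustrative stand-in, not an Aeron API). */
final class OfferRetrySketch
{
    /**
     * Keep calling the offer until it returns a non-negative value, which stands in for
     * an accepted position. Returns the number of attempts that were needed.
     */
    static int offerUntilAccepted(final LongSupplier offer)
    {
        int attempts = 1;
        while (offer.getAsLong() < 0)
        {
            Thread.yield(); // stand-in for idleStrategy.idle()
            attempts++;
        }
        return attempts;
    }

    public static void main(final String[] args)
    {
        // Two simulated rejections followed by an accepted position.
        final long[] results = { -2, -2, 128 };
        final int[] next = { 0 };

        final int attempts = offerUntilAccepted(() -> results[next[0]++]);
        System.out.println("accepted after " + attempts + " attempts");
    }
}
```

A real service may want to treat some negative results as unrecoverable rather than retrying indefinitely.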

3.4. Loading State

Our onStart implementation contained a method to load the snapshot. Now that we have seen how to take a snapshot, we can look at how it is loaded. The Image provided to onStart has a method that will indicate if there is no more data available. However, application code should also encode enough information that the end of the stream can be detected from the stored messages. The two approaches can be used together to sanity check all the received data.

private void loadSnapshot(final Cluster cluster, final Image snapshotImage)
{
    final MutableBoolean isAllDataLoaded = new MutableBoolean(false);
    final FragmentHandler fragmentHandler = (buffer, offset, length, header) ->         // (1)
    {
        assert length >= SNAPSHOT_MESSAGE_LENGTH;                                       // (2)

        final long customerId = buffer.getLong(offset + SNAPSHOT_CUSTOMER_ID_OFFSET);
        final long price = buffer.getLong(offset + SNAPSHOT_PRICE_OFFSET);

        auction.loadInitialState(price, customerId);                                    // (3)

        isAllDataLoaded.set(true);
    };

    while (!snapshotImage.isEndOfStream())                                              // (4)
    {
        final int fragmentsPolled = snapshotImage.poll(fragmentHandler, 1);

        if (isAllDataLoaded.value)                                                      // (5)
        {
            break;
        }

        idleStrategy.idle(fragmentsPolled);                                             // (6)
    }

    assert snapshotImage.isEndOfStream();                                               // (7)
    assert isAllDataLoaded.value;
}
  1. Our snapshot is a stream of messages written to a Publication, we then use the Image::poll method for extracting data from the snapshot.

  2. Our total snapshot length (16 bytes) is going to be smaller than any reasonable MTU therefore we can assume that all the data will come in a single message fragment.

  3. Once all data is loaded, we can initialise the application logic state from the snapshot.

  4. The method Image::isEndOfStream can be used to determine if there is going to be any more input.

  5. Once we’ve loaded all the data for the application we can break out of the snapshot loading loop.

  6. Again make sure we use the Cluster’s IdleStrategy::idle in the tight loops. It could take time for the snapshot to be propagated to the service, so the number of fragments can be zero on any given invocation.

  7. It is also worthwhile having some sanity checks. These ensure that the snapshot store/load code and Aeron agree on where the end of the input data is. We’ve used asserts, but other mechanisms (e.g. log messages or exceptions) could also be used to indicate an issue.
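To see the store and load halves agree with each other, the sketch below round-trips the two snapshot fields through a 16-byte buffer. It uses java.nio.ByteBuffer in place of the Agrona buffers; the offset values and byte order are assumptions, and it uses an exception in place of the assert for the length check.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Round trip of the two-field snapshot (illustrative stand-in, not the sample code). */
final class SnapshotCodecSketch
{
    // Assumption: customer id first, then price, both 8-byte longs (16 bytes in total).
    static final int SNAPSHOT_CUSTOMER_ID_OFFSET = 0;
    static final int SNAPSHOT_PRICE_OFFSET = SNAPSHOT_CUSTOMER_ID_OFFSET + Long.BYTES;
    static final int SNAPSHOT_MESSAGE_LENGTH = SNAPSHOT_PRICE_OFFSET + Long.BYTES;

    static ByteBuffer encode(final long customerId, final long price)
    {
        final ByteBuffer buffer = ByteBuffer.allocate(SNAPSHOT_MESSAGE_LENGTH).order(ByteOrder.LITTLE_ENDIAN);
        buffer.putLong(SNAPSHOT_CUSTOMER_ID_OFFSET, customerId);
        buffer.putLong(SNAPSHOT_PRICE_OFFSET, price);
        return buffer;
    }

    /** Returns { customerId, price }, rejecting fragments that are too short to hold both. */
    static long[] decode(final ByteBuffer buffer, final int offset, final int length)
    {
        if (length < SNAPSHOT_MESSAGE_LENGTH)
        {
            throw new IllegalStateException("snapshot fragment too short: " + length);
        }

        return new long[]
        {
            buffer.getLong(offset + SNAPSHOT_CUSTOMER_ID_OFFSET),
            buffer.getLong(offset + SNAPSHOT_PRICE_OFFSET)
        };
    }

    public static void main(final String[] args)
    {
        final ByteBuffer snapshot = encode(42L, 1050L);
        final long[] state = decode(snapshot, 0, snapshot.capacity());
        System.out.println("customerId=" + state[0] + " price=" + state[1]);
    }
}
```

Keeping the encode and decode logic side by side like this makes it easier to unit test that a stored snapshot can always be loaded again.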

3.5. Other events

There are a number of other events received by the ClusteredService interface, such as timer events, clients connecting and disconnecting from the cluster, plus leadership and role changes. We won’t look at these in this tutorial.

4. Configuring a Cluster

Now that we have our application implemented, we can move on to running it in a Cluster. There are a number of moving parts to setting up a cluster node, and one of the trickiest parts of using Aeron Cluster is getting the configuration correct. We are going to start with a static three-node cluster with a simplified configuration where all the components (MediaDriver, Archive, ConsensusModule, and ClusteredServiceContainer) for a single node run within a single process. We will then start the cluster as three separate processes.

4.1. Running Multiple Nodes on the Same Host

In a production deployment, you will want to run the 3 instances on 3 separate servers. However, for our example we want to run the services on a single machine. This does make port allocation a concern, as each node within the cluster needs to bind to a number of ports, so we need to make sure there are no clashes. We could do this with VMs or containers, but in the interest of simplicity we are going to specify a port range for each node. For example:

  • Node 0, ports: 9000-9099

  • Node 1, ports: 9100-9199

  • Node 2, ports: 9200-9299

private static final int PORT_BASE = 9000;
private static final int PORTS_PER_NODE = 100;
private static final int ARCHIVE_CONTROL_PORT_OFFSET = 1;
static final int CLIENT_FACING_PORT_OFFSET = 2;
private static final int MEMBER_FACING_PORT_OFFSET = 3;
private static final int LOG_PORT_OFFSET = 4;
private static final int TRANSFER_PORT_OFFSET = 5;
private static final int LOG_CONTROL_PORT_OFFSET = 6;
private static final int TERM_LENGTH = 64 * 1024;

static int calculatePort(final int nodeId, final int offset)
{
    return PORT_BASE + (nodeId * PORTS_PER_NODE) + offset;
}
private static String udpChannel(final int nodeId, final String hostname, final int portOffset)
{
    final int port = calculatePort(nodeId, portOffset);
    return new ChannelUriStringBuilder()
        .media("udp")
        .termLength(TERM_LENGTH)
        .endpoint(hostname + ":" + port)
        .build();
}

While we don’t really need 100 ports for each node (closer to 10), it does make it clear which ports are assigned to each service. Each endpoint will be an offset from the first port in a node’s range. E.g. the archive control request port has an offset of 1, so for Node 2 it will have port 9201.
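The port arithmetic can be checked with a small standalone program. This sketch repeats the calculatePort logic and the offset values from the snippet above; the range check and the printed labels are additions made here for illustration.

```java
/** Worked example of the per-node port ranges (standalone copy for illustration). */
final class PortAllocationSketch
{
    private static final int PORT_BASE = 9000;
    private static final int PORTS_PER_NODE = 100;

    static int calculatePort(final int nodeId, final int offset)
    {
        // Added guard: an offset outside the node's range would collide with the next node.
        if (offset < 0 || offset >= PORTS_PER_NODE)
        {
            throw new IllegalArgumentException("offset outside node range: " + offset);
        }

        return PORT_BASE + (nodeId * PORTS_PER_NODE) + offset;
    }

    public static void main(final String[] args)
    {
        // Labels follow the offset constants 1 to 6 in the snippet above.
        final String[] names =
        {
            "archive control", "client facing", "member facing", "log", "transfer", "log control"
        };

        for (int nodeId = 0; nodeId < 3; nodeId++)
        {
            for (int offset = 1; offset <= names.length; offset++)
            {
                System.out.println(
                    "node " + nodeId + " " + names[offset - 1] + ": " + calculatePort(nodeId, offset));
            }
        }
    }
}
```

For Node 2 the archive control port comes out as 9000 + (2 * 100) + 1 = 9201, matching the example in the text.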

To start the cluster node we are going to define our own class with a main method. It will construct and start all the necessary components. As indicated above, we are going to give each node of the cluster a unique id (0, 1, and 2). We are also going to use this value as the cluster member id.

public class BasicAuctionClusteredServiceNode
public static void main(final String[] args)
{
    final int nodeId = parseInt(System.getProperty("aeron.cluster.tutorial.nodeId"));               // (1)
    final String[] hostnames = System.getProperty(
        "aeron.cluster.tutorial.hostnames", "localhost,localhost,localhost").split(",");            // (2)
    final String hostname = hostnames[nodeId];

    final File baseDir = new File(System.getProperty("user.dir"), "node" + nodeId);                 // (3)
    final String aeronDirName = CommonContext.getAeronDirectoryName() + "-" + nodeId + "-driver";

    final ShutdownSignalBarrier barrier = new ShutdownSignalBarrier();                              // (4)
  1. Pass in the node id as a system property on the command line so that we can reuse the code for each instance of the service.

  2. Define a set of hostnames for the cluster. For now everything is running on localhost, but we could put actual hostnames or IP addresses in this list and run the same example across multiple servers.

  3. For each node we need a location on disk that will hold the persistent data. This will include the Raft log, mark files for each service component, and the recording log used to track snapshots within the log. In a production system this location is likely to be fairly important, as you will want to map this to a fast disk in order to get the best performance out of the system.

  4. Create a shutdown barrier that will be used to trap exit signals and allow the service to exit cleanly.

4.2. Configuration

4.2.1. Media Driver

final MediaDriver.Context mediaDriverContext = new MediaDriver.Context()
    .aeronDirectoryName(aeronDirName)
    .threadingMode(ThreadingMode.SHARED)
    .termBufferSparseFile(true)
    .multicastFlowControlSupplier(new MinMulticastFlowControlSupplier())
    .terminationHook(barrier::signal)
    .errorHandler(BasicAuctionClusteredServiceNode.errorHandler("Media Driver"));

Note we’ve specified the custom aeronDirName to allow multiple Media Drivers on the same host. There is nothing special in the configuration in the Media Driver specific to Cluster.

4.2.2. Archive

final Archive.Context archiveContext = new Archive.Context()
    .aeronDirectoryName(aeronDirName)
    .archiveDir(new File(baseDir, "archive"))
    .controlChannel(udpChannel(nodeId, hostname, ARCHIVE_CONTROL_PORT_OFFSET))
    .archiveClientContext(replicationArchiveContext)
    .localControlChannel("aeron:ipc?term-length=64k")
    .recordingEventsEnabled(false)
    .threadingMode(ArchiveThreadingMode.SHARED)
    .replicationChannel("aeron:udp?endpoint=" + hostname + ":0");

Again, there is nothing special in the Archive configuration specific to Cluster. We’ve used the same aeronDirName as used by the Media Driver. The controlChannel uses a node-specific port in the construction of its UDP channel.

4.2.3. Archive Client

final AeronArchive.Context aeronArchiveContext = new AeronArchive.Context()
    .lock(NoOpLock.INSTANCE)
    .controlRequestChannel(archiveContext.localControlChannel())
    .controlResponseChannel(archiveContext.localControlChannel())
    .aeronDirectoryName(aeronDirName);

Because Cluster’s Consensus Module requires that we write a log file to support the Raft protocol, we need a client for the constructed Archive for it to use. As the Consensus Module should always be deployed on the same node as its Archive, we’re going to use the local (IPC) configuration. It is also worth noting that we haven’t changed the controlRequestStreamId in our setup. The defaults in this situation are the most appropriate (from Aeron version 1.33.0 onwards).

4.2.4. Consensus Module

final ConsensusModule.Context consensusModuleContext = new ConsensusModule.Context()
    .errorHandler(errorHandler("Consensus Module"))
    .clusterMemberId(nodeId)                                                                     // (1)
    .clusterMembers(clusterMembers(Arrays.asList(hostnames)))                                    // (2)
    .clusterDir(new File(baseDir, "cluster"))                                                    // (3)
    .ingressChannel("aeron:udp?term-length=64k")                                                 // (4)
    .replicationChannel(logReplicationChannel(hostname))                                         // (5)
    .archiveContext(aeronArchiveContext.clone());                                                // (6)

This is the first of the two configuration sections that are specific to Cluster. The Consensus Module is responsible for handling the main aspects of the Raft protocol, e.g. leader election, ensuring consensus-based consistency of the data in the log and passing properly replicated messages on to the Clustered Service Container.

  1. Each Consensus Module needs an identifier within the Cluster, we are going to use the nodeId that we have used to separate each of the nodes.

  2. This is probably the trickiest part of the configuration. It specifies all the static members of the cluster along with all the endpoints that they require for the various operations of the Consensus Module. These are encoded into a single string of the form:

    0,ingress:port,consensus:port,log:port,catchup:port,archive:port|\
    1,ingress:port,consensus:port,log:port,catchup:port,archive:port|...

    Where each of the leading numeric values is a member id for the cluster (as specified in the clusterMemberId method). The values for each endpoint represent:

    • ingress, where the client will connect to for the ingress channel.

    • consensus, where other members of the cluster communicate with each other to achieve cluster consensus.

    • log, used to replicate logs from leader to followers.

    • catchup, used for a stream that members can use to catch up to a leader when behind the current log position.

    • archive, the same endpoint used to control the Archive running on this node.

      In our example we’ve used the same hostname for each of the endpoints (localhost), however each endpoint allows the specification of a host, so that traffic could potentially be separated if required, e.g. running consensus traffic on a separate network to the ingress traffic.

  3. Specify the data directory for the Consensus Module. Make sure it is node specific.

  4. Define the ingress channel for the cluster. Note that this value does not need to be the full channel URI. It can be a template that specifies the parameters but excludes the endpoint, which will be filled in using the value from the cluster members string as appropriate for this node.

  5. Specify the replication channel. This is the channel on which the local archive for a node will receive replication responses from other archives when the log or snapshot replication step is required. This was added in version 1.33.0 and is a required parameter. It is important in a production environment that this channel’s endpoint is not set to localhost, but instead to a hostname/IP address that is reachable by the other nodes in the cluster.

  6. Clone the Archive Client context that the Consensus Module will use to talk to the Archive.
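The cluster members string described in item 2 can be assembled from the hostnames and the port offsets defined earlier. The sketch below is a hypothetical helper, not the clusterMembers method from the sample source. In particular, the mapping of each endpoint to an offset constant (ingress to client facing, consensus to member facing, log to log, catchup to transfer, archive to archive control) is an assumption inferred from the constant names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Builds a cluster members string of the documented form (hypothetical helper). */
final class ClusterMembersSketch
{
    private static final int PORT_BASE = 9000;
    private static final int PORTS_PER_NODE = 100;
    private static final int ARCHIVE_CONTROL_PORT_OFFSET = 1;
    private static final int CLIENT_FACING_PORT_OFFSET = 2;
    private static final int MEMBER_FACING_PORT_OFFSET = 3;
    private static final int LOG_PORT_OFFSET = 4;
    private static final int TRANSFER_PORT_OFFSET = 5;

    static int calculatePort(final int nodeId, final int offset)
    {
        return PORT_BASE + (nodeId * PORTS_PER_NODE) + offset;
    }

    static String clusterMembers(final List<String> hostnames)
    {
        final List<String> members = new ArrayList<>();
        for (int i = 0; i < hostnames.size(); i++)
        {
            final String host = hostnames.get(i);
            members.add(String.join(",",
                Integer.toString(i),                                       // member id
                host + ":" + calculatePort(i, CLIENT_FACING_PORT_OFFSET),  // ingress (assumed mapping)
                host + ":" + calculatePort(i, MEMBER_FACING_PORT_OFFSET),  // consensus (assumed mapping)
                host + ":" + calculatePort(i, LOG_PORT_OFFSET),            // log
                host + ":" + calculatePort(i, TRANSFER_PORT_OFFSET),       // catchup (assumed mapping)
                host + ":" + calculatePort(i, ARCHIVE_CONTROL_PORT_OFFSET)));  // archive
        }

        return String.join("|", members);
    }

    public static void main(final String[] args)
    {
        System.out.println(clusterMembers(Arrays.asList("localhost", "localhost", "localhost")));
    }
}
```

Every node must be given the same string, since it describes the whole cluster and not only the local member.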

4.2.5. Clustered Service Container

The final part of the configuration is the component to run our application logic. It is possible to have multiple Clustered Service Containers per Consensus Module and have them talk to each other using IPC. In our simplified case we just have the one.

final ClusteredServiceContainer.Context clusteredServiceContext =
    new ClusteredServiceContainer.Context()
    .aeronDirectoryName(aeronDirName)                                                            // (1)
    .archiveContext(aeronArchiveContext.clone())                                                 // (2)
    .clusterDir(new File(baseDir, "cluster"))
    .clusteredService(new BasicAuctionClusteredService())                                        // (3)
    .errorHandler(errorHandler("Clustered Service"));
  1. Again we use the node specific Media Driver.

  2. Plus the node’s Archive, via the Archive Client configuration.

  3. This is the point where we bind an instance of our application logic to the cluster.

4.3. Running the Cluster

Now we have a configured cluster, we can start it running. The code for launching the service is as follows.

try (
    ClusteredMediaDriver clusteredMediaDriver = ClusteredMediaDriver.launch(
        mediaDriverContext, archiveContext, consensusModuleContext);                             // (1)
    ClusteredServiceContainer container = ClusteredServiceContainer.launch(
        clusteredServiceContext))                                                                // (2)
{
    System.out.println("[" + nodeId + "] Started Cluster Node on " + hostname + "...");
    barrier.await();                                                                             // (3)
    System.out.println("[" + nodeId + "] Exiting");
}
  1. Launches a ClusteredMediaDriver that includes instances of the Media Driver, Archive and Consensus Module.

  2. Immediately afterward we launch a ClusteredServiceContainer which is our application.

  3. Use the shutdown barrier to await a signal to shut down (e.g. SIGTERM or SIGINT on Unix).

There is a script provided to launch the cluster. Assuming you’ve followed the Using the Source From this Tutorial step above, you should be able to do the following:

> cd <AERON_HOME>/aeron-samples/scripts/cluster
> ./basic-auction-cluster

This will spit out a lot of logging but should end with something like:

[4452.593894535] CLUSTER: STATE_CHANGE [22/22]: memberId=0 INIT -> ACTIVE
[4452.594377676] CLUSTER: ELECTION_STATE_CHANGE [23/23]: memberId=0 INIT -> CANVASS
[4452.599524774] CLUSTER: CANVASS_POSITION [28/28]: logLeadershipTermId=-1 leadershipTermId=-1 logPosition=0 followerMemberId=1
[4452.60341655] CLUSTER: CANVASS_POSITION [28/28]: logLeadershipTermId=-1 leadershipTermId=-1 logPosition=0 followerMemberId=2
[4452.603442309] CLUSTER: ELECTION_STATE_CHANGE [27/27]: memberId=0 CANVASS -> NOMINATE
[4452.612230425] CLUSTER: ELECTION_STATE_CHANGE [36/36]: memberId=0 NOMINATE -> CANDIDATE_BALLOT
[4452.612255072] CLUSTER: ROLE_CHANGE [29/29]: memberId=0 FOLLOWER -> CANDIDATE
[4452.6157387] CLUSTER: ELECTION_STATE_CHANGE [50/50]: memberId=0 CANDIDATE_BALLOT -> LEADER_LOG_REPLICATION
[4452.627815713] CLUSTER: ELECTION_STATE_CHANGE [47/47]: memberId=0 LEADER_LOG_REPLICATION -> LEADER_REPLAY
[4452.627835581] CLUSTER: ELECTION_STATE_CHANGE [36/36]: memberId=0 LEADER_REPLAY -> LEADER_INIT
[4452.627844468] CLUSTER: ROLE_CHANGE [27/27]: memberId=0 CANDIDATE -> LEADER
[4452.653330292] CLUSTER: ELECTION_STATE_CHANGE [35/35]: memberId=0 LEADER_INIT -> LEADER_READY
[4452.759977752] CLUSTER: ELECTION_STATE_CHANGE [30/30]: memberId=0 LEADER_READY -> CLOSED

We now have our new service running in a cluster.

You can shut down the nodes using the following:

> pkill -f BasicAuctionClusteredServiceNode

You should also be able to see the stored data for each of the nodes in the working directory.

> ls
basic-auction-client  basic-auction-cluster  logs  node0  node1  node2  script-common

Let’s look briefly at how the script launches the code.

function startNode() {
    ${JAVA_HOME}/bin/java \
        -cp ../../../aeron-all/build/libs/aeron-all-${VERSION}.jar \
        -javaagent:../../../aeron-agent/build/libs/aeron-agent-${VERSION}.jar \
        -XX:+UseBiasedLocking \
        -XX:BiasedLockingStartupDelay=0 \
        -XX:+UnlockExperimentalVMOptions \
        -XX:+TrustFinalNonStaticFields \
        -XX:+UnlockDiagnosticVMOptions \
        -XX:GuaranteedSafepointInterval=300000 \
        -XX:+UseParallelGC \
        -Daeron.event.cluster.log=all \
        -Daeron.event.cluster.log.disable=CANVASS_POSITION,APPEND_POSITION,COMMIT_POSITION \
        -Daeron.cluster.tutorial.nodeId=${1} \
        ${JVM_OPTS} ${ADD_OPENS} \
        io.aeron.samples.cluster.tutorial.BasicAuctionClusteredServiceNode > logs/cluster-${1}.log &
}

function startAll() {
  startNode 0
  startNode 1
  startNode 2

  tail -n 100 -F logs/cluster-*.log
}

The -javaagent option allows Aeron’s logging agent to be woven into the running code. Specifying -Daeron.event.cluster.log=all tells the agent to log all the events relating to the cluster operation.

5. Using a Cluster Client

While we now have our cluster up and running, we can’t do anything with it until we have a client that will interact with the cluster.

5.1. Connecting to a Cluster

First off, we need a way to connect our client to the cluster. All the messaging between the client and the cluster happens via Aeron, but we have an additional layer called AeronCluster that handles connections to the independent nodes and provides methods for offering messages into the cluster and polling for responses.

public class BasicAuctionClusterClient implements EgressListener
try (
    MediaDriver mediaDriver = MediaDriver.launchEmbedded(new MediaDriver.Context()                      // (1)
        .threadingMode(ThreadingMode.SHARED)
        .dirDeleteOnStart(true)
        .dirDeleteOnShutdown(true));
    AeronCluster aeronCluster = AeronCluster.connect(
        new AeronCluster.Context()
        .egressListener(client)                                                                         // (2)
        .egressChannel("aeron:udp?endpoint=localhost:0")                                                // (3)
        .aeronDirectoryName(mediaDriver.aeronDirectoryName())
        .ingressChannel("aeron:udp")                                                                    // (4)
        .ingressEndpoints(ingressEndpoints)))                                                           // (5)
{
  1. Launch a media driver that will allow communication with the cluster nodes. We’re using an embedded instance here to make it easier to launch multiple drivers on the same machine for use in this tutorial, but it would be perfectly viable to launch a single media driver and have multiple clients on the same host sharing it. We’re also avoiding the media drivers that the cluster nodes themselves are using.

  2. This is where we bind our client application code to the responses from the cluster. This requires implementing the EgressListener interface, which provides callbacks for session messages, session related events (e.g. errors), and cluster events (e.g. newly elected leaders).

  3. Specify the channel to receive egress responses back from the cluster. In our case we are using a UDP unicast response, which will limit responses to the session that sent the ingress message. We are using port 0 to allow for multiple clients on the same machine with multiple media drivers and to avoid having to specify an actual port per cluster client.

    It is possible to specify a multicast address here if you wanted all clients to see all responses. Some systems may find this useful, e.g. an exchange with multiple gateways may wish to have the execution for a trade sent to a client that was on the passive side of a trade.

  4. Specify the ingress channel for the cluster. In our case as we are using multiple destinations we need to use the template style approach that was used when setting up the Consensus Module. We just identify that access to the cluster is via UDP.

  5. Identify the actual static endpoints for the cluster. These will be merged into the configuration specified for the ingress channel.

Note
This example is using UDP unicast for the ingress and egress channels. It is possible to use multicast, but you will need to be in an environment where multicast is supported. In cloud deployments UDP multicast is typically not supported or in very early stages, so unicast is currently recommended in those environments.
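
The value passed to ingressEndpoints is a comma-separated list of memberId=host:port pairs. As a minimal sketch of how such a string could be built, assuming the port scheme visible in the list-members output later in this tutorial (ingress on 9003, 9103 and 9203), something like the following would do. The class and constant names are illustrative and are not taken from the sample code.

```java
import java.util.List;

final class IngressEndpointsSketch
{
    // Assumed port layout, inferred from the sample output: 9003, 9103, 9203.
    static final int PORT_BASE = 9000;
    static final int PORTS_PER_NODE = 100;
    static final int INGRESS_PORT_OFFSET = 3;

    // Builds "0=host:port,1=host:port,..." where the list index is the member id.
    static String ingressEndpoints(final List<String> hostnames)
    {
        final StringBuilder sb = new StringBuilder();
        for (int memberId = 0; memberId < hostnames.size(); memberId++)
        {
            if (memberId > 0)
            {
                sb.append(',');
            }
            final int port = PORT_BASE + (memberId * PORTS_PER_NODE) + INGRESS_PORT_OFFSET;
            sb.append(memberId).append('=').append(hostnames.get(memberId)).append(':').append(port);
        }
        return sb.toString();
    }

    public static void main(final String[] args)
    {
        // prints 0=localhost:9003,1=localhost:9103,2=localhost:9203
        System.out.println(ingressEndpoints(List.of("localhost", "localhost", "localhost")));
    }
}
```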

5.2. Publishing to the Cluster

Next we are going to push messages into the cluster itself. So that the client is able to fail over to different nodes as nodes come and go in the cluster, the Aeron publications are contained in a class called AeronCluster. This is the second of the two items that are connected at start up. This class provides an API very similar to Publication.

private long sendBid(final AeronCluster aeronCluster, final long price)
{
    final long correlationId = nextCorrelationId++;
    actionBidBuffer.putLong(CORRELATION_ID_OFFSET, correlationId);            // (1)
    actionBidBuffer.putLong(CUSTOMER_ID_OFFSET, customerId);
    actionBidBuffer.putLong(PRICE_OFFSET, price);

    idleStrategy.reset();
    while (aeronCluster.offer(actionBidBuffer, 0, BID_MESSAGE_LENGTH) < 0)    // (2)
    {
        idleStrategy.idle(aeronCluster.pollEgress());                         // (3)
    }

    return correlationId;
}
  1. In much the same way that we would send messages to a Publication we put the data that we want to send into an Agrona DirectBuffer. The AeronCluster contains offer methods that behave in the same way as the Publication offer methods.

  2. Publish the data in the same way we would with a Publication. We must check the return value to ensure that the message has been sent because back-pressure could happen at any time.

  3. If we fail to send, and need to run an idle loop, we should utilise an IdleStrategy to ensure that we don’t inappropriately overuse the CPU. With cluster clients we should also be continually polling the egress stream to consume any messages that have been sent back from the cluster, including errors or session status messages.
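
The loop above retries until the offer succeeds. If the cluster is unreachable it would retry forever, so a caller may want an upper bound. The following is a minimal sketch of that idea using only the JDK. The offer and pollEgress parameters are hypothetical stand-ins for aeronCluster.offer(...) and aeronCluster.pollEgress(), and the park-based back-off is a simple substitute for an Agrona IdleStrategy.

```java
import java.util.concurrent.locks.LockSupport;
import java.util.function.IntSupplier;
import java.util.function.LongSupplier;

final class BoundedOfferSketch
{
    /**
     * Retries the offer until it returns a non-negative value or the timeout elapses.
     *
     * @return true if the message was accepted, false if the deadline passed first.
     */
    static boolean offerWithDeadline(
        final LongSupplier offer, final IntSupplier pollEgress, final long timeoutNs)
    {
        final long deadlineNs = System.nanoTime() + timeoutNs;
        while (offer.getAsLong() < 0)
        {
            if (System.nanoTime() - deadlineNs >= 0)
            {
                return false; // give up and let the caller decide what to do
            }

            // Keep consuming egress while waiting; back off only when nothing arrived.
            if (pollEgress.getAsInt() == 0)
            {
                LockSupport.parkNanos(1_000);
            }
        }

        return true;
    }
}
```

In sendBid this could be called with a lambda wrapping aeronCluster.offer(actionBidBuffer, 0, BID_MESSAGE_LENGTH) and a method reference to aeronCluster::pollEgress, returning an error to the caller when it yields false.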

5.3. Receiving Responses

As was mentioned previously, our client must have a class which implements the EgressListener interface to consume messages that come back from the cluster. The responses can be application messages, session events (e.g. error messages), or notifications of a new leader. In this tutorial there is not a lot to do, therefore we simply print out the responses.

public void onMessage(
    final long clusterSessionId,
    final long timestamp,
    final DirectBuffer buffer,
    final int offset,
    final int length,
    final Header header)
{
    final long correlationId = buffer.getLong(offset + CORRELATION_ID_OFFSET);
    final long customerId = buffer.getLong(offset + CUSTOMER_ID_OFFSET);
    final long currentPrice = buffer.getLong(offset + PRICE_OFFSET);
    final boolean bidSucceed = 0 != buffer.getByte(offset + BID_SUCCEEDED_OFFSET);

    lastBidSeen = currentPrice;

    printOutput(
        "SessionMessage(" + clusterSessionId + ", " + correlationId + "," +
        customerId + ", " + currentPrice + ", " + bidSucceed + ")");
}

/**
 * {@inheritDoc}
 */
public void onSessionEvent(
    final long correlationId,
    final long clusterSessionId,
    final long leadershipTermId,
    final int leaderMemberId,
    final EventCode code,
    final String detail)
{
    printOutput(
        "SessionEvent(" + correlationId + ", " + leadershipTermId + ", " +
        leaderMemberId + ", " + code + ", " + detail + ")");
}

/**
 * {@inheritDoc}
 */
public void onNewLeader(
    final long clusterSessionId,
    final long leadershipTermId,
    final int leaderMemberId,
    final String ingressEndpoints)
{
    printOutput("NewLeader(" + clusterSessionId + ", " + leadershipTermId + ", " + leaderMemberId + ")");
}
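
Note that onMessage adds offset to every field position, because the payload does not start at index zero of the buffer handed to the callback. The sketch below illustrates this with a plain java.nio.ByteBuffer in place of an Agrona buffer. The offset values are assumptions made for the example (three consecutive long fields followed by a one-byte flag) and may differ from the constants defined in the sample code.

```java
import java.nio.ByteBuffer;

final class EgressLayoutSketch
{
    // Assumed layout for illustration only.
    static final int CORRELATION_ID_OFFSET = 0;
    static final int CUSTOMER_ID_OFFSET = CORRELATION_ID_OFFSET + Long.BYTES;
    static final int PRICE_OFFSET = CUSTOMER_ID_OFFSET + Long.BYTES;
    static final int BID_SUCCEEDED_OFFSET = PRICE_OFFSET + Long.BYTES;
    static final int EGRESS_MESSAGE_LENGTH = BID_SUCCEEDED_OFFSET + Byte.BYTES;

    // Writes one response starting at the given offset within the buffer.
    static void encode(
        final ByteBuffer buffer,
        final int offset,
        final long correlationId,
        final long customerId,
        final long price,
        final boolean bidSucceeded)
    {
        buffer.putLong(offset + CORRELATION_ID_OFFSET, correlationId);
        buffer.putLong(offset + CUSTOMER_ID_OFFSET, customerId);
        buffer.putLong(offset + PRICE_OFFSET, price);
        buffer.put(offset + BID_SUCCEEDED_OFFSET, (byte)(bidSucceeded ? 1 : 0));
    }

    // Reads the response back, always relative to the offset of the payload.
    static String decode(final ByteBuffer buffer, final int offset)
    {
        final long correlationId = buffer.getLong(offset + CORRELATION_ID_OFFSET);
        final long customerId = buffer.getLong(offset + CUSTOMER_ID_OFFSET);
        final long price = buffer.getLong(offset + PRICE_OFFSET);
        final boolean bidSucceeded = 0 != buffer.get(offset + BID_SUCCEEDED_OFFSET);

        return "Bid(" + correlationId + ", " + customerId + ", " + price + ", " + bidSucceeded + ")";
    }
}
```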

5.4. Using the Cluster

Now that we have all the pieces in place, we can start to run the cluster and client together and see how the cluster behaves. You will need at least three terminal windows, as we will leave the one we used to start the cluster open to show logging information for the cluster.

Firstly, start up the cluster.

> cd <AERON_HOME>/aeron-samples/scripts/cluster
> ./basic-auction-cluster

Wait until you see a message that indicates one of the nodes is now leader.

[57240.190127954] CLUSTER: ELECTION_STATE_CHANGE [29/29]: memberId=1, LEADER_READY -> CLOSE

Now, in a second terminal window, run a client that will place some bids into our cluster-hosted auction. The 10 parameter is the customer id to use for the client; change this number if you want to run multiple clients.

> cd <AERON_HOME>/aeron-samples/scripts/cluster
> ./basic-auction-client 10

We should see some output similar to the following:

Sent(8474093274723075706, 10, 105) bidsRemaining=9
SessionMessage(1, 8474093274723075706, 10, 105, true)
Sent(8474093274723075707, 10, 109) bidsRemaining=8
SessionMessage(1, 8474093274723075707, 10, 109, true)
Sent(8474093274723075708, 10, 110) bidsRemaining=7
SessionMessage(1, 8474093274723075708, 10, 110, true)
Sent(8474093274723075709, 10, 119) bidsRemaining=6
SessionMessage(1, 8474093274723075709, 10, 119, true)
Sent(8474093274723075710, 10, 122) bidsRemaining=5
SessionMessage(1, 8474093274723075710, 10, 122, true)
Sent(8474093274723075711, 10, 127) bidsRemaining=4
SessionMessage(1, 8474093274723075711, 10, 127, true)
Sent(8474093274723075712, 10, 129) bidsRemaining=3
SessionMessage(1, 8474093274723075712, 10, 129, true)
Sent(8474093274723075713, 10, 135) bidsRemaining=2
SessionMessage(1, 8474093274723075713, 10, 135, true)
Sent(8474093274723075714, 10, 136) bidsRemaining=1
SessionMessage(1, 8474093274723075714, 10, 136, true)
Sent(8474093274723075715, 10, 142) bidsRemaining=0
SessionMessage(1, 8474093274723075715, 10, 142, true)

5.5. Failed Nodes and Leader Election

Now that the cluster is up and running, and we are able to publish messages and subscribe to responses and events, we can look at some additional tools available for Cluster and experiment with the fault tolerance functionality.

The first thing we are interested in is which of the three nodes is the leader. With this information we can try shutting down the leader to see if our client is able to continue.

> java -cp <AERON_HOME>/aeron-all/build/libs/aeron-all-1.45.0.jar \
  io.aeron.cluster.ClusterTool node0/cluster list-members

This will show a list of all the members in the cluster, similar to the following:

currentTimeNs=1579564309666000000, leaderMemberId=0, memberId=0, activeMembers=[ClusterMember{isBallotSent=false,
isLeader=false, hasRequestedJoin=false, id=0, leadershipTermId=5, logPosition=11872, candidateTermId=-1,
catchupReplaySessionId=-1, correlationId=-1, removalPosition=-1, timeOfLastAppendPositionNs=1579564309666000000,
ingressEndpoint='localhost:9003', consensusEndpoint='localhost:9004', logEndpoint='localhost:9005',
catchupEndpoint='localhost:9006', archiveEndpoint='localhost:9001',
endpoints='localhost:9003,localhost:9004,localhost:9005,localhost:9006,localhost:9001', publication=null,
vote=null}, ClusterMember{isBallotSent=false, isLeader=false, hasRequestedJoin=false, id=1, leadershipTermId=5,
logPosition=11872, candidateTermId=-1, catchupReplaySessionId=-1, correlationId=-1, removalPosition=-1,
timeOfLastAppendPositionNs=1579564309495000000, ingressEndpoint='localhost:9103',
consensusEndpoint='localhost:9104', logEndpoint='localhost:9105', catchupEndpoint='localhost:9106',
archiveEndpoint='localhost:9101',
endpoints='localhost:9103,localhost:9104,localhost:9105,localhost:9106,localhost:9101', publication=null,
vote=null}, ClusterMember{isBallotSent=false, isLeader=false, hasRequestedJoin=false, id=2, leadershipTermId=5,
logPosition=11872, candidateTermId=-1, catchupReplaySessionId=-1, correlationId=-1, removalPosition=-1,
timeOfLastAppendPositionNs=1579564309552000000, ingressEndpoint='localhost:9203',
consensusEndpoint='localhost:9204', logEndpoint='localhost:9205', catchupEndpoint='localhost:9206',
archiveEndpoint='localhost:9201',
endpoints='localhost:9203,localhost:9204,localhost:9205,localhost:9206,localhost:9201', publication=null,
vote=null}], passiveMembers=[]

The part we are interested in is the leaderMemberId, which allows us to identify the leader in order to shut it down. We can discover the pid for that member using the following command:

> java -cp <AERON_HOME>/aeron-all/build/libs/aeron-all-1.45.0.jar \
  io.aeron.cluster.ClusterTool node<leaderMemberId>/cluster pid
15060

Note that the first argument to ClusterTool is the directory where the Consensus Module stores its data, and we have substituted the leaderMemberId to locate the correct directory for the leader.

This will have printed out a simple number which is the pid of the leader process.
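
If you want to script this step rather than read the output by eye, the leader id can be pulled out of the list-members text. The following is a minimal sketch that assumes the output contains a leaderMemberId=<number> field, as in the sample above. The class and method names are hypothetical.

```java
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class LeaderIdParserSketch
{
    private static final Pattern LEADER_ID = Pattern.compile("leaderMemberId=(-?\\d+)");

    // Returns the first leaderMemberId found in the text, or empty if there is none.
    static OptionalInt leaderMemberId(final String listMembersOutput)
    {
        final Matcher matcher = LEADER_ID.matcher(listMembersOutput);
        return matcher.find() ?
            OptionalInt.of(Integer.parseInt(matcher.group(1))) : OptionalInt.empty();
    }

    // Directory passed to ClusterTool for a given member, following the node<id>/cluster convention.
    static String clusterDirectory(final int memberId)
    {
        return "node" + memberId + "/cluster";
    }
}
```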

Next start the client again, but this time we will let it run a bit longer and attempt to trigger a leader election within the cluster while it is running.

> cd <AERON_HOME>/aeron-samples/scripts/cluster
> ./basic-auction-client 10 1000 3000

Once the client is running, switch to a different terminal and kill the leader process using the pid from the earlier step.

> kill <pid>

The system will stall for a short while until the followers time out after not receiving a heartbeat from the leader, at which point they hold an election for a new one. The logs for the cluster will contain something similar to:

==> logs/cluster-0.log <==
[0] Exiting
Consensus Module
io.aeron.cluster.client.ClusterException: heartbeat timeout from leader
        at io.aeron.cluster.ConsensusModuleAgent.slowTickWork(ConsensusModuleAgent.java:1884)
        at io.aeron.cluster.ConsensusModuleAgent.doWork(ConsensusModuleAgent.java:288)
        at org.agrona.concurrent.AgentRunner.doDutyCycle(AgentRunner.java:283)
        at org.agrona.concurrent.AgentRunner.run(AgentRunner.java:164)
        at java.base/java.lang.Thread.run(Thread.java:834)
Consensus Module
io.aeron.cluster.client.ClusterException: heartbeat timeout from leader
        at io.aeron.cluster.ConsensusModuleAgent.slowTickWork(ConsensusModuleAgent.java:1884)
        at io.aeron.cluster.ConsensusModuleAgent.doWork(ConsensusModuleAgent.java:288)
        at org.agrona.concurrent.AgentRunner.doDutyCycle(AgentRunner.java:283)
        at org.agrona.concurrent.AgentRunner.run(AgentRunner.java:164)
        at java.base/java.lang.Thread.run(Thread.java:834)
...
[65188.006731058] CLUSTER: ELECTION_STATE_CHANGE [42/42]: memberId=1, FOLLOWER_BALLOT -> FOLLOWER_REPLAY
[65188.006791979] CLUSTER: ELECTION_STATE_CHANGE [54/54]: memberId=1, FOLLOWER_REPLAY -> FOLLOWER_CATCHUP_TRANSITION
[65188.009951316] CLUSTER: ELECTION_STATE_CHANGE [55/55]: memberId=1, FOLLOWER_CATCHUP_TRANSITION -> FOLLOWER_CATCHUP
[65188.028244817] CLUSTER: ELECTION_STATE_CHANGE [47/47]: memberId=1, FOLLOWER_CATCHUP -> FOLLOWER_TRANSITION
[65188.028355173] CLUSTER: ELECTION_STATE_CHANGE [45/45]: memberId=1, FOLLOWER_TRANSITION -> FOLLOWER_READY
[65188.02995347] CLUSTER: ELECTION_STATE_CHANGE [31/31]: memberId=1, FOLLOWER_READY -> CLOSE

==> logs/cluster-2.log <==
[65188.033401987] CLUSTER: ELECTION_STATE_CHANGE [29/29]: memberId=2, LEADER_READY -> CLOSE

This shows that a new leader has been elected to lead the cluster. With only two of the three nodes running, the cluster is exactly at its quorum of two, so it will not be able to tolerate another failure and continue.
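
The arithmetic behind this follows from the quorum rule of (n / 2) + 1 given in the introduction. A small sketch, with illustrative names, makes it concrete:

```java
final class QuorumSketch
{
    // Number of nodes that must be available for a cluster of the given size.
    static int quorum(final int clusterSize)
    {
        return (clusterSize / 2) + 1;
    }

    // True if the live nodes are enough to elect a leader and commit messages.
    static boolean canMakeProgress(final int clusterSize, final int liveNodes)
    {
        return liveNodes >= quorum(clusterSize);
    }

    public static void main(final String[] args)
    {
        System.out.println(canMakeProgress(3, 2)); // true: one node lost, quorum of 2 still met
        System.out.println(canMakeProgress(3, 1)); // false: a second failure loses quorum
    }
}
```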

The client should automatically resume sending messages into the cluster and receiving responses. There may, or may not, be an exception logged by the client depending on timing.

Sent(-7819190107498904370, 10, 12969) bidsRemaining=980
SessionMessage(20, -7819190107498904370, 10, 12969, true)
Sent(-7819190107498904369, 10, 12969) bidsRemaining=979
Sent(-7819190107498904368, 10, 12970) bidsRemaining=978
Sent(-7819190107498904367, 10, 12978) bidsRemaining=977
Sent(-7819190107498904366, 10, 12975) bidsRemaining=976
Sent(-7819190107498904365, 10, 12971) bidsRemaining=975
Consensus Module
io.aeron.cluster.client.ClusterException: heartbeat timeout from leader
        at io.aeron.cluster.ConsensusModuleAgent.slowTickWork(ConsensusModuleAgent.java:1884)
        at io.aeron.cluster.ConsensusModuleAgent.doWork(ConsensusModuleAgent.java:288)
        at org.agrona.concurrent.AgentRunner.doDutyCycle(AgentRunner.java:283)
        at org.agrona.concurrent.AgentRunner.run(AgentRunner.java:164)
        at java.lang.Thread.run(Thread.java:748)
NewLeader(20, 7, 1)
Sent(-7819190107498904364, 10, 12977) bidsRemaining=974
Sent(-7819190107498904363, 10, 12974) bidsRemaining=973
SessionMessage(20, -7819190107498904364, 10, 12977, true)
SessionMessage(20, -7819190107498904363, 10, 12977, false)
Important
If you look carefully at the correlationId values you will notice there are no SessionMessage log entries for a number of the messages that were sent. This is because when the leader fails some messages sent into the cluster can be lost. For this reason it is important for the client to track responses for its messages and provide timeouts to the caller if required.
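
One way to do this tracking is to remember a deadline for each correlationId when it is sent, clear it when the matching response arrives, and periodically collect the ids that have expired. The following is a minimal, single-threaded sketch of that idea. It is not part of the sample client, and all of the names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PendingBidsSketch
{
    private final Map<Long, Long> deadlineNsByCorrelationId = new LinkedHashMap<>();
    private final long timeoutNs;

    PendingBidsSketch(final long timeoutNs)
    {
        this.timeoutNs = timeoutNs;
    }

    // Call after a successful offer.
    void onSent(final long correlationId, final long nowNs)
    {
        deadlineNsByCorrelationId.put(correlationId, nowNs + timeoutNs);
    }

    // Call from onMessage; returns false if the id was unknown or had already expired.
    boolean onResponse(final long correlationId)
    {
        return deadlineNsByCorrelationId.remove(correlationId) != null;
    }

    // Call from the polling loop; removes and returns ids whose deadline has passed, in send order.
    List<Long> expire(final long nowNs)
    {
        final List<Long> expired = new ArrayList<>();
        final Iterator<Map.Entry<Long, Long>> iterator = deadlineNsByCorrelationId.entrySet().iterator();
        while (iterator.hasNext())
        {
            final Map.Entry<Long, Long> entry = iterator.next();
            if (nowNs - entry.getValue() >= 0)
            {
                expired.add(entry.getKey());
                iterator.remove();
            }
        }

        return expired;
    }
}
```

What to do with an expired bid (report a timeout to the caller, or resend it) is an application decision. A resend is only safe if the service can cope with seeing the same bid twice.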

We can restore the failed node back into service:

> cd <AERON_HOME>/aeron-samples/scripts/cluster
> ./basic-auction-cluster <leaderMemberId>

Here leaderMemberId is the id of the member that was killed (not the new leader of the cluster). The cluster logs will show the member rejoining the cluster:

[0] Started Cluster Node on localhost...
[66250.672741109] CLUSTER: STATE_CHANGE [22/22]: memberId=0, INIT -> ACTIVE
[66250.687237677] CLUSTER: ELECTION_STATE_CHANGE [23/23]: memberId=0, INIT -> CANVASS
[66250.689468615] CLUSTER: NEW_LEADERSHIP_TERM [40/40]: logLeadershipTermId=6, leadershipTermId=6, logPosition=19424, timestamp=1579566538529, leaderMemberId=2, logSessionId=-1889384498
[66250.689493099] CLUSTER: ELECTION_STATE_CHANGE [34/34]: memberId=0, CANVASS -> FOLLOWER_REPLAY
onSessionOpen(ClientSession{id=1, responseStreamId=102, responseChannel='aeron:udp?endpoint=localhost:19132', encodedPrincipal=[], clusteredServiceAgent=io.aeron.cluster.service.ClusteredServiceAgent@19a05123, responsePublication=null, isClosing=false})
attemptBid(this=Auction{bestPrice=0, currentWinningCustomerId=-1}, price=123,customerId=132)
onSessionClose(ClientSession{id=1, responseStreamId=102, responseChannel='aeron:udp?endpoint=localhost:19132', encodedPrincipal=[], clusteredServiceAgent=io.aeron.cluster.service.ClusteredServiceAgent@19a05123, responsePublication=null, isClosing=false})
onSessionOpen(ClientSession{id=2, responseStreamId=102, responseChannel='aeron:udp?endpoint=localhost:19132', encodedPrincipal=[], clusteredServiceAgent=io.aeron.cluster.service.ClusteredServiceAgent@19a05123, responsePublication=null, isClosing=false})

You can see that the new node looks like it is re-executing application messages. That is because it is. It is replaying the log of the messages that have been sent into the cluster up to this point in order to restore its in-memory state to what it was before the failure.
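
Replay only restores the right state because the application logic is deterministic. The sketch below shows the principle with a deliberately simplified auction, which is an assumption made for the example and not the tutorial's Auction class. Applying the same ordered log of bids to a fresh instance always yields the same result.

```java
final class ReplaySketch
{
    // Simplified stand-in for the service state; a bid wins only if it beats the best price.
    static final class Auction
    {
        long bestPrice = 0;
        long currentWinningCustomerId = -1;

        boolean attemptBid(final long price, final long customerId)
        {
            if (price <= bestPrice)
            {
                return false;
            }

            bestPrice = price;
            currentWinningCustomerId = customerId;
            return true;
        }
    }

    // Rebuilds state from scratch by applying each {price, customerId} entry in log order.
    static Auction replay(final long[][] log)
    {
        final Auction auction = new Auction();
        for (final long[] entry : log)
        {
            auction.attemptBid(entry[0], entry[1]);
        }

        return auction;
    }
}
```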

5.6. Snapshots

Earlier in the tutorial we looked at how a node can store a snapshot of its current state so that it would not need to replay every single application message since the beginning of time. We can look at the recording log for a node to see the logs, and snapshots, that are stored for a node in the cluster. We can also use the tool to trigger a snapshot.

Firstly, let’s look at the recording log.

> java -cp <AERON_HOME>/aeron-all/build/libs/aeron-all-1.45.0.jar \
  io.aeron.cluster.ClusterTool node0/cluster recording-log

This should show a recording log with a single entry:

RecordingLog{entries=[Entry{recordingId=0, leadershipTermId=0, termBaseLogPosition=0, logPosition=-1, timestamp=1579569542227, serviceId=-1, type=0, isValid=true, entryIndex=0}], cacheIndex={0=0}}

We can trigger a snapshot on the leader.

> java -cp <AERON_HOME>/aeron-all/build/libs/aeron-all-1.45.0.jar \
  io.aeron.cluster.ClusterTool node2/cluster snapshot

The recording log will now have additional entries.

RecordingLog{entries=[Entry{recordingId=0, leadershipTermId=0, termBaseLogPosition=0, logPosition=-1, timestamp=1579569542227, serviceId=-1, type=0, isValid=true, entryIndex=0}, Entry{recordingId=1, leadershipTermId=0, termBaseLogPosition=0, logPosition=1376, timestamp=1579570195606, serviceId=0, type=1, isValid=true, entryIndex=1}, Entry{recordingId=2, leadershipTermId=0, termBaseLogPosition=0, logPosition=1376, timestamp=1579570195606, serviceId=-1, type=1, isValid=true, entryIndex=2}], cacheIndex={0=0}}

There are two more entries. One for the snapshot just taken and a new term entry to indicate the point within the log the service should start recovering from, were it to use the new snapshot on restart.
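
To illustrate why a snapshot shortens recovery, the sketch below picks the log position from which replay would need to start, given a list of recording log entries. It is a simplification for illustration and not how Cluster itself reads the recording log. It assumes, from the sample output above, that type=1 marks the entries added by the snapshot and that their logPosition is the point the snapshot covers.

```java
import java.util.List;

final class RecoveryStartSketch
{
    static final int ENTRY_TYPE_SNAPSHOT = 1; // assumed from the sample output

    // Minimal stand-in for a recording log entry; only the fields used here.
    static final class Entry
    {
        final int type;
        final long logPosition;

        Entry(final int type, final long logPosition)
        {
            this.type = type;
            this.logPosition = logPosition;
        }
    }

    // Replay starts at the latest snapshot position, or at 0 when there is no snapshot.
    static long replayStartPosition(final List<Entry> entries)
    {
        long position = 0;
        for (final Entry entry : entries)
        {
            if (entry.type == ENTRY_TYPE_SNAPSHOT && entry.logPosition > position)
            {
                position = entry.logPosition;
            }
        }

        return position;
    }
}
```

With the single-entry log shown first this returns 0, meaning a full replay. With the three-entry log it returns 1376, so only messages after that position would need to be replayed.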

6. Exercises

Here are some exercises that you can try out.

  1. Have multiple customers competing on the same auction.

  2. Use multicast for a generic egress channel (common to all clients) so that customers can see others’ prices to make informed bids.

  3. Implement a second message type that will allow customers to query for the current bid price of the auction.


1. Where stopping could include crashing or manually shutting down