This is an introduction to the UAM project, a management server for Universal Appliances (UAs: the new TPv4, eDee and SafeQube), and to the Vert.x framework on which it is built.

We needed an easy way to install and upgrade the OS and the applications running on a UA, so we needed a system that manages UAs with as little user interaction as possible.
The main requirements are robustness (we will deal with unreliable networks and possible power outages), small executables (storage on the SafeQube is limited) and little disk access (we don’t want to wear out our SafeQube flash disks).

The system has two parts:

  • UA management server – server application which will be deployed next to CML and will contain APIs for system administration
  • UA management server proxy – application cluster which will be deployed on SafeQubes and will cache application binaries and configuration

Both applications are developed using the Vert.x framework. The UAM server also uses a database for configuration storage, and the proxy applications use Hazelcast for clustering.

Vert.x is an application framework for programming servers (especially web servers). It is built on top of Netty so no application container like Tomcat or Jetty is necessary.

I will show several examples of working with Vert.x so you can try out writing your own applications.

Starting a web server is simple:

vertx.createHttpServer()
    .requestHandler(request -> {
        System.out.println("Received request from " + request.remoteAddress().host());
        // End the response, otherwise the client keeps waiting for an answer
        request.response().end();
    })
    .listen(80);

This approach is quick to set up, but you have to decide yourself which business logic to call for which endpoint, and that can get quite messy.
Fortunately, we can do better using the Vert.x web module:

Router router = Router.router(vertx);
router.route("/hello").handler(context -> {
    String name = context.request().getParam("name");
    context.response().end("Hello " + name);
});

router.get("/ping").handler(context -> {
    context.response().end("Pong!");
});

vertx.createHttpServer()
     .requestHandler(router::accept)
     .listen(80);

This way, we can easily separate business logic for our endpoints.

Do you need request logging or security? You can add more routes for the same endpoint. Routes are processed in the order in which they were added to the router, and each one can either call the next route in the chain or end the response.

Router router = Router.router(vertx);
router.route().handler(CookieHandler.create());
router.get("/secured-resource").handler(context -> {
    // getCookie returns a Cookie object (or null), so compare its value
    Cookie token = context.getCookie("securityToken");
    if (token != null && "password".equals(token.getValue())) {
        context.next();
    } else {
        context.response().setStatusCode(401).end("sorry, you are not authorized to see the secured stuff");
    }
});

router.get("/secured-resource").handler(context -> {
    context.response().end("top secret stuff");
});

Do you want more than just APIs? Template engines are available for Handlebars, MVEL, Thymeleaf and Jade.

router.get().handler(TemplateHandler.create(JadeTemplateEngine.create()));

Things get more interesting when you configure Vert.x instances to form a cluster. Currently, the only production-ready cluster manager is built on top of Hazelcast, so we will use it.

VertxOptions options = new VertxOptions();
options.setClustered(true);
options.setClusterHost("192.168.1.2");

Vertx.clusteredVertx(options, result -> {
    if (result.succeeded()) {
        Vertx vertx = result.result();
        //initialize what we need
    } else {
        LOGGER.error("Failed to start vertx cluster", result.cause());
    }
});

As you can see, starting a Vert.x cluster isn’t that complicated. Just be careful to set the correct cluster host address, or you may find that the cluster doesn’t behave the way you expect.
And what functionality is available for clustered applications? There is a clustered event bus, which supports point-to-point and publish-subscribe messaging. Just register your consumers and send messages to their logical addresses without ever caring which node a consumer resides on.

vertx.eventBus().consumer("/echo", message -> {
    message.reply(message.body());
});

vertx.eventBus().send("/echo", "ping");

I hope this short introduction to Vert.x will make you curious to look at its documentation and start writing your own applications on top of it.

Opting for a distributed solution to a problem is mostly motivated by two goals: resilience to failures (via redundancy) and better response time. In this post we will focus on the former. More specifically, we will investigate the conditions necessary to maintain a consistent distributed key-value store.

CAP

From the theoretical side, any possible solution is limited by the so-called CAP theorem. Informally, the theorem states that in an asynchronous network no database can remain

  • Consistent and
  • Available when
  • Partitioning of the network is possible.

A realistic environment of a distributed system must assume that server failures are possible and thus that there is no reasonable fixed bound on the time needed to exchange messages between two servers. And while some may find the formulation of the theorem ambiguous, its inevitable corollary is that a distributed database cannot guarantee consistency of replicas and availability of read/write operations at the same time.

Consensus

Leslie Lamport’s Paxos protocol demonstrates the feasibility of maintaining consistency whenever a majority of servers is functioning. The above theorem thus implies that a distributed system using the protocol cannot always be available. But there are some highly available solutions apart from Paxos.

Etcd

Etcd is an always-consistent, highly available key-value data store. Consistency is maintained by a majority consensus (quorum). The quorum is established and maintained by the Raft algorithm, i.e. Raft maintains a consistent replicated log among the servers, each of which executes the commands stored in the log on the key-value store. Like any other consensus protocol, Raft requires that among n servers at most f=⌊(n-1)⁄2⌋ are faulty. If more than f servers become faulty, Raft times out all write operations, but read operations may still work if consistency is not required.
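The fault bound above is plain integer arithmetic, and a minimal sketch makes it easy to check for a given cluster size. The class and method names are illustrative assumptions and are not part of etcd or Raft.

```java
// Sketch of the majority arithmetic behind quorum-based consensus.
// QuorumMath and its methods are hypothetical names used for illustration.
final class QuorumMath {
    /** Smallest number of servers that forms a majority of n. */
    static int majority(int n) {
        return n / 2 + 1;
    }

    /** Largest number of faulty servers that n servers tolerate: floor((n - 1) / 2). */
    static int maxFaulty(int n) {
        return (n - 1) / 2;
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 7; n++) {
            System.out.printf("n=%d majority=%d tolerated faults=%d%n",
                    n, majority(n), maxFaulty(n));
        }
    }
}
```

Note that an even cluster size tolerates no more faults than the next smaller odd size (four servers tolerate one fault, just like three), which is one reason clusters are usually sized with an odd number of nodes.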

Maintaining consistency is expensive even if only the possibility of partitioning has to be taken into account. Rather than describing the algorithm itself, it may be more illustrative to show the performance of etcd running in a real application. The figure below depicts the transaction throughput in a 5-node cluster.

Etcd throughput in a 5-node cluster.

The script that measured the performance increased the load in three steps: first 2500 transactions were sent every second (for 30 seconds), then 5000 per second for another 30 seconds, and then 10000 per second. The data in the plot report how many transactions finished successfully each second.

Leader Election

Partitions of the cluster are handled by coordinated replication, i.e. a leader is elected who is responsible for all communication (both within the cluster and with the outside world). If a partition separates the leader from most of the nodes, a new leader is elected. The leader election starts whenever a follower loses communication with the leader for longer than a randomly chosen period called the election timeout. After the timeout, the follower begins the election by broadcasting a request for votes. Every server votes for the server whose request it received first, and the new leader is the server that collects a majority of votes. Together with the transaction throughput, the performance of leader election is among the most important metrics to measure for a consensus protocol.
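The election rule described above can be sketched in a few lines. This is a simplified model and not etcd's implementation: the names are hypothetical, network delays are ignored, and an exact tie between the smallest timeouts is treated as a possible split vote.

```java
import java.util.concurrent.ThreadLocalRandom;

// Simplified model of randomized election timeouts (all names are assumptions).
final class ElectionSketch {
    /** Picks an election timeout uniformly from the inclusive range [minMs, maxMs]. */
    static long randomTimeoutMs(long minMs, long maxMs) {
        return ThreadLocalRandom.current().nextLong(minMs, maxMs + 1);
    }

    /**
     * Returns the index of the follower whose timeout fires first. Returns -1
     * when several followers share the smallest timeout, which this sketch
     * treats as a possible split vote (or when the array is empty).
     */
    static int firstCandidate(long[] timeoutsMs) {
        int best = -1;
        boolean tie = false;
        for (int i = 0; i < timeoutsMs.length; i++) {
            if (best == -1 || timeoutsMs[i] < timeoutsMs[best]) {
                best = i;
                tie = false;
            } else if (timeoutsMs[i] == timeoutsMs[best]) {
                tie = true;
            }
        }
        return tie ? -1 : best;
    }

    /** A candidate wins once it holds votes from a majority of the cluster. */
    static boolean wins(int votes, int clusterSize) {
        return votes >= clusterSize / 2 + 1;
    }
}
```

A narrow timeout range makes ties, and therefore repeated elections, more likely, while a wide range delays the detection of a crashed leader on average.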

Etcd leader election time.

The graphs above show the time to detect and replace a crashed leader. Each line represents 1,000 trials (except for 100 trials for “150–150 ms”) and corresponds to a particular choice of election timeouts; for example, “150–155 ms” means that election timeouts were chosen randomly and uniformly between 150 ms and 155 ms. The steps that appear on the graphs show when split votes occur (the cluster must wait for another election timeout before a leader can be elected).

Etcd leader election -- number of packets.

Finally, the figure above shows the distribution of the number of packets sent to elect a leader in the 150–200 ms case. The minimum number of packets is six: a single node dispatches requests to the other four nodes and receives votes from the first two. The plot clusters around multiples of eight: traces under eight represent the first node timing out and winning the election, while traces between 8 and 16 represent two nodes timing out before a leader is elected.
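The packet counts quoted for the 5-node cluster follow from simple counting, sketched here under the assumption that a candidate votes for itself and that each peer receives one request and sends at most one reply. The names are illustrative only.

```java
// Illustrative packet arithmetic for a single election round (names are assumptions).
final class ElectionPackets {
    /** Vote requests to the n - 1 peers plus the votes still missing for a majority. */
    static int minPackets(int n) {
        int majority = n / 2 + 1;
        int requests = n - 1;
        int votesNeeded = majority - 1; // assumption: the candidate votes for itself
        return requests + votesNeeded;
    }

    /** One request and one reply per peer when a single candidate runs. */
    static int maxPacketsSingleCandidate(int n) {
        return 2 * (n - 1);
    }
}
```

For n = 5 this gives four requests plus two votes, i.e. six packets at minimum, and eight packets when every peer replies.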

Problem Definition

Let us assume that our servers need to share certain configuration. The servers can be restarted, thus becoming temporarily unavailable, and the network can have arbitrarily long delays. The stored data should be available whenever at least one server is alive. Some data must be consistent, but for other data a possibly stale version suffices.

Case 1: Consistency of Logs

Some servers store log data as a single continuous string. The logging information must be extracted from this string, starting at the last extraction point and continuing up to the last log. Each log must be accepted exactly once. We can thus generalise the problem to implementing the following function:


string getLastLog( Server s, string rawLogs )

  • the string rawLogs is a sequence of log information, i.e. <(p_1, t_1, ..), .. , (p_n, t_n, ..)>
  • the function returns a suffix lastLogs of rawLogs such that lastLogs = <(p_i, t_i, ..), .. , (p_n, t_n, ..)>, where t_{i-1} < t_s and t_i >= t_s, and t_s is the time-stamp of the last retrieval
  • the function must be thread-safe: each log is accepted at most once, regardless of the number of concurrent calls of the function
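The requirements above can be sketched for a single process as follows. This is only an illustration under several assumptions: the logs arrive already parsed (the raw string format is not specified here), time-stamps are integers, and the retrieval point lives in local memory. In the distributed setting the retrieval point would have to be kept in the shared data store and advanced atomically. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Single-process sketch of getLastLog (names and types are assumptions).
final class LogExtractor {
    /** One parsed log record; only the time-stamp matters for this sketch. */
    static final class Entry {
        final String payload;
        final long timestamp;

        Entry(String payload, long timestamp) {
            this.payload = payload;
            this.timestamp = timestamp;
        }
    }

    // t_s: entries with a time-stamp below this value were already handed out.
    private long lastRetrieval = Long.MIN_VALUE;

    /**
     * Returns all entries with timestamp >= t_s and then moves t_s just past
     * the newest returned entry. The method is synchronized, so concurrent
     * callers never receive the same entry twice.
     */
    synchronized List<Entry> getLastLogs(List<Entry> rawLogs) {
        List<Entry> result = new ArrayList<>();
        long next = lastRetrieval;
        for (Entry e : rawLogs) {
            if (e.timestamp >= lastRetrieval) {
                result.add(e);
                next = Math.max(next, e.timestamp + 1);
            }
        }
        lastRetrieval = next;
        return result;
    }
}
```

Moving t_s to one past the newest time-stamp assumes that no later entry carries a time-stamp equal to one already returned. If that can happen, the extraction point needs a finer key, such as a position within the string.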

Case 2: Quorum-less access

It is required that the configuration data that need not be consistent can be retrieved and stored even when the quorum cannot be established.

  • The get operation returns the value that was last written either by another node (while under quorum) or by this node (possibly without quorum).
  • The set operation remembers the new value and stores it when the quorum is reestablished.
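One way to picture these two operations is a node-local cache that keeps writes made without quorum in a pending buffer and replays them once the quorum is back. The sketch below is an assumption about how such a layer could look, not an etcd feature. The class, its methods and the quorumWriter callback are hypothetical, and conflicts between a pending local write and a newer remote write are not resolved here.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Node-local sketch of quorum-less get/set (all names are assumptions).
final class QuorumlessCache {
    // Values last seen as written under quorum.
    private final Map<String, String> committed = new HashMap<>();
    // Local writes made without quorum, kept in insertion order.
    private final Map<String, String> pending = new LinkedHashMap<>();

    /** Returns this node's pending value if there is one, else the last committed value. */
    synchronized String get(String key) {
        return pending.containsKey(key) ? pending.get(key) : committed.get(key);
    }

    /** Remembers the value locally until the quorum is available again. */
    synchronized void set(String key, String value) {
        pending.put(key, value);
    }

    /** Records a value that was written by another node under quorum. */
    synchronized void onReplicated(String key, String value) {
        committed.put(key, value);
    }

    /** Replays pending writes through quorumWriter and returns how many were flushed. */
    synchronized int flush(BiConsumer<String, String> quorumWriter) {
        int flushed = pending.size();
        pending.forEach((key, value) -> {
            quorumWriter.accept(key, value);
            committed.put(key, value);
        });
        pending.clear();
        return flushed;
    }
}
```

Here flush would be called by whatever component detects that the quorum has been reestablished, with quorumWriter standing in for a consistent set on the underlying store.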

Candidate: Etcd

Etcd has limitations in both of the above cases:

  1. The CAS operation is implemented as a set operation that specifies the expected previous value and returns either a Boolean value, an exception, or an error. Yet when an exception or error is returned, we have no guarantees regarding the value in the data store, e.g. the request may have timed out but afterwards been accepted by Raft and subsequently reflected in the data store.
  2. Etcd does allow get to operate without quorum but only if the leader is working (any or indeed all other servers may fail). The set operation is not allowed without quorum.

Requirements for Sufficient Solution

Three interface functions are required, and while the concrete implementation can vary, we specify them exactly to avoid confusion.

type Response = { bool valid, bool timeout, string value }

Response get( string key, bool consistent, bool blocking )

  • consistent=true: the returned value was stored on a majority of servers at the time when this request was accepted by the consensus protocol
  • consistent=false: the returned value is the most recent value stored on the server that first received this request (other servers may have a different value)
  • blocking=true: the function returns only after the desired data is retrieved
  • blocking=false: the function may time out

void set( string key, string value, bool consistent, bool blocking )

  • consistent=true: the new value is accepted by a majority of servers and, until another set for the same key is requested, the value will be returned by any consistent get
  • consistent=false: the new value is accepted by at least the server that first received this request, and that server returns it until another set is requested there; furthermore, once the quorum is established, the guarantees of a consistent set apply

Response swap( string key, string new_value, string old_value, bool blocking )

  • consistent by default
  • blocking=true: the function returns only after the quorum is established. It returns true if old_value was stored under key at the time this request was accepted by a majority AND new_value will be returned by any consistent get until the value is rewritten; most importantly, no other value can be returned by get between old_value and new_value. If old_value was not stored under key, the function does not modify the data store and returns false.
  • blocking=false: same guarantees as above, but the function may time out without modifying the data store.
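For orientation, the three functions could be expressed in Java roughly as below. The interface mirrors the signatures above, while InMemoryStore is only a single-node stand-in: it has no replication, so it ignores the consistent and blocking flags and never times out. Mapping the "returns true/false" of swap to the valid field of Response is an assumption of this sketch.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Mirrors the Response type from the text.
final class Response {
    final boolean valid;
    final boolean timeout;
    final String value;

    Response(boolean valid, boolean timeout, String value) {
        this.valid = valid;
        this.timeout = timeout;
        this.value = value;
    }
}

// The three required operations (Java naming is an assumption).
interface KeyValueStore {
    Response get(String key, boolean consistent, boolean blocking);

    void set(String key, String value, boolean consistent, boolean blocking);

    Response swap(String key, String newValue, String oldValue, boolean blocking);
}

/** Single-node stand-in without replication; the flags are accepted but ignored. */
final class InMemoryStore implements KeyValueStore {
    private final ConcurrentMap<String, String> data = new ConcurrentHashMap<>();

    @Override
    public Response get(String key, boolean consistent, boolean blocking) {
        String value = data.get(key);
        return new Response(value != null, false, value);
    }

    @Override
    public void set(String key, String value, boolean consistent, boolean blocking) {
        data.put(key, value);
    }

    @Override
    public Response swap(String key, String newValue, String oldValue, boolean blocking) {
        // replace(key, old, new) is atomic: it succeeds only if key currently maps to old.
        boolean swapped = data.replace(key, oldValue, newValue);
        return new Response(swapped, false, swapped ? newValue : data.get(key));
    }
}
```

A replicated implementation would have to provide the same atomicity for swap across a majority of servers, which is exactly where the consensus protocol comes in.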

Any distributed data-store conforming to the above requirements would be sufficient to solve the above problem. Yet the cost of consistency is high and only a few data-stores offer adequate protection from partitioning. Etcd seems the closest candidate but its functionality needs to be extended.