Oct 2, 2018

Sharding for Low Latency Clusters

Scalability of a cluster is often perceived as a matter of being able to scale out, but in practice latency may be at least as important a constraint: making a system future-proof in terms of data capacity does not help if handling the growing amount of data renders the system intolerably slow. This post gives a high-level overview of sharding. We elaborate on the principle in general and on how it relates to affinity and speed, and then present some code examples of sharding for low latency.

Sharding as Divide and Conquer

There are many problems that can be solved by dividing the problem into smaller subproblems and then assembling the subresults. In a cluster environment where several nodes cooperate to deliver a service, such a divide and conquer strategy may turn a single intractable problem into several smaller and therefore solvable problems. If the size of the data is too large for any of the cluster nodes, the cluster may still be able to handle the service if the data can be partitioned between the nodes.

The strategy of partitioning the data so that it lends itself to local computations is often referred to as sharding. A simple example would be a directory lookup service. Let us say we design a clustered lookup service which returns the phone number of a person given her name. Let us assume that the lookup table is larger than what we can fit into a single node. This means that the nodes will need to cooperate somehow and divide the lookup table data between them.

In this example, the keys to the lookup table are totally ordered, so it is easy to devise threshold values by which we partition the keys. We may for example say that the first node holds all names that appear before Charlie in the alphabet, the second all names between Charlie and Delta, and so on. The name is then what we call a shard key, which we use to determine the node for a given key. Having a strategy for finding the node given a key, it is easy to devise an edge service for our cluster that, given a key, routes the request to the proper node.
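The threshold-based routing described above can be sketched in a few lines of Java. This is only an illustration: the class `RangeShardRouter`, the node names and the exact thresholds are hypothetical, and a sorted map is just one convenient way to find the range that a name falls into.

```java
import java.util.TreeMap;

// Hypothetical edge-service helper: maps a name (the shard key) to the node
// that holds the range of names the key belongs to.
class RangeShardRouter {
    // Inclusive lower bound of each range -> name of the node holding it.
    private final TreeMap<String, String> lowerBounds = new TreeMap<>();

    RangeShardRouter() {
        lowerBounds.put("", "node-0");        // names before "Charlie"
        lowerBounds.put("Charlie", "node-1"); // "Charlie" up to, not including, "Delta"
        lowerBounds.put("Delta", "node-2");   // "Delta" and everything after
    }

    String nodeFor(String name) {
        // floorEntry returns the greatest lower bound that is <= name.
        // The empty string sorts before every name, so a match always exists.
        return lowerBounds.floorEntry(name).getValue();
    }

    public static void main(String[] args) {
        RangeShardRouter router = new RangeShardRouter();
        System.out.println(router.nodeFor("Alice"));     // node-0
        System.out.println(router.nodeFor("Charlotte")); // node-1
        System.out.println(router.nodeFor("Erin"));      // node-2
    }
}
```

The total order of the keys is what makes this work: without it there would be no meaningful thresholds to compare against.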

In the simplistic example of phone directory lookup, the problem can always be solved at a single node, and the reply from that node constitutes the reply from the cluster. In a more realistic scenario, the nodes involved may be any subset of the nodes, perhaps all of them, and the response from the service is then some kind of accumulation of the subresults. Further, in this example the shard key is identical to the actual input data, but in a more realistic scenario the shard key may be a subset of the input data. The sharding function from shard key to node is in general a hash function or, more theoretically, a surjective function from shard keys to nodes.
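As a minimal sketch of a hash-based sharding function, the following Java class maps any string shard key to one of a fixed number of nodes. The class name `HashShardRouter` is hypothetical, and using `String.hashCode()` is a simplifying assumption; a production system would likely pick a hash with better distribution and think about what happens when the node count changes.

```java
// Hypothetical hash-based router: every shard key maps to exactly one node,
// and with enough distinct keys every node receives some keys.
class HashShardRouter {
    private final int nodeCount;

    HashShardRouter(int nodeCount) {
        if (nodeCount <= 0) {
            throw new IllegalArgumentException("nodeCount must be positive");
        }
        this.nodeCount = nodeCount;
    }

    int nodeFor(String shardKey) {
        // floorMod keeps the result in [0, nodeCount) even when
        // hashCode() is negative, which the % operator would not.
        return Math.floorMod(shardKey.hashCode(), nodeCount);
    }

    public static void main(String[] args) {
        HashShardRouter router = new HashShardRouter(4);
        for (String name : new String[] {"Alice", "Bob", "Charlie", "Delta"}) {
            System.out.println(name + " -> node " + router.nodeFor(name));
        }
    }
}
```

Unlike the threshold approach, this function does not need the keys to be ordered, but it also does not keep neighboring keys on the same node.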

What we have gained in the example of the phone directory is that we have transformed a problem that needs a large amount of data to be solved into several smaller ones by exploiting locality properties of the problem domain. Clearly, such an efficient divide and conquer strategy does not exist if the data cannot be partitioned in any meaningful way.

However, even without the nice property of the total order of keys in this problem, we could divide the data by some other shard key and still deliver the service by asking all nodes to help. If we for some reason have partitioned the person data by the age of the person, that partitioning will be of little help in directing a search by name. Without the help of an index, the service may still be able to deliver a result by sending the request to all nodes and then assembling the subresults. While this helps address the problem of the size of the data in each node, it is much less efficient than a shard key that helps narrow down the search.
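The fallback of asking every node can be sketched as follows. Each `Map` stands in for the local data of one node, partitioned by something other than the name, so a lookup by name has no choice but to visit the shards one after another. All names here (`ScatterGatherLookup`, the sample data) are made up for the illustration, and a real cluster would send the requests in parallel over the network rather than iterate in-process.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

// Hypothetical scatter-gather lookup: the shard key (for example age) does not
// help a search by name, so every shard is asked and the answers are combined.
class ScatterGatherLookup {

    static Optional<String> lookup(List<Map<String, String>> shards, String name) {
        return shards.stream()
                .map(shard -> shard.get(name)) // ask each shard for the name
                .filter(Objects::nonNull)      // drop shards that have no answer
                .findFirst();                  // assemble: here, first hit wins
    }

    public static void main(String[] args) {
        // Two shards holding name -> phone number, split by some unrelated key.
        List<Map<String, String>> shardsByAge = List.of(
                Map.of("Alice", "555-0100"),
                Map.of("Bob", "555-0101", "Carol", "555-0102"));

        System.out.println(lookup(shardsByAge, "Carol").orElse("not found"));
        System.out.println(lookup(shardsByAge, "Dave").orElse("not found"));
    }
}
```

The cost is visible in the structure: the work grows with the number of shards, whereas a lookup routed by a helpful shard key touches a single node.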

Speed by Affinity

Sharding helps combat overwhelming sizes of data, but it may also be a major factor in speeding up computations, not only by narrowing down the search as demonstrated in the previous section, but also because reducing the amount of data needed allows for lower latency. When dealing with low latency computation, affinity becomes paramount.

The time needed for a system to compute an answer is a function of many factors. Clearly, the design of the algorithm, the size of the data and the computing power of the computing engine are major factors. On a low level of abstraction, the computation speed is limited by the instruction set available to the processor and its operating clock frequency, i.e., the kind of operations it is able to perform and the rate at which those operations can be executed. A computer that can perform 10 complex multiplications per second is faster than a computer able to do only simple additions at the same or a lower rate. (Yes, with those numbers we envision the original kind of computer: a human person performing arithmetic.)

The sheer computing power of the processor is, however, rarely the only bottleneck. To consider the processor speed as a major limiting factor of the computation time, one has to assume that data is readily available when needed, and in real-world situations that is often far from the truth. No matter how clever an algorithm the designer of the system has devised, and no matter how fast the processors involved can process the data, the time needed to compute the result is bounded by the time needed to bring the input data to the processor.

In a computing setting, the term affinity is often used to refer to the action of associating a process to a particular processor, often called the kin processor. Having a kin processor, the process will benefit from having intermediate results and state readily available in the processor registers and caches when it gets scheduled to perform work. Ascending the level of abstraction from processors and caches, it also makes sense to use the term affinity for input data available to computing nodes in a cluster. In the following we use the term in this higher level of abstraction. A computing cluster where the nodes have all needed input data available in RAM at all times has the highest degree of affinity while a system where the input data needs to be fetched from remote nodes before computation can complete has a lower degree of affinity.

For a computing cluster with no data redundancy and with data randomly spread out over the nodes, the computing time as a function of the number of nodes in the cluster will asymptotically behave as if no data is ever available where it is needed. When there are ten nodes, there is a chance of 1/10 that the needed data is available locally, and this chance diminishes as the cluster grows. Since the time needed to transfer data between cluster nodes is several orders of magnitude larger than the time needed to fetch it from local RAM, keeping the data close at hand is paramount to achieving really low latency.
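The asymptotic argument can be made concrete with a small model. Assuming uniformly random placement and no redundancy, the chance of a local hit with n nodes is 1/n, and the expected access time is the weighted average of the local and remote access times. The class name `AffinityModel` and the two latency figures below are illustrative assumptions, not measurements; only their ratio of several orders of magnitude matters.

```java
// Hypothetical back-of-the-envelope model of affinity in a non-sharded cluster.
class AffinityModel {

    // Probability that the needed data happens to be on the local node.
    static double localHitProbability(int nodes) {
        if (nodes <= 0) {
            throw new IllegalArgumentException("nodes must be positive");
        }
        return 1.0 / nodes;
    }

    // Expected access time: local with probability p, remote otherwise.
    static double expectedAccessNanos(int nodes, double localNanos, double remoteNanos) {
        double p = localHitProbability(nodes);
        return p * localNanos + (1.0 - p) * remoteNanos;
    }

    public static void main(String[] args) {
        double localNanos = 100.0;      // assumed cost of a local RAM access
        double remoteNanos = 500_000.0; // assumed cost of fetching from another node

        for (int nodes : new int[] {1, 10, 100, 1000}) {
            System.out.printf("%4d nodes: P(local) = %.3f, expected access = %.0f ns%n",
                    nodes,
                    localHitProbability(nodes),
                    expectedAccessNanos(nodes, localNanos, remoteNanos));
        }
    }
}
```

As the node count grows, the expected access time converges to the remote figure, which is the behavior the paragraph above describes.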

The random distribution of data is a worst-case example, and there are many elaborate approaches to increasing the chances of having data available when needed in a computation cluster. In some cases, however, the problem domain itself allows a proactive distribution of data that fits the usage pattern, and then we may use sharding to achieve ultra-low latency for really large data sizes.

The Two Major Dimensions of Cluster Scalability

Scalability of a cluster is often perceived as a matter of being able to scale out, i.e., adding more nodes to the cluster as the size of the data grows. While the ability to handle the sheer size of the problem clearly is a non-negotiable limitation of the system, it may not be the bottleneck. In a non-sharded system, the likelihood of having data available when needed asymptotically approaches zero as the size of the problem increases and this lack of affinity has an impact on computation speed.

All systems become obsolete at some point, and when that happens it may very well be because the system is unable to hold the increased set of data, but another common reason is that the system simply is too slow. Two major factors come into play: the increasing amount of data, which requires more nodes and adds data transfer latency as a result, and evolving requirements on the system, since new applications quite often demand more complex computations. Wasting time on data transfer and being asked to do heavier work turns the once competent system into an obsolete object for a migration project.

We conclude that theoretical scalability is just one piece of the scalability puzzle. Latency as a function of data size and problem complexity cannot be ignored in a real-life scenario.

Sharding to Scale In-JVM-Memory Ultra Low Latency Computing

Narrowing the focus from general computation theory to the more specific domain of Java server-side development, we use the term in-JVM-memory to refer to a computation strategy where each node has all its needed data directly available to the JVM without the need for fetching it, even from other local memory. By taking affinity to the extreme, this strategy allows for ultra-low latency. It does, however, require all the node data needed for the computation to fit in available memory, which limits the size of the problem that can be handled.

Since a low latency system using in-JVM-memory data needs all data to fit in the available memory of the node, sharding will be needed to address situations where the data is larger than what can fit in the memory of the node. Therefore, sharding support is a key feature of a scalable in-JVM-memory system.

Concluding this rather theoretical post about scalability of low latency clusters we give a few real examples of sharding support. We will use the Java stream ORM Speedment, which has an acceleration feature that relies on in-JVM-memory techniques.

Immutable Sharding

When the shard key values are known a priori, all sharding details can be given at startup time. The following example, which is further described in the manual, creates two sharded computation engine instances for shard keys A and B. At other nodes, other shard keys can be used, and in this way the nodes of the cluster divide the data between them, ensuring that the first node has all the data needed for solving problems in the part of the data space determined by shard keys A and B.

// Creates a builder from a shard key.
// In this example we are not considering the shard key for
// the builder itself.
Function builderMapper = shardKey ->
    new SpeedmentTestApplicationBuilder()

// Creates a ShardedSpeedment object with two keys "A" and "B"
// The content of the different shards are controlled by the given stream decorator
ShardedSpeedment shardedSpeedemnt = ShardedSpeedment.builder(String.class)
    .putStreamDecorator(CountryManager.IDENTIFIER, (shardKey, stream) -> 

// Loads all the shards into memory

// Prints all countries in the "A" shard

// Prints all countries in the "B" shard

// Closes all the shards

The node that runs this code will only handle data related to countries starting with the letters A and B. Other nodes will handle other countries. This partitioning of the data is quite static, and while it sometimes suffices, a more general approach is needed when the sharding strategy has to be flexible. With Mutable Sharding we allow the cluster shards to be added dynamically.

Mutable Sharding

When the set of shard keys is unknown at startup, mutable sharding can be used as follows. In contrast to the example of immutable sharding, we here allow for adding new keys to the sharding scheme during the life cycle of the application. More details are to be found in the manual.
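The create-on-first-use behavior described here can be illustrated with plain Java collections before turning to the Speedment example. The sketch below is an analogy only, not how Speedment implements mutable sharding: the class `LazyShardRegistry` is hypothetical and a `String` stands in for a loaded shard. It relies on `ConcurrentHashMap.computeIfAbsent`, which invokes the creator at most once per key and returns the existing value on later calls.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical registry that creates a shard the first time its key is
// requested and reuses it for every later request with the same key.
class LazyShardRegistry<K, S> {
    private final Map<K, S> shards = new ConcurrentHashMap<>();

    S computeIfAbsent(K shardKey, Function<? super K, ? extends S> creator) {
        return shards.computeIfAbsent(shardKey, creator);
    }

    int size() {
        return shards.size();
    }

    public static void main(String[] args) {
        AtomicInteger created = new AtomicInteger();
        // Stand-in for an expensive step such as loading a shard into memory.
        Function<String, String> creator = key -> {
            created.incrementAndGet();
            return "shard for countries starting with " + key;
        };

        LazyShardRegistry<String, String> registry = new LazyShardRegistry<>();
        registry.computeIfAbsent("A", creator);
        registry.computeIfAbsent("B", creator);
        registry.computeIfAbsent("A", creator); // reuses the existing "A" shard

        System.out.println("shards created: " + created.get());
    }
}
```

The point of the pattern is that new shard keys can appear at any time during the life cycle of the application without the set of keys having to be declared up front.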

// Creator that, when applied, will create a Speedment instance for
// a given shard key
final Function<String, SpeedmentTestApplication> creator = shardKey -> {
    SpeedmentTestApplication app = TestUtil.createSpeedmentBuilder()
            .withStreamDecorator(CountryManager.IDENTIFIER, s -> s.filter(Country.NAME.startsWith(shardKey)))
            .build();
    return app;
};

// Creates a MutableShardedSpeedment
MutableShardedSpeedment shardedSpeedemnt = MutableShardedSpeedment.create(String.class);

// Acquires a Speedment instance for the shard key "A"
// (if the shard is already created, returns the shard,
// if the shard is not created, creates and returns a new
// shard that will be reused for subsequent calls for the
// same shard key).
SpeedmentTestApplication aApp = shardedSpeedemnt.computeIfAbsent("A", creator);

SpeedmentTestApplication bApp = shardedSpeedemnt.computeIfAbsent("B", creator);

final CountryManager aCountryManager = aApp.getOrThrow(CountryManager.class);
final CountryManager bCountryManager = bApp.getOrThrow(CountryManager.class);

// Prints all countries in an "A" shard

// Prints all countries in a "B" shard

// Closes all the shards

To download and try out Speedment, use the Speedment Initializer, which you can find here.

Sep 11, 2018

A Java Stream to UPDATE a Subset of Columns

The Stream ORM Speedment has received a nice feature in the latest release. While traditionally focused on accelerating database reads, Speedment has always also had database write functionality. In the latest version a much anticipated feature has been added - the ability to determine a subset of columns to update.

The Streams API Goes Both Ways

Persisting data in Speedment follows the same intuitive stream approach as other Speedment data oriented operations. Just as querying the database is expressed as a stream of operations on data items, the POJO entities received from the database may be persisted to the database by simply terminating the stream in a database persister.

Simple retrieval of data can be expressed as

Optional<Film> longFilm = films.stream()
which will find a POJO representing a row in the underlying database for which the supplied predicate holds true. In this case, the user will get a film longer than two hours, if any such film exists. For reading data, Speedment thus supplies a stream source that represents the database, in this case the films instance which runs code generated by Speedment. Analogously, Speedment defines a stream termination that handles data writing which can be used as follows.

Stream.of("Italiano", "Español")
        .map(ln -> new LanguageImpl().setName(ln))
Here we create a stream of POJOs representing rows in the database that may not yet exist in the database. The languages instance on line 3 is a Manager just like films above and is implemented by code generated by Speedment. The persister() method returns a Consumer of POJOs that will persist the items to the database.

Notice the symmetry where Speedment has generated code handling reading and writing of database data as intuitive Stream operations.

Updating Data in a Single Stream

Since Speedment provides a consistent API of treating database operations as stream operations on POJOs, the reading and writing of data can be composed and combined freely. Thus, a POJO retrieved from a Speedment source is the very same kind of POJO that is needed for persistence. Therefore, it makes a lot of sense to use streams that have both source and termination defined by Speedment. If one for example would like to update some rows of a table of the database, this can be done in the following concise way.

Almost self-explanatory, at least compared to the corresponding JDBC operations, the code above will find any Language named “Deutsch” in the database and rename it to “German”. The terminating operation here is the updater which in contrast to the persister modifies existing rows of the database.

Selecting the Fields to Update

The basic updater will update all relevant columns of the row in question, which makes sense in many cases. However, for the case above when updating a single column of the database this behaviour may be wasteful or even prone to errors.

Even if the code above intends to update only the name of the language, since the updater updates all columns of the row it will actually update the name to a new value and also reset all other columns to the values they had when the POJO was created from the database. If this code is the sole actor modifying the database this may be a minor problem, but in a concurrent environment it may create undesired race conditions where this innocent update of a single field may happen to undo changes to other columns.
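The race can be reproduced without a database. In the sketch below, which is an illustration in plain Java and not Speedment code, a `Map` plays the role of a database row and a copy of it plays the role of the POJO read by the first writer. The column name `last_update`, the date values and the helper `update` are assumptions made for the example.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical simulation of a lost update caused by writing all columns.
class LostUpdateDemo {

    // Writes the given columns of the in-memory POJO back to the row.
    static void update(Map<String, String> row, Map<String, String> pojo, Set<String> columns) {
        for (String column : columns) {
            row.put(column, pojo.get(column));
        }
    }

    static Map<String, String> run(boolean updateAllColumns) {
        Map<String, String> row = new HashMap<>();
        row.put("name", "Deutsch");
        row.put("last_update", "2018-01-01");

        // Writer 1 reads the row into a POJO (a snapshot of all columns).
        Map<String, String> pojo = new HashMap<>(row);

        // Writer 2 changes another column before writer 1 writes back.
        row.put("last_update", "2018-09-11");

        // Writer 1 renames the language and persists its POJO.
        pojo.put("name", "German");
        Set<String> columns = updateAllColumns
                ? pojo.keySet()
                : Collections.singleton("name");
        update(row, pojo, columns);
        return row;
    }

    public static void main(String[] args) {
        // All columns: the stale snapshot overwrites writer 2's change.
        System.out.println("all columns: last_update = " + run(true).get("last_update"));
        // Name only: writer 2's change survives.
        System.out.println("name only:   last_update = " + run(false).get("last_update"));
    }
}
```

Restricting the write to the columns that were actually changed removes this particular interference, which is the motivation for selecting the fields to update.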

In Speedment 3.1.6 and later, the user may select which fields to update and persist by supplying a description of the desired fields of the POJO. To improve on the last example, the following code will update only the name of the Language.

Updater<Language> updater = languages.updater(FieldSet.of(Language.NAME));

There are elaborate ways to express the set of fields to update and the interested reader is encouraged to learn more from the Speedment User Guide and to get a free license and example project scaffolding at the initializer.