Nov 19, 2018

Hibernate Acceleration by Snapshots

A Google search for “Hibernate and performance” yields innumerable articles on how to fix performance issues. This post is not yet another list of tuning tips. Instead, we will demonstrate how to remove bottlenecks in a Hibernate project by using the JPA support of Spring Boot in tandem with in-JVM-memory snapshots, which provide speedups of several orders of magnitude. We will use the Sakila sample database throughout this post. The database contains, among other things, Films and Actors and relations between them.

A Straightforward Hibernate and Spring Application

A non-complicated way of interacting with a relational database is to use the Spring JPA plugin to allow Spring to handle dependency injection and setup of the project. This allows for a pure java implementation without any XML code to set up the ORM. For example, a properly annotated plain Film class is all that is needed to map the database table of films into the Java object domain.

@Entity
public class Film {
   @Id
   @Column(name="film_id")
   private int id;
   private String title;
   private String description;
   private int releaseYear;

   // ... more fields, getters and setters, etc.
}

Perhaps the most straightforward way of retrieving such entities is by means of a Repository


@Repository
public interface FilmRepository extends CrudRepository<Film, Integer> {
}
which allows us to write an application with minimal boilerplate that operates on Films with code such as the following.

@SpringBootApplication
public class HibernateSpeedmentApplication implements CommandLineRunner {

   public static void main(String[] args) {
       SpringApplication.run(HibernateSpeedmentApplication.class, args);
   }

   @Autowired
   FilmRepository filmRepository;

   // ... code using the filmRepository
}

Here, Spring injects the filmRepository, which can then be used as follows to stream over all films and sum their lengths.


public Long getTotalLengthHibernate() {
   return StreamSupport.stream(filmRepository.findAll().spliterator(), false)
       .mapToLong(com.example.hibernatespeedment.data.Film::getLength)
       .sum();
}

Clearly, this is an inefficient way of summing the film lengths, since it entails fetching all the film entities to the JVM only to sum a single property. Since we retrieve just a single value and are not interested in updating any data, we would be better off with a Data Transfer Object that contains only the film length. That would require us to write some code that SELECTs the length column server-side in the database. Once we realize that some of the logic of this operation should move to the database, it makes a lot of sense to compute the whole sum in the database instead of transferring the individual film lengths. We then arrive at the following piece of code.


public Long getTotalLengthHibernateQuery() {
   // The entityManagerFactory is assumed to be injected by Spring,
   // for example via the JPA @PersistenceUnit annotation
   EntityManager em = entityManagerFactory.createEntityManager();
   Query query = em.createQuery("SELECT SUM(length) FROM Film");
   return (Long) query.getSingleResult();
}

Now the application logic contains an explicit split of work between the JVM and the database, including a query language construct that is more or less opaque to the compiler, as illustrated below.
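
To illustrate this opaqueness: the following hypothetical variant, with a misspelled property name, compiles just fine and fails only at runtime when the JPQL string is parsed.

// Compiles without complaint; the typo in "lenght" is detected
// first at runtime, when the JPQL string is parsed
Query query = em.createQuery("SELECT SUM(lenght) FROM Film");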

Remove a Bottleneck with Speedment

While the stream construct we started out with in the section above was very inefficient, it has appeal in the way it abstracts away the details of the database operations. The ORM Speedment has a Stream-based API designed to make such stream operations efficient. The Speedment-based application code is very similar to the Hibernate example, except that the Repository is replaced by a Manager, and this manager provides streams of entities. The corresponding Speedment application code is as follows.

@Autowired
FilmManager filmManager;

public Long getTotalLengthSpeedment() {
   return filmManager.stream()
       .mapToLong(Film.LENGTH.asLong())
       .sum();
}

There are several advantages to deciding on the SQL details at runtime rather than in the application code, including type safety and a lower maintenance cost for a more concise business logic code base. Perhaps the most prominent advantage of the clean abstraction from database operations, however, is that it allows the runtime to provide acceleration. As a matter of setup configuration, and without modifying any application logic, an optional plugin to the Speedment runtime allows partial snapshots of the database to be prefetched to an in-memory data store, providing a speedup of several orders of magnitude (the Spring configuration needed is shown in the note at the end of this post).

For this particular example, the query-based Hibernate solution was approximately 5 times faster than the naive approach of streaming over the full set of entities, and the Speedment-powered solution returned its result 50 times faster than the query-based Hibernate solution. If you try it out, your mileage may vary depending on setup, but the in-memory snapshot will invariably be orders of magnitude faster than a round trip to the database with an explicit query, which in turn will be significantly faster than fetching the full table to the JVM as in the naive implementation.
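
Should you want to measure this yourself, a minimal timing harness calling the three methods defined above could look as follows. This is an illustrative sketch rather than a rigorous benchmark; for reliable numbers, use a tool such as JMH with proper warmup.

// Times the three variants once each; a sketch, not a benchmark
long t0 = System.nanoTime();
long naive = getTotalLengthHibernate();       // streams all entities
long t1 = System.nanoTime();
long query = getTotalLengthHibernateQuery();  // SUM computed in the database
long t2 = System.nanoTime();
long snapshot = getTotalLengthSpeedment();    // in-JVM-memory snapshot
long t3 = System.nanoTime();
System.out.printf("naive: %d ms, query: %d ms, snapshot: %d ms%n",
    (t1 - t0) / 1_000_000, (t2 - t1) / 1_000_000, (t3 - t2) / 1_000_000);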

Coexistence - Using the Right Tool for the Job

While in-memory acceleration does deliver unparalleled speed, it is no panacea. For some tables, a speedup of several orders of magnitude may not have any noticeable effect on the overall application. For other tables, querying an in-memory snapshot of the data may not be acceptable due to transactional dependencies. For example, operating on a snapshot may be perfect for the whole dataset of a business intelligence system, a dashboard of KPIs or a tool for exploring historical trade data. On the other hand, the resulting balance after a bank account deposit needs to be immediately visible to the online bank, so serving the bank account from a snapshot would be a terrible idea.

In many real-world scenarios, one needs a solution where some data is served from a snapshot while data from other tables is always fetched from the database. In such a hybrid case, the code that fetches data directly from the database may use the same Speedment API as the snapshot-querying code, but for projects already using Hibernate it works perfectly well to combine Speedment and Hibernate.

Since both Hibernate and Speedment rely on JDBC under the hood, they ultimately use the same driver and can therefore work in tandem in an application. For a Hibernate-powered application, a decision to move bottlenecks to Speedment can thus be local to the part of the application that benefits the most, while the rest of the Hibernate application coexists with the code that leverages Speedment. A sketch of such a hybrid service follows below.
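
As an illustration, a hybrid Spring service might serve analytics-style reads from the Speedment snapshot while routing transactional writes through Hibernate. This is a minimal sketch assuming the FilmRepository and FilmManager beans from the examples above; the FilmService class and its methods are made up for illustration, the JPA Film entity and the Speedment-generated Film are distinct classes, and the setTitle setter is assumed to exist on the JPA entity.

@Service
public class FilmService {

    // Speedment manager: fast reads from the in-memory snapshot
    @Autowired
    private FilmManager filmManager;

    // Hibernate/JPA repository: transactional reads and writes
    @Autowired
    private FilmRepository filmRepository;

    // Analytics-style aggregation served from the snapshot
    public long totalLength() {
        return filmManager.stream()
            .mapToLong(Film.LENGTH.asLong())
            .sum();
    }

    // Transactional update routed through Hibernate
    @Transactional
    public void renameFilm(int filmId, String newTitle) {
        filmRepository.findById(filmId).ifPresent(film -> {
            film.setTitle(newTitle);
            filmRepository.save(film);
        });
    }
}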

It is easy to try this out for yourself. The Sakila database is Open Source and can be downloaded here, and Speedment is available in a free version; use the Initializer to download it.

Note: For the Spring autowiring of the Speedment FilmManager to work, we also need a configuration class, which may look as follows.

@Configuration
public class Setup {
    @Bean
    public SakilaApplication createApplication() {
        // The DataStoreBundle adds the in-JVM-memory acceleration
        SakilaApplication app = new SakilaApplicationBuilder()
            .withBundle(DataStoreBundle.class)
            .withUsername("sakila")
            .withPassword("sakila")
            .build();
        // Load the database snapshot into memory at startup
        app.getOrThrow(DataStoreComponent.class).load();
        return app;
    }

    @Bean
    public FilmManager createFilmManager(SakilaApplication app) {
        return app.getOrThrow(FilmManager.class);
    }
}

Oct 2, 2018

Sharding for Low Latency Clusters

Scalability of a cluster is often perceived as a matter of being able to scale out, but in practice the constraint of latency may be at least as important; it does not really help to make a system future proof in terms of data size capacity if handling an increasing amount of data renders the system intolerably slow. This post gives a high level idea of the concept of sharding. We will elaborate on the principle in general and on how it relates to affinity and speed, and then present some code examples of using sharding for low latency.

Sharding as Divide and Conquer

There are many problems that can be solved by dividing the problem into smaller subproblems and then assembling the subresults. In a cluster environment where several nodes cooperate to deliver a service, such a divide and conquer strategy may turn a single intractable problem into several smaller, and therefore solvable, problems. If the size of the data is too large for any single cluster node, the cluster may still be able to deliver the service if the data can be partitioned between the nodes.

The strategy of partitioning the data in a way that lends itself to local computations is often referred to as sharding. A simple example is a directory lookup service. Let us say we design a clustered lookup service that returns the phone number of a person given her name, and let us assume that the lookup table is larger than what we can fit into a single node. This means that the nodes will need to cooperate somehow and divide the lookup table data between them.

In this example, the keys to the lookup table are totally ordered, which makes it easy to devise threshold values by which we partition the keys. We may for example say that the first node holds all names that appear before Charlie in the alphabet, the second all names between Charlie and Delta, and so on. The name is then what we call a shard key, which we use to determine the node for a given key. Having a strategy for finding the node given a key, it is easy to devise an edge service for our cluster that routes each request to the proper node, as sketched below.
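
To make the idea concrete, here is a minimal sketch of such routing logic in Java; the DirectoryRouter class, the node names and the threshold values are made up for illustration.

import java.util.NavigableMap;
import java.util.TreeMap;

public final class DirectoryRouter {

    // Maps the lowest name of each shard (inclusive) to the node that
    // holds it; the next threshold is the exclusive upper bound
    private final NavigableMap<String, String> nodeByThreshold = new TreeMap<>();

    public DirectoryRouter() {
        nodeByThreshold.put("", "node-0");        // names before Charlie
        nodeByThreshold.put("Charlie", "node-1"); // Charlie up to Delta
        nodeByThreshold.put("Delta", "node-2");   // Delta and onwards
    }

    // The name is the shard key: find the node responsible for it
    public String nodeFor(String name) {
        return nodeByThreshold.floorEntry(name).getValue();
    }
}

An edge service would simply call nodeFor(name) and forward the request to the returned node.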

In the simplistic example of the phone directory lookup, the problem can always be solved at a single node, and the reply from that node constitutes the reply from the cluster. In a more realistic scenario, the nodes involved may be any subset of the nodes, perhaps all of them, and the response from the service is then some kind of accumulation of the subresults. Further, in this example the shard key is identical to the actual input data, but in a more realistic scenario the shard key may be a subset of the input data. The sharding function from shard key to node is in general a hash function or, put more theoretically, a surjective function from shard keys to nodes, as sketched below.
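
A minimal sketch of such a hash-based sharding function, assuming the nodes are numbered 0 to nodeCount - 1:

// A common sharding function: hash the shard key onto one of the
// nodeCount nodes; Math.floorMod avoids negative indices
static int nodeFor(String shardKey, int nodeCount) {
    return Math.floorMod(shardKey.hashCode(), nodeCount);
}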

What we have gained in the phone directory example is that we have transformed a problem requiring a large amount of data into several smaller ones by exploiting locality properties of the problem domain. Clearly, such an efficient divide and conquer strategy does not exist if the data cannot be partitioned in any meaningful way.

However, even without the nice property of totally ordered keys, we could divide the data by some other shard key and still deliver the service by asking all nodes for help. If we for some reason have partitioned the person data by age, that partitioning will be of little help in directing a search by name. Without the help of an index, the service may still be able to deliver a result by sending the request to all nodes and then assembling the subresults, as sketched below. While this addresses the problem of the size of the data in each node, it is much less efficient than a shard key that helps narrowing down the search.
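
The following is a minimal sketch of such a scatter-gather fallback, assuming a hypothetical Node interface for querying a single node's local data.

import java.util.List;
import java.util.Optional;

public final class ScatterGatherLookup {

    // Hypothetical client for one node's local lookup
    public interface Node {
        Optional<String> lookupByName(String name);
    }

    private final List<Node> nodes;

    public ScatterGatherLookup(List<Node> nodes) {
        this.nodes = nodes;
    }

    // Without a helpful shard key we must ask every node and then
    // assemble the subresults, here by keeping the first hit
    public Optional<String> phoneNumberOf(String name) {
        return nodes.parallelStream()
            .map(node -> node.lookupByName(name))
            .filter(Optional::isPresent)
            .map(Optional::get)
            .findFirst();
    }
}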

Speed by Affinity

Sharding helps combat overwhelming sizes of data, but it may also be a major factor in speeding up computations; not only by narrowing down the search as demonstrated in the previous paragraph, but also because reducing the amount of data needed allows for lower latency. When dealing with low latency computation, affinity becomes paramount.

The time needed for a system to compute an answer is a function of many factors. Clearly, the design of the algorithm, the size of the data and the power of the computing engine are major factors. At a low level of abstraction, the computation speed is limited by the instruction set available to the processor and its operating clock frequency, i.e., the kind of operations it is able to perform and the rate at which those operations can be executed. A computer that can perform 10 complex multiplications per second is faster than a computer able to perform only simple additions at the same or a lower rate. (Yes, with those numbers we envision the original kind of computer: a human being performing arithmetic.)

The sheer computing power of the processor is, however, usually not the only bottleneck. To consider processor speed a major limiting factor of computation time, one has to assume that data is readily available when needed, and in real-world situations that is often far from true. No matter how clever an algorithm the designer of the system has devised, and no matter how fast the processors involved can process the data, the time needed to compute the result is bounded from below by the time needed to bring the input data to the processor.

In a computing setting, the term affinity often refers to the act of associating a process with a particular processor, the kin processor. Having a kin processor, the process benefits from having intermediate results and state readily available in the processor registers and caches when it is scheduled to perform work. Ascending the levels of abstraction from processors and caches, it also makes sense to use the term affinity for input data available to computing nodes in a cluster, and in the following we use the term in this higher-level sense. A computing cluster where the nodes have all needed input data available in RAM at all times has the highest degree of affinity, while a system where the input data needs to be fetched from remote nodes before a computation can complete has a lower degree of affinity.

For a computing cluster with no data redundancy and with data randomly spread out over the nodes, the computing time as a function of the number of nodes will asymptotically behave as if no data is ever available where it is needed: with ten nodes there is a 1/10 chance that the needed data is available locally, with a hundred nodes 1/100, and the chance keeps diminishing as the cluster grows. Since the time needed to transfer data between cluster nodes is several orders of magnitude larger than the time to fetch it from local RAM, keeping the data close at hand is paramount for achieving really low latency.

The random distribution of data is a worst case, and there are many elaborate approaches to increasing the chance of having data available when needed in a computation cluster. In some cases, however, the problem domain itself allows a proactive distribution of data that fits the usage pattern, and then we may use sharding to achieve ultra low latency for really large data sizes.

The Two Major Dimensions of Cluster Scalability

Scalability of a cluster is often perceived as a matter of being able to scale out, i.e., adding more nodes to the cluster as the size of the data grows. While the ability to handle the sheer size of the problem clearly is a non-negotiable requirement, it may not be the bottleneck. In a non-sharded system, the likelihood of having data available when needed asymptotically approaches zero as the size of the problem increases, and this lack of affinity has an impact on computation speed.

All systems become obsolete at some point, and when that happens it may very well be because the system is unable to hold the increased set of data, but another common reason is that the system simply is too slow. Two major factors come into play: the increasing amount of data, which requires more nodes and adds data transfer latency, and evolving requirements on the system, since new applications quite often demand more complex computations. Wasting time on data transfer while being asked to do heavier work turns the once competent system into an obsolete object for a migration project.

We conclude that theoretical scalability is just one piece of the scalability puzzle. Latency as a function of data size and problem complexity cannot be ignored in a real-life scenario.

Sharding to Scale In-JVM-Memory Ultra Low Latency Computing

Narrowing the focus from general computation theory to the more specific domain of Java server-side development, we use the term in-JVM-memory to refer to a computation strategy where each node has all its needed data directly available to the JVM, without the need to fetch it even from other local memory. Taking affinity to this extreme allows for ultra-low latency. Clearly, this requires all the node data needed for the computation to fit in available memory, which poses a limitation on the size of problem that can be handled.

Since a low latency system using in-JVM-memory data needs all data to fit in the available memory of the node, sharding is needed to address situations where the data is larger than what fits in the memory of a single node. Therefore, sharding support is a key feature of a scalable in-JVM-memory system.

Concluding this rather theoretical post about the scalability of low latency clusters, we give a few concrete examples of sharding support. We will use the Java Stream ORM Speedment, which has an acceleration feature that relies on in-JVM-memory techniques.

Immutable Sharding

When the shard key values are known a priori, all sharding details can be given at startup time. As further described in the manual, the following example creates two sharded computation engine instances for the shard keys A and B. At other nodes, other shard keys can be used, and in this way the nodes of the cluster divide the data between them, ensuring that this first node has all the data needed for solving problems in the part of the data space determined by the shard keys A and B.

// Creates a builder from a shard key.
// In this example we are not considering the shard key for
// the builder itself.
Function<String, SpeedmentTestApplicationBuilder> builderMapper = shardKey ->
    new SpeedmentTestApplicationBuilder()
        .withPassword("speedment_test")
        .withBundle(InMemoryBundle.class);

// Creates a ShardedSpeedment object with the two keys "A" and "B".
// The contents of the different shards are controlled by the given
// stream decorator.
ShardedSpeedment<String> shardedSpeedment = ShardedSpeedment.builder(String.class)
    .withApplicationBuilder(builderMapper)
    .putStreamDecorator(CountryManager.IDENTIFIER, (shardKey, stream) ->
        stream.filter(Country.NAME.startsWith(shardKey)))
    .putShardKey("A")
    .putShardKey("B")
    .build();

// Loads all the shards into memory
shardedSpeedment.load();

// Prints all countries in the "A" shard
shardedSpeedment
    .getOrThrow("A")
    .getOrThrow(CountryManager.class)
    .stream()
    .forEachOrdered(System.out::println);

// Prints all countries in the "B" shard
shardedSpeedment
    .getOrThrow("B")
    .getOrThrow(CountryManager.class)
    .stream()
    .forEachOrdered(System.out::println);

// Closes all the shards
shardedSpeedment.close();

The node that runs this code will only handle data related to countries starting with the letters A and B; other nodes will handle other countries. This partitioning of the data is quite static, and while that sometimes suffices, a more flexible sharding strategy calls for a more general approach. With mutable sharding, we allow cluster shards to be added dynamically.

Mutable Sharding

When the set of shard keys is unknown at startup, mutable sharding can be used as follows. In contrast to the immutable sharding example, we here allow new keys to be added to the sharding scheme during the life cycle of the application. More details can be found in the manual.

// Creator that, when applied, will create a Speedment instance for
// a given shard key
final Function<String, SpeedmentTestApplication> creator = shardKey -> {
    SpeedmentTestApplication app = TestUtil.createSpeedmentBuilder().build();
    app.getOrThrow(DataStoreComponent.class).reload(
        ForkJoinPool.commonPool(),
        StreamSupplierComponentDecorator.builder()
            .withStreamDecorator(
                CountryManager.IDENTIFIER,
                s -> s.filter(Country.NAME.startsWith(shardKey)))
            .build()
    );
    return app;
};

// Creates a MutableShardedSpeedment
MutableShardedSpeedment<String> shardedSpeedment =
    MutableShardedSpeedment.create(String.class);

// Acquires a Speedment instance for the shard key "A".
// (If the shard is already created, it is returned; if not, a new
// shard is created and reused for subsequent calls with the
// same shard key.)
SpeedmentTestApplication aApp = shardedSpeedment.computeIfAbsent("A", creator);

SpeedmentTestApplication bApp = shardedSpeedment.computeIfAbsent("B", creator);

final CountryManager aCountryManager = aApp.getOrThrow(CountryManager.class);
final CountryManager bCountryManager = bApp.getOrThrow(CountryManager.class);

// Prints all countries in the "A" shard
aCountryManager.stream().forEach(System.out::println);

// Prints all countries in the "B" shard
bCountryManager.stream().forEach(System.out::println);

// Closes all the shards
shardedSpeedment.close();

To download and try out Speedment, use the Speedment Initializer, which you can find here.

Sep 11, 2018

A Java Stream to UPDATE a Subset of Columns

The Stream ORM Speedment has received a nice feature in its latest release. While traditionally focused on accelerating database reads, Speedment has always also had database write functionality. In the latest version, a much anticipated feature has been added: the ability to determine a subset of columns to update.

The Streams API Goes Both Ways

Persisting data with Speedment follows the same intuitive stream approach as other Speedment data-oriented operations. Just as querying the database is expressed as a stream of operations on data items, the POJO entities retrieved from the database may be persisted back to it by simply terminating the stream in a database persister.

Simple retrieval of data can be expressed as


Optional<Film> longFilm = films.stream()
    .filter(Film.LENGTH.greaterThan(120))
    .findAny();
which will find a POJO representing a row of the underlying database for which the supplied predicate holds. In this case, the user will get a film longer than two hours, if any such film exists. For reading data, Speedment thus supplies a stream source that represents the database, in this case the films instance, which runs code generated by Speedment. Analogously, Speedment defines a stream termination that handles data writing, which can be used as follows.

Stream.of("Italiano", "Español")
        .map(ln -> new LanguageImpl().setName(ln))
        .forEach(languages.persister());
Here we create a stream of POJOs representing rows that may not yet exist in the database. The languages instance on line 3 is a Manager, just like films above, and is implemented by code generated by Speedment. The persister() method returns a Consumer of POJOs that persists the items to the database.

Notice the symmetry where Speedment has generated code handling reading and writing of database data as intuitive Stream operations.

Updating Data in a Single Stream

Since Speedment provides a consistent API that treats database operations as stream operations on POJOs, the reading and writing of data can be composed and combined freely. A POJO retrieved from a Speedment source is the very same kind of POJO that is needed for persistence. Therefore, it makes a lot of sense to use streams that have both their source and their termination defined by Speedment. If one, for example, would like to update some rows of a table in the database, this can be done in the following concise way.

languages.stream()
    .filter(Language.NAME.equal("Deutsch"))
    .map(Language.NAME.setTo("German"))
    .forEach(languages.updater());
Almost self-explanatory, at least compared to the corresponding JDBC operations (a sketch of which follows below), the code above will find any Language named “Deutsch” in the database and rename it to “German”. The terminating operation here is the updater, which in contrast to the persister modifies existing rows of the database.
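
For comparison, a rough sketch of the corresponding plain JDBC code could look like the following, assuming a javax.sql.DataSource for the Sakila database and a hypothetical renameLanguage method; transaction and error handling are simplified.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import javax.sql.DataSource;

public void renameLanguage(DataSource dataSource) throws SQLException {
    // Update the name column of the matching rows
    try (Connection conn = dataSource.getConnection();
         PreparedStatement ps = conn.prepareStatement(
             "UPDATE language SET name = ? WHERE name = ?")) {
        ps.setString(1, "German");
        ps.setString(2, "Deutsch");
        ps.executeUpdate();
    }
}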

Selecting the Fields to Update

The basic updater updates all relevant columns of the row in question, which makes sense in many cases. However, when updating a single column of the database as above, this behaviour may be wasteful or even error prone.

Even if the code above intends to update only the name of the language, the updater writes all columns of the row: it sets the name to its new value and also resets all other columns to the values they had when the POJO was read from the database. If this code is the sole actor modifying the database, this may be a minor problem, but in a concurrent environment it may create undesired race conditions where an innocent update of a single field happens to undo changes to other columns.

In Speedment 3.1.6 and later, the user may select which fields to update and persist by supplying a description of the desired fields of the POJO. Improving on the last example, the following code will update only the name of the Language.


Updater<Language> updater = languages.updater(FieldSet.of(Language.NAME));
languages.stream()
    .filter(Language.NAME.equal("Deutsch"))
    .map(Language.NAME.setTo("German"))
    .forEach(updater);
There are more elaborate ways to express the set of fields to update, and the interested reader is encouraged to learn more in the Speedment User Guide and to get a free license and example project scaffolding at the Initializer.