January 18, 2019

From Database to Memory

What is the article about?

The OTTO marketplace grows by around 10,000 products every week, and the trend is rising. Dealing with such large volumes of data, supplying all systems with it promptly, and keeping it up to date at all times is therefore a particular challenge.

We are one of around 20 teams and currently operate around 15 micro-services for the otto.de online store. To keep pace with the growth of the OTTO marketplace, we need to be able to scale our infrastructure accordingly. This can be done by creating additional micro-services that take on new functionality, or by scaling horizontally. In both cases, the database increasingly becomes a problem.

One problem is the distribution of data (see Fig. 1): because the micro-services are independent, it becomes harder to keep their separate databases in sync as the amount of product data grows and prices or availability are updated more frequently. Keeping them in sync is important, however, because otherwise product information is displayed differently in different areas of the website.

Another problem is access to the data (see Fig. 2): more product data also means more updates to this data (prices, availability, etc.) in the database. These additional write operations often slow down reads. Especially in combination with horizontal scaling, where more instances read from the database, the database becomes a bottleneck. As a result, response times and thus page load times for the customer deteriorate.

[Fig. 1 and Fig. 2: distribution of data across services and access to the data under load]

We were able to solve both problems: changing our data supply from a pull to a push principle now makes it possible to distribute data to all relevant services almost simultaneously. The distribution is handled by a messaging system. Each instance of each service registers as a recipient and holds the processed data directly in memory. Requests can thus be answered quickly, without a database query.

In the following, I will go into more detail about the two problems mentioned and then outline our solution.

Problems of the data supply

In a distributed system like ours, consisting of many small, independent micro-services, each individual service is responsible for its own data supply. Following the "shared nothing" principle, it shares nothing with other services, which makes the system particularly scalable and fail-safe.

Databases are therefore not shared between services either, and each service has to take care of updating its data itself. In our case, this is done by a regular process that asynchronously fetches new data in the background from a central system via a pull mechanism (see Fig. 3).

[Fig. 3: pull-based data supply from a central system]
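
To make the old setup concrete, here is a minimal sketch of such a pull-based import job. The class and interface names, the five-minute interval and the use of a scheduled executor are assumptions for illustration, not the actual otto.de implementation.

```java
import java.time.Instant;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical pull-based import, roughly as in Fig. 3: each service regularly
// asks a central system for all products changed since the last successful run
// and writes them into its own database.
public class ProductPullJob {

    // Minimal stand-ins for the real dependencies (names are made up).
    interface CentralProductSystem { List<String> fetchChangedSince(Instant since); }
    interface ProductDatabase { void upsertAll(List<String> products); }

    private final CentralProductSystem centralSystem;
    private final ProductDatabase database;
    private volatile Instant lastSuccessfulRun = Instant.EPOCH;

    public ProductPullJob(CentralProductSystem centralSystem, ProductDatabase database) {
        this.centralSystem = centralSystem;
        this.database = database;
    }

    public void start() {
        // One import run every five minutes; the interval is an assumption.
        Executors.newSingleThreadScheduledExecutor()
                 .scheduleAtFixedRate(this::importChangedProducts, 0, 5, TimeUnit.MINUTES);
    }

    private void importChangedProducts() {
        Instant started = Instant.now();
        List<String> changed = centralSystem.fetchChangedSince(lastSuccessfulRun);
        database.upsertAll(changed); // the bulk writes that compete with customer reads
        lastSuccessfulRun = started;
    }
}
```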

If a particular service experiences a higher load, i.e. a higher number of calls, this can be absorbed specifically by horizontal scaling, i.e. by running additional instances of that service. However, since the instances share a database, the possible load is limited by that database. Here, too, horizontal scaling is possible by increasing the number of nodes in the database cluster. So much for the theory.

We use MongoDB as our database and have had rather bad experiences with this type of scaling. The increased write access from the data import often leads to global locks, which temporarily block read access and thus cause high latencies for the customer.

For scalability, it is also important to be able to increase not only the load, but also the amount of data that can be distributed to the services in a timely manner.

With our architecture, it also became increasingly difficult to keep the databases of all services synchronized. Because the import processes have different runtimes, e.g. due to different system loads or processing steps, temporary deviations between the services are inherent in the design. With the ever-growing number of new products and product data changes, these inconsistencies are amplified further.

Alternative data supply via messaging system


Another factor influencing the scalability and robustness of distributed systems is the way they communicate. The many services often do not communicate with each other directly, but asynchronously by sending messages over an event bus. Following the publish-subscribe principle, a message can have more than one recipient, and the recipients do not even have to be known to the sender.

This form of message distribution can of course also be used to supply data. We now use it, since it offers the following advantage in our case: there is only one central, regular process that fetches modified product data and then distributes it to all recipients via the event bus (see Fig. 4). This happens with low latency, so we manage to keep the services relatively synchronized despite the large amounts of data.

[Fig. 4: data distribution via the event bus]
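
The following condensed sketch illustrates the publish-subscribe idea with a toy in-process event bus; the EventBus and ProductUpdate types are hypothetical stand-ins, not the real messaging system.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-process model of the push principle: one central publisher,
// any number of subscribed service instances, none of them knowing each other.
public class ProductEventBusDemo {

    record ProductUpdate(String sku, String payload) {}

    static class EventBus {
        private final List<Consumer<ProductUpdate>> subscribers = new CopyOnWriteArrayList<>();

        void subscribe(Consumer<ProductUpdate> subscriber) { subscribers.add(subscriber); }

        void publish(ProductUpdate update) {
            subscribers.forEach(s -> s.accept(update)); // fan-out to every registered instance
        }
    }

    public static void main(String[] args) {
        EventBus bus = new EventBus();

        // Each service instance keeps only the data it needs, directly in memory.
        Map<String, String> searchServiceState = new ConcurrentHashMap<>();
        Map<String, String> detailPageServiceState = new ConcurrentHashMap<>();
        bus.subscribe(u -> searchServiceState.put(u.sku(), u.payload()));
        bus.subscribe(u -> detailPageServiceState.put(u.sku(), u.payload()));

        // The single central import process publishes every change exactly once.
        bus.publish(new ProductUpdate("123-456", "{\"price\": 19.99}"));
    }
}
```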

Event sourcing

However, this type of data supply alone does not replace a database, because it is not persistent. A service would only know the data it has received since it registered on the event bus. Every restart would therefore result in data loss and require reprocessing all messages to get back to the current state.

This corresponds to an essential part of how event sourcing works. Instead of storing the current state, as a database would, all messages that led to that state are stored in sequence in a message log. The final state can then be restored by reading and processing all messages from the message log again in order. At the same time, the message log is also the event bus through which new messages are distributed to all registered recipients.
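
As a rough illustration, the replay step can be sketched like this, assuming a simple in-memory list of events; a real message log such as Kinesis or Kafka would be read the same way, just over the network.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical event-sourcing replay: the current state is not stored anywhere,
// it is rebuilt by processing every message of the log in order.
public class EventLogReplay {

    record ProductEvent(String sku, long sequence, String payload) {}

    // Applying events in log order: the last event per SKU wins.
    static Map<String, String> replay(List<ProductEvent> log) {
        Map<String, String> state = new LinkedHashMap<>();
        for (ProductEvent event : log) {
            state.put(event.sku(), event.payload());
        }
        return state;
    }

    public static void main(String[] args) {
        List<ProductEvent> log = List.of(
                new ProductEvent("123", 1, "{\"price\": 24.99}"),
                new ProductEvent("456", 2, "{\"price\": 9.99}"),
                new ProductEvent("123", 3, "{\"price\": 19.99}")); // later update wins

        Map<String, String> currentState = replay(log);
        System.out.println(currentState); // {123={"price": 19.99}, 456={"price": 9.99}}
    }
}
```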

By using event sourcing, we can run our services without a database. All data that we read from the message log at startup, as well as everything we receive afterwards, is stored directly in the memory of the respective instance (see Fig. 5).

[Fig. 5: event-sourced data held in memory per service instance]


To save memory, only the data of a product that is relevant for the respective service is stored, of course. We also do not use the Java heap but off-heap memory, for two reasons. First, it almost halves the required memory, since strings consisting of ASCII characters are stored with one byte per character (UTF-8) instead of two bytes (UTF-16) as on the Java heap. Second (and much more importantly), it spares the Java garbage collector from managing several gigabytes of data and thus prevents unnecessary pauses of the virtual machine.
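
Judging from the comments below, a Chronicle Map serves as the off-heap key-value store. A rough sketch of how such a map might be created follows; the entry count and the sample key/value used for sizing are assumptions.

```java
import net.openhft.chronicle.map.ChronicleMap;

// Rough sketch of an off-heap key-value store with Chronicle Map.
// Entry count and sample sizes are assumptions; Chronicle Map uses them
// to pre-size the off-heap memory region it allocates.
public class OffHeapProductStore {

    public static void main(String[] args) {
        try (ChronicleMap<CharSequence, CharSequence> products = ChronicleMap
                .of(CharSequence.class, CharSequence.class)
                .name("products")
                .entries(5_000_000)                                    // assumed number of products
                .averageKey("1234567890")                              // typical SKU
                .averageValue("{\"price\":19.99,\"available\":true}")  // typical (reduced) product JSON
                .create()) {

            // Data read from the message log is written straight into the map;
            // the multi-gigabyte payload never touches the Java heap or the GC.
            products.put("1234567890", "{\"price\":17.99,\"available\":true}");
            System.out.println(products.get("1234567890"));
        }
    }
}
```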

Eliminating database access improves service response times and allows cost-efficient, truly linear scaling. Linear means that twice as many instances can now handle exactly twice the load, 100 instances a hundred times the load, and so on - as long as the limited network bandwidth is neglected. The database cluster is no longer a limiting factor, and it is cheaper to provide instances with a lot of memory than to run additional nodes for a database cluster.

There are even more advantages. Since messages in a message log are immutable and append-only, not just the current state but the state at any point in time can be restored. This effectively gives you versioning for free. For debugging, every change to the system can be traced and replayed individually, step by step.
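
Building on the hypothetical replay sketch above, restoring the state at an arbitrary point in time is just a matter of stopping the replay early; the timestamp field is again an assumed addition.

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical point-in-time restore: replay stops at the requested timestamp,
// so any historical state of the system can be reconstructed for debugging.
public class PointInTimeReplay {

    record ProductEvent(String sku, Instant publishedAt, String payload) {}

    static Map<String, String> stateAsOf(List<ProductEvent> log, Instant pointInTime) {
        Map<String, String> state = new LinkedHashMap<>();
        for (ProductEvent event : log) {
            if (event.publishedAt().isAfter(pointInTime)) {
                break;                       // the log is append-only and ordered by time
            }
            state.put(event.sku(), event.payload());
        }
        return state;
    }
}
```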

Where there is light, there is also shadow

Apart from the fact that event sourcing is still fairly unknown and not very widespread, that there are no frameworks, and that the complexity is quite high, it also violates an important characteristic of micro-services: short startup times (see Twelve-Factor Apps).

These are at odds with the time-consuming reading of the entire event log that is needed to restore the data set. Since a service is usually only interested in the current state, there are various approaches to restoring it as quickly as possible:

  • Variant a) The event log can be read backwards. However, it is extremely difficult to determine how far back you have to read to get all the messages needed for the correct state.
  • Variant b) The current state and the time of the last message are persisted locally in a database. After a restart, the event log does not have to be read from the beginning, but only from the stored time.
  • Variant c) A compact snapshot of the event log is created regularly (e.g. every two hours) that contains only the last version of each message (see Fig. 6). After a restart, only this reduced event log in the form of the snapshot plus the messages since the snapshot was created need to be read.
[Fig. 6: compact snapshot of the event log]

We use variant "c" because fewer messages have to be read than with variant "a", even if you knew exactly how far back you had to read. Variant "b" is ruled out because our goal was to do without a database. Moreover, a newly developed service would not receive the messages sent before it went live, and the sender would have to resend them. This violates the publish-subscribe principle, since sender and receiver should not actually need to know about each other.
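
A sketch of how such a snapshot could be built: one pass over the event log that keeps only the last version per key (the event type mirrors the hypothetical replay example above).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical snapshot creation (variant c): compact the event log so that
// only the latest message per product key remains. After a restart a service
// reads the snapshot plus the few messages published since its creation.
public class SnapshotWriter {

    record ProductEvent(String sku, long sequence, String payload) {}

    static List<ProductEvent> compact(List<ProductEvent> eventLog) {
        Map<String, ProductEvent> latestPerKey = new LinkedHashMap<>();
        for (ProductEvent event : eventLog) {
            latestPerKey.put(event.sku(), event);   // later events overwrite earlier ones
        }
        return List.copyOf(latestPerKey.values());
    }
}
```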

If Apache Kafka is used as middleware to implement event sourcing or an event log, so-called "log compaction" can be used. Compaction works similarly to a snapshot in that only the latest version of each message is kept (see Fig. 6).
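
Although this post does not use Kafka itself, for illustration: log compaction is a plain topic configuration, sketched here with Kafka's AdminClient; the broker address and the partition/replica counts are placeholders.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

import java.util.Map;
import java.util.Properties;
import java.util.Set;

// Illustration only: a Kafka topic with cleanup.policy=compact keeps just the
// latest message per key, which corresponds to the snapshot idea from Fig. 6.
public class CompactedTopicSetup {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker

        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic productUpdates = new NewTopic("product-updates", 12, (short) 3) // placeholder sizing
                    .configs(Map.of(TopicConfig.CLEANUP_POLICY_CONFIG,
                                    TopicConfig.CLEANUP_POLICY_COMPACT));
            admin.createTopics(Set.of(productUpdates)).all().get();
        }
    }
}
```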

Since we at OTTO only recently migrated our entire infrastructure to AWS, we first wanted to gain experience before running Kafka there ourselves. For the implementation of event sourcing as our new data supply, we therefore rely exclusively on AWS technologies for the time being, such as AWS Kinesis and AWS SQS, together with an open-source library we developed ourselves called Synapse.
Synapse aims to make it easier to develop new micro-services with an event-sourcing connection. We will present this topic in more detail in an upcoming blog post.
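
Synapse's API is the subject of that follow-up post. To give a rough idea of what reading the event log from Kinesis involves without such a library, here is a stripped-down polling loop using the plain AWS SDK v2; it assumes a single shard, a placeholder stream name, and no checkpointing or error handling.

```java
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Heavily simplified Kinesis consumer: reads one shard of the event log from
// the beginning (TRIM_HORIZON) and applies every record to an in-memory map.
// A real service would iterate all shards, checkpoint, and handle errors.
public class KinesisEventLogReader {

    public static void main(String[] args) throws InterruptedException {
        Map<String, String> products = new ConcurrentHashMap<>();

        try (KinesisClient kinesis = KinesisClient.create()) {
            String shardIterator = kinesis.getShardIterator(GetShardIteratorRequest.builder()
                    .streamName("product-updates")                 // placeholder stream name
                    .shardId("shardId-000000000000")               // single-shard assumption
                    .shardIteratorType(ShardIteratorType.TRIM_HORIZON)
                    .build()).shardIterator();

            while (shardIterator != null) {
                GetRecordsResponse response = kinesis.getRecords(GetRecordsRequest.builder()
                        .shardIterator(shardIterator)
                        .limit(1000)
                        .build());

                for (Record record : response.records()) {
                    products.put(record.partitionKey(), record.data().asUtf8String());
                }

                shardIterator = response.nextShardIterator();
                Thread.sleep(1000);                                // stay under Kinesis read limits
            }
        }
    }
}
```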

8 Comments

  • Guido Steinacker
    19.01.2019, 21:46

    Experience, no - we were looking for something that works with the tools AWS provides out of the box. Axon does not seem to fit there.

  • Markus Schwarz
    19.01.2019, 21:35

    Thank you very much for the article.
    With Axon (https://axoniq.io/) there certainly is a framework for event sourcing.

  • Guido Steinacker
    19.01.2019, 21:41

    You are right, of course - and there are a few more: Eventuate, Lagom, Spring Cloud Stream...

  • Yuriy
    19.01.2019, 07:12

    Hello,

    Thanks for the article. I have a question.

    There is an event sourcing framework called "Axon Framework" - have you looked into it? If so, could you please share your experiences?

    Thanks

  • Dominik Sandjaja
    20.01.2019, 22:20

    Interesting article - the process is very similar to ours, except that we use Debezium instead of a sync job for the database -> Kafka interface.
    Which technology do you use for the off-heap storage of the data? RocksDB?

  • 20.01.2019 22:30 Clock

    Finde ich gut, dass ihr als "AWS Neulinge" :) auf "managed services" (sofern das pricing für den Anwendungsfall passt) zurück greift und nicht mit selbst gebauten Komponenten auf EC2 beginnt.

  • Florian Torkler
    21.01.2019, 12:30

    We currently use a pure in-memory map (Chronicle Map, https://github.com/OpenHFT/Chronicle-Map) as a simple key-value store. Complex queries like in a database are therefore not possible. If that is needed, an in-memory database can of course be used for an individual micro-service.

  • Florian Torkler
    21.01.2019, 12:44

    There is now a managed Kafka on AWS (https://aws.amazon.com/jobs/de/kafka/). However, for now only in the US East (N. Virginia) region. We are waiting for it to come to Frankfurt :)


Written by

Florian Torkler


