The OTTO marketplace grows by around 10,000 products every week, and the trend is upward. Dealing with such large volumes of data is therefore a particular challenge: all systems have to be supplied with the data promptly and kept up to date at all times.
We are one of around 20 teams that currently operate roughly 15 microservices around the otto.de online store. To keep pace with the growth of the OTTO marketplace, we need to be able to scale our infrastructure accordingly. This can be done by creating additional microservices that take on new functionality, or by scaling horizontally. In both cases, the database increasingly becomes a problem.
One problem is the distribution of data (see Fig. 1): because the microservices are independent, it becomes harder to keep their separate databases in sync as the amount of product data grows and prices or availability are updated more frequently. Keeping them in sync is essential, however, because otherwise product information is displayed differently in different areas of the website.
Another problem is access to the data (see Fig. 2): more product data also means more updates to that data (prices, availability, etc.) in the database. These additional write operations often degrade read performance. Especially in combination with horizontal scaling, where more instances read from the database, the database becomes a bottleneck. As a result, response times and thus page load times for the customer deteriorate.
We were able to solve both problems: changing our data supply from a pull to a push principle now makes it possible to distribute data to all relevant services almost simultaneously. Distribution happens via a messaging system; each instance of each service registers as a recipient and holds the processed data directly in memory. Requests can therefore be answered quickly without a database query.
In the following, I will go into more detail about the two problems mentioned and then outline our solution.
If a service experiences higher load, i.e. a larger number of requests, this can be absorbed in a targeted manner by horizontal scaling, i.e. by running additional instances of that service. However, since the instances share a database, the load they can handle is limited by that database. The database, too, can in principle be scaled horizontally by adding nodes to the database cluster. So much for the theory.
We use MongoDB as our database and have had rather bad experiences with this type of scaling. The increased write access caused by the data import often leads to global locks, which temporarily block reads and thus cause high latencies for the customer.
For scalability, it is not only important to be able to handle more load, but also to be able to distribute a growing amount of data to the services in a timely manner.
With our architecture, it also became increasingly difficult to keep the databases of all services in sync. Because the import processes have diverging runtimes, e.g. due to different system loads or different processing steps, temporary deviations between the services are inherent to the design. With the ever-growing number of new products and product data changes, these inconsistencies are amplified further.
Another factor influencing the scalability and robustness of distributed systems is the type of communication. The many services often do not communicate with each other directly, but asynchronously, by sending messages over an event bus. Following the publish-subscribe principle, a message can have more than one recipient, and the recipients do not even have to be known to the sender.
This form of message distribution can of course also be used to supply data. We now use it for exactly that, because in our case it offers the following advantage: a single central, regularly running process fetches modified product data and distributes it to all recipients via the event bus (cf. Fig. 4). This happens with low latency, so we manage to keep the services largely in sync despite the large amounts of data.
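To make the publish-subscribe principle described above more concrete, here is a much simplified, self-contained sketch in Java. The `EventBus` class, the `ProductUpdated` event and the two in-memory "services" are purely illustrative stand-ins for the real messaging middleware and services, not the implementation behind Fig. 4.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical product-update event: one message per changed product.
record ProductUpdated(String sku, long priceInCent, int available) {}

// Minimal in-process stand-in for an event bus following publish-subscribe:
// every registered subscriber receives every published message.
class EventBus {
    private final List<Consumer<ProductUpdated>> subscribers = new CopyOnWriteArrayList<>();

    void subscribe(Consumer<ProductUpdated> subscriber) {
        subscribers.add(subscriber);
    }

    void publish(ProductUpdated event) {
        subscribers.forEach(s -> s.accept(event));
    }
}

class PublishSubscribeExample {
    public static void main(String[] args) {
        EventBus bus = new EventBus();

        // Two independent services keep their own in-memory view of the data.
        Map<String, ProductUpdated> searchService = new ConcurrentHashMap<>();
        Map<String, ProductUpdated> detailPageService = new ConcurrentHashMap<>();
        bus.subscribe(e -> searchService.put(e.sku(), e));
        bus.subscribe(e -> detailPageService.put(e.sku(), e));

        // The central import process publishes each changed product once;
        // both services receive the update almost simultaneously.
        bus.publish(new ProductUpdated("sku-123", 1999, 42));
    }
}
```

The central process only needs to publish a change once; how many services consume it is irrelevant to the sender.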
However, this kind of data supply alone does not replace a database, because it is not persistent: a service only knows the data it has received since it registered on the event bus. Every restart would therefore mean data loss, and all messages would have to be sent and processed again to get back to the current state.
This is precisely an essential part of how event sourcing works. Instead of storing the current state, as a database does, all messages that led to that state are stored in order in a message log. The final state can then be restored by reading and processing all messages from the log again in sequence. At the same time, the message log also serves as the event bus through which new messages are distributed to all registered recipients.
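As an illustration of this principle, the following minimal sketch keeps an append-only message log in a plain Java list and derives the current state from it. The class and event names are made up for this example and do not come from any particular framework.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Same hypothetical event as in the sketch above.
record ProductUpdated(String sku, long priceInCent, int available) {}

// Much simplified event sourcing: the append-only message log is the single
// source of truth; the current state is only a projection derived from it.
class EventSourcedProductStore {
    private final List<ProductUpdated> messageLog = new ArrayList<>();          // append-only
    private final Map<String, ProductUpdated> currentState = new HashMap<>();   // projection

    // New messages are appended to the log and applied to the projection.
    void append(ProductUpdated event) {
        messageLog.add(event);
        apply(event);
    }

    // After a restart, the state is restored by replaying the whole log in order.
    void replay(List<ProductUpdated> log) {
        currentState.clear();
        log.forEach(this::apply);
    }

    private void apply(ProductUpdated event) {
        currentState.put(event.sku(), event); // last message per product wins
    }

    ProductUpdated get(String sku) {
        return currentState.get(sku);
    }
}
```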
By using event sourcing, we can run our services without a database. All data that we read from the message log at startup, as well as that we receive thereafter, is stored directly in the memory of the respective instance (see Fig. 5).
To save memory, each service of course stores only the product data that is relevant to it. We also do not use the Java heap but off-heap memory, for two reasons. First, it almost halves the required memory, because strings consisting of ASCII characters are stored with one byte per character (UTF-8) instead of two (UTF-16) as on the Java heap. Second (and much more importantly), it spares the Java garbage collector from managing several gigabytes of data and thus prevents unnecessary pauses of the virtual machine.
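For illustration, here is a rough sketch of such an off-heap store using Chronicle Map, the library mentioned in the comments below. The entry count and the example key/value used for sizing are invented values, not our actual configuration.

```java
import net.openhft.chronicle.map.ChronicleMap;

// Rough sketch of an off-heap key-value store with Chronicle Map.
// The sizing hints below only tell Chronicle Map how much off-heap
// memory to reserve; the numbers are made-up examples.
public class OffHeapProductStore {

    private final ChronicleMap<CharSequence, CharSequence> products =
            ChronicleMap.of(CharSequence.class, CharSequence.class)
                    .name("products")
                    .entries(5_000_000)                                  // expected number of products
                    .averageKey("example-sku-1234567")                   // representative key
                    .averageValue("{\"price\":1999,\"available\":42}")   // representative value, e.g. JSON
                    .create();                                           // lives off-heap, invisible to the GC

    public void put(String sku, String productJson) {
        products.put(sku, productJson);
    }

    public String get(String sku) {
        CharSequence value = products.get(sku);
        return value == null ? null : value.toString();
    }
}
```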
Eliminating database access improves service response times and allows cost-efficient, truly linear scaling. Linear means that twice as many instances can now handle twice the load, 100 instances a hundred times the load, and so on, as long as the limited network bandwidth is neglected. The database cluster is no longer a limiting factor, and providing instances with plenty of memory is cheaper than running additional nodes of a database cluster.
There are further advantages. Since messages in a message log are immutable and append-only, not only the current state but the state at any point in time can be restored. This effectively gives you versioning for free. For debugging, every change to the system can be traced individually and replayed step by step.
Apart from the fact that event sourcing is rather unknown and not very widespread, that there are hardly any frameworks, and that the complexity is quite high, it also conflicts with an important characteristic of microservices: short startup times (see Twelve-Factor Apps).
Standing against this is the time-consuming reading of the entire event log to restore the dataset. Since a service is usually only interested in the current state, there are various approaches to reaching it as quickly as possible:

a) read the message log from the beginning, or at least as far back as necessary, on every startup,
b) additionally persist the current state in a database and only consume new messages,
c) periodically store snapshots of the current state, so that only the latest snapshot and the messages received after it have to be read.

We use variant "c" because fewer messages have to be read than with variant "a", even if you knew exactly how far back you had to read. Variant "b" is ruled out because our goal was to do without a database. In addition, a newly developed service would not receive any messages from before its go-live, and the sender would have to resend them. This violates the publish-subscribe principle, according to which sender and receivers should not need to know about each other.
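Here is a rough sketch of the snapshot idea behind variant "c", with hypothetical `SnapshotStore` and `MessageLog` interfaces standing in for whatever storage is actually used: at startup, only the latest snapshot plus the messages received after it are processed.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical event, as in the earlier sketches.
record ProductUpdated(String sku, long priceInCent, int available) {}

// A snapshot captures the full state plus the log position it corresponds to.
record Snapshot(long lastProcessedPosition, Map<String, ProductUpdated> state) {}

interface SnapshotStore {
    Snapshot loadLatest();                        // e.g. from object storage
}

interface MessageLog {
    List<ProductUpdated> readFrom(long position); // only messages newer than the snapshot
}

class FastStartup {
    Map<String, ProductUpdated> restore(SnapshotStore snapshots, MessageLog log) {
        Snapshot snapshot = snapshots.loadLatest();
        Map<String, ProductUpdated> state = new HashMap<>(snapshot.state());
        // Only the (few) messages after the snapshot position need to be replayed.
        log.readFrom(snapshot.lastProcessedPosition())
           .forEach(event -> state.put(event.sku(), event));
        return state;
    }
}
```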
If Apache Kafka is used as the middleware for implementing event sourcing or an event log, so-called "log compaction" can be used. A compaction works similarly to a snapshot in that only the latest message for each key is kept (see Fig. 6).
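In Kafka, log compaction is enabled per topic via the `cleanup.policy` setting. The following sketch creates such a compacted topic with the Kafka admin client; the broker address, topic name and sizing are example values.

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

// Sketch: create a compacted Kafka topic so that, per key (e.g. the product
// SKU), only the latest message is eventually retained.
public class CreateCompactedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // example broker

        try (Admin admin = Admin.create(props)) {
            NewTopic products = new NewTopic("product-updates", 8, (short) 3)    // example topic/sizing
                    .configs(Map.of(TopicConfig.CLEANUP_POLICY_CONFIG,
                                    TopicConfig.CLEANUP_POLICY_COMPACT));
            admin.createTopics(List.of(products)).all().get();
        }
    }
}
```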
Since we at OTTO only recently migrated our entire infrastructure to AWS, we wanted to gain experience there first before running Kafka ourselves. For the implementation of event sourcing as the new data supply, we are therefore using AWS technologies exclusively for the time being, namely AWS Kinesis and AWS SQS, together with a purpose-built open-source library called Synapse.
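To give an idea of what consuming such a stream looks like at the SDK level (Synapse hides these details), here is a simplified sketch that replays a single shard of a hypothetical Kinesis stream from the beginning into an in-memory map, using the AWS SDK for Java 2.x. The stream name, the single-shard handling and the "caught up" check are simplifying assumptions.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

// Simplified sketch: replay one shard of a Kinesis stream from its beginning
// (TRIM_HORIZON) and build an in-memory view keyed by partition key.
public class KinesisReplay {
    public static void main(String[] args) {
        Map<String, String> state = new ConcurrentHashMap<>();

        try (KinesisClient kinesis = KinesisClient.create()) {
            String shardIterator = kinesis.getShardIterator(GetShardIteratorRequest.builder()
                    .streamName("product-updates")          // example stream name
                    .shardId("shardId-000000000000")        // real services iterate over all shards
                    .shardIteratorType(ShardIteratorType.TRIM_HORIZON)
                    .build()).shardIterator();

            while (shardIterator != null) {
                GetRecordsResponse response = kinesis.getRecords(GetRecordsRequest.builder()
                        .shardIterator(shardIterator)
                        .limit(1000)
                        .build());

                // Last message per key wins, as with log compaction / snapshots.
                response.records().forEach(record ->
                        state.put(record.partitionKey(), record.data().asUtf8String()));

                shardIterator = response.nextShardIterator();
                if (response.records().isEmpty()) {
                    break; // simplification: stop once no further records are returned
                }
            }
        }
    }
}
```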
Synapse aims to make it easier to develop new microservices with an event sourcing connection. We will present this topic in more detail in an upcoming blog post.
Experience, no - we were looking for something that works with "AWS built-in tools". Axon does not seem to fit there.
Thank you very much for the post.
With Axon (https://axoniq.io/) there is indeed a framework for event sourcing.
You are right, of course - and there are a few more: Eventuate, Lagom, Spring Cloud Stream...
Hello,
Thanks for the article. I have a question.
There is an event sourcing framework called "Axon Framework" - have you looked into it? If so, could you please share your experiences?
Thanks
Interesting article - the process is very similar to ours, except that we use Debezium instead of a sync job for the database -> Kafka interface.
Which technology do you use for the off-heap storage of the data? RocksDB?
I think it's good that, as "AWS newcomers" :), you rely on managed services (as long as the pricing fits the use case) instead of starting with self-built components on EC2.
We currently use a plain in-memory map (Chronicle Map, https://github.com/OpenHFT/Chronicle-Map) as a simple key-value store. Complex queries like with a database are therefore not possible. Where that is needed, an in-memory database can of course be used for an individual microservice.
There is now a managed Kafka on AWS (https://aws.amazon.com/jobs/de/kafka/). However, for now only in the US East (N. Virginia) region. We are waiting for it to come to Frankfurt :)