1
 
 
Account
In your account you can view the status of your application, save incomplete applications and view current news and events
January 25, 2019

Mob Programming: Stateful Streaming

What is the article about?

On 09/29/18, a Never Code Alone event was held at the OTTO campus where over 30 Scala enthusiasts gathered to code together in interactive Mob Programming sessions.

There were a total of four sessions spread throughout the day. At the beginning of each session there was a short introduction to the topic. After that, the first brave person in the audience was handed a wireless keyboard and the mob programming started.

The design of the sessions was left up to the speakers, the only premise: The keyboard should go into the audience as fast as possible.

One of the four talks was from us: Sebastian Schröder and Frederik Mars

Our Session

The goal of our talk was to give the participants an insight into working with Apache Kafka and how to build a stateful-streaming service based on it. As an example, we picked the calculation of article page views per second from Otto.de. This metric should then be written into a metrics topic. A special detail here was to ensure an at-least-once delivery semantic (more on that later).

mob01
mob01

A few words about Kafka

The official Kafka documentation explains well and concisely the basic concepts of Apache Kafka in its intro.

To follow our implementation, you should have read something about partitions and topics in the Kafka documentation.

For simplicity, we worked with only one partition in our session. However, only minor adjustments would be necessary to consume, for example, 24 partitions of a topic.

We started our session with a short introduction to Kafka and then moved into the common implementation. The basis for this was a microservice we prepared, which consumes data from a Kafka topic and outputs it to the console. The data is available in the following structure:

case class Click(
  clickType: ClickType,    // Page || Action
  name: String,
  timestamp: Long,
  browser: String,
  tabId: Option[String] = None
){
    val epochTimestamp: Long = timestamp / 1000
}

In order to consume the data as easily as possible, we have prepared a poll loop.

while (running.get()) {
  consumer
    .poll(100)
    .flatMap(record => Click.parse(record.value()))
    .foreach(click => 
      // Hier wird im Folgenden die Logik implementiert.
      logger.info(s"$click")
    )
}

The first task now was to simply count the article page views. At this point we have now passed the keyboard to the audience.

Together, we first decided to simply create a global variable whose value we updated on each pass of the poll loop

var pageViewCounter: Long = 0
[...]
.foreach(click =>
  if(click.clickType == Page) {
    pageViewCounter += 1
  }
)

By means of a log output, we could now see how our counter counts the page impressions of Otto.de. However, the initial question required us to measure the page views per second, i.e. as a rate.

In the next step, we therefore modified the microservice so that we could measure this rate. To do this, we created a map from second to number of messages (Long => Int), which we updated with each message. To do this, we incremented the number of messages under the timestamp of the message.

var pageViewMap = Map.empty[Long, Int]
var currentTime: Long = 0
[...] 
.foreach(click =>
  if (click.clickType == Page) {
    val time = click.epochTimestamp
    val count = pageViewMap.getOrElse(time, 0)
    pageViewMap = pageViewMap + (time -> (count + 1)) // inkrementieren
    if (time != currentTime) { // abgeschlossene Sekunde
      val pageViewsOfSecond = pageViewMap.getOrElse(currentTime, 0)
      logger.info(pageViewsOfSecond)
      pageViewMap = pageViewMap - currentTime
      currentTime = time
    }
  }
)

We now want to make the determined metrics available to other consumers. To do this, we write the rates of each completed second into another Kafka topic. To detect the change to a new second, we check if the second of the current message exceeds a second boundary. To do this, we use the persisted time of the message from the previous run.


So now when a new second occurs, it means that we have counted all the page views from the last second. At that moment we want to write the metric into the other Kafka topic. Sending the message is done using Kafka Producer API:
producer.send(
  new ProducerRecord[String, String](
    metricsTopic,
    KafkaConfig.Partition,
    currentTime.toString, // Key
    pageViewsOfSecond.toString // Value
))

So we already have the measurement of the data in the quality we wanted. Now we still need to make sure that we successfully consume each message and include it in our measurement.


However, if the application crashes unexpectedly, we may lose data: Assuming we fetch one message per poll for simplicity, we get message n0 with the first poll. The next poll-loop fetches message n1 and commits the offsets for n0 (the default). n1 might now trigger a producer.send. Assuming our app crashed on this send, it would restart at n1 after it restarted, since the offsets were previously committed. Thus, we would have lost the data from n0.

To resolve this issue, we will disable the automatic management of offsets and do it manually.
producer.send(
  new ProducerRecord[String, String](
    metricsTopic, 
    KafkaConfig.Partition, 
    currentTime.toString, 
    pageViewsOfSecond.toString),
  new Callback {
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {						           
                                                      Option(exception) match {
        case None => 
          consumer.commitAsync(Map(new TopicPartition(sourceTopic, KafkaConfig.Partition) -> savedOffset))
        case Some(e) => logger.error(e.getMessage)
      }
    }
  })

Once a second is complete, we send the message to our destination topic. If the write was successful, we commit the read position of the previous message. Thus, when we restart, we always start at a second boundary and thus determine correct data.

If our service crashes after successfully sending the message and before committing the offset, messages are potentially written twice (at least once semantics), but we will never lose data.

After 90 minutes we reached the end of our session. The end result was something to behold: A microservice that measures the article page views of Otto.de and writes the measured values into another topic, so that other services could use these metrics.

Conclusion


We enjoyed the session very much. Everyone was very active in participating, programming and asking questions. It always leads to a cool result when many people coming from different directions think about the same problem and exchange ideas. The discussions that arose made it clear to us once again that a mob programming session is very suitable for imparting knowledge and thinking together about problems. We will take this experience back to our teams, with the goal of using mob-programming in such situations more often in the future.

A few voices about the event were captured here (in german).

Here is the code we wrote together in the session.

0No comments yet.

Write a comment
Answer to: Reply directly to the topic

Written by

Frederik Mars

Similar Articles

We want to improve out content with your feedback.

How interesting is this blogpost?

We have received your feedback.

Cookies erlauben?

OTTO und drei Partner brauchen deine Einwilligung (Klick auf "OK") bei einzelnen Datennutzungen, um Informationen auf einem Gerät zu speichern und/oder abzurufen (IP-Adresse, Nutzer-ID, Browser-Informationen).
Die Datennutzung erfolgt für personalisierte Anzeigen und Inhalte, Anzeigen- und Inhaltsmessungen sowie um Erkenntnisse über Zielgruppen und Produktentwicklungen zu gewinnen. Mehr Infos zur Einwilligung gibt’s jederzeit hier. Mit Klick auf den Link "Cookies ablehnen" kannst du deine Einwilligung jederzeit ablehnen.

Datennutzungen

OTTO arbeitet mit Partnern zusammen, die von deinem Endgerät abgerufene Daten (Trackingdaten) auch zu eigenen Zwecken (z.B. Profilbildungen) / zu Zwecken Dritter verarbeiten. Vor diesem Hintergrund erfordert nicht nur die Erhebung der Trackingdaten, sondern auch deren Weiterverarbeitung durch diese Anbieter einer Einwilligung. Die Trackingdaten werden erst dann erhoben, wenn du auf den in dem Banner auf otto.de wiedergebenden Button „OK” klickst. Bei den Partnern handelt es sich um die folgenden Unternehmen:
Google Inc., Meta Platforms Ireland Limited, elbwalker GmbH
Weitere Informationen zu den Datenverarbeitungen durch diese Partner findest du in der Datenschutzerklärung auf otto.de/jobs. Die Informationen sind außerdem über einen Link in dem Banner abrufbar.