On 09/29/18, a Never Code Alone event was held at the OTTO campus where over 30 Scala enthusiasts gathered to code together in interactive Mob Programming sessions.
There were a total of four sessions spread throughout the day. At the beginning of each session there was a short introduction to the topic. After that, the first brave person in the audience was handed a wireless keyboard and the mob programming started.
The design of the sessions was left up to the speakers; the only requirement: the keyboard should go into the audience as quickly as possible.
One of the four sessions was ours: Sebastian Schröder and Frederik Mars.
The goal of our session was to give the participants an insight into working with Apache Kafka and into building a stateful streaming service on top of it. As an example, we picked calculating the article page views per second on Otto.de. This metric was then to be written into a metrics topic. A special detail was ensuring an at-least-once delivery semantic (more on that later).
The intro of the official Kafka documentation explains the basic concepts of Apache Kafka well and concisely. To follow our implementation, you should have read up on partitions and topics there.
For simplicity, we worked with only one partition in our session. However, only minor adjustments would be necessary to consume, for example, 24 partitions of a topic.
We started our session with a short introduction to Kafka and then moved on to implementing together. The basis for this was a microservice we had prepared, which consumes data from a Kafka topic and outputs it to the console. The data has the following structure:
case class Click(
  clickType: ClickType, // Page || Action
  name: String,
  timestamp: Long,
  browser: String,
  tabId: Option[String] = None
) {
  // The click timestamp is in milliseconds; reduce it to full seconds.
  val epochTimestamp: Long = timestamp / 1000
}
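The ClickType used above and the Click.parse method used in the poll loop below are not part of the snippets. As a rough idea of what they could look like, here is a minimal sketch with a deliberately simplified parser; the semicolon-separated format is an assumption for illustration, not the parsing logic from the session.

sealed trait ClickType
case object Page extends ClickType
case object Action extends ClickType

object Click {
  // Very simplified parser that expects "clickType;name;timestamp;browser[;tabId]".
  // The real service parsed its records differently; this only illustrates the shape.
  def parse(value: String): Option[Click] =
    value.split(";") match {
      case Array(tpe, name, ts, browser, rest @ _*) =>
        val clickType: ClickType = if (tpe == "Page") Page else Action
        scala.util.Try(ts.toLong).toOption
          .map(t => Click(clickType, name, t, browser, rest.headOption))
      case _ => None
    }
}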
To consume the data as easily as possible, we had prepared a poll loop:
while (running.get()) {
  consumer
    .poll(100)
    .asScala // ConsumerRecords (Java) -> Scala collection, via scala.collection.JavaConverters._
    .flatMap(record => Click.parse(record.value()))
    .foreach(click =>
      // The processing logic is implemented here in the following steps.
      logger.info(s"$click")
    )
}
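The consumer in this loop comes from the prepared microservice, which is not shown here. As a rough orientation, its setup could look like the following sketch; the broker address, group id and topic name are placeholders, and disabling automatic commits is an assumption that matches the manual offset commits shown later.

import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import scala.collection.JavaConverters._

val sourceTopic = "clicks" // placeholder for the real source topic name
val partition   = 0        // corresponds to KafkaConfig.Partition in the service

val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder
props.put(ConsumerConfig.GROUP_ID_CONFIG, "pageview-metrics")        // placeholder
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
// We commit offsets ourselves once a metric has been written successfully (see below),
// so automatic commits are disabled.
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")

val consumer = new KafkaConsumer[String, String](props)
// One partition was enough for the session; with 24 partitions we would assign all of them.
consumer.assign(List(new TopicPartition(sourceTopic, partition)).asJava)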
Together, we first decided to simply create a global variable whose value we updated on each pass of the poll loop:
var pageViewCounter: Long = 0

[...]
  .foreach(click =>
    if (click.clickType == Page) {
      pageViewCounter += 1
    }
  )
Via a log output we could now watch our counter counting the page impressions of Otto.de. However, the original task was to measure page views per second, i.e. a rate.
In the next step we therefore modified the microservice so that it measures this rate. We created a map from second to page view count (a Map[Long, Int]) and updated it with each message by incrementing the count stored under the message's timestamp.
var pageViewMap = Map.empty[Long, Int]
var currentTime: Long = 0

[...]
  .foreach(click =>
    if (click.clickType == Page) {
      val time = click.epochTimestamp
      val count = pageViewMap.getOrElse(time, 0)
      pageViewMap = pageViewMap + (time -> (count + 1)) // increment the count for this second
      if (time != currentTime) { // a second has been completed
        val pageViewsOfSecond = pageViewMap.getOrElse(currentTime, 0)
        logger.info(s"$pageViewsOfSecond")
        pageViewMap = pageViewMap - currentTime
        currentTime = time
      }
    }
  )
We now want to make the computed metrics available to other consumers. To do this, we write the rate of each completed second into another Kafka topic. To detect the change to a new second, we check whether the second of the current message crosses a second boundary, using the time of the message stored in the previous iteration of the loop.
producer.send(
  new ProducerRecord[String, String](
    metricsTopic,
    KafkaConfig.Partition,
    currentTime.toString,      // key
    pageViewsOfSecond.toString // value
  ))
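The producer is an ordinary Kafka producer. Its configuration could look roughly like this sketch; the broker address is a placeholder, and acks=all together with retries is an assumption that fits the at-least-once goal described below, not necessarily the exact configuration from the session.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.kafka.common.serialization.StringSerializer

val producerProps = new Properties()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
// Wait for all in-sync replicas and retry transient errors, so that a successful
// send callback really means the metric has been persisted.
producerProps.put(ProducerConfig.ACKS_CONFIG, "all")
producerProps.put(ProducerConfig.RETRIES_CONFIG, "3")

val producer = new KafkaProducer[String, String](producerProps)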
With that, we already measure the data in the quality we wanted. Now we still need to make sure that every message is consumed successfully and included in our measurement.
producer.send(
  new ProducerRecord[String, String](
    metricsTopic,
    KafkaConfig.Partition,
    currentTime.toString,
    pageViewsOfSecond.toString),
  new Callback {
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
      Option(exception) match {
        case None =>
          // The metric was written successfully, so we may now commit the read position
          // saved at the second boundary (savedOffset). commitAsync expects a Java map,
          // hence the asJava conversion (scala.collection.JavaConverters).
          consumer.commitAsync(
            Map(new TopicPartition(sourceTopic, KafkaConfig.Partition) -> savedOffset).asJava,
            null)
        case Some(e) => logger.error(e.getMessage)
      }
    }
  })
As soon as a second is complete, we send the metric to our destination topic. Only if the write was successful do we commit the read position of the previous message. That way, after a restart we always resume at a second boundary and therefore produce correct data.
If our service crashes after successfully sending the message but before committing the offset, messages are potentially written twice (at-least-once semantics), but we will never lose data.
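The savedOffset used in the callback does not appear in the snippets above. Roughly, it could be maintained in the poll loop as the position of the first message of the new second, so that a restart resumes exactly at the second boundary; a minimal sketch, not the exact session code:

import org.apache.kafka.clients.consumer.OffsetAndMetadata

// Offset to commit once the metric for a completed second has been written.
// Committed offsets point to the next record to be read, so storing the offset of the
// first message of the new second makes a restart resume exactly at the second boundary.
var savedOffset: OffsetAndMetadata = new OffsetAndMetadata(0L)

// Updated inside the poll loop when the second boundary is detected, e.g.:
// savedOffset = new OffsetAndMetadata(record.offset())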
After 90 minutes we reached the end of our session. The end result was quite impressive: a microservice that measures the article page views of Otto.de and writes the measured values into another topic, so that other services can use these metrics.
We enjoyed the session very much. Everyone participated very actively, programming along and asking questions. It always leads to a great result when many people from different backgrounds think about the same problem and exchange ideas. The discussions that arose made it clear to us once again that a mob programming session is very well suited for sharing knowledge and thinking through problems together. We will take this experience back to our teams, with the goal of using mob programming in such situations more often in the future.
A few voices about the event were captured here (in German).
Here is the code we wrote together in the session.
We welcome your feedback.