Losing Kafka messages (largestTime=0)

Continuing the Kafka saga. In my last post I talked about the migration problems and the investigation around the Magic Version errors. This one is about what happened after we solved those issues and finally deployed the cluster in our test environments. While verifying that everything was working properly, we noticed that some messages had disappeared. After publishing some test messages to a topic and triggering a cluster redeploy, we found that, when consuming that topic from the beginning, we were not receiving all the messages we had published.

We checked the Kafka logs (the operational logs, not to be confused with the logs where Kafka stores the actual messages) during the initialization process and found that some of the log segments were being scheduled for deletion. To make it worse, the issue did not affect every topic, nor did it happen consistently on every deploy.

What are log segments?

Each Kafka topic can have multiple partitions, and each partition stores records in a log-structured format. Conceptually, we can imagine each record being appended sequentially to an infinitely long file. In reality, however, a partition does not keep all its records in a single file. Instead, each partition is broken into smaller segments, each named after its base offset, with segment N containing the most recent records and segment 1 containing the oldest retained records. Log segments cannot be kept on the machine forever, or they would eventually fill up the disk. To deal with this we can configure topic retention policies, limits after which segments are deleted: a size limit (for example, 1 GB), a time limit (for example, 1 day), or both.
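
As a quick illustration, retention can be configured per topic when it is created. The topic name, broker address and values below are placeholders, and older Kafka versions use --zookeeper instead of --bootstrap-server:

$ kafka-topics --bootstrap-server localhost:9092 --create --topic test \
    --partitions 3 --replication-factor 3 \
    --config retention.ms=86400000 --config retention.bytes=1073741824

With these settings a segment becomes eligible for deletion once its newest record is more than one day old, or once the partition as a whole grows past 1 GB (the oldest segments are removed first).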

Why were our log segments being unexpectedly deleted?

We started by analyzing the Kafka operational logs.

INFO [Log partition=test-0, dir=/var/kafkadata/data01/data]
    Found deletable segments with base offsets [1] due to retention time 86400000ms breach (kafka.log.Log)

We saw that Kafka found log segments that should be deleted due to a retention time breach, meaning it believed the log file was already old enough to be discarded: it had reached the configured retention time, which defines how long a segment should be kept after the last message was written to it. This was not expected at all, as we had published the messages only a few minutes before this line was logged, so we were certain the segment was not 86400000 ms (1 day) old.
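
To rule out a misconfigured retention value, the effective topic configuration can be checked with the kafka-configs tool (topic name and broker address are placeholders; depending on the Kafka version the connection flag is --bootstrap-server or --zookeeper):

$ kafka-configs --bootstrap-server localhost:9092 --entity-type topics \
    --entity-name test --describe

In our case the configured value was not the problem; as the rest of the investigation shows, the issue was in how Kafka determined the age of the segment.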

INFO [Log partition=test-0, dir=/var/kafkadata/data01/data] 
    Scheduling segments for deletion List(LogSegment(baseOffset=1, size=73, lastModifiedTime=1587465763000, largestTime=0)) (kafka.log.Log)

Then we found another interesting log message, this time with information about the actual segment that was scheduled for deletion. This one was also puzzling: we see lastModifiedTime=1587465763000 and largestTime=0, which does not make much sense. How can this be? We went to check the source code to understand when this behavior could happen.

private def deleteRetentionMsBreachedSegments(): Int = {
  if (config.retentionMs < 0) return 0
  val startMs = time.milliseconds
  deleteOldSegments((segment, _) => startMs - segment.largestTimestamp > config.retentionMs,
    reason = s"retention time ${config.retentionMs}ms breach")
}

This function is responsible for deleting segments that have expired under the time-based retention policy. It checks whether the time elapsed since the segment’s largest timestamp is greater than the configured retention time and, if so, schedules the segment to be deleted by the cleaning thread. How is the largestTimestamp of a segment calculated?

def largestTimestamp = if (maxTimestampSoFar >= 0) maxTimestampSoFar else lastModified

We can establish that to get largestTimestamp == 0, the value we were seeing in the log message, one of these conditions needs to be true: either maxTimestampSoFar == 0, or maxTimestampSoFar < 0 && lastModified == 0.

def maxTimestampSoFar: Long = {
  if (_maxTimestampSoFar.isEmpty)
    _maxTimestampSoFar = Some(timeIndex.lastEntry.timestamp)
  _maxTimestampSoFar.get
}

At startup, when _maxTimestampSoFar is still empty, the timestamp is obtained from the segment’s timeIndex file.

There is one timeIndex file for each log segment, each keeping a list of entries in the format Timestamp -> Offset, which means that in the corresponding log segment any message with a timestamp greater than Timestamp was inserted after Offset. These files are lazily loaded and were introduced to improve the query time and reliability of timestamp-based functionality (for example, searching for messages by timestamp).
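
This index is what backs timestamp-based offset lookups, for example with the GetOffsetShell tool (topic name, broker address and timestamp are placeholders; exact flags vary between Kafka versions):

$ kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 \
    --topic test --time 1587465763000

For each partition this prints the first offset whose timestamp is greater than or equal to the given value, a lookup served by the timeIndex entries described above.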

With this we concluded that something was wrong with the way Kafka was reading these files, as the lastEntry.timestamp value was not a valid timestamp but an unexpected 0 or a -1 error code. With this information we moved on to check what could be wrong with the timeIndex files. To read their content we used the DumpLogSegments tool that comes with Kafka.

$ kafka-run-class kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files <path>

...
Dumping ./newtopic-3/00000000000000000000.timeindex
timestamp: 0 offset: 0
Found timestamp mismatch in :/usr/local/var/lib/kafka-logs/./newtopic-3/00000000000000000000.timeindex
  Index timestamp: 0, log timestamp: 1588583643582

Dumping ./newtopic-4/00000000000000000000.timeindex
timestamp: 0 offset: 0
Found timestamp mismatch in :/usr/local/var/lib/kafka-logs/./newtopic-4/00000000000000000000.timeindex
  Index timestamp: 0, log timestamp: 1587716141887

Dumping ./newtopic-5/00000000000000000000.timeindex
timestamp: 0 offset: 0
The following indexed offsets are not found in the log.
Indexed offset: 0, found log offset: 28
...

We discovered that the great majority of them had error messages like these. Initially, this felt like it could be the problem we were looking for, but after some investigation we found out that this is apparently normal output when using the DumpLogSegments tool on the active (last) log segment. When created, the timeindex file is preallocated with timestamp: 0 offset: <last rolled out offset>, so both errors can be explained by us reading the files before they contain any relevant data, with DumpLogSegments then being unable to match the entries to the content of the corresponding log segment file.

After some time getting nowhere, running and analyzing more and more tests, we noticed that we were only losing messages from segments whose largestTimestamp could not be loaded due to file access problems. During the rolling update, and before Kafka had fully recovered the files to a “healthy” state, we confirmed that we were unable to open some of the timeindex files due to Permission denied errors, specifically the ones belonging to the segments that lost their messages.
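
A quick way to spot this kind of problem is to try reading the index files as the user Kafka runs under. The kafka user name below is an assumption for illustration; the path matches the partition from the logs above:

$ sudo -u kafka head -c 12 /var/kafkadata/data01/data/test-0/00000000000000000001.timeindex
head: cannot open '/var/kafkadata/data01/data/test-0/00000000000000000001.timeindex' for reading: Permission denied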

At this point we started suspecting there was a problem with the mounting/unmounting of the volumes during the deployment process.

We use external volumes to store Kafka data. Every time we deploy, these volumes are unmounted from the old cluster machines and mounted on the new ones. The problem we found was that during this process the file permissions were being set incorrectly. When Ansible, which is responsible for the rolling update, mounted the volumes on the new machines, it was applying default root permissions. When Kafka needed the files during initialization, it was not able to read the correct largestTimestamp, so it defaulted to 0, which then resulted in segment deletion and in us losing all the messages written in those log segments.
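
For illustration, the broken state would look something like the sketch below on a freshly provisioned machine, and the manual equivalent of the fix is to hand the data directory back to the user Kafka runs under (the kafka user and group are assumptions; in our setup this is handled by the deployment configuration rather than by hand):

$ ls -ld /var/kafkadata/data01/data
drwxr-x--- 5 root root 4096 Apr 21 10:02 /var/kafkadata/data01/data
$ chown -R kafka:kafka /var/kafkadata/data01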

After this long and painful investigation, the solution was a simple change to a configuration file that fixed the permissions when mounting the volumes.

As a final note, there is still one detail we could not fully understand or get a definite answer to: why was this problem not systematic? A possible reason is the rolling update process. We do not shut down all the nodes at the same time and start up three new instances; instead we shut down one node, bring a new one up, and only then continue with the next node. This results in multiple cluster rebalances happening during the deploy. In addition, the partition leaders are not always the same and the message distribution is not deterministic either. These variables could explain why the issue did not happen consistently on every deploy, but these are just wild guesses.