A magic error on Kafka

Last month we decided to upgrade our Kafka cluster. It was still running Kafka 0.10.2, and Kafka has undergone lots of improvements in the years since that release. The upgrade seemed like a straightforward procedure, but we hit a few bumps along the road that taught us some interesting things about Kafka's inner workings.

We decided to upgrade to version 2.4 and, after going through the Kafka documentation, we defined a migration plan:

  1. Upgrade client services to use >0.11 Kafka client
  2. Upgrade Apache Kafka version from 0.10.2 to 2.4
    1. Update the binary version
    2. Update inter.broker.protocol.version
    3. Update log.message.format.version

After upgrading the client version, we planned to do three sequential rolling updates on the broker side to avoid losing messages or running into version compatibility issues, just as the documentation recommends. However, looking at the plan now, it is clear that we misunderstood point 5 of the rolling upgrade documentation: we [mis]understood that we needed to upgrade all clients to 0.11.0 or newer before bumping log.message.format.version to the new version.
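For reference, here is a rough sketch of how the first two rolling restarts translate into broker configuration. The property names are the standard broker settings, the version values are simply the ones from our migration, and the third restart (bumping log.message.format.version) is the one we had planned to hold back until all clients were upgraded:

# server.properties -- first rolling restart: new 2.4 binaries, old protocol and message format
inter.broker.protocol.version=0.10.2
log.message.format.version=0.10.2

# server.properties -- second rolling restart: bump the inter-broker protocol, keep the old message format
inter.broker.protocol.version=2.4
log.message.format.version=0.10.2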

It seems obvious that we should not have clients on newer versions than the brokers. Having brokers on version 0.10 and a producer or consumer using a newer client can cause problems. In our case, while the 0.11 client would still work with the 0.10 broker, we evidently couldn't expect to use 0.11 features, such as record headers, just like that. However, that is exactly what we did when we upgraded the spring-integration-kafka dependency in one of our components. As soon as we finished the deploy to our test environment we started seeing errors when trying to produce messages to the test topic. The new dependency version used the 0.11 client and the new record headers feature, which resulted in errors stating that Magic v1 does not support record headers.

INFO c.p.b.b.m.e.InboundMessageErrorHandler - error occurred in message handler 
  [org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#0];
  nested exception is java.lang.IllegalArgumentException: Magic v1 does not support record headers
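To make the trigger concrete, here is a minimal, hypothetical sketch of what the upgraded dependency was effectively doing for us under the hood: producing a record that carries a header, something only the message format introduced in 0.11 (magic v2) can represent. The topic, key, and header names are made up:

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class HeaderProducerSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      ProducerRecord<String, String> record =
          new ProducerRecord<>("test-topic", "some-key", "this is a test!");
      // The record header is the part that the older message formats cannot store
      record.headers().add("trace-id", "abc-123".getBytes(StandardCharsets.UTF_8));
      producer.send(record);
    }
  }
}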

What is this Magic v1? We did not know, but we were curious.

When the client produces a new record it does so asynchronously: it does not block waiting for an acknowledgement but returns immediately once the record has been stored in a records buffer. After enough data has accumulated, or enough time has passed, the accumulated messages are removed from the buffer and sent to the broker by a background sender thread responsible for this lifecycle. Concretely, the record is added to a RecordAccumulator, which in turn tries to append it to a ProducerBatch, which internally calls this.recordsBuilder.append().
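From the application's point of view, this asynchrony is what send() exposes: it returns a Future and can invoke a callback once the batch has actually been shipped and acknowledged. A minimal sketch, assuming a producer like the one in the previous example:

import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// send() returns as soon as the record has been placed in the accumulator's buffer;
// the Future and the callback only complete after the background sender thread has
// shipped the batch and a response (or an error) has come back from the broker.
Future<RecordMetadata> future = producer.send(
    new ProducerRecord<>("test-topic", "some-key", "this is a test!"),
    (metadata, exception) -> {
      if (exception != null) {
        exception.printStackTrace();
      }
    });
// future.get() would block until the record is acknowledged or fails

The failure we were seeing, though, happens before any of that, at the moment the record is appended to a batch.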

this.recordsBuilder is an instance of MemoryRecordsBuilder, which is simply a buffer of records and which uses the magic value to validate the record before appending it. In this case, the validation checks whether the record contains headers and whether the current magic value supports them.

// org.apache.kafka.common.record.MemoryRecordsBuilder :: appendWithOffset
if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0)
  throw new IllegalArgumentException("Magic v" + magic + " does not support record headers");

Where is the magic variable coming from?

The magic value is passed in the class constructor, which is called by the RecordAccumulator.

// org.apache.kafka.clients.producer.internals.RecordAccumulator :: append
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
  ...
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
  ...
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);

Here it is calculated with maxUsableProduceMagic(), which returns

ProduceRequest.requiredMagicForVersion(versions.latestUsableVersion(ApiKeys.PRODUCE));

Looking at requiredMagicForVersion(produceRequestVersion), we can conclude that a produceRequestVersion lower than 3 is what ends up triggering the exception in MemoryRecordsBuilder.

// org.apache.kafka.common.requests.ProduceRequest :: requiredMagicForVersion
public static byte requiredMagicForVersion(short produceRequestVersion) {
  switch (produceRequestVersion) {
    case 0:
    case 1:
      return RecordBatch.MAGIC_VALUE_V0;

    case 2:
      return RecordBatch.MAGIC_VALUE_V1;

    case 3:
    case 4:
    case 5:
    case 6:
      return RecordBatch.MAGIC_VALUE_V2;
...

How is the produce request API version calculated?

The versions supported by the client are hard-coded, and thus depend on the Kafka binary version, but the versions actually used at runtime also take the broker's supported versions into account. The client asks the broker for the list of API versions it supports and then uses the highest version common to both parties. That is exactly what versions.latestUsableVersion() returns.
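Conceptually (this is a simplification for illustration, not the actual Kafka implementation), the negotiation boils down to capping the request version at whatever both sides understand:

// Simplified illustration of the version negotiation, not the real Kafka code.
short clientMaxProduceVersion = 3;  // hard-coded in the 0.11 client
short brokerMaxProduceVersion = 2;  // advertised by the 0.10.2 broker in its ApiVersions response
short usableProduceVersion = (short) Math.min(clientMaxProduceVersion, brokerMaxProduceVersion);  // 2, which maps to magic v1

We can check what the broker advertises with the kafka-broker-api-versions tool: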

$ kafka-broker-api-versions --bootstrap-server localhost:9092
(id: 1 rack: null) -> (
        Produce(0): 0 to 2 [usable: 2],
        Fetch(1): 0 to 5 [usable: 5],
        ...
        ...
)

As we can see by checking the broker's exposed API versions, the max supported ApiKeys.PRODUCE version on the broker is 2, which is also the highest usable version. This version, as we have already verified, translates to MAGIC_VALUE_V1, which triggers the client exception.

The next thing we did, expecting it to solve the problem, was to upgrade the brokers' Kafka version so they could use a higher version of the API. But again we received an error when producing messages, although a different one this time.

ERROR [] [org.springframework.kafka.support.LoggingProducerListener] - 
Exception thrown when sending a message with key=891185542 and payload=this is a test! to topic test-topic: 
    org.apache.kafka.common.errors.UnknownServerException: 
    The server experienced an unexpected error when processing the request

This time the message actually managed to leave the client, but something went wrong on the broker side. Let's first understand why it reached the broker.

We were now using version 2.4 on the brokers and, after setting the log level to debug on the client, this was the response to our first ApiVersionsRequest.

Recorded API versions for node -1: 
  (Produce(0): 0 to 8 [usable: 3], ...

With this information we could confirm that the new broker version allows ApiKeys.PRODUCE up to version 8 and that the max usable version is 3, the highest supported by the 0.11 client. Thus, as expected, we were no longer getting the Magic v1 error on the client, since version 3 maps to MAGIC_VALUE_V2. It seemed everything should be working, but then, why was there an error on the server side? Looking at the broker logs, we found a familiar error.

ERROR [ReplicaManager broker=2] Error processing append operation on partition test-topic-15 (kafka.server.ReplicaManager)
java.lang.IllegalArgumentException: Magic v1 does not support record headers
	at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:412)
  ...

Again the same exception, thrown by MemoryRecordsBuilder when the validation of record header support fails. Following the execution path upstream we reach Log.append(), which is called when the broker receives a new record batch and tries to append it to the log. Here, the magic value is passed down as config.messageFormatVersion.recordVersion.value.

// kafka.log.Log :: append
LogValidator.validateMessagesAndAssignOffsets(validRecords,
  topicPartition,
  ...,
  config.messageFormatVersion.recordVersion.value,
  ...
)

As stated before, we had upgraded the brokers to version 2.4 but, at the same time, we had explicitly set log.message.format.version to the existing version, 0.10.2, as suggested in the documentation. This configuration property expects a version, which is then mapped to the corresponding record version, also known as the magic value.

// kafka.api.ApiVersion
case object KAFKA_0_10_2_IV0 extends DefaultApiVersion {
  val shortVersion = "0.10.2"
  val subVersion = "IV0"
  val recordVersion = RecordVersion.V1
  val id: Int = 9
}

Looking at ApiVersion, we see that version 0.10.2 maps to MAGIC_VALUE_V1, which, again, triggers the exception on MemoryRecordsBuilder.

It seems there is something that is not completely correct here. The broker's response to the ApiVersionsRequest returns a misleading value. The broker states that it can use ApiKeys.PRODUCE version 8, implying that it supports the MAGIC_VALUE_V2 format and therefore record headers, when in fact it does not. While internally Log.append() uses log.message.format.version to calculate the magic value, the exposed API versions do not consider this configuration at all, which can lead to unexpected behavior.

Shouldn’t this version be obtained from a single place?
For example, instead of reading the version directly from the configuration, shouldn’t Log.append() use maxUsableProduceMagic(), which in turn would take log.message.format.version into account?

This appears to be incorrect behavior; however, fixing it wouldn't solve our initial migration problem. The client would receive a max usable Produce version of 2 and would simply continue to fail on the client side. If we wanted to be able to handle messages with record headers, we needed to upgrade the brokers' log.message.format.version.
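In practice that meant one more rolling restart, roughly along these lines (the property names are the standard broker settings, the versions are ours), once we were confident that every client was on 0.11 or newer:

# server.properties -- final rolling restart: bump the message format to match the upgraded clients
inter.broker.protocol.version=2.4
log.message.format.version=2.4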

Note that the older Scala clients, which are no longer maintained, do not support the message format introduced in 0.11, so to avoid conversion costs (or to take advantage of exactly once semantics), the newer Java clients must be used.

In the end, we were causing the problem ourselves, due to a misunderstanding of the documentation that led us to think the client upgrade was mandatory before changing log.message.format.version. Keeping the older clients during a first iteration would not have been a problem; however, they should eventually be upgraded to avoid conversion costs and to enable the newly introduced features. That was not all that clear to us when we first read the documentation.