Poison Pills in Kafka (I)

What is a poison pill?

A “poison pill” is a record that always fails when consumed, no matter how many times it is attempted. Poison pills come in different forms:

  • Corrupted records.
  • Records that make your consumer deserializer fail (e.g., an Avro record whose writer schema is not compatible with the consumer reader schema).

The problem with a poison pill is that, unless the consumer eventually handles it, it blocks consumption of the topic partition that contains it, halting the consumer’s progress.

What can we do with poison pills?

There are many different strategies to deal with poison pills. The Kafka Streams FAQ describes some of them:

  • Log an error and shut down the application
  • Log an error and skip the poisonous record
  • Send the record to a separate topic (e.g., a dead letter queue) for further inspection, likely by a human operator (see the sketch after this list)
  • Transform the poisonous record into a sentinel value that downstream processors recognize as a record that couldn’t be interpreted
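
To make the dead-letter-queue strategy concrete, here is a minimal sketch built on the DeserializationExceptionHandler interface that Kafka Streams exposes (introduced in the next section). The dead.letter.queue.topic key, the fallback topic name, and the producer wiring are assumptions made for illustration, not part of Kafka:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.ByteArraySerializer;
    import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public class DeadLetterQueueExceptionHandler implements DeserializationExceptionHandler {

        private KafkaProducer<byte[], byte[]> dlqProducer;
        private String dlqTopic;

        @Override
        public void configure(final Map<String, ?> configs) {
            // "dead.letter.queue.topic" is a made-up key for this sketch; pass it in
            // through the regular StreamsConfig properties.
            final Object topic = configs.get("dead.letter.queue.topic");
            dlqTopic = topic == null ? "dead-letter-queue" : topic.toString();

            final Map<String, Object> producerConfig = new HashMap<>();
            producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, configs.get("bootstrap.servers"));
            dlqProducer = new KafkaProducer<>(producerConfig,
                    new ByteArraySerializer(), new ByteArraySerializer());
        }

        @Override
        public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                     final ConsumerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
            // Forward the raw bytes untouched so an operator can inspect or replay them later.
            dlqProducer.send(new ProducerRecord<>(dlqTopic, record.key(), record.value()));
            // Then tell Kafka Streams to skip this record and keep consuming.
            return DeserializationHandlerResponse.CONTINUE;
        }
    }

Forwarding the raw bytes keeps the dead letter queue faithful to exactly what failed to deserialize, so the record can be inspected or replayed with whatever tooling fits.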

Kafka Streams to the rescue

A year ago, Kafka Streams added options for handling corrupt data during deserialization. This PR laid the foundations for dealing with poison pills in Kafka Streams:

  • Added the new interface DeserializationExceptionHandler.
  • Introduced two default implementations: LogAndContinueExceptionHandler and LogAndFailExceptionHandler.
  • Added a new default.deserialization.exception.handler configuration value to StreamsConfig. Its default value is LogAndFailExceptionHandler.
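
Assuming an otherwise standard Streams setup, switching from the failing default to the skipping handler is a one-line configuration change (the application id and bootstrap servers below are placeholders):

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

    public class StreamsConfigExample {

        static Properties poisonPillTolerantConfig() {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "poison-pill-demo");   // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
            // Override the LogAndFailExceptionHandler default so that poison pills
            // are logged and skipped instead of crashing the application.
            props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                      LogAndContinueExceptionHandler.class);
            return props;
        }
    }

Without this override, the application keeps the fail-fast behavior described above.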

How does it work?

The key class in this implementation is RecordDeserializer: it executes whatever exception handling we have configured for our Streams application.

[Diagram: classes involved in Kafka Streams poison pill handling]

In the diagram, the classes shaded in blue are exclusive to Kafka Streams, while the class in red is part of the common Kafka code base.

If the strategy is to continue, a return null bubbles up a null ConsumerRecord to the layer above, RecordQueue, which discards the record and continues with the next one.

At runtime, LogAndContinueExceptionHandler logs a series of warnings and consumption continues. LogAndFailExceptionHandler, on the other hand, throws a StreamsException.
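
In simplified form, the deserialization path looks roughly like the sketch below. This is a paraphrase for illustration, not the actual Kafka source; the real RecordDeserializer lives in org.apache.kafka.streams.processor.internals and has more moving parts:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
    import org.apache.kafka.streams.errors.DeserializationExceptionHandler.DeserializationHandlerResponse;
    import org.apache.kafka.streams.errors.StreamsException;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public class DeserializationSketch {

        interface RawDeserializer {
            ConsumerRecord<Object, Object> deserialize(ConsumerRecord<byte[], byte[]> rawRecord);
        }

        static ConsumerRecord<Object, Object> tryDeserialize(final RawDeserializer deserializer,
                                                             final DeserializationExceptionHandler handler,
                                                             final ProcessorContext context,
                                                             final ConsumerRecord<byte[], byte[]> rawRecord) {
            try {
                // Happy path: key and value deserialize cleanly.
                return deserializer.deserialize(rawRecord);
            } catch (final Exception e) {
                if (handler.handle(context, rawRecord, e) == DeserializationHandlerResponse.FAIL) {
                    // LogAndFailExceptionHandler: abort by throwing a StreamsException.
                    throw new StreamsException("Deserialization error and the handler is set to FAIL", e);
                }
                // LogAndContinueExceptionHandler: return null; RecordQueue drops the
                // null record and moves on to the next one.
                return null;
            }
        }
    }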

There is also a set of metrics that can be monitored to make sure we are not skipping too many records.

Metric name             Description                                         JMX path
skipped-records-rate    The average number of skipped records per second.   kafka.streams:type=stream-metrics,client-id=([-.\w]+)
skipped-records-total   The total number of skipped records.                kafka.streams:type=stream-metrics,client-id=([-.\w]+)
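
Besides JMX, the same counters can be read programmatically from KafkaStreams#metrics(). A minimal sketch, assuming a recent clients version where Metric#metricValue() is available:

    import java.util.Map;

    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.streams.KafkaStreams;

    public class SkippedRecordsMonitor {

        // Print the current value of every skipped-records metric exposed by a
        // running KafkaStreams instance.
        static void logSkippedRecords(final KafkaStreams streams) {
            for (final Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
                if (entry.getKey().name().startsWith("skipped-records")) {
                    System.out.printf("%s = %s%n", entry.getKey().name(), entry.getValue().metricValue());
                }
            }
        }
    }

An alert on skipped-records-rate is a cheap way to notice that LogAndContinue is silently dropping more records than expected.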

What about Kafka Consumer?

If your application is not using Kafka Streams, the plain KafkaConsumer offers no built-in support for handling poison pills.

However, not all is lost. In the next installment of this series, we will see how to implement a very similar strategy by leveraging the Deserializer interface that KafkaConsumer uses to turn raw bytes into records before delivering them to your application.
