What is a poison pill?
A “poison pill” is a record that always fails when consumed, no matter how many times it is attempted. Poison pills come in different forms:
- Corrupted records.
- Records that make your consumer deserializer fail (e.g., an Avro record whose writer schema is not compatible with the consumer reader schema).
The problem with a poison pill is that, unless the consumer eventually handles it, it blocks consumption of the topic/partition that contains it, halting the consumer's progress.
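To make the blocking behaviour concrete, here is a minimal, self-contained sketch that does not use the Kafka client at all. It assumes a toy "partition" held in a list and a hypothetical deserializer that expects integers encoded as text; the class and method names are illustrative only. The offset advances only after a record is processed, so a record that always fails keeps the loop pinned at the same offset.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

// Toy model of a consumer loop (not the Kafka API): the offset only
// advances once the record at that offset has been processed.
class PoisonPillDemo {

    // Hypothetical deserializer: expects UTF-8 text holding an integer.
    static int deserialize(byte[] payload) {
        return Integer.parseInt(new String(payload, StandardCharsets.UTF_8));
    }

    // Returns the offset at which consumption stops, trying each record
    // at most maxAttempts times before giving up.
    static int consume(List<byte[]> partition, int maxAttempts) {
        int offset = 0;
        while (offset < partition.size()) {
            boolean processed = false;
            for (int attempt = 1; attempt <= maxAttempts && !processed; attempt++) {
                try {
                    deserialize(partition.get(offset));
                    processed = true;
                } catch (NumberFormatException e) {
                    // Same bytes, same failure: retrying cannot succeed.
                }
            }
            if (!processed) {
                return offset; // stuck on the poison pill
            }
            offset++;
        }
        return offset;
    }
}
```

With the records `1`, `oops`, `3`, the loop never gets past offset 1, so the healthy record behind the poison pill is never reached.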
What can we do with poison pills?
There are many different strategies to deal with poison pills. The Kafka Streams FAQ describes some of them:
- Log an error and shut down the application
- Log an error and skip the poisonous record
- Send the record to a separate topic (e.g., Dead Letter Queue) for further inspection (likely by a human operator)
- Transform the poisonous record into a sentinel value that tells downstream processors the record couldn’t be interpreted
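The four strategies above can be compared side by side in a small sketch. This is a plain Java model rather than Kafka code: the `Strategy` enum, the in-memory `deadLetters` list standing in for a dead letter topic, and the `SENTINEL` marker value are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the four strategies; none of these names come from Kafka.
class PoisonPillStrategies {
    enum Strategy { FAIL, SKIP, DEAD_LETTER, SENTINEL }

    // Hypothetical marker for "this record could not be interpreted".
    static final Integer SENTINEL = Integer.MIN_VALUE;

    // Stand-in for a dead letter topic.
    final List<String> deadLetters = new ArrayList<>();

    List<Integer> process(List<String> records, Strategy strategy) {
        List<Integer> out = new ArrayList<>();
        for (String raw : records) {
            try {
                out.add(Integer.parseInt(raw));
            } catch (NumberFormatException e) {
                System.err.println("Could not deserialize record: " + raw);
                switch (strategy) {
                    case FAIL:
                        // Log and shut down: propagate the failure to the caller.
                        throw new IllegalStateException("Stopping on poison pill", e);
                    case SKIP:
                        // Log and skip: nothing is emitted for this record.
                        break;
                    case DEAD_LETTER:
                        // Park the raw record for later inspection.
                        deadLetters.add(raw);
                        break;
                    case SENTINEL:
                        // Emit a marker so downstream code can react.
                        out.add(SENTINEL);
                        break;
                }
            }
        }
        return out;
    }
}
```

Only the sentinel strategy keeps a one-to-one mapping between input and output records; skipping and dead-lettering both drop the record from the main flow.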
Kafka Streams to the rescue
A year ago Kafka Streams added options to handle corrupt data during deserialization. This PR set the foundations for dealing with poison pills in Kafka Streams:
- Added the new interface
- Introduced two default implementations for
- Added a new default.deserialization.exception.handler configuration value for StreamsConfig. Its default value is
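As a rough sketch of how this setting might be applied, the snippet below builds the properties for a Streams application and opts into the log-and-continue behaviour through `org.apache.kafka.streams.errors.LogAndContinueExceptionHandler`, one of the handlers shipped with Kafka Streams. The application id and broker address are placeholders, and the keys are written as plain strings so the snippet compiles without the Kafka libraries on the classpath.

```java
import java.util.Properties;

class HandlerConfigSketch {

    static Properties streamsProperties() {
        Properties props = new Properties();
        // Placeholder values: replace with your own application id and brokers.
        props.put("application.id", "poison-pill-demo");
        props.put("bootstrap.servers", "localhost:9092");
        // Log the failure and keep consuming instead of stopping the application.
        props.put("default.deserialization.exception.handler",
                  "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler");
        return props;
    }
}
```

The resulting `Properties` object would then be passed to the `KafkaStreams` constructor along with the topology.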
How does it work?
The key in this implementation is RecordDeserializer. This class executes whatever exception handling we have configured for our Streams application.
In the diagram, the classes shaded in blue are exclusive to Kafka Streams, while the class in red is part of the common Kafka code base.
If the strategy is to continue, a return null bubbles up a null ConsumerRecord to the layer above, RecordQueue, which discards the record and continues with the next one.
At runtime, for LogAndContinue, a series of warnings is logged and consumption continues. On the other hand, when the strategy is to fail, a StreamsException is thrown.
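The interplay between the two layers can be modelled in a few lines. The sketch below is a deliberate simplification and not the actual Kafka Streams source: the `Handler` interface, the `Response` enum, and the rule that an empty payload counts as corrupt are assumptions for illustration. What it tries to capture is the shape of the flow: the deserializing layer asks the handler what to do, returns null when told to continue, and the queueing layer drops nulls.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Queue;

// Simplified model of the flow described above; not the real Kafka Streams classes.
class DeserializationFlowSketch {
    enum Response { CONTINUE, FAIL }

    // Hypothetical, stripped-down handler contract.
    interface Handler {
        Response handle(byte[] raw, Exception error);
    }

    // Stand-in for the deserializing layer: returns null when the handler says CONTINUE.
    static String deserialize(byte[] raw, Handler handler) {
        try {
            if (raw == null || raw.length == 0) {
                throw new IllegalArgumentException("empty payload"); // toy notion of "corrupt"
            }
            return new String(raw, StandardCharsets.UTF_8);
        } catch (RuntimeException e) {
            if (handler.handle(raw, e) == Response.FAIL) {
                throw new IllegalStateException("Handler asked to fail on bad record", e);
            }
            return null; // bubbles up to the queueing layer
        }
    }

    // Stand-in for the queueing layer: null records are discarded.
    static Queue<String> enqueue(byte[][] rawRecords, Handler handler) {
        Queue<String> queue = new ArrayDeque<>();
        for (byte[] raw : rawRecords) {
            String record = deserialize(raw, handler);
            if (record == null) {
                continue; // skip the poison pill and move on
            }
            queue.add(record);
        }
        return queue;
    }
}
```

A handler that always answers `CONTINUE` lets the healthy records through, while one that answers `FAIL` aborts on the first bad record.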
There is also a set of metrics that can be monitored to make sure we are not skipping too many records.
| Metric Name | Description | JMX path |
|---|---|---|
| skipped-records-rate | The average number of skipped records per second. | kafka.streams:type=stream-metrics,client-id=([-.\w]+) |
| skipped-records-total | The total number of skipped records. | kafka.streams:type=stream-metrics,client-id=([-.\w]+) |
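One possible way to read these values is through the standard JMX API. The sketch below assumes it runs inside the same JVM as the Streams application, and that the metrics are registered under the object name and attribute name listed in the table; the class name `SkippedRecordsProbe` is made up for the example. If no matching MBean is registered, it simply returns an empty map.

```java
import java.lang.management.ManagementFactory;
import java.util.HashMap;
import java.util.Map;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

class SkippedRecordsProbe {

    // Maps each client-id to its skipped-records-total value.
    static Map<String, Object> skippedRecordsTotals() {
        try {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            // "*" matches any client-id registered under stream-metrics.
            ObjectName pattern =
                new ObjectName("kafka.streams:type=stream-metrics,client-id=*");
            Map<String, Object> totals = new HashMap<>();
            for (ObjectName name : server.queryNames(pattern, null)) {
                totals.put(name.getKeyProperty("client-id"),
                           server.getAttribute(name, "skipped-records-total"));
            }
            return totals;
        } catch (JMException e) {
            throw new IllegalStateException("Could not read skipped-records metrics", e);
        }
    }
}
```

Polling this periodically and alerting when the total grows faster than expected is one way to notice that too many records are being skipped.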
What about Kafka Consumer?
If your application is not using Kafka Streams, the basic KafkaConsumer offers no built-in support for handling poison pills.
However, not all is lost. In the next installment in this series, we will see how we can implement a very similar strategy leveraging the Serde interface that KafkaConsumer uses to deserialize records before delivering them to your application.