Tag Archives: kafka

Error tolerance in Kafka Connect (I)

Source: https://www.freeiconspng.com/img/9157

If you have done your homework and read the Confluent blog, you have probably seen this great deep-dive post on Connect error handling and Dead Letter Queues.

Robin Moffatt does a great job explaining how Kafka Connect handles errors based on each connector’s configuration, including sending failed records to DLQs for sink connectors. The blog post also introduces the concept of “error tolerance” as something that can be configured per connector and that Connect will honor:

We’ve seen how setting errors.tolerance = all will enable Kafka Connect to just ignore bad messages. When it does, by default it won’t log the fact that messages are being dropped. If you do set errors.tolerance = all, make sure you’ve carefully thought through if and how you want to know about message failures that do occur. In practice that means monitoring/alerting based on available metrics, and/or logging the message failures.
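To make that advice concrete, here is a minimal sketch of the error-handling settings a sink connector could carry. The property names are Kafka Connect’s error-handling options; the class name, the helper and the DLQ topic name are made up for illustration, and in practice these entries would normally live in the connector’s JSON or properties configuration rather than in Java.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class TolerantSinkConfig {
    // Builds the error-handling part of a hypothetical sink connector configuration.
    static Map<String, String> errorHandling(String dlqTopic) {
        Map<String, String> config = new LinkedHashMap<>();
        // Skip records that fail in converters or transformations...
        config.put("errors.tolerance", "all");
        // ...but do not drop them silently: log every failure,
        config.put("errors.log.enable", "true");
        config.put("errors.log.include.messages", "true");
        // and route the failed records to a dead letter queue (sink connectors only).
        config.put("errors.deadletterqueue.topic.name", dlqTopic);
        config.put("errors.deadletterqueue.context.headers.enable", "true");
        return config;
    }

    public static void main(String[] args) {
        // "dlq-my-sink" is an assumed topic name.
        errorHandling("dlq-my-sink").forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```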

The Apache Kafka Documentation describes errors.tolerance in simple terms.

Behavior for tolerating errors during connector operation. ‘none’ is the default value and signals that any error will result in an immediate connector task failure; ‘all’ changes the behavior to skip over problematic records.

Despite the succinct description, there are two interesting points in it:

  • What errors will be skipped if we choose all?
  • What does “problematic records” mean?

Not all errors are born equal

If we look into the Kafka codebase, we quickly find that the logic for this error control with tolerance is centralized in one class: RetryWithToleranceOperator.

There we can find this interesting fragment:

static {
    TOLERABLE_EXCEPTIONS.put(Stage.KEY_CONVERTER, Exception.class);
    // ... the other tolerated stages are registered the same way
}

This static initialization controls what errors are considered tolerable, depending on when they occur. At the moment, errors are only tolerated when they happen in the key/value/header converter or during source/sink record transformation.

However, the Exception class does not represent all possible Java errors. Looking at the Java exception class hierarchy, we see other errors could escape this control mechanism.

Exception hierarchy in Java
Source: http://www.javawithus.com/tutorial/exception-hierarchy

That is expected, though. Reading the Javadoc for Error, this is the first paragraph:

An Error is a subclass of Throwable that indicates serious problems that a reasonable application should not try to catch. Most such errors are abnormal conditions.

Kudos to Kafka Connect for acting as a reasonable application and catching the right base exception 🙂
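The idea can be sketched in a few lines of plain Java. This is not the Connect source code: the enum below is a simplified stand-in for Connect’s Stage enum, and the class and method names are hypothetical. It only illustrates how a per-stage map of tolerable exception classes lets Exception subclasses through while an Error never qualifies.

```java
import java.util.EnumMap;
import java.util.Map;

final class ToleranceSketch {
    // Simplified stand-in for Connect's Stage enum (assumption: only a few stages listed).
    enum Stage { TASK_POLL, TRANSFORMATION, KEY_CONVERTER, VALUE_CONVERTER, HEADER_CONVERTER, KAFKA_PRODUCE }

    private static final Map<Stage, Class<? extends Exception>> TOLERABLE = new EnumMap<>(Stage.class);

    static {
        // Only converter and transformation stages are registered as tolerable.
        TOLERABLE.put(Stage.TRANSFORMATION, Exception.class);
        TOLERABLE.put(Stage.KEY_CONVERTER, Exception.class);
        TOLERABLE.put(Stage.VALUE_CONVERTER, Exception.class);
        TOLERABLE.put(Stage.HEADER_CONVERTER, Exception.class);
    }

    // A failure is tolerable only if its stage is registered and it is an Exception (not an Error).
    static boolean isTolerable(Stage stage, Throwable failure) {
        Class<? extends Exception> tolerated = TOLERABLE.get(stage);
        return tolerated != null && tolerated.isAssignableFrom(failure.getClass());
    }

    public static void main(String[] args) {
        System.out.println(isTolerable(Stage.KEY_CONVERTER, new IllegalStateException("bad key")));
        System.out.println(isTolerable(Stage.KEY_CONVERTER, new OutOfMemoryError()));
        System.out.println(isTolerable(Stage.KAFKA_PRODUCE, new RuntimeException("broker down")));
    }
}
```

The first check passes, the second fails because OutOfMemoryError extends Error rather than Exception, and the third fails because the stage is not registered at all.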

When is a record problematic?

Now that we know what errors are tolerated, when configured so, let’s dig a bit into what a “problematic record” represents.

As mentioned in the section above, the error tolerance mechanism is only applied during key/value/header converter and source/sink record transformation. However, there is much more to the lifecycle of a source/sink record than those steps. Once again, the Connect codebase reveals all the steps, codified in the Stage enum.

For a source task (one that takes information from a non-Kafka system and publishes it to Kafka), these are the steps:

Source task steps

Similarly, for a sink task (one that takes records from Kafka and sends them to a non-Kafka system), the steps are:

Sink task steps

If we scrutinize these two diagrams, we will notice that the error control that Connect offers us is centered around record manipulation. In other words, the steps that are wrapped by the RetryWithToleranceOperator class should perform almost entirely deterministic operations that are exempt from the perils of distributed systems. That is not entirely true for the AvroConverter, which registers schemas with Schema Registry. However, that is a relatively straightforward interaction compared to the complexity that Kafka producers and consumers deal with, not to mention the bigger unknowns in the many source and sink connector plugins that Kafka Connect supports.

In a nutshell, a problematic record is one that fails because of its data. There is another name for this pattern: poison pills.


The error mechanism in Kafka Connect is focused on gracefully managing problematic records (a.k.a. poison pills) that cause all sorts of Exception errors. Anything outside of this scope (e.g., issues when consuming or producing Kafka records) cannot be controlled by the user through the connector configuration, as of today.

In a subsequent post, we will zoom into those other stages in the source and sink task pipelines that aren’t controlled by configuration. That is important to understand how a Kafka Connect connector can fail, independently of what connector plugin we are using.


Kafka quirks: topics with names too long


IMPORTANT: This bug is fixed in versions 2.1.2, 2.2.2, 2.3.0 and newer.

One of those bugs that you normally don’t come across… until you do. Officially, Kafka topic names can be up to 249 characters. If you try to create a topic with a name longer than that, it will reject it:

$ kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic [topic name with 250 characters]
Error while executing topic command : org.apache.kafka.common.errors.InvalidTopicException: Topic name is illegal, it can't be longer than 249 characters, topic name: [topic name with 250 characters]

Pretty self-explanatory, right? Well, it’s not that simple…

249 characters? Actually… 209

While officially advertised as 249, in reality, you should not create topics with names longer than 209 characters. Why? One of the comments in the Jira ticket explains it clearly:

This limit ends up not being quite right since topic deletion ends up renaming the directory to the form topic-partition.uniqueId-delete as can be seen in LogManager.asyncDelete.

If we do the math:

  • Start with 249 characters, then subtract:
  • 1 character for the “.”
  • 32 characters for the uniqueId (a.k.a. UUID)
  • 7 characters for “-delete”

The result is 209. That is the longest your topic name can be if you don’t want to hit the bug.
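The same arithmetic as a small sketch, which could double as a guard before creating topics. The class, constants and method names are hypothetical; only the numbers come from the breakdown above.

```java
final class TopicNameLimit {
    static final int ADVERTISED_MAX = 249;                 // officially documented limit
    static final int DOT = 1;                              // the "." before the unique id
    static final int UNIQUE_ID = 32;                       // UUID rendered without dashes
    static final int DELETE_SUFFIX = "-delete".length();   // 7

    // Longest topic name that still leaves room for the ".uniqueId-delete" rename.
    static int safeMax() {
        return ADVERTISED_MAX - DOT - UNIQUE_ID - DELETE_SUFFIX;
    }

    static boolean isSafeToDelete(String topicName) {
        return topicName.length() <= safeMax();
    }

    public static void main(String[] args) {
        System.out.println(safeMax()); // prints 209
    }
}
```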

What if I use a topic name longer than 209?

Well, terrible things… if you try to delete the topic. Otherwise nothing. But if you do try to delete the topic, the first error that you will encounter is this:

ERROR Error while renaming dir for [your long topic name here]-0 in log dir /var/lib/kafka/data (kafka.server.LogDirFailureChannel)
java.nio.file.FileSystemException: /var/lib/kafka/data/[your long topic name here]-0 -> /var/lib/kafka/data/[your long topic name here]-0.abb0d0dcdfd94e2b9222335d8bcaebcd-delete: File name too long

What is worse, your broker will keep failing upon restart, trying to remove the folder that is still pending deletion.

Get me out of this trap!

There is hope though if you can use a bit of SSH and connect to the broker that is failing (or maybe all of them are failing because they were all hosting partitions for the deleted topic).

Go to the directory where Kafka stores its data (log.dirs=/var/lib/kafka/data normally) and remove the folders for the partitions associated with the topic you want to get rid of.

Once that is deleted, it is time to meddle a bit with Zookeeper data. Be very careful here: if you make a mistake, you will probably lose data (and maybe the whole cluster).

Log into Zookeeper using zookeeper-shell and execute a few delete instructions to get rid of all traces of the faulty topic.

$ zookeeper-shell localhost:2181
rmr /admin/delete_topics/[your long topic name here]
rmr /brokers/topics/[your long topic name here]
rmr /config/topics/[your long topic name here]

Now, restart your broker(s) and they should have completely forgotten that this topic ever existed…

What about AWS Kafka (MSK)?

As of this writing, MSK supports three versions:

  • 1.1.1: this version is just “too old” (published in July 2018) and I wouldn’t recommend using it. It suffers from this bug too.
  • 2.1.1: definitely suffers from this bug (personally experienced it…).
  • 2.3.1: is not affected by this bug.

Therefore, my recommendation is to run Kafka 2.3.1 and take advantage of this and other bug fixes and features.

Kafka quirks: tombstones that refuse to disappear

From https://www.clipart.email/clipart/tombstone-rip-clipart-83182.html

Recently, at one of the clients I consult for, I came across a strange situation: tombstone records that “refused” to disappear.

The scenario was quite simple:

  1. Kafka Streams application that materializes some state (in RocksDB).
  2. From time to time, a punctuation kicks in, pulls all the accumulated records and sends them somewhere.
  3. Upon success, it deletes all the records and calls it a day.

However, when consuming the changelog topic, I noticed that there were lots of tombstone records. Having some of them made sense, since that is how a “delete” should be represented in a changelog topic. However, having so many that hadn’t cleared out was unexpected.

I applied a few strategies/changes until I finally made them “gone”.

Step 1 – Roll your segments more often

Compaction only happens when the file that contains your topic/partition data is rolled. Therefore, it is important to adjust when that happens if you want to influence the compaction process:

  • segment.ms: the segment can stay open for up to this value. By default, that is 7 days.
  • segment.bytes: the segment can stay open up to this number of bytes. The default here is 1 GB, which is too big for low traffic topics.

The defaults for these two settings have “big data” stamped on them. If you don’t have a “big data” topic, chances are the process won’t be responsive enough for you.

I tried setting them to 60,000 ms (1 min) and 1,048,576 bytes (1 MB) respectively… with no luck. Nothing changed; tombstones were still there.

Step 2 – Tolerate less dirtiness

It is also possible that, even if your segments are rolling regularly, the log compaction thread doesn’t pick up your topic/partition file because it is not dirty enough, meaning the ratio between entries that are candidates for compaction and those that aren’t is not meeting the configured threshold.

min.cleanable.dirty.ratio controls this threshold and it is 0.5 by default, meaning you need at least 50% of your topic/partition file with “dirty” entries for compaction to run. Anything below that, the thread doesn’t find it worth doing compaction on it.
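As a rough illustration of that threshold, the check can be sketched as below. This is a simplification and not the broker’s actual code: the class and method names are hypothetical, and the inclusive comparison simply follows the “at least” wording above.

```java
final class DirtyRatioSketch {
    // Share of the log (in bytes) that has not been compacted yet.
    static double dirtyRatio(long cleanBytes, long dirtyBytes) {
        long total = cleanBytes + dirtyBytes;
        return total == 0 ? 0.0 : (double) dirtyBytes / total;
    }

    // Assumption: a log is picked for compaction once its dirty ratio reaches the threshold.
    static boolean eligibleForCompaction(long cleanBytes, long dirtyBytes, double minCleanableDirtyRatio) {
        return dirtyRatio(cleanBytes, dirtyBytes) >= minCleanableDirtyRatio;
    }

    public static void main(String[] args) {
        // 10% dirty: ignored with the default threshold, picked up with a very low one.
        System.out.println(eligibleForCompaction(900, 100, 0.5));
        System.out.println(eligibleForCompaction(900, 100, 0.01));
    }
}
```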

My next step was to set this value to 0.01. This is quite aggressive and I wouldn’t recommend it for most topics, unless you have low volume and you really, really want to keep your topic/partition spotless.

However, this didn’t do the trick either…

Step 3 – Be less nice with your replaying consumers

When a consumer is replaying a topic from the beginning, it might encounter this problem:

  1. Offset X contains a record with Key K and Value V.
  2. A few records “later” (maybe millions…), a record with Key K appears again, but with a null Value, a.k.a. a tombstone.
  3. If the consumer reads the first record, but compaction runs and gets rid of the second record (the tombstone), the consumer will never know that the record with Key K has been deleted.

To compensate for this scenario, Kafka has a config setting called delete.retention.ms that controls how long tombstones should be kept around for the benefit of these consumers. Its default: 1 day.

This is very useful, but it will also keep tombstones around unnecessarily if you don’t expect any replaying consumer to read a given topic or, at least, to take as long as 1 day.

My next attempt was to configure this down to 60,000 ms (1 minute)… but still not working.
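For reference, the overrides tried in steps 1 to 3 can be collected as topic-level settings. This is only a sketch: the class and method names are hypothetical, and the map would still need to be applied with whatever tooling you use (for example kafka-configs or the AdminClient).

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class AggressiveCompactionConfig {
    // Topic-level overrides with the values tried in steps 1 to 3.
    static Map<String, String> overrides() {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("segment.ms", "60000");                 // step 1: roll segments every minute
        config.put("segment.bytes", "1048576");            // step 1: or once they reach 1 MB
        config.put("min.cleanable.dirty.ratio", "0.01");   // step 2: compact almost eagerly
        config.put("delete.retention.ms", "60000");        // step 3: keep tombstones for 1 minute
        return config;
    }

    public static void main(String[] args) {
        overrides().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```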

Step 4 – It’s not a feature… it’s a bug

I ran out of options here, so I thought that maybe this was one of those rare and unfortunate occasions when I had hit one of Kafka’s bugs. I fired up a quick search on Google and… voilà!

Tombstones can survive forever: https://issues.apache.org/jira/browse/KAFKA-8522

Long story short, under certain circumstances, tombstones will get their “timeouts” renewed regularly, meaning they will not honor delete.retention.ms and will stick around.

The only workaround that seems to work is to set delete.retention.ms to zero, forcing the tombstones to be deleted immediately, instead of sticking around for the benefit of consumers replaying the topic.

However, this solution must be used with great care. For the scenario described at the beginning, a Kafka Streams app and a changelog topic, using this option can have unexpected side effects during the Restore phase, when the app reads its changelog topics to restore its state. If compaction kicks in while it is doing so, the app might miss the tombstone records for entries that it has already consumed, keeping entries in its key/value store that should have been removed.
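A toy simulation of that race, in plain Java, may help. It does not use any Kafka API: the changelog is reduced to a single key, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class TombstoneReplay {
    // Applies one changelog entry to the local store; a null value is a tombstone.
    static void apply(Map<String, String> store, String key, String value) {
        if (value == null) {
            store.remove(key);
        } else {
            store.put(key, value);
        }
    }

    // Restores a store from a changelog holding K=V followed by a tombstone for K.
    static Map<String, String> restore(boolean tombstoneStillInLog) {
        Map<String, String> store = new HashMap<>();
        // The app reads the first entry (K=V) before compaction runs.
        apply(store, "K", "V");
        // Compaction runs now. The tombstone survives only if it is still being retained.
        List<String[]> remaining = new ArrayList<>();
        if (tombstoneStillInLog) {
            remaining.add(new String[] {"K", null});
        }
        // The app resumes reading whatever is left in the log.
        for (String[] entry : remaining) {
            apply(store, entry[0], entry[1]);
        }
        return store;
    }

    public static void main(String[] args) {
        System.out.println(restore(true));  // prints {}
        System.out.println(restore(false)); // prints {K=V}
    }
}
```

With the tombstone retained the store ends up empty, as it should; with the tombstone already evicted, the deleted key silently comes back.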

Unfortunately, until the bug is fixed, if your app needs all these tombstones evicted from the changelog, this seems to be the only option.

Getting started with AWS MSK

Amazon Managed Streaming for Apache Kafka

Apache Kafka is one of the technologies with the fastest popularity growth in the last 10 years. AWS, always vigilant for new tech to incorporate into its offering, launched its Kafka as a managed service in February 2019: Amazon MSK.

MSK follows the RDS model: customers choose how much hardware to provision (number of nodes, CPU, memory, etc.) and AWS manages the service for you. Since Kafka is complex software that is not easy to operate, having AWS deal with that for you is quite appealing, especially at the beginning of your journey with it.

In this article, we are going to look at how to provision your first MSK cluster, the different options that you encounter (and what they mean) and how to run a quick-and-dirty performance test to understand how much the most humble cluster can actually process.

Creating the smallest cluster possible

The process starts with logging into the AWS Console, selecting/searching MSK and clicking on “Create Cluster”. That leads you to a typically dry first screen with lots of options to select. Don’t worry, we will see what they mean one by one.

Step 1 – Software version

Creating MSK cluster – step 1 – software version

Firstly, we are asked for a name for the new cluster. Choose your own adventure here.

Anybody familiar with AWS will recognize the option for VPC. The easiest (and least safe) option is to choose your default VPC, which will grant access to anything to everybody. After all, we are just testing here, right?

Finally, a more important choice: the Apache Kafka version. Since AWS MSK launched, they have consistently only supported x.y.1 versions, meaning 1.1.1, 2.1.1, 2.2.1, etc. Personally, I try to stay away from x.y.0 versions, especially for less mature components like Kafka Connect or Kafka Streams. Besides that rule, choose the newest version possible to stay away from annoying bugs like this.

Step 2 – Network options

Creating MSK cluster – step 2 – availability zones

MSK offers the option to deploy your Kafka brokers to as many as 3 availability zones, which is also the recommended option for high availability.

Obviously, the more availability zones, the more brokers you will get provisioned (and the more expensive your cluster will be). For simplicity, let’s go with “3” and assign the default AZ and subnet existing in your default VPC.

Step 3 – Broker configuration

Creating MSK cluster – step 3 – broker configuration

Every Kafka Broker requires configuration for a number of properties. Apache Kafka comes with defaults for pretty much all of them. However, AWS MSK overrides some of them with its own defaults. In this section, it is possible to choose your own custom configuration for Apache Kafka, assuming you have created one. In a future post, we will see how to do that. For now, let’s run with the defaults.

Step 4 – Hardware and tags

Creating MSK cluster – step 4 – hardware and tags

Things are getting interesting now. You have to choose the hardware family/size of the EC2 instances that will power your Kafka cluster, plus how many of them to run per AZ (remember, we have chosen 3 AZs). For this example, let’s go with 1 broker per AZ.

Time to look at MSK pricing. For this example, I’m going to choose the cheapest options for both instance type and storage. That would cost me, on a monthly basis (eu-west-1 region, 30-days month):

  • kafka.m5.large: 168.48$ / month
  • 1000 GB storage: 0.11$ / month
  • Total: (168.48 * 3) + (0.11 * 3) = 505.77$ / month

For reference, an EC2 m5.large instance costs 77.04$/month. AWS is charging you approx. 2x for managing your Kafka cluster.
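The arithmetic above can be double-checked with a few lines of code. The prices are copied from this post as they are (they will differ by region and over time), and the class and method names are hypothetical.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

final class MskCostSketch {
    // Monthly figures as quoted in the post (eu-west-1, 30-day month).
    static final BigDecimal BROKER_PER_MONTH = new BigDecimal("168.48");
    static final BigDecimal STORAGE_PER_MONTH = new BigDecimal("0.11");
    static final BigDecimal EC2_M5_LARGE_PER_MONTH = new BigDecimal("77.04");

    // Total monthly cost for the given number of brokers, each with its own storage.
    static BigDecimal monthlyTotal(int brokers) {
        BigDecimal n = BigDecimal.valueOf(brokers);
        return BROKER_PER_MONTH.multiply(n).add(STORAGE_PER_MONTH.multiply(n));
    }

    // How many times more an MSK broker costs than the equivalent plain EC2 instance.
    static BigDecimal markupOverEc2() {
        return BROKER_PER_MONTH.divide(EC2_M5_LARGE_PER_MONTH, 2, RoundingMode.HALF_UP);
    }

    public static void main(String[] args) {
        System.out.println(monthlyTotal(3));  // prints 505.77
        System.out.println(markupOverEc2());  // prints 2.19
    }
}
```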

UPDATE: I got good feedback on this point. When considering all costs involved (EC2 instances for Zookeeper nodes, EC2 instances for Broker nodes, replication traffic cost, etc.), the overall cost of MSK is almost the same as running the cluster yourself (assuming your DevOps team works for free… which they don’t).

AWS has published a Pricing Calculator to size your MSK cluster correctly for your expected traffic; it also compares its cost with a self-managed option. Spoiler alert, you shouldn’t do it unless you really know what you’re doing (ample experience with both AWS and Kafka), and even then it is unclear to me why you would do that to yourself 🙂

WARNING: remember to delete your cluster once you are done with the tutorial or you will regret having followed it!!

Step 5 – Security options

Creating MSK cluster – step 5 – security options

In this section you choose a bunch of security-related options:

  1. Do you want to encrypt the communication between brokers? Yeah, why not!
  2. Do you want to force your clients to use SSL/TLS? For testing, probably allowing both TLS and plaintext is the best option. For production, you might want to restrict to TLS.
  3. Should I encrypt my data at rest? Definitely yes.
  4. Should I use TLS to authenticate clients? Well, you probably want to have some form of authentication for production environments, although that depends on your security requirements. For testing your first cluster, leave it unticked.

We are almost there… one more step!

Step 6 – Monitoring

Creating MSK cluster – step 6 – monitoring options

You definitely want to monitor your cluster, even if this is a “managed” service. At the end of the day, your clients might have an impact on your cluster (load, resource consumption, etc.) that AWS will definitely not monitor or alert on.

You have two choices to make here:

  • What level of monitoring do you need? There are three options: basic, cluster level or topic level. Basic level is a reasonable starting point, but cluster and topic level can save your day if the cluster starts acting weird, for instance if you find one of your topics being really hot (lots of writes and/or reads).
  • Where do you want to send your metrics? For a test cluster, CloudWatch can be good enough. For a production cluster, consider Prometheus, especially if you are already using it.

Step 7 – Have a coffee (or tea)

Creating MSK cluster – step 7 – waiting for new cluster

Just fly past the “Advanced Settings” section and click “Create Cluster” and… wait, a lot. Like 15 minutes… or more.

Step 8 – Provision the client

You are going to need a client that can connect to your newly created cluster, just to play a bit with it. Let’s provision a small EC2 instance, install Kafka command-line tools and give it a spin. I won’t go into too much detail here, I assume you already know how to do this with EC2:

  1. Navigate to EC2.
  2. Click on “Launch Instance” button.
  3. Select “Amazon Linux 2 AMI (HVM), SSD Volume Type”.
  4. Select “t2.micro” from the free tier.
  5. Keep all defaults for the rest of the options.
  6. Make sure that you have the key needed to SSH into the instance. Otherwise, create a new one.

Click on View Instance to go back to the EC2 Instances section. You should see your instance here. Select it and copy/paste its public IP.

Let’s SSH into this instance (don’t bother trying to connect to the IP in this example, by the time I publish this post I will have already deleted it :)). Make sure you have the key located and do:

ssh -i [path-to-your-key] ec2-user@[ec2-public-ip]

If ssh complains about the key permissions being too open, just do a chmod 600 [key-path] to make sure they are restricted enough to make ssh happy.

Step 9 – Installing Kafka command-line tools

We are going to need the command line tools to connect to our cluster. Luckily, you can easily curl all versions of Kafka from the official download page.

curl "https://www.apache.org/dist/kafka/2.3.1/kafka_2.12-2.3.1.tgz" -o kafka.tgz
tar -xvzf kafka.tgz

Once the file is decompressed, you have a new folder like kafka_2.12-2.3.1. Navigate to the bin subfolder to find all the command-line tools there.

However, if we try to run any of the tools here, they will all fail because we don’t have Java installed in the machine. Let’s get that too:

sudo yum install java

You will be prompted with a summary of what is going to be installed. Accept and wait.

Step 10 – Connecting to your cluster

Once the installation is finished, let’s try to connect to our cluster. Head back to MSK main page, choose your cluster and click on the “View client information” button on the top-right side of the screen. A pop-up window opens with the details to connect to your cluster (TLS and/or plaintext) like the one in the picture below.

Connection details pop-up window

Let’s go back to the EC2 instance and try to list topics with the following command:

./kafka-topics.sh --bootstrap-server [first-broker plaintext url] --list   

We launch the command, we wait, we wait a little bit more, even more… and eventually the command gives up with a timeout error.

Step 11 – Opening the Kafka port

The EC2 instance is running in its own security group, created when the instance was launched. This group allows SSH traffic to the instances that belong to it, which is why we can connect from our computers to the instance.

The MSK cluster, on the other hand, is running in the VPC default security group. This group allows incoming traffic to any port when it originates in the group itself. However, it rejects the traffic coming from the security group where the EC2 is running.

security groups setup

The good news is it has an easy solution: change the default security group to accept traffic from the EC2 instance security group. Follow these steps:

  1. Head to the “Security Groups” section under EC2.
  2. Choose the “default” security group.
  3. Click on the “Edit” button.
  4. In the pop-up window, click on the “Add Rule” button.
  5. Choose:
    1. Type: Custom TCP Rule
    2. Protocol: TCP
    3. Port: 9092 (Kafka port)
    4. Source: Custom + the name of the EC2 security group
  6. Click on the “Save” button.

That is it. Go back to the EC2 instance console and try the kafka-topics command again. This time it should return quickly, but without yielding a result (there isn’t any topic in the cluster yet).

Step 12 – Launching the performance producer tool

Let’s put some load through the system, just for fun. Firstly, we need to create a topic that we will use for performance testing.

./kafka-topics.sh --bootstrap-server [first-broker plaintext url] --create --topic performance-topic --partitions 4 --replication-factor 2

With this command, we are saying we want a topic with four partitions and that should be replicated twice.

description of the performance-topic topic

Once it is created, we can launch the performance producer.

./kafka-producer-perf-test.sh --topic performance-topic --num-records 1000000 --throughput 100 --producer-props bootstrap.servers=b-1.testing-cluster.34vag9.c4.kafka.eu-west-1.amazonaws.com:9092 acks=all --record-size 10240

What this command does is:

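  1. Sends 1 million records of 10 KB each, throttled to 100 records per second (--throughput 100).
  2. Waits for the in-sync replicas to acknowledge each write (acks=all), with min.insync.replicas setting the minimum number required (2 in this case).

sending load to the cluster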

Step 13 – Launching the performance consumer tool

How can we know that these records are going somewhere? Well, we can obviously consume them back.

Run the following command from a separate SSH session.

./kafka-consumer-perf-test.sh --broker-list b-1.testing-cluster.34vag9.c4.kafka.eu-west-1.amazonaws.com:9092 --messages 1000000 --print-metrics --show-detailed-stats --topic performance-topic

What this command does is:

  1. Consumes 1 million records.
  2. Prints detailed stats while it does so.

consuming records from the cluster

Step 14 – Watching the metrics

We can also look at CloudWatch metrics to see them live, growing with the load we are sending to the cluster. Head to CloudWatch in your AWS Console. Once there:

  1. Click on “Metrics”.
  2. Choose “AWS/Kafka”.
  3. Choose “Broker ID, Cluster Name, Topic”.

You will see that the only topic-level metrics available are for the topic just created (the cluster does not have any other topic at the moment). Click on “Bytes In” for the 3 brokers. You will see a growing graph like this one.

CloudWatch metrics

Make sure to configure the “Metrics Period” to 1 minute (under “Graphed Metrics”) to have a more accurate visualization.

Step 15 – Delete everything if you don’t want to pay

Once you are satisfied with all your tests, it’s time to clean everything up and avoid nasty surprises when the AWS bill arrives at the end of the month.

Head to the EC2 section first to kill your EC2 instance and follow these steps:

  1. Select “Instances” on the left menu.
  2. Select your EC2 instance to kill.
  3. Click on the “Actions” button.
  4. Choose “Instance State” -> “Terminate”.
  5. In the pop-up window, click on “Yes, terminate”.

In a few minutes, your instance will be dead. Take this opportunity to also remove its orphan security group.

  1. Select “Security Groups” on the left menu.
  2. Select the old EC2 instance security group (something like launch-wizard).
  3. Click on the “Actions” button.
  4. Choose “Delete security group”.
  5. A pop-up window informs you that you can’t delete the security group because it is being referenced from another group (the default group, remember step 11).
  6. Choose the default group, click the “Edit” button and delete the Kafka related rule (TCP port 9092).
  7. Try to delete the EC2 security group again, this time the pop-up window displays a “Yes, Delete” button. Click it to remove the security group.

Last but not least, remove the Kafka cluster. Head to MSK and choose your cluster there.

deleting cluster

Type “delete” on the pop-up window. Your cluster status will change to “deleting”. A minute later, it will be gone for good.


15 steps aren’t the simplest process possible, but if we think about it, we have covered a lot of ground:

  1. Created the cheapest possible cluster.
  2. Provisioned an EC2 instance with the Kafka command-line tools to test the cluster.
  3. Ran performance producers and consumers.
  4. Monitored the cluster load with CloudWatch.

Even more important, with a really small cluster, we were sending 100 messages/s with a total load of 1 MB/s, from one single client, and our cluster didn’t even blink.
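Those numbers follow directly from the flags passed to the performance producer in step 12. A quick sketch of the arithmetic (class and method names are hypothetical; the inputs are the values of --num-records, --throughput and --record-size):

```java
import java.time.Duration;

final class PerfTestMath {
    static final long RECORDS = 1_000_000L;        // --num-records
    static final long RECORDS_PER_SECOND = 100L;   // --throughput
    static final long RECORD_SIZE_BYTES = 10_240L; // --record-size

    static long bytesPerSecond() {
        return RECORDS_PER_SECOND * RECORD_SIZE_BYTES;
    }

    static long totalBytes() {
        return RECORDS * RECORD_SIZE_BYTES;
    }

    // Lower bound for the run time, since the producer is throttled.
    static Duration minimumRunTime() {
        return Duration.ofSeconds(RECORDS / RECORDS_PER_SECOND);
    }

    public static void main(String[] args) {
        System.out.println(bytesPerSecond() + " bytes/s");  // 1024000 bytes/s, roughly 1 MB/s
        System.out.println(totalBytes() + " bytes in total");
        Duration d = minimumRunTime();
        System.out.println(d.toHours() + " h " + (d.toMinutes() % 60) + " min at least");
    }
}
```

In other words, the full run pushes a bit over 10 GB and needs close to three hours to complete at that rate, so feel free to stop it earlier.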

That is the power of Kafka, one of the fastest tools available in the market when it comes to moving data. And now, with AWS MSK, it is really easy to get a cluster up and running.

Java type hinting in avro-maven-plugin

Recently, somebody shared with me the following problem: an Avro schema in Schema Registry has magically evolved into a slightly different version, albeit still backward compatible.

Schemas changing…

The first version of the schema looked like this:

    "fields" [

After it changed, the schema looked like this:

    "fields" [

To add to the confusion, this is a topic published by Kafka Connect using the Debezium MySQL plugin. Neither the database schema nor the Connect or Debezium versions had changed anywhere close to when the schema evolved.

How could this have happened?

The mystery guest…

Although nothing had changed in the stack that was polling record changes from the MySQL database and sending them to Kafka… there was a new element to consider.

After some conversations, it was apparent that there was a new application publishing records to the same topic, for testing. This application was:

  1. Downloading the schema from Schema Registry.
  2. Doing code-generation using avro-maven-plugin against the downloaded .avsc files from Schema Registry.
  3. Producing some records using the newly created Java POJO classes.

Those seem like the right steps. However, looking into the options of avro-maven-plugin, one stood out:

  /**  The Java type to use for Avro strings.  May be one of CharSequence,
   * String or Utf8.  CharSequence by default.
   * @parameter property="stringType"
   */
  protected String stringType = "CharSequence";

Could it be the culprit?

stringType does more than you expect

While the description of the property suggests something as naive as instructing the Avro code generator what class to use for Avro strings… it does more than just that.

Comparing the code for POJOs generated using avro-maven-plugin, two things are different. Firstly, fields like the UUID in the schema above change their type from java.lang.CharSequence to java.lang.String; this is as expected.

However, it also changes the internal Avro schema that every Java POJO stores in:

public static final org.apache.avro.Schema SCHEMA$;

Upon changing stringType to String, the resulting schema in SCHEMA$ contains the extended type definition that we saw at the beginning. The Java POJOs define this property because it is sent to Schema Registry when producing records (only once; from then on it uses the returned schema id).

Since there is no canonical representation of an Avro schema, Schema Registry chooses to take the schema as is, ignoring that both schemas are semantically identical and it should not create a new version for it.

A solution?

Can we not use stringType = String? Yes, but then all POJOs are generated using CharSequence. In my opinion, that is the best option for mixed environments. After all, this extra hint in the schema only makes sense for Java consumers.

However, if you control the topic end to end (e.g., both producers and consumers), you might as well use stringType = String by default and guarantee that every client uses String instead of CharSequence.

In any case, both schemas are backward compatible between themselves. A correct Avro library should result in the same schema representation in whatever language you have chosen to use.

Poison Pills in Kafka (I)

What is a poison pill?

A “poison pill” is a record that always fails when consumed, no matter how many times it is attempted. They come in different forms:

  • Corrupted records.
  • Records that make your consumer deserializer fail (e.g., an Avro record whose writer schema is not compatible with the consumer reader schema).

The problem with a poison pill is that, unless the consumer eventually handles it, it blocks the consumption of the topic/partition that contains it, halting the consumer’s progress.

What can we do with poison pills?

There are many different strategies to deal with poison pills. The Kafka Streams FAQ describes some of them:

  • Log an error and shut down the application
  • Log an error and skip the poisonous record
  • Send the record to a separate topic (e.g., Dead Letter Queue) for further inspection (likely by a human operator)
  • Transform the poisonous record into a sentinel value that identifies a record that couldn’t be interpreted, downstream

Kafka Streams to the rescue

A year ago Kafka Streams added options to handle corrupt data during deserialization. This PR set the foundations for dealing with poison pills in Kafka Streams:

  • Added the new interface DeserializationExceptionHandler.
  • Introduced two default implementations: LogAndContinue and LogAndFail.
  • Added a new default.deserialization.exception.handler configuration value for StreamsConfig. Its default value is LogAndFail.

How does it work?

The key in this implementation is RecordDeserializer. This class executes whatever exception handling we have configured for our Streams application.

[Diagram: Kafka Streams poison pills]

In the diagram, the classes shaded in blue are exclusive to Kafka Streams, while the class in red is part of the common Kafka code base.

If the strategy is to continue, a return null bubbles up a null ConsumerRecord to the layer above, RecordQueue, which discards the record and continues with the next one.

At runtime, for LogAndContinue, a series of warnings is logged and consumption continues. For LogAndFail, on the other hand, a StreamsException is thrown.
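The control flow described above can be sketched in a few lines of Python. This is only an analogy using the standard library, not the Kafka Streams API: the function names, the `Response` enum and the choice of `ValueError` as the deserialization error are all assumptions made for illustration.

```python
import logging
from enum import Enum

log = logging.getLogger("poison-pills")


class Response(Enum):
    CONTINUE = "continue"
    FAIL = "fail"


def log_and_continue(raw, error):
    """Handler that logs a warning and asks the caller to skip the record."""
    log.warning("Skipping record %r: %s", raw, error)
    return Response.CONTINUE


def log_and_fail(raw, error):
    """Handler that logs an error and asks the caller to stop."""
    log.error("Failing on record %r: %s", raw, error)
    return Response.FAIL


def deserialize_all(raw_records, deserialize, handler):
    """Deserialize every record, delegating failures to the handler."""
    results = []
    for raw in raw_records:
        try:
            value = deserialize(raw)
        except ValueError as error:
            if handler(raw, error) is Response.FAIL:
                raise RuntimeError("deserialization failed") from error
            # CONTINUE: drop the poisonous record and move to the next one.
            continue
        results.append(value)
    return results
```

With `int` standing in for a deserializer, `deserialize_all(["1", "oops", "3"], int, log_and_continue)` skips the middle record, while the same call with `log_and_fail` raises at the second record.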

There is also a set of metrics that can be monitored to make sure we are not skipping too many records.

| Metric name | Description | JMX path |
|---|---|---|
| skipped-records-rate | The average number of skipped records per second. | kafka.streams:type=stream-metrics,client-id=([-.\w]+) |
| skipped-records-total | The total number of skipped records. | kafka.streams:type=stream-metrics,client-id=([-.\w]+) |

What about Kafka Consumer?

If your application is not using Kafka Streams, there is no built-in support for handling poison pills in the basic KafkaConsumer.

However, not all is lost. In the next installment in this series, we will see how we can implement a very similar strategy leveraging the Deserializer interface that KafkaConsumer uses to deserialize records before delivering them to your application.
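As a rough idea of what such a strategy could look like (and not necessarily the approach the next installment takes), the Python sketch below wraps a deserializer so that a failure produces a sentinel value instead of an exception. The `tolerant` helper and the `UNREADABLE` sentinel are hypothetical names, and `json.loads` simply stands in for a real deserializer.

```python
import json

# Sentinel that downstream code can recognize as "could not be interpreted".
UNREADABLE = object()


def tolerant(deserialize):
    """Wrap a deserializer so failures become a sentinel, not an exception."""
    def wrapper(raw):
        try:
            return deserialize(raw)
        except Exception:
            return UNREADABLE
    return wrapper


safe_json = tolerant(json.loads)

# The second payload is a poison pill for a JSON deserializer.
records = [safe_json(raw) for raw in ['{"id": 1}', "not json", '{"id": 2}']]
readable = [record for record in records if record is not UNREADABLE]
```

The application decides what to do with the sentinel: skip it, count it, or forward the original bytes to a separate topic.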

Kafka defaults that you should re-consider (I)

Image taken from http://www.htbdpodcast.com

There is a vast number of configuration options for Apache Kafka, mostly because the product can be fine-tuned to perform in various scenarios (e.g., low latency, high throughput, durability). These defaults span across brokers, producers and consumers (plus other sidecar products like Connect or Streams).

The guys at Kafka do their best to provide a comprehensive set of defaults that will just work, but some of them can be relatively dangerous if used blindly, as they might have unexpected side effects, or be optimized for a use case different to yours.

In this post, I’d like to review the most obvious ones on the broker side, explain what they do, why their defaults can be problematic, and propose alternative values.

Change these defaults


Defaults to ‘true’. You definitely want to change this one to false. Applications should be responsible for creating their topics, with the correct configuration settings for the various use cases.

If you keep it true, some other configuration values kick in to fulfill the default topic configuration:

  • log.retention.hours: by default, logs are retained for 7 days. Think carefully about whether this default is good enough. Any data older than that will not be available when replaying the topic.

  • min.insync.replicas: Defaults to 1. As the documentation mentions, a typical configuration is replication-factor minus 1, meaning that with a replication factor of 3, min.insync.replicas should be 2. The problem with 1 is that it puts you in a dangerous position, where the cluster accepts messages for which you only have 1 copy. On the other hand, a value equal to the replication factor means that losing one node temporarily stops your cluster from accepting values until the missing partition has rebalanced to a healthy node.

  • default.replication.factor: Defaults to 1. This is a bad value since it effectively creates only one copy of an auto-created topic. If the disk that stores a partition of this topic dies, the data is lost. Even if there are backups, the consumers don’t benefit from automatic rebalancing to other brokers that have copies of the partition, resulting in consumption interruptions. I would suggest a value like 3 and then fine-tune topics that require more or less, independently.

  • num.partitions: Defaults to 1. Another bad value. If a topic only has one partition, it can be consumed by only one instance of an application at a time, hindering any parallelization that we might hope to achieve using Kafka. While partitions are not free and Kafka clusters have a limit on how many they can handle, a minimum value of 3 partitions per topic seems like a safer and more sensible default.
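The trade-off between replication factor and min.insync.replicas is simple arithmetic, sketched here in Python. The function names are hypothetical, and the sketch assumes producers wait for all in-sync replicas (acks=all), which is when min.insync.replicas is enforced.

```python
def recommended_min_insync_replicas(replication_factor):
    """Rule of thumb from the text: replication factor minus 1, never below 1."""
    return max(1, replication_factor - 1)


def tolerated_broker_failures(replication_factor, min_insync_replicas):
    """Replicas that can be lost while writes are still accepted."""
    return replication_factor - min_insync_replicas


# Replication factor 3 with min.insync.replicas 2: one broker can be lost.
balanced = tolerated_broker_failures(3, recommended_min_insync_replicas(3))

# min.insync.replicas equal to the replication factor: no broker can be lost.
strict = tolerated_broker_failures(3, 3)
```

With the defaults (replication factor 1, min.insync.replicas 1) both the number of guaranteed copies and the number of tolerated failures are at their worst: one copy, zero failures.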


Defaults to 1440 minutes (24 hours). This is a dangerous default. Some applications might be idle over the weekend, meaning they don’t publish to Kafka during that period.

The morning after, if they restart before they consume from Kafka, the new instances don’t find any committed offsets for their consumer group, since they have expired.

At that point, the auto.offset.reset configuration in the consumer kicks in, sending the application to the earliest message, latest, or failing. In any case, this is not desirable.

The recommendation is to increase this value to something like 7 days for extra safety.
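A quick back-of-the-envelope check in Python shows why a one-day retention does not survive a weekend while seven days does. The idle window (Friday 18:00 to Monday 08:00) is a made-up example, and the sketch ignores the details of how the broker actually tracks offset expiry.

```python
from datetime import timedelta

ONE_DAY_MINUTES = 24 * 60        # 1440
SEVEN_DAYS_MINUTES = 7 * 24 * 60  # 10080


def offsets_expired(idle_time, retention_minutes):
    """True if the idle period is longer than the offsets retention."""
    return idle_time > timedelta(minutes=retention_minutes)


# Hypothetical idle window: Friday 18:00 to Monday 08:00 is 62 hours.
weekend = timedelta(hours=62)

lost_with_one_day = offsets_expired(weekend, ONE_DAY_MINUTES)
lost_with_seven_days = offsets_expired(weekend, SEVEN_DAYS_MINUTES)
```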

Keep these defaults


Defaults to true. Unless you know what you’re doing, you don’t want to rebalance partitions manually. Let Kafka do it for you.


Defaults to true. If you find yourself in a highly regulated environment, you might not be allowed to delete anything, ever. Otherwise, allowing topic deletion guarantees that you can get rid of data quickly and easily.

That is especially useful in development clusters. Don’t set this to false there; you will shoot yourself in the foot.


Defaults to ‘never’ (represented as a ridiculously long number of ms). Kafka is so performant because it enables zero-copy data transfers from producers to consumers.

While that is a fantastic mechanism for moving tons of data quickly, the durability aspect can be a concern. To account for that, Kafka proposes using replication across nodes to guarantee the information is not lost, instead of explicitly flushing messages to disk as they come. The result of that is a lack of certainty about when the messages are actually written to the disk.

You could effectively force Kafka to flush to disk using this and other configuration properties. However, you would most likely kill Kafka performance in the process. Hence, the recommendation is to keep the default value.


Defaults to ‘-1’, which means messages are not acknowledged by a leader until the min.insync.replicas value for the topic is honored.

That is a safe default, falling on the side of durability versus lower latency. You should consider particular configurations at the topic level, dependent on the nature of the stored information (e.g., ‘logs’ being a lower value than ‘orders’).


Defaults to ’50’. Kafka automatically creates the topic __consumer_offsets with this number of partitions. Since this is likely to be the busiest topic in your cluster, it’s a good idea to keep the number of partitions high so that the load is spread across as many nodes as possible.

The number of partitions of __consumer_offsets cannot be changed for the lifetime of the cluster, so even if you are not planning to have 50 brokers in your cluster, it is safer to keep this number as it is.
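One way to see why this number is effectively frozen: to my understanding, the broker picks the __consumer_offsets partition for a consumer group by hashing the group id modulo the partition count. The Python sketch below mimics that under this assumption; the function names are hypothetical and the hash only matches Java's String.hashCode() for characters in the Basic Multilingual Plane.

```python
def java_string_hash(text):
    """Replicate Java's String.hashCode() for Basic Multilingual Plane text."""
    h = 0
    for ch in text:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    # Reinterpret the unsigned 32-bit value as a signed Java int.
    return h - 0x100000000 if h >= 0x80000000 else h


def offsets_partition_for(group_id, num_partitions=50):
    """Partition of __consumer_offsets that would hold this group's offsets."""
    h = java_string_hash(group_id)
    # Integer.MIN_VALUE has no positive counterpart; treat it as 0.
    positive = 0 if h == -0x80000000 else abs(h)
    return positive % num_partitions


with_50 = offsets_partition_for("a", 50)
with_25 = offsets_partition_for("a", 25)
```

Under that assumption, changing the partition count would send the same group to a different partition, away from the offsets it has already committed.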


Defaults to ‘3’. Similar to the previous value, but to configure how many copies of your __consumer_offsets you want. 3 copies is a safe default and should probably only be changed to raise it to a higher number.

More copies of the topic would make your cluster more resilient in the event of broker failure since there would be more followers ready to take the role of the fallen leader.


Defaults to ‘false’. It used to be ‘true’ by default because it was optimized for availability. In the case of a leader dying without any follower being up to date, the cluster continues operating if this value is set to ‘true’. Unfortunately, data loss would result.

However, after Aphyr roasted Kafka for this data loss scenario, Kafka introduced this configuration value and eventually changed it to ‘false’ to prevent data loss. With this default, the cluster stops operating until a follower that was up to date with the fallen leader arises (potentially, the fixed leader itself), preventing any loss.


There are many more configuration values that play essential roles on the broker side, and we haven’t even mentioned any of the values on the client side (e.g., consumers, producers). In following posts, I’ll jump into those and describe what sensible defaults are and what you should think twice about before blindly embracing.

Timeouts in Kafka clients and Kafka Streams


UPDATE (Kafka client v2.0):

  • Also as part of KIP-266, the default value of request.timeout.ms has been changed to 30 seconds. The previous value was a little higher than 5 minutes to account for maximum time that a rebalance would take. Now we treat the JoinGroup request in the rebalance as a special case and use a value derived from max.poll.interval.ms for the request timeout. All other request types use the timeout defined by request.timeout.ms

IMPORTANT: This information is based on Kafka and Kafka Streams 1.0.0. Past or future versions may differ.

As with any distributed system, Kafka relies on timeouts to detect failures. Those timeouts can be set by clients and brokers that want to detect each other's unavailability. The following is a description of the configuration values that control the timeouts that both brokers and clients use to detect clients not being available.

The original design for the poll() method in the Java consumer tried to kill two birds with one stone:

  • Guarantee consumer liveness
  • Guarantee progress as well, since a consumer could be alive but not moving forward

However, this design caused a few problems. The solution was to introduce separate configuration values and a heartbeat mechanism based on a background thread.


Since Kafka, the heartbeat happens from a separate, background thread, different from the thread where poll() runs. The description for the configuration value is:

The expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.

The default value is 3 seconds. This heartbeat will guarantee an early detection when the consumer goes down, maybe due to an unexpected exception killing the process. However, back pressure or slow processing will not affect this heartbeat.


Introduced with Kafka as well, it compensates for the background heart-beating by introducing a limit on the time between poll() calls. The description for the configuration value is:

The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.

The default value is 5 minutes (300000 ms), except for Kafka Streams, which increases it to Integer.MAX_VALUE.

When the timeout expires, the consumer will stop heart-beating and will leave the consumer group explicitly. With this new configuration value, we can set an upper limit on how long we expect a batch of records to take to process. Together with max.poll.records and the appropriate timeouts for third-party calls, we should be able to determine fairly accurately how long an application may stay unresponsive while processing records.

Also, max.poll.interval.ms has a role in rebalances. Since we know it represents how long processing a batch can take, it is also implicitly the timeout for how long a client should be awaited in the event of a rebalance. Therefore, the client sends this value when it joins the consumer group. In the event of a rebalance, the broker will wait this long for a client to respond before kicking it out of the consumer group.
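The estimate mentioned above is a multiplication, sketched here in Python. The function names and the numbers used in the example (500 records per poll, 20 ms of local work, a 1 second third-party timeout, a 300000 ms poll interval) are assumptions chosen for illustration; plug in your own values.

```python
def worst_case_batch_ms(max_poll_records, per_record_ms, external_call_timeout_ms=0):
    """Upper bound for processing one batch, assuming every external call times out."""
    return max_poll_records * (per_record_ms + external_call_timeout_ms)


def fits_poll_interval(max_poll_records, per_record_ms, max_poll_interval_ms,
                       external_call_timeout_ms=0):
    """True if the worst-case batch finishes before the next poll() is due."""
    worst = worst_case_batch_ms(max_poll_records, per_record_ms, external_call_timeout_ms)
    return worst < max_poll_interval_ms


# 500 records * (20 ms + 1000 ms) = 510000 ms, more than a 300000 ms interval.
too_slow = not fits_poll_interval(500, 20, 300_000, external_call_timeout_ms=1000)

# Lowering max.poll.records to 100 brings the worst case down to 102000 ms.
ok_after_tuning = fits_poll_interval(100, 20, 300_000, external_call_timeout_ms=1000)
```

If the worst case does not fit, the options are to lower max.poll.records, tighten the third-party timeouts, or raise max.poll.interval.ms.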


Finally, while the previous values are used to get the client willingly out of the consumer group, this value controls when the broker can push it out itself. The description for this configuration value is:

The timeout used to detect consumer failures when using Kafka’s group management facility. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance.

The default is 10 seconds. Clients have to choose a value within the range defined by group.min.session.timeout.ms and group.max.session.timeout.ms, which are configured on the broker side.
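The constraints quoted so far (heartbeat lower than the session timeout, ideally no more than a third of it, and a session timeout inside the broker's range) can be written down as a small check. This is a Python sketch with a hypothetical function name; the broker range used in the examples is an example value, so check your own broker configuration.

```python
def check_liveness_settings(heartbeat_interval_ms, session_timeout_ms,
                            broker_min_session_ms, broker_max_session_ms):
    """Return a list of problems found in the heartbeat/session settings."""
    problems = []
    if not broker_min_session_ms <= session_timeout_ms <= broker_max_session_ms:
        problems.append("session.timeout.ms is outside the range allowed by the broker")
    if heartbeat_interval_ms >= session_timeout_ms:
        problems.append("heartbeat.interval.ms must be lower than session.timeout.ms")
    elif heartbeat_interval_ms * 3 > session_timeout_ms:
        problems.append("heartbeat.interval.ms is higher than 1/3 of session.timeout.ms")
    return problems


# 3 s heartbeat with a 10 s session, broker range of 6 s to 300 s (example values).
defaults_ok = check_liveness_settings(3_000, 10_000, 6_000, 300_000)

# A 5 s heartbeat leaves room for fewer than three heartbeats per session.
too_sparse = check_liveness_settings(5_000, 10_000, 6_000, 300_000)
```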

What does this all mean?

In a nutshell, it means that you have to configure two types of timeouts: heartbeat timeout and processing timeout. The former accounts for clients going down and the latter for clients taking too long to make progress.

Heartbeating will be controlled by the expected heartbeat.interval.ms and the upper limit defined by session.timeout.ms.

Processing is controlled by max.poll.interval.ms. On the client side, it kicks the client out of the consumer group when the timeout expires. On the server side, it tells the broker what the expected rebalancing timeout is.
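Put together, the two timeouts answer two different questions, as this small Python sketch tries to show. It is a simplification of the behavior described above, not client code; the function name and the returned labels are hypothetical.

```python
def eviction_reason(now_ms, last_heartbeat_ms, last_poll_ms,
                    session_timeout_ms, max_poll_interval_ms):
    """Explain why a consumer would leave the group, or return None if it stays."""
    if now_ms - last_heartbeat_ms > session_timeout_ms:
        # The process is gone: heartbeats stopped, the broker removes it.
        return "session timeout"
    if now_ms - last_poll_ms > max_poll_interval_ms:
        # The process is alive but stuck processing: it leaves by itself.
        return "poll interval exceeded"
    return None


# Crashed 11 s ago: no heartbeats since then, detected by the session timeout.
crashed = eviction_reason(60_000, 49_000, 49_000, 10_000, 300_000)

# Alive (heartbeat 1 s ago) but busy with the same batch for 6 minutes.
stuck = eviction_reason(400_000, 399_000, 40_000, 10_000, 300_000)

# Alive and polling regularly.
healthy = eviction_reason(400_000, 399_000, 398_000, 10_000, 300_000)
```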

The Integer.MAX_VALUE Kafka Streams default

The max.poll.interval.ms default for Kafka Streams was changed to Integer.MAX_VALUE in Kafka to strengthen its robustness in the scenario of large state restores. Fortunately, after changes to the library in 0.11 and 1.0, this large value is not necessary anymore.

In any case, it is still recommended to use a generous timeout in case of calls to external third parties from a stream topology. For a node that goes down, session.timeout.ms will quickly be triggered since the background heartbeat will stop.

For a node that is simply taking too long to process records, the assumption is any other instance picking up those records would suffer the same delays with the third party.


Separating max.poll.interval.ms and session.timeout.ms allows a tighter control over applications going down with shorter session.timeout.ms, while still giving them room for longer processing times with an extended max.poll.interval.ms.

This is especially useful for Kafka Streams applications, where we can hook complicated, long-running processing into every record.


Incompatible AVRO schema in Schema Registry

My company uses Apache Kafka as the spine for its next-generation architecture. Kafka is a distributed append-only log that can be used as a pub-sub mechanism. We use Kafka to publish events once business processes have completed successfully, allowing a high degree of decoupling between producers and consumers.

These events are encoded using Avro schemas. Avro is a binary serialization format that enables a compact representation of data, much more than, for instance, JSON. Given the high volume of events we publish to Kafka, using a compact format is critical.

In combination with Avro we use Confluent’s Schema Registry to manage our schemas. The registry provides a RESTful API to store and retrieve schemas.

Compatibility modes

The Schema Registry can control what schemas get registered, ensuring a certain level of compatibility between existing and new schemas. This compatibility can be set to one of the following four modes:

  • BACKWARD: a new schema is allowed if it can be used to read all data ever published into the corresponding topic.
  • FORWARD: a new schema is allowed if it can be used to write data that all previous schemas would be able to read.
  • FULL: a new schema is allowed if it fulfills both of the conditions above.
  • NONE: a schema is allowed as long as it is valid Avro.

By default, Schema Registry sets BACKWARD compatibility, which is most likely your preferred option in a production environment, unless you want to have a hard time with your consumers not quite understanding events published with a newer, incompatible version of the schema.
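The modes are easier to reason about with a toy model. The Python sketch below is a heavy simplification and not the Registry's actual algorithm: it compares a new schema against a single previous one, treats a schema as a flat map of field names to a type and an optional default, and ignores type promotion, unions and aliases. All names are hypothetical.

```python
def can_read(reader_fields, writer_fields):
    """True if data written with writer_fields can be decoded using reader_fields."""
    for name, spec in reader_fields.items():
        if name in writer_fields:
            if writer_fields[name]["type"] != spec["type"]:
                return False
        elif "default" not in spec:
            # The reader expects a field the writer never wrote.
            return False
    return True


def is_backward(new, old):
    """The new schema can read data written with the old one."""
    return can_read(new, old)


def is_forward(new, old):
    """The old schema can read data written with the new one."""
    return can_read(old, new)


def is_full(new, old):
    return is_backward(new, old) and is_forward(new, old)


OLD = {"id": {"type": "long"}}
ADD_WITH_DEFAULT = {"id": {"type": "long"}, "email": {"type": "string", "default": ""}}
ADD_WITHOUT_DEFAULT = {"id": {"type": "long"}, "email": {"type": "string"}}
```

In this model, adding a field with a default is FULL compatible, while adding it without a default is only FORWARD compatible, so a Registry set to BACKWARD would reject it.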

Incompatible schemas

In the development phase it is perfectly fine to replace schemas with others that are incompatible. Schema Registry will prevent updating the existing schema to an incompatible newer version unless we change its default setting.

Fortunately, Schema Registry offers a complete API that allows registering and retrieving schemas, but also changing some of its configuration. More specifically, it offers a /config endpoint to PUT new values for its compatibility setting.

The following command would change the compatibility setting to NONE for all schemas in the Registry:

curl -X PUT http://your-schema-registry-address/config \
     -d '{"compatibility": "NONE"}' \
     -H "Content-Type:application/json"

This way, the next registration is allowed by the Registry as long as the newer schema is valid Avro. The configuration can be set for a specific schema too, simply by appending its name (i.e., /config/subject-name).

Once the incompatible schema has been registered, the setting should be set back to a more cautious value.
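The same two steps (relax the setting, then restore it) could be scripted. Below is a minimal Python sketch using only the standard library; the helper names, the registry address and the subject name are placeholders, and the request is built separately from sending it so the first part can be inspected without a running Registry.

```python
import json
import urllib.parse
import urllib.request


def compatibility_request(base_url, level, subject=None):
    """Build (but do not send) the PUT request that sets the compatibility level."""
    path = "/config"
    if subject is not None:
        # Per-subject setting: /config/subject-name
        path += "/" + urllib.parse.quote(subject, safe="")
    body = json.dumps({"compatibility": level}).encode("utf-8")
    return urllib.request.Request(
        base_url.rstrip("/") + path,
        data=body,
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


def send(request):
    """Send the request and return the decoded JSON response."""
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read())


# Placeholder address and subject; nothing is sent until send() is called.
relax = compatibility_request("http://your-schema-registry-address", "NONE", "subject-name")
restore = compatibility_request("http://your-schema-registry-address", "BACKWARD", "subject-name")
```

Calling `send(relax)`, registering the incompatible schema, and then calling `send(restore)` keeps the window with relaxed checks as short as possible.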


The combination of Kafka, Avro and Schema Registry is a great way to store your events in the most compact way possible, while still retaining the ability to evolve the corresponding schemas.

However, some of the limitations that the Schema Registry imposes make less sense in a development environment. On some occasions, making incompatible changes in a simple way is necessary and advisable.

The Schema Registry API allows changing the compatibility setting to accept schemas that, otherwise, would be rejected.