Kafka quirks: topics with names too long

http://clipart-library.com/clipart/8cznLELEi.htm

IMPORTANT: This bug is fixed in versions 2.1.2, 2.2.2, 2.3.0 and newer.

One of those bugs that you normally don’t come across… until you do. Officially, Kafka topic names can be up to 249 characters. If you try to create a topic with a name longer than that, it will reject it:

$ kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic [topic name with 250 characters]
Error while executing topic command : org.apache.kafka.common.errors.InvalidTopicException: Topic name is illegal, it can't be longer than 249 characters, topic name: [topic name with 250 characters]

Pretty self-explanatory, right? Well, it’s not that simple…

249 characters? Actually… 209

While officially advertised as 249, in reality, you should not create topics with names longer than 209 characters. Why? One of the comments in the Jira ticket explains it well:

This limit ends up not being quite right since topic deletion ends up renaming the directory to the form topic-partition.uniqueId-delete as can be seen in LogManager.asyncDelete.

If we do the math, starting from the advertised 249 characters and subtracting:

  • 1 character for the “.”
  • 32 characters for the uniqueId (a.k.a. UUID)
  • 7 characters for “-delete”

The result is 209. That is the longest your topic name can be if you don’t want to hit the bug.
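The same arithmetic can be written down as a small check to run before creating a topic. This is only a sketch: the constant and function names are made up for illustration, and the lengths are simply the ones listed above.

```python
# Lengths taken from the breakdown above; all names here are illustrative.
ADVERTISED_MAX_TOPIC_NAME = 249  # limit enforced when the topic is created
DOT = 1                          # the "." placed before the unique id
UNIQUE_ID = 32                   # UUID rendered as 32 hex characters
DELETE_SUFFIX = len("-delete")   # 7 characters

SAFE_MAX_TOPIC_NAME = ADVERTISED_MAX_TOPIC_NAME - DOT - UNIQUE_ID - DELETE_SUFFIX


def is_safe_to_delete(topic_name: str) -> bool:
    """True if the topic name leaves room for the rename done on deletion."""
    return len(topic_name) <= SAFE_MAX_TOPIC_NAME


if __name__ == "__main__":
    print(SAFE_MAX_TOPIC_NAME)
    print(is_safe_to_delete("a" * 209), is_safe_to_delete("a" * 210))
```

A check like this could live in whatever script or pipeline you use to create topics, so that long names are rejected before they ever reach the broker.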

What if I use a topic name longer than 209?

Well, terrible things… if you try to delete the topic. Otherwise nothing. But if you do try to delete the topic, the first error that you will encounter is this:

ERROR Error while renaming dir for [your long topic name here]-0 in log dir /var/lib/kafka/data (kafka.server.LogDirFailureChannel)
java.nio.file.FileSystemException: /var/lib/kafka/data/[your long topic name here]-0 -> /var/lib/kafka/data/[your long topic name here]-0.abb0d0dcdfd94e2b9222335d8bcaebcd-delete: File name too long

What is worse, your broker will keep failing upon restart, trying to remove the folder that is still pending deletion.

Get me out of this trap!

There is hope though if you can use a bit of SSH and connect to the broker that is failing (or maybe all of them are failing because they were all hosting partitions for the deleted topic).

Go to the directory where Kafka stores its data (log.dirs=/var/lib/kafka/data normally) and remove the folders for the partitions associated with the topic you want to get rid of.

Once those folders are deleted, it is time to tinker a bit with the Zookeeper data. Be very careful here: if you make a mistake, you will probably lose data (and maybe the whole cluster).

Log into Zookeeper using zookeeper-shell and execute a few delete instructions to get rid of all traces of the faulty topic.

$ zookeeper-shell localhost:2181
rmr /admin/delete_topics/[your long topic name here]
rmr /brokers/topics/[your long topic name here]
rmr /config/topics/[your long topic name here]
quit

Now, restart your broker(s) and they should have completely forgotten that this topic ever existed…

What about AWS Kafka (MSK)?

As of this writing, MSK supports three versions:

  • 1.1.1: this version is just “too old” (published in July 2018) and I wouldn’t recommend using it. It suffers from this bug too.
  • 2.1.1: definitely suffers from this bug (personally experienced it…).
  • 2.3.1: is not affected by this bug.

Therefore, my recommendation is to run Kafka 2.3.1 and take advantage of this and other bug fixes and features.

Kafka quirks: tombstones that refuse to disappear

From https://www.clipart.email/clipart/tombstone-rip-clipart-83182.html

Recently, at one of the clients I consult for, I came across a strange situation: tombstone records that “refused” to disappear.

The scenario was quite simple:

  1. Kafka Streams application that materializes some state (in RocksDB).
  2. From time to time, a punctuation kicks in, pulls all the accumulated records and sends them somewhere.
  3. Upon success, it deletes all the records and calls it a day.

However, when consuming the changelog topic, I noticed that there were lots of tombstone records. Having some of them made sense, since that is how a “delete” should be represented in a changelog topic. However, having so many that hadn’t cleared out was unexpected.

I applied a few strategies/changes until I finally made them “gone”.

Step 1 – Roll your segments more often

Compaction only happens when the file that contains your topic/partition data is rolled. Therefore, it is important to adjust when that happens if you want to influence the compaction process:

  • segment.ms: the segment can stay open for up to this value. By default, that is 7 days.
  • segment.bytes: the segment can stay open up to this number of bytes. The default here is 1 GB, which is too big for low traffic topics.

The defaults for these two settings have “big data” stamped on them. If you don’t have a “big data” topic, chances are the process won’t be responsive enough for you.

I tried setting them to 60,000 ms (1 min) and 1,048,576 bytes (1 MB) respectively… with no luck. Nothing changed; tombstones were still there.
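To see why the defaults rarely kick in on a quiet topic, here is a deliberately simplified model of the roll decision. It assumes a segment is rolled once it reaches segment.bytes or has been open for segment.ms; the real broker logic has more detail, and should_roll is a hypothetical helper, not a Kafka API.

```python
DEFAULT_SEGMENT_MS = 7 * 24 * 60 * 60 * 1000  # 7 days, the default mentioned above
DEFAULT_SEGMENT_BYTES = 1024 ** 3             # 1 GB, the default mentioned above


def should_roll(size_bytes, age_ms,
                segment_bytes=DEFAULT_SEGMENT_BYTES,
                segment_ms=DEFAULT_SEGMENT_MS):
    """Simplified model: roll when either the size or the age limit is reached."""
    return size_bytes >= segment_bytes or age_ms >= segment_ms


# A low-traffic topic: 5 MB written over 3 days (made-up numbers).
quiet_size = 5 * 1024 * 1024
quiet_age = 3 * 24 * 60 * 60 * 1000

rolls_with_defaults = should_roll(quiet_size, quiet_age)
rolls_with_tuned = should_roll(quiet_size, quiet_age,
                               segment_bytes=1_048_576, segment_ms=60_000)

if __name__ == "__main__":
    print(rolls_with_defaults, rolls_with_tuned)
```

With the defaults the segment stays open, so none of its records are candidates for compaction yet; with the tuned values it would already have been rolled.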

Step 2 – Tolerate less dirtiness

It is also possible that, even if your segments are rolling regularly, the log compaction thread doesn’t pick up your topic/partition file because it is not dirty enough, meaning the ratio between entries that are candidates for compaction and those that aren’t is not meeting the configured threshold.

min.cleanable.dirty.ratio controls this threshold and it is 0.5 by default, meaning you need at least 50% of your topic/partition file with “dirty” entries for compaction to run. Anything below that, the thread doesn’t find it worth doing compaction on it.

My next step was to set this value to 0.01. This is quite aggressive and I wouldn’t recommend it for most topics, unless you have low volume and you really, really want to keep your topic/partition spotless.
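As a rough illustration of that threshold, the check below compares the dirty share of a partition against min.cleanable.dirty.ratio. It is a sketch under assumptions: is_dirty_enough is a hypothetical helper, the byte counts are invented, and the exact boundary behaviour (at least versus strictly more than the ratio) is an assumption taken from the description above rather than from the broker code.

```python
def is_dirty_enough(dirty_bytes, clean_bytes, min_cleanable_dirty_ratio=0.5):
    """Return True if the dirty share of the log reaches the threshold."""
    total = dirty_bytes + clean_bytes
    if total == 0:
        return False  # nothing to compact in an empty log
    return dirty_bytes / total >= min_cleanable_dirty_ratio


# Made-up partition: 30 MB of dirty data on top of 70 MB already compacted.
dirty, clean = 30 * 1024 * 1024, 70 * 1024 * 1024

picked_with_default = is_dirty_enough(dirty, clean)            # ratio 0.3 vs 0.5
picked_with_low_ratio = is_dirty_enough(dirty, clean, 0.01)    # ratio 0.3 vs 0.01

if __name__ == "__main__":
    print(picked_with_default, picked_with_low_ratio)
```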

However, this didn’t do the trick either…

Step 3 – Be less nice with your replaying consumers

When a consumer is replaying a topic from the beginning, it might encounter this problem:

  1. Offset X contains a record with Key K and Value V.
  2. A few records “later” (maybe millions…), there is another record with Key K, but with a null Value, AKA a tombstone.
  3. If the consumer reads the first record, but compaction runs and gets rid of the second record (the tombstone), the consumer will never know that the record with Key K has been deleted.
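The scenario above can be reproduced with a toy in-memory log. This is a sketch only: the log, the compaction function and the consumer are simplified stand-ins written for this example, not Kafka code.

```python
from typing import Dict, List, Optional, Tuple

Record = Tuple[int, str, Optional[str]]  # (offset, key, value); None = tombstone


def apply(state: Dict[str, str], records: List[Record]) -> None:
    """Apply records to a key/value view, treating None values as deletes."""
    for _offset, key, value in records:
        if value is None:
            state.pop(key, None)
        else:
            state[key] = value


def compact(log: List[Record], drop_tombstones: bool) -> List[Record]:
    """Keep only the latest record per key, optionally removing tombstones."""
    latest = {key: offset for offset, key, _value in log}
    kept = [r for r in log if latest[r[1]] == r[0]]
    if drop_tombstones:
        kept = [r for r in kept if r[2] is not None]
    return kept


def replay(drop_tombstones: bool) -> Dict[str, str]:
    log: List[Record] = [(0, "K", "V"), (1, "other", "x"), (2, "K", None)]
    state: Dict[str, str] = {}
    apply(state, log[:1])                    # consumer reads offset 0, then pauses
    position = 1
    compacted = compact(log, drop_tombstones)  # compaction runs in the meantime
    apply(state, [r for r in compacted if r[0] >= position])
    return state


if __name__ == "__main__":
    print(replay(drop_tombstones=True))   # K is still present: the delete was missed
    print(replay(drop_tombstones=False))  # K is gone: the tombstone was still there
```

When the tombstone is dropped before the consumer gets to it, the consumer ends up with a stale entry for K, which is exactly what keeping tombstones around for a while is meant to prevent.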

To compensate for this scenario, Kafka has a config setting called delete.retention.ms that controls how long tombstones should be kept around for the benefit of these consumers. Its default: 1 day.

This is very useful, but it will also keep tombstones around unnecessarily if you don’t expect any replaying consumer to read a given topic or, at least, to take as long as 1 day.

My next attempt was to configure this down to 60,000 ms (1 minute)… but that still didn’t work.

Step 4 – It’s not a feature… it’s a bug

I ran out of options here, so I thought that maybe this was one of those rare and unfortunate occasions when I had hit one of Kafka’s bugs. I fired up a quick search on Google and… voilà!

Tombstones can survive forever: https://issues.apache.org/jira/browse/KAFKA-8522

Long story short, under certain circumstances, tombstones will get their “timeouts” renewed regularly, meaning they will not honor delete.retention.ms and will stick around.

The only workaround that seems to work is to set delete.retention.ms to zero, forcing the tombstones to be deleted immediately, instead of sticking around for the benefit of consumers replaying the topic.

However, this solution must be used with great care. For the scenario described at the beginning, a Kafka Streams app and a changelog topic, using this option can have unexpected side effects during the Restore phase, when the app reads its changelog topics to restore its state. If compaction kicks in while it is doing so, the app might miss the tombstone records for entries that it has already consumed, keeping entries in its key/value store that should have been removed.

Unfortunately, until the bug is fixed, if your app needs all these tombstones evicted from the changelog, this seems to be the only option.

Getting started with AWS MSK

Amazon Managed Streaming for Apache Kafka

Apache Kafka is one of the technologies with the fastest popularity growth in the last 10 years. AWS, always vigilant for new tech to incorporate into its offering, launched its Kafka as a managed service in February 2019: Amazon MSK.

MSK follows the RDS model: you choose how much hardware to provision (number of nodes, CPU, memory, etc.) and AWS manages the service for you. Since Kafka is complex software that is not easy to operate, having AWS deal with that for you is quite appealing, especially at the beginning of your journey with it.

In this article, we are going to look at how to provision your first MSK cluster, the different options that you encounter (and what they mean) and how to run a quick-and-dirty performance test to understand how much the most humble cluster can actually process.

Creating the smallest cluster possible

The process starts with logging into the AWS Console, selecting/searching MSK and clicking on “Create Cluster”. That leads you to a typically dry first screen with lots of options to select. Don’t worry, we will see what they mean one by one.

Step 1 – Software version

Creating MSK cluster – step 1 – software version

Firstly, we are asked for a name for the new cluster. Choose your own adventure here.

Anybody familiar with AWS will recognize the option for VPC. The easiest (and least safe) option is to choose your default VPC, which will grant access to anything to everybody. After all, we are just testing here, right?

Finally, a more important choice: the Apache Kafka version. Since AWS MSK launched, they have consistently only supported x.y.1 versions, meaning 1.1.1, 2.1.1, 2.2.1, etc. Personally, I try to stay away from x.y.0 versions, especially for less mature components like Kafka Connect or Kafka Streams. Besides that rule, choose the newest version possible to stay away from annoying bugs like this.

Step 2 – Network options

Creating MSK cluster – step 2 – availability zones

MSK offers the option to deploy your Kafka brokers to as many as 3 availability zones, which is also the recommended option for high availability.

Obviously, the more availability zones, the more brokers you will get provisioned (and the more expensive your cluster will be). For simplicity, let’s go with “3” and assign the default AZ and subnet existing in your default VPC.

Step 3 – Broker configuration

Creating MSK cluster – step 3 – broker configuration

Every Kafka Broker requires configuration for a number of properties. Apache Kafka comes with defaults for pretty much all of them. However, AWS MSK overrides some of them with its own defaults. In this section, it is possible to choose your own custom configuration for Apache Kafka, assuming you have created one. In a future post, we will see how to do that. For now, let’s run with the defaults.

Step 4 – Hardware and tags

Creating MSK cluster – step 4 – hardware and tags

Things are getting interesting now. You have to choose the hardware family/size of the EC2 instances that will power your Kafka cluster, plus how many of them to run per AZ (remember, we have chosen 3 AZs). For this example, let’s go with 1 broker per AZ.

Time to look at MSK pricing. For this example, I’m going to choose the cheapest options for both instance type and storage. That would cost me, on a monthly basis (eu-west-1 region, 30-days month):

  • kafka.m5.large: 168.48$ / month per broker
  • Storage: 0.11$ per GB / month (the total below assumes 1 GB per broker)
  • Total: (168.48 * 3) + (0.11 * 3) = 505.77$ / month

For reference, an EC2 m5.large instance costs 77.04$/month. AWS is charging you approx. 2x for managing your Kafka cluster.
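For anyone who wants to replay the numbers, the arithmetic looks like this. The prices are the ones quoted above (eu-west-1 at the time of writing); the variable names are just illustrative, and current prices should be checked on the AWS pricing page.

```python
# Monthly prices quoted above; names are illustrative.
BROKER_MONTHLY_USD = 168.48        # kafka.m5.large, per broker
STORAGE_MONTHLY_USD = 0.11         # storage, per broker, as used in the total above
EC2_M5_LARGE_MONTHLY_USD = 77.04   # plain EC2 m5.large, for comparison
BROKERS = 3                        # 1 broker in each of the 3 AZs

total_monthly_usd = round(BROKERS * (BROKER_MONTHLY_USD + STORAGE_MONTHLY_USD), 2)
markup_over_ec2 = round(BROKER_MONTHLY_USD / EC2_M5_LARGE_MONTHLY_USD, 2)

if __name__ == "__main__":
    print(total_monthly_usd)   # cost of the whole cluster per month
    print(markup_over_ec2)     # how many times the price of the bare instance
```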

UPDATE: I got good feedback on this point. When considering all costs involved (EC2 instances for Zookeeper nodes, EC2 instances for Broker nodes, replication traffic cost, etc.), the overall cost of MSK is almost the same as running the cluster yourself (assuming your DevOps team works for free… which they don’t).

AWS has published a Pricing Calculator to size your MSK cluster correctly for your expected traffic; it also compares its cost with a self-managed option. Spoiler alert: you shouldn’t self-manage unless you really know what you’re doing (ample experience with both AWS and Kafka), and even then it is unclear to me why you would do that to yourself 🙂

WARNING: remember to delete your cluster once you are done with the tutorial or you will regret having followed it!!

Step 5 – Security options

Creating MSK cluster – step 5 – security options

In this section you choose a bunch of security-related options:

  1. Do you want to encrypt the communication between brokers? Yeah, why not!
  2. Do you want to force your clients to use SSL/TLS? For testing, probably allowing both TLS and plaintext is the best option. For production, you might want to restrict to TLS.
  3. Should I encrypt my data at rest? Definitely yes.
  4. Should I use TLS to authenticate clients? Well, you probably want to have some form of authentication for production environments, although it depends on your security requirements. For testing your first cluster, leave it unticked.

We are almost there… one more step!

Step 6 – Monitoring

Creating MSK cluster – step 6 – monitoring options

You definitely want to monitor your cluster, even if this is a “managed” service. At the end of the day, your clients might have an impact on your cluster (load, resource consumption, etc.) that AWS will definitely not monitor or alert on.

You have two choices to make here:

  • What level of monitoring do you need? There are three options: basic, cluster level or topic level. Basic level may be enough to get started, but cluster and topic level can save your day if the cluster starts acting weird, for instance, if one of your topics turns out to be really hot (lots of writes and/or reads).
  • Where do you want to send your metrics? For a test cluster, CloudWatch can be good enough. For a production cluster, consider Prometheus, especially if you are already using it.

Step 7 – Have a coffee (or tea)

Creating MSK cluster – step 7 – waiting for new cluster

Just fly past the “Advanced Settings” section and click “Create Cluster” and… wait, a lot. Like 15 minutes… or more.

Step 8 – Provision the client

You are going to need a client that can connect to your newly created cluster, just to play a bit with it. Let’s provision a small EC2 instance, install Kafka command-line tools and give it a spin. I won’t go into too much detail here, I assume you already know how to do this with EC2:

  1. Navigate to EC2.
  2. Click on “Launch Instance” button.
  3. Select “Amazon Linux 2 AMI (HVM), SSD Volume Type”.
  4. Select “t2.micro” from the free tier.
  5. Keep all defaults for the rest of the options.
  6. Make sure that you have the key needed to SSH into the instance. Otherwise, create a new one.

Click on “View Instance” to go back to the EC2 Instances section. You should see your instance here. Select it and copy its public IP.

Let’s SSH into this instance (don’t bother trying to connect to the IP in this example, by the time I publish this post I will have already deleted it :)). Make sure you have the key located and do:

ssh -i [path-to-your-key] ec2-user@[ec2-public-ip]

If ssh complains about the key permissions being too open, just do a chmod 600 [key-path] to make sure they are restricted enough to make ssh happy.

Step 9 – Installing Kafka command-line tools

We are going to need the command line tools to connect to our cluster. Luckily, you can easily curl all versions of Kafka from the official download page.

curl https://downloads.apache.org/kafka/2.3.1/kafka_2.12-2.3.1.tgz -o kafka.tgz
tar -xvzf kafka.tgz

Once the file is decompressed, you have a new folder like kafka_2.12-2.3.1. Navigate to the bin subfolder to find all the command-line tools there.

However, if we try to run any of the tools here, they will all fail because we don’t have Java installed on the machine. Let’s get that too:

sudo yum install java

You will be prompted with a summary of what is going to be installed. Accept and wait.

Step 10 – Connecting to your cluster

Once the installation is finished, let’s try to connect to our cluster. Head back to MSK main page, choose your cluster and click on the “View client information” button on the top-right side of the screen. A pop-up window opens with the details to connect to your cluster (TLS and/or plaintext) like the one in the picture below.

Connection details pop-up window

Let’s go back to the EC2 instance and try to list topics with the following command:

./kafka-topics.sh --bootstrap-server [first-broker plaintext url] --list   

We launch the command, we wait, we wait a little bit more, even more… and eventually we get this error:

Step 11 – Opening the Kafka port

The EC2 instance is running in its own security group, created when the instance was launched. This group allows SSH traffic to the instances that belong to it, which is why we can connect from our computers to the instance.

The MSK cluster, on the other hand, is running in the VPC default security group. This group allows incoming traffic to any port when it originates in the group itself. However, it rejects the traffic coming from the security group where the EC2 is running.

security groups setup

The good news is it has an easy solution: change the default security group to accept traffic from the EC2 instance security group. Follow these steps:

  1. Head to the “Security Groups” section under EC2.
  2. Choose the “default” security group.
  3. Click on the “Edit” button.
  4. In the pop-up window, click on the “Add Rule” button.
  5. Choose:
    1. Type: Custom TCP Rule
    2. Protocol: TCP
    3. Port: 9092 (Kafka port)
    4. Source: Custom + the name of the EC2 security group
  6. Click on the “Save” button.
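If you prefer scripting over clicking, the same rule can be expressed with boto3. This is a sketch, not a tested deployment script: it assumes boto3 is installed and AWS credentials are configured, and the security group IDs passed in are placeholders you would replace with your own.

```python
KAFKA_PLAINTEXT_PORT = 9092  # the Kafka port opened in the steps above


def kafka_ingress_rule(source_group_id: str) -> dict:
    """Build an ingress rule allowing TCP 9092 from another security group."""
    return {
        "IpProtocol": "tcp",
        "FromPort": KAFKA_PLAINTEXT_PORT,
        "ToPort": KAFKA_PLAINTEXT_PORT,
        "UserIdGroupPairs": [{"GroupId": source_group_id}],
    }


def allow_kafka_from(default_group_id: str, source_group_id: str) -> None:
    """Add the rule to the default group (the one the MSK cluster runs in)."""
    import boto3  # imported lazily so the rule builder works without boto3

    ec2 = boto3.client("ec2")
    ec2.authorize_security_group_ingress(
        GroupId=default_group_id,
        IpPermissions=[kafka_ingress_rule(source_group_id)],
    )


if __name__ == "__main__":
    # Placeholder ID; only prints the rule, it does not call AWS.
    print(kafka_ingress_rule("sg-EC2-CLIENT-PLACEHOLDER"))
```

Calling allow_kafka_from with the ID of the default group and the ID of the EC2 instance group should have the same effect as the console steps above.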

That is it. Go back to the EC2 instance console and try the kafka-topics command again. This time it should return quickly, but without yielding a result (there isn’t any topic in the cluster yet).

Step 12 – Launching the performance producer tool

Let’s put some load through the system, just for fun. Firstly, we need to create a topic that we will use for performance testing.

./kafka-topics.sh --bootstrap-server [first-broker plaintext url] --create --topic performance-topic --partitions 4 --replication-factor 2

With this command, we are asking for a topic with four partitions, each of them kept in two replicas.

description of the performance-topic topic

Once it is created, we can launch the performance producer.

./kafka-producer-perf-test.sh --topic performance-topic --num-records 1000000 --throughput 100 --producer-props bootstrap.servers=b-1.testing-cluster.34vag9.c4.kafka.eu-west-1.amazonaws.com:9092 acks=all --record-size 10240

What this command does is:

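  1. Sends 1 million records of 10 KB each, throttled to 100 records per second (--throughput 100).
  2. Waits for every in-sync replica to acknowledge each record (acks=all); min.insync.replicas (2 in this case) sets how many replicas must be in sync for a write to be accepted.

sending load to the cluster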
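It is worth working out what these flags add up to before launching the test. The numbers below come straight from the command above; the variable names are only illustrative.

```python
# Values taken from the kafka-producer-perf-test.sh command above.
NUM_RECORDS = 1_000_000          # --num-records
RECORD_SIZE_BYTES = 10_240       # --record-size (10 KB)
THROUGHPUT_RECORDS_PER_S = 100   # --throughput

bytes_per_second = THROUGHPUT_RECORDS_PER_S * RECORD_SIZE_BYTES
total_bytes = NUM_RECORDS * RECORD_SIZE_BYTES
duration_seconds = NUM_RECORDS / THROUGHPUT_RECORDS_PER_S

if __name__ == "__main__":
    print(bytes_per_second / 1_000_000, "MB/s")   # roughly 1 MB/s
    print(total_bytes / 1_000_000_000, "GB")      # a bit over 10 GB in total
    print(duration_seconds / 3600, "hours")       # close to 3 hours if left running
```

In other words, the producer pushes about 1 MB/s, and the full run would take close to three hours, so there is no need to wait for it to finish before moving on.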

Step 13 – Launching the performance consumer tool

How can we know that these records are going somewhere? Well, we can obviously consume them back.

Run the following command from a separate SSH session.

./kafka-consumer-perf-test.sh --broker-list b-1.testing-cluster.34vag9.c4.kafka.eu-west-1.amazonaws.com:9092 --messages 1000000 --print-metrics --show-detailed-stats --topic performance-topic

What this command does is:

  1. Consumes 1 million records.
  2. Prints detailed stats while it does so.

consuming records from the cluster

Step 14 – Watching the metrics

We can also look at CloudWatch metrics to see them live, growing with the load we are sending to the cluster. Head to CloudWatch in your AWS Console. Once there:

  1. Click on “Metrics”.
  2. Choose “AWS/Kafka”.
  3. Choose “Broker ID, Cluster Name, Topic”.

You will see that the only topic-level metrics available are for the topic just created (the cluster does not have any other topic at the moment). Click on “Bytes In” for the 3 brokers. You will see a growing graph like this one.

CloudWatch metrics

Make sure to configure the “Metrics Period” to 1 minute (under “Graphed Metrics”) to have a more accurate visualization.

Step 15 – Delete everything if you don’t want to pay

Once you are satisfied with all your tests, it’s time to clean everything up and avoid nasty surprises when the AWS bill arrives at the end of the month.

Head to the EC2 section first to kill your EC2 instance and follow these steps:

  1. Select “Instances” on the left menu.
  2. Select your EC2 instance to kill.
  3. Click on the “Actions” button.
  4. Choose “Instance state” -> “Terminate”.
  5. In the pop-up window, click on “Yes, terminate”.

In a few minutes, your instance will be dead. Take this opportunity to also remove its orphan security group.

  1. Select “Security Groups” on the left menu.
  2. Select the old EC2 instance security group (something like launch-wizard).
  3. Click on the “Actions” button.
  4. Choose “Delete security group”.
  5. A pop-up window informs you that you can’t delete the security group because it is being referenced from another group (the default group, remember step 11).
  6. Choose the default group, click the “Edit” button and delete the Kafka related rule (TCP port 9092).
  7. Try to delete the EC2 security group again. This time the pop-up window displays a “Yes, Delete” button. Click it to remove the security group.

Last but not least, remove the Kafka cluster. Head to MSK, choose your cluster there and delete it.

deleting cluster

Type “delete” on the pop-up window. Your cluster status will change to “deleting”. A minute later, it will be gone for good.

Conclusions

15 steps aren’t the simplest process possible, but if we think about it, we have covered a lot of ground:

  1. Created the cheapest possible cluster.
  2. Provisioned an EC2 instance with the Kafka command-line tools to test the cluster.
  3. Ran performance producers and consumers.
  4. Monitored the cluster load with CloudWatch.

Even more important, with a really small cluster, we were sending 100 messages/s with a total load of 1 MB/s, from one single client, and our cluster didn’t even blink.

That is the power of Kafka, one of the fastest tools available in the market when it comes to moving data. And now, with AWS MSK, it is really easy to get a cluster up and running.