Recently, at one of the clients I consult for, I came across a strange situation: tombstone records that “refused” to disappear.
The scenario was quite simple:
A Kafka Streams application materializes some state (in RocksDB).
From time to time, a punctuation kicks in, pulls all the accumulated records and sends them somewhere.
Upon success, it deletes all the records and calls it a day.
However, when consuming the changelog topic, I noticed that there were lots of tombstone records. Having some of them made sense, since that is how a “delete” is represented in a changelog topic. However, having so many that hadn’t been cleaned out was unexpected.
I applied a few strategies/changes until I finally made them “gone”.
Step 1 – Roll your segments more often
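Compaction never touches the active segment, i.e. the file currently receiving writes for your topic/partition. Its records only become candidates once that segment is rolled (closed) and a new one is opened. Therefore, it is important to adjust when that happens if you want to influence the compaction process: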
segment.ms: the segment can stay open for up to this value. By default, that is 7 days.
segment.bytes: the segment can stay open up to this number of bytes. The default here is 1 GB, which is too big for low traffic topics.
The defaults for these two settings have “big data” stamped on them. If you don’t have a “big data” topic, chances are the process won’t be responsive enough for you.
I tried setting them to 60,000 ms (1 minute) and 1,048,576 bytes (1 MB) respectively… with no luck. Nothing changed; the tombstones were still there.
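These are per-topic settings, so they can be applied to an existing changelog topic with the stock kafka-configs.sh tool. Below is a minimal sketch. The function name, the topic name in the usage comment and the KAFKA_HOME variable are my own placeholders. It also assumes a Kafka release where kafka-configs.sh accepts --bootstrap-server for topic configs (older releases took --zookeeper instead).

```shell
#!/usr/bin/env bash
# Hypothetical helper: roll a topic's segments every minute or every 1 MB,
# whichever comes first. $1 = bootstrap servers, $2 = topic name.
# KAFKA_HOME points at a Kafka installation (defaults to the current directory).
roll_segments_faster() {
  local bootstrap="$1" topic="$2"
  "${KAFKA_HOME:-.}/bin/kafka-configs.sh" \
    --bootstrap-server "$bootstrap" \
    --entity-type topics --entity-name "$topic" \
    --alter --add-config segment.ms=60000,segment.bytes=1048576
}

# Usage (hypothetical topic; Kafka Streams names changelog topics
# <application.id>-<store name>-changelog):
# roll_segments_faster localhost:9092 my-app-my-store-changelog
```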
Step 2 – Tolerate less dirtiness
It is also possible that, even if your segments are rolling regularly, the log cleaner thread doesn’t pick up your topic/partition because it is not dirty enough. In other words, the share of the log that hasn’t been compacted yet (the “dirty” part) doesn’t reach the configured threshold.
min.cleanable.dirty.ratio controls this threshold. It is 0.5 by default, meaning more than 50% of your topic/partition log must be “dirty” for compaction to run. Below that, the cleaner thread doesn’t consider the log worth compacting.
My next step was to set this value to 0.01. This is quite aggressive, and I wouldn’t recommend it for most topics unless you have low volume and you really, really want to keep your topic/partition spotless.
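The following is a simplified model of that eligibility check, not Kafka’s actual code. It makes the threshold concrete. The dirty ratio is the number of not-yet-compacted bytes divided by the clean plus dirty bytes, and the log qualifies only when that ratio is strictly greater than min.cleanable.dirty.ratio. Bytes in the active segment don’t count, because that segment is never cleaned. Both function names are hypothetical.

```shell
#!/usr/bin/env bash
# Simplified model of the log cleaner's eligibility check (not Kafka's code).
# $1 = clean bytes (already compacted), $2 = dirty bytes (written since the
# last cleaning, active segment excluded).

# Print the dirty ratio with two decimals.
dirty_ratio() {
  awk -v c="$1" -v d="$2" \
    'BEGIN { printf "%.2f\n", ((c + d > 0) ? d / (c + d) : 0) }'
}

# Exit 0 only if the ratio is strictly above $3 (min.cleanable.dirty.ratio).
is_cleanable() {
  awk -v c="$1" -v d="$2" -v r="$3" \
    'BEGIN { total = c + d; if (total > 0 && d / total > r) exit 0; exit 1 }'
}
```

For example, a log with 900 MB already compacted and 100 MB dirty has a ratio of 0.10. That is not enough for the default 0.5, but it is plenty for 0.01.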
However, this didn’t do the trick either…
Step 3 – Be less nice with your replaying consumers
When a consumer is replaying a topic from the beginning, it might encounter this problem:
Offset X contains a record with Key K and Value V.
A few records “later” (maybe millions…), there is a record with Key K again, but with a null Value, AKA a tombstone.
If the consumer reads the first record, but compaction runs and gets rid of the second record (the tombstone) before the consumer reaches it, the consumer will never know that the record with Key K has been deleted.
To compensate for this scenario, Kafka has a config setting called delete.retention.ms that controls how long tombstones should be kept around for the benefit of these consumers. Its default: 1 day.
This is very useful, but it will also keep tombstones around unnecessarily if you don’t expect any replaying consumer to read a given topic or, at least, to take as long as 1 day to do so.
My next attempt was to configure this down to 60,000 ms (1 minute)… but still not working.
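The toy model below shows why delete.retention.ms matters. It is a deliberate simplification, not Kafka’s log cleaner: real Kafka’s tombstone bookkeeping is segment-based and more involved. Records are lines of the form “offset key value timestamp_ms”, and a value of null marks a tombstone. Compaction keeps only the latest record per key and drops tombstones older than the retention. The compact function and the input format are made up for illustration.

```shell
#!/usr/bin/env bash
# Toy compaction model (not Kafka's implementation).
# stdin: "offset key value timestamp_ms" lines; value "null" = tombstone.
# $1 = current time in ms, $2 = delete.retention.ms
compact() {
  local now_ms="$1" delete_retention_ms="$2"
  awk -v now="$now_ms" -v ret="$delete_retention_ms" '
    {
      key[NR] = $2; val[NR] = $3; ts[NR] = $4; rec[NR] = $0
      latest[$2] = NR                       # last line seen for this key
    }
    END {
      for (i = 1; i <= NR; i++) {
        if (latest[key[i]] != i) continue   # superseded by a newer record
        if (val[i] == "null" && now - ts[i] > ret) continue  # expired tombstone
        print rec[i]
      }
    }'
}
```

Suppose you pipe in 0 k1 v1 1000, 1 k2 v2 2000 and 2 k1 null 3000, and call compact 10000 86400000 (a 1-day retention). The tombstone for k1 survives, so a replaying consumer that already read k1=v1 still learns about the delete. With compact 10000 0, only 1 k2 v2 2000 remains.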
Step 4 – It’s not a feature… it’s a bug
I ran out of options here, so I thought that maybe this was one of those rare and unfortunate occasions when I had hit one of Kafka’s bugs. I fired up a quick search on Google and… voilà!
Long story short, under certain circumstances, tombstones get their “timeouts” renewed regularly, meaning they do not honor delete.retention.ms and stick around.
The only workaround that seems to work is to set delete.retention.ms to zero, forcing the tombstones to be deleted immediately instead of sticking around for the benefit of consumers replaying the topic.
However, this solution must be used with great care. For the scenario described at the beginning (a Kafka Streams app and its changelog topic), this option can have unexpected side effects during the Restore phase, when the app reads its changelog topics to restore its state. If compaction kicks in while it is doing so, the app might miss the tombstones for entries it has already consumed, keeping entries in its key/value store that should have been removed.
Unfortunately, until the bug is fixed, if your app needs all these tombstones evicted from the changelog, this seems to be the only option.
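If you decide to go down that path anyway, here is a minimal sketch of the workaround using the stock kafka-configs.sh tool. The first function sets delete.retention.ms to 0 on a single topic. The second removes that override, so the topic falls back to the broker default (log.cleaner.delete.retention.ms) once you run a Kafka version with the fix. The function names, the topic and KAFKA_HOME are placeholders. It also assumes a Kafka release whose kafka-configs.sh accepts --bootstrap-server for topic configs (older ones used --zookeeper).

```shell
#!/usr/bin/env bash
# Hypothetical helpers around kafka-configs.sh.
# $1 = bootstrap servers, $2 = topic. KAFKA_HOME defaults to the current directory.

# Workaround: let compaction drop tombstones right away (mind the restore caveat).
expire_tombstones_immediately() {
  "${KAFKA_HOME:-.}/bin/kafka-configs.sh" --bootstrap-server "$1" \
    --entity-type topics --entity-name "$2" \
    --alter --add-config delete.retention.ms=0
}

# Undo: remove the topic-level override so the broker default applies again.
restore_default_tombstone_retention() {
  "${KAFKA_HOME:-.}/bin/kafka-configs.sh" --bootstrap-server "$1" \
    --entity-type topics --entity-name "$2" \
    --alter --delete-config delete.retention.ms
}
```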
Apache Kafka is one of the technologies with the fastest popularity growth in the last 10 years. AWS, always vigilant for new tech to incorporate into its offering, launched its own managed Kafka service in February 2019: Amazon MSK.
MSK follows the RDS model: customers choose how much hardware to provision (number of nodes, CPU, memory, etc.) and AWS manages the service for you. Since Kafka is a complex piece of software that is not easy to operate, having AWS deal with that for you is quite appealing, especially at the beginning of your journey with it.
In this article, we are going to look at how to provision your first MSK cluster, the different options that you encounter (and what they mean), and how to run a quick-and-dirty performance test to understand how much the most humble cluster can actually process.
Creating the smallest cluster possible
The process starts with logging into the AWS Console, selecting/searching MSK and clicking on “Create Cluster”. That leads you to a typically dry first screen with lots of options to select. Don’t worry, we will see what they mean one by one.
Step 1 – Software version
Firstly, we are asked for a name for the new cluster. Choose your own adventure here.
Anybody familiar with AWS will recognize the option for VPC. The easiest (and least safe) option is to choose your default VPC and keep access wide open. After all, we are just testing here, right?
Finally, a more important choice: the Apache Kafka version. Since MSK launched, AWS has consistently supported only x.y.1 versions, meaning 1.1.1, 2.1.1, 2.2.1, etc. Personally, I try to stay away from x.y.0 versions, especially for less mature components like Kafka Connect or Kafka Streams. Beyond that rule, choose the newest version possible to stay away from annoying bugs like this.
Step 2 – Network options
MSK offers the option to deploy your Kafka brokers to as many as 3 availability zones, which is also the recommended option for high availability.
Obviously, the more availability zones, the more brokers you will get provisioned (and the more expensive your cluster will be). For simplicity, let’s go with “3” and assign the default AZs and subnets that exist in your default VPC.
Things are getting interesting now. You have to choose the hardware family/size of the EC2 instances that will power your Kafka cluster, plus how many of them to run per AZ (remember, we have chosen 3 AZs). For this example, let’s go with 1 broker per AZ.
Time to look at MSK pricing. For this example, I’m going to choose the cheapest options for both instance type and storage. That would cost me, on a monthly basis (eu-west-1 region, 30-day month):
For reference, an m5.large EC2 instance costs $77.04/month. AWS is charging you approximately 2x for managing your Kafka cluster.
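Monthly figures in this post assume a 30-day month, i.e. 720 hours, so they reduce to hourly price × 24 × 30 × number of instances. Below is a tiny helper to redo that math for other instance counts. The function name is hypothetical. The 0.107 $/hour rate in the usage comment is simply $77.04 / 720 h, back-computed from the m5.large figure, not a quote from AWS’s price list.

```shell
#!/usr/bin/env bash
# Back-of-the-envelope monthly cost: hourly price x 24 h x 30 days x instances.
# $1 = hourly price in USD (you look it up yourself), $2 = number of instances.
monthly_cost() {
  local hourly_usd="$1" instances="$2"
  awk -v h="$hourly_usd" -v n="$instances" \
    'BEGIN { printf "%.2f\n", h * 24 * 30 * n }'
}

# Usage:
# monthly_cost 0.107 1   # prints 77.04 (one instance)
# monthly_cost 0.107 3   # prints 231.12 (one instance per AZ, three AZs)
```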
UPDATE: I got good feedback on this point. When considering all the costs involved (EC2 instances for ZooKeeper nodes, EC2 instances for broker nodes, replication traffic, etc.), the overall cost of MSK is almost the same as running the cluster yourself (assuming your DevOps team works for free… which they don’t).
AWS has published a Pricing Calculator to size your MSK cluster correctly for your expected traffic; it also compares its cost with a self-managed option. Spoiler alert: you shouldn’t run Kafka yourself unless you really know what you’re doing (ample experience with both AWS and Kafka), and even then it is unclear to me why you would do that to yourself 🙂
WARNING: remember to delete your cluster once you are done with the tutorial or you will regret having followed it!!
Step 5 – Security options
In this section you choose a bunch of security-related options:
Do you want to encrypt the communication between brokers? Yeah, why not!
Do you want to force your clients to use SSL/TLS? For testing, probably allowing both TLS and plaintext is the best option. For production, you might want to restrict to TLS.
Should I encrypt my data at rest? Definitely yes.
Should I use TLS to authenticate clients? Well, you probably want some form of authentication for production environments, although that depends on your security requirements. For testing your first cluster, leave it unticked.
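If you do restrict clients to TLS, the command-line tools need a small properties file to talk to the TLS listener. MSK serves TLS on port 9094 (plaintext uses 9092). Below is a minimal sketch. It assumes that a copy of the JVM’s default cacerts file is enough to trust the brokers’ certificates. The function name and default paths are hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: write a client config for the TLS listener.
# $1 = output file, $2 = truststore path. The truststore is assumed to be a
# copy of the JVM's default cacerts file, e.g. on Java 8:
#   cp "$JAVA_HOME/jre/lib/security/cacerts" /tmp/kafka.client.truststore.jks
write_tls_client_properties() {
  local out="${1:-client.properties}"
  local truststore="${2:-/tmp/kafka.client.truststore.jks}"
  cat > "$out" <<EOF
security.protocol=SSL
ssl.truststore.location=${truststore}
EOF
}

# Usage:
# write_tls_client_properties client.properties
# bin/kafka-topics.sh --bootstrap-server "<TLS broker string>" \
#   --command-config client.properties --list
```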
We are almost there… one more step!
Step 6 – Monitoring
You definitely want to monitor your cluster, even if this is a “managed” service. At the end of the day, your clients might have an impact on your cluster (load, resource consumption, etc.) that AWS will definitely not monitor or alert on.
You have two choices to make here:
What level of monitoring do you need? There are three options: basic, cluster level or topic level. Basic level may be enough for a first test, but cluster and topic level can save your day if the cluster starts acting weird, for instance, if you find one of your topics being really hot (lots of writes and/or reads).
Where do you want to send your metrics? For a test cluster, CloudWatch can be good enough. For a production cluster, consider Prometheus, especially if you are already using it.
Step 7 – Have a coffee (or tea)
Just fly past the “Advanced Settings” section and click “Create Cluster” and… wait, a lot. Like 15 minutes… or more.
Step 8 – Provision the client
You are going to need a client that can connect to your newly created cluster, just to play a bit with it. Let’s provision a small EC2 instance, install the Kafka command-line tools and give it a spin. I won’t go into too much detail here, as I assume you already know how to do this with EC2:
Navigate to EC2.
Click on “Launch Instance” button.
Select “Amazon Linux 2 AMI (HVM), SSD Volume Type”.
Select “t2.micro” from the free tier.
Keep all defaults for the rest of the options.
Make sure that you have the key needed to SSH into the instance. Otherwise, create a new one.
Click on View Instance to go back to the EC2 Instances section. You should see your instance here. Select it and copy/paste its public IP.
Let’s SSH into this instance (don’t bother trying to connect to the IP in this example; by the time I publish this post I will have already deleted it :)). Once you are connected with your key, download and extract Kafka:
curl "https://archive.apache.org/dist/kafka/2.3.1/kafka_2.12-2.3.1.tgz" -o kafka.tgz
tar -xvzf kafka.tgz
Once the file is decompressed, you have a new folder like kafka_2.12-2.3.1. Navigate to the bin subfolder to find all the command-line tools there.
However, if we try to run any of the tools here, they will all fail because we don’t have Java installed on the machine. Let’s get that too:
sudo yum install java
You will be prompted with a summary of what is going to be installed. Accept and wait.
Step 10 – Connecting to your cluster
Once the installation is finished, let’s try to connect to our cluster. Head back to MSK main page, choose your cluster and click on the “View client information” button on the top-right side of the screen. A pop-up window opens with the details to connect to your cluster (TLS and/or plaintext) like the one in the picture below.
Let’s go back to the EC2 instance and try to list topics with the following command:
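Below is a minimal sketch of that command. It assumes you run it from inside the kafka_2.12-2.3.1 folder and pass the plaintext broker string shown under “View client information”. The list_topics name is hypothetical. Since Kafka 2.2, kafka-topics.sh can talk to the brokers directly with --bootstrap-server.

```shell
#!/usr/bin/env bash
# Hypothetical helper: list the cluster's topics over the plaintext listener.
# Run from the kafka_2.12-2.3.1 folder; $1 is the plaintext broker string
# (comma-separated host:9092 pairs) copied from "View client information".
list_topics() {
  bin/kafka-topics.sh --bootstrap-server "$1" --list
}

# Usage:
# list_topics "<plaintext broker string>"
```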
We launch the command, we wait, we wait a little bit more, even more… and eventually we get this error:
Step 11 – Opening the Kafka port
The EC2 instance is running in its own security group, created when the instance was launched. This group allows SSH traffic to the instances that belong to it, which is why we can connect from our computers to the instance.
The MSK cluster, on the other hand, is running in the VPC default security group. This group allows incoming traffic to any port when it originates in the group itself. However, it rejects the traffic coming from the security group where the EC2 is running.
The good news is it has an easy solution: change the default security group to accept traffic from the EC2 instance security group. Follow these steps:
Head to the “Security Groups” section under EC2.
Choose the “default” security group.
Click on the “Edit” button.
In the pop-up window, click on the “Add Rule” button.
Type: Custom TCP Rule
Port: 9092 (Kafka port)
Source: Custom + the name of the EC2 security group
Click on the “Save” button.
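The same rule can also be added with the AWS CLI. Below is a sketch. It assumes the CLI is installed and configured with EC2 permissions. The function name and both security group IDs are placeholders.

```shell
#!/usr/bin/env bash
# Hypothetical helper mirroring the console steps: allow the EC2 client's
# security group ($2) to reach port 9092 on the VPC's default group ($1).
allow_kafka_from() {
  local default_sg="$1" client_sg="$2"
  aws ec2 authorize-security-group-ingress \
    --group-id "$default_sg" \
    --protocol tcp --port 9092 \
    --source-group "$client_sg"
}

# Usage (placeholder IDs):
# allow_kafka_from sg-0123456789abcdef0 sg-0fedcba9876543210
```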
That’s it. Go back to the EC2 instance console and try the kafka-topics command again. This time it should return quickly, but without yielding a result (there aren’t any topics in the cluster yet).
Step 12 – Launching the performance producer tool
Let’s put some load through the system, just for fun. Firstly, we need to create a topic that we will use for performance testing.
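Below is one possible sketch of this step. Every value in it is my own assumption. It creates a topic called perf-test with 3 partitions and a replication factor of 3 (one replica per broker), working from the kafka_2.12-2.3.1 folder. It then loads the topic with kafka-producer-perf-test.sh at 100 records/s of 10,000 bytes each, which is roughly the 1 MB/s mentioned at the end of this post. The function names are hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helpers for the performance test; run them from the
# kafka_2.12-2.3.1 folder and pass the plaintext broker string as $1.
# Topic name, partition count and load figures are assumptions.

create_perf_topic() {
  # 3 partitions, one replica on each of the 3 brokers.
  bin/kafka-topics.sh --bootstrap-server "$1" --create \
    --topic perf-test --partitions 3 --replication-factor 3
}

run_producer_perf() {
  # 100 records/s x 10,000 bytes ~= 1 MB/s; 60,000 records ~= 10 minutes.
  bin/kafka-producer-perf-test.sh --topic perf-test \
    --num-records 60000 --record-size 10000 --throughput 100 \
    --producer-props bootstrap.servers="$1"
}

# Usage:
# create_perf_topic "<plaintext broker string>" &&
#   run_producer_perf "<plaintext broker string>"
```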
We can also look at CloudWatch metrics to see them live, growing with the load we are sending to the cluster. Head to CloudWatch in your AWS Console. Once there:
Click on “Metrics”.
Choose “Broker ID, Cluster Name, Topic”.
You will see that the only topic-level metrics available are for the topic just created (the cluster does not have any other topic at the moment). Click on “Bytes In” for the 3 brokers. You will see a growing graph like this one.
Make sure to configure the “Metrics Period” to 1 minute (under “Graphed Metrics”) to have a more accurate visualization.
Step 15 – Delete everything if you don’t want to pay
Once you are satisfied with all your tests, it’s time to clean everything up and avoid nasty surprises when the AWS bill arrives at the end of the month.
Head to the EC2 section first to kill your EC2 instance and follow these steps:
Select “Instances” on the left menu.
Select your EC2 instance to kill.
Click on the “Actions” button.
Choose “Instance state” -> “Terminate”.
In the pop-up window, click on “Yes, terminate”.
In a few minutes, your instance will be dead. Take this opportunity to also remove its orphan security group.
Select “Security Groups” on the left menu.
Select the old EC2 instance security group (something like launch-wizard).
Click on the “Actions” button.
Choose “Delete security group”.
A pop-up window informs you that you can’t delete the security group because it is being referenced from another group (the default group, remember step 11).
Choose the default group, click the “Edit” button and delete the Kafka related rule (TCP port 9092).
Try to delete the EC2 security group again; this time the pop-up window displays a “Yes, Delete” button. Click it to remove the security group.
Last but not least, remove the Kafka cluster. Head to MSK, select your cluster there and choose to delete it.
Type “delete” in the pop-up window. Your cluster status will change to “deleting”. A minute later, it will be gone for good.
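The same clean-up can be scripted with the AWS CLI. Below is a sketch. It assumes a configured CLI. The function name, instance ID, security group IDs and cluster ARN are placeholders. The order matters for the same reasons as in the console. The instance must be fully terminated before its security group is free. The Kafka rule in the default group must be removed before the EC2 security group can be deleted.

```shell
#!/usr/bin/env bash
# Hypothetical helper mirroring the console clean-up; it stops at the first
# failing command. $1 = instance ID, $2 = EC2 client security group,
# $3 = default security group, $4 = MSK cluster ARN.
cleanup_tutorial() {
  local instance_id="$1" client_sg="$2" default_sg="$3" cluster_arn="$4"
  aws ec2 terminate-instances --instance-ids "$instance_id" || return
  # Wait until the instance is gone so its security group is no longer in use.
  aws ec2 wait instance-terminated --instance-ids "$instance_id" || return
  # Remove the Kafka rule that references the client group (step 11)...
  aws ec2 revoke-security-group-ingress --group-id "$default_sg" \
    --protocol tcp --port 9092 --source-group "$client_sg" || return
  # ...so that the client group itself can be deleted.
  aws ec2 delete-security-group --group-id "$client_sg" || return
  aws kafka delete-cluster --cluster-arn "$cluster_arn"
}
```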
15 steps aren’t the simplest process possible, but if we think about it, we have covered a lot of ground:
Created the cheapest possible cluster.
Provisioned an EC2 instance with the Kafka command-line tools to test the cluster.
Ran performance producers and consumers.
Monitored the cluster load with CloudWatch.
Even more importantly, with a really small cluster, we were sending 100 messages/s with a total load of 1 MB/s from one single client, and our cluster didn’t even blink.
That is the power of Kafka, one of the fastest tools available in the market when it comes to moving data. And now, with AWS MSK, it is really easy to get a cluster up and running.