UPDATE (Kafka client v2.0):
- Also as part of KIP-266, the default value of
request.timeout.mshas been changed to 30 seconds. The previous value was a little higher than 5 minutes to account for maximum time that a rebalance would take. Now we treat the JoinGroup request in the rebalance as a special case and use a value derived from
max.poll.interval.msfor the request timeout. All other request types use the timeout defined by
IMPORTANT: This is information is based on Kafka and Kafka Streams 1.0.0. Past or future versions may defer.
As with any distributed system, Kafka relies on timeouts to detect failures. Those timeouts can be sent by clients and brokers that want to detect each other unavailability. The following is a description of the configuration values that control timeouts that both brokers and client will use to detect clients not being available.
The original design for the Poll() method in the Java consumer tried to kill two birds with one stone:
- Guarantee consumer liveness
- Guarantee progress as well, since a consumer could be alive but not moving forward
However, this design caused a few problems. The solution was to introduce separate configuration values and background thread based heartbeat mechanism.
Since Kafka 0.10.1.0, the heartbeat happens from a separate, background thread, different to the thread where Poll() runs. The description for the configuration value is:
The expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.
The default value is 3 seconds. This heartbeat will guarantee an early detection when the consumer goes down, maybe due to an unexpected exception killing the process. However, back pressure or slow processing will not affect this heartbeat.
Introduced with Kafka 0.10.1.0 as well, compensates for the background heart-beating but introducing a limit between Poll() calls. The description for the configuration value is:
The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.
The default value is 30 seconds, except for Kafka Streams, which increases it to
When the timeout expires, the consumer will stop heart-beating and will leave the consumer group explicitly. With this new configuration value, we can set an upper limit to how long we expect a batch of records to be processed. Together with
max.poll.record and the appropriate timeouts for third party calls, we should be able to determine fairly accurately how long an application may stay unresponsive while processing records.
max.poll.interval.ms has a role in rebalances. Since we know it represents how long processing a batch can take, it is also implicitly timeout for how long a client should be awaited in the event of a rebalance. Therefore, the client sends this value when it joins the consumer group. On the event of a rebalance, the broker will wait this timeout for a client to respond, before kicking it out of the consumer group.
Finally, while the previous values are used to get the client willingly out of the consumer group, this value controls when the broker can push it out itself. The description for this configuration value is:
The timeout used to detect consumer failures when using Kafka’s group management facility. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance.
The default is 10 seconds. Clients have to define a value between the range defined by
group.max.session.timeout.ms, which are defined in the broker side.
What does this all mean?
In a nutshell, it means that you have to configure two types of timeouts: heartbeat timeout and processing timeout. The former accounts for clients going down and the second for clients taking too long to make progress.
Heartbeating will be controlled by the expected
heartbeat.interval.ms and the upper limit defined by
Processing will be controlled by
max.poll.interval.ms. On the client side, kicking the client out of the consumer group when the timeout expires. On the server side, communicating to the broker what is the expected rebalancing timeout.
Integer.MAX_VALUE Kafka Streams default
max.poll.interval.ms default for Kafka Streams was changed to
Integer.MAX_VALUE in Kafka 0.10.2.1 to strength its robustness in the scenario of larga state restores. Fortunately, after changes to the library in 0.11 and 1.0, this large value is not necessary anymore.
In any case, it is still recommended to use a generous timeout in case of calls to external third parties from a stream topology. For a node that goes down,
session.timeout.ms will quickly be triggered since the background heartbeat will stop.
For a node that is simply taking too long to process records, the assumption is any other instance picking up those records would suffer the same delays with the third party.
session.timeout.ms allows a tighter control over applications going down with shorter
session.timeout.ms, while still giving them room for longer processing times with an extended
This is specially useful for Kafka Streams applications, where we can hook complicated, long-running, processing for every record.
- KIP-62: Allow consumer to send heartbeats from a background thread
- Kafka Mailist – Kafka Streams – max.poll.interval.ms defaults to Integer.MAX_VALUE
- Difference between session.timeout.ms and max.poll.interval.ms for Kafka 0.10.0.0 and later versions
- Kafka 0.10.1 heartbeat.interval.ms, session.timeout.ms and max.poll.interval.ms
3 thoughts on “Timeouts in Kafka clients and Kafka Streams”
I still am not getting the use of heartbeat.interval.ms. With Kafka 10.0.x heartbeat was only sent to the coordinator with the invocation of poll() and the max wait time is session.timeout.ms. Then, what is heartbeat.interval.ms used for?
The heartbeat runs on a separate thread from the polling thread. This PR introduced it in 0.10.1: https://github.com/apache/kafka/commit/40b1dd3f495a59abef8a0cba5450526994c92c04
The idea is the client will not be detected as dead by the broker when it’s making progress slowly. Before this PR, if a client polled 5 records and needed 1 sec to process each, it would have taken 5 seconds between heartbeats ran by the Poll() loop. The broker would have presumed the client dead and run a rebalance in the consumer group. With this new feature, it would still be kept alive and making progress normally.
Easy to understand and crisp information. Thanks a much…!!!