
Error tolerance in Kafka Connect (I)

Explosion
Source: https://www.freeiconspng.com/img/9157

If you have done your homework and read the Confluent blog, you have probably seen this great deep-dive post on Connect error handling and Dead Letter Queues.

Robin Moffatt does a great job explaining how Kafka Connect manages errors based on connector configuration, including sending failed records to DLQs for sink connectors. The blog post also introduces the concept of “error tolerance” as something that can be configured per connector and that Connect will honor:

We’ve seen how setting errors.tolerance = all will enable Kafka Connect to just ignore bad messages. When it does, by default it won’t log the fact that messages are being dropped. If you do set errors.tolerance = all, make sure you’ve carefully thought through if and how you want to know about message failures that do occur. In practice that means monitoring/alerting based on available metrics, and/or logging the message failures.
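As a rough illustration of that advice, here is a minimal sketch of the error-handling part of a sink connector configuration, written as the kind of string map a connector is configured with. The property names are the standard Connect `errors.*` settings; the class name and the DLQ topic name are made up for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: the class name and the DLQ topic name are hypothetical.
class TolerantSinkConfigSketch {

    static Map<String, String> errorHandlingConfig() {
        Map<String, String> config = new LinkedHashMap<>();
        // Skip records that fail in a tolerated stage instead of failing the task.
        config.put("errors.tolerance", "all");
        // Log each failure so that dropped records do not go unnoticed.
        config.put("errors.log.enable", "true");
        // Also log the failed record itself (this may expose sensitive data).
        config.put("errors.log.include.messages", "true");
        // Sink connectors only: send failed records to a dead letter queue topic.
        config.put("errors.deadletterqueue.topic.name", "dlq-my-sink"); // hypothetical topic
        return config;
    }

    public static void main(String[] args) {
        errorHandlingConfig().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The same keys can be placed in the JSON or properties file used to create the connector.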

The Apache Kafka Documentation describes errors.tolerance in simple terms.

Behavior for tolerating errors during connector operation. ‘none’ is the default value and signals that any error will result in an immediate connector task failure; ‘all’ changes the behavior to skip over problematic records.

The description is succinct, but it raises two interesting questions:

  • Which errors will be skipped if we choose all?
  • What does “problematic records” mean?

Not all errors are born equal

If we look into the Kafka codebase, we quickly find that the logic for this tolerance-based error control is centralized in one class: RetryWithToleranceOperator.

There we can find this interesting fragment:

static {
    TOLERABLE_EXCEPTIONS.put(Stage.TRANSFORMATION, Exception.class);
    TOLERABLE_EXCEPTIONS.put(Stage.HEADER_CONVERTER, Exception.class);
    TOLERABLE_EXCEPTIONS.put(Stage.KEY_CONVERTER, Exception.class);
    TOLERABLE_EXCEPTIONS.put(Stage.VALUE_CONVERTER, Exception.class);
}

This static initialization controls what errors are considered tolerable, depending on when they occur. At the moment, errors are only tolerated when they happen in the key/value/header converter or during source/sink record transformation.
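To make the idea concrete, here is a simplified stand-in for that lookup. It is not the actual Kafka Connect code: the class, the `isTolerable` method and the reduced `Stage` enum are hypothetical, and treating stages that are missing from the map as "never tolerated" is a simplification on my part.

```java
import java.util.EnumMap;
import java.util.Map;

// Simplified stand-in for the stage-based lookup; not the real Connect implementation.
class ToleranceLookupSketch {

    // Reduced set of stages: the four from the fragment above plus two that are not registered.
    enum Stage { TRANSFORMATION, HEADER_CONVERTER, KEY_CONVERTER, VALUE_CONVERTER, KAFKA_PRODUCE, TASK_PUT }

    private static final Map<Stage, Class<? extends Exception>> TOLERABLE_EXCEPTIONS =
            new EnumMap<>(Stage.class);

    static {
        TOLERABLE_EXCEPTIONS.put(Stage.TRANSFORMATION, Exception.class);
        TOLERABLE_EXCEPTIONS.put(Stage.HEADER_CONVERTER, Exception.class);
        TOLERABLE_EXCEPTIONS.put(Stage.KEY_CONVERTER, Exception.class);
        TOLERABLE_EXCEPTIONS.put(Stage.VALUE_CONVERTER, Exception.class);
    }

    // True when the stage is registered and the failure is an instance of its tolerated type.
    static boolean isTolerable(Stage stage, Throwable failure) {
        Class<? extends Exception> tolerated = TOLERABLE_EXCEPTIONS.get(stage);
        return tolerated != null && tolerated.isInstance(failure);
    }

    public static void main(String[] args) {
        System.out.println(isTolerable(Stage.VALUE_CONVERTER, new IllegalStateException("bad payload")));
        System.out.println(isTolerable(Stage.KAFKA_PRODUCE, new IllegalStateException("broker down")));
    }
}
```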

However, Exception is not the root of everything Java code can throw. Looking at the Java exception class hierarchy, we see that Throwable has a second branch, Error, whose subclasses escape this control mechanism.

Exception hierarchy in Java
Source: http://www.javawithus.com/tutorial/exception-hierarchy

That is expected, though. The Javadoc for Error opens with this paragraph:

An Error is a subclass of Throwable that indicates serious problems that a reasonable application should not try to catch. Most such errors are abnormal conditions.

Kudos to Kafka Connect for acting as a reasonable application and catching the right base exception 🙂
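The difference is easy to see in plain Java. The sketch below uses a hypothetical helper, `run`, that catches Exception the way a tolerant wrapper would: a RuntimeException is absorbed, while an Error passes straight through the catch block.

```java
// Hypothetical helper showing that catch (Exception) does not intercept an Error.
class CatchExceptionSketch {

    static String run(Runnable operation) {
        try {
            operation.run();
            return "ok";
        } catch (Exception e) {
            // Only reached for Exception and its subclasses.
            return "tolerated: " + e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(() -> { throw new IllegalArgumentException("bad record"); }));
        try {
            run(() -> { throw new StackOverflowError(); });
        } catch (Error e) {
            // The Error was never seen by the catch block inside run().
            System.out.println("escaped: " + e.getClass().getSimpleName());
        }
    }
}
```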

When is a record problematic?

Now that we know which errors are tolerated when so configured, let’s dig a bit into what a “problematic record” is.

As mentioned in the section above, the error tolerance mechanism is only applied during key/value/header conversion and source/sink record transformation. However, there is much more to the lifecycle of a source/sink record than those steps. Once again, the Connect codebase reveals all the steps, codified in the Stage enum.

For a source task (one that takes information from a non-Kafka system and publishes it to Kafka), these are the steps:

Source task steps

Similarly, for a sink task (one that takes records from Kafka and sends them to a non-Kafka system), the steps are:

Sink task steps
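In code form, the two pipelines might be summarized as follows. This is my own reconstruction from the Stage names, not a copy of Connect code: the lists, the class name and the `describe` helper are hypothetical, and the relative order of the three converter stages is an assumption. Stages covered by the tolerance mechanism are marked with an asterisk.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical summary of the source and sink pipelines; the converter ordering is assumed.
class TaskStagesSketch {

    enum Stage {
        TASK_POLL, TRANSFORMATION, KEY_CONVERTER, VALUE_CONVERTER,
        HEADER_CONVERTER, KAFKA_PRODUCE, KAFKA_CONSUME, TASK_PUT
    }

    // Source: poll the external system, transform, convert, produce to Kafka.
    static final List<Stage> SOURCE_STEPS = List.of(
            Stage.TASK_POLL, Stage.TRANSFORMATION, Stage.HEADER_CONVERTER,
            Stage.KEY_CONVERTER, Stage.VALUE_CONVERTER, Stage.KAFKA_PRODUCE);

    // Sink: consume from Kafka, convert, transform, put into the external system.
    static final List<Stage> SINK_STEPS = List.of(
            Stage.KAFKA_CONSUME, Stage.KEY_CONVERTER, Stage.VALUE_CONVERTER,
            Stage.HEADER_CONVERTER, Stage.TRANSFORMATION, Stage.TASK_PUT);

    // The stages registered as tolerable in the static block shown earlier.
    static final Set<Stage> TOLERATED = EnumSet.of(
            Stage.TRANSFORMATION, Stage.KEY_CONVERTER, Stage.VALUE_CONVERTER, Stage.HEADER_CONVERTER);

    // Joins the steps with arrows and appends '*' to every tolerated stage.
    static String describe(List<Stage> steps) {
        StringBuilder sb = new StringBuilder();
        for (Stage step : steps) {
            if (sb.length() > 0) {
                sb.append(" -> ");
            }
            sb.append(step);
            if (TOLERATED.contains(step)) {
                sb.append("*");
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println("source: " + describe(SOURCE_STEPS));
        System.out.println("sink:   " + describe(SINK_STEPS));
    }
}
```

In both pipelines the first and last stages, the ones that talk to Kafka or to the external system, carry no asterisk.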

If we scrutinize these two diagrams, we will notice that the error control Connect offers is centered on record manipulation. In other words, the steps wrapped by the RetryWithToleranceOperator class should perform almost entirely deterministic operations that are free from the perils of distributed systems. That is not quite true for the AvroConverter, which registers schemas with Schema Registry. However, that is a relatively straightforward interaction compared to the complexity that Kafka producers and consumers deal with, not to mention the bigger unknowns in the many source and sink connector plugins that Kafka Connect supports.

In a nutshell, a problematic record is one that fails because of its own data. There is another name for this pattern: poison pills.
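A toy version of that behavior, assuming a hypothetical `process` helper rather than anything from the Connect codebase: every input goes through a conversion function, and a record whose conversion throws is either skipped (tolerance "all") or stops the whole run (tolerance "none").

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Toy poison pill handling; process() is a hypothetical helper, not Connect code.
class PoisonPillSketch {

    static <R, T> List<T> process(List<R> inputs, Function<R, T> convert, boolean tolerateAll) {
        List<T> converted = new ArrayList<>();
        for (R input : inputs) {
            try {
                converted.add(convert.apply(input));
            } catch (Exception e) {
                if (!tolerateAll) {
                    // Equivalent of tolerance "none": the first bad record fails the task.
                    throw new IllegalStateException("Task failed on record: " + input, e);
                }
                // Equivalent of tolerance "all": the bad record is dropped and processing continues.
            }
        }
        return converted;
    }

    public static void main(String[] args) {
        List<String> inputs = List.of("1", "2", "oops", "4");
        List<Integer> kept = process(inputs, Integer::parseInt, true);
        System.out.println(kept); // prints [1, 2, 4]
    }
}
```

The failure is fully determined by the data: replaying "oops" will fail every time, which is what makes it a poison pill rather than a transient error.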

Conclusion

The error mechanism in Kafka Connect is focused on gracefully managing problematic records (a.k.a. poison pills) that cause all sorts of Exception errors. Anything outside this scope (e.g., issues when consuming or producing Kafka records) cannot be controlled by the user through the connector configuration, as of today.

In a subsequent post, we will zoom into those other stages in the source and sink task pipelines that aren’t controlled by configuration. That is important for understanding how a Kafka Connect connector can fail, regardless of which connector plugin we are using.

Java type hinting in avro-maven-plugin

Recently, somebody shared with me the following problem: an Avro schema in Schema Registry had magically evolved into a slightly different version, albeit a backward-compatible one.

Schemas changing…

The first version of the schema looked like this:

{
    "type":"record",
    "name":"Key",
    "namespace":"my-namespace",
    "fields": [
        {
            "name":"UUID",
            "type":"string"
        }
    ],
    "connect.name":"my-topic.Key"
}

After it changed, the schema looked like this:

{
    "type":"record",
    "name":"Key",
    "namespace":"my-namespace",
    "fields": [
        {
            "name":"UUID",
            "type":{
                "type":"string",
                "avro.java.string":"String"
            }
        }
    ],
    "connect.name":"my-topic.Key"
}

To add to the confusion, this is a topic published by Kafka Connect using the MySQL Debezium plugin. Neither the database schema nor the Connect or Debezium versions had changed anywhere close to when the schema evolved.

How could this have happened?

The mystery guest…

Although nothing had changed in the stack that was polling record changes from the MySQL database and sending them to Kafka… there was a new element to consider.

After some conversations, it was apparent that there was a new application publishing records to the same topic, for testing. This application was:

  1. Downloading the schema from Schema Registry.
  2. Doing code generation using avro-maven-plugin against the .avsc files downloaded from Schema Registry.
  3. Producing some records using the newly created Java POJO classes.

Those seem like the right steps. However, looking into the options of avro-maven-plugin, one stood out:

  /**  The Java type to use for Avro strings.  May be one of CharSequence,
   * String or Utf8.  CharSequence by default.
   *
   * @parameter property="stringType"
   */
  protected String stringType = "CharSequence";

Could it be the culprit?

stringType does more than you expect

While the description of the property suggests something as naive as instructing the Avro code generator what class to use for Avro strings… it does more than just that.

Comparing the POJOs that avro-maven-plugin generates with each setting, two things are different. Firstly, fields like the UUID in the schema above change their type from java.lang.CharSequence to java.lang.String; this is as expected.

However, it also changes the internal Avro schema that every Java POJO stores in:

public static final org.apache.avro.Schema SCHEMA$;

Upon changing stringType to String, the resulting schema in SCHEMA$ contains the extended type definition that we saw at the beginning. This matters because the schema held in SCHEMA$ is what gets sent to Schema Registry when producing records (only once; from then on, the producer uses the returned schema id).

Since Schema Registry does not compare schemas by a canonical representation, it takes the schema as is, ignoring that both schemas are semantically identical, and creates a new version for it.
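A toy model makes the effect visible. The in-memory registry below is entirely hypothetical (it is not Schema Registry's API or its real comparison logic); it only assumes that a schema counts as "already registered" when its text matches an earlier submission. Only the field type portion of the two schemas is used, to keep the strings short.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory registry for a single subject; hypothetical, for illustration only.
class ToyRegistrySketch {

    private final List<String> versions = new ArrayList<>();

    // Returns the 1-based version of the schema text, adding a new version if the text is unseen.
    int register(String schemaText) {
        int existing = versions.indexOf(schemaText);
        if (existing >= 0) {
            return existing + 1;
        }
        versions.add(schemaText);
        return versions.size();
    }

    public static void main(String[] args) {
        String plain = "{\"type\":\"string\"}";
        String hinted = "{\"type\":\"string\",\"avro.java.string\":\"String\"}";

        ToyRegistrySketch registry = new ToyRegistrySketch();
        System.out.println(registry.register(plain));  // prints 1
        System.out.println(registry.register(hinted)); // prints 2: same meaning, different text
        System.out.println(registry.register(plain));  // prints 1: exact text already known
    }
}
```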

A solution?

Can we not use stringType = String? Yes, but then all POJOs are generated using CharSequence. In my opinion, that is the best option for mixed environments. After all, this extra hint in the schema only makes sense for Java consumers.

However, if you control the topic end to end (e.g., both producers and consumers), you might as well use stringType = String by default and guarantee that every client uses String instead of CharSequence.
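One practical reason to prefer String on the Java side: a CharSequence field may hold a non-String implementation at runtime (Avro's own Utf8 class is one), and comparing it with equals against a String literal silently returns false. The sketch below uses StringBuilder as a stand-in for such an implementation; the class and method names are hypothetical.

```java
// Hypothetical helpers showing the equals pitfall with CharSequence-typed fields.
class StringTypeSketch {

    // Naive check: String.equals is false for any argument that is not itself a String.
    static boolean naiveMatch(CharSequence uuid) {
        return "abc-123".equals(uuid);
    }

    // Safer check: contentEquals compares the characters, whatever the implementation.
    static boolean safeMatch(CharSequence uuid) {
        return "abc-123".contentEquals(uuid);
    }

    public static void main(String[] args) {
        // StringBuilder stands in for a non-String CharSequence such as Avro's Utf8.
        CharSequence fromWire = new StringBuilder("abc-123");
        System.out.println(naiveMatch(fromWire)); // prints false
        System.out.println(safeMatch(fromWire));  // prints true
    }
}
```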

In any case, the two schemas are backward compatible with each other. A correct Avro library should produce the same schema representation in whatever language you have chosen to use.