In this article, I describe the internals of Kafka Connect, explain how it uses Sink and Source Connectors, and show how it tracks the offsets of the messages it has processed.

Table of Contents

  1. What is Kafka Connect
  2. The Architecture of Kafka Connect
  3. The Herder Interface
  4. The Worker Class
  5. The Worker Tasks

The text should be useful for those of you who want to know how that project works but don’t want to spend a few hours reading its source code.

What is Kafka Connect

Kafka Connect is a tool that makes it easier to use Kafka as a centralized data hub: it copies data from external systems into Kafka and propagates messages from Kafka to external systems.

Note that Kafka Connect only copies the data; it should never be used to do stream processing on its own. To perform any operations on the content of a Kafka topic, we should use KSQL or a custom application that reads from one Kafka topic, transforms the values, and writes the output into another topic.

We should use the data transformation feature provided by Kafka Connect only to convert the original data format into Kafka-compatible messages.
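
For example, a minimal standalone configuration of the built-in FileStreamSource connector could use a Single Message Transform purely to reshape every record; the file path, topic name, and transform alias below are made up for illustration:

```properties
# A minimal standalone connector configuration (made-up file path and topic name).
name=file-source-example
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=/tmp/input.txt
topic=raw-lines
# Converters turn the records into bytes written to Kafka.
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
# A Single Message Transform that only reshapes each record
# (wraps the raw line in a structure with a single "line" field).
transforms=hoist
transforms.hoist.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.hoist.field=line
```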

I must mention that Kafka Connect guarantees at-least-once delivery, which means that in some cases, we may read the same information multiple times from the source system or write the same message to the destination location.

The Architecture of Kafka Connect

First, we must take a look at the essential building blocks of the Kafka Connect API. When we implement a new connector, we must provide an implementation of either a SinkConnector or a SourceConnector, together with an implementation of either a SinkTask or a SourceTask.

A Connector defines the task configuration (the name of the Task implementation class and its parameters). Connectors return a collection of task configurations and can notify Kafka Connect when those tasks need reconfiguration.

When we want to reconfigure the tasks, we must call the requestTaskReconfiguration method of the ConnectorContext, which is passed as a parameter to the Connector's initialize method.
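
As an illustration, a hypothetical SourceConnector that copies a whole database could look like the sketch below. The taskConfigs method divides the tables between the Task instances, and requestTaskReconfiguration is called when the list of tables changes. All class, configuration, and helper names are invented (the matching ExampleJdbcSourceTask sketch appears a bit later):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;

// A hypothetical connector that copies a whole database, one group of tables per task.
public class ExampleJdbcSourceConnector extends SourceConnector {

    private Map<String, String> connectorConfig;
    private List<String> tables;

    @Override
    public void start(Map<String, String> props) {
        connectorConfig = props;
        tables = discoverTables(props.get("connection.url"));
    }

    // The Connector does not copy any data itself; it only describes the Tasks
    // by returning one configuration map per Task instance.
    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        List<Map<String, String>> taskConfigs = new ArrayList<>();
        int groups = Math.min(maxTasks, tables.size());
        for (List<String> group : splitIntoGroups(tables, groups)) {
            Map<String, String> taskConfig = new HashMap<>(connectorConfig);
            taskConfig.put("tables", String.join(",", group));
            taskConfigs.add(taskConfig);
        }
        return taskConfigs;
    }

    // Called, for example, from a background monitoring thread when the table list changes;
    // `context` is the ConnectorContext that Kafka Connect passes to initialize().
    private void onTableListChanged() {
        context.requestTaskReconfiguration();
    }

    @Override
    public Class<? extends Task> taskClass() { return ExampleJdbcSourceTask.class; }

    @Override
    public ConfigDef config() { return new ConfigDef(); }

    @Override
    public void stop() { }

    @Override
    public String version() { return "0.1.0"; }

    private List<String> discoverTables(String connectionUrl) {
        // Placeholder; a real connector would read the database metadata.
        return Arrays.asList("customers", "orders");
    }

    private List<List<String>> splitIntoGroups(List<String> items, int groups) {
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < groups; i++) {
            result.add(new ArrayList<>());
        }
        for (int i = 0; i < items.size(); i++) {
            result.get(i % groups).add(items.get(i));
        }
        return result;
    }
}
```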

Kafka Connect manages Tasks, and we don’t need to worry about creating Task instances. We must implement only the methods that read/write messages and keep track of the message offsets. When Kafka Connect runs in distributed mode, the Tasks run on different machines, so the instances should be independent and not share any state.

The general advice is to use Connectors to perform a broad copy. Hence, it is better to have one Connector responsible for copying data from the whole database than to configure separate connectors for every table. Of course, the Connector implementation may divide the work between multiple Task instances.
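
Continuing the hypothetical example, the matching Task could look like the sketch below. Every Task copies only the tables the Connector assigned to it and attaches a source partition and offset to each record; the row-based offset is made up for illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

// The Task matching the hypothetical connector above. Kafka Connect creates and
// schedules the instances; we only implement reading and offset bookkeeping.
public class ExampleJdbcSourceTask extends SourceTask {

    private List<String> tables;

    @Override
    public void start(Map<String, String> props) {
        tables = Arrays.asList(props.get("tables").split(","));
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> records = new ArrayList<>();
        for (String table : tables) {
            Map<String, String> sourcePartition = Collections.singletonMap("table", table);
            // Ask Kafka Connect for the last committed offset of this partition,
            // so the task can resume where it previously stopped.
            Map<String, Object> lastOffset = context.offsetStorageReader().offset(sourcePartition);
            long row = lastOffset == null ? 0L : (Long) lastOffset.get("row");

            for (String value : fetchRowsAfter(table, row)) {
                row++;
                Map<String, Long> sourceOffset = Collections.singletonMap("row", row);
                records.add(new SourceRecord(sourcePartition, sourceOffset,
                        "db." + table, Schema.STRING_SCHEMA, value));
            }
        }
        return records;
    }

    private List<String> fetchRowsAfter(String table, long row) {
        // Placeholder; a real task would query the database for new rows.
        return new ArrayList<>();
    }

    @Override
    public void stop() { }

    @Override
    public String version() { return "0.1.0"; }
}
```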

The Herder Interface

The Herder is the main interface for making changes to the Kafka Connect cluster. Both the REST API and the CLI use it to configure Kafka Connect. In the source code, there are two implementations of the Herder: one for a single-node cluster (standalone mode) and one for distributed mode.

In addition to the start/stop methods that start or terminate the whole cluster, the Herder implementation contains methods that update or remove connector configurations (putConnectorConfig and deleteConnectorConfig). Surprisingly, when we look at their source code in the StandaloneHerder class, we see that those methods not only change the configuration but also start/stop the connectors.

For me, the most interesting part of the StandaloneHerder implementation is the ConfigUpdateListener inner class, which reacts to connector state changes in the ConfigBackingStore. The inner class contains the onConnectorTargetStateChange method, which updates the internal state of the Worker, the class that does all of the heavy lifting in Kafka Connect.
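
To make that flow easier to follow, here is a heavily simplified paraphrase of the standalone-mode behavior; the method names and signatures below are approximations, not the actual StandaloneHerder code:

```java
import java.util.HashMap;
import java.util.Map;

// A heavily simplified paraphrase of the standalone-mode flow described above;
// method names and signatures are approximations, not the real Kafka Connect code.
class SimplifiedStandaloneHerder {

    enum TargetState { STARTED, PAUSED }

    private final SimplifiedWorker worker = new SimplifiedWorker();
    private final Map<String, Map<String, String>> configStore = new HashMap<>();

    // putConnectorConfig does more than store the configuration:
    // it also (re)starts the connector and recreates its tasks.
    void putConnectorConfig(String connectorName, Map<String, String> config) {
        boolean alreadyExists = configStore.containsKey(connectorName);
        configStore.put(connectorName, config);
        if (alreadyExists) {
            worker.stopConnector(connectorName);
        }
        worker.startConnector(connectorName, config);
        worker.createTasks(connectorName);
    }

    // Counterpart of ConfigUpdateListener#onConnectorTargetStateChange:
    // a target-state change recorded in the config store is forwarded to the Worker,
    // which pauses or resumes the connector and its tasks.
    void onConnectorTargetStateChange(String connectorName, TargetState newState) {
        worker.setTargetState(connectorName, newState);
    }

    // A stand-in for the real Worker class, just to make the sketch self-contained.
    static class SimplifiedWorker {
        void startConnector(String name, Map<String, String> config) { }
        void stopConnector(String name) { }
        void createTasks(String name) { }
        void setTargetState(String name, TargetState state) { }
    }
}
```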

The Worker Class

The Javadoc describes this class as the “container of threads that run tasks.” That sentence accurately describes what the class does. When we start a new Connector, the Worker takes the canonical name of the Connector class and uses reflection to instantiate it. After that, it passes the configuration to the new instance to initialize it and changes its state to STARTED.
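
A simplified sketch of that step could look like the code below; the real Worker also isolates plugin classloaders, tracks metrics, and calls initialize with a ConnectorContext before starting the connector:

```java
import java.util.Map;
import org.apache.kafka.connect.connector.Connector;

// A simplified illustration of how a connector instance comes to life; the real Worker
// also isolates plugin classloaders, tracks metrics, and reports error states.
class ConnectorStartSketch {

    // connectorClassName comes from the "connector.class" configuration entry.
    static Connector startConnector(String connectorClassName, Map<String, String> config)
            throws ReflectiveOperationException {
        // Instantiate the connector from its canonical class name via reflection.
        Connector connector = (Connector) Class.forName(connectorClassName)
                .getDeclaredConstructor()
                .newInstance();
        // The real Worker calls initialize(ConnectorContext) here, before start().
        connector.start(config);
        // The Worker then marks the connector as STARTED.
        return connector;
    }
}
```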

As we already know, a change of state triggers the listener, and, as a consequence, starts new WorkerTasks.

The Worker Tasks

A WorkerTask uses converters that turn message keys, values, and headers into bytes, a step required before sending them to Kafka. In addition to those three objects, the WorkerTask gets an instance of the TransformationChain, which encapsulates all of the message transformations defined as Source/Sink configuration parameters. Moreover, the code passes the classes that provide access to metrics, configuration, the current time, the retry policy, etc.

If we create a WorkerSourceTask, the Worker class also passes the classes used to track data offsets (CloseableOffsetStorageReader and OffsetStorageWriter) and a KafkaProducer that writes the messages to Kafka.
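
Putting those pieces together, one iteration of a source-side worker task conceptually does something like the sketch below. It is a paraphrase, not the actual WorkerSourceTask code, and the OffsetSink interface is a made-up stand-in for the OffsetStorageWriter:

```java
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.transforms.Transformation;

// A rough paraphrase of a single iteration of a source-side worker task; the real
// WorkerSourceTask also handles headers, retries, metrics, and offset flushing.
class SourceTaskLoopSketch {

    static void pollOnce(SourceTask task,
                         List<Transformation<SourceRecord>> transformations,
                         Converter keyConverter,
                         Converter valueConverter,
                         KafkaProducer<byte[], byte[]> producer,
                         OffsetSink offsetSink) throws InterruptedException {
        List<SourceRecord> records = task.poll();
        if (records == null) {
            return;
        }
        for (SourceRecord polled : records) {
            // Apply the configured transformations (the TransformationChain).
            SourceRecord record = polled;
            for (Transformation<SourceRecord> transformation : transformations) {
                record = transformation.apply(record);
                if (record == null) {
                    break; // a transformation may filter the record out
                }
            }
            if (record == null) {
                continue;
            }
            // Converters turn the key and the value into bytes before sending them to Kafka.
            byte[] key = keyConverter.fromConnectData(record.topic(), record.keySchema(), record.key());
            byte[] value = valueConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
            producer.send(new ProducerRecord<>(record.topic(), record.kafkaPartition(), key, value));
            // Remember the source offset so it can be committed later
            // (the real code uses the OffsetStorageWriter for that).
            offsetSink.accept(record.sourcePartition(), record.sourceOffset());
        }
    }

    // Made-up stand-in for the OffsetStorageWriter used by Kafka Connect.
    interface OffsetSink {
        void accept(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset);
    }
}
```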
