In this article, I am going to explain the concept of log compaction in Apache Kafka. After that, I describe the configuration parameters related to log compaction. In the end, I am going to look at the Kafka source code and explain how the log compaction works under the hood.
What is Kafka log compaction?
When we send a message to a Kafka topic, we have to specify the key and the content of the message. Kafka uses the key to select the partition which stores the message. It is possible because Kafka calculates the hashcode of the provided key.
A hashcode of a constant value always remains the same. Therefore, if we send multiple messages with the same key, all of them end up in the same partition. Because of that, log compaction happens within a single Kafka broker (+ partition replicas) and does not require coordination with other nodes of the Kafka cluster.
Periodically, the Kafka broker scans the log to remove old messages. In general, Kafka removes messages when they are too old or when the log contains too much data. In the Kafka configuration, we can specify two parameters
log.retention.ms, which configures the time-based retention policy and
log.retention.bytes that controls the log size-based retention.
Log compaction is a separate kind of retention policy that we can enable in addition to the feature that removes old messages. It allows limiting the size of the Kafka log by overwriting old messages with their new version.
According to the Kafka documentation, it retains at least the last known value for each message key within the log for a single topic partition. Later, in the “how does log compaction work” section, I am going to explain why we get “at least the last known value,” instead of “only the last value.”
How to identify message updates
While doing the log compaction, Kafka identifies the new versions of messages by comparing the message key and the offset. If we send multiple messages with the same key to a topic that configured to use compaction, Kafka retains only the message with the highest offset.
Assume that the Kafka log contains five such messages in this order:
key: 1234, value: version_1, offset: 1 key: 5678, value: version_2, offset: 2 key: 1234, value: version_3, offset: 3 key: 1234, value: version_4, offset: 4 key: 5678, value: version_5, offset: 5
When Kafka executes log compaction, we are guaranteed to still have messages those messages in the log:
key: 1234, value: version_4, offset: 4 key: 5678, value: version_5, offset: 5
In some cases, the log may still contain the old versions of messages in addition to the latest ones. It happens because we are promised to get “at least the last known value.”
Note that the order of messages and their offsets never changes! Also, we can still request the message with offset 3, but instead of it, Kafka returns the next existing offset, and we get the message with offset 4.
It is possible to remove a message from a Kafka topic by sending a new message with the key of the message we want to delete and null content. In this case, Kafka log compaction overwrites the existing version with null.
Such deletion markers are a special case in Kafka code because the message with null content does not stay in the topic forever. When the time defined by the
delete.retention.ms parameter passes, Kafka is going to remove the deletion marker from the topic even if it is the most recent version of the event identified by the given key.
We can use Kafka log compaction to maintain a full snapshot of the final value for every key while implementing an event sourcing application.
In addition to that, we can use it to keep a backup copy of data processed by downstream consumers. In case of a consumer failure, we can restore its state by replaying the snapshot from a Kafka topic.
Such an implementation is used in the Apache Samza framework to keep a backup copy of its in-memory database. Apache Samza uses the LevelDB database for state management when executing joins and aggregates. Because LevelDB is an in-memory database, Samza flushes data changes to a Kafka topic and uses it for data recovery after a failure.
Last but not least, the log compaction feature lets us fulfill GDPR requirements while using Kafka. GDPR was one of the reasons why the maximum log compaction lag parameter was added to Kafka (KIP-354). Now, we can overwrite personal data with a null and be sure that after some time, Kafka permanently removes the data from the disc.
How to configure it?
First, we have to configure the log retention policy by setting the
log.cleanup.policy parameter. It is set to
delete by default. We may replace the default policy with
compact or use both by settings its value to:
When we have log compaction enabled, we also should specify four more parameters:
min.compaction.lag.ms- the minimal time that has to pass before the message can be compacted. That is one of the reasons why we cannot expect to see only the most recent version of the message in the topic.
min.cleanable.dirty.ratio- the minimal percentage (expressed as a ratio) of the partition log that must be “dirty” before Kafka attempts to compact messages.
max.compaction.lag.ms- the maximum delay between the time a message is written and the time the message becomes eligible for compaction. This configuration parameter overwrites
min.cleanable.dirty.ratioand forces a log segment to become compactable even if the “dirty ratio” is lower than the threshold.
delete.retention.ms- how long to retain deletion markers in the cleaned log if the empty content (
null) is the most recent version of a given message key.
A few words of caution regarding delete.retention.ms
If we send a message with null content to remove it from the topic, the segment of the topic that holds the message gets compacted, and the delete retention time passes, the deletion marker gets removed from the log. Because of that, slow consumers may miss the deletion marker!
We may end up with an inconsistent state across many applications if the consumer uses the deletion marker to remove data from its database.
How does log compaction work?
Every topic partition consists of log segments. Log compaction ignores the segments that have been previously cleaned, the active segment of the log, and all segments which contain at least one message with the timestamp within the minimal compaction lag. Every other segment of the log can be compacted.
The ignored segments are another reason why the topic may contain more versions of the events assigned to one key. The newest version may be in a segment that is not eligible for compaction yet, and Kafka is just removing some of the older versions.
Log compaction is implemented in the
cleanFilthiestLog method of a
CleanerThread instance calls the
grabFilthiestCompactedLog method of
LogCleanermanager to get the dirtiest log segment. It is the log segment that has the highest ratio of the number of cleanable bytes to the total number of bytes.
After that, the thread builds an
OffsetMap is a data structure that contains every key within a log segment and the highest offset of that key.
The segment and its
OffsetMap is passed to the
cleanSegments function. The function creates a new segment and iterates over messages within the segment given as a parameter.
For every message, it checks whether the message offset matches the offset of the most recent occurrence of the message key. If
OffsetMap indicates that there is a message with the same key and higher offset, the currently processed message is ignored. Otherwise, the message is copied to the new segment.
In the end, the
cleanSegments method flushes the replacement segment to the disc and replaces the dirty segment of the log with the newly created cleaned one.