There is no built-in way to remove all messages from a Kafka topic. Yet, sometimes we have to do that, for example, when we make a mistake, send many incorrect messages to Kafka, and end up in a situation when it’s easier to remove everything and start from a known state than try to fix the issue.
To remove the messages, we have two options:
- set the retention time to one second, wait a little while, and restore the previous configuration
- remove the topic and create it again
Set the retention time
- First, I recommend checking the current retention time because we will need it later. We can get the retention time in milliseconds using this command:
kafka-topics --zookeeper <the_host_ip>:<port> --describe --topic <topic_name>.
- Now, we can change the retention time to a shorter value:
kafka-configs.sh --zookeeper <the_host_ip>:<port> --entity-type topics --alter --entity-name <topic_name> --add-config retention.ms=1000.
- After that, we should wait a minute or two because Kafka purges the old messages periodically. You can find more information about log compaction and the retention period in my other article: https://www.mikulskibartosz.name/what-is-kafka-log-compaction-how-does-it-work/
- Restore the previous retention time.
Re-create the topic
This method has some caveats. First of all, deletion does not happen instantaneously. We will have to wait a little bit. Second, this method requires recreating partitions, and the clients must reconnect to Kafka. If you have poorly written client code, you may end up with errors in other applications that suddenly lost the topic and cannot restore regular operation. Finally, suppose you have enabled automatic topic creation, and a client attempts to access the topic after you remove it. In that case, you will end up with a misconfigured topic (unless that’s your usual method of creating topics and the client will provide all required parameters).
If you decided that re-creating the topic is the right way to purge the messages, here is how to do that:
- Remove the topic:
kafka-topics.sh --zookeeper <the_host_ip>:<port> --delete --topic <topic_name>
- Use the same command that you originally used to create the topic to make a new one and specify its configuration. If you are ok with default values, this one will work:
kafka-topics.sh --create --zookeeper <the_host_ip>:<port> --replication-factor <replication> --partitions <partitions> --topic <topic_name>