Issue
I have a kafka listener configured in our Spring Boot application as follows:
@KafkaListener(topicPartitions = @TopicPartition(topic = "data.all", partitions = { "0", "1", "2" }), groupId = "kms")
public void listen(ObjectNode message) throws JsonProcessingException {
    // Code to convert to a JSON string and write to Elasticsearch
}
This application is deployed to and runs on 3 servers and, despite all instances having the group ID kms, each one gets a copy of every message, so I end up with 3 identical records in Elasticsearch. When I also run an instance locally, 4 copies get written.
I've confirmed that the producer writes only 1 message to the topic by checking the total message count on the topic before and after the write; it increases by exactly 1. How can I prevent the duplicates?
Solution
When you manually assign partitions like that, you are responsible for distributing the partitions across the instances yourself. The group ID is ignored for partition distribution, but it is still used to track offsets, if needed.
Either use group management and let Kafka do the partition assignment for you, or manually assign a distinct subset of partitions to each instance.
Instead of topicPartitions, use topics = "data.all".
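A minimal sketch of the corrected listener, assuming the topic name data.all and group ID kms from the question; the class name and the listener body (the Elasticsearch write is only indicated by a comment) are illustrative:

```java
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class DataAllListener {

    // With `topics` (group management) instead of `topicPartitions`, the
    // instances sharing groupId "kms" split the topic's partitions among
    // themselves, so each record is consumed by exactly one instance.
    @KafkaListener(topics = "data.all", groupId = "kms")
    public void listen(ObjectNode message) {
        // Convert to a JSON string and write to Elasticsearch (as in the question)
        String json = message.toString();
        // elasticsearchClient.index(...) -- omitted
    }
}
```

With three instances and three partitions, Kafka's rebalance protocol will typically assign one partition per instance; a fourth instance (e.g. one run locally) would sit idle rather than duplicate records.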
Answered By - Gary Russell