Issue
I'm working on a Spring Boot app configured like this:
@SpringBootApplication
@EnableRetry
public class SpringBootApp {
    public static void main(String[] args) {
        SpringApplication.run(SpringBootApp.class, args);
    }
}
I have a Kafka listener subscribed to a topic that can carry 5 different schemas. To handle this (it might be the wrong way, and I'm open to suggestions on this part), we do this:
private void fillEvents(ConsumerRecords<Key, SpecificRecord> events) {
    // Sort each record into the list that matches its schema type
    events.forEach(event -> {
        SpecificRecord value = event.value();
        if (value instanceof A a) {
            aEvents.add(a);
        } else if (value instanceof B b) {
            bEvents.add(b);
        }
        // ....
    });
}
And in the main listener:
@KafkaListener(topics = "topicName", groupId = "myApp", containerFactory = "listenerFactory")
public void receive(ConsumerRecords<Key, SpecificRecord> events) {
    Splitter splitter = new Splitter(events); // runs the fillEvents(..) from above
    aService.handleEvents(splitter.getAEvents());
    bService.handleEvents(splitter.getBEvents());
    // ...
}
In each service, other processes (such as file integrations or API calls) might modify the same resources that our Kafka listener modifies. We use a MongoDB database and handle persistence with a classic Spring Data layer: save(entity). To avoid failures caused by concurrent access, we do this in each service:
...
public void handleEvents(List<A> events) {
    events.forEach(event -> processEvent(event));
}

@Retryable(value = {OptimisticLockingFailureException.class, DuplicateKeyException.class, MongoCommandException.class},
        maxAttempts = 100,
        backoff = @Backoff(random = true, delay = 200, maxDelay = 5000, multiplier = 2))
public void processEvent(A event) {
    refresh(); // in case of failure (from retryable) we refresh the dependencies
    processBusinessRules(event); // process the event with the business rules
    aRepository.save(event);
}
We are facing a case where our Kafka listener polls about 30 messages containing both A and B instances. The handling of A fails with an OptimisticLockingFailureException (a bug which we have now identified), but the B events are never processed. It seems that the thread stops right after the first failure and does not retry the processEvent method. The batch is only processed again because the Kafka listener redelivers it, which is fine for other kinds of errors (such as network issues) but not for our optimistic locking failures.
What are we missing?
Our goal is to retry the processEvent method without discarding the events that follow.
Solution
The @Retryable method must be in a different bean managed by Spring; if you call the method internally, within the same class, you will bypass the retry interceptor.
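To illustrate why the internal call is not retried, here is a minimal sketch that models the retry interceptor with a plain JDK dynamic proxy. It is a simplified stand-in and not Spring Retry itself: the names EventService, SelfInvokingService and withRetry are hypothetical, events are plain strings, and the processor is rigged to fail twice before succeeding.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class SelfInvocationDemo {

    /** Hypothetical stand-in for the service from the question. */
    interface EventService {
        void handleEvents(List<String> events);
        void processEvent(String event);
    }

    /** processEvent fails on the first two calls, then succeeds. */
    static class SelfInvokingService implements EventService {
        final AtomicInteger attempts = new AtomicInteger();

        @Override
        public void handleEvents(List<String> events) {
            // Called on `this`, so the call never passes through the proxy.
            events.forEach(this::processEvent);
        }

        @Override
        public void processEvent(String event) {
            if (attempts.incrementAndGet() < 3) {
                throw new IllegalStateException("simulated optimistic locking failure");
            }
        }
    }

    /** Assumed model of a retry interceptor: only processEvent is retried. */
    static EventService withRetry(EventService target, int maxAttempts) {
        return (EventService) Proxy.newProxyInstance(
                EventService.class.getClassLoader(),
                new Class<?>[] {EventService.class},
                (proxy, method, args) -> {
                    int tries = method.getName().equals("processEvent")
                            ? Math.max(1, maxAttempts) : 1;
                    Throwable last = null;
                    for (int i = 0; i < tries; i++) {
                        try {
                            return method.invoke(target, args);
                        } catch (InvocationTargetException e) {
                            last = e.getCause(); // remember the failure, maybe retry
                        }
                    }
                    throw last;
                });
    }

    /** Same-class call: the proxy only sees handleEvents, so nothing is retried. */
    static int attemptsWithSelfInvocation() {
        SelfInvokingService target = new SelfInvokingService();
        EventService proxied = withRetry(target, 5);
        try {
            proxied.handleEvents(List.of("a"));
        } catch (IllegalStateException firstFailure) {
            // The first failure escapes, as observed in the question.
        }
        return target.attempts.get();
    }

    /** External call: each processEvent goes through the proxy and is retried. */
    static int attemptsThroughProxy() {
        SelfInvokingService target = new SelfInvokingService();
        EventService proxied = withRetry(target, 5);
        List.of("a").forEach(proxied::processEvent);
        return target.attempts.get();
    }

    public static void main(String[] args) {
        System.out.println(attemptsWithSelfInvocation()); // prints 1
        System.out.println(attemptsThroughProxy());       // prints 3
    }
}
```

In Spring terms, the second case corresponds to moving processEvent into its own bean and injecting that bean into the class that owns handleEvents, so that every call to processEvent goes through the proxy Spring creates for @Retryable.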
Answered By - Gary Russell
Answer Checked By - Mildred Charles (JavaFixing Admin)