Issue
Can someone please help me understand what's going wrong here?
Requirement:
I want to implement a delayed queue (DLQ) using the pause/resume functionality of ConcurrentMessageListenerContainer. When a message is received on a DLQ, I check its last processed time and want to pause the container if the message has not aged enough.
Since the same logic is needed for all the other DLQs in the project, I used MethodKafkaListenerEndpoint to create the containers. If the message has not aged enough, I throw an exception and call pause() on every container returned by registry.getAllListenerContainers(), since I want to pause all the other DLQ listeners as well.
That part works: the offsets are not committed, and the broker shows a lag of 1 for the consumer group. I have also set setAckAfterHandle(false) on the DefaultErrorHandler.
However, when I call resume() on all the containers, the last message (the one for which onMessage threw the exception) is not polled again. If I send a new message to the DLQ, the container picks up that latest message and the lag drops to 0, so I lose one record.
Here are the error handler and the recoverer:
    @Bean
    DefaultErrorHandler defaultErrorHandler(ConsumerRecordRecoverer recoverer) {
        // FixedBackOff(0L, 0L): no retries, the recoverer is called on the first failure
        DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 0L));
        defaultErrorHandler.setAckAfterHandle(false);
        return defaultErrorHandler;
    }

    @Bean
    ConsumerRecordRecoverer customRecoverer(KafkaListenerEndpointRegistry registry) {
        return (message, exception) -> {
            // pause every registered listener container
            registry.getAllListenerContainers()
                    .stream()
                    .forEach(MessageListenerContainer::pause);
        };
    }
Listener Method
- I created the listener method shown below. If no delay is required, the message is forwarded to another topic.
- The number of listeners is property-driven, e.g.:
    topic.1=service_1.dlq
    forwardto.1=service_1
    topic.2=service_2.dlq
    forwardto.2=service_2
- Hence each listener also needs to be given the name of the topic it produces to.
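The property layout above can be turned into the listOfConfigurations used by the configurer. This is a minimal sketch assuming only the topic.N / forwardto.N keys shown; DlqRoute and DlqRoutes.fromProperties are hypothetical names, not part of Spring Kafka.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical value type: one DLQ and the topic its records are forwarded to.
record DlqRoute(String dlqTopic, String forwardTo) { }

// Hypothetical helper that builds the per-DLQ configuration list from properties.
final class DlqRoutes {

    private DlqRoutes() { }

    // Reads topic.1/forwardto.1, topic.2/forwardto.2, ... and stops at the first missing topic.N.
    static List<DlqRoute> fromProperties(Properties props) {
        List<DlqRoute> routes = new ArrayList<>();
        for (int i = 1; ; i++) {
            String dlq = props.getProperty("topic." + i);
            if (dlq == null) {
                break; // no more numbered entries
            }
            String forwardTo = props.getProperty("forwardto." + i);
            if (forwardTo == null) {
                throw new IllegalArgumentException("Missing forwardto." + i + " for " + dlq);
            }
            routes.add(new DlqRoute(dlq, forwardTo));
        }
        return routes;
    }
}
```

Each DlqRoute would then supply the DLQ topic for setTopics and the forward topic for the MyListener constructor. Stopping at the first missing topic.N is a choice of this sketch: a gap in the numbering silently ends the list, so a stricter version might scan all keys instead.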
    public class MyListener implements MessageListener<String, String> {

        // kafkaTemplate, getProducerTopic() and shouldDelay() are members of MyListener (not shown)

        @Override
        @SneakyThrows
        public void onMessage(ConsumerRecord<String, String> data) {
            String bypassDelayCheck = Optional.ofNullable(System.getProperty("bypassDelayCheck")).orElse("false");
            if (bypassDelayCheck.equals("false") && shouldDelay(data.value())) {
                // hand the record to the error handler, whose recoverer pauses the containers
                throw new Exception("I want this consumer container to pause");
            } else {
                kafkaTemplate.send(getProducerTopic(), data.value());
            }
        }
    }
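The question does not show shouldDelay. A minimal sketch of an age check, assuming the age is measured from the record timestamp (ConsumerRecord.timestamp(), epoch milliseconds) against a fixed minimum delay; the listener above passes data.value() instead, so the real check may read a time from the payload. DelayCheck, shouldDelay and remainingDelay here are hypothetical names, and injecting a java.time.Clock is only there to keep the check testable.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: decides whether a DLQ record is still too young to be forwarded.
final class DelayCheck {

    private final Duration minimumAge;
    private final Clock clock;

    DelayCheck(Duration minimumAge, Clock clock) {
        this.minimumAge = minimumAge;
        this.clock = clock;
    }

    // recordTimestampMs is epoch milliseconds, e.g. the value of ConsumerRecord.timestamp().
    boolean shouldDelay(long recordTimestampMs) {
        Instant eligibleAt = Instant.ofEpochMilli(recordTimestampMs).plus(minimumAge);
        return clock.instant().isBefore(eligibleAt);
    }

    // Time left until the record is old enough; zero if it already is.
    Duration remainingDelay(long recordTimestampMs) {
        Instant eligibleAt = Instant.ofEpochMilli(recordTimestampMs).plus(minimumAge);
        Duration remaining = Duration.between(clock.instant(), eligibleAt);
        return remaining.isNegative() ? Duration.ZERO : remaining;
    }
}
```

remainingDelay is one possible input for deciding when to call resume() on the containers; how resumption is scheduled is not described in the question.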
Listener Creation
- I had to use KafkaListenerConfigurer because I also have some other environment-specific properties that MyListener needs.
- I did not create the containers through a container factory, because the listener would then have to be prototype-scoped, and in that case registry.getAllListenerContainers() tried to create the containers. Each listener also needs its own state; e.g. producerTopic depends on the DLQ the message was received on, since that determines the topic it is forwarded to.
- I manage this state with member variables of MyListener.
KafkaListenerConfigurer.class
    public class MyListenerConfigurer implements KafkaListenerConfigurer {

        @Override
        @SneakyThrows
        public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
            listOfConfigurations
                    .forEach(item -> {
                        // placeholders: topicToForwardTo, someId, groupId and dlqTopic are per-item values (details omitted)
                        MethodKafkaListenerEndpoint<String, String> kafkaListenerEndpoint = new MethodKafkaListenerEndpoint<>();
                        MyListener myListener = new MyListener(topicToForwardTo);
                        kafkaListenerEndpoint.setBeanFactory(beanFactory);
                        kafkaListenerEndpoint.setBean(myListener);
                        try {
                            kafkaListenerEndpoint.setMethod(myListener.getClass().getMethod("onMessage", ConsumerRecord.class));
                        } catch (NoSuchMethodException e) {
                            System.out.println("Some Exception");
                            throw new RuntimeException(e);
                        }
                        kafkaListenerEndpoint.setId(someId);
                        kafkaListenerEndpoint.setTopics(dlqTopic); // the DLQ topic to listen to
                        kafkaListenerEndpoint.setGroupId(groupId);
                        kafkaListenerEndpoint.setAutoStartup(true);
                        kafkaListenerEndpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
                        registrar.registerEndpoint(kafkaListenerEndpoint);
                    });
        }
    }
Solution
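You need to throw an exception from the recoverer after pausing (for example, throw new IllegalStateException("not aged enough", exception); after the forEach in customRecoverer); otherwise the record is considered "recovered". The error handler does not seek back to a recovered record, so although its offset is not committed (ackAfterHandle is false), the consumer's position has already moved past it and the first poll after resume() starts with the next record. When the recoverer throws, the error handler seeks back to the failed record, so it is redelivered once the containers are resumed.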
Answered By - Gary Russell
Answer Checked By - David Marino (JavaFixing Volunteer)
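To see why the record is skipped, here is a deliberately simplified, hypothetical model of a single partition; SeekModel is not a Spring Kafka class. It captures only the decision described in the answer: if the recoverer returns normally, no seek happens and the fetch position stays past the failed offset; if the recoverer throws, the position is reset to the failed offset, standing in for the error handler's seek.

```java
// Hypothetical, simplified model of one partition; NOT Spring Kafka code.
// It mimics only what happens after the recoverer runs:
// returns normally -> "recovered", no seek; throws -> seek back to the failed offset.
final class SeekModel {

    private final long endOffset;  // offset after the last record in the partition
    private final long committed;  // committed offset; never moves here because ackAfterHandle is false
    private long position;         // next offset the consumer will fetch

    SeekModel(long committed, long endOffset) {
        this.committed = committed;
        this.endOffset = endOffset;
        this.position = committed;
    }

    // One poll whose record fails in the listener, so the recoverer is invoked.
    // Returns the offset that was delivered.
    long deliverFailingRecord(Runnable recoverer) {
        long failedOffset = position;
        position++; // poll() has already moved the fetch position past the record
        try {
            recoverer.run();
        } catch (RuntimeException recoveryFailed) {
            position = failedOffset; // stands in for the error handler's seek: redeliver after resume()
        }
        return failedOffset;
    }

    long position() {
        return position;
    }

    long lag() {
        return endOffset - committed;
    }
}
```

With a recoverer that only pauses (an empty Runnable in this model), the committed offset stays at 0, which is why the broker reports a lag of 1, yet the next fetch starts at offset 1; once a newer record is processed and committed, offset 0 is skipped for good. With a recoverer that pauses and then throws, the fetch position returns to 0, so the same record is delivered again after resume().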