Issue
I am aware this question has been asked in slightly different forms on this site, but following the advice given in those posts got me nowhere. I have already spent close to two days on this and I am out of ideas.
We have a Spring Boot microservice which does nothing more than listen for a message arriving on an IBM MQ queue, apply a small transformation, and forward it to a Kafka topic. We want this to be transactional so that no message is lost (this is critical to our business). We also want to be able to react to transaction commit and rollback events for monitoring and support purposes.
I followed a few how-to guides on the internet and can easily achieve transactional behaviour declaratively using the @Transactional annotation, as below:
@Transactional(transactionManager = "chainedTransactionManager", rollbackFor = Throwable.class)
@JmsListener(destination = "DEV.QUEUE.1", containerFactory = "mqListenerContainerFactory", concurrency = "10")
public void receiveMessage(@Headers Map<String, Object> jmsHeaders, String message) {
    // Some work here, including forwarding to the Kafka topic:
    // ...
    // ...
    // Then publish an event which is supposed to be acted on:
    applicationEventPublisher.publishEvent(new MqConsumedEvent("JMS Correlation ID", "Message Payload"));
    // Leave the exception below in place to create a rollback scenario,
    // or comment it out to let the processing complete
    throw new RuntimeException("No good Pal!");
}
As expected, when a message is played with the exception in place, the processing loops forever because the transaction manager rolls back again and again. This is what we want.
Now we expect the MqConsumedEvent published inside our listener method to be intercepted by the onRollback method below:
@Component
@Slf4j
public class MqConsumedEventListener {
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, classes = MqConsumedEvent.class)
    public void onCommit(MqConsumedEvent event) {
        log.info("MQ message with correlation id {} committed to Kafka", event.getCorrelationId());
    }
    @TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK, classes = MqConsumedEvent.class)
    public void onRollback(MqConsumedEvent event) {
        log.info("Failed to commit MQ message with correlation id {} to Kafka", event.getCorrelationId());
    }
}
This is not happening. Similarly, commenting out the exception in the listener results in our MQ message being passed to Kafka, but the onCommit method is not executed.
From further research and Spring debugging, I believe the listeners do not run because Spring thinks there is no active transaction when the event is published, and so my event is simply ignored. Evaluating TransactionSynchronizationManager.isActualTransactionActive() and printing it in the logs shows false, which is hard to explain because, as I said, the transaction rolls back as expected when an exception is thrown on purpose.
Thank you in advance for your input.
UPDATE:
The breakpoints I set led me to this method of the ApplicationListenerMethodTransactionalAdapter class:
@Override
public void onApplicationEvent(ApplicationEvent event) {
    if (TransactionSynchronizationManager.isSynchronizationActive() &&
            TransactionSynchronizationManager.isActualTransactionActive()) {
        TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event);
        TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);
    }
    else if (this.annotation.fallbackExecution()) {
        if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) {
            logger.warn("Processing " + event + " as a fallback execution on AFTER_ROLLBACK phase");
        }
        processEvent(event);
    }
    else {
        // No transactional event execution at all
        if (logger.isDebugEnabled()) {
            logger.debug("No transaction is active - skipping " + event);
        }
    }
}
For a reason I do not understand, the first if condition is false. Since fallbackExecution is also false (I haven't set it to true in my @TransactionalEventListener usage), execution ends up in the else branch and the event is simply skipped.
Solution
I had the same problem. In my case it turned out that I had defined an ApplicationEventMulticaster in my project:
@Bean
public ApplicationEventMulticaster applicationEventMulticaster() {
    var eventMulticaster = new SimpleApplicationEventMulticaster();
    eventMulticaster.setTaskExecutor(new SimpleAsyncTaskExecutor());
    return eventMulticaster;
}
That causes the ApplicationListenerMethodTransactionalAdapter to be executed on a different thread (not the one on which the event was published). Transaction state is bound to the publishing thread, which is why TransactionSynchronizationManager.isActualTransactionActive() ends up being false and the event listener does not get executed.
Removing the definition of the ApplicationEventMulticaster fixed it for me.
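To illustrate why the thread switch matters, here is a minimal plain-Java sketch with no Spring involved. It assumes the transaction state can be modelled as a single ThreadLocal flag; the class ThreadBoundTransactionDemo and its methods are hypothetical names made up for this illustration, not Spring API. It only shows the general behaviour: a value bound to the publishing thread is not visible from a thread owned by an executor.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadBoundTransactionDemo {

    // Hypothetical stand-in for thread-bound transaction state (an assumption
    // for illustration; this is not the real TransactionSynchronizationManager).
    private static final ThreadLocal<Boolean> TRANSACTION_ACTIVE =
            ThreadLocal.withInitial(() -> Boolean.FALSE);

    public static boolean isTransactionActive() {
        return TRANSACTION_ACTIVE.get();
    }

    /** Reads the flag on the thread that set it, as synchronous event dispatch would. */
    public static boolean checkOnPublishingThread() {
        TRANSACTION_ACTIVE.set(Boolean.TRUE);
        try {
            return isTransactionActive();
        } finally {
            TRANSACTION_ACTIVE.remove();
        }
    }

    /** Reads the flag on an executor thread, as dispatch through a task executor would. */
    public static boolean checkOnExecutorThread() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        TRANSACTION_ACTIVE.set(Boolean.TRUE);
        try {
            Callable<Boolean> check = ThreadBoundTransactionDemo::isTransactionActive;
            Future<Boolean> seenByWorker = executor.submit(check);
            return seenByWorker.get();
        } catch (Exception e) {
            throw new IllegalStateException("Worker thread check failed", e);
        } finally {
            TRANSACTION_ACTIVE.remove();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("publishing thread sees transaction: " + checkOnPublishingThread());
        System.out.println("executor thread sees transaction: " + checkOnExecutorThread());
    }
}
```

Running main prints true for the publishing thread and false for the executor thread, which mirrors the skipped event: the check for an active transaction happens on a thread that never had one bound to it.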
Answered By - jreb