Issue
I'm using scatter gather pattern which uses three parallel flows. I can see the aggregated groupMessage consists of the responses coming in order of which comes first. Can I change the index of the messages that are grouped together in aggregate() method by using correlationId and sequenceNumber?
public IntegrationFlow flow() {
return flow ->
.split()
.channel(c -> c.executor(Executors.newCachedThreadPool()))
.scatterGather(
scatterer ->
scatterer
.applySequence(true)
.recipientFlow(flow1())
.recipientFlow(flow2())
.recipientFlow(flow3()),
gatherer -> gatherer.releaseLockBeforeSend(true))
.log("Gatherer")
.aggregate(aggregatingAndFormingRequestForFlow4())
.to(flow4())
public MessageGroupProcessor aggregatingAndFormingRequestForFlow4() {
return group -> {
System.out.println("Get Messages: " + group.getMessages())
// here I need to send three responses as arguments to do further processing but as group is a list in the runtime I don't know the index
prepareRequest()
/// }
Here if I see the log :
INFO [pool-12-thread-1]Gatherer:GenericMessage [payload=[response from flow1, response from flow2, response from flow3 ], headers={sequenceNumber=1, sequenceDetails=[[f4da9a6f-f27d-91ac-cbef-5106eb614d62, 1, 1]], errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1b6efbf0, sequenceSize=1, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1b6efbf0,correlationId=f4da9a6f-f27d-91ac-cbef-5106eb614d62, id=378f5406-ce9e-19fb-c909-5d5c7c4093c0, timestamp=1662660858032}]
Get Messages: [payload=[response from flow1, response from flow2, response from flow3 ], headers={sequenceNumber=1, sequenceDetails=[[f4da9a6f-f27d-91ac-cbef-5106eb614d62, 1, 1]], errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1b6efbf0, sequenceSize=1, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1b6efbf0,correlationId=f4da9a6f-f27d-91ac-cbef-5106eb614d62, id=378f5406-ce9e-19fb-c909-5d5c7c4093c0, timestamp=1662660858032}]
u can see the log it's an array of three responses combined.
Solution
See theory about an Aggregator pattern in the EIP book: https://www.enterpriseintegrationpatterns.com/patterns/messaging/Aggregator.html.
Then take a look into its implementation in Spring Integration: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator.
Indeed a correlation details is important info to determine that some messages belong to the same group to emit a single aggregated message for them.
Exactly this approach is done in the Scatter Gather as well: when we distribute message for recipient, we populate the mentioned headers into its copies for every recipient. The aggregator will gather replies and correlate them to group according to their correlation details headers.
There is no guarantee in which order replies come to the aggregator, but you can add an outputProcessor(MessageGroupProcessor outputProcessor)
option to sort messages in the group whatever way you need. This callback happens before the output message is emitted.
UPDATE
Thank you for sharing the code! Now it makes it more cleaner.
So, you have two levels of correlation: splitter-aggregator in the begging and in the end of flow. And then scatter-gather in the middle. This is correct that you see a single message with a list of payloads in the final aggregator: just because you use a default strategy for that gatherer
. What I suggested to do is to use a custom group processor exactly on the gatherer
. All the logic I explained you in comment is exactly for that level, but you misled me to an aggregator I didn't know until now. When you look into that custom processor on the gatherer, you'll see 3 messages in a group and there you can sort them respectively by their sequenceNumber
.
Answered By - Artem Bilan
Answer Checked By - Marilyn (JavaFixing Volunteer)