Issue
I am trying to access the Flux object in Spring Integration without splitting the flow declaration into two functions. How can I do something like the following?
@Bean
public IntegrationFlow mainFlow() {
    return IntegrationFlows.from(somePublisher)
            // Access the publisher here to perform something like:
            .handle(flux -> flux.buffer(Duration.ofMillis(200)))
            .handle(writeToS3)
            .get();
}
I don't mind moving the Flux operation mentioned in the comment to another class (maybe as some kind of gateway), but it is very important to me to start and end the flow in the same mainFlow function, so that it is clear and readable what my application is doing. I saw the documentation on gateways with Mono, but the example code does not seem usable as written (it shows a Flux that is not inside any function, and as a beginner I find it hard to understand what is going on there).
Solution
IntegrationFlows.from(somePublisher) starts a reactive stream for the provided Publisher. The rest of the flow is executed against every single event from the source Publisher. So your .handle(flux -> ...) is going to work only if that event from the source is itself a Flux.
If your idea is to apply that buffer() to the source Publisher and then continue the flow, consider using a reactive() customizer: https://docs.spring.io/spring-integration/docs/current/reference/html/reactive-streams.html#fluxmessagechannel-and-reactivestreamsconsumer.
So, instead of that handle(), I would use:
.bridge(e -> e.reactive(flux -> flux.buffer(Duration.ofMillis(200))))
The next handle(writeToS3) is going to be performed against the buffered List result, so be careful about what you are doing and expecting in your writeToS3.
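To make that last point concrete, here is a minimal plain-Java sketch of what "handled per buffered List" means. It is not Spring Integration or Reactor code: WindowBufferSketch, Event, bufferByWindow and describeWrite are hypothetical names made up for illustration, the arrival times are hard-coded instead of coming from a real clock, and skipping empty windows is a choice of this sketch rather than a statement about the library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-JDK stand-in for time-window buffering; NOT Reactor or Spring Integration code.
final class WindowBufferSketch {

    // Hypothetical event type: a payload plus its arrival time in milliseconds.
    static final class Event {
        final long arrivalMillis;
        final String payload;

        Event(long arrivalMillis, String payload) {
            this.arrivalMillis = arrivalMillis;
            this.payload = payload;
        }
    }

    // Groups events (assumed sorted by arrival time) into consecutive windows
    // of windowMillis each. Windows that received no events are skipped.
    static List<List<String>> bufferByWindow(List<Event> events, long windowMillis) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        long windowEnd = windowMillis;
        for (Event e : events) {
            // Close every window that ended before this event arrived.
            while (e.arrivalMillis >= windowEnd) {
                if (!current.isEmpty()) {
                    batches.add(current);
                    current = new ArrayList<>();
                }
                windowEnd += windowMillis;
            }
            current.add(e.payload);
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    // Hypothetical stand-in for writeToS3: it receives a whole batch, not one item.
    static String describeWrite(List<String> batch) {
        return "writing " + batch.size() + " item(s): " + String.join(",", batch);
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(10, "a"), new Event(150, "b"),
                new Event(210, "c"), new Event(650, "d"));
        // With 200 ms windows the handler runs three times, once per batch.
        for (List<String> batch : bufferByWindow(events, 200)) {
            System.out.println(describeWrite(batch));
        }
    }
}
```

With the four sample events and a 200 ms window, the handler is called three times, with [a, b], [c] and [d], rather than four times with single items. A writeToS3 handler placed after the buffering step should therefore be written to accept a List.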
Answered By - Artem Bilan
Answer Checked By - Robin (JavaFixing Admin)