Issue
Context
Per the Spring docs (https://docs.spring.io/spring-integration/docs/current/reference/html/sftp.html#using-the-get-command), the GET command on the SFTP outbound gateway with the STREAM option returns an input stream for the remote file whose path is sent to the input channel.
We could configure an integration flow similar to the recommendation at
https://docs.spring.io/spring-integration/docs/current/reference/html/sftp.html#configuring-with-the-java-dsl-3
@Bean
public QueueChannelSpec remoteFileOutputChannel() {
    return MessageChannels.queue();
}

@Bean
public IntegrationFlow sftpGetFlow() {
    return IntegrationFlows.from("sftpGetInputChannel")
            .handle(Sftp.outboundGateway(sftpSessionFactory(),
                    AbstractRemoteFileOutboundGateway.Command.GET, "payload")
                    .options(AbstractRemoteFileOutboundGateway.Option.STREAM))
            .channel("remoteFileOutputChannel")
            .get();
}
I plan to obtain the input stream in the caller, similar to the approach shown in the edits to the question "No Messages When Obtaining Input Stream from SFTP Outbound Gateway":
public InputStream openFileStream(final int retryCount, final String filename, final String directory)
        throws Exception {
    InputStream is = null;
    for (int i = 1; i <= retryCount; ++i) {
        if (sftpGetInputChannel.send(MessageBuilder.withPayload(directory + "/" + filename).build(), ftpTimeout)) {
            is = getInputStream();
            if (is != null) {
                break;
            } else {
                logger.info("Failed to obtain input stream so attempting retry " + i + " of " + retryCount);
                Thread.sleep(ftpTimeout);
            }
        }
    }
    return is;
}

private InputStream getInputStream() {
    Message<?> msgs = stream.receive(ftpTimeout);
    if (msgs == null) {
        return null;
    }
    InputStream is = (InputStream) msgs.getPayload();
    return is;
}
I would like to pass the input stream to the item reader that is part of a Spring Batch job. The job would read from the input stream and close the stream/session upon completion.
Question
The response from the SFTP outbound gateway is sent to a queue channel. If there are concurrent GET requests to the gateway from multiple jobs/clients, how does each consumer pick the input stream that belongs to its own request from the blocking queue behind the queue channel? One solution I could think of:
- Mark getInputStream as synchronized. This would ensure that only one consumer can send commands to the outbound gateway. Since all we are doing is returning a reference to the input stream, it is not a huge performance bottleneck. We could also set the capacity of the queue channel as an additional measure.
This is not an ideal solution because it is very much possible for other devs to bypass the synchronized method here and interact with the outbound gateway. We run the risk of fetching an incorrect stream.
The underlying SFTP client implementation used by Spring doesn't impose any such restrictions so I am seeking a Spring integration solution that can overcome this problem.
Does GET with STREAM return any headers carrying the requested file name from the payload, so that the client can verify that the stream corresponds to the file it asked for? This would require peeking into the queue and inspecting the message before removing it, which does not seem ideal.
Is there a way to pass the response queue channel name as a parameter from the caller?
Appreciate any insights.
Solution
Yes, simply set the replyChannel header with a new QueueChannel for each request and terminate the flow with the gateway; if there is no output channel, the outbound gateway sends the reply to the channel in the header.
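A minimal caller-side sketch of that advice, assuming the `.channel("remoteFileOutputChannel")` line has been removed from sftpGetFlow so that the gateway is the last element of the flow. The class name SftpStreamClient, its constructor, and the method name requestStream are hypothetical; sftpGetInputChannel and ftpTimeout are the names used in the question. It is a fragment that needs Spring Integration on the classpath, not a complete application.

```java
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

// Hypothetical helper: one private reply channel per GET request.
public class SftpStreamClient {

    private final MessageChannel sftpGetInputChannel; // input channel of sftpGetFlow
    private final long ftpTimeout;                    // send/receive timeout in milliseconds

    public SftpStreamClient(MessageChannel sftpGetInputChannel, long ftpTimeout) {
        this.sftpGetInputChannel = sftpGetInputChannel;
        this.ftpTimeout = ftpTimeout;
    }

    /** Returns the gateway's reply for this request only, or null on timeout. */
    public Message<?> requestStream(String directory, String filename) {
        // A fresh channel per request: no other caller holds a reference to it.
        QueueChannel replyChannel = new QueueChannel(1);

        Message<String> request = MessageBuilder
                .withPayload(directory + "/" + filename)
                .setReplyChannel(replyChannel) // sets the replyChannel header
                .build();

        if (!sftpGetInputChannel.send(request, ftpTimeout)) {
            return null; // the request was not accepted in time
        }
        // With no output channel on the gateway, the reply is routed to the header channel.
        return replyChannel.receive(ftpTimeout);
    }
}
```

The caller would cast the reply payload to InputStream as in the question's getInputStream. Returning the whole Message rather than just the payload keeps the reply headers available, which may matter for closing the session once the stream has been consumed.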
That is similar to how inbound gateways work.
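The reason a per-request reply channel removes the correlation problem can be illustrated without Spring. The following is a plain-JDK model, not Spring Integration code: Request, handle, and fetch are hypothetical names, a BlockingQueue stands in for the QueueChannel, and a string stands in for the input stream. Each request carries its own reply queue, so concurrent callers never read from a shared queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ReplyChannelModel {

    // A request that carries its own reply queue, like a message with a replyChannel header.
    static final class Request {
        final String path;
        final BlockingQueue<String> replyTo;

        Request(String path, BlockingQueue<String> replyTo) {
            this.path = path;
            this.replyTo = replyTo;
        }
    }

    // Stand-in for the gateway: it replies to whatever queue the request names.
    static void handle(Request request) {
        request.replyTo.add("stream-for:" + request.path);
    }

    // Caller side: create a private reply queue, submit the request, wait for the reply.
    public static String fetch(ExecutorService gateway, String path) throws InterruptedException {
        BlockingQueue<String> replyTo = new ArrayBlockingQueue<>(1);
        gateway.submit(() -> handle(new Request(path, replyTo)));
        return replyTo.poll(5, TimeUnit.SECONDS); // null if no reply arrives in time
    }

    public static void main(String[] args) throws Exception {
        ExecutorService gateway = Executors.newFixedThreadPool(4);
        ExecutorService callers = Executors.newFixedThreadPool(8);
        List<Future<Boolean>> results = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            String path = "/dir/file-" + i + ".csv";
            results.add(callers.submit(() -> {
                String reply = fetch(gateway, path);
                return ("stream-for:" + path).equals(reply);
            }));
        }
        boolean allMatched = true;
        for (Future<Boolean> result : results) {
            allMatched &= result.get();
        }
        System.out.println("all replies matched their requests: " + allMatched);
        callers.shutdown();
        gateway.shutdown();
    }
}
```

Because the reply queue is created inside fetch and never shared, no synchronization or header inspection is needed on the caller side, which is the property the replyChannel header provides in the real flow.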
Answered By - Gary Russell
Answer Checked By - Katrina (JavaFixing Volunteer)