Issue
I have developed an application that fetches data from a third-party site and processes it. The data is fetched page by page. The current implementation works: it collects the data from all pages and then builds a single list from it. Because of memory issues, we want to process the data as soon as it becomes available, with or without blocking the next request. Below is my implementation:
public Mono<List<JsonNode>> featchData() {
    return webClientHelper.fetchItems(null).expand(response -> {
        List<String> headerValue = response.getHeaders().get(Constants.HEADER_ITEM_CURSOR);
        if (headerValue == null || headerValue.isEmpty()) {
            return Mono.empty();
        }
        return webClientHelper.fetchItems(headerValue.get(0));
    }).flatMap(response -> Flux.fromIterable(Arrays.asList(response.getBody()))).collectList();
}
Below is my web client helper method:
public Mono<ResponseEntity<JsonNode[]>> fetchItems(String headerValue) {
    return webFluxConfig.getWebClient().get()
            .uri("/orders/")
            .accept(MediaType.APPLICATION_JSON)
            .httpRequest(httpRequest -> {
                HttpClientRequest reactorRequest = httpRequest.getNativeRequest();
                reactorRequest.responseTimeout(Duration.ofSeconds(5));
            })
            .header(HEADER_ITEM_CURSOR, headerValue)
            .retrieve()
            .onStatus(
                    status -> status.value() == STATUS_CODE_401,
                    clientResponse -> Mono.empty()
            )
            .onStatus(HttpStatus::is5xxServerError, response -> Mono.error(
                    new ServerErrorException("Server error", response.rawStatusCode())))
            .toEntity(JsonNode[].class)
            .timeout(Duration.ofMillis(configuration.getBManagedTimeoutMilliseconds()))
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(2))
                    .filter(ServerErrorException.class::isInstance)
                    .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> {
                        throw new ServerErrorException("Failed to process after max retries",
                                HttpStatus.SERVICE_UNAVAILABLE.value());
                    }));
}
I'm using JsonNode[] because the response data is an array of objects with dynamic attributes.
NOTE: HEADER_ITEM_CURSOR contains the cursor for the next set of results and must be passed in the next request. If HEADER_ITEM_CURSOR is null, there is no more data to request.
The solution should process the first page's data as soon as it is available. The call for the second page may or may not be blocked in the meantime; once that page is available, its data should be queued for processing as well.
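The cursor contract described above can be sketched without Reactor or any HTTP call. This is only a minimal illustration in plain Java: the class CursorPaging, the Page type, the forEachPage method and the in-memory "server" are all hypothetical names, not part of the application. It shows each page being handed to a handler as soon as it is fetched, so only one page is held at a time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

public class CursorPaging {

    /** Hypothetical page type: the items plus the cursor for the next page. */
    static final class Page {
        final List<String> items;
        final String nextCursor; // null means there is no further page

        Page(List<String> items, String nextCursor) {
            this.items = items;
            this.nextCursor = nextCursor;
        }
    }

    /**
     * Starts with a null cursor (first page), passes every page to the handler
     * right after it is fetched, and stops when a page carries no cursor.
     * Returns the number of pages visited.
     */
    static int forEachPage(Function<String, Page> fetch, Consumer<List<String>> handler) {
        int pages = 0;
        String cursor = null;
        do {
            Page page = fetch.apply(cursor);
            handler.accept(page.items); // process now instead of collecting everything
            pages++;
            cursor = page.nextCursor;
        } while (cursor != null);
        return pages;
    }

    public static void main(String[] args) {
        // Assumed in-memory stand-in for the remote endpoint: cursor -> page.
        Map<String, Page> server = new HashMap<>();
        server.put("start", new Page(Arrays.asList("a", "b"), "c1"));
        server.put("c1", new Page(Arrays.asList("c"), "c2"));
        server.put("c2", new Page(Arrays.asList("d", "e"), null));
        Function<String, Page> fetch = c -> server.get(c == null ? "start" : c);

        List<String> seen = new ArrayList<>();
        int pages = forEachPage(fetch, seen::addAll);
        System.out.println(pages + " pages, items = " + seen);
    }
}
```

This loop is blocking by design; it only illustrates the "null cursor means stop" rule and per-page handling, not the non-blocking behaviour of WebClient.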
Solution
I found a solution by changing the return type from Mono to Flux and subscribing to it.
public Flux<ResponseEntity<JsonNode[]>> featchData() {
    return webClientHelper.fetchItems(null).expand(response -> {
        List<String> headerValue = response.getHeaders().get(Constants.HEADER_ITEM_CURSOR);
        if (headerValue == null || headerValue.isEmpty()) {
            return Mono.empty();
        }
        return webClientHelper.fetchItems(headerValue.get(0));
    });
}
In the code above, I removed the final step, .flatMap(response -> Flux.fromIterable(Arrays.asList(response.getBody()))).collectList().
Then, in the calling method, instead of collecting to a list, I process the data directly.
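public static void main(String[] args) {
    // Each page is handled as soon as it arrives.
    // JsonNode has no getId(); "id" is an assumed field name in the payload.
    featchData().subscribe(page -> Arrays.stream(page.getBody())
            .forEach(item -> System.out.println(item.get("id"))));
}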
Please let me know if anyone sees any issue with the above implementation.
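One point worth checking: subscribe does not wait for an asynchronous source to finish, so a plain main may return before all pages have arrived. The sketch below is a Reactor-free analogue using CompletableFuture from the JDK, not the WebClient code itself. AsyncPaging, Page, processFrom and the fake fetch function are hypothetical names chosen for illustration. It processes each page, chains the request for the next cursor, and makes the caller wait for the final page.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Function;

public class AsyncPaging {

    /** Hypothetical page type: the items plus the cursor for the next page. */
    static final class Page {
        final List<String> items;
        final String nextCursor; // null means there is no further page

        Page(List<String> items, String nextCursor) {
            this.items = items;
            this.nextCursor = nextCursor;
        }
    }

    /**
     * Fetches the page for the given cursor, processes its items, then chains
     * the request for the next page. The returned future completes only after
     * the last page has been processed.
     */
    static CompletableFuture<Void> processFrom(String cursor,
            Function<String, CompletableFuture<Page>> fetch,
            Consumer<String> processItem) {
        return fetch.apply(cursor).thenCompose(page -> {
            page.items.forEach(processItem);
            return page.nextCursor == null
                    ? CompletableFuture.<Void>completedFuture(null)
                    : processFrom(page.nextCursor, fetch, processItem);
        });
    }

    public static void main(String[] args) {
        // Assumed asynchronous fetch: two fake pages produced on the common pool.
        Function<String, CompletableFuture<Page>> fetch = cursor ->
                CompletableFuture.supplyAsync(() -> cursor == null
                        ? new Page(Arrays.asList("order-1", "order-2"), "next")
                        : new Page(Arrays.asList("order-3"), null));

        List<String> processed = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> done = processFrom(null, fetch, processed::add);

        // Without this join, main could return before the asynchronous work finishes.
        done.join();
        System.out.println(processed);
    }
}
```

In the Reactor version, the comparable choice in a standalone main would be to block on completion (for example with blockLast()) rather than calling subscribe alone. Inside a long-running server application this is usually not needed.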
Answered By - Sathesh S
Answer Checked By - Timothy Miller (JavaFixing Admin)