Issue
I have the following application.yml file:
spring:
application:
name: test-app
cloud:
stream:
kafka:
binder:
consumerProperties:
value:
deserializer: org.apache.kafka.common.serialization.StringDeserializer
brokers: localhost:9092
autoCreateTopics: true
replicationFactor: 1
bindings:
listenCloudEvent-in-0:
destination: com.test.cloudevent
group: test-app-group
consumer:
configuration:
value:
deserializer: io.cloudevents.kafka.CloudEventDeserializer
listenString-in-0:
destination: com.test.string
group: test-app-group
function:
definition: listenCloudEvent;listenString
As far as I know, following properties should overrides the default deserializer but it seems it doesnt work.
consumer:
configuration:
value:
deserializer: io.cloudevents.kafka.CloudEventDeserializer
I am getting the following error for com.test.couldevent
topic.
org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Cannot construct instance of
io.cloudevents.CloudEvent
(no Creators, like default constructor, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
If I change the default deserializer to event cloud deserializer, then it works but then the other listener brakes.
Here is my listener function:
@Bean
public Consumer<Flux<Message<CloudEvent>>> listenCloudEvent() {
return inboundMessage -> inboundMessage
.onErrorStop()
.doOnNext(message -> log.info("[{}] Message is received.", message.getPayload().getId()))
.subscribe();
}
I really appreciate if you could help me on this issue. My main aim is having different deserializer for different topics. However, I wasnt able to succeed on it although it is indicated this way to override the default deserializer in spring documentation.
Thank you in advance!
Solution
You are missing the kafka.
element in the consumer properties. configuration
is a kafka-specific property, you are setting a common consumer property.
The following properties are available for Kafka consumers only and must be prefixed with
spring.cloud.stream.kafka.bindings.<channelName>.consumer..
Note ...stream.kafka.bindings...
, not ...stream.bindings...
.
Answered By - Gary Russell
Answer Checked By - Mildred Charles (JavaFixing Admin)