Issue
I have two services that should communicate via Kafka
.
Let's call the first service WriteService and the second service QueryService.
On the WriteService side, I have the following configuration for producers.
@Configuration
public class KafkaProducerConfiguration {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
// list of host:port pairs used for establishing the initial connections to the Kakfa cluster
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
I am trying to send an object of the class com.example.project.web.routes.dto.RouteDto
On the QueryService side, the consumer configuration is defined as follows.
@Configuration
@EnableKafka
public class KafkaConsumerConfiguration {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.groupid}")
private String serviceGroupId;
@Value("${spring.kafka.consumer.trusted-packages}")
private String trustedPackage;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
JsonDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.GROUP_ID_CONFIG, serviceGroupId);
props.put(JsonDeserializer.TRUSTED_PACKAGES, trustedPackage);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
The listener has the following definition. The payload class has the fully qualified name - com.example.project.clientqueryview.module.routes.messaging.kafka.RouteDto
@KafkaListener(topics = "${spring.kafka.topics.routes}",
containerFactory = "kafkaListenerContainerFactory")
public void listenForRoute(ConsumerRecord<String, RouteDto> cr,
@Payload RouteDto payload) {
logger.info("Logger 1 [JSON] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
typeIdHeader(cr.headers()), payload, cr.toString());
}
private static String typeIdHeader(Headers headers) {
return StreamSupport.stream(headers.spliterator(), false)
.filter(header -> header.key().equals("__TypeId__"))
.findFirst().map(header -> new String(header.value())).orElse("N/A");
}
When a message is sent, I get the following error
Caused by: org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [com.example.project.web.routes.dto.RouteDto]; nested exception is java.lang.ClassNotFoundException: com.example.project.web.routes.dto.RouteDto
The error is clear enough. However I cannot understand why does it has this behaviour by default. I don't expect to have the same package in different services, that makes no sense at all.
I haven't found a way to disable this and use a class provided to the listener, annotated with @Payload
How this could be solved, without manually configuring the mapper?
Solution
If you are using spring-kafka-2.2.x you can disable the default header by overloaded constructors of JsonDeserializer
docs
Starting with version 2.2, you can explicitly configure the deserializer to use the supplied target type and ignore type information in headers by using one of the overloaded constructors that have a boolean useHeadersIfPresent (which is true by default). The following example shows how to do so:
DefaultKafkaConsumerFactory<String, Object> cf = new DefaultKafkaConsumerFactory<>(props,
new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));
If with lower version use the MessageConverter
(you might see this problem from spring-kafka-2.1.x and above)
Spring for Apache Kafka provides a MessageConverter abstraction with the MessagingMessageConverter implementation and its StringJsonMessageConverter and BytesJsonMessageConverter customization. You can inject the MessageConverter into a KafkaTemplate instance directly and by using AbstractKafkaListenerContainerFactory bean definition for the @KafkaListener.containerFactory() property. The following example shows how to do so:
@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, RouteDto> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setMessageConverter(new StringJsonMessageConverter());
return factory;
}
@KafkaListener(topics = "jsonData",
containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(RouteDto dto) {
...
}
Note :This type inference can be achieved only when the @KafkaListener annotation is declared at the method level. With a class-level @KafkaListener, the payload type is used to select which @KafkaHandler method to invoke, so it must already have been converted before the method can be chosen.
Answered By - Deadpool
Answer Checked By - Gilberto Lyons (JavaFixing Admin)