Issue
I have a Spring Boot app that uses Apache Kafka Streams (kafka-streams) directly, without Spring Cloud Stream. I added the Actuator health check endpoint and configured it in application.yml like this:
management:
  health.db.enabled: false
  endpoints.web:
    base-path:
    path-mapping.health: /
When a runtime exception was thrown, my stream stopped, as the log below shows, but the health check status is still UP:
2019-09-17 13:16:31.522 INFO 1 --- [ Thread-5] org.apache.kafka.streams.KafkaStreams : stream-client [lpp-model-stream-7e6e8fea-fcad-4033-92a4-5ede50de6e17] Streams client stopped completely
How can I bind the Kafka Streams status to the health check endpoint?
My pom.xml:
<dependencies>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
    </dependency>
    <dependency>
        <groupId>data-wizards</groupId>
        <artifactId>lpp-common-avro</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-streams-avro-serde</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
    </dependency>
    <dependency>
        <groupId>io.vavr</groupId>
        <artifactId>vavr</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
Link to the code where I create the stream: https://gist.github.com/solveretur/fc4fdd6c7663dc4d58fe72d48029f9c3
Solution
KafkaStreams maintains an in-memory State, which can be mapped to Actuator's health statuses. The state can be one of the following: CREATED, ERROR, NOT_RUNNING, PENDING_SHUTDOWN, REBALANCING, RUNNING. They are self-explanatory. See the docs for state transitions: https://kafka.apache.org/11/javadoc/org/apache/kafka/streams/KafkaStreams.State.html
If you're looking for a complete example, you could take the following one and adapt it to your needs (e.g. you may not want to count CREATED as status UP). Make sure you have a bean of type KafkaStreams in the application context.
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.stereotype.Component;

// Note that the class name prefix before `HealthIndicator` will be camel-cased
// and used as the health component name, `kafkaStreams` here
@Component
public class KafkaStreamsHealthIndicator implements HealthIndicator {

    // If you have multiple instances, inject them as Map<String, KafkaStreams>.
    // Spring will map KafkaStreams instances by the bean names present in the context,
    // so you can provide status details for each stream by name.
    @Autowired
    private KafkaStreams kafkaStreams;

    @Override
    public Health health() {
        State kafkaStreamsState = kafkaStreams.state();

        // CREATED, RUNNING or REBALANCING
        if (kafkaStreamsState == State.CREATED || kafkaStreamsState.isRunning()) {
            // set details if you need them
            return Health.up().build();
        }

        // ERROR, NOT_RUNNING or PENDING_SHUTDOWN
        return Health.down().withDetail("state", kafkaStreamsState.name()).build();
    }
}
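Then the health endpoint will display it roughly like this. The overall status aggregates its components, so it is DOWN as well whenever kafkaStreams is DOWN. The exact nesting depends on the Spring Boot version, and component details appear only if management.endpoint.health.show-details allows them:
{
  "status": "DOWN",
  "kafkaStreams": {
    "status": "DOWN",
    "details": { //not included if "UP"
      "state": "NOT_RUNNING"
    }
  }
}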
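For the multi-instance case mentioned in the comments of the indicator above, the aggregation could look roughly like the sketch below. It is dependency-free so it can run on its own: StreamState is a hypothetical local mirror of KafkaStreams.State, and the class name and the stream names (ordersStream, paymentsStream) are made up for illustration. In a real indicator you would inject Map<String, KafkaStreams>, call state() on each entry, and pass each entry to withDetail on the Health builder instead of building a plain map.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MultiStreamHealthSketch {

    // Hypothetical mirror of KafkaStreams.State, so the sketch runs without Kafka on the classpath.
    public enum StreamState {
        CREATED, ERROR, NOT_RUNNING, PENDING_SHUTDOWN, REBALANCING, RUNNING;

        // Same rule as the single-stream indicator: CREATED, RUNNING and REBALANCING count as healthy.
        public boolean isHealthy() {
            return this == CREATED || this == RUNNING || this == REBALANCING;
        }
    }

    // The overall status is DOWN as soon as any single stream is unhealthy.
    public static String overallStatus(Map<String, StreamState> statesByBeanName) {
        for (StreamState state : statesByBeanName.values()) {
            if (!state.isHealthy()) {
                return "DOWN";
            }
        }
        return "UP";
    }

    // Per-stream details keyed by bean name, in insertion order.
    public static Map<String, String> details(Map<String, StreamState> statesByBeanName) {
        Map<String, String> details = new LinkedHashMap<>();
        statesByBeanName.forEach((name, state) -> details.put(name, state.name()));
        return details;
    }

    public static void main(String[] args) {
        Map<String, StreamState> states = new LinkedHashMap<>();
        states.put("ordersStream", StreamState.RUNNING);       // hypothetical bean name
        states.put("paymentsStream", StreamState.NOT_RUNNING); // hypothetical bean name
        System.out.println(overallStatus(states) + " " + details(states));
    }
}
```

Reporting DOWN when any one stream has stopped is a design choice; if some streams are optional, you may prefer to report only the critical ones as DOWN and expose the rest as details.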
Answered By - Yurii J
Answer Checked By - Mary Flores (JavaFixing Volunteer)