...
The health check will be implemented using a new specific abstract layer, the class “TopicParameters
" can be used to fetch properties from properties file, for the admin connection to Kafka.
Example for the In the example below, there is clampAdminTopics that contains new Kafka Admin Client properties (with small simplification):
Code Block | ||
---|---|---|
| ||
broker: server: kafka:9092 infrastructure: NOOP fetchTimeout: 15000 participant: intermediaryParameters: topics: operationTopic: policy-acruntime-participant syncTopic: acm-ppnt-sync reportingTimeIntervalMs: 120000 description: Participant Description participantId: 101c62b3-8918-41b9-a747-d21eb79c6c01 clampAutomationCompositionTopics: topicSources: - topic: ${participant.intermediaryParameters.topics.operationTopic} servers: - ${broker.server} topicCommInfrastructure: ${broker.infrastructure} fetchTimeout: ${broker.fetchTimeout} - topic: ${participant.intermediaryParameters.topics.syncTopic} servers: - ${broker.server} topicCommInfrastructure: ${broker.infrastructure} fetchTimeout: ${broker.fetchTimeout} topicSinks: - topic: ${participant.intermediaryParameters.topics.operationTopic} servers: - ${broker.server} topicCommInfrastructure: ${broker.infrastructure} clampAdminTopics: servers: - ${broker.server} topicCommInfrastructure: ${broker.infrastructure} |
clampAdminTopics could be implemented as mandatory or as optional. In the second choice, Kafka Health check will be optional.
The code below shows the abstraction layer for topic heath check:
Code Block | ||
---|---|---|
| ||
public class TopicHealthCheckFactory { /** * Get Topic HealthCheck. * * @param param TopicParameters * @return TopicHealthCheck */ public TopicHealthCheck getTopicHealthCheck(TopicParameters param) { return switch (Topic.CommInfrastructure.valueOf(param.getTopicCommInfrastructure().toUpperCase())) { case KAFKA -> new KafkaHealthCheck(param); case NOOP -> new NoopHealthCheck(); default -> nullnew NoopHealthCheck(); }; } } |
The Kafka configuration implemented into the “IntermediaryActivator“ constructor class, has to be move in a method, so the class can be created with no issue related to Kafka connection.
...