...
```yaml
broker:
  server: kafka:9092
  infrastructure: NOOP
  fetchTimeout: 15000

participant:
  intermediaryParameters:
    topics:
      operationTopic: policy-acruntime-participant
      syncTopic: acm-ppnt-sync
    reportingTimeIntervalMs: 120000
    description: Participant Description
    participantId: 101c62b3-8918-41b9-a747-d21eb79c6c01
    topicValidation: true
    clampAutomationCompositionTopics:
      topicSources:
        - topic: ${participant.intermediaryParameters.topics.operationTopic}
          servers:
            - ${broker.server}
          topicCommInfrastructure: ${broker.infrastructure}
          fetchTimeout: ${broker.fetchTimeout}
        - topic: ${participant.intermediaryParameters.topics.syncTopic}
          servers:
            - ${broker.server}
          topicCommInfrastructure: ${broker.infrastructure}
          fetchTimeout: ${broker.fetchTimeout}
      topicSinks:
        - topic: ${participant.intermediaryParameters.topics.operationTopic}
          servers:
            - ${broker.server}
          topicCommInfrastructure: ${broker.infrastructure}
    clampAdminTopics:
      servers:
        - ${broker.server}
      topicCommInfrastructure: ${broker.infrastructure}
```
For backward compatibility, clampAdminTopics can be implemented as either mandatory or optional. If it is optional, the Kafka health check is optional as well: it only runs when clampAdminTopics is configured.
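The optional variant can be sketched as follows. This is a minimal illustration, not the actual participant code: `AdminTopics`, `HealthCheck`, `createHealthCheck` and `isReady` are hypothetical names introduced here, and the Kafka check itself is replaced by a placeholder that only looks at the configured server list.

```java
import java.util.List;
import java.util.Optional;

public class OptionalHealthCheckSketch {

    /** Hypothetical stand-in for the clampAdminTopics section; null when the section is absent. */
    record AdminTopics(List<String> servers, String infrastructure) {}

    /** Hypothetical health check contract, assumed for this sketch only. */
    interface HealthCheck {
        boolean healthCheck(List<String> topics);
    }

    /** Returns a health check only when clampAdminTopics is present and points at Kafka. */
    static Optional<HealthCheck> createHealthCheck(AdminTopics adminTopics) {
        if (adminTopics == null || adminTopics.infrastructure() == null) {
            // Older configuration without clampAdminTopics: no health check at all.
            return Optional.empty();
        }
        if (!"KAFKA".equalsIgnoreCase(adminTopics.infrastructure())) {
            return Optional.empty();
        }
        // Placeholder logic: a real check would ask the broker whether the topics exist.
        return Optional.of(topics ->
                adminTopics.servers() != null && !adminTopics.servers().isEmpty());
    }

    /** Treats a missing health check as "ready", which keeps old configurations working. */
    static boolean isReady(AdminTopics adminTopics, List<String> topics) {
        return createHealthCheck(adminTopics)
                .map(check -> check.healthCheck(topics))
                .orElse(true);
    }

    public static void main(String[] args) {
        List<String> topics = List.of("policy-acruntime-participant", "acm-ppnt-sync");
        System.out.println(isReady(null, topics));
        System.out.println(isReady(new AdminTopics(List.of("kafka:9092"), "KAFKA"), topics));
    }
}
```

With this shape, a configuration file that predates clampAdminTopics still starts the participant, while a file that declares the section opts in to the check.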
...
```java
public class TopicHealthCheckFactory {

    /**
     * Get Topic HealthCheck.
     *
     * @param param TopicParameters
     * @return TopicHealthCheck
     */
    public TopicHealthCheck getTopicHealthCheck(TopicParameters param) {
        // Select the health check implementation from the configured infrastructure name.
        return switch (Topic.CommInfrastructure.valueOf(param.getTopicCommInfrastructure().toUpperCase())) {
            case KAFKA -> new KafkaHealthCheck(param);
            case NOOP -> new NoopHealthCheck();
            default -> null;
        };
    }
}
```
The Kafka configuration currently performed in the constructor of the “IntermediaryActivator” class has to be moved into a separate method, so that the class can be instantiated without any issue related to the Kafka connection.
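A minimal sketch of that refactoring is shown below. It does not reproduce the real “IntermediaryActivator”: `DeferredActivatorSketch`, `BrokerConnector`, `config` and `isConfigured` are hypothetical names chosen for illustration, and the broker connection is reduced to a single callback.

```java
public class DeferredActivatorSketch {

    /** Hypothetical abstraction of whatever opens the Kafka connection. */
    interface BrokerConnector {
        void connect(String server);
    }

    private final String server;
    private final BrokerConnector connector;
    private boolean configured;

    /** The constructor only stores its arguments; it never touches the broker. */
    DeferredActivatorSketch(String server, BrokerConnector connector) {
        this.server = server;
        this.connector = connector;
    }

    /** Performs the broker configuration that previously lived in the constructor. */
    void config() {
        if (!configured) {
            connector.connect(server);
            configured = true;
        }
    }

    boolean isConfigured() {
        return configured;
    }

    public static void main(String[] args) {
        // A connector that always fails, standing in for an unreachable broker.
        DeferredActivatorSketch activator = new DeferredActivatorSketch("kafka:9092",
                server -> {
                    throw new IllegalStateException("broker not reachable: " + server);
                });
        // Construction succeeded even though the broker is unreachable.
        System.out.println(activator.isConfigured());
    }
}
```

The caller can then decide when to invoke `config()`, for example only after a health check has confirmed that the broker and topics are available.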
...