...
```yaml
broker:
  server: kafka:9092
  infrastructure: NOOP
  fetchTimeout: 15000
participant:
  intermediaryParameters:
    topics:
      operationTopic: policy-acruntime-participant
      syncTopic: acm-ppnt-sync
    reportingTimeIntervalMs: 120000
    description: Participant Description
    participantId: 101c62b3-8918-41b9-a747-d21eb79c6c01
    topicValidation: true
    clampAutomationCompositionTopics:
      topicSources:
        - topic: ${participant.intermediaryParameters.topics.operationTopic}
          servers:
            - ${broker.server}
          topicCommInfrastructure: ${broker.infrastructure}
          fetchTimeout: ${broker.fetchTimeout}
        - topic: ${participant.intermediaryParameters.topics.syncTopic}
          servers:
            - ${broker.server}
          topicCommInfrastructure: ${broker.infrastructure}
          fetchTimeout: ${broker.fetchTimeout}
      topicSinks:
        - topic: ${participant.intermediaryParameters.topics.operationTopic}
          servers:
            - ${broker.server}
          topicCommInfrastructure: ${broker.infrastructure}
    clampAdminTopics:
      servers:
        - ${broker.server}
      topicCommInfrastructure: ${broker.infrastructure}
      fetchTimeout: ${broker.fetchTimeout}
```
For backward compatibility, clampAdminTopics can be implemented either as mandatory or as optional. If it is optional, the Kafka health check is optional as well.
...
```java
public class TopicHealthCheckFactory {

    /**
     * Get Topic HealthCheck.
     *
     * @param param TopicParameters
     * @return TopicHealthCheck
     */
    public TopicHealthCheck getTopicHealthCheck(TopicParameters param) {
        // Map the configured infrastructure name (case-insensitive) to its health check.
        return switch (Topic.CommInfrastructure.valueOf(param.getTopicCommInfrastructure().toUpperCase())) {
            case KAFKA -> new KafkaHealthCheck(param);
            case NOOP -> new NoopHealthCheck();
            default -> null;
        };
    }
}
```
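The name-to-implementation dispatch used by the factory can be tried in isolation with a small self-contained sketch. Everything in it is a stand-in: `HealthCheck`, `CommInfrastructure`, and `HealthCheckFactorySketch` are hypothetical names that only mirror the shape of `TopicHealthCheck`, `Topic.CommInfrastructure`, and `TopicHealthCheckFactory`. The real policy/common types may differ, and the Kafka check here is a placeholder that never contacts a broker.

```java
import java.util.Locale;

// Hypothetical stand-in for TopicHealthCheck; the real interface may differ.
interface HealthCheck {
    boolean healthCheck();
}

// Hypothetical stand-in for Topic.CommInfrastructure, reduced to two values.
enum CommInfrastructure { KAFKA, NOOP }

final class HealthCheckFactorySketch {

    // Placeholder for KafkaHealthCheck: a real one would query the broker.
    static final class KafkaCheck implements HealthCheck {
        @Override
        public boolean healthCheck() {
            return false; // no broker is contacted in this sketch
        }
    }

    // NOOP never blocks start-up: it always reports healthy.
    static final class NoopCheck implements HealthCheck {
        @Override
        public boolean healthCheck() {
            return true;
        }
    }

    // Upper-case the configured name, map it to the enum, pick the check.
    static HealthCheck get(String infrastructure) {
        CommInfrastructure infra =
                CommInfrastructure.valueOf(infrastructure.toUpperCase(Locale.ROOT));
        return switch (infra) {
            case KAFKA -> new KafkaCheck();
            case NOOP -> new NoopCheck();
        };
    }
}
```

Note that `Enum.valueOf` throws `IllegalArgumentException` when the name matches no constant, so a misspelled infrastructure name fails before the `switch` is evaluated.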
The Kafka configuration currently implemented in the constructor of the “IntermediaryActivator” class has to be moved into a separate method, so that the class can be instantiated without any issue related to the Kafka connection.
```java
@Component
public class IntermediaryActivator extends ServiceManagerContainer implements Closeable {

    // ------------------------------------
    // ------------------------------------
    // ------------------------------------

    public IntermediaryActivator() {
        msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
        syncMsgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
    }

    public <T> void config(ParticipantParameters parameters, List<Publisher> publishers,
            List<Listener<T>> listeners) {
        // topics, initialization
        // ------------------------------------
        // ------------------------------------
        // ------------------------------------
    }
}
```
Details of implementation
Two infrastructures will be implemented in policy/common: NOOP (for testing) and KAFKA.
The business logic will be implemented in the participant intermediary:
- If Kafka is not up yet and the health check fails, the application waits and tries again later, using the “fetchTimeout” property as the delay.
- If Kafka is up and the health check succeeds, the application starts the Kafka configuration of publishers and listeners.
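The wait-and-retry behaviour described here could look roughly like the sketch below. It is only an illustration under stated assumptions: `KafkaStartupSketch` and `awaitHealthyThenConfigure` are hypothetical names, the health check is passed in as a plain `BooleanSupplier`, and the deferred Kafka configuration is passed in as a `Runnable` rather than a call to the real `config(...)` method.

```java
import java.util.function.BooleanSupplier;

final class KafkaStartupSketch {

    // Polls the health check until it passes, sleeping fetchTimeoutMs after
    // each failed attempt, then runs the deferred configuration exactly once.
    // Returns the number of health-check attempts that were needed.
    static int awaitHealthyThenConfigure(BooleanSupplier healthCheck,
                                         long fetchTimeoutMs,
                                         Runnable configure) {
        int attempts = 1;
        while (!healthCheck.getAsBoolean()) {
            try {
                Thread.sleep(fetchTimeoutMs); // Kafka not up yet: wait, then retry
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for Kafka", e);
            }
            attempts++;
        }
        configure.run(); // Kafka is up: configure publishers and listeners
        return attempts;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated health check that fails twice and then succeeds.
        BooleanSupplier check = () -> ++calls[0] >= 3;
        int attempts = awaitHealthyThenConfigure(check, 10L,
                () -> System.out.println("configuring publishers and listeners"));
        System.out.println("attempts: " + attempts);
    }
}
```

The loop in this sketch retries indefinitely; whether the real implementation should cap the number of attempts is a design decision the text above does not settle.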
If clampAdminTopics is not defined in the properties, NOOP will be used as the infrastructure, so backward compatibility is preserved.
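One possible way to express this fallback is sketched below. The names `AdminTopicDefaultsSketch`, `AdminTopics`, and `infrastructureOf` are hypothetical, and the record is a reduced stand-in for whatever type actually holds the clampAdminTopics block.

```java
import java.util.Optional;

final class AdminTopicDefaultsSketch {

    // Hypothetical, reduced view of the clampAdminTopics block.
    record AdminTopics(String topicCommInfrastructure) { }

    // Returns the configured infrastructure, or NOOP when the block
    // (or its topicCommInfrastructure entry) is absent.
    static String infrastructureOf(AdminTopics adminTopics) {
        return Optional.ofNullable(adminTopics)
                .map(AdminTopics::topicCommInfrastructure)
                .orElse("NOOP");
    }
}
```

With this default, a participant whose properties predate clampAdminTopics resolves to the NOOP health check and starts as before.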