...
The health check will be implemented using a new specific abstract layer, the class “TopicParameters
" can be used to fetch properties from properties file, for the admin connection to Kafka.
Add a validation that topics should be already created into Kafka, means that in all docker/Kubernetes CITS tests, has to be present the script that create those topics.
When the auto.create.topics.enable is true in Kafka properties, the topic can be created after a message is sent, so in this case the script that create topics is not necessary. But if the participant is the first to send registration message, a validation a that point it will be an issue. A solution is to add a property to enable the topics validation in Participant (“topicValidation
“).
In the example below, there is clampAdminTopics that contains new Kafka Admin Client properties and validation property:
Code Block | ||
---|---|---|
| ||
broker: server: kafka:9092 infrastructure: NOOP fetchTimeout: 15000 participant: intermediaryParameters: topics: operationTopic: policy-acruntime-participant syncTopic: acm-ppnt-sync reportingTimeIntervalMs: 120000 description: Participant Description participantId: 101c62b3-8918-41b9-a747-d21eb79c6c01 topicValidation: true clampAutomationCompositionTopics: topicSources: - topic: ${participant.intermediaryParameters.topics.operationTopic} servers: - ${broker.server} topicCommInfrastructure: ${broker.infrastructure} fetchTimeout: ${broker.fetchTimeout} - topic: ${participant.intermediaryParameters.topics.syncTopic} servers: - ${broker.server} topicCommInfrastructure: ${broker.infrastructure} fetchTimeout: ${broker.fetchTimeout} topicSinks: - topic: ${participant.intermediaryParameters.topics.operationTopic} servers: - ${broker.server} topicCommInfrastructure: ${broker.infrastructure} clampAdminTopics: servers: - ${broker.server} topicCommInfrastructure: ${broker.infrastructure} |
clampAdminTopics could be implemented as mandatory or as optional. In the second choice, Kafka Health check will be optional.
...
Code Block | ||
---|---|---|
| ||
@Component public class IntermediaryActivator extends ServiceManagerContainer implements Closeable { ------------------------------------ ------------------------------------ ------------------------------------ public IntermediaryActivator() { msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); syncMsgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); } public <T> void config(ParticipantParameters parameters, List<Publisher> publishers, List<Listener<T>> listeners) { // topics, initialization ------------------------------------ ------------------------------------ ------------------------------------ } } |
...