...

The health check will be implemented using a new, dedicated abstraction layer. The class "TopicParameters" can be used to fetch the properties for the admin connection to Kafka from the properties file.
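
As a rough illustration of what "properties for the admin connection" could amount to, the sketch below turns a list of broker addresses into a `java.util.Properties` object keyed by `bootstrap.servers`, the standard Kafka client setting for broker addresses. The class `AdminPropertiesSketch` and its method are hypothetical names introduced for this example only; the sketch does not use the real "TopicParameters" class, and how that class exposes its server list is not shown here.

```java
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration; not part of the participant code base.
final class AdminPropertiesSketch {

    private AdminPropertiesSketch() {
    }

    /**
     * Builds the minimal properties a Kafka admin connection needs:
     * the broker addresses joined into a comma-separated list.
     */
    static Properties toAdminProperties(List<String> servers) {
        Properties props = new Properties();
        // "bootstrap.servers" is the standard Kafka client configuration key.
        props.setProperty("bootstrap.servers", String.join(",", servers));
        return props;
    }
}
```

With the configuration used on this page, `toAdminProperties(List.of("kafka:9092"))` would yield a single entry mapping `bootstrap.servers` to `kafka:9092`.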

Add a validation that the topics have already been created in Kafka. This means that every docker/Kubernetes CITS test must include the script that creates those topics.

When auto.create.topics.enable is true in the Kafka properties, a topic can be created when a message is sent to it, so in this case the script that creates the topics is not necessary. However, if the participant is the first to send a registration message, validating at that point would be an issue because the topic may not exist yet. A solution is to add a property ("topicValidation") that enables topic validation in the participant.
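
The effect of the "topicValidation" property can be sketched as follows. This is a minimal illustration under stated assumptions, not the participant's implementation: the class `TopicValidationSketch` and the method `missingTopics` are hypothetical, and the set of topics existing on the broker is passed in as a plain `Set<String>`, whereas a real check would obtain it through the Kafka Admin Client.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration; not part of the participant code base.
final class TopicValidationSketch {

    private TopicValidationSketch() {
    }

    /**
     * Returns the required topics that are absent from the broker.
     * When topicValidation is false the check is skipped and the
     * result is always empty.
     */
    static Set<String> missingTopics(boolean topicValidation,
            List<String> requiredTopics, Set<String> existingTopics) {
        Set<String> missing = new LinkedHashSet<>();
        if (!topicValidation) {
            // Validation disabled, for example when Kafka auto-creates topics.
            return missing;
        }
        for (String topic : requiredTopics) {
            if (!existingTopics.contains(topic)) {
                missing.add(topic);
            }
        }
        return missing;
    }
}
```

A caller could treat a non-empty result as a failed validation and report the missing topic names; with validation disabled, a participant that sends the first registration message is not blocked by a topic that Kafka has yet to create.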

In the example below, clampAdminTopics contains the new Kafka Admin Client properties, and topicValidation is the validation property:

Code Block
language: yaml
broker:
  server: kafka:9092
  infrastructure: NOOP
  fetchTimeout: 15000

participant:
  intermediaryParameters:
    topics:
      operationTopic: policy-acruntime-participant
      syncTopic: acm-ppnt-sync
    reportingTimeIntervalMs: 120000
    description: Participant Description
    participantId: 101c62b3-8918-41b9-a747-d21eb79c6c01
    topicValidation: true
    clampAutomationCompositionTopics:
      topicSources:
        - topic: ${participant.intermediaryParameters.topics.operationTopic}
          servers:
            - ${broker.server}
          topicCommInfrastructure: ${broker.infrastructure}
          fetchTimeout: ${broker.fetchTimeout}
        - topic: ${participant.intermediaryParameters.topics.syncTopic}
          servers:
            - ${broker.server}
          topicCommInfrastructure: ${broker.infrastructure}
          fetchTimeout: ${broker.fetchTimeout}
      topicSinks:
        - topic: ${participant.intermediaryParameters.topics.operationTopic}
          servers:
            - ${broker.server}
          topicCommInfrastructure: ${broker.infrastructure}
    clampAdminTopics:
      servers:
        - ${broker.server}
      topicCommInfrastructure: ${broker.infrastructure}

clampAdminTopics could be implemented as either mandatory or optional. If it is optional, the Kafka health check will be optional as well.
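
The optional variant could behave roughly as in the sketch below: when no admin servers are configured, the health check is skipped instead of failing. All names here (`KafkaHealthCheckSketch`, the `Status` values, and the `brokerReachable` predicate that stands in for a real Kafka Admin Client call) are assumptions made for illustration and do not come from the participant code.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of an optional Kafka health check.
final class KafkaHealthCheckSketch {

    enum Status { UP, DOWN, SKIPPED }

    private KafkaHealthCheckSketch() {
    }

    /**
     * Evaluates the health check. A null or empty server list models
     * a configuration without clampAdminTopics.
     */
    static Status check(List<String> adminServers,
            Predicate<List<String>> brokerReachable) {
        if (adminServers == null || adminServers.isEmpty()) {
            // clampAdminTopics not configured: the check is optional.
            return Status.SKIPPED;
        }
        return brokerReachable.test(adminServers) ? Status.UP : Status.DOWN;
    }
}
```

If clampAdminTopics were mandatory instead, the missing-configuration branch would be rejected at startup and would not map to a skipped check.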

...

Code Block
language: java
@Component
public class IntermediaryActivator extends ServiceManagerContainer implements Closeable {
    ------------------------------------
    ------------------------------------
    ------------------------------------

    public IntermediaryActivator() {
        msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
        syncMsgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
    }
    
    public <T> void config(ParticipantParameters parameters,
            List<Publisher> publishers, List<Listener<T>> listeners) {

        // topics, initialization
        ------------------------------------
        ------------------------------------
        ------------------------------------
    }

}

...