The health check will be implemented using a new, dedicated abstraction layer. The class “TopicParameters” can be used to fetch the properties for the admin connection to Kafka from the properties file.

Adding a validation that the topics already exist in Kafka means that the script that creates those topics has to be present in all Docker/Kubernetes CITS tests.

When auto.create.topics.enable is true in the Kafka properties, a topic can be created when the first message is sent to it, so in this case the script that creates the topics is not necessary. However, if the participant is the first to send a registration message, a validation at that point will fail because the topic does not exist yet. A solution is to add a property that enables or disables topic validation in the Participant (“topicValidation”).
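The effect of the “topicValidation” property can be sketched as follows. This is only an illustration under assumptions: the class and method names (TopicValidationSketch, missingTopics) are hypothetical, and the set of existing topics is passed in directly rather than fetched from Kafka.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper, not part of the actual code base. */
final class TopicValidationSketch {

    private TopicValidationSketch() {
    }

    /**
     * Returns the required topics that do not exist on the broker.
     * When topicValidation is false the check is skipped and the result is empty.
     */
    static Set<String> missingTopics(boolean topicValidation,
            List<String> requiredTopics, Set<String> existingTopics) {
        Set<String> missing = new LinkedHashSet<>();
        if (!topicValidation) {
            // e.g. auto.create.topics.enable=true: topics appear on first send
            return missing;
        }
        for (String topic : requiredTopics) {
            if (!existingTopics.contains(topic)) {
                missing.add(topic);
            }
        }
        return missing;
    }
}
```

With validation enabled, a non-empty result would mean the participant is not ready to start; with validation disabled, the participant proceeds and relies on Kafka to create the topics.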

The example below includes clampAdminTopics, which contains the new Kafka Admin Client properties (with a small simplification), and the validation property topicValidation:

Code Block
languageyaml
broker:
  server: kafka:9092
  infrastructure: NOOP
  fetchTimeout: 15000

participant:
  intermediaryParameters:
    topics:
      operationTopic: policy-acruntime-participant
      syncTopic: acm-ppnt-sync
    reportingTimeIntervalMs: 120000
    description: Participant Description
    participantId: 101c62b3-8918-41b9-a747-d21eb79c6c01
    topicValidation: true
    clampAutomationCompositionTopics:
      topicSources:
        - topic: ${participant.intermediaryParameters.topics.operationTopic}
          servers:
            - ${broker.server}
          topicCommInfrastructure: ${broker.infrastructure}
          fetchTimeout: ${broker.fetchTimeout}
        - topic: ${participant.intermediaryParameters.topics.syncTopic}
          servers:
            - ${broker.server}
          topicCommInfrastructure: ${broker.infrastructure}
          fetchTimeout: ${broker.fetchTimeout}
      topicSinks:
        - topic: ${participant.intermediaryParameters.topics.operationTopic}
          servers:
            - ${broker.server}
          topicCommInfrastructure: ${broker.infrastructure}
    clampAdminTopics:
      servers:
        - ${broker.server}
      topicCommInfrastructure: ${broker.infrastructure}
      fetchTimeout: ${broker.fetchTimeout}

For backward compatibility, clampAdminTopics could be implemented as optional (not mandatory), so the Kafka health check will be optional as well.

The code below shows the abstraction layer for the topic health check:


Code Block
languagejava
@Component
public class IntermediaryActivator extends ServiceManagerContainer implements Closeable {
    // ... (fields and other members omitted)

    public IntermediaryActivator() {
        msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
        syncMsgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
    }
    
    public <T> void config(ParticipantParameters parameters,
            List<Publisher> publishers, List<Listener<T>> listeners) {

        // topics, initialization
        // ... (remaining configuration omitted)
    }

}

Details of implementation

The two infrastructures, NOOP (for testing) and KAFKA, will be implemented in policy/common.

The business logic will be implemented in the participant intermediary:

  • if Kafka is not UP yet and the health check fails, the application waits and tries again later, using the “fetchTimeout” property as the delay

  • if Kafka is UP and the health check is OK, the application starts the Kafka configuration of publishers and listeners.
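The two cases above amount to a polling loop. The following is a minimal sketch, not the actual implementation: the class name HealthCheckRetrySketch, the use of a BooleanSupplier for the health check, and the maxAttempts limit are all assumptions made for illustration (the design itself does not state an upper bound on retries).

```java
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of the wait-and-retry logic. */
final class HealthCheckRetrySketch {

    private HealthCheckRetrySketch() {
    }

    /**
     * Polls the health check until it succeeds, sleeping fetchTimeoutMs between attempts.
     * Returns the number of attempts used, or -1 if maxAttempts is exhausted
     * or the waiting thread is interrupted.
     */
    static int waitUntilHealthy(BooleanSupplier healthCheck, long fetchTimeoutMs, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (healthCheck.getAsBoolean()) {
                return attempt; // Kafka is UP: publishers and listeners can be configured
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(fetchTimeoutMs); // Kafka not UP yet: wait, then try again
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return -1;
                }
            }
        }
        return -1;
    }
}
```

A caller would start the Kafka configuration of publishers and listeners only when the returned value is positive.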

If clampAdminTopics is not defined in the properties, NOOP will be used as the infrastructure, so backward compatibility is preserved.
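The fallback can be sketched as below. The AdminTopics class is a hypothetical stand-in for the clampAdminTopics block, and resolveInfrastructure is an illustrative name; the real parameter classes may look different.

```java
import java.util.Optional;

/** Hypothetical sketch of the NOOP fallback for a missing clampAdminTopics block. */
final class AdminInfrastructureSketch {

    private AdminInfrastructureSketch() {
    }

    /** Stand-in for the clampAdminTopics properties; only one field is modelled. */
    static final class AdminTopics {
        private final String topicCommInfrastructure;

        AdminTopics(String topicCommInfrastructure) {
            this.topicCommInfrastructure = topicCommInfrastructure;
        }

        String getTopicCommInfrastructure() {
            return topicCommInfrastructure;
        }
    }

    /** Returns the configured infrastructure, or "NOOP" when the block (or the value) is absent. */
    static String resolveInfrastructure(AdminTopics clampAdminTopics) {
        return Optional.ofNullable(clampAdminTopics)
                .map(AdminTopics::getTopicCommInfrastructure)
                .orElse("NOOP");
    }
}
```

Existing configurations that do not define clampAdminTopics would therefore resolve to NOOP and skip the Kafka health check.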