...

The health check will be implemented using a new, dedicated abstraction layer. The class “TopicParameters” can be used to fetch the properties for the admin connection to Kafka from the properties file.

Example of the new Kafka Admin Client properties (slightly simplified):

Code Block
languageyaml
broker:
  server: kafka:9092
  infrastructure: NOOP
  fetchTimeout: 15000

participant:
  intermediaryParameters:
    topics:
      operationTopic: policy-acruntime-participant
      syncTopic: acm-ppnt-sync
    reportingTimeIntervalMs: 120000
    description: Participant Description
    participantId: 101c62b3-8918-41b9-a747-d21eb79c6c01
    clampAutomationCompositionTopics:
      topicSources:
        - topic: ${participant.intermediaryParameters.topics.operationTopic}
          servers:
            - ${broker.server}
          topicCommInfrastructure: ${broker.infrastructure}
          fetchTimeout: ${broker.fetchTimeout}
        - topic: ${participant.intermediaryParameters.topics.syncTopic}
          servers:
            - ${broker.server}
          topicCommInfrastructure: ${broker.infrastructure}
          fetchTimeout: ${broker.fetchTimeout}
      topicSinks:
        - topic: ${participant.intermediaryParameters.topics.operationTopic}
          servers:
            - ${broker.server}
          topicCommInfrastructure: ${broker.infrastructure}
    clampAdminTopics:
      servers:
        - ${broker.server}
      topicCommInfrastructure: ${broker.infrastructure}

The code below shows the abstraction layer for the topic health check:

Code Block
languagejava
public class TopicHealthCheckFactory {

    /**
     * Get Topic HealthCheck.
     *
     * @param param TopicParameters
     * @return TopicHealthCheck
     */
    public TopicHealthCheck getTopicHealthCheck(TopicParameters param) {
        return switch (Topic.CommInfrastructure.valueOf(param.getTopicCommInfrastructure().toUpperCase())) {
            case KAFKA -> new KafkaHealthCheck(param);
            case NOOP -> new NoopHealthCheck();
            default -> null;
        };
    }
}
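
The factory above returns a TopicHealthCheck, but the interface and its implementations are not shown on this page. The following is a minimal sketch of how such a layer could look, not the actual implementation. All `Sketch...` names, the `healthCheck(List<String>)` method signature, and the in-memory topic list that replaces a real Kafka admin call are assumptions made for illustration. The sketch also catches the IllegalArgumentException that `Enum.valueOf` throws for an unknown infrastructure name, so that unknown and unsupported values are both reported as `null`.

```java
import java.util.List;
import java.util.Locale;

// Hypothetical stand-in for Topic.CommInfrastructure; only the values needed here.
enum SketchInfrastructure { KAFKA, NOOP, REST }

// Assumed shape of the health-check abstraction; the real interface may differ.
interface SketchTopicHealthCheck {
    boolean healthCheck(List<String> topics);
}

// NOOP has no broker to contact, so the check always succeeds.
class SketchNoopHealthCheck implements SketchTopicHealthCheck {
    @Override
    public boolean healthCheck(List<String> topics) {
        return true;
    }
}

// Placeholder for the Kafka variant. A real one would ask the broker through
// an admin client; here the broker's topic list is passed in (assumption).
class SketchKafkaHealthCheck implements SketchTopicHealthCheck {
    private final List<String> brokerTopics;

    SketchKafkaHealthCheck(List<String> brokerTopics) {
        this.brokerTopics = List.copyOf(brokerTopics);
    }

    @Override
    public boolean healthCheck(List<String> topics) {
        // Healthy only if every required topic is known to the broker.
        return brokerTopics.containsAll(topics);
    }
}

class SketchHealthCheckFactory {
    // Returns null for unknown or unsupported infrastructure names.
    SketchTopicHealthCheck get(String infrastructure, List<String> brokerTopics) {
        SketchInfrastructure infra;
        try {
            infra = SketchInfrastructure.valueOf(infrastructure.toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            return null;
        }
        return switch (infra) {
            case KAFKA -> new SketchKafkaHealthCheck(brokerTopics);
            case NOOP -> new SketchNoopHealthCheck();
            default -> null;
        };
    }
}

class SketchHealthCheckDemo {
    public static void main(String[] args) {
        SketchHealthCheckFactory factory = new SketchHealthCheckFactory();
        SketchTopicHealthCheck check = factory.get("kafka", List.of("acm-ppnt-sync"));
        System.out.println(check.healthCheck(List.of("acm-ppnt-sync")));
    }
}
```

Because the factory can return `null`, a caller of this sketch would need a null check before invoking `healthCheck`.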

The Kafka configuration currently implemented in the constructor of the “IntermediaryActivator” class has to be moved into a separate method, so that the class can be instantiated without any issue related to the Kafka connection.

Code Block
languagejava
@Component
public class IntermediaryActivator extends ServiceManagerContainer implements Closeable {
    ------------------------------------
    ------------------------------------
    ------------------------------------

    public IntermediaryActivator() {
        msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
        syncMsgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
    }
    
    public <T> void config(ParticipantParameters parameters,
            List<Publisher> publishers, List<Listener<T>> listeners) {

        // topics, initialization
        ------------------------------------
        ------------------------------------
        ------------------------------------
    }

}
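
One way a caller could take advantage of the split between constructor and `config` method is to create the activator immediately and apply the topic configuration only once the health check succeeds. The sketch below illustrates that idea under stated assumptions: `SketchActivator`, `SketchStarter`, `configureWhenHealthy`, and the retry parameters are hypothetical names, and the health check is passed in as a plain `BooleanSupplier` rather than a real Kafka call.

```java
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical, simplified stand-in for IntermediaryActivator.
class SketchActivator {
    private List<String> topics = List.of();
    private boolean configured;

    // The constructor touches no broker, so creating the object
    // cannot fail because of a missing Kafka connection.
    SketchActivator() {
    }

    // Topic initialization happens here, after construction.
    void config(List<String> topicNames) {
        this.topics = List.copyOf(topicNames);
        this.configured = true;
    }

    boolean isConfigured() {
        return configured;
    }

    List<String> getTopics() {
        return topics;
    }
}

class SketchStarter {
    // Polls the health check up to maxAttempts times and configures the
    // activator on the first success. Returns true if config was applied.
    static boolean configureWhenHealthy(SketchActivator activator, BooleanSupplier healthCheck,
            List<String> topicNames, int maxAttempts, long sleepMs) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (healthCheck.getAsBoolean()) {
                activator.config(topicNames);
                return true;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(sleepMs);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and give up.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }
}

class SketchStarterDemo {
    public static void main(String[] args) {
        SketchActivator activator = new SketchActivator();
        boolean applied = SketchStarter.configureWhenHealthy(
                activator, () -> true, List.of("policy-acruntime-participant", "acm-ppnt-sync"), 3, 100L);
        System.out.println("configured: " + applied);
    }
}
```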