There are some scenarios where a participant, when starting, is not able to connect properly with Kafka:

  • The participant starts before Kafka is up and running

  • The participant starts after Kafka is up and running, but the topics are not available yet

The failure during the initial configuration creates a chain of errors, and the participant application crashes.

In the scenario where the Kafka topics are created by external code configured in the ACM-Runtime chart, ACM-Runtime will not be affected.

For the solution, we can use the class "org.apache.kafka.clients.admin.AdminClient", which is the Kafka Admin Client. The functionalities we are interested in are: checking whether the Kafka nodes are available and fetching the topics configured in Kafka.
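
As a rough sketch of those two calls (the bootstrap address and timeout below are placeholder values, not part of the configuration discussed here):

import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class KafkaAdminDemo {

    public static void main(String[] args) throws Exception {
        var props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // check if the Kafka nodes are available
            var nodes = admin.describeCluster().nodes().get(10, TimeUnit.SECONDS);
            System.out.println("Kafka nodes available: " + !nodes.isEmpty());

            // fetch the topics configured in Kafka
            var topicNames = admin.listTopics().names().get(10, TimeUnit.SECONDS);
            System.out.println("Existing topics: " + topicNames);
        }
    }
}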

Currently the implementation is based on an abstraction layer over the messaging infrastructure, and the connection to Kafka is implemented in policy/common. So policy/common is the best place to implement the Admin "Kafka health check" and "fetch Kafka topics" functionality.

The health check will be implemented using a new, specific abstraction layer; the class "TopicParameters" can be used to fetch the properties for the Admin connection to Kafka from the properties file.
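
A minimal sketch of what such an abstraction could look like; the interface name matches the factory shown later, but the method signature is an assumption for illustration only:

import java.util.List;

public interface TopicHealthCheck {

    /**
     * Check that the infrastructure is reachable and the given topics are available
     * (hypothetical contract, shown for illustration).
     *
     * @param topics the topic names expected to be present
     * @return true if the check passes
     */
    boolean healthCheck(List<String> topics);
}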

Adding a validation that the topics are already created in Kafka means that, in all Docker/Kubernetes CSIT tests, the script that creates those topics has to be present.

When auto.create.topics.enable is true in the Kafka properties, a topic is created automatically when a message is first sent to it, so in that case the script that creates the topics is not necessary. But if the participant is the first to send the registration message, a validation at that point would be an issue. A solution is to add a property to enable topic validation in the participant ("topicValidation").

In the example below, clampAdminTopics contains the new Kafka Admin Client properties, and topicValidation is the new validation property:

broker:
  server: kafka:9092
  infrastructure: NOOP
  fetchTimeout: 15000

participant:
  intermediaryParameters:
    topics:
      operationTopic: policy-acruntime-participant
      syncTopic: acm-ppnt-sync
    reportingTimeIntervalMs: 120000
    description: Participant Description
    participantId: 101c62b3-8918-41b9-a747-d21eb79c6c01
    topicValidation: true
    clampAutomationCompositionTopics:
      topicSources:
        - topic: ${participant.intermediaryParameters.topics.operationTopic}
          servers:
            - ${broker.server}
          topicCommInfrastructure: ${broker.infrastructure}
          fetchTimeout: ${broker.fetchTimeout}
        - topic: ${participant.intermediaryParameters.topics.syncTopic}
          servers:
            - ${broker.server}
          topicCommInfrastructure: ${broker.infrastructure}
          fetchTimeout: ${broker.fetchTimeout}
      topicSinks:
        - topic: ${participant.intermediaryParameters.topics.operationTopic}
          servers:
            - ${broker.server}
          topicCommInfrastructure: ${broker.infrastructure}
    clampAdminTopics:
      servers:
        - ${broker.server}
      topicCommInfrastructure: ${broker.infrastructure}

For backward compatibility, clampAdminTopics could be implemented as optional (not mandatory), so the Kafka health check would also be optional.

The code below shows the abstraction layer for the topic health check:

public class TopicHealthCheckFactory {

    /**
     * Get Topic HealthCheck.
     *
     * @param param TopicParameters
     * @return TopicHealthCheck
     */
    public TopicHealthCheck getTopicHealthCheck(TopicParameters param) {
        return switch (Topic.CommInfrastructure.valueOf(param.getTopicCommInfrastructure().toUpperCase())) {
            case KAFKA -> new KafkaHealthCheck(param);
            case NOOP ->  new NoopHealthCheck();
            default -> null;
        };
    }
}
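
A possible usage of the factory, assuming a hypothetical getClampAdminTopics() getter for the configuration above; the topic names are the ones from the example configuration:

// hypothetical usage, shown for illustration only
private boolean isKafkaReady(ParticipantParameters parameters) {
    // clampAdminTopics from the configuration above (getter name is an assumption)
    TopicParameters adminParams = parameters.getIntermediaryParameters().getClampAdminTopics();
    var healthCheck = new TopicHealthCheckFactory().getTopicHealthCheck(adminParams);
    // validate that the expected topics are already present
    return healthCheck.healthCheck(List.of("policy-acruntime-participant", "acm-ppnt-sync"));
}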

The Kafka configuration implemented in the "IntermediaryActivator" constructor has to be moved into a separate method, so that the class can be created without any issue related to the Kafka connection.

@Component
public class IntermediaryActivator extends ServiceManagerContainer implements Closeable {
    ------------------------------------
    ------------------------------------
    ------------------------------------

    public IntermediaryActivator() {
        msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
        syncMsgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
    }
    
    public <T> void config(ParticipantParameters parameters,
            List<Publisher> publishers, List<Listener<T>> listeners) {

        // topics, initialization
        ------------------------------------
        ------------------------------------
        ------------------------------------
    }

}
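
A possible startup sequence for a handler that owns the activator, reusing the hypothetical isKafkaReady() helper sketched above; clampAdminTopics stays optional, and config() is called only after the check passes:

// hypothetical startup sequence, shown for illustration only
// (activator is the IntermediaryActivator instance injected into this handler)
public <T> void startup(ParticipantParameters parameters,
        List<Publisher> publishers, List<Listener<T>> listeners) throws InterruptedException {

    if (parameters.getIntermediaryParameters().getClampAdminTopics() != null) {
        // clampAdminTopics is optional: the health check runs only when it is configured
        while (!isKafkaReady(parameters)) {
            // wait until the Kafka nodes and the expected topics are available
            TimeUnit.SECONDS.sleep(5);
        }
    }

    // only now initialize the Kafka endpoints
    activator.config(parameters, publishers, listeners);
}

The topicValidation flag from the example configuration could gate the topic-presence part of the check in the same way.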
