...
The health check will be implemented through a new, dedicated abstraction layer. The class “TopicParameters” can be used to read the properties for the Kafka admin connection from the properties file.
Example of the new Kafka Admin Client properties (slightly simplified):
Code Block |
---|
|
broker:
  server: kafka:9092
  infrastructure: NOOP
  fetchTimeout: 15000
participant:
  intermediaryParameters:
    topics:
      operationTopic: policy-acruntime-participant
      syncTopic: acm-ppnt-sync
    reportingTimeIntervalMs: 120000
    description: Participant Description
    participantId: 101c62b3-8918-41b9-a747-d21eb79c6c01
    clampAutomationCompositionTopics:
      topicSources:
        - topic: ${participant.intermediaryParameters.topics.operationTopic}
          servers:
            - ${broker.server}
          topicCommInfrastructure: ${broker.infrastructure}
          fetchTimeout: ${broker.fetchTimeout}
        - topic: ${participant.intermediaryParameters.topics.syncTopic}
          servers:
            - ${broker.server}
          topicCommInfrastructure: ${broker.infrastructure}
          fetchTimeout: ${broker.fetchTimeout}
      topicSinks:
        - topic: ${participant.intermediaryParameters.topics.operationTopic}
          servers:
            - ${broker.server}
          topicCommInfrastructure: ${broker.infrastructure}
    clampAdminTopics:
      servers:
        - ${broker.server}
      topicCommInfrastructure: ${broker.infrastructure}
|
The code below shows the abstraction layer for the topic health check:
Code Block |
---|
|
public class TopicHealthCheckFactory {
    /**
     * Get Topic HealthCheck.
     *
     * @param param TopicParameters
     * @return TopicHealthCheck
     */
    public TopicHealthCheck getTopicHealthCheck(TopicParameters param) {
        return switch (Topic.CommInfrastructure.valueOf(param.getTopicCommInfrastructure().toUpperCase())) {
            case KAFKA -> new KafkaHealthCheck(param);
            case NOOP -> new NoopHealthCheck();
            default -> null;
        };
    }
}
|
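As a rough illustration of how a caller might use this factory, the sketch below rebuilds the same dispatch with simplified stand-ins. All type names ending in `Sketch`, the `CommInfrastructure` enum values, the `healthCheck()` method and the `HealthCheckUsage` helper are assumptions made for this example and are not taken from the actual code base. The Kafka stub does not contact a broker; it only checks that a server is configured. Note that `valueOf` throws an `IllegalArgumentException` for a name that is not an enum constant, so the `default -> null` branch only covers constants that exist but have no health check.

```java
import java.util.List;
import java.util.Locale;

// Hypothetical stand-in for Topic.CommInfrastructure.
enum CommInfrastructure { KAFKA, NOOP, REST }

// Hypothetical, reduced stand-in for TopicParameters.
record TopicParametersSketch(List<String> servers, String topicCommInfrastructure) { }

// Assumed contract: returns true when the messaging layer is usable.
interface TopicHealthCheckSketch {
    boolean healthCheck();
}

class NoopHealthCheckSketch implements TopicHealthCheckSketch {
    @Override
    public boolean healthCheck() {
        return true; // nothing to connect to, so always healthy
    }
}

class KafkaHealthCheckSketch implements TopicHealthCheckSketch {
    private final TopicParametersSketch param;

    KafkaHealthCheckSketch(TopicParametersSketch param) {
        this.param = param;
    }

    @Override
    public boolean healthCheck() {
        // Stub: a real check would query the broker through the Kafka admin client.
        return param.servers() != null && !param.servers().isEmpty();
    }
}

class TopicHealthCheckFactorySketch {
    TopicHealthCheckSketch getTopicHealthCheck(TopicParametersSketch param) {
        CommInfrastructure infra =
            CommInfrastructure.valueOf(param.topicCommInfrastructure().toUpperCase(Locale.ROOT));
        return switch (infra) {
            case KAFKA -> new KafkaHealthCheckSketch(param);
            case NOOP -> new NoopHealthCheckSketch();
            default -> null; // known infrastructure without a health check
        };
    }
}

class HealthCheckUsage {
    // Treats a missing health check (null) as "not ready".
    static boolean isReady(TopicParametersSketch param) {
        TopicHealthCheckSketch check = new TopicHealthCheckFactorySketch().getTopicHealthCheck(param);
        return check != null && check.healthCheck();
    }
}
```

Whether a `null` result should count as "not ready" or as "nothing to check" is a design decision for the caller; the sketch picks the stricter reading.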
The Kafka configuration currently implemented in the constructor of the “IntermediaryActivator” class has to be moved into a separate method, so that the class can be instantiated without any issue related to the Kafka connection.
Code Block |
---|
|
@Component
public class IntermediaryActivator extends ServiceManagerContainer implements Closeable {
    ------------------------------------
    ------------------------------------
    ------------------------------------
    public IntermediaryActivator() {
        msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
        syncMsgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
    }

    public <T> void config(ParticipantParameters parameters,
            List<Publisher> publishers, List<Listener<T>> listeners) {
        // topics, initialization
        ------------------------------------
        ------------------------------------
        ------------------------------------
    }
} |
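One possible way to combine the deferred `config` method with the health check is sketched below: the activator is created without touching the broker, and the configuration runs only once a health check succeeds. The classes `DeferredActivatorSketch` and `StartupSketch`, the retry loop and its parameters (`maxAttempts`, `delayMs`) are assumptions made for this example; the actual start-up sequence may differ.

```java
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for IntermediaryActivator: the constructor does no broker work.
class DeferredActivatorSketch {
    private boolean configured;

    DeferredActivatorSketch() {
        // Only local state is set up here, so construction cannot fail on Kafka.
    }

    // Stands in for the topic initialization moved out of the constructor.
    void config() {
        configured = true;
    }

    boolean isConfigured() {
        return configured;
    }
}

class StartupSketch {
    /**
     * Polls the health check and configures the activator on the first success.
     *
     * @return true if the activator was configured, false if all attempts failed
     *         or the thread was interrupted while waiting
     */
    static boolean startWhenHealthy(DeferredActivatorSketch activator,
            BooleanSupplier healthCheck, int maxAttempts, long delayMs) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (healthCheck.getAsBoolean()) {
                activator.config();
                return true;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delayMs); // wait before the next attempt
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    return false;
                }
            }
        }
        return false;
    }
}
```

In this sketch a failed health check leaves the activator constructed but unconfigured, which is the property the change to the constructor is meant to provide.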