Strategy Message Executor
Describe the functionality and primary purpose of this group of files.
Create an analysis top down from the methods of the class ParticipantHandler.
Create an analysis when participantSyncMsg method receives a message witch contain the missing instance and composition.
If you have missing information you can ask me.
The files you provided constitute the Message Handling and Caching component within an ONAP CLAMP Participant Intermediary.
Functionality and Primary Purpose
The primary purpose of this group of files is to act as the entry point and execution queue for all Kafka messages received by a Participant from the main Automation Composition Management (ACM) runtime.
This module is responsible for two critical functions:
Message Queueing and Throttling: The
ParticipantHandlerandMsgExecutorclasses receive a high volume of Kafka messages (e.g.,AutomationCompositionDeploy,AutomationCompositionStateChange). They queue these messages and execute them sequentially in a dedicated thread pool to prevent concurrency issues and system overload.
(the dedicated thread will use ThreadHandler to create dedicated thread for adapter)State Synchronization and Caching: The
CacheProvidermaintains a local, in-memory cache of the latest Automation Composition Definitions (AcDefinition) and Instances (AutomationComposition). This cache, synchronized byParticipantSyncmessages, is used by theMsgExecutorto ensure that any incoming command is executed using the most recent state information.
In essence, this module ensures that state change commands are processed reliably, sequentially, and with guaranteed consistency relative to the ACM runtime's view of the world.
Top-Down Analysis from the Methods of ParticipantHandler
The ParticipantHandler class acts as the Kafka Listener for all command and synchronization topics from the ACM runtime. Its methods are essentially wrappers that take an incoming Kafka message, wrap it in a custom AutomationCompositionMsg, and submit it to the MsgExecutor queue for processing.
Core Command Handling Pattern
Every command-handling method (handleAutomationCompositionDeploy, handleAutomationCompositionStateChange, etc.) follows this structure:
Receive Kafka Message: The method is invoked by the Kafka listener framework with a specific message type (e.g.,
AutomationCompositionDeploy).Create Wrapper Object: A new
AutomationCompositionMsgis created. This wrapper encapsulates:The Kafka message itself (
T message).A
Consumer<T>(a lambda or method reference) that defines the actual business logic to be executed (e.g.,acStateHandler::handleAcStateChange).
Set Cache Synchronization IDs: Helper methods (
setCompositionUpdate,setInstanceUpdate) extract the Composition ID/Revision ID and Instance ID/Revision ID from the incoming message and set them on theAutomationCompositionMsg. These IDs are crucial for cache validation later.Submit to Executor: The final wrapped message is submitted to the
MsgExecutorviamsgExecutor.execute(acMsg).
This design effectively separates the Kafka reception and message preparation logic (ParticipantHandler) from the asynchronous, sequential execution and cache validation logic (MsgExecutor).
Special Synchronization Method
handleParticipantSync(ParticipantSync participantSyncMsg)
This method is unique as it handles the synchronization message sent periodically by the ACM runtime.
Sync
AcDefinition: It delegates the synchronization of Automation Composition Definitions to a dedicated handler (acDefinitionHandler.handleParticipantSync(participantSyncMsg)). This updates the local cache (CacheProvider) with the latest versions of Definitions and their elements.Process Queued Messages: It calls
msgExecutor.check(). This is the key action. After the cache is updated with the latest Definition and Instance states, theMsgExecutoris signaled to re-evaluate its message queue. Any messages previously deemed outdated or missing their required data (the "stalled" messages) can now be processed if the necessary data has arrived via theParticipantSync.
Analysis of participantSyncMsg with Missing Data
The scenario describes a message handling process where an incoming command message (e.g., AutomationCompositionDeploy) is received, but the essential data—the corresponding Automation Composition Instance or Composition Definition—is missing from the Participant's local cache.
Sequence of Events
Step | Component | Action | Result and State |
|
| Receives | Wraps message in |
|
| Submits |
|
|
| The |
|
|
| Since |
|
|
| Receives the | Updates the local cache. The missing Instance/Composition is now populated in the |
|
| Calls |
|
|
| The |
|
|
| The stalled message is executed via |
|
Purpose of the Mechanism
This mechanism is a critical safeguard against timing issues in a distributed system, known as out-of-order delivery tolerance.
It prevents a command message (like DEPLOY) from being processed before the essential state information (the Instance or Definition data) that it relies on has been received via a prior
ParticipantSyncmessage.The use of Revision IDs in the
validatemethod (cacheProvider.isCompositionDefinitionUpdated) ensures the Participant executes the command only when its local cache holds a version of the data that is at least as current as the one used to generate the command message. This guarantees data consistency.
Create two analysis:
ParticipantHandler receive a AutomationCompositionDeploy message, and composition is in cache
ParticipantHandler receive a AutomationCompositionDeploy message, and composition is not in cache
This analysis focuses on how the ParticipantHandler and MsgExecutor coordinate to process a Kafka command, specifically an AutomationCompositionDeploy message, considering the state of the local data cache.
Analysis 1: Command Received and Composition is in Cache (Ideal Scenario)
This scenario represents the fast path where the Participant has the most up-to-date state information for the Automation Composition Instance and Definition.
Execution Flow:
Message Reception: The
ParticipantHandler.handleAutomationCompositionDeploy(acDeployMsg)method is called by the Kafka listener thread.Message Preparation:
A wrapper object,
AutomationCompositionMsg(acMsg), is created, encapsulating theacDeployMsgand the execution logic (a reference to the actual handler method, e.g.,acDeploymentHandler::handleAcDeploy).The essential IDs and Revision IDs (Instance ID, Instance Revision ID, Composition ID, Composition Revision ID) are extracted from
acDeployMsgand set onacMsgusingsetCompositionUpdateandsetInstanceUpdate.
Executor Submission:
msgExecutor.execute(acMsg)is called. TheacMsgis submitted to theMsgExecutor's queue.Cache Validation (Success): When the
MsgExecutorprocesses the message (likely via its internalcheck()method), it callsvalidate(acMsg).CacheProvider.isCompositionDefinitionUpdated(compositionId, revisionIdComposition)returnstruebecause the cache contains the Definition, and itsrevisionIdmatches the one in the message.
Execution: Since validation is successful, the
MsgExecutorcallsacMsg.execute(). This invokes the actual handler method (acDeploymentHandler::handleAcDeploy), which contains the Participant's business logic (e.g., configuring resources, reporting success viaParticipantIntermediaryApi).Cleanup: The message is fully processed and removed from the executor's queue.
Outcome: The command is executed immediately (in the context of the executor's dedicated thread) and uses the guaranteed-consistent data from the local cache.
Analysis 2: Command Received and Composition is NOT in Cache (Stalled Scenario)
This scenario represents the slow path where the command message has arrived before the required state synchronization message, indicating a temporary Kafka out-of-order delivery.
Execution Flow:
Message Reception & Preparation: Steps 1 and 2 are the same as in Analysis 1. The
AutomationCompositionMsg(acMsg) is created and submitted to theMsgExecutor.Cache Validation (Failure): When the
MsgExecutorcallsvalidate(acMsg):CacheProvider.isCompositionDefinitionUpdated(compositionId, revisionIdComposition)checks the local mapacElementsDefinitions. Since the Composition is not in the cache (acElementsDefinitions.get(compositionId)isnull), the method returnsfalse.The
MsgExecutorlogs the instance as "missing or outdated."
Message is Stalled: Because validation failed, the message is NOT executed. It remains in the
MsgExecutor's internal waiting queue (theconcurrentWaitingMsgstructure, implicitly used byexecuteandchecklogic).Request a Synchronization: request a Synchronization to ACM-r with the missing Automation Composition Definition (
publisher.sendParticipantReqSync(participantReqSync))Synchronization Arrival: Later, the
ParticipantSyncmessage arrives atParticipantHandler.handleParticipantSync(participantSyncMsg). This message contains the missing Automation Composition Definition.Cache Update: The
acDefinitionHandler.handleParticipantSync()updates theCacheProvider, successfully loading the missing Definition into the local cache.Re-Evaluation Trigger:
ParticipantHandler.handleParticipantSync()callsmsgExecutor.check(). This method iterates over the stalled messages.Cache Validation (Success on Retry): The
MsgExecutorcallsvalidate(stalledMessage)again. This time:The Definition is found in the cache.
The Revision IDs are checked and matched.
validatereturnstrue.
Execution: The stalled message is moved out of the waiting queue and executed by calling
stalledMessage.execute().
Outcome: The command is delayed until the system state is synchronized. The message processing is idempotent and self-correcting, guaranteeing that the command only executes after the essential background data (Instance/Definition) has been loaded into the cache. This is a crucial pattern for maintaining data consistency in an asynchronous, distributed environment.