Strategy Message Executor

Describe the functionality and primary purpose of this group of files. 

Create a top-down analysis starting from the methods of the class ParticipantHandler.

Create an analysis for when the participantSyncMsg method receives a message which contains the missing instance and composition.

If you have missing information you can ask me.

The files you provided constitute the Message Handling and Caching component within an ONAP CLAMP Participant Intermediary.


 

Functionality and Primary Purpose

 

The primary purpose of this group of files is to act as the entry point and execution queue for all Kafka messages received by a Participant from the main Automation Composition Management (ACM) runtime.

This module is responsible for two critical functions:

  1. Message Queueing and Throttling: The ParticipantHandler and MsgExecutor classes receive a high volume of Kafka messages (e.g., AutomationCompositionDeploy, AutomationCompositionStateChange). They queue these messages and execute them sequentially in a dedicated thread pool to prevent concurrency issues and system overload.
    (The dedicated thread in turn uses ThreadHandler to create a dedicated thread for the adapter.)

  2. State Synchronization and Caching: The CacheProvider maintains a local, in-memory cache of the latest Automation Composition Definitions (AcDefinition) and Instances (AutomationComposition). This cache, synchronized by ParticipantSync messages, is used by the MsgExecutor to ensure that any incoming command is executed using the most recent state information.

In essence, this module ensures that state change commands are processed reliably, sequentially, and with guaranteed consistency relative to the ACM runtime's view of the world.


 

Top-Down Analysis from the Methods of ParticipantHandler

 

The ParticipantHandler class acts as the Kafka Listener for all command and synchronization topics from the ACM runtime. Its methods are essentially wrappers that take an incoming Kafka message, wrap it in a custom AutomationCompositionMsg, and submit it to the MsgExecutor queue for processing.

 

Core Command Handling Pattern

 

Every command-handling method (handleAutomationCompositionDeploy, handleAutomationCompositionStateChange, etc.) follows this structure:

  1. Receive Kafka Message: The method is invoked by the Kafka listener framework with a specific message type (e.g., AutomationCompositionDeploy).

  2. Create Wrapper Object: A new AutomationCompositionMsg is created. This wrapper encapsulates:

    • The Kafka message itself (T message).

    • A Consumer<T> (a lambda or method reference) that defines the actual business logic to be executed (e.g., acStateHandler::handleAcStateChange).

  3. Set Cache Synchronization IDs: Helper methods (setCompositionUpdate, setInstanceUpdate) extract the Composition ID/Revision ID and Instance ID/Revision ID from the incoming message and set them on the AutomationCompositionMsg. These IDs are crucial for cache validation later.

  4. Submit to Executor: The final wrapped message is submitted to the MsgExecutor via msgExecutor.execute(acMsg).

This design effectively separates the Kafka reception and message preparation logic (ParticipantHandler) from the asynchronous, sequential execution and cache validation logic (MsgExecutor).
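The four steps above can be sketched as follows. This is a minimal illustration, not the actual ONAP classes: DeployMessageSketch, AcMsgSketch, and HandlerSketch are hypothetical names, the revision IDs are assumed to be UUIDs, and the executor is reduced to a plain Consumer so the hand-off is visible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.function.Consumer;

// Hypothetical stand-in for the Kafka message; it carries only the four IDs.
final class DeployMessageSketch {
    final UUID compositionId;
    final UUID revisionIdComposition;
    final UUID instanceId;
    final UUID revisionIdInstance;

    DeployMessageSketch(UUID compositionId, UUID revisionIdComposition,
                        UUID instanceId, UUID revisionIdInstance) {
        this.compositionId = compositionId;
        this.revisionIdComposition = revisionIdComposition;
        this.instanceId = instanceId;
        this.revisionIdInstance = revisionIdInstance;
    }
}

// Sketch of the wrapper: the message, the logic to run on it, and the IDs
// that the executor later compares against the cache.
final class AcMsgSketch<T> {
    private final Consumer<T> logic;
    private final T message;
    UUID compositionId;
    UUID revisionIdComposition;
    UUID instanceId;
    UUID revisionIdInstance;

    AcMsgSketch(Consumer<T> logic, T message) {
        this.logic = logic;
        this.message = message;
    }

    // Runs the encapsulated business logic on the wrapped message.
    void execute() {
        logic.accept(message);
    }
}

final class HandlerSketch {
    // Stand-in for msgExecutor.execute(acMsg): anything that accepts a wrapped message.
    private final Consumer<AcMsgSketch<?>> executor;
    // Records which instances the "business logic" deployed (demonstration only).
    final List<UUID> deployed = new ArrayList<>();

    HandlerSketch(Consumer<AcMsgSketch<?>> executor) {
        this.executor = executor;
    }

    void handleAutomationCompositionDeploy(DeployMessageSketch msg) {
        // Step 2: wrap the message together with the logic to run later.
        AcMsgSketch<DeployMessageSketch> acMsg =
                new AcMsgSketch<>(m -> deployed.add(m.instanceId), msg);
        // Step 3: copy the IDs used for cache validation.
        acMsg.compositionId = msg.compositionId;
        acMsg.revisionIdComposition = msg.revisionIdComposition;
        acMsg.instanceId = msg.instanceId;
        acMsg.revisionIdInstance = msg.revisionIdInstance;
        // Step 4: hand over to the executor; nothing has been executed yet.
        executor.accept(acMsg);
    }
}
```

Passing a list's add method as the executor (for example, new HandlerSketch(queue::add)) shows the separation: after the handler returns, the message sits in the queue and the business logic has not run until something calls execute() on it.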

 

Special Synchronization Method

 

 

handleParticipantSync(ParticipantSync participantSyncMsg)

 

This method is unique as it handles the synchronization message sent periodically by the ACM runtime.

  1. Sync AcDefinition: It delegates the synchronization of Automation Composition Definitions to a dedicated handler (acDefinitionHandler.handleParticipantSync(participantSyncMsg)). This updates the local cache (CacheProvider) with the latest versions of Definitions and their elements.

  2. Process Queued Messages: It calls msgExecutor.check(). This is the key action. After the cache is updated with the latest Definition and Instance states, the MsgExecutor is signaled to re-evaluate its message queue. Any messages previously deemed outdated or missing their required data (the "stalled" messages) can now be processed if the necessary data has arrived via the ParticipantSync.


 

Analysis of participantSyncMsg with Missing Data

 

The scenario describes a message handling process where an incoming command message (e.g., AutomationCompositionDeploy) is received, but the essential data—the corresponding Automation Composition Instance or Composition Definition—is missing from the Participant's local cache.

 

Sequence of Events

 

  1. Command Message Arrives

    • Component: ParticipantHandler

    • Action: Receives AutomationCompositionDeploy (or similar) message.

    • Result and State: Wraps message in AutomationCompositionMsg (e.g., acMsg).

  2. Message Execution

    • Component: MsgExecutor.execute(acMsg)

    • Action: Submits acMsg to the executor queue. The message moves to MsgExecutor.check().

  3. Cache Validation (Failure)

    • Component: MsgExecutor.validate(acMsg.message)

    • Action: The validate method checks: cacheProvider.isCompositionDefinitionUpdated(...) and cacheProvider.isInstanceUpdated(...). Since the instance/composition is missing, the check returns false (e.g., isInstanceUpdated returns false if automationCompositions.get(instanceId) is null).

  4. Message is Stalled

    • Component: MsgExecutor

    • Action: Since validate failed (result = false), the message is not executed. It is instead placed in a wait queue or deferred queue (implied by the msgExecutor.execute and msgExecutor.check logic).

  5. Synchronization Message Arrives

    • Component: ParticipantHandler.handleParticipantSync

    • Action: Receives the ParticipantSync message from the ACM runtime.

    • Result and State: Updates the local cache. The missing Instance/Composition is now populated in the CacheProvider.

  6. Re-Evaluation Trigger

    • Component: ParticipantHandler.handleParticipantSync

    • Action: Calls msgExecutor.check().

  7. Execution of Stalled Messages

    • Component: MsgExecutor.check()

    • Action: The MsgExecutor iterates through its queue of waiting messages (including the one from Step 4). It calls validate(stalledMessage) again. This time, the required Instance/Composition is present in the cache, and the validate method returns true.

  8. Command is Executed

    • Component: MsgExecutor

    • Action: The stalled message is executed via acMsg.execute(), which calls the encapsulated consumer (e.g., acStateHandler::handleAcDeploy).
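The sequence above can be sketched as a small deferring executor. This is an illustration under assumptions rather than the real MsgExecutor: DeferringExecutorSketch, Pending, and onSync are hypothetical names, the cache is reduced to a set of known composition IDs, and everything runs on the caller's thread instead of a dedicated one.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;

final class DeferringExecutorSketch {
    // A queued command: the composition it depends on and the logic to run.
    static final class Pending {
        final UUID compositionId;
        final Runnable logic;

        Pending(UUID compositionId, Runnable logic) {
            this.compositionId = compositionId;
            this.logic = logic;
        }
    }

    // Stand-in for the CacheProvider: the composition IDs currently cached.
    private final Set<UUID> cachedCompositions = new HashSet<>();
    // Messages that have not passed validation yet.
    private final List<Pending> waiting = new ArrayList<>();

    // Enqueue the message, then try to run whatever is ready.
    void execute(Pending msg) {
        waiting.add(msg);
        check();
    }

    // Run every waiting message whose data is in the cache; keep the rest stalled.
    void check() {
        List<Pending> ready = new ArrayList<>();
        for (Pending msg : waiting) {
            if (validate(msg)) {
                ready.add(msg);
            }
        }
        // Remove before running so the logic may safely submit new messages.
        waiting.removeAll(ready);
        for (Pending msg : ready) {
            msg.logic.run();
        }
    }

    // Simplified validation: the composition only has to be present.
    boolean validate(Pending msg) {
        return cachedCompositions.contains(msg.compositionId);
    }

    // Stand-in for handleParticipantSync: update the cache, then re-evaluate.
    void onSync(UUID compositionId) {
        cachedCompositions.add(compositionId);
        check();
    }

    int waitingCount() {
        return waiting.size();
    }
}
```

Submitting a message for an unknown composition leaves it in the waiting list; calling onSync with that composition ID afterwards runs it and empties the list, which mirrors steps 3 through 8.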

 

 

Purpose of the Mechanism

 

This mechanism is a critical safeguard against timing issues in a distributed system: it makes the Participant tolerant of out-of-order message delivery.

  • It prevents a command message (like DEPLOY) from being processed before the essential state information (the Instance or Definition data) that it relies on has been received via a prior ParticipantSync message.

  • The use of Revision IDs in the validate method (cacheProvider.isCompositionDefinitionUpdated) ensures the Participant executes the command only when its local cache holds the revision of the data that the command message refers to. This keeps the command consistent with the state it was generated against.
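A minimal sketch of such a revision check follows, assuming the comparison is an equality match on revision IDs (as the validation steps elsewhere in this analysis describe) and that the IDs are UUIDs. CacheSketch and putComposition are hypothetical names, and the cache stores only the revision rather than the full definition.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

final class CacheSketch {
    // compositionId -> revision currently held in the local cache
    private final Map<UUID, UUID> compositionRevisions = new ConcurrentHashMap<>();

    // Stand-in for the cache update performed when a ParticipantSync arrives.
    void putComposition(UUID compositionId, UUID revisionId) {
        compositionRevisions.put(compositionId, revisionId);
    }

    // False when the composition is missing or the cached revision differs.
    boolean isCompositionDefinitionUpdated(UUID compositionId, UUID revisionId) {
        UUID cached = compositionRevisions.get(compositionId);
        return cached != null && cached.equals(revisionId);
    }
}
```

A missing entry and a stale entry are treated the same way here: both return false, so the command stays stalled until a synchronization brings the cache to the expected revision.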


Create two analyses:

ParticipantHandler receives an AutomationCompositionDeploy message, and the composition is in cache

ParticipantHandler receives an AutomationCompositionDeploy message, and the composition is not in cache

This analysis focuses on how the ParticipantHandler and MsgExecutor coordinate to process a Kafka command, specifically an AutomationCompositionDeploy message, considering the state of the local data cache.


Analysis 1: Command Received and Composition is in Cache (Ideal Scenario)

This scenario represents the fast path where the Participant has the most up-to-date state information for the Automation Composition Instance and Definition.

Execution Flow:

  1. Message Reception: The ParticipantHandler.handleAutomationCompositionDeploy(acDeployMsg) method is called by the Kafka listener thread.

  2. Message Preparation:

    • A wrapper object, AutomationCompositionMsg (acMsg), is created, encapsulating the acDeployMsg and the execution logic (a reference to the actual handler method, e.g., acDeploymentHandler::handleAcDeploy).

    • The essential IDs and Revision IDs (Instance ID, Instance Revision ID, Composition ID, Composition Revision ID) are extracted from acDeployMsg and set on acMsg using setCompositionUpdate and setInstanceUpdate.

  3. Executor Submission: msgExecutor.execute(acMsg) is called. The acMsg is submitted to the MsgExecutor's queue.

  4. Cache Validation (Success): When the MsgExecutor processes the message (likely via its internal check() method), it calls validate(acMsg).

    • CacheProvider.isCompositionDefinitionUpdated(compositionId, revisionIdComposition) returns true because the cache contains the Definition, and its revisionId matches the one in the message.

  5. Execution: Since validation is successful, the MsgExecutor calls acMsg.execute(). This invokes the actual handler method (acDeploymentHandler::handleAcDeploy), which contains the Participant's business logic (e.g., configuring resources, reporting success via ParticipantIntermediaryApi).

  6. Cleanup: The message is fully processed and removed from the executor's queue.

Outcome: The command is executed immediately (in the context of the executor's dedicated thread) and uses the guaranteed-consistent data from the local cache.


Analysis 2: Command Received and Composition is NOT in Cache (Stalled Scenario)

This scenario represents the slow path where the command message has arrived before the required state synchronization message, indicating a temporary Kafka out-of-order delivery.

Execution Flow:

  1. Message Reception & Preparation: Steps 1 and 2 are the same as in Analysis 1. The AutomationCompositionMsg (acMsg) is created and submitted to the MsgExecutor.

  2. Cache Validation (Failure): When the MsgExecutor calls validate(acMsg):

    • CacheProvider.isCompositionDefinitionUpdated(compositionId, revisionIdComposition) checks the local map acElementsDefinitions. Since the Composition is not in the cache (acElementsDefinitions.get(compositionId) is null), the method returns false.

    • The MsgExecutor logs the instance as "missing or outdated."

  3. Message is Stalled: Because validation failed, the message is NOT executed. It remains in the MsgExecutor's internal waiting queue (the concurrentWaitingMsg structure, implicitly used by execute and check logic).

  4. Synchronization Request: A synchronization request for the missing Automation Composition Definition is sent to ACM-r (publisher.sendParticipantReqSync(participantReqSync)).

  5. Synchronization Arrival: Later, the ParticipantSync message arrives at ParticipantHandler.handleParticipantSync(participantSyncMsg). This message contains the missing Automation Composition Definition.

  6. Cache Update: The acDefinitionHandler.handleParticipantSync() updates the CacheProvider, successfully loading the missing Definition into the local cache.

  7. Re-Evaluation Trigger: ParticipantHandler.handleParticipantSync() calls msgExecutor.check(). This method iterates over the stalled messages.

  8. Cache Validation (Success on Retry): The MsgExecutor calls validate(stalledMessage) again. This time:

    • The Definition is found in the cache.

    • The Revision IDs are checked and matched.

    • validate returns true.

  9. Execution: The stalled message is moved out of the waiting queue and executed by calling stalledMessage.execute().

Outcome: The command is delayed until the system state is synchronized. The message processing is self-correcting: a stalled command is re-validated whenever a synchronization arrives and executes only after the essential background data (Instance/Definition) has been loaded into the cache. This is a crucial pattern for maintaining data consistency in an asynchronous, distributed environment.
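Step 4 of the flow above (the synchronization request) can be sketched in isolation. This is a hypothetical illustration: SyncRequesterSketch is not an ONAP class, the publisher is reduced to a Consumer that receives the missing composition ID, and suppressing duplicate requests while one is in flight is an assumption of this sketch, not something the analysis states about the real code.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;

final class SyncRequesterSketch {
    // Stand-in for publisher.sendParticipantReqSync(...): receives the missing composition ID.
    private final Consumer<UUID> publisher;
    // Compositions with a request already in flight (assumption of this sketch).
    private final Set<UUID> inFlight = new HashSet<>();

    SyncRequesterSketch(Consumer<UUID> publisher) {
        this.publisher = publisher;
    }

    // Called when validation fails for a message that needs this composition.
    void onValidationFailed(UUID compositionId) {
        // Set.add returns false if the ID was already present, so only the first failure publishes.
        if (inFlight.add(compositionId)) {
            publisher.accept(compositionId);
        }
    }

    // Called once a ParticipantSync has delivered the definition.
    void onSyncReceived(UUID compositionId) {
        inFlight.remove(compositionId);
    }
}
```

Tracking in-flight requests keeps several stalled commands for the same composition from each triggering their own request; whether the real executor needs this depends on how often it re-validates its queue.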