Monitoring Job Scanner
SupervisionAspect
A new scan creates a new Thread. The executor uses AcmThreadFactory.
The method SupervisionAspect.schedule schedules the execution of scanning to handle timeout of compositions, instances and participants.
The method SupervisionAspect.doCheck intercepts all methods annotated with @MessageIntercept and runs the scanning of compositions and instances.
Note: scanning of compositions and instances is triggered either by the schedule or by incoming messages.
AcmThreadFactory
AcmThreadFactory implements ThreadFactory
This class sets an uncaught exception handler on every thread it creates, so that all errors are logged.
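As an illustration of the idea, here is a minimal sketch of a ThreadFactory that installs an uncaught exception handler. LoggingThreadFactory, the thread name prefix and the in-memory errors list are assumptions made for this example; they are not taken from AcmThreadFactory, which would write to a logger instead.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for AcmThreadFactory (names and fields are assumptions).
class LoggingThreadFactory implements ThreadFactory {
    // Collected messages; a real implementation would call a logger here.
    final List<String> errors = new CopyOnWriteArrayList<>();
    private final AtomicInteger counter = new AtomicInteger();

    @Override
    public Thread newThread(Runnable task) {
        Thread thread = new Thread(task, "acm-scan-" + counter.incrementAndGet());
        // Exceptions that escape the task are reported here instead of being lost.
        thread.setUncaughtExceptionHandler((t, ex) ->
                errors.add(t.getName() + ": " + ex.getMessage()));
        return thread;
    }
}
```

With a java.util.concurrent executor, the handler is only reached for tasks started through execute(); tasks passed to submit() keep their exception inside the returned Future.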
SupervisionScanner.run()
The method SupervisionScanner.run() runs the monitoring.
For the compositions:
compositionIds is a HashSet of compositionId values
add to compositionIds every compositionId fetched from the compositions in transition
add to compositionIds every compositionId fetched from the compositions that have messages
call the method SupervisionScanner.scanAcDefinition for each compositionId
For the instances:
instanceIds is a HashSet of instanceId values
add to instanceIds every instanceId fetched from the instances in transition
add to instanceIds every instanceId fetched from the instances that have messages
call the method SupervisionScanner.scanAutomationComposition for each instanceId
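The collection step above can be sketched as follows. ScanRound and scanAll are hypothetical names, and the two input sets stand in for whatever the providers return; the point is only that merging both sources into a HashSet means an ID present in both is scanned once per round.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;

// Hypothetical helper illustrating one scan round (not the actual SupervisionScanner code).
class ScanRound {
    // Merges both ID sources, then hands each distinct ID to the scan callback.
    static int scanAll(Set<UUID> inTransition, Set<UUID> withMessages, Consumer<UUID> scan) {
        Set<UUID> ids = new HashSet<>(inTransition);
        ids.addAll(withMessages);   // duplicates collapse here
        ids.forEach(scan);
        return ids.size();          // number of scans performed
    }
}
```

The same helper would serve both the composition round and the instance round, with a different callback for each.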
Producer and Consumer and Scheduling job mechanisms
The methods SupervisionScanner.scanAcDefinition and SupervisionScanner.scanAutomationComposition implement the consumer job mechanism.
MonitoringScanner
All methods in MonitoringScanner are transactional: if an exception is thrown, all database changes are rolled back.
MonitoringScanner.scanAcDefinition
fetches all messages and calls AcDefinitionScanner.scanMessage
calls AcDefinitionScanner.scanAutomationCompositionDefinition
MonitoringScanner.scanAutomationComposition
fetches all messages and calls SimpleScanner.scanMessage
based on the operation, calls:
StageScanner.scanStage for MIGRATING, MIGRATION_REVERTING and PREPARING
SimpleScanner.simpleScan for UPDATING, REVIEWING and MIGRATION_PRECHECKING
PhaseScanner.scanWithPhase for DEPLOYING, UNDEPLOYING, DELETING, LOCKING, UNLOCKING
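The mapping listed above can be written as a small dispatch function. This is only a sketch: the single Transition enum and the ScannerKind enum are simplifications invented for the example (the runtime may spread these states over several enums), and the fallback to the phase scanner for every remaining state is an assumption.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical flat enum of the operations named in the list above.
enum Transition {
    MIGRATING, MIGRATION_REVERTING, PREPARING,
    UPDATING, REVIEWING, MIGRATION_PRECHECKING,
    DEPLOYING, UNDEPLOYING, DELETING, LOCKING, UNLOCKING
}

// Hypothetical label for the scanner that would be invoked.
enum ScannerKind { STAGE, SIMPLE, PHASE }

class ScannerDispatch {
    private static final Set<Transition> STAGED_STATES =
            EnumSet.of(Transition.MIGRATING, Transition.MIGRATION_REVERTING, Transition.PREPARING);
    private static final Set<Transition> SIMPLE_STATES =
            EnumSet.of(Transition.UPDATING, Transition.REVIEWING, Transition.MIGRATION_PRECHECKING);

    // Picks the scanner for an operation; everything not listed falls through to PHASE.
    static ScannerKind select(Transition transition) {
        if (STAGED_STATES.contains(transition)) {
            return ScannerKind.STAGE;
        }
        if (SIMPLE_STATES.contains(transition)) {
            return ScannerKind.SIMPLE;
        }
        return ScannerKind.PHASE;
    }
}
```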
AI-Generated Analysis
Prompt: Describe the functionality and primary purpose of this group of files.
Prompt: Create a top-down analysis starting from the method run() of the class SupervisionScanner.
The files you provided constitute the Supervision Scanner module within the ONAP CLAMP Automation Composition Management (ACM) runtime.
Functionality and Primary Purpose
The primary purpose of this group of files is to act as a watchdog or state machine supervisor for the life cycle of Automation Compositions. It ensures that state change operations—such as deploying, undeploying, or migrating an Automation Composition—are completed successfully or, if they fail, that the state is properly handled and reflected across the system.
Core Functionality:
Monitor State Transitions: It periodically scans the database for both Automation Composition Definitions and Automation Composition Instances that are currently in a transitional state (e.g., DEPLOYING, DELETING, MIGRATING).
Handle Participant Messages: It processes asynchronous acknowledgments or messages from Participants (the microservices that execute the composition logic) to update the composition's state and track the progress of the operation.
Timeout Detection and Recovery: It checks if any transition is taking longer than a configured maximum operation wait time. If a process times out, it updates the composition's state to a TIMEOUT result.
Phase and Stage Mechanism: For ongoing operations that have a phase or stage, specialized scanners (like PhaseScanner and StageScanner) implement logic to retry sending state change messages to the responsible Participants, ensuring forward progress.
Synchronization: If a state change occurs, it is persisted to the database and a new sync message is published to the Participants to update their local view of the composition, maintaining consistency across the runtime.
Top-Down Analysis from SupervisionScanner.run()
The run() method in the SupervisionScanner class is the main execution entry point for the scheduled supervision task. It orchestrates the entire state monitoring and cleanup process.
1. The run() Method (Scheduler)
The run() method follows these steps:
Cleanup: Logs the start and calls messageProvider.removeOldJobs() to clean up expired synchronization locks and temporary job entries.
Scan Definitions:
It fetches all Automation Composition Definition IDs that are currently in transition from the AcDefinitionProvider.
It also checks the MessageProvider for any Definitions that have pending messages (indicating an operation is in progress).
It iterates over the collected IDs, calling a private helper method, scanAcDefinition(compositionId), for each one.
Scan Instances:
It fetches all Automation Composition Instance IDs that are currently in transition from the AutomationCompositionProvider.
It also checks the MessageProvider for any Instances that have pending messages.
It iterates over the collected IDs, calling a private helper method, scanAutomationComposition(instanceId, acDefinitionMap), for each one.
Completion: Logs the end of the scan.
2. Delegated Scanning Logic (scanAcDefinition / scanAutomationComposition)
The private helper methods serve as a critical synchronization layer:
Job Creation (Locking): For each ID, a synchronization job (messageProvider.createJob(id)) is created. This ensures that only one thread processes a specific Definition or Instance at a time, preventing race conditions. If the job cannot be created (another thread holds the lock), the method exits.
Core Scanning: The actual state check is delegated to the MonitoringScanner:
monitoringScanner.scanAcDefinition(compositionId)
monitoringScanner.scanAutomationComposition(instanceId, acDefinitionMap)
Job Removal: After the MonitoringScanner completes, the job is released using messageProvider.removeJob(optJobId.get()).
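The create, scan, remove sequence can be sketched with an in-memory registry. JobRegistry and GuardedScan are hypothetical stand-ins for the database-backed messageProvider and for the private helper methods; the use of try/finally to release the job is a choice of this sketch, not something the description above confirms.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical in-memory replacement for the job storage behind messageProvider.
class JobRegistry {
    private final Map<UUID, String> jobs = new ConcurrentHashMap<>();

    // Returns a job id only when no job exists yet for this identifier.
    Optional<String> createJob(UUID identificationId) {
        String jobId = UUID.randomUUID().toString();
        return jobs.putIfAbsent(identificationId, jobId) == null
                ? Optional.of(jobId)
                : Optional.empty();
    }

    // Releases the job so the identifier can be scanned again.
    void removeJob(String jobId) {
        jobs.values().remove(jobId);
    }
}

class GuardedScan {
    // Runs the scan only if the job could be created, then releases the job.
    static boolean scanIfFree(JobRegistry registry, UUID id, Consumer<UUID> scan) {
        Optional<String> optJobId = registry.createJob(id);
        if (optJobId.isEmpty()) {
            return false; // another worker already holds the job for this id
        }
        try {
            scan.accept(id);
        } finally {
            registry.removeJob(optJobId.get());
        }
        return true;
    }
}
```

In this sketch a second caller that arrives while the job is held simply skips the ID rather than waiting for it.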
3. State Dispatcher (MonitoringScanner logic)
The MonitoringScanner acts as the primary dispatcher for AC Instances, selecting the correct specialized scanner based on the current state:
State Check | Scanner Used | Purpose of the Sub-Scanner |
AC Definition Scan | AcDefinitionScanner | Handles messages and timeouts specific to Definition life cycle events (e.g., priming the definition to participants). |
AC Instance Check | StageScanner | Handles transitions that proceed through numbered stages, primarily used for migration and preparation processes. |
AC Instance Check | SimpleScanner | Handles states where completion is determined by a simple check of all its component elements being out of a transitional state, without a phase or stage numbering. |
AC Instance Check: all other transitional states | PhaseScanner | Handles transitions that proceed through numbered phases, typically used for standard deploy/undeploy and state change operations. |
4. Common Utilities (AbstractScanner and UpdateSync)
All specialized scanners (PhaseScanner, StageScanner, SimpleScanner) extend AbstractScanner. This abstract class provides essential common functionality:
handleTimeout: Logic to check if an instance has exceeded its maximum operation wait time (maxOperationWaitMs) and mark it as TIMEOUT.
complete: Logic to update an instance when all its constituent elements have successfully completed a transition.
saveAndSync: Logic to update the entity in the database and, if needed, publish a new synchronization message to all Participants.
The UpdateSync class is a simple data structure used across all scanners to bundle boolean flags indicating whether the entity needs to be updated in the DB, synchronized with participants, or deleted.
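To make the timeout check and the flag bundle concrete, here is a minimal sketch. SyncFlags, its field names, TimeoutCheck and the string-typed result are all assumptions for the example; only maxOperationWaitMs and the TIMEOUT value come from the description above.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical flag holder modelled on the description of UpdateSync.
class SyncFlags {
    boolean updated;     // entity must be saved to the DB
    boolean toBeSync;    // participants must receive a sync message
    boolean toBeDelete;  // entity must be deleted
}

class TimeoutCheck {
    // True when more than maxOperationWaitMs elapsed between lastActivity and now.
    static boolean isTimedOut(Instant lastActivity, Instant now, long maxOperationWaitMs) {
        return Duration.between(lastActivity, now).toMillis() > maxOperationWaitMs;
    }

    // Returns the new result value and raises the update and sync flags on a fresh timeout.
    static String applyTimeout(String currentResult, boolean timedOut, SyncFlags flags) {
        if (!timedOut || "TIMEOUT".equals(currentResult)) {
            return currentResult; // nothing to change
        }
        flags.updated = true;
        flags.toBeSync = true;
        return "TIMEOUT";
    }
}
```

Passing the current time in as a parameter keeps the check deterministic, which makes it easy to test without waiting for a real timeout.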