Hazelcast Overview with CPS/NCMP

Hazelcast - distributed in-memory data grid (IMDG) platform, reliable for processing and storing data efficiently.

Defaults

  • Distributes data, partitions (both primary and backup replicas) across all cluster members

    • Back up counts are ignored in a single instance setup

    • Default number of partitions 271

  • Automatically creates backups at partition level which are distributed across all cluster members (back up count is configurable)

  • Members have core count * 20 threads that handle requests

  • Single replica of each partition is created

    • One of these replica is called primary and others are backups

    • Partition table information is sent and update across the members by default every 15 seconds, this can be configured with the ff:

      • hazelcast.partition.table.send.interval

  • For maps and queues

    • one synchronous back up, zero asynchronous back up

    • Binary in memory format

 

Hazelcast Topologies

There are two available Hazelcast Topologies (Embedded, Client/Server).

CPS/NCMP uses Embedded technology and will be the main focus for the rest of this documentation.

  • Scaling application instance in embedded mode also scales the hazelcast cluster

  • Application instance and hazelcast shares the same JVM and resources.

 

Hazelcast as a AP or CP system

Hazelcast as an AP system provides Availability and Partition Tolerance

  • Data structures under HazelcastInstance API are all AP data structures

    • CPS/NCMP uses AP data structures 

  • No master node in an AP cluster

  • When a partition fails, its data is redistributed to other members

Hazelcast as a CP system provides Consistency and Partition Tolerance

  • Data structures under HazelcastInstance.getCPSubsytem() API are CP data structures

 

CP Subsystem

  • Currently, CP Subsystem contains only the implementations of Hazelcast's concurrency APIs.

  • Provides mechanisms for leader election, distributed locking, synchronization, and metadata management.

  • Prioritizes consistency which means it may halt operations on some members

  • CPMap only available in enterprise edition

 

Hazelcast Distributed Data Structures

Standard Collections (AP)

  • Maps

    • synchronous backup

      • changes to a map waits to finish update on the primary and all other backups before operation continues

    • asynchronous backup

      • changes to a map does not wait  to finish update on the primary and all other backups before operation continues

  • Queues

  • Lists

  • Sets

  • Ringbuffers

Concurrency Utilities

  • AtomicLongs

  • AtomicReferences

  • Semaphores

  • CountdownLatches

Publish/Subscribe Messaging

CP Subsystem

 

 

Hazelcast in CPS/NCMP

Multiple instances of hazelcast created in application.

Each instance is a member and/or client in a Hazelcast cluster.

 

Multiple instances

  • Creates complexity

  • Resource overhead

    • Each hazelcast instance needs its own memory, CPU and network resources

    • Each instance has its own memory allocation within the JVM heap

  • Increase in network traffic

    • synchronization

    • processing

  • Data inconsistency is a risk

 

Cluster Sizing 

  • Cluster size depends on multiple factors for application

  • Adding more members to the cluster

    • Capacity for CPU-bound computation rises

    • Use of Jet stream processing engine helps this

    • Adding members increases the amount of data that needs to be replicated.

    • Can improve fault tolerance or performance (if minimum recommended (1 per application instance) is not enough)

  • Fault tolerance recommendation

    • At least 3 members or n+1

See Hazelcast's Cluster Sizing

 

Tips

  • One instance of hazelcast per application instance

    • though for fault tolerance, we might need 3 ?

  • Use map.set() on maps instead of map.put() if you don’t need the old value.

    • This eliminates unnecessary deserialization of the old value.

    • map.putIfAbsent() - inserts only if key doesn't exist

    • map.set() - inserts or updates unconditionally

  • Consider use of Entry processor of updating map entries in bulk (i.e. get then set)

  • Changing to OBJECT In-Memory format for specific maps

    • applies entry processor directly on the object

    • useful for queries

    • No serialization/deserialization is performed

  • Enabling in-memory back up reads

    • Allowing member to check its own back up instead of calling the owner of the partition or the primary copy

    • Can be useful to reduce latency and improve performance

    • Needs back ups to be synchronised

  • Back Pressure mechanism in hazelcast

    • acts like a traffic controller 

    • helps prevent system overload

    • helps prevent crashes due to excessive operation invocations

    • Limits the number of concurrent operations and periodically syncs async backups

    • Needs to be configured as it is disabled by default

      • configure to be enabled

      • configure max concurrent invocations per partition

  • Controlling/Configuring partitions

    • Putting data structures in one partition

      • reduces network traffic

      • improves data access performance

      • Simplifies data management

    • Using custom partition strategies based on requirements (Partitioning strategy)

  • Reading Map Metrics

 

Backups in Hazelcast

  • Hazelcast does not support backup redistribution, meaning if the back up configuration of a running cluster is changed then the existing backups are unaffected. The cluster needs to be restarted to take effect of the new configuration.

    • For example: The following cluster is configured to have the default backup count of 1, when the backup count is increased to 2, no new backups will be created. If the backup count is decreased to 0, A2 will still exist but may not be valid.

Member 1

Member 2

Member 1

Member 2

Data: A1

Data: B1

Backup: A2

Backup: B2

2 types of data backups

By default, hazelcast structures are configured to have 1 synchronous back up and 0 asynchronous back up.

Synchronous backups

  • operations are blocked until all backups are copied

  • default count is 1

  • sets by “.setBackupCount(1)”

Asynchronous (async) backups

  • operations proceed without waiting for backup completion

  • best for performance over immediate consistency

  • sets by “.setAsyncBackupCount(1)”

 

Different scenarios

Scenario

Data Availability

Performance

 

Scenario

Data Availability

Performance

 

Synchronous Backup Count = 1

Async Backup Count = 0

As main data fails, the back up takes over immediately

Slower write operations as it awaits for sync back up to complete

 

Synchronous Backup Count = 0

Async Backup Count = 1

As main data fails, the async back up can take over but may be stale

Faster write due to no wait time on sync backup, but if there's failure data might be lost

 

Synchronous Backup Count = 0

Async Backup Count = 0

If main data fails, all data is lost

Fastest write as there’s no backups to manage

most likely solution

Synchronous Backup Count = 1

Async Backup Count = 1

Immediate back up when main data fails, async back up provides extra security

Slower writes but more data integrity and redundancy

 

 

Proposal scenario for CPS-NCMP:

Synchronous backup count = 0

Async backup count = 1

  • allows one backup count to be created asynchronously, meaning it won’t block operations while the backup is being created

Map Backups

  • Memory needed for map is “M + B(M)” wherein M is the memory used by primary data and B(M) is the number of backups

  • Enabling In-Memory backup reads

    • allows local members to access data from their own backups than relying on the primary owner of data.

    • sets by “.setReadBackupData(true)”

Scenario with in memory backup reads enabled:

  1. Local backup read attempt → 1a. Local backup contains data → 1b. Returns data

  2. Local back up read attempt → 2a. Local backup does contain data → 2b. Fetches data from primary source → 2c. Returns data

 

Hazelcast data structures overview

  • Hazelcast data structures have options for using synchronous operations or asynchronous operations.

  • Synchronous operations blocks the calling thread until the operation is completed.

  • Asynchronous operations allows the calling thread to continue executing while the operation is processed in the background.

 

Here are some of the operation examples…

IMap

Synchronous operation

Asynchronous (async) operation

Synchronous operation

Asynchronous (async) operation

put(_)

putAsync(_,_)

remove(_)

removeAsync(_)

get(_)

getAsync(_,_)

set(_,_)

setAsync(_,_)

IQueue

Synchronous operation

Asynchronous (async) operation

Synchronous operation

Asynchronous (async) operation

add(_)

addAsync(_,_)

remove(_)

removeAsync(_)

poll(_)

pollAsync(_,_)

ISet

Synchronous operation

Asynchronous (async) operation

Synchronous operation

Asynchronous (async) operation

add(_)

addAsync(_)

remove(_)

removeAsync(_)

contains(_)

containsAsync(_)

 

CPS-NCMP Hazelcast data structures

 

Current Description

Proposal changes

persistence

Done

moduleSyncWorkQueue

holds data nodes awaiting sync process

  • BlockingQueue<DataNode>

  • Change to IQueue<DataNode>

    • distributed queue implementation in Hazelcast

no

 

moduleSyncStartedOnCmHandles

used as a ‘progress map’ for cm handles as it starts module sync

  • Map<String,Object>

    • e.g. entry : [cmHandle1:'started']

  • Change to use of ISet<String>

    • ensures unique entries .. removes use of .putIfAbsent

no

 

dataSyncSemaphores

holds cmhandleIDs with sync state values to signify either data sync done or in progress

  • IMap<String, Boolean>

  • Configuration change from synchronousBackupCount = 0 and asynchronousBackupCount = 1 gives resilience without blocking operations

no

 

trustLevelPerCmHandle

~upon registration of cmhandles to map, unless specified default value is assigned as TrustLevel.Complete

~upon updating dmiTrustLevel, this map is used to retrieve cm handle trustlevel of affected cmhandles

  • Change to use of IMap<> and .putAsync

  • on update of dmiTrustLevel - the value returned from this map is only for notifications

    • does not entirely need to be from shared cache and can have default values (similar to use of .getOrDefault())

maybe

 

trustLevelPerDmiPlugin

 

  • might not need to be shared

no

 

cmNotificationSubscriptionCache

 

  • Map

  • use of .put() on Map

  • Change to IMap to allow use of hazelcast async operations i.e. .putAsync()

  • benefits from configuration of InMemoryFormat.Object

no

 

 

TrustLevel Data structure Overview

Map.TrustLevelPerCmHandle

How is it filled?
  • CmHandleRegistrationService calls TrustLevelManager

  1. Call TrustLevelManager registerCmHandles

  2. Check if Map.TrustLevelPerCmHandle contains cm handle

  3. Put cmhandle in Map.TrustLevelPerCmHandle if it doesnt exist

    1. with TrustLevel.Complete if no initial trust level stated

  4. SendAvcNotificationEvent if initial trust level is NONE

Proposed changes

N/A

 

 

Questions
  • When initial trust level is not stated , why not also send event?

    • for instance update

Notes

Nov 7, 2024

  • cmhandles are spread between two instances (batch)

  • trustLevel is not persisted

  • in initial cm handle registration store cmhandle with trustLevel.NONE

    • other instance won’t know about it

    • see performance for this with/without hazelcast

    • for cmhandle search

      • near cache

How is it updated?
  • DeviceTrustLevelConsumer

  1. Gets event update from DMI wherein cmhandle ID and new cmhandle trust level is data

  2. call TrustLevelManager updateCmHandleTrustLevel

 

  • TrustLevelManager

  1. Call updateCmHandleTrustLevel

  2. Get DmiServiceName for given cmHandleId

  3. Get DmiTrustLevel from Map.TrustLevelPerDmiPlugin

  4. Get old cmhandle trustLevel from Map.TrustLevelPerCmHandle

  5. Put new cmhandle trustLevel from Map.TrustLevelPerCmHandle

  6. Get effective trustlevel and sendAVCevent if required

Proposed changes
  • TrustLevelManager

    1. if DmiTrustLevel is not in Map.TrustLevelPerDmiPlugin

      1. get status from dmi rest client and add to Map.TrustLevelPerDmiPlugin (3)

      2. or get status from rest DMI client anyhow

    2. if cmhandle is not in Map.TrustLevelPerCmHandle (4)

      1. call register cmHandle

  • use different consumer groups for each instance

    • allows both instance to listen to the event

    • issue of event based syncing: duplication of event forwarded

 

Questions

 

Notes

Nov 7, 2024

  • trustLevel.NONE stored and if trustLevel.COMPLETE removed from map

How is it queried?
  • CmHandleQueryServiceImpl

  1. Call getCmHandleReferencesByTrustLevel

  2. Iterate through all DMI plugin identifiers from Map.TrustLevelPerDmiPlugin

  3. Get all cm handles for given dmi plugin identifier

  4. For all retrieved cm handles , get trustLevel from

    Map.TrustLevelPerCmHandle

  5. Get effective trust level and return if it is same as target trust level

Proposed changes
  • Get all entries in Map.TrustLevelPerCmHandle

    • no trustLevelPerCmHandle entries

  • Retrieve dmi name via cmHandleId

    • Get dmiTrustLevel from Map.TrustLevelPerDmiPlugin or if DmiTrustLevel is not in Map.TrustLevelPerDmiPlugin

      • get status from dmi rest client and add to Map.TrustLevelPerDmiPlugin

 

 

Questions
  • Is there another way to get all DMI plugin identifiers available?

Notes

 

 

Map.TrustLevelPerDmiPlugin

How is it filled?
  • CmHandleRegistrationService calls TrustLevelManager

  1. Call TrustLevelManager registerDmiPlugin

  2. Put dmiServiceName in Map.TrustLevelPerDmiPlugin with TrustLevel.COMPLETE

Proposed changes
  •  

 

 

Questions
  •  

Notes
How is it updated?
  • DmiPluginTrustLevelWatchDog

  1. Iterate through all DMI plugin entries from Map.TrustLevelPerDmiPlugin

  2. Get DMI health status using Dmi rest client

  3. If Old Dmi trustlevel is not same as new trustlevel then

    1. get affected cm handle IDs

    2. call TrustLevel updateDmi

 

  • TrustLevelManager

  1. Call updateDmi

  2. Get old DMI trustLevel from Map.TrustLevelPerDmiPlugin

  3. Put new DmiTrustLevel into Map.TrustLevelPerDmiPlugin

  4. Get each affected cmHandleIDs trustlevel from Map.TrustLevelPerCmHandle

    1. get effective trustLevel

    2. SendAvcNotificationEvent if required

Proposed changes
  • TrustLevelManager

    • Get old DMItrustLevel from rest client

    • if affected cmHandleID is not in Map.TrustLevelPerCmHandle (4) i.e. also not been reported for any change by consumer

      • call registerCmHandle (updated through event consumer)

      • or use of @Cacheable annotation for read-through cache like used in YangTextSchemaSourceSetCache wherein if not in cache/map , it is fetched from database

 

 

 

Questions
  • Would watchdog still be needed if we get DMI trustLevel directly whenever DMI trustLevel is needed?

Notes

Nov 7, 2024

  • assume DMIplugin trust.Level complete when not in collection

 

 

 

 

 

 

 

  • NcmpServiceCmHandle

    • contains two TrustLevel property

      • registrationTrustLevel

      • currentTrustLevel

 

Collection<String> queryCmHandlesByTrustLevel(Map<String, String> trustLevelPropertyQueryPairs, Boolean outputAlternateId);

 

image-20241104-100725.png

 

class TrustLevelManager

registerDmiPlugin(final DmiPluginRegistration dmiPluginRegistration)
  • Add dmi plugins to the cache

  • trustLevel is set to COMPLETE initially

image-20241104-112415.png

 

registerCmHandles(final Map<String, TrustLevel> cmHandlesToBeCreated)
  • Add cmHandles to the cache and publish notification for initial trust level of cmHandles if it is NONE.

 

image-20241104-125656.png
  • Proposal

    • when cm handle is in map ,as current, log.warn(…)

    • when cm handle is not in map

      • AND initial trust level is null

        • we set initial trust level to COMPLETE

      • We add it to map

      • when initial trust level is NONE

        • we publish avc event

 

 

updateDmi(final String dmiServiceName, final Collection<String> affectedCmHandleIds, final TrustLevel newDmiTrustLevel)
  • Updates trust level of device in the cache and publish notification for trust level of device if it has changed.

  • Proposal

    • when getting oldDmiTrustLevel does not exist AND newDmiTrustLevel is 0, sendAvcNotificationIfRequired for all affectedCmHandleIds

 

 

updateCmHandleTrustLevel(final String cmHandleId, final TrustLevel newCmHandleTrustLevel)
  • Updates trust level of device in the cache and publish notification for trust level of device if it has * changed.

 

image-20241104-133355.png

 

 

 

 

 

TrustLevel getEffectiveTrustLevel(final TrustLevel other)

 

image-20241104-134416.png

 

 

removeCmHandles(final Collection<String> cmHandleIds)
  • Remove cm handle trust level from the cache

    image-20241104-134702.png

 

sendAvcNotificationIfRequired(final String notificationCandidateCmHandleId, final TrustLevel oldEffectiveTrustLevel, final TrustLevel newEffectiveTrustLevel)
  • Send avc notification if trust levels are different from cache

image-20241104-135114.png

 

 

 

 

@KafkaListener(topics = "${app.dmi.device-heartbeat.topic}", containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory") public void deviceTrustLevelListener(final ConsumerRecord<String, CloudEvent> consumerRecord)

 

  • Listening to the device trust level updates

    • ID of event is the cmhandle ID

    • the event payload data contains the trust level (NONE or COMPLETE)

 

image-20241104-111118.png

 

class DmiPluginTrustLevelWatchDog

@Scheduled(fixedDelayString = "${ncmp.timers.trust-level.dmi-availability-watchdog-ms:30000}") public void checkDmiAvailability()
  • This class monitors the trust level of all DMI plugin by checking the health status

  • The resulting trust level wil be stored in the relevant cache. *

  • The @fixedDelayString is the time interval, in milliseconds, between consecutive checks.

 

image-20241104-141101.png

 

 

 

class CmHandleQueryServiceImpl

Collection<String> getCmHandleReferencesByTrustLevel(final TrustLevel targetTrustLevel, final Boolean outputAlternateId)

 

image-20241104-142932.png

 

class cmHandleRegistrationService