Complete solution using current event schema implementation
This section proposes a solution that keeps using the existing event schema implementation.
CPS Events Releases
CPS events are defined (JSON Schema) and generated (Java classes) in the cps-events module of the cps repository: https://github.com/onap/cps/tree/master/cps-events
Each time a new version of CPS Events schema is published it contains both:
- The new schema definition (N)
- The previously published schema definition (N-1)
It also clearly specifies (in release notes?) the Compatibility type of the new schema (N) with respect to the previous schema (N-1).
Finally, an EventSchemaMapper is provided to map events from the new schema (N) to the previous schema (N-1) and the other way around. The logic of this mapper depends on the change made in the schema. For example, if new fields are added in the new schema, they might be given a default value in the previous one, if possible.
No more than two schema versions are supported at the same time.
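The mapping logic described above could look roughly like the following sketch. It is not the actual cps-events code: the EventV0 and EventV1 classes are simplified, hypothetical stand-ins for the generated classes, the operation field and its default value are assumptions made for illustration, and only the method names v0ToV1 and v1ToV0 come from the snippets on this page.

```java
import java.util.Objects;

// Hypothetical, simplified stand-in for the generated v0 event class.
final class EventV0 {
    final String dataspaceName;
    final String anchorName;

    EventV0(final String dataspaceName, final String anchorName) {
        this.dataspaceName = dataspaceName;
        this.anchorName = anchorName;
    }
}

// Hypothetical, simplified stand-in for the generated v1 event class.
final class EventV1 {
    final String dataspaceName;
    final String anchorName;
    final String operation; // assumed field added in v1, for illustration only

    EventV1(final String dataspaceName, final String anchorName, final String operation) {
        this.dataspaceName = dataspaceName;
        this.anchorName = anchorName;
        this.operation = operation;
    }
}

final class EventSchemaMapperSketch {
    // Assumed default for the field that a v0 event does not carry.
    static final String DEFAULT_OPERATION = "UPDATE";

    private EventSchemaMapperSketch() {
    }

    /** Maps v0 to v1: the field missing in v0 receives a default value. */
    static EventV1 v0ToV1(final EventV0 eventV0) {
        Objects.requireNonNull(eventV0, "eventV0");
        return new EventV1(eventV0.dataspaceName, eventV0.anchorName, DEFAULT_OPERATION);
    }

    /** Maps v1 to v0: the field unknown to v0 is dropped. */
    static EventV0 v1ToV0(final EventV1 eventV1) {
        Objects.requireNonNull(eventV1, "eventV1");
        return new EventV0(eventV1.dataspaceName, eventV1.anchorName);
    }
}
```

In this sketch a round trip from v1 to v0 and back loses the original value of the added field, which is one reason the Compatibility type of each schema change needs to be stated explicitly.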
Following is an example of v0 and v1 schema definition in cps-events module.
The built cps-events artifact then contains the generated classes for both the new and the previous event schema. Both are available to producers and consumers that import this specific cps-events jar as a dependency.
cps-events Maven Artifact Version
DataUpdatedEvent Schema Version
Consumers of CPS Events
Compiled Code
When using the cps-events artifact, consumers have access to both the new and the previous CPS event classes.
It is then possible to implement listeners for both event versions, as suggested in the following snippet:
/**
 * Consume event from v0 schema.
 */
public void consume(final org.onap.cps.event.model.v0.CpsDataUpdatedEvent eventV0) {
    // Map event v0 to v1
    org.onap.cps.event.model.v1.CpsDataUpdatedEvent eventV1 = this.eventSchemaMapper.v0ToV1(eventV0);
    // Consume event v1
    consume(eventV1);
}

/**
 * Consume event from v1 schema.
 */
public void consume(final org.onap.cps.event.model.v1.CpsDataUpdatedEvent eventV1) {
    // Map event to entity
    final var networkData = this.cpsDataUpdatedEventMapper.eventToEntity(eventV1);
    // Persist entity
    this.networkDataService.addNetworkData(networkData);
}
From a compiled-code point of view, support for both event versions is then in place.
Runtime Configuration
The challenge is now to be able to choose which event version a listener is consuming at startup, by configuration only, without changing the application compiled code.
This is done by enabling only one of the 2 listeners by configuration:
@KafkaListener(
    topics = "${app.listener.data-updated.v0.topic}",
    errorHandler = "dataUpdatedEventListenerErrorHandler",
    autoStartup = "${app.listener.data-updated.v0.autoStartup}")
public void consume(final org.onap.cps.event.model.v0.CpsDataUpdatedEvent eventV0)

@KafkaListener(
    topics = "${app.listener.data-updated.v1.topic}",
    errorHandler = "dataUpdatedEventListenerErrorHandler",
    autoStartup = "${app.listener.data-updated.v1.autoStartup}")
public void consume(final org.onap.cps.event.model.v1.CpsDataUpdatedEvent eventV1)

app:
  listener:
    data-updated:
      v0:
        topic: ${CPS_CHANGE_EVENT_V0_LISTENER_TOPIC:cps.dev.null}
        autoStartup: ${CPS_CHANGE_EVENT_V0_LISTENER_ENABLED:false}
      v1:
        topic: ${CPS_CHANGE_EVENT_V1_LISTENER_TOPIC:cps.cfg-state-events}
        autoStartup: ${CPS_CHANGE_EVENT_V1_LISTENER_ENABLED:true}
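To make the effect of the placeholder defaults above easier to follow, here is a minimal plain-Java sketch of how a "${NAME:default}" value resolves. It does not use Spring. The class ListenerConfigSketch and its methods are hypothetical and only mimic the resolution rule, while the variable names and defaults are the ones from the configuration above.

```java
import java.util.Map;

final class ListenerConfigSketch {
    private ListenerConfigSketch() {
    }

    /** Mimics "${NAME:default}": the environment value wins, otherwise the default applies. */
    static String resolve(final Map<String, String> environment, final String name,
                          final String defaultValue) {
        return environment.getOrDefault(name, defaultValue);
    }

    /** The v0 listener is disabled unless explicitly enabled. */
    static boolean isV0Enabled(final Map<String, String> environment) {
        return Boolean.parseBoolean(
            resolve(environment, "CPS_CHANGE_EVENT_V0_LISTENER_ENABLED", "false"));
    }

    /** The v1 listener is enabled unless explicitly disabled. */
    static boolean isV1Enabled(final Map<String, String> environment) {
        return Boolean.parseBoolean(
            resolve(environment, "CPS_CHANGE_EVENT_V1_LISTENER_ENABLED", "true"));
    }
}
```

With no variables set, only the v1 listener starts. Switching a consumer to v0 would then presumably mean setting CPS_CHANGE_EVENT_V0_LISTENER_ENABLED to true, CPS_CHANGE_EVENT_V1_LISTENER_ENABLED to false, and CPS_CHANGE_EVENT_V0_LISTENER_TOPIC to the real topic, since enabling v0 alone would leave both listeners running.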
Producer of CPS Events
Compiled Code
The producer is able to send both the current and the previous version of events by implementing two sets of Notification Service and Notification Publisher using the Template Method design pattern.
The following diagrams and snippets illustrate this implementation:
EventFactory is able to create events for both versions:
public class CpsDataUpdatedEventFactory {
    ...
    private EventSchemaMapper eventSchemaMapper;
    ...

    public org.onap.cps.event.model.v0.CpsDataUpdatedEvent createCpsDataUpdatedEventV0(
            final String dataspaceName, final String anchorName) {
        final var eventV1 = this.createCpsDataUpdatedEventV1(dataspaceName, anchorName);
        return this.eventSchemaMapper.v1ToV0(eventV1);
    }

    public org.onap.cps.event.model.v1.CpsDataUpdatedEvent createCpsDataUpdatedEventV1(
            final String dataspaceName, final String anchorName) {
        final var dataNode = cpsDataService.getDataNode(dataspaceName, anchorName, "/",
            FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS);
        final var anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
        return toCpsDataUpdatedEvent(anchor, dataNode);
    }
    ...
}
Abstract Notification and Publisher classes:
public abstract class NotificationService {
    ...

    public void processDataUpdatedEvent(final String dataspaceName, final String anchorName) {
        ...
        try {
            if (shouldSendNotification()) {
                notify(dataspaceName, anchorName);
            }
        } catch (final Exception exception) {
            notificationErrorHandler.onException("Failed to process cps-data-updated-event.",
                exception, dataspaceName, anchorName);
        }
    }

    protected abstract void notify(String dataspaceName, String anchorName);
}

public abstract class NotificationPublisher<V> {

    private final KafkaTemplate<String, V> kafkaTemplate;
    private final String topicName;

    protected NotificationPublisher(
            final KafkaTemplate<String, V> kafkaTemplate,
            final @Value("${notification.data-updated.topic}") String topicName) {
        this.kafkaTemplate = kafkaTemplate;
        this.topicName = topicName;
    }

    protected abstract void sendNotification(@NonNull final V event);
}
Concrete Notification and Publisher classes:
@Service("v1NotificationService")
public class V1NotificationService extends NotificationService {

    private final V1NotificationPublisher notificationPublisher;
    ...

    @Override
    protected void notify(final String dataspaceName, final String anchorName) {
        final var cpsDataUpdatedEvent =
            super.getCpsDataUpdatedEventFactory().createCpsDataUpdatedEventV1(dataspaceName, anchorName);
        notificationPublisher.sendNotification(cpsDataUpdatedEvent);
    }
}

import org.onap.cps.event.model.v1.CpsDataUpdatedEvent;

public class V1NotificationPublisher extends NotificationPublisher<CpsDataUpdatedEvent> {

    public V1NotificationPublisher(
            final KafkaTemplate<String, CpsDataUpdatedEvent> kafkaTemplate,
            final @Value("${notification.data-updated.topic}") String topicName) {
        super(kafkaTemplate, topicName);
    }

    protected void sendNotification(@NonNull final CpsDataUpdatedEvent cpsDataUpdatedEvent) {
        final var messageKey = cpsDataUpdatedEvent.getContent().getDataspaceName() + ","
            + cpsDataUpdatedEvent.getContent().getAnchorName();
        super.getKafkaTemplate().send(super.getTopicName(), messageKey, cpsDataUpdatedEvent);
    }
}
Runtime Configuration
The event schema version to be published is specified and configurable in application properties. The specified value is used by Spring when the application is starting to instantiate the expected concrete classes for Notification Service and Publisher.
notification:
  data-updated:
    enabled: false
    topic: ${CPS_CHANGE_EVENT_TOPIC:cps.cfg-state-events}
  event-schema-version: ${CPS_CHANGE_EVENT_SCHEMA_VERSION:v1}
The specific requested Notification Service is injected into Data Service:
public class CpsDataServiceImpl implements CpsDataService {
    ...

    @Value("${notification.event-schema-version}NotificationService")
    private String notificationServiceQualifier;

    private NotificationService notificationService;

    @Autowired
    public void setNotificationService(final ApplicationContext applicationContext) {
        this.notificationService =
            (NotificationService) applicationContext.getBean(notificationServiceQualifier);
    }
    ...
}
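The lookup above relies on a naming convention: the configured version (for example v1) concatenated with NotificationService must match the name given to the concrete service bean (v1NotificationService). The following plain-Java sketch illustrates that convention without Spring. NotificationServiceSketch and NotificationServiceRegistrySketch are hypothetical names, and the map is only a stand-in for the application context.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the abstract NotificationService.
interface NotificationServiceSketch {
    String schemaVersion();
}

// Hypothetical stand-in for the bean lookup done through the application context.
final class NotificationServiceRegistrySketch {
    private final Map<String, NotificationServiceSketch> beansByName = new HashMap<>();

    /** Registers a service under its bean name, e.g. "v1NotificationService". */
    void register(final String beanName, final NotificationServiceSketch service) {
        beansByName.put(beanName, service);
    }

    /** Builds the bean name from the configured version and looks it up. */
    NotificationServiceSketch select(final String eventSchemaVersion) {
        final String beanName = eventSchemaVersion + "NotificationService";
        final NotificationServiceSketch service = beansByName.get(beanName);
        if (service == null) {
            // Fail fast when the configured version has no matching service.
            throw new IllegalStateException("No notification service named " + beanName);
        }
        return service;
    }
}
```

A configured version without a matching bean fails immediately in this sketch. In Spring, ApplicationContext.getBean(String) similarly throws NoSuchBeanDefinitionException, so a misconfigured version would be expected to surface when the data service bean is created rather than when the first event is published.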
POC
POC code can be found in Gerrit WIP changes:
Conclusion
The solution proposed above gives the flexibility to deploy new releases of the components (producer or consumers) independently, in any order.
When deploying them, the event schema configuration provided ensures that producer and consumers remain compliant with the specific Compatibility type (forward or backward), if any.
However, some complexity is introduced in the design: because the current and previous event classes do not implement any common type, more version-specific code is needed to support both types of event.