...
Code Block | ||||
---|---|---|---|---|
| ||||
final MessageRouterSource sourceDefinition; //... Source definition obtained by parsing CBS response final MessageRouterSubscribeRequest request = ImmutableMessageRouterPublishRequestImmutableMessageRouterSubscribeRequest.builder() .sinkDefinition(sourceDefinition) .build(); cut.subscribeForElements(request, Duration.ofMinutes(1)) .map(JsonElement::getAsJsonObject) .subscribe(json -> { // application logic }, ex -> { logger.warn("An unexpected error while receiving messages from DMaaP", ex); }); |
...