Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
linenumberstrue
final MessageRouterSource sourceDefinition; //... Source definition obtained by parsing CBS response
final MessageRouterSubscribeRequest request = ImmutableMessageRouterPublishRequestImmutableMessageRouterSubscribeRequest.builder()
        .sinkDefinition(sourceDefinition)
        .build();

cut.subscribeForElements(request, Duration.ofMinutes(1))
        .map(JsonElement::getAsJsonObject)
        .subscribe(json -> {
                // application logic
            },
            ex -> {
                logger.warn("An unexpected error while receiving messages from DMaaP", ex);
            });

...