...

After parsing CBS sink definitions you will get a Source or Sink value object. It can then be used directly to communicate with the DMaaP Message Router REST API.

Writing message publisher

final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher();
final MessageRouterSink sinkDefinition; //... Sink definition obtained by parsing CBS response
final MessageRouterPublishRequest request = ImmutableMessageRouterPublishRequest.builder()
        .sinkDefinition(sinkDefinition)
        .build();

Flux.just(1, 2, 3)
        .map(JsonPrimitive::new)
        .transform(input -> publisher.put(request, input))
        .subscribe(resp -> {
                    if (resp.successful()) {
                        logger.debug("Sent a batch of messages to the MR");
                    } else {
                        logger.warn("Message sending has failed: {}", resp.failReason());
                    }
                },
                ex -> {
                    logger.warn("An unexpected error while sending messages to DMaaP", ex);
                });

Note that we use the Reactor transform operator. Alternatively, you could assign the Flux of JSON values to a variable and then invoke publisher.put on it. The important performance-related point is that you should feed the put method a stream of messages rather than calling it repeatedly with single messages. This lets the client API send the messages in batches, which should significantly improve performance (at least at the transfer level).
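To make the batching argument concrete, here is a minimal, standard-library-only simulation. It does not use the DMaaP SDK or Reactor: `FakePublisher`, its `put(List<String>)` method, and the batch size of 100 are hypothetical stand-ins chosen for illustration, and the real client may group messages differently. The sketch only counts how many simulated requests are made when the same 250 messages are handed over one at a time versus in a single call.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Illustration only: a stand-in publisher, NOT the SDK's MessageRouterPublisher. */
final class BatchingSketch {

    /** Hypothetical publisher that records the size of every simulated HTTP request. */
    static final class FakePublisher {
        private final int maxBatchSize; // assumed limit, chosen for illustration
        private final List<Integer> requestSizes = new ArrayList<>();

        FakePublisher(int maxBatchSize) {
            if (maxBatchSize < 1) {
                throw new IllegalArgumentException("maxBatchSize must be >= 1");
            }
            this.maxBatchSize = maxBatchSize;
        }

        /** Groups the given messages into requests of at most maxBatchSize each. */
        void put(List<String> messages) {
            for (int from = 0; from < messages.size(); from += maxBatchSize) {
                int to = Math.min(from + maxBatchSize, messages.size());
                requestSizes.add(to - from); // one simulated request per batch
            }
        }

        int requestCount() {
            return requestSizes.size();
        }

        List<Integer> requestSizes() {
            return new ArrayList<>(requestSizes);
        }
    }

    /** Builds n small JSON-like payloads. */
    static List<String> sampleMessages(int n) {
        return IntStream.rangeClosed(1, n)
                .mapToObj(i -> "{\"id\":" + i + "}")
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> messages = sampleMessages(250);

        // Anti-pattern: one put call per message, so nothing can be grouped.
        FakePublisher oneByOne = new FakePublisher(100);
        for (String message : messages) {
            oneByOne.put(Collections.singletonList(message));
        }

        // Preferred: hand over the whole sequence so it can be sent in batches.
        FakePublisher streamed = new FakePublisher(100);
        streamed.put(messages);

        System.out.println("one call per message: " + oneByOne.requestCount() + " requests");
        System.out.println("single call with all messages: " + streamed.requestCount() + " requests");
    }
}
```

Under these assumptions the per-message variant issues 250 simulated requests, while the single call issues 3 (two of 100 messages and one of 50). The same reasoning is why the publisher example passes the whole Flux to put instead of calling it once per element.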

Writing message subscriber

final MessageRouterSource sourceDefinition; //... Source definition obtained by parsing CBS response
final MessageRouterSubscribeRequest request = ImmutableMessageRouterSubscribeRequest.builder()
        .sourceDefinition(sourceDefinition)
        .build();

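// Obtain the subscriber from the factory, mirroring the publisher example
final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber();
subscriber.subscribeForElements(request, Duration.ofMinutes(1))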
        .map(JsonElement::getAsJsonObject)
        .doOnNext(json -> {
            // application logic
        })
        .subscribe();


...

hvvesclient-producer - a reference Java implementation of High Volume VES Collector client

...