Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

CBS configuration response contains various service-specific entries. It also contains a standardized DCAE streams definitions as streams_publishes and streams_subscribes JSON objects. CBS Client API provides a way of parsing this part of configuration so you can use Java objects instead of low-level GSON API.

...

Code Block
languagejava
linenumberstrue
final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
final StreamFromGsonParser<MessageRouterSink> mrSinkParser = StreamFromGsonParsers.messageRouterSinkParser();

CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment())
    .flatMapMany(cbsClient -> cbsClient.updates(request, Duration.ofSeconds(5), Duration.ofMinutes(1)))
    .map(DataStreams::namedSinks)
    .map(sinks -> sinks.filter(StreamPredicates.streamOfType(MESSAGE_ROUTER)).map(mrSinkParser::unsafeParse).toList())
    .subscribe(
        mrSinks -> mrSinks.forEach(mrSink -> {
            logger.info(mrSink.name()); // name = the configuration key
            logger.info(mrSink.aafCredentials().username()); // = aaf_username
            logger.info(mrSink.topicUrl());
            // ...
        }),
        throwable -> logger.warn("Ooops", throwable)
 );

For details and sample usage please refer to JavaDoc and unit and integration tests. Especially CbsClientImplIT and , MessageRouterSinksIT and  MixedDmaapStreamsIT might be usefulluseful.


...

crypt-password - an utility for BCrypt passwords

...