...
The CBS configuration response contains various service-specific entries. It also contains standardized DCAE stream definitions as the streams_publishes and streams_subscribes JSON objects. The CBS Client API provides a way of parsing this part of the configuration so that you can work with Java objects instead of the low-level GSON API.
...
    final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
    final StreamFromGsonParser<MessageRouterSink> mrSinkParser = StreamFromGsonParsers.messageRouterSinkParser();

    CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment())
        .flatMapMany(cbsClient -> cbsClient.updates(request, Duration.ofSeconds(5), Duration.ofMinutes(1)))
        .map(DataStreams::namedSinks)
        .map(sinks -> sinks.filter(StreamPredicates.streamOfType(MESSAGE_ROUTER)).map(mrSinkParser::unsafeParse).toList())
        .subscribe(
            mrSinks -> mrSinks.forEach(mrSink -> {
                logger.info(mrSink.name()); // name = the configuration key
                logger.info(mrSink.aafCredentials().username()); // = aaf_username
                logger.info(mrSink.topicUrl());
                // ...
            }),
            throwable -> logger.warn("Ooops", throwable)
        );

For details and sample usage, please refer to the JavaDoc and to the unit and integration tests. CbsClientImplIT, MessageRouterSinksIT and MixedDmaapStreamsIT might be especially useful.
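
The filter-then-parse step in the pipeline above can be hard to picture without the SDK on the classpath. The following is only a rough, stdlib-only sketch of the idea: it keeps the stream definitions of one type and turns them into typed objects. `StreamSketch`, `MrSink` and the sample values are hypothetical names made up for this illustration and are not part of the SDK. The keys `type`, `dmaap_info` and `topic_url` are assumptions about the layout of a stream definition, and the nested `Map` stands in for the parsed JSON object.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative stand-in only: none of these types belong to the SDK.
public class StreamSketch {

    // Hypothetical typed view of one message_router sink.
    static final class MrSink {
        final String name;     // the configuration key
        final String topicUrl; // assumed to live under dmaap_info.topic_url

        MrSink(String name, String topicUrl) {
            this.name = name;
            this.topicUrl = topicUrl;
        }
    }

    // Keeps only the entries whose "type" is message_router and converts them to MrSink.
    static List<MrSink> messageRouterSinks(Map<String, Map<String, Object>> streamsPublishes) {
        List<MrSink> sinks = new ArrayList<>();
        for (Map.Entry<String, Map<String, Object>> entry : streamsPublishes.entrySet()) {
            Map<String, Object> definition = entry.getValue();
            if (!"message_router".equals(definition.get("type"))) {
                continue; // a different stream type, skipped by the filter
            }
            @SuppressWarnings("unchecked")
            Map<String, Object> dmaapInfo = (Map<String, Object>) definition.get("dmaap_info");
            if (dmaapInfo == null) {
                throw new IllegalArgumentException("Missing dmaap_info in stream " + entry.getKey());
            }
            sinks.add(new MrSink(entry.getKey(), (String) dmaapInfo.get("topic_url")));
        }
        return sinks;
    }

    // Builds a made-up streams_publishes object with one sink of each of two types.
    static Map<String, Map<String, Object>> sampleStreamsPublishes() {
        Map<String, Object> dmaapInfo = new LinkedHashMap<>();
        dmaapInfo.put("topic_url", "https://mr.example.invalid/events/SAMPLE_TOPIC");

        Map<String, Object> mrStream = new LinkedHashMap<>();
        mrStream.put("type", "message_router");
        mrStream.put("dmaap_info", dmaapInfo);

        Map<String, Object> drStream = new LinkedHashMap<>();
        drStream.put("type", "data_router");

        Map<String, Map<String, Object>> streams = new LinkedHashMap<>();
        streams.put("sample_mr_sink", mrStream);
        streams.put("sample_dr_sink", drStream);
        return streams;
    }

    public static void main(String[] args) {
        for (MrSink sink : messageRouterSinks(sampleStreamsPublishes())) {
            System.out.println(sink.name + " -> " + sink.topicUrl);
        }
    }
}
```

In the real API, the typed object and the parsing logic come from the SDK's parsers (such as the `messageRouterSinkParser()` used above), so code like this should not be needed in an application; it only shows what the parser spares you from writing by hand.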
...
crypt-password
- a utility for BCrypt passwords
...