...
The most significant change is in line 14. We are using flatMapMany since we want to map one CbsClient to many JsonObject updates. After 5 seconds CbsClient will call CBS every minute. If the configuration has changed it will pass the JsonObject downstream - in our case consumer of JsonObject will be called.
Parsing streams' definitions
CBS configuration response contains various service-specific entries. It also contains a standardized DCAE streams definitions as streams_publishes and streams_subscribes JSON objects. CBS Client API provides a way of parsing this part of configuration so you can use Java objects instead of low-level GSON API.
Because streams definitions are a simple value objects we were not able to provide you a nice polymorphic API. Instead you have 2-level API at your disposal:
- You can extract raw streams by means of DataStreams.namedSinks (for streams_publishes) and DataStreams.namedSources (for streams_subscribes).
- Then you will be able to parse the specific entry from returned collection to a desired stream type by means of parsers built by StreamFromGsonParsers factory.
Sample usage:
Code Block | ||||
---|---|---|---|---|
| ||||
final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
final StreamFromGsonParser<MessageRouterSink> mrSinkParser = StreamFromGsonParsers.messageRouterSinkParser();
CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment())
.flatMapMany(cbsClient -> cbsClient.updates(request, Duration.ofSeconds(5), Duration.ofMinutes(1)))
.map(DataStreams::namedSinks)
.map(sinks -> sinks.filter(StreamPredicates.streamOfType(MESSAGE_ROUTER)).map(mrSinkParser::unsafeParse).toList())
.subscribe(
mrSinks -> mrSinks.forEach(mrSink -> {
logger.info(mrSink.name());
logger.info(mrSink.aafCredentials().username());
logger.info(mrSink.topicUrl());
// ...
}),
throwable -> logger.warn("Ooops", throwable)
); |
...
crypt-password
- an utility for BCrypt passwords
...