Because most services and collectors deployed on the DCAE platform rely on similar microservices, a common Software Development Kit has been created. It contains utilities and clients that can be used for getting configuration from CBS, consuming messages from DMaaP, interacting with AAI, etc. The SDK is written in Java.
...
Sample usage:
Code Block | ||||
---|---|---|---|---|
| ||||
// Generate RequestID and InvocationID which will be used when logging and in HTTP requests
RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
// Read necessary properties from the environment
final EnvProperties env = EnvProperties.fromEnvironment();
// Create the client and use it to get the configuration
CbsClientFactory.createCbsClient(env)
        .flatMap(cbsClient -> cbsClient.get(diagnosticContext))
        .subscribe(
                jsonObject -> {
                    // do something with your JSON configuration using the GSON API
                    final int port = Integer.parseInt(jsonObject.get("collector.listen_port").getAsString());
                    // ...
                },
                throwable -> {
                    logger.warn("Ooops", throwable);
                }); |
Note that the subscribe handler may be called asynchronously on a separate thread, after the CBS address lookup succeeds and the CBS service call returns a result.
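The threading note above can be illustrated with a standard-library analogy. This is only a sketch: it uses `CompletableFuture` rather than the Reactor types the SDK actually returns, and the class name `AsyncHandlerDemo`, the thread name `cbs-worker`, and the hard-coded JSON string are all hypothetical. It shows that the caller does not wait for the handler and that the handler runs on another thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Standard-library analogy only; the SDK itself uses Reactor types.
final class AsyncHandlerDemo {

    // Runs a fake "fetch configuration" call on a worker thread and returns
    // the name of the thread on which the result handler was invoked.
    static String handlerThreadName() {
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "cbs-worker"));
        AtomicReference<String> seenOn = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            CompletableFuture
                    // Stand-in for the remote call; the JSON is made up.
                    .supplyAsync(() -> "{\"collector.listen_port\":\"8080\"}", worker)
                    // Stand-in for the subscribe handler.
                    .thenAcceptAsync(config -> {
                        seenOn.set(Thread.currentThread().getName());
                        done.countDown();
                    }, worker);
            // The calling thread does not wait for the handler; it has to
            // block explicitly if it needs the result before continuing.
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("handler was not called in time");
            }
            return seenOn.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller:  " + Thread.currentThread().getName());
        System.out.println("handler: " + handlerThreadName());
    }
}
```

The practical consequence is the same as with the SDK client: any state the handler touches must be safe to access from a thread other than the one that called subscribe.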
If you want to call CBS periodically and react only when the configuration has changed, you can use the updates method:
Code Block | ||||
---|---|---|---|---|
| ||||
// Generate RequestID and InvocationID which will be used when logging and in HTTP requests
RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
// Read necessary properties from the environment
final EnvProperties env = EnvProperties.fromEnvironment();
// Polling properties
final Duration initialDelay = Duration.ofSeconds(5);
final Duration period = Duration.ofMinutes(1);
// Create the client and use it to get the configuration
CbsClientFactory.createCbsClient(env)
        .flatMapMany(cbsClient -> cbsClient.updates(diagnosticContext, initialDelay, period))
        .subscribe(
                jsonObject -> {
                    // do something with your JSON configuration using the GSON API
                    final int port = Integer.parseInt(jsonObject.get("collector.listen_port").getAsString());
                    // ...
                },
                throwable -> {
                    logger.warn("Ooops", throwable);
                }); |
The most significant change is the flatMapMany call, which replaces flatMap: we want to map one CbsClient to many JsonObject updates. After an initial delay of 5 seconds, CbsClient calls CBS every minute. If the configuration has changed, it passes the new JsonObject downstream; in our case, the JsonObject consumer is called.
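The "react only when the configuration changed" behaviour can be pictured with a small stand-alone sketch. It is an illustration of the idea, not the SDK's implementation: `ConfigChangeFilter` is a hypothetical helper, and plain strings stand in for the JsonObject values returned by each poll.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

// Hypothetical helper, not part of the DCAE SDK: forwards a polled
// configuration only when it differs from the previously seen one.
final class ConfigChangeFilter<T> {
    private T last; // null until the first configuration arrives

    // Returns the configuration if it is new or changed, otherwise empty.
    Optional<T> accept(T polled) {
        if (Objects.equals(polled, last)) {
            return Optional.empty();
        }
        last = polled;
        return Optional.of(polled);
    }

    public static void main(String[] args) {
        ConfigChangeFilter<String> filter = new ConfigChangeFilter<>();
        // Simulated results of four consecutive polls (made-up values).
        List<String> polls = List.of(
                "{\"port\":8080}",
                "{\"port\":8080}",
                "{\"port\":9090}",
                "{\"port\":9090}");
        List<String> delivered = new ArrayList<>();
        for (String config : polls) {
            // Only changed configurations reach the consumer.
            filter.accept(config).ifPresent(delivered::add);
        }
        System.out.println(delivered);
    }
}
```

With these four simulated polls, only the first and third results are delivered to the consumer; the second and fourth are dropped because they equal the previous value.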
Notes about reactive programming
...