Kafka setup to test SSL functionality in apex

Strimzi Custom Resources

To obtain kafka-cluster.yaml, kafka-topic.yaml, and kafka-user.yaml, start from the base resources and modify them to create a Kafka cluster with two-way (mutual) TLS authentication.

  1. Download kafka-cluster.yaml, kafka-topic.yaml, and kafka-user.yaml from the links below, and modify them if necessary.

  2. kafka-cluster.yaml: To access the cluster from outside Kubernetes, add an external listener.
    An external listener can be exposed as a loadbalancer, a nodeport, and so on. To expose it as a nodeport, add a new listener named external to the base cluster YAML file:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    spec:
      kafka:
        # ...
        listeners:
          - name: external
            port: 9094
            type: nodeport
            tls: true
            authentication:
              type: tls
        # ...
      # ...
      zookeeper:
        # ...


    "NodePort" type services are created for each Kafka broker, as well as an external bootstrap service. The bootstrap service routes external traffic to the Kafka brokers.
    The cluster CA certificate used to verify the identity of the Kafka brokers is also created, in a secret named after the Kafka resource (my-cluster-cluster-ca-cert for a cluster named my-cluster).

  3. kafka-topic.yaml: No need to edit, unless the cluster name differs from "my-cluster".

  4. kafka-user.yaml: No need to edit, unless the cluster name or topic name differs from the defaults.
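
Steps 3 and 4 hinge on the cluster name that the topic and user files point at. Strimzi associates KafkaTopic and KafkaUser resources with a cluster through the strimzi.io/cluster label, so a quick check of that label can tell you whether a file needs editing. The following is a minimal sketch, assuming the downloaded files write the label on a single line; the helper name check_cluster_label is hypothetical and not part of Strimzi.

```shell
# Hypothetical helper: succeeds when FILE labels its resource for CLUSTER.
# Assumes the label appears on one line as "strimzi.io/cluster: <name>".
check_cluster_label() {
  local file="$1"
  local cluster="$2"
  grep -Eq "^[[:space:]]*strimzi\.io/cluster:[[:space:]]*\"?${cluster}\"?[[:space:]]*$" "$file"
}

# Usage (after downloading the files):
#   check_cluster_label kafka-topic.yaml my-cluster || echo "edit kafka-topic.yaml"
#   check_cluster_label kafka-user.yaml  my-cluster || echo "edit kafka-user.yaml"
```

The match is anchored to the whole line, so a label such as my-cluster-2 is not mistaken for my-cluster.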

Set up an Apache Kafka cluster on Kubernetes with Strimzi



  1. Install the latest version of the minikube binary, which you can get here

  2. Start minikube: 

    minikube start --driver=docker --memory=4096

  3. Create a namespace:

    kubectl create namespace kafka

  4. Apply the strimzi installation file into the namespace:

    kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka

  5. Apply the custom resource to create a Kafka cluster (use the kafka-cluster.yaml prepared in the previous section):

    kubectl apply -f kafka-cluster.yaml -n kafka

  6. Wait until the Kafka cluster pods are ready:

    kubectl get pods -n kafka
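
Instead of polling the pod list by hand, you can block until the Kafka resource reports the Ready condition. The sketch below wraps kubectl wait in a small function; the function name wait_for_kafka and its default arguments are illustrative assumptions, while the cluster name my-cluster and namespace kafka come from the steps above.

```shell
# Hypothetical wrapper around "kubectl wait" for the Strimzi Kafka resource.
# Arguments (all optional): cluster name, namespace, timeout.
wait_for_kafka() {
  local cluster="${1:-my-cluster}"
  local namespace="${2:-kafka}"
  local timeout="${3:-300s}"
  kubectl wait "kafka/${cluster}" --for=condition=Ready \
    --timeout="${timeout}" -n "${namespace}"
}

# Usage:
#   wait_for_kafka                      # my-cluster in namespace kafka, 300s
#   wait_for_kafka my-cluster kafka 600s
```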



  7. Apply the custom resource files to create a topic and a user with TLS authentication and simple authorization (use the kafka-topic.yaml and kafka-user.yaml downloaded in the previous section):

    kubectl apply -f kafka-topic.yaml -n kafka
    kubectl apply -f kafka-user.yaml -n kafka

  8. Download the cluster CA certificate and the user's PKCS12 (.p12) keystore and password, to use with the Kafka client:

    kubectl get secret my-cluster-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' -n kafka | base64 -d > ca.crt
    kubectl get secret my-user -o jsonpath='{.data.user\.password}' -n kafka | base64 -d > user.password
    kubectl get secret my-user -o jsonpath='{.data.user\.p12}' -n kafka | base64 -d > user.p12

  9. Convert ca.crt to a JKS truststore and user.p12 to a JKS keystore. You will be prompted for passwords: for a JDK truststore the default password is "changeit", and the source keystore password is the content of user.password.

    keytool -keystore user-truststore.jks -alias CARoot -import -file ca.crt


    You will be asked:
    Enter keystore password: Choose a password; this example uses 'changeit'
    Re-enter new password: Re-enter the password
    Trust this certificate? [no]: Type 'yes'

    keytool -importkeystore -srckeystore user.p12 -srcstoretype pkcs12 -destkeystore user-keystore.jks -deststoretype jks


    You will be asked:
    Enter destination keystore password: Choose a password; this example uses 'changeit'
    Re-enter new password: Re-enter the password
    Enter source keystore password: Paste the content of user.password
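
If you prefer to script step 9 rather than answer the prompts, keytool accepts the passwords as options. The following is a sketch under the assumption that ca.crt, user.p12, and user.password are in the current directory; the function name build_stores is hypothetical. Passing passwords on the command line exposes them to other users on the machine, so treat this as suitable for a local test setup only.

```shell
# Hypothetical non-interactive variant of step 9.
# Argument: the password to protect both JKS stores (for example "changeit").
build_stores() {
  local store_pass="$1"
  local src_pass
  # The PKCS12 file is protected with the password stored in user.password.
  src_pass="$(cat user.password)" || return 1

  # Import the cluster CA into a new truststore without the trust prompt.
  keytool -import -noprompt -alias CARoot -file ca.crt \
    -keystore user-truststore.jks -storepass "$store_pass" || return 1

  # Convert the user's PKCS12 keystore into a JKS keystore.
  keytool -importkeystore \
    -srckeystore user.p12 -srcstoretype pkcs12 -srcstorepass "$src_pass" \
    -destkeystore user-keystore.jks -deststoretype jks -deststorepass "$store_pass"
}

# Usage:
#   build_stores changeit
```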



  10. When using a nodeport, get the bootstrap server IP with:

    minikube ip


    Then read the PORT from the my-cluster-kafka-external-bootstrap nodeport service (32341 in this example):

    kubectl get svc -n kafka
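
The IP and port from step 10 can also be combined in one go. This is a minimal sketch that reads the node port with a jsonpath query instead of scanning the service table; the function name bootstrap_address is hypothetical, and it assumes the bootstrap service exposes a single port, so the first entry is the one you want.

```shell
# Hypothetical helper: prints "<minikube ip>:<nodePort>" for the external
# bootstrap service created by the listener named "external".
bootstrap_address() {
  local ip port
  ip="$(minikube ip)" || return 1
  port="$(kubectl get svc my-cluster-kafka-external-bootstrap -n kafka \
    -o jsonpath='{.spec.ports[0].nodePort}')" || return 1
  printf '%s:%s\n' "$ip" "$port"
}

# Usage:
#   BOOTSTRAP="$(bootstrap_address)"
```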

  11. Create a file called client-ssl-auth.properties with the following contents:

    bootstrap.servers=[IP]:[PORT]
    security.protocol=SSL
    ssl.truststore.location=[TRUSTSTORE_LOCATION]/user-truststore.jks
    ssl.truststore.password=changeit
    ssl.keystore.location=[KEYSTORE_LOCATION]/user-keystore.jks
    ssl.keystore.password=changeit
    ssl.key.password=[contents of user.password file]
    ssl.endpoint.identification.algorithm=


    #Host name verification of servers is enabled by default for client connections as well as inter-broker connections. Server host name verification may be disabled by setting ssl.endpoint.identification.algorithm= to an empty string.
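
To avoid copy-and-paste mistakes in the placeholders, the properties file can be generated. The sketch below prints the same eight properties shown in step 11; the function name write_ssl_properties and its argument order are assumptions made for illustration, and it assumes both JKS files live in the same directory.

```shell
# Hypothetical generator for client-ssl-auth.properties.
# Arguments: bootstrap address, directory holding the JKS files,
#            store password, path to the user.password file.
write_ssl_properties() {
  local bootstrap="$1"
  local store_dir="$2"
  local store_pass="$3"
  local key_pass
  key_pass="$(cat "$4")" || return 1
  cat <<EOF
bootstrap.servers=${bootstrap}
security.protocol=SSL
ssl.truststore.location=${store_dir}/user-truststore.jks
ssl.truststore.password=${store_pass}
ssl.keystore.location=${store_dir}/user-keystore.jks
ssl.keystore.password=${store_pass}
ssl.key.password=${key_pass}
ssl.endpoint.identification.algorithm=
EOF
}

# Usage:
#   write_ssl_properties 192.168.49.2:32341 "$PWD" changeit user.password \
#     > client-ssl-auth.properties
```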

  12. Test the connection with an external Kafka producer and consumer in two different terminals, replacing the IP and port with your own:

    /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.49.2:32341 --topic my-topic --producer.config client-ssl-auth.properties
    /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.49.2:32341 --group my-group --topic my-topic --consumer.config client-ssl-auth.properties --from-beginning


Start Apex with a sample policy with kafka properties

  1. You should have a TOSCA policy in $APEX_HOME
    - A TOSCA policy can be generated via the Apex CLI Tosca Editor
    - Or get a sample policy from ToscaPolicy.json


  2. Change the Carrier Technology Parameters and add a kafkaProperties block set to the values used in client-ssl-auth.properties:

    "kafkaProperties": [
      [ "security.protocol", "SSL" ],
      [ "ssl.truststore.type", "JKS" ],
      [ "ssl.truststore.location", "/tmp/policy/user-truststore.jks" ],
      [ "ssl.truststore.password", "changeit" ],
      [ "ssl.keystore.location", "/tmp/policy/user-keystore.jks" ],
      [ "ssl.keystore.password", "changeit" ],
      [ "ssl.key.password", "XPArx7u418h2" ],
      [ "ssl.endpoint.identification.algorithm", "" ]
    ]





  3. Start Apex in standalone mode, replacing the paths with those of your truststore and keystore:


    docker run -p 6969:6969 -v $APEX_HOME/ToscaPolicy.json:/tmp/policy/ToscaPolicy.json \
      -v $APEX_HOME/user-truststore.jks:/tmp/policy/user-truststore.jks \
      -v $APEX_HOME/user-keystore.jks:/tmp/policy/user-keystore.jks \
      --name apex -it nexus3.onap.org:10001/onap/policy-apex-pdp:latest \
      -c "/opt/app/policy/apex-pdp/bin/apexEngine.sh -p /tmp/policy/ToscaPolicy.json"


  4. You now have two containers, apex and minikube. List them with:

    docker ps -a



  5. For containers to communicate with each other, they need to be part of the same network. Docker creates a virtual network called bridge by default and connects your containers to it.
    From another terminal, use the network connect command to attach the apex container to the minikube network so that it can reach the minikube container:

    docker network connect minikube apex




    Otherwise the connection cannot be established and the client times out.
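
To confirm that the apex container really joined the minikube network, you can list the networks Docker reports for it. This is a sketch using docker inspect with a Go template; the function name container_on_network is hypothetical.

```shell
# Hypothetical check: succeeds when CONTAINER is attached to NETWORK.
container_on_network() {
  local container="$1"
  local network="$2"
  # Print one attached network name per line, then look for an exact match.
  docker inspect \
    -f '{{range $k, $v := .NetworkSettings.Networks}}{{println $k}}{{end}}' \
    "$container" | grep -Fqx "$network"
}

# Usage:
#   container_on_network apex minikube && echo "apex can reach minikube"
```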




(Optional) Set up SASL with SCRAM-SHA-512 authentication

    1. Edit the kafka-cluster.yaml and kafka-user.yaml, changing the authentication line
      authentication:
        type: scram-sha-512

    2. Apply those changes
      kubectl apply -f kafka-cluster.yaml -n kafka
      kubectl apply -f kafka-user.yaml -n kafka

    3. Get the scram password

      kubectl get secret my-user -n kafka -o jsonpath='{.data.password}' | base64 --decode > user-scram.password

    4. Create a file called client-scram-auth.properties with the following contents:

      bootstrap.servers=192.168.49.2:32341
      security.protocol=SASL_SSL
      sasl.mechanism=SCRAM-SHA-512
      ssl.truststore.location=[TRUSTSTORE_LOCATION]/user-truststore.jks
      ssl.truststore.password=changeit
      sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="my-user" password="[contents of user-scram.password file]";
      ssl.endpoint.identification.algorithm=
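
The sasl.jaas.config line is the easiest one to get wrong, because the password has to be substituted inside the quoted string. The sketch below builds that single line from the password file saved in step 3; the function name scram_jaas_config is hypothetical, and it assumes the password contains no double quotes or backslashes, which would need escaping.

```shell
# Hypothetical helper: prints the sasl.jaas.config property line.
# Arguments: user name, path to the file holding the SCRAM password.
scram_jaas_config() {
  local user="$1"
  local password
  password="$(cat "$2")" || return 1
  printf 'sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="%s" password="%s";\n' \
    "$user" "$password"
}

# Usage:
#   scram_jaas_config my-user user-scram.password
```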

    5. Test the connection with an external Kafka producer and consumer in two different terminals, replacing the IP and port with your own:

      /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.49.2:32341 --topic my-topic --producer.config client-scram-auth.properties
      /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.49.2:32341 --group my-group --topic my-topic --consumer.config client-scram-auth.properties --from-beginning




    6. Change the Carrier Technology Parameters and add a kafkaProperties block set to the values used in client-scram-auth.properties, making sure to change the correct parameters:

      "kafkaProperties": [
        [ "security.protocol", "SASL_SSL" ],
        [ "ssl.truststore.type", "JKS" ],
        [ "ssl.truststore.location", "/tmp/policy/user-truststore.jks" ],
        [ "ssl.truststore.password", "changeit" ],
        [ "sasl.mechanism", "SCRAM-SHA-512" ],
        [ "sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"my-user\" password=\"[contents of user-scram.password file]\";" ],
        [ "ssl.endpoint.identification.algorithm", "" ]
      ]