Apache kafka allows clients to conenct over SSL . By default SSL is disabled but can be turned on as needed.
1. Generating the key and the certificate for each kafka broker
The first step of deploying HTTPS is to generate the key and the certificate for each machine in the cluster. You can use Java’s keytool utility to accomplish this task.
write into a tmp keystore initiailly so that we can export and sign it later with CA.
$ keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey
You need to specify two parameters in the above command:
keystore: the keystore file that stores the certificate. The keystore file contains the private key of the certificate; therefore, it needs to be kept safely. validity: the valid time of the certificate in days.
Ensure that common name (CN) matches exactly with the fully qualified domain name (FQDN) of the server. The client compares the CN with the DNS domain name to ensure that it is indeed connecting to the desired server, not the malicious one.
2. Creating your own CA
After the first step, each machine in the cluster has a public-private key pair, and a certificate to identify the machine. The certificate, however, is unsigned, which means that an attacker can create such a certificate to pretend to be any machine.
Therefore, it is important to prevent forged certificates by signing them for each machine in the cluster. A certificate authority (CA) is responsible for signing certificates. CA works likes a government that issues passports—the government stamps (signs) each passport so that the passport becomes difficult to forge. Other governments verify the stamps to ensure the passport is authentic. Similarly, the CA signs the certificates, and the cryptography guarantees that a signed certificate is computationally difficult to forge. Thus, as long as the CA is a genuine and trusted authority, the clients have high assurance that they are connecting to the authentic machines.
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
The generated CA is simply a public-private key pair and certificate, and it is intended to sign other certificates.
The next step is to add the generated CA to the **clients’ truststore** so that the clients can trust this CA:
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
**Note: If you enable client authentication required by setting ssl.client.auth to be requested or required on kafka broker config than you must provide a truststore for kafka broker as well and it should have all the CA certificates that clients keys signed by.**
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
In contrast to the keystore in step 1 that stores each machine’s own identity, the truststore of a client stores all the certificates that the client should trust. Importing a certificate into one’s truststore also means that trusting all certificates that are signed by that certificate. As the analogy above, trusting the government (CA) also means that trusting all passports (certificates) that it has issued. This attribute is called the chains of trust, and it is particularly useful when deploying SSL on a large kafka cluster. You can sign all certificates in the cluster with a single CA, and have all machines share the same truststore that trusts the CA. That way all machines can authenticate all other machines.
3. Signing the certificate
The next step is to sign all certificates generated by step 1 with the CA generated in step 2. First, you need to export the certificate from the keystore:
keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
Then sign it with the CA:
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}
Finally, you need to import both the certificate of the CA and the signed certificate into the keystore:
$ keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert $ keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed
The definitions of the parameters are the following:
keystore: the location of the keystore ca-cert: the certificate of the CA ca-key: the private key of the CA ca-password: the passphrase of the CA cert-file: the exported, unsigned certificate of the server cert-signed: the signed certificate of the server
All of the above steps in a bash script. Note that one of the commands assumes a password of `test1234`, so either use that password or edit the command before running it.
#!/bin/bash #Step 1 keytool -keystore server.keystore.jks -alias localhost -validity 365 -genkey #Step 2 openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert #Step 3 keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:test1234 keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed
4. Configuring Kafka Broker
Kafka Broker comes with the feature of listening on multiple ports thanks to [KAFKA-1809](https://issues.apache.org/jira/browse/KAFKA-1809) .
we need to configure the following property in server.properties, which must have one or more comma-separated values
listeners
Both PLAINTEXT and SSL ports are necessary if SSL is not enabled for inter-broker communication (see below for how to enable it)
listeners=PLAINTEXT://host.name:port,SSL://host.name:port
Following SSL configs are needed on the broker side
ssl.keystore.location = /var/private/ssl/kafka.server.keystore.jks ssl.keystore.password = test1234 ssl.key.password = test1234 ssl.truststore.location = /var/private/ssl/kafka.server.truststore.jks ssl.truststore.password = test1234
Optional settings that are worth considering:
ssl.client.auth = none ("required" => client authentication is required, "requested" => client authentication is requested and client without certs can still connect when this option chosen") ssl.cipher.suites = "A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol." ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1 (list out the SSL protocols that you are going to accept from clients. Do note SSL is deprecated and using that in production is not recommended) ssl.keystore.type = JKS ssl.truststore.type = JKS
If you want to enable SSL for inter-broker communication, add the following to the broker properties file (it defaults to PLAINTEXT)
security.inter.broker.protocol = SSL
If you want to enable any cipher suites other than the defaults that comes with JVM like the ones listed here
https://docs.oracle.com/javase/7/docs/technotes/guides/security/SunProviders.html.
One needs to install **Unlimited Strength Policy files** http://www.oracle.com/technetwork/java/javase/downloads/jce-7-download-432124.html
Once you start the broker you should be able to see in the server.log
with addresses: PLAINTEXT -> EndPoint(192.168.64.1,9092,PLAINTEXT),SSL -> EndPoint(192.168.64.1,9093,SSL)
To check quickly if the server keystore and truststore are setup properly you can run the following command
openssl s_client -debug -connect localhost:9093 -tls1 (Note: TLSv1 should be listed under ssl.enabled.protocols) In the output of this command you should see server's certificate Server certificate -----BEGIN CERTIFICATE----- MIID+DCCAuACCQCx2Rz1tXx3NTANBgkqhkiG9w0BAQsFADB6MQswCQYDVQQGEwJV UzELMAkGA1UECAwCQ0ExFDASBgNVBAcMC1NhbnRhIENsYXJhMQwwCgYDVQQKDANv cmcxDDAKBgNVBAsMA29yZzEOMAwGA1UEAwwFa2FmYWsxHDAaBgkqhkiG9w0BCQEW DXRlc3RAdGVzdC5jb20wHhcNMTUwNzMwMDQyOTMwWhcNMTYwNzI5MDQyOTMwWjBt MQswCQYDVQQGEwJVUzELMAkGA1UECBMCQ0ExFDASBgNVBAcTC1NhbnRhIENsYXJh MQwwCgYDVQQKEwNvcmcxDDAKBgNVBAsTA29yZzEfMB0GA1UEAxMWU3JpaGFyc2hh IENoaW50YWxhcGFuaTCCAbcwggEsBgcqhkjOOAQBMIIBHwKBgQD9f1OBHXUSKVLf Spwu7OTn9hG3UjzvRADDHj+AtlEmaUVdQCJR+1k9jVj6v8X1ujD2y5tVbNeBO4Ad NG/yZmC3a5lQpaSfn+gEexAiwk+7qdf+t8Yb+DtX58aophUPBPuD9tPFHsMCNVQT WhaRMvZ1864rYdcq7/IiAxmd0UgBxwIVAJdgUI8VIwvMspK5gqLrhAvwWBz1AoGB APfhoIXWmz3ey7yrXDa4V7l5lK+7+jrqgvlXTAs9B4JnUVlXjrrUWU/mcQcQgYC0 SRZxI+hMKBYTt88JMozIpuE8FnqLVHyNKOCjrh4rs6Z1kW6jfwv6ITVi8ftiegEk O8yk8b6oUZCJqIPf4VrlnwaSi2ZegHtVJWQBTDv+z0kqA4GEAAKBgB+Pdz0306bq TpUAdb2FERMPLFsx06H0x+TULivcp7HbS5yrkV9bXZmv/FD98x76QxXrOq1WpQhY YDeGDjH+XQkJ6ZxBVBZNJDIpCnfQpfzXAvryQ+cm8oXUsKidtHf4pLMYViXX6BWX Oc2hX4rG+lC8/NXW+1zVvCr9To9fngzjMA0GCSqGSIb3DQEBCwUAA4IBAQBfyVse RJ+ugiNlWg5trZscqH0tlocbnek4UuV/xis2eAu9l4EFOM5kRt5GmkGZRcM/zHF8 BRJwXbf0fytMQKSPFk8R4/NGDOlzoK+F7uXeJ0S2u/T29xk0u2i4tjvleq6OCphE i9vdjM0E0Whf9SHRhOXirOYFX3cL775XwKdzKKRkk+AszFR+mRu90rdoaepQtgGh 9Kfwr4+6AU/dPtdGuomtBQqMxCzlrLd8EYhVVQ97wHIZ3sPvlM5PIhOJ/YHSBJIC 75eo/4acDxZ+j3sR5kcFulzYwFLgDYBaKH/w3mYCgTALeB1zUkX53NVizIvhUd69 XJO4lDSDtGOlfort -----END CERTIFICATE----- subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Sriharsha Chintalapani issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafak/emailAddress=test@test.com If the certificate not showed up or if there are any other error messages than your keystores not setup properly.
4. Configuring Kafka Producer & Kafka Consumer
SSL supported only for new Kafka Producer & Consumer, the older API is not supported. The configs for SSL will be same for both producer & consumer.
If client authentication is not required in the broker, then the following is a minimal configuration example:
security.protocol = SSL ssl.truststore.location = /var/private/ssl/kafka.client.truststore.jks ssl.truststore.password = test1234
If client authentication is required, then a keystore must be created like in step 1 and the following must also be configured:
ssl.keystore.location = /var/private/ssl/kafka.client.keystore.jks ssl.keystore.password = test1234 ssl.key.password = test1234
Other configuration settings that may also be needed depending on our requirements and the broker configuration:
ssl.provider (Optional). The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.) ssl.cipher.suites (Optional). "A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol." ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1 **Should list at least one of the protocols configured on the broker side**ssl.truststore.type = "JKS" ssl.keystore.type = JKS
Examples using console-producer and console-consumer:
kafka-console-producer.sh --broker-list localhost:9093 --topic test --producer.config client-ssl.properties kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --new-consumer --consumer.config client-ssl.properties