Status
Current state: Under Discussion
Discussion thread:
JIRA:
Released:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Many people using Kafka need to encrypt the data that Kafka persists to disk, especially when personal data of customers is involved. Since the GDPR became effective in May 2018, discussions around data security have been ever present, and while the GDPR does not require encryption, it does recommend it. In the financial sector there are even stricter regulations that go beyond a simple recommendation and demand at-rest encryption "unless technically not feasible". Discussions around this with large corporate compliance departments can at times become quite heated and/or tedious.
Kafka does not currently offer this functionality, so users who need it are usually pointed towards technologies like LUKS or dm-crypt. In combination with on-the-wire encryption via TLS, data is then secured at all times. However, this approach has a few drawbacks:
- TLS encryption requires the broker to decrypt and re-encrypt every message, which prevents it from using zero-copy transfer and adds overhead
- Volume encryption works well as a safeguard against lost disks, but less so against people who already have access to the system (the rogue-admin problem)
- With volume encryption it is not readily possible to encrypt only specific topics or to encrypt topics with unique keys
I think it would be beneficial for Kafka to implement something similar to HDFS's transparent data encryption feature, which allows directories to be declared encryption zones so that HDFS clients transparently encrypt data on write and decrypt it on read for all access to those directories. For Kafka the equivalent idea would be to let users mark a topic as encrypted, which would cause producers to encrypt all data written to that topic and consumers to decrypt all data read from it.
As a first step, this document outlines the full-blown transparent, KMS-backed solution that I think should cover all corporate requirements. In a second step it might be useful to consider a slightly scaled-down version with local keystores and client-side configuration for users who don't want to set up a KMS and are happy to manage keys locally.
Proposed Changes
Overview
A little disclaimer up front: since a lot of the necessary design and implementation work has already been done in the Hadoop project, this proposal leans heavily on the Hadoop TDE design document and JIRA and borrows many of the ideas and decisions made there.
I propose to add functionality to Kafka that enables producers and consumers to transparently encrypt and decrypt messages sent to Kafka brokers. The brokers themselves will not need (nor be able) to decrypt the messages, so the data will be stored on disk in encrypted form, providing additional security. In order to protect the keys against unauthorized access, I suggest using envelope encryption in an analogous fashion to Hadoop TDE.
The following diagram shows at a very high level the control flow for key retrieval, encryption and decryption for an existing topic which has been enabled for encryption - omitted specifics are defined in more detail in the later sections of this KIP.
Key Handling
Key Management Server
All key operations will be delegated to an instance of Hadoop KMS instead of locally generating or storing unencrypted keys. This means that for encryption to work a KMS needs to be accessible to producers and consumers. For people running Kafka as part of a Hadoop distribution both major vendors have KMS implementations available that can be used for this. In a standalone installation of Kafka (or Confluent platform) the Hadoop KMS is available as an isolated package and can be installed easily.
The benefit of using the existing implementation is that a lot of the enterprise features around key management have already been solved and will immediately benefit the Kafka implementation while simultaneously limiting the amount of coding necessary. Hadoop KMS setups can range from a single process that stores keys in a .jks file to highly available setups with keys being stored in several redundant hardware security modules (HSM). For some of the more advanced features a vendor-specific implementation of KMS will need to be used, but diving into the specifics is beyond the scope of this document.
Integration with the KMS will happen via REST calls, so no Hadoop dependencies are introduced with this KIP. I have added a discussion point on whether it might make sense to use the Hadoop-provided KMSClient libraries, as these would make the implementation easier and offer additional services like load balancing across multiple KMS instances. Authentication for the REST calls can be via SPNEGO or SSL certificates - initially both consumer and producer will use the same credentials that they use to authenticate with Kafka itself.
Envelope Encryption
In order to guard against insider attacks on the encrypted data, envelope encryption is used for the actual encryption keys. When a topic is configured for encryption, a topic encryption key (TEK) is requested from the KMS. The KMS creates this key and stores it with an id and version but doesn't send it to anybody - it never leaves the KMS. Additionally, a data encryption key (DEK) is requested from the KMS for every partition of the topic. The KMS creates these DEKs and encrypts each of them with the TEK to produce an encrypted data encryption key (EDEK), which is returned to the broker and stored in an internal topic.
Consumers and producers retrieve the EDEKs from that topic (via an API request to the broker, which caches keys internally) and send them to the KMS, which decrypts and returns them. With this process the Kafka brokers never see the DEKs in their unencrypted form, so even a cluster admin who can impersonate a super user has no access to the keys and, by extension, to the unencrypted data.
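To make this interaction more concrete, the following is a minimal Java sketch of the three KMS calls involved in this flow, assuming the standard Hadoop KMS REST endpoints for key creation, EEK generation and EEK decryption; exact paths and payloads should be verified against the deployed KMS version, authentication (SPNEGO/SSL) is omitted, and all class and method names are illustrative only.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class KmsRestSketch {
    private final HttpClient http = HttpClient.newHttpClient();
    private final String kmsUri; // e.g. the value of the proposed encryption.kms.uri broker setting

    public KmsRestSketch(String kmsUri) {
        this.kmsUri = kmsUri;
    }

    /** Create the TEK for a topic; the key material never leaves the KMS. */
    public HttpResponse<String> createTopicKey(String keyName, String cipher, int length) throws Exception {
        String body = String.format("{\"name\":\"%s\",\"cipher\":\"%s\",\"length\":%d}", keyName, cipher, length);
        HttpRequest request = HttpRequest.newBuilder(URI.create(kmsUri + "/v1/keys"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        return http.send(request, HttpResponse.BodyHandlers.ofString());
    }

    /** Ask the KMS to generate EDEKs (e.g. one per partition); only the encrypted form is returned. */
    public HttpResponse<String> generateEdeks(String keyName, int numKeys) throws Exception {
        URI uri = URI.create(kmsUri + "/v1/key/" + keyName + "/_eek?eek_op=generate&num_keys=" + numKeys);
        return http.send(HttpRequest.newBuilder(uri).GET().build(), HttpResponse.BodyHandlers.ofString());
    }

    /** Decrypt an EDEK on behalf of a producer or consumer; only the KMS holds the TEK needed for this. */
    public HttpResponse<String> decryptEdek(String keyName, String versionName, String iv, String material) throws Exception {
        String body = String.format("{\"name\":\"%s\",\"iv\":\"%s\",\"material\":\"%s\"}", keyName, iv, material);
        URI uri = URI.create(kmsUri + "/v1/keyversion/" + versionName + "/_eek?eek_op=decrypt");
        HttpRequest request = HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        return http.send(request, HttpResponse.BodyHandlers.ofString());
    }
}
```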
The following diagram shows the key creation flow that takes place during topic creation and when producing or consuming data. Producer and consumer are abstracted as "user" here, since the process is so similar that no distinction is necessary at this point; further detail is given in the more specific flow charts later in this proposal.
Key Storage
Since the EDEKs are not stored by the KMS, they need to be stored somewhere else. The Hadoop implementation creates a new key per file and stores it in that file's metadata. With predominantly large files this is a feasible approach; for Kafka, with its typically small messages, adding the key to every message would be too much overhead, so a separate storage solution is necessary.
An internal topic __encryption_keys will be created with a high partition count (similar to what has been done with the __consumer_offsets topic), and keys will be stored in this topic partitioned by topic name. The brokers will keep an in-memory cache of encryption keys, and whenever a client requests a key for a topic the request is answered from this in-memory representation of the key topic. Updates are picked up dynamically by subscribing to the topic.
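To illustrate the intended structure, here is a rough sketch of such a cache, assuming records in __encryption_keys are keyed by the topicname/partition/version identifier used elsewhere in this KIP and carry the EDEK bytes as their value; all class names are hypothetical and a real implementation would live inside the broker rather than use a plain consumer.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

public class EncryptionKeyCache implements Runnable {
    // Record key: "topicname/partition/version", record value: the EDEK bytes
    private final Map<String, byte[]> edeksByKeyId = new ConcurrentHashMap<>();
    private final KafkaConsumer<String, byte[]> consumer;
    private volatile boolean running = true;

    public EncryptionKeyCache(Properties consumerProps) {
        // consumerProps must configure String/ByteArray deserializers and bootstrap servers
        this.consumer = new KafkaConsumer<>(consumerProps);
    }

    @Override
    public void run() {
        consumer.subscribe(Collections.singletonList("__encryption_keys"));
        while (running) {
            ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, byte[]> record : records) {
                // Every key version has its own id, so older EDEKs stay available
                // for decrypting data that was written with them.
                edeksByKeyId.put(record.key(), record.value());
            }
        }
        consumer.close();
    }

    /** Answers a client's fetch_key request from the in-memory representation of the key topic. */
    public byte[] getEdek(String topic, int partition, int version) {
        return edeksByKeyId.get(topic + "/" + partition + "/" + version);
    }

    public void shutdown() {
        running = false;
    }
}
```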
Caveat:
This in-memory cache may over time become quite large, depending on the number of encrypted topics and the key rollover configuration. For example, 10 topics with 9 partitions each and a daily key rollover will have accumulated 10 * 9 * 365 = 32,850 keys after a year - which at a key size of 256 bytes is roughly 8 megabytes of raw key data. While this is not terribly large, the input numbers are also fairly small; applied to a huge cluster with hundreds of topics this may become more of an issue. Ideally a strategy could be found that enables regularly deleting old keys from the key topic, however it is not as simple as setting the retention higher than the rollover interval, as there is no way to know how long clients might still use older keys without checking all messages in the encrypted topics.
In a later version of this feature we might implement a checkpointing mechanism where producers regularly commit the key version they are currently using together with the last offset they produced to. Once the smallest offset in an encrypted topic is larger than this committed offset, the respective key can safely be removed. For now I'd start without this feature, though, to gather some feedback from the community.
Alternative Solution:
As an alternative, to keep memory pressure on the brokers to a minimum, reading the current keys from the topic could be delegated to the client implementation. In that case it would probably make sense to have a _keys topic per encrypted topic in order to isolate keys and keep read overhead to a minimum.
Key Rollover
Topic Encryption Key
In order to properly roll the TEK to a new version, all stored EDEKs should be decrypted and re-encrypted with the new TEK. The Hadoop KMS only offers this functionality starting with version 3.0.0; before that, a TEK rollover would only affect DEKs created after that point in time. I propose to add configuration and functionality for a forward-effective rollover in the first version of the code. In a later step we can look into making a call at startup to check whether the KMS supports re-encrypting keys and performing a full rollover only where supported.
Technically, the implementation would make __encryption_keys a compacted topic, read all EDEKs for the topic in question, re-encrypt them via the KMS and write them back with the same record keys, so that compaction cleans up the obsolete entries. I propose to add this as a command line tool in a second step.
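A rough sketch of what such a tool could look like, assuming the KMS supports re-encryption (Hadoop KMS 3.0.0 or later); reencryptEdek() is a placeholder for the KMS re-encrypt request and all other names are illustrative only.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class TekRolloverTool {

    public static void rollover(String encryptedTopic, Properties consumerProps, Properties producerProps) {
        try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, byte[]> producer = new KafkaProducer<>(producerProps)) {

            consumer.subscribe(Collections.singletonList("__encryption_keys"));
            // A real tool would read up to the end offsets captured at startup;
            // this sketch simply polls a fixed number of times for brevity.
            for (int i = 0; i < 10; i++) {
                ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, byte[]> record : records) {
                    // Record keys have the format topicname/partition/version - only touch the topic being rolled.
                    if (!record.key().startsWith(encryptedTopic + "/")) {
                        continue;
                    }
                    byte[] reencrypted = reencryptEdek(record.value()); // KMS re-encrypt call
                    // Same record key, new value: compaction removes the obsolete entry.
                    producer.send(new ProducerRecord<>("__encryption_keys", record.key(), reencrypted));
                }
            }
            producer.flush();
        }
    }

    private static byte[] reencryptEdek(byte[] edek) {
        throw new UnsupportedOperationException("placeholder for the KMS re-encrypt request");
    }
}
```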
Data Encryption Key
The data encryption key should be changed at regular intervals. The broker will offer two parameters for this (see the Configuration section) to configure key rollover based on either time since the last rollover or messages produced since the last rollover. The controller will schedule a regular process that checks whether one of the two conditions has been met for any DEK and, if so, requests a new version of that key from the KMS and stores it.
Any subsequent fetch_keys request from a client would return this new key, which will then be used going forward. Decryption of old data is still possible, as the key version is stored with the messages and a specific key version can be retrieved for decryption. To keep track of the necessary information we will probably need to store the last key change per partition in ZooKeeper. A znode will be added under /brokers/topics/<topicname>/partitions/<partition_number>/key_change that stores the offset and date when the last key rollover occurred:
{"rollover_offset":101212,"rollover_date":1524686582,"current_version":1}
Client-side De-/Encryption
Producer
In order to avoid a wire-protocol change, the implementation will serialize and encrypt the original message and transparently wrap it in a new message. The wrapper message will have a header field encryption_key set, which contains the unique key identifier that can be used in a fetch_key request to retrieve this specific EDEK from the Kafka broker. The identifier will have the format topicname/partition/version, which should not create any parsing issues as topic names cannot contain the / character.
This step will run after the serializer and interceptors are done with the message, right before it is added to the batch to be sent, in order to ensure that existing serializers and interceptors keep working with encryption just as they do without it.
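For illustration, here is a minimal sketch of this wrapping step, assuming AES/GCM as the cipher and encrypting only the serialized value; the class and method names are hypothetical and not part of this proposal, which only defines the encryption_key header and the key identifier format.

```java
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;

import javax.crypto.Cipher;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;

public class EncryptingWrapper {
    private static final String HEADER_NAME = "encryption_key";
    private final SecureRandom random = new SecureRandom();

    /**
     * Wraps an already-serialized value: encrypts it with the current DEK for the
     * target partition and attaches the key identifier as a header.
     */
    public ProducerRecord<byte[], byte[]> wrap(String topic, int partition,
                                               byte[] serializedKey, byte[] serializedValue,
                                               SecretKey dek, int keyVersion) throws Exception {
        // AES/GCM is used purely as an example; the actual cipher is a topic-level setting.
        byte[] iv = new byte[12];
        random.nextBytes(iv);
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.ENCRYPT_MODE, dek, new GCMParameterSpec(128, iv));
        byte[] ciphertext = cipher.doFinal(serializedValue);

        // Prepend the IV so the consumer can initialize its cipher for decryption.
        byte[] payload = ByteBuffer.allocate(iv.length + ciphertext.length)
                .put(iv).put(ciphertext).array();

        ProducerRecord<byte[], byte[]> wrapper =
                new ProducerRecord<>(topic, partition, serializedKey, payload);
        // Key identifier format defined in this KIP: topicname/partition/version
        String keyId = topic + "/" + partition + "/" + keyVersion;
        wrapper.headers().add(new RecordHeader(HEADER_NAME, keyId.getBytes(StandardCharsets.UTF_8)));
        return wrapper;
    }
}
```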
Consumer
The consumer will look at the header fields of every message it receives, and if an encryption_key header is present it will treat the message as encrypted. If the referenced key is not present in its in-memory cache, the cache implementation will issue a fetch_key request to retrieve the required key and cache it for future use.
The key will be used to decrypt the message.
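A corresponding consumer-side sketch, mirroring the producer example above; the DekCache interface stands in for the client-side key cache and its fetch_key request and is purely illustrative.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

import javax.crypto.Cipher;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class DecryptingUnwrapper {
    private static final String HEADER_NAME = "encryption_key";
    private final DekCache keyCache;

    public DecryptingUnwrapper(DekCache keyCache) {
        this.keyCache = keyCache;
    }

    /** Returns the decrypted value, or the original value if the record is not encrypted. */
    public byte[] unwrap(ConsumerRecord<byte[], byte[]> record) throws Exception {
        Header header = record.headers().lastHeader(HEADER_NAME);
        if (header == null) {
            return record.value(); // plain record, nothing to do
        }
        // Header value is the key identifier: topicname/partition/version
        String keyId = new String(header.value(), StandardCharsets.UTF_8);
        SecretKey dek = keyCache.getOrFetch(keyId); // issues a fetch_key request on cache miss

        byte[] payload = record.value();
        byte[] iv = Arrays.copyOfRange(payload, 0, 12);              // IV prepended by the producer
        byte[] ciphertext = Arrays.copyOfRange(payload, 12, payload.length);
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.DECRYPT_MODE, dek, new GCMParameterSpec(128, iv));
        return cipher.doFinal(ciphertext);
    }

    /** Placeholder for the client-side key cache described in this KIP. */
    public interface DekCache {
        SecretKey getOrFetch(String keyId) throws Exception;
    }
}
```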
Configuration
New settings will be added to the broker config and the topic configuration. During the initial communication between broker and producer/consumer all of these settings will be passed to the client, so that encryption can happen fully transparently to the end user - no encryption parameters need to be specified in the client configuration.
The broker config will allow options to configure:
- KMS instance
- default key rotation interval
- default cipher to use
- default key length
The topic metadata that is stored in ZooKeeper will be extended to contain the following information:
- key id
- rotation interval
- flag whether unencrypted data should be accepted
Flow Diagrams
Create encrypted topic
Produce to encrypted topic
Consume from encrypted topic
Proposed Public Interface Change
Binary Protocol
A new request type KeyFetchRequest will be added along with a KeyFetchResponse.
KeyFetchRequest => [TopicName [Partition KeyVersion]]
  TopicName => string
  Partition => int32
  KeyVersion => int32
The KeyVersion field can be null, in which case the broker will respond with the latest key it has stored for that partition. For producers this will be the only form of request they make, as they should never need to encrypt data with older keys. Consumers will request keys as necessary to decode the message batches they receive and will usually request specific key versions.
In the response, the TopicKey needs to be part of the array, as with topic key rollover multiple topic keys may be active at the same time.
KeyFetchResponse => [TopicName [Partition KeyVersion TopicKey Key]]
  TopicName => string
  Partition => int32
  KeyVersion => int32
  TopicKey => string
  Key => bytes
Broker Config
The broker config will be extended to include the following new properties:
Option Name | Description
---|---
encryption.ciphers.enabled | A list of ciphers that are allowed to be used in topic-level cipher settings.
encryption.kms.uri | URI of the KMS that brokers will use to request TEKs and EDEKs.
Topic Configs
Option Name | Description
---|---
encryption.enabled | Whether encryption is enabled for this topic.
encryption.key.rollover.messages | Number of messages per partition before a new EDEK is created by the broker.
encryption.key.rollover.ms | Time in milliseconds per partition after which a new EDEK is created by the broker.
encryption.key.length | Length of the encryption keys (a change only affects keys created after the next rollover).
encryption.key.cipher | Cipher to use (a change only affects keys created after the next rollover).
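For illustration, this is how an encrypted topic could be created once these settings exist, using the standard AdminClient API; the encryption.* configuration names are the ones proposed in this KIP (they do not exist in current Kafka releases) and all values are examples only.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class CreateEncryptedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");

        Map<String, String> configs = new HashMap<>();
        configs.put("encryption.enabled", "true");
        configs.put("encryption.key.rollover.ms", "86400000");     // roll the DEK daily
        configs.put("encryption.key.cipher", "AES/CTR/NoPadding"); // must be listed in encryption.ciphers.enabled
        configs.put("encryption.key.length", "256");

        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic topic = new NewTopic("customer-data", 9, (short) 3).configs(configs);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}
```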
Consumer Config
The consumer config will initially not be changed unless we decide to include a non-transparent version of the encryption code in the initial release.
Producer Config
The producer config will initially not be changed unless we decide to include a non-transparent version of the encryption code in the initial release.
Discussion points
As mentioned in the introduction, please keep discussion on the mailing list instead of the wiki; however, I felt it would be beneficial to track points that still need clarification here, as I suspect the discussion around this KIP might be somewhat extensive.
Rollover functionality
As stated above, the ability to roll over topic keys is useful to have - do we want the initial version to include version detection for the KMS and use the re-encryption functionality where supported?
Encryption mechanism
I have proposed to simply encrypt the binary representation of the original message and use it as the payload of a new message that wraps the encrypted content. This way the original message stays unchanged and we do not need to come up with a complex scheme for encrypting payload, headers, etc. separately.
Is this a method that people can get behind, or should we rethink the approach and do something "cleaner"? This approach might present a compatibility issue in scenarios where someone sends messages in an older wire format than the broker uses for on-disk storage, as specified by log.message.format.version. The broker would convert the wrapper message before storing it on disk, but it would not be able to convert the encrypted record contained within, so there is a potential for records coming out of a cluster in an older wire format than one would expect. I'm not sure this would actually cause issues in practice, as the record would still contain the correct magic byte for version detection. A potential issue I can think of is the version gap becoming so large that the consumer no longer supports the format of the encrypted message, but this seems like a bit of a stretch.
Compatibility, Deprecation, and Migration Plan
Test Plan
TBD
Rejected Alternatives
Client Configuration
The initial idea was to offer a second alternative for configuring encryption without broker changes by adding parameters to the client libraries. However, this would necessitate storing keys somewhere, passing them into the clients and manually performing rollovers - all of which would encourage insecure behavior and provide a false sense of security to the end user. The decision was for this KIP to focus on providing a proper implementation with all these features.
Key Storage
Keys will not be stored as part of the message, as this would incur a very large overhead. For Hadoop the key is stored in the metadata of each file, which is a feasible approach for large files. A Kafka message is usually quite small, so the key might actually end up being bigger than the message itself.
Key Retrieval
Key retrieval from the storage topic will not be implemented by having the client code simply subscribe to the keys topic; going through the broker limits network activity and makes it possible to apply ACLs to key retrieval requests. The KMS also authorizes requests, but there is no reason to make the keys topic world readable and allow everybody to retrieve all encrypted keys at their leisure.