Overview
The following is a proposal for securing Apache Kafka.
Features In Scope
- Authentication
- Unix-like users and permissions
- Some kind of group or ACL notion
- Encryption over the wire
- No backward incompatible changes
Kerberos is the most commonly requested authentication mechanism for human usage; SSL is more common for applications that access Kafka.
We think you can't have security be an all-or-nothing proposition; it has to be controllable on a per-topic basis with some kind of granular authorization. In the absence of this, you will end up with one Kafka cluster per application, which defeats the purpose of a central message brokering cluster. Hence you need permissions and a manageable way to assign them in a large organization.
We think all this can probably be done in a backwards compatible manner and without significant performance degradation for non-secure users.
Features Out Of Scope (For Now)
- Encryption of data at rest
- Per-column encryption/security
- Non-repudiation
The above items are important and people want them, but for now they can be implemented at a level above Kafka by encrypting individual fields in the message.
Details on these items are given below.
Authentication
We need to support several methods of authentication:
- SSL for access from applications
- Kerberos for access on behalf of people
- Hadoop delegation tokens to enable MapReduce, Samza, or other frameworks running in the Hadoop environment to access Kafka
We will use SASL for Kerberos.
SASL does not actually transmit the bits required for authentication; it only produces and consumes them. To handle this we will need to add a new AuthRequest/AuthResponse API to our protocol. These messages will contain only a simple byte array holding the authentication bytes SASL needs.
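The exchange can be pictured with a minimal sketch of the two messages. It assumes the payload is encoded like Kafka's other byte fields (an int32 length followed by the raw bytes); the names `AuthRequest`, `AuthResponse`, and `SaslPayloadCodec` are hypothetical. In practice the bytes would be produced by `javax.security.sasl.SaslClient.evaluateChallenge` on the client and `SaslServer.evaluateResponse` on the broker.

```java
import java.nio.ByteBuffer;

// Hypothetical helper: both messages carry exactly one length-prefixed byte array.
final class SaslPayloadCodec {
    private SaslPayloadCodec() {}

    // Assumed layout: int32 length followed by the raw SASL bytes.
    static ByteBuffer encode(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
        buf.putInt(payload.length);
        buf.put(payload);
        buf.flip(); // switch the buffer from filling to draining (e.g. to a socket)
        return buf;
    }

    static byte[] decode(ByteBuffer buf) {
        int length = buf.getInt();
        if (length < 0 || length > buf.remaining()) {
            throw new IllegalArgumentException("invalid SASL payload length: " + length);
        }
        byte[] payload = new byte[length];
        buf.get(payload);
        return payload;
    }
}

// Client -> broker: opaque bytes from the client's SASL mechanism.
record AuthRequest(byte[] saslBytes) {
    ByteBuffer serialize() { return SaslPayloadCodec.encode(saslBytes); }

    static AuthRequest deserialize(ByteBuffer buf) {
        return new AuthRequest(SaslPayloadCodec.decode(buf));
    }
}

// Broker -> client: opaque challenge bytes from the broker's SASL mechanism.
record AuthResponse(byte[] saslBytes) {
    ByteBuffer serialize() { return SaslPayloadCodec.encode(saslBytes); }

    static AuthResponse deserialize(ByteBuffer buf) {
        return new AuthResponse(SaslPayloadCodec.decode(buf));
    }
}
```

Because the broker never interprets the payload itself, the same pair of messages could carry any SASL mechanism, not only GSSAPI (Kerberos).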
Generally you would expect authentication to happen at connection time, but I don't think there is any real reason we need to require this. Instead, I think we can allow it at any time during a session, or even multiple times during a session (if the client wishes to switch users, à la su). However, authentication is fundamentally attached to the connection, so if the client reconnects it will lose its authentication and need to re-authenticate.
All connections that have not yet been authenticated will be assigned a fake user ("nobody" or "josephk" or something).
Regardless of the mechanism by which you connect and authenticate, the mechanism by which we check your permissions should be the same. The side effect of a successful connection via SSL with a client cert, or of a successful auth request, will be that we store the user information along with that connection. The user will be passed along with the request object to KafkaApis on each subsequent request.
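The connection-scoped rules above (unauthenticated connections start as "nobody", a later authentication replaces the user, and a reconnect starts over) could be tracked along these lines. `ConnectionAuthState` and its methods are hypothetical, and a string connection id stands in for whatever key the socket server actually uses.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical per-connection authentication state kept by the socket server.
final class ConnectionAuthState {
    static final String ANONYMOUS = "nobody";

    // Connection id -> user currently authenticated on that connection.
    private final Map<String, String> principals = new ConcurrentHashMap<>();

    // A newly accepted socket starts out unauthenticated.
    void onConnect(String connectionId) {
        principals.put(connectionId, ANONYMOUS);
    }

    // Called after a successful SSL client-cert handshake or AuthRequest.
    // May be called again in the same session to switch users (su-style).
    void onAuthenticated(String connectionId, String user) {
        if (principals.computeIfPresent(connectionId, (id, previous) -> user) == null) {
            throw new IllegalStateException("unknown connection " + connectionId);
        }
    }

    // The user passed along with each request arriving on this connection.
    String userFor(String connectionId) {
        return principals.getOrDefault(connectionId, ANONYMOUS);
    }

    // Authentication dies with the connection; a reconnect gets a fresh entry.
    void onClose(String connectionId) {
        principals.remove(connectionId);
    }
}
```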
Implementation notes
For TLS we would need a separate TLS port. Presumably this would need to be maintained in the cluster metadata so clients can choose to connect to the appropriate port.
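One hypothetical shape for advertising the TLS port in cluster metadata is a broker entry carrying both ports, with the client choosing one based on its own configuration. The record name, its fields, and the use of -1 to mean "no TLS listener" are assumptions for illustration only.

```java
// Hypothetical broker entry in a metadata response once a TLS port is advertised.
record BrokerEndpoint(int nodeId, String host, int plaintextPort, int sslPort) {
    // Assumed sentinel for brokers that have no TLS listener.
    static final int NO_PORT = -1;

    boolean supportsSsl() {
        return sslPort != NO_PORT;
    }

    // Client-side choice: connect to the TLS port only when configured to use TLS.
    int portFor(boolean useSsl) {
        if (!useSsl) {
            return plaintextPort;
        }
        if (!supportsSsl()) {
            throw new IllegalStateException("broker " + nodeId + " advertises no TLS port");
        }
        return sslPort;
    }
}
```

Keeping the plaintext port in every entry would let clients that predate the TLS port keep working unchanged.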
This feature requires some co-operation between the socket server and the API layer. The API layer will handle the authenticate request, but the username will be associated with the connection. One approach to implementing this would be to add the concept of a Session object that is maintained with the connection and contains the username. The session would be stored in the context for the socket in the socket server and destroyed as part of socket close. The session would be passed down to the API layer with each request, and we would have something like session.authenticatedAs() to get the username to use for authorization purposes. We will also record in the session information about the security level of the connection (does it use encryption? integrity checks?) for use in authorization.
All future checks for authorization will just check this session information.
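A minimal sketch of the Session idea, assuming the session is an immutable value that the socket server replaces on (re)authentication and attaches to every request it hands to the API layer. `Session`, `ApiRequest`, and `ApiLayer` are hypothetical names; only `authenticatedAs()` comes from the text above.

```java
// Hypothetical session: created when a connection is accepted, replaced on
// (re)authentication, and discarded when the socket closes.
record Session(String authenticatedAs, boolean encrypted, boolean integrityChecked) {
    static Session unauthenticated(boolean encrypted, boolean integrityChecked) {
        return new Session("nobody", encrypted, integrityChecked);
    }

    // A new session for the same connection after a successful authentication.
    Session withUser(String user) {
        return new Session(user, encrypted, integrityChecked);
    }
}

// Hypothetical request wrapper: the session travels with every decoded request.
record ApiRequest(Session session, String apiName, String topic) {}

final class ApiLayer {
    // Authorization code consults only the session, never the socket itself.
    String describeCaller(ApiRequest request) {
        Session s = request.session();
        return s.authenticatedAs() + (s.encrypted() ? " (encrypted)" : " (plaintext)");
    }
}
```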
Authorization
The plan will be to support unix-like permissions on a per-topic level.
Authorization will be done in the "business logic" layer in Kafka (aka KafkaApis). The API can be something like:
PermissionManager.isPermitted(Subject subject, Permissions permission, String resource)
For example, when handling a produce request you would likely check something like the following:
PermissionManager.isPermitted(session.subject(), Permissions.WRITE, topicName)
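The proposed check can be written as a Java interface together with its use on the produce path. `Subject` here is a minimal stand-in record (not `javax.security.auth.Subject`), the subject is passed in directly rather than obtained via `session.subject()`, and `ProduceHandler` and its result strings are hypothetical placeholders rather than real Kafka error codes.

```java
// Minimal stand-in for the authenticated identity.
record Subject(String name) {}

// The permissions listed in this proposal.
enum Permissions { READ, WRITE, DELETE, CREATE, CONFIGURE, DESCRIBE, REPLICATE }

// The check proposed above, as a Java interface.
interface PermissionManager {
    boolean isPermitted(Subject subject, Permissions permission, String resource);
}

// Hypothetical produce path inside the API layer.
final class ProduceHandler {
    private final PermissionManager permissions;

    ProduceHandler(PermissionManager permissions) {
        this.permissions = permissions;
    }

    // Returns a placeholder error name instead of appending when the check fails.
    String handleProduce(Subject subject, String topicName) {
        if (!permissions.isPermitted(subject, Permissions.WRITE, topicName)) {
            return "AUTHORIZATION_FAILED";
        }
        // A real handler would append the messages to the log here.
        return "NONE";
    }
}
```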
This check will have to be very fast, since it is done on every request, so the necessary metadata will need to be cached.
The subject is basically the "user name" or identity of the person trying to take some action. This will be established by whichever authentication mechanism the client used. The permission is basically the action being attempted: one of a list of things you may be permitted to do (e.g. read, write, etc.).
The PermissionManager will both check whether access was permitted and also log the attempt for audit purposes.
The resource will generally be based on the topic name but there could be other resources we want to secure so we can just treat it as an arbitrary string.
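Since the check runs on every request and every attempt must also be audited, one possible implementation caches decisions in memory and writes each attempt to an audit sink. `PermissionStore`, the cache key, and the audit-line format are assumptions; the first three declarations repeat the `PermissionManager` API proposed above so the sketch stands alone.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

record Subject(String name) {}

enum Permissions { READ, WRITE, DELETE, CREATE, CONFIGURE, DESCRIBE, REPLICATE }

interface PermissionManager {
    boolean isPermitted(Subject subject, Permissions permission, String resource);
}

// Hypothetical slow source of truth (e.g. ZooKeeper or an external service).
interface PermissionStore {
    boolean lookup(String user, Permissions permission, String resource);
}

final class CachingAuditingPermissionManager implements PermissionManager {
    private record Key(String user, Permissions permission, String resource) {}

    private final PermissionStore store;
    private final Consumer<String> auditLog;
    private final Map<Key, Boolean> cache = new ConcurrentHashMap<>();

    CachingAuditingPermissionManager(PermissionStore store, Consumer<String> auditLog) {
        this.store = store;
        this.auditLog = auditLog;
    }

    @Override
    public boolean isPermitted(Subject subject, Permissions permission, String resource) {
        Key key = new Key(subject.name(), permission, resource);
        // Hot path: only a cache miss reaches the backing store.
        boolean allowed = cache.computeIfAbsent(key,
                k -> store.lookup(k.user(), k.permission(), k.resource()));
        // Every attempt is audited, whether allowed or denied, cached or not.
        auditLog.accept((allowed ? "ALLOW " : "DENY ") + subject.name() + " " + permission + " " + resource);
        return allowed;
    }

    // Must be called when permissions change so stale decisions are not served.
    void invalidateAll() {
        cache.clear();
    }
}
```

Caching decisions rather than raw ACLs keeps the hot path to a single hash lookup, at the cost of needing invalidation (here the blunt `invalidateAll()`) whenever permissions change.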
I could imagine the following permissions:
- READ - Permission to fetch data from the topic
- WRITE - Permission to publish data to the topic
- DELETE - Permission to delete the topic
- CREATE - Permission to create the topic
- CONFIGURE - Permission to change the configuration for the topic
- DESCRIBE - Permission to fetch metadata on the topic
- REPLICATE - Permission to participate as a replica (i.e. issue a fetch request with a non-negative node id). This is different from READ in that it has implications for when a write request is committed.
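Each request type then has to map to one of these permissions. The sketch below assumes a handful of request kinds (the `RequestKind` values and `RequiredPermission` are hypothetical); the one rule taken directly from the list is that a fetch carrying a non-negative node id requires REPLICATE rather than READ. Ordinary consumers send a replica id of -1 in fetch requests.

```java
// The permissions listed above (repeated so the sketch stands alone).
enum Permissions { READ, WRITE, DELETE, CREATE, CONFIGURE, DESCRIBE, REPLICATE }

// Hypothetical request kinds the API layer would dispatch on.
enum RequestKind { PRODUCE, FETCH, METADATA, CREATE_TOPIC, DELETE_TOPIC, ALTER_CONFIG }

final class RequiredPermission {
    private RequiredPermission() {}

    // replicaId is the node id in a fetch request; it is ignored for other kinds.
    static Permissions forRequest(RequestKind kind, int replicaId) {
        return switch (kind) {
            case PRODUCE -> Permissions.WRITE;
            // Followers fetch with their own (non-negative) node id.
            case FETCH -> replicaId >= 0 ? Permissions.REPLICATE : Permissions.READ;
            case METADATA -> Permissions.DESCRIBE;
            case CREATE_TOPIC -> Permissions.CREATE;
            case DELETE_TOPIC -> Permissions.DELETE;
            case ALTER_CONFIG -> Permissions.CONFIGURE;
        };
    }
}
```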
Permissions are not hierarchical, since topics are not hierarchical. So a user will have a default value for these (a kind of umask) as well as a potential override on a per-topic basis. Note that CREATE and DESCRIBE permissions primarily make sense at the default level.
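The default-plus-override model resolves with a flat lookup, because topic names carry no hierarchy to walk. In this sketch an override replaces the default set for that topic rather than adding to it; that choice and the `UserPermissions` class itself are assumptions.

```java
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// The permissions listed above (repeated so the sketch stands alone).
enum Permissions { READ, WRITE, DELETE, CREATE, CONFIGURE, DESCRIBE, REPLICATE }

// Hypothetical per-user record: a default set (the "umask") plus per-topic overrides.
final class UserPermissions {
    private final EnumSet<Permissions> defaults;
    private final Map<String, EnumSet<Permissions>> topicOverrides = new HashMap<>();

    UserPermissions(Set<Permissions> defaults) {
        this.defaults = copyOf(defaults);
    }

    // Replaces (does not merge with) the defaults for this one topic.
    void overrideTopic(String topic, Set<Permissions> permissions) {
        topicOverrides.put(topic, copyOf(permissions));
    }

    // Flat lookup: the topic's override if present, otherwise the defaults.
    boolean has(Permissions permission, String topic) {
        return topicOverrides.getOrDefault(topic, defaults).contains(permission);
    }

    // EnumSet.copyOf rejects an empty non-EnumSet collection, so copy manually.
    private static EnumSet<Permissions> copyOf(Set<Permissions> source) {
        EnumSet<Permissions> copy = EnumSet.noneOf(Permissions.class);
        copy.addAll(source);
        return copy;
    }
}
```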
Implementing the PermissionManager
The above just gives a high-level API for checking whether a particular user is allowed to do a particular thing. How permissions are stored and how users are grouped together will need to be pluggable.
There are several scenarios we have considered:
- Some users may want to pick up and run Kafka without much in the way of external dependencies. These users will want a simple way to maintain permissions that works well out of the box.
- Hortonworks and Cloudera each have separate nascent attempts at securing the larger Hadoop ecosystem across multiple services. As these mature the best way to integrate into the larger ecosystem for their users will be to use either Sentry (Cloudera) or Argus (Hortonworks) depending on the Hadoop distribution the particular organization has.
- Large organizations often have very particular ways of managing security, auditing access, or implementing groups. There are various theories on the best way to manage the assignment of permissions to users (i.e. via roles, groups, acls, etc.).
Unfortunately, there is no single implementation that can satisfy all these cases. Instead we can make the PermissionManager interface pluggable at run time so that users can specify their implementation in config.
We will try to provide something simple out of the box.
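Run-time pluggability could work by reading an implementation class name from config and instantiating it reflectively. The config key `permission.manager.class` and the allow-everything default are hypothetical; an allow-all default simply preserves today's unsecured behavior for users who configure nothing, and a real out-of-the-box implementation would consult stored permissions instead.

```java
import java.util.Properties;

record Subject(String name) {}

enum Permissions { READ, WRITE, DELETE, CREATE, CONFIGURE, DESCRIBE, REPLICATE }

interface PermissionManager {
    boolean isPermitted(Subject subject, Permissions permission, String resource);
}

// Hypothetical placeholder default: behaves like a cluster with no security.
final class AllowAllPermissionManager implements PermissionManager {
    @Override
    public boolean isPermitted(Subject subject, Permissions permission, String resource) {
        return true;
    }
}

final class PermissionManagerLoader {
    static final String CONFIG_KEY = "permission.manager.class"; // hypothetical key

    private PermissionManagerLoader() {}

    // Instantiates the configured class via its no-arg constructor.
    static PermissionManager load(Properties config) {
        String className = config.getProperty(CONFIG_KEY, AllowAllPermissionManager.class.getName());
        final Class<?> clazz;
        try {
            clazz = Class.forName(className);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("unknown class " + className, e);
        }
        if (!PermissionManager.class.isAssignableFrom(clazz)) {
            throw new IllegalArgumentException(className + " does not implement PermissionManager");
        }
        try {
            return (PermissionManager) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate " + className, e);
        }
    }
}
```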
Sequencing
Here is a proposed sequence of work:
Phase 1: Prep
- Add session as the communication mechanism between the socket server and the Kafka API layer.
- Add SSL port to metadata request
Phase 2: Authentication
- Allow disabling sendfile for reads that need encryption or other integrity checks added
- Implement SSL
- Implement SASL
Phase 3: Authorization
- Implement PermissionManager interface and implement the "out of the box" implementation.
Open Questions
Do we need to model hosts separately? That is, in addition to the user, do we need to pass information about which host the access is coming from into the authorization layer?
We likely need a way to specify the minimum encryption/integrity level a client must have in order to read data. We could define something generic like NONE < INTEGRITY < ENCRYPTED and allow the user to set a minimum level for each topic, so you can guarantee that a particular data stream never goes in the clear.
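The NONE < INTEGRITY < ENCRYPTED ordering maps naturally onto an enum whose declaration order is the ordering. A sketch of the per-topic minimum, assuming topics with no configured minimum accept any connection (matching today's behavior); `SecurityLevel` and `TopicSecurityPolicy` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Declared weakest to strongest, so compareTo follows NONE < INTEGRITY < ENCRYPTED.
enum SecurityLevel {
    NONE, INTEGRITY, ENCRYPTED;

    boolean atLeast(SecurityLevel required) {
        return compareTo(required) >= 0;
    }
}

// Hypothetical per-topic policy consulted before serving a fetch.
final class TopicSecurityPolicy {
    private final Map<String, SecurityLevel> minimumLevels = new HashMap<>();

    void requireMinimum(String topic, SecurityLevel level) {
        minimumLevels.put(topic, level);
    }

    // Topics without a configured minimum default to NONE, i.e. any connection.
    boolean mayRead(String topic, SecurityLevel connectionLevel) {
        return connectionLevel.atLeast(minimumLevels.getOrDefault(topic, SecurityLevel.NONE));
    }
}
```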
Out-of-scope Features
On disk and per-field encryption
This is very important, and it is something that can be facilitated within the wire protocol. It requires an additional map data structure for the "encrypted [data encryption key]". With this map (either in your object or in the wire protocol) you can store a dynamically generated symmetric key (one per message) and then encrypt the data using that key. You then encrypt the symmetric key with the public key of each party that is expected to be able to decrypt it and, in turn, the message. Each public-key-encrypted symmetric key (the "encrypted [data encryption key]") is stored along with the public key it was encrypted for, i.e. a map of [publicKey] = encryptedDataEncryptionKey. Other patterns can be implemented, but this is a pretty standard digital enveloping [0] pattern with only one field added. Other patterns should be able to use that field for their implementations too.
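The enveloping pattern can be sketched with the standard JCA classes. The algorithm choices (AES-256-GCM for the message, RSA-OAEP for wrapping the data key) are assumptions, since the text names none; so are the `Envelope` record and the use of the Base64-encoded public key as the map key, standing in for [publicKey].

```java
import java.security.GeneralSecurityException;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.SecureRandom;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical envelope: encrypted payload plus [publicKey] -> encryptedDataEncryptionKey.
record Envelope(byte[] iv, byte[] ciphertext, Map<String, byte[]> encryptedDataKeys) {}

final class DigitalEnvelope {
    private static final SecureRandom RANDOM = new SecureRandom();
    private static final String KEY_WRAP = "RSA/ECB/OAEPWithSHA-256AndMGF1Padding";
    private static final int GCM_TAG_BITS = 128;

    private DigitalEnvelope() {}

    // Map key identifying a recipient: its encoded public key.
    static String keyId(PublicKey key) {
        return Base64.getEncoder().encodeToString(key.getEncoded());
    }

    static Envelope seal(byte[] plaintext, List<PublicKey> recipients) throws GeneralSecurityException {
        // 1. Fresh symmetric data key for this message only.
        KeyGenerator gen = KeyGenerator.getInstance("AES");
        gen.init(256);
        SecretKey dataKey = gen.generateKey();

        // 2. Encrypt the message with the data key.
        byte[] iv = new byte[12];
        RANDOM.nextBytes(iv);
        Cipher aes = Cipher.getInstance("AES/GCM/NoPadding");
        aes.init(Cipher.ENCRYPT_MODE, dataKey, new GCMParameterSpec(GCM_TAG_BITS, iv));
        byte[] ciphertext = aes.doFinal(plaintext);

        // 3. Encrypt the data key once per recipient public key.
        Map<String, byte[]> wrapped = new HashMap<>();
        for (PublicKey recipient : recipients) {
            Cipher rsa = Cipher.getInstance(KEY_WRAP);
            rsa.init(Cipher.ENCRYPT_MODE, recipient);
            wrapped.put(keyId(recipient), rsa.doFinal(dataKey.getEncoded()));
        }
        return new Envelope(iv, ciphertext, wrapped);
    }

    static byte[] open(Envelope env, PublicKey myPublic, PrivateKey myPrivate) throws GeneralSecurityException {
        byte[] wrappedKey = env.encryptedDataKeys().get(keyId(myPublic));
        if (wrappedKey == null) {
            throw new GeneralSecurityException("not a recipient of this envelope");
        }
        // Recover the data key with our private key, then decrypt the message.
        Cipher rsa = Cipher.getInstance(KEY_WRAP);
        rsa.init(Cipher.DECRYPT_MODE, myPrivate);
        SecretKey dataKey = new SecretKeySpec(rsa.doFinal(wrappedKey), "AES");
        Cipher aes = Cipher.getInstance("AES/GCM/NoPadding");
        aes.init(Cipher.DECRYPT_MODE, dataKey, new GCMParameterSpec(GCM_TAG_BITS, env.iv()));
        return aes.doFinal(env.ciphertext());
    }
}
```

Only the small data key is encrypted once per recipient, so adding a reader grows the message by one map entry rather than by another copy of the payload.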
Non-repudiation and long term non-repudiation