Apache Kafka
Apache Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design that enables Kafka to achieve very high throughput and very low latencies.
First let's review some basic messaging terminology:
- Kafka maintains feeds of messages in categories called topics.
- We'll call processes that publish messages to a Kafka topic producers.
- We'll call processes that subscribe to topics and process the feed of published messages consumers.
- Kafka is run as a cluster comprised of one or more servers each of which is called a broker.
So, at a high level, producers send messages over the network to the Kafka cluster, which in turn serves them up to consumers.
Authorization in Apache Kafka
Starting with the 0.9.0.0 release, Apache Kafka has various security features built in: encryption and authentication using SSL, authentication using SASL, Apache Zookeeper authentication, quotas, and authorization. Because there are many ways one might want to perform authorization, Kafka allows different authorization implementations to be plugged in. By default, Apache Kafka ships with a Zookeeper-based authorization implementation, which uses Zookeeper to store ACLs. Note that Kafka already has a hard dependency on Zookeeper for configuration management and leader election, so Zookeeper is not an additional requirement for using the out-of-the-box authorization. Though it is nice to not depend on an external system for the out-of-the-box authorization implementation in Apache Kafka, it has quite a few shortcomings.
- Only supports the User principal, so an ACL has to be created for each and every user of a Kafka cluster, for each resource they need access to. This can be a huge operational burden for enterprises or clusters with a large number of users (see the example after this list).
- No way to use user-group mappings from external services such as LDAP or AD. Organizations quite often already have some sort of user-group mapping service, and redefining those mappings just for authorization in Apache Kafka is probably not the best idea.
- Very Kafka-specific implementation. It is not ideal to have separate authorization entities for each component in a data pipeline; that becomes hard to manage, and it only gets worse as the number of users or the pipeline's complexity grows.
- The Zookeeper-based Kafka authorization stores ACLs under zNodes in Zookeeper as JSON strings. zNodes have size limitations (the recommended maximum is only 1 MB), and since ACLs have to be created for each and every user, the JSON strings can easily grow beyond a zNode's recommended size. It does not scale.
- Several concurrency issues have been found and fixed recently, but the implementation is not battle tested and is not yet production ready.
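To illustrate the first point, with the out-of-the-box authorizer an ACL has to be added separately for every user and resource combination. A minimal sketch with Kafka's stock kafka-acls.sh tool (the user alice, host 10.0.0.5, and topic payments are placeholders, and the exact flag names vary slightly across Kafka versions):
$ kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
    --add --allow-principal User:alice --allow-host 10.0.0.5 \
    --operation Read --topic payments
Repeating this for every user, topic, and consumer group quickly becomes unmanageable.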
Apache Sentry
Apache Sentry is a system for enforcing fine-grained, role-based authorization. Role-Based Access Control (RBAC) is a powerful mechanism for managing authorization for a large set of users and data objects in a typical enterprise. Apache Sentry allows various systems to integrate with it to make use of its generic and powerful authorization model. Many systems, such as Hive, Impala, HDFS, and Sqoop, are already capable of using Apache Sentry for authorization. It is also capable of obtaining user-group mappings from external systems such as LDAP and AD. All the shortcomings of the Zookeeper-based, out-of-the-box Apache Kafka authorization implementation can be addressed by choosing Apache Sentry to provide authorization in Apache Kafka as well.
Starting with the 1.7.0 release, Apache Sentry has a Kafka binding that can be used to enable authorization in Apache Kafka with Apache Sentry. The following sections go over how to configure Apache Kafka to use Apache Sentry for authorization, followed by a quick-start guide.
Configuring Apache Kafka to use Apache Sentry for Authorization
To enable authorization in Apache Kafka with Apache Sentry as the authorization provider, follow these steps.
- Add required Sentry jars to Kafka's classpath.
- Add the following configs to the Kafka broker's properties file (a combined example follows this list).
- authorizer.class.name=org.apache.sentry.kafka.authorizer.SentryKafkaAuthorizer
- sentry.kafka.site.url=file:<path to SENTRY-SITE.XML> // with information on how to connect with Sentry server
- sentry.kafka.principal.hostname=<HOST> // host of Kafka broker, required to perform kerberos authentication with Sentry server
- sentry.kafka.kerberos.principal=<KAFKA_PRINCIPAL> // kerberos principal of user running Kafka, required to perform kerberos authentication with Sentry server
- sentry.kafka.keytab.file=<KAFKA_KEYTAB_FILE> // keytab file of user running Kafka, required to perform kerberos authentication with Sentry server
- Add super users
- super.users=<semicolon-separated list of users, in the form User:<USERNAME1>;User:<USERNAME2>>. These users can perform any action on any resource in the Kafka cluster. It is recommended that the user running the Kafka broker processes be a super user; this avoids authenticating every inter-broker request against Sentry, which could have a huge performance impact.
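Putting these together, the relevant portion of a broker's properties file might look like the following sketch (the paths, hostname, Kerberos principal, and super user are placeholders to be replaced with values from your environment):
authorizer.class.name=org.apache.sentry.kafka.authorizer.SentryKafkaAuthorizer
sentry.kafka.site.url=file:/etc/sentry/conf/sentry-site.xml
sentry.kafka.principal.hostname=broker01.example.com
sentry.kafka.kerberos.principal=kafka/broker01.example.com@EXAMPLE.COM
sentry.kafka.keytab.file=/etc/security/keytabs/kafka.keytab
super.users=User:kafka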
Quick Start
This tutorial assumes you have working Kafka and Sentry installations, that Kafka is configured to use Sentry for authorization, and that you are starting fresh with no existing Kafka or ZooKeeper data.
CLI tool for performing CRUD of privileges and roles.
org.apache.sentry.provider.db.generic.tools.SentryShellKafka
is a tool that takes a configuration file with connection details of the Sentry server. One can simply wrap it in a shell script or create an alias for easy access. Below is a sample shell script that uses Apache Kafka's kafka-run-class script, which takes care of building the classpath. Assuming you have copied all required Sentry jars into Kafka's libs dir, or have added those jars to the CLASSPATH environment variable used by the kafka-run-class script, the script below will work for you.
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

CONFIG_OPTION="--config"
KAFKA_CONF_DIR=${KAFKA_CONF_DIR:-/etc/kafka/conf}
SENTRY_CONF_DIR=${SENTRY_CONF_DIR:-$KAFKA_CONF_DIR/sentry-conf}
SENTRY_SITE_XML=${SENTRY_SITE_XML:-sentry-site.xml}
SENTRY_CONF_FILE=$SENTRY_CONF_DIR/$SENTRY_SITE_XML
USAGE_STRING="USAGE: kafka-sentry [$CONFIG_OPTION <path_to_sentry_conf_dir>] <sentry-cli-arguments>"

if [[ "$CONFIG_OPTION" == $1 ]]; then
  conf_dir=$2
  shift;shift
  conf_file=$conf_dir/$SENTRY_SITE_XML
  if [[ ! -f $conf_file ]]; then
    echo "Configuration file, ${conf_file}, does not exist."
    echo "${USAGE_STRING}"
    exit 1
  fi
else
  if [[ -f "${SENTRY_CONF_FILE}" ]]; then
    conf_file=${SENTRY_CONF_FILE}
  else
    echo "No configuration directory for Sentry specified and default conf file ${SENTRY_CONF_FILE} does not exist. Please provide a configuration directory that contains sentry-site.xml with information on how to connect with Sentry service."
    echo "${USAGE_STRING}"
    exit 1
  fi
fi

# suppress the HADOOP_HOME warnings in 1.x.x
export HADOOP_HOME_WARN_SUPPRESS=true

exec $(dirname $0)/kafka-run-class.sh org.apache.sentry.provider.db.generic.tools.SentryShellKafka -conf $conf_file "$@"
The rest of this document will refer to this script as kafka-sentry and will use it for performing CRUD operations on roles and privileges. Below is the usage info from the tool.
$ kafka-sentry
usage: kafka-sentry
 -arg,--add_role_group          Add role to group
 -conf,--sentry_conf <arg>      sentry-site file path
 -cr,--create_role              Create role
 -dr,--drop_role                Drop role
 -drg,--delete_role_group       Delete role from group
 -g,--groupname <arg>           Group name
 -gpr,--grant_privilege_role    Grant privilege to role
 -h,--help                      Shell usage
 -lp,--list_privilege           List privilege
 -lr,--list_role                List role
 -p,--privilege <arg>           Privilege string
 -r,--rolename <arg>            Role name
 -rpr,--revoke_privilege_role   Revoke privilege from role
Creating a role
Here we create a role, test.
$ kafka-sentry -cr -r test
$ kafka-sentry -lr
test
Assigning a role to a group
Here we assign the created role, test, to a group, test-group. All users in this group will get any privilege we grant to the role, test.
$ kafka-sentry -arg -r test -g test-group
Authorizable Resources
Authorizable resources are resources or entities in a Kafka cluster that require special permissions for a user to be able to perform actions on them. Kafka has the following authorizable resources.
- Cluster: controls who can perform cluster-level operations, such as creating or deleting a topic. It can only have one value, kafka-cluster, as one Kafka cluster cannot have more than one Cluster resource.
- Topic: controls who can perform topic-level operations, such as producing to or consuming from a topic. Its value must exactly match the topic name in the Kafka cluster.
- Consumergroup: controls who can perform consumer-group-level operations, such as joining an existing consumer group, querying the offset for a partition, or describing a consumer group. Its value must exactly match the group.id of a consumer group.
- Host: controls from where specific operations can be performed. It can be thought of as a way to achieve IP filtering in Kafka. It can have the wildcard *, which represents all hosts, as a value.
Authorized Actions
Each resource can have multiple actions that users can perform on it. The following operations are supported in Kafka; however, not all actions are valid on all resources.
- ALL, this is a wildcard action, and represents all possible actions on a resource.
- read
- write
- create
- delete
- alter
- describe
- clusteraction
Authorizing Privileges
Privileges define what actions are allowed on a resource. A privilege is represented as a string in Sentry. The following are the criteria for a valid privilege.
- Can have at most one Host resource. If no Host resource is specified in a privilege string, Host=* is assumed.
- Must have exactly one non-Host resource.
- Must have exactly one action, specified at the end of the privilege string.
Valid privilege strings
- Host=*->Topic=test->action=ALL
- Topic=test->action=ALL
Invalid privilege strings
- Host=*->Host=127.0.0.1->Topic=test->action=ALL, has multiple Host resources
- Cluster=kafka-cluster->Topic=test->action=ALL, has multiple non-Host resources
- Topic=test->action=ALL->action=read, has multiple actions
- Cluster=cluster1->Topic=test->action=ALL, should only have kafka-cluster as Cluster value
- action=ALL->Topic=test, action must be specified at the end of the privilege string
Granting privileges to a role
Here we grant some privileges to the role, test, so that users in test-group can create a topic, testTopic, and produce to it.
Allow users in test-group to create a new topic from localhost.
$ kafka-sentry -gpr -r test -p "Host=127.0.0.1->Cluster=kafka-cluster->action=create"
Allow users in test-group to describe testTopic, which they will create and use, from localhost.
$ kafka-sentry -gpr -r test -p "Host=127.0.0.1->Topic=testTopic->action=describe"
Allow users in test-group to write to testTopic from localhost; this allows the users to produce to testTopic.
$ kafka-sentry -gpr -r test -p "Host=127.0.0.1->Topic=testTopic->action=write"
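The privileges granted to the role so far can be verified with the shell's list option (an illustrative invocation based on the usage shown above; the exact output format depends on the Sentry version):
$ kafka-sentry -lp -r test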
Create testTopic.
$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic
$ kafka-topics.sh --list --zookeeper localhost:2181
testTopic
Produce to testTopic. Note that you will have to pass a config file, producer.properties, with the JAAS configuration and other Kerberos-authentication-related settings; see Kafka's security documentation for more information, and a sketch of such a file follows the command below.
$ kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic --producer.config producer.properties
This is a message
This is another message
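As a rough sketch, a producer.properties for a Kerberos-secured cluster might contain the following (the exact values depend on how your brokers are secured; SASL_PLAINTEXT and the JAAS file path below are placeholders for your setup):
# producer.properties (sketch; adjust to your security setup)
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
The JAAS configuration itself is typically supplied to the console tools through the JVM, for example via KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf".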
Allow users in test-group to describe a consumer group, testconsumergroup, that they will be starting or joining.
$ kafka-sentry -gpr -r test -p "Host=127.0.0.1->Consumergroup=testconsumergroup->action=describe"
Allow users in test-group to read from a consumer group, testconsumergroup, that they will be starting or joining.
$ kafka-sentry -gpr -r test -p "Host=127.0.0.1->Consumergroup=testconsumergroup->action=read"
Allow users in test-group to read from testTopic from localhost; this allows the users to consume from testTopic.
$ kafka-sentry -gpr -r test -p "Host=127.0.0.1->Topic=testTopic->action=read"
Consume from testTopic. Note that you will have to pass a config file, consumer.properties, with the JAAS configuration and other Kerberos-authentication-related settings. The config file must also set group.id to testconsumergroup; see Kafka's security documentation for more information, and a sketch of such a file follows the command below.
$ kafka-console-consumer.sh --zookeeper localhost:2181 --topic testTopic --from-beginning --consumer.config consumer.properties
This is a message
This is another message
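The key requirement from the privileges above is that the consumer joins the authorized group, so consumer.properties must at least contain the matching group.id (a minimal sketch; any SASL/Kerberos settings required by your deployment would go here as well, analogous to the producer example above):
# consumer.properties (sketch; adjust to your security setup)
# group.id must match the Consumergroup used in the Sentry privileges
group.id=testconsumergroup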
Performance Comparison
The following numbers were obtained by running Kafka's ProducerPerformance tests on a three-node Kafka cluster, with a single-node Zookeeper and a single-node Sentry service. All tests were run from the same host, and the test topic had three partitions with a replication factor of three.
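For reference, such a run can be reproduced with the producer performance tool that ships with Kafka; an illustrative invocation (the record count, record size, and broker address below are placeholders rather than the parameters used for the numbers above, and flag names vary across Kafka versions) looks roughly like:
$ kafka-producer-perf-test.sh --topic testTopic --num-records 1000000 --record-size 100 --throughput -1 --producer-props bootstrap.servers=localhost:9092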
Future Work
Though Kafka authorization via Sentry works, and works nicely, there are a few more things that can be done here.
- Add caching of Sentry privilege objects; re-creating these privilege objects is costly and has been raised as a concern on Sentry's dev mailing list as well. This will further improve the performance of Sentry's Kafka authorizer.
- Add a native script, similar to the example added above, to allow users to get started with performing CRUD operations on roles and privileges out-of-the-box.
- Refactor the authorizer code after KIP-50 is merged into Kafka.