...
If we make the change suggested in the next section, the command runs successfully and reports the group offsets.
The following potential unit tests in scala.integration.kafka.api.AuthorizerIntegrationTest
could further clarify the problem.
```scala
// this test is to clarify that the issue exists for the consumer group command line only, and not the API
@Test
def testDescribeGroupApiWithGroupDescribe() {
  addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), groupResource)
  addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
  AdminClient.createSimplePlaintext(brokerList).describeConsumerGroup(group)
}

// this test highlights the issue with command line, where the supposedly sufficient 'Describe' access is not enough to run the command
@Test(expected = classOf[GroupAuthorizationException])
def testDescribeGroupCliWithGroupDescribe() {
  addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), groupResource)
  addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
  val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
  val opts = new ConsumerGroupCommandOptions(cgcArgs)
  val consumerGroupService = new KafkaConsumerGroupService(opts)
  consumerGroupService.describeGroup()
}

// this test confirms that a minimum of 'Read' access is required to successfully run the command
@Test
def testDescribeGroupCliWithGroupRead() {
  addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
  addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
  val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
  val opts = new ConsumerGroupCommandOptions(cgcArgs)
  val consumerGroupService = new KafkaConsumerGroupService(opts)
  consumerGroupService.describeGroup()
}
```
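The permission logic these three tests exercise can be modelled without a broker; the following is an added example, not code from this KIP or from Kafka. The names `OffsetFetchAclModel`, `Acls`, `satisfies`, `describeGroupApi` and `describeGroupCli` are hypothetical, and the model assumes that an Allow ACL for Read also satisfies a Describe check and that the command's OffsetFetch step also needs Describe on the topic.

```scala
// Added illustration: a self-contained model, not Kafka's authorizer or KafkaApis code.
object OffsetFetchAclModel {
  sealed trait Operation
  case object Describe extends Operation
  case object Read extends Operation

  // Constructed stand-in for the ACLs the tests install with addAndVerifyAcls.
  case class Acls(group: Set[Operation], topic: Set[Operation])

  // Assumption: an Allow ACL for Read also satisfies a Describe check.
  def satisfies(granted: Set[Operation], needed: Operation): Boolean = needed match {
    case Describe => granted.contains(Describe) || granted.contains(Read)
    case Read     => granted.contains(Read)
  }

  // API path (first test): DescribeGroups only, which needs Describe on the group.
  def describeGroupApi(acls: Acls): Either[String, String] =
    if (satisfies(acls.group, Describe)) Right("group described")
    else Left("GroupAuthorizationException")

  // CLI path (second and third tests): DescribeGroups, then OffsetFetch.
  // offsetFetchMin is Read today and Describe under the KIP's proposal.
  def describeGroupCli(acls: Acls, offsetFetchMin: Operation): Either[String, String] =
    if (!satisfies(acls.group, Describe)) Left("GroupAuthorizationException")
    else if (!satisfies(acls.group, offsetFetchMin)) Left("GroupAuthorizationException")
    else if (!satisfies(acls.topic, Describe)) Left("topic not authorized") // assumed topic check
    else Right("group offsets reported")

  def main(args: Array[String]): Unit = {
    // Constructed ACL sets matching the grants used in the three tests.
    val describeOnly = Acls(group = Set(Describe), topic = Set(Describe))
    val groupRead    = Acls(group = Set(Read), topic = Set(Describe))

    println("API, group Describe:            " + describeGroupApi(describeOnly))
    println("CLI, group Describe, min Read:  " + describeGroupCli(describeOnly, Read))
    println("CLI, group Read, min Read:      " + describeGroupCli(groupRead, Read))
    println("CLI, group Describe, min Descr: " + describeGroupCli(describeOnly, Describe))
  }
}
```

The last call uses the lowered OffsetFetch minimum, which is the only case where a principal holding nothing more than Describe on the group gets the offsets.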
Proposed Changes
The change proposed by this KIP is very simple: to lower the minimum required permission of the OffsetFetch
API from Read to Describe. These minimum required permissions are hard-coded in kafka.server.KafkaApis.scala
inside each API handler method. For example, the part that enforces the minimum required permission for the OffsetFetch
API currently looks like this:
...