Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

 If we make the change suggested belowin the next section, the command runs successfully and reports the group offsets.

The following potential unit tests in scala.integration.kafka.api.AuthorizerIntegrationTest could further clarify the problem. 

Code Block
languagescala
// this test is to clarify that the issue exists for the consumer group command line only, and not the API
@Test
def testDescribeGroupApiWithGroupDescribe() {
  addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), groupResource)
  addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
  AdminClient.createSimplePlaintext(brokerList).describeConsumerGroup(group)
}

// this test highlights the issue with command line, where the supposedly sufficient 'Describe' access is not enough to run the command
@Test(expected = classOf[GroupAuthorizationException])
def testDescribeGroupCliWithGroupDescribe() {
  addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), groupResource)
  addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)

  val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
  val opts = new ConsumerGroupCommandOptions(cgcArgs)
  val consumerGroupService = new KafkaConsumerGroupService(opts)
  consumerGroupService.describeGroup()
}

// this test confirms that a minimum of 'Read' access is required to successfully run the command
@Test
def testDescribeGroupCliWithGroupRead() {
  addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
  addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)

  val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
  val opts = new ConsumerGroupCommandOptions(cgcArgs)
  val consumerGroupService = new KafkaConsumerGroupService(opts)
  consumerGroupService.describeGroup()
}

 

Proposed Changes

The change proposed by this KIP is very simple: to lower the minimum required permission of the OffsetFetch API from Read to Describe. These minimum required permissions are hard-coded in kafka.server.KafkaApis.scala inside each API handler method. For example, the part that enforces the minimum required permission for the OffsetFetch API currently looks like this:

...