Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The proposed changes involve modifications to the following classesclass:

  • org.apache.kafka.common.network.ChannelBuilders
  • org.apache.kafka.common.network.SslChannelBuilder

ChannelBuilders

In the ChannelBuilders class, we will add a new method createPrincipalBuilder that takes an additional parameter Map<String, Object> configsupdate the existing createPrincipalBuilder method to Check for an additional Constructor:

Code Block
languagejava
public static KafkaPrincipalBuilder createPrincipalBuilder(
        KafkaPrincipalBuilder principalBuilder,
        SslPrincipalMapper sslPrincipalMapper,
        KerberosShortNamer kerberosShortNamer,
        Map<String, Object> configs) {
    // implementation
}

This method will be used to create a custom KafkaPrincipalBuilder with access to SslPrincipalMapper and kerberosShortNamer.

SslChannelBuilder

In the SslChannelBuilder class, we will add a new method configurePrincipalBuilder that configures the custom KafkaPrincipalBuilder with SslPrincipalMapper and kerberosShortNamer:

Code Block
languagejava
protected void configurePrincipalBuilder(Map<String, ?> configs) {
    // implementation
}
Map<String, ?> configs,
                                                               TransportLayer transportLayer,
                                                               Authenticator authenticator,
                                                               KerberosShortNamer kerberosShortNamer,
                                                               SslPrincipalMapper sslPrincipalMapper) {
        Class<?> principalBuilderClass = (Class<?>) configs.get(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG);
        KafkaPrincipalBuilder builder;

        if (principalBuilderClass == null || principalBuilderClass == DefaultKafkaPrincipalBuilder.class) {
            builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer, sslPrincipalMapper);
        } else if (KafkaPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
            try {
                Constructor<?> constructor = principalBuilderClass.getConstructor(KerberosShortNamer.class, SslPrincipalMapper.class);
                builder = (KafkaPrincipalBuilder) constructor.newInstance(kerberosShortNamer, sslPrincipalMapper);
            } catch (NoSuchMethodException e) {
                builder = (KafkaPrincipalBuilder) Utils.newInstance(principalBuilderClass);
            } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
                throw new RuntimeException("Error instantiating custom KafkaPrincipalBuilder", e);
            }
        } else if (org.apache.kafka.common.security.auth.PrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
            org.apache.kafka.common.security.auth.PrincipalBuilder oldPrincipalBuilder =
                    createPrincipalBuilder(principalBuilderClass, configs);
            builder = DefaultKafkaPrincipalBuilder.fromOldPrincipalBuilder(authenticator, transportLayer,
                    oldPrincipalBuilder, kerberosShortNamer);
        } else {
            throw new InvalidConfigurationException("Type " + principalBuilderClass.getName() + " is not " +
                    "an instance of " + org.apache.kafka.common.security.auth.PrincipalBuilder.class.getName() + " or " +
                    KafkaPrincipalBuilder.class.getName());
        }

        if (builder instanceof Configurable)
            ((Configurable) builder).configure(configs);

        return builder;
    }

This method will pass the custom KafkaPrincipalBuilder with SslPrincipalMapper and kerberosShortNamer objects if Constructor is availableThis method will be called in the ChannelBuilders class to configure the custom KafkaPrincipalBuilder.

Compatibility, Deprecation, and Migration Plan

The proposed changes are fully backward-compatible and do not require any deprecations or migration efforts. Existing custom KafkaPrincipalBuilder implementations will continue to work without any modifications. However, they can be updated to utilize the new functionality if desired.

Rejected Alternatives

SslPrincipalMapper

An alternative workaround is to read the configuration and build another SslPrincipalMapper in the custom KafkaPrincipalBuilder implementation. However, this approach is less efficient and may lead to code duplication. The proposed solution provides a more elegant and efficient way to address the issue.

SslChannelBuilder

In the SslChannelBuilder class, we will add a new method configurePrincipalBuilder that configures the custom KafkaPrincipalBuilder with SslPrincipalMapper and kerberosShortNamer:

Code Block
languagejava
protected void configurePrincipalBuilder(Map<String, ?> configs) {
    // implementation
}

This method will be called in the ChannelBuilders class to configure the custom KafkaPrincipalBuilder.

Test Plan

The test plan for this KIP includes:

  1. Unit tests to ensure that the new methods in ChannelBuilders and SslChannelBuilder work  work as expected.
  2. Integration tests to ensure that custom KafkaPrincipalBuilder implementations can access SslPrincipalMapper and kerberosShortNamer properly.
  3. Performance tests to ensure that the proposed changes do not introduce any performance regressions.

...