Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Current state: Under Discussion

Discussion thread: here

Pull Request thread: here & Pull Request

Vote thread: here

JIRA:

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-15452

...

Proposed Changes

The proposed changes involve modifications aim to the following class:

  • org.apache.kafka.common.network.ChannelBuilders

...

enhance the flexibility and usability of custom KafkaPrincipalBuilder implementations by introducing interfaces and subclasses to handle different types of principals and their configurations.

Specifically, two new interfaces have been introduced: KerberosPrincipalBuilder and SSLPrincipalBuilder, both of which extend the existing KafkaPrincipalBuilder interface. These interfaces provide methods to set the necessary configurations for building Kafka principals with Kerberos authentication and SSL authentication, respectively.

Additionally, the ChannelBuilders class has been updated to accommodate these changes and provide support for custom principal builders with different authentication mechanisms.


Firstly, the KerberosPrincipalBuilder interface is defined as follows:

Code Block
languagejava
linenumberstrue
public interface KerberosPrincipalBuilder extends KafkaPrincipalBuilder {
    void buildKerberosPrincipalBuilder(KerberosShortNamer kerberosShortNamer);
}


Secondly, the SSLPrincipalBuilder interface is defined as follows:

Code Block
languagejava
linenumberstrue
public interface SSLPrincipalBuilder extends KafkaPrincipalBuilder {
    void buildSSLPrincipalBuilder(SslPrincipalMapper sslPrincipalMapper);
}


In the ChannelBuilders class, we will update the existing createPrincipalBuilder method to Check for an additional Constructorhas been updated to support custom principal builders with different authentication mechanisms. When creating a custom principal builder, the appropriate interface methods are called to set the necessary configurations. 


Here is the updated code snippet:

Code Block
languagejava
linenumberstrue
public static KafkaPrincipalBuilder createPrincipalBuilder(Map<String, ?> configs,
                                                               TransportLayer transportLayer,
                                                               Authenticator authenticator,
                                                               KerberosShortNamer kerberosShortNamer,
                                                               SslPrincipalMapper sslPrincipalMapper) {
        Class<?> principalBuilderClass = (Class<?>) configs.get(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG);
        final KafkaPrincipalBuilder builder;

        if (principalBuilderClass == null || principalBuilderClass == DefaultKafkaPrincipalBuilder.class) {
            builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer, sslPrincipalMapper);
        } else if (KafkaPrincipalBuilderSSLPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
            trySSLPrincipalBuilder {
sslPrincipalBuilder = (SSLPrincipalBuilder) Utils.newInstance(principalBuilderClass);
             Constructor<?> constructor = principalBuilderClass.getConstructor(KerberosShortNamer.class, SslPrincipalMapper.classsslPrincipalBuilder.buildSSLPrincipalBuilder(sslPrincipalMapper);
                builder = (KafkaPrincipalBuilder) constructor.newInstance(kerberosShortNamer, sslPrincipalMapper)sslPrincipalBuilder;
        } else   } catch (NoSuchMethodException eif (KerberosPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
            KerberosPrincipalBuilder  kerberosPrincipalBuilder  builder = (KafkaPrincipalBuilderKerberosPrincipalBuilder) Utils.newInstance(principalBuilderClass);
            } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
kerberosPrincipalBuilder.buildKerberosPrincipalBuilder(kerberosShortNamer);
            builder = kerberosPrincipalBuilder;
        } else throw new RuntimeException("Error instantiating custom KafkaPrincipalBuilder", e);
if (KafkaPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
            builder =  }(KafkaPrincipalBuilder) Utils.newInstance(principalBuilderClass);
        } else if (org.apache.kafka.common.security.auth.PrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
            org.apache.kafka.common.security.auth.PrincipalBuilder oldPrincipalBuilder =
                    createPrincipalBuilder(principalBuilderClass, configs);
            builder = DefaultKafkaPrincipalBuilder.fromOldPrincipalBuilder(authenticator, transportLayer,
                    oldPrincipalBuilder, kerberosShortNamer);
        } else {
            throw new InvalidConfigurationException("Type " + principalBuilderClass.getName() + " is not " +
                    "an instance of " + org.apache.kafka.common.security.auth.PrincipalBuilder.class.getName() + " or " +
                    KafkaPrincipalBuilder.class.getName());
        }

        if (builder instanceof Configurable)
            ((Configurable) builder).configure(configs);

        return builder;
    }


This method will pass the These changes ensure that custom KafkaPrincipalBuilder with SslPrincipalMapper and kerberosShortNamer objects if Constructor is available implementations can now access SslPrincipalMapper and KerberosShortNamer, enabling support for mapping rules and improving the overall security configuration of Kafka brokers.

The introduced interfaces provide a clear and extensible way to handle different types of principals, making the solution more modular and maintainable.

Compatibility, Deprecation, and Migration Plan

...

Code Block
languagejava
linenumberstrue
collapsetrue
public static KafkaPrincipalBuilder createPrincipalBuilder(Map<String, ?> configs,
                                                               TransportLayer transportLayer,
                                                               Authenticator authenticator,
                                                               KerberosShortNamer kerberosShortNamer,
                                                               SslPrincipalMapper sslPrincipalMapper) {
        Class<?> principalBuilderClass = (Class<?>) configs.get(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG);
        KafkaPrincipalBuilder builder;

        if (principalBuilderClass == null || principalBuilderClass == DefaultKafkaPrincipalBuilder.class) {
            builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer, sslPrincipalMapper);
        } else if (KafkaPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
            try {
                Constructor<?> constructor = principalBuilderClass.getConstructor(KerberosShortNamer.class, SslPrincipalMapper.class);
                builder = (KafkaPrincipalBuilder) constructor.newInstance(kerberosShortNamer, sslPrincipalMapper);
            } catch (NoSuchMethodException e) {
                builder = (KafkaPrincipalBuilder) Utils.newInstance(principalBuilderClass);
            } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
                throw new RuntimeException("Error instantiating custom KafkaPrincipalBuilder", e);
            }
        } else if (org.apache.kafka.common.security.auth.PrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
            org.apache.kafka.common.security.auth.PrincipalBuilder oldPrincipalBuilder =
                    createPrincipalBuilder(principalBuilderClass, configs);
            builder = DefaultKafkaPrincipalBuilder.fromOldPrincipalBuilder(authenticator, transportLayer,
                    oldPrincipalBuilder, kerberosShortNamer);
        } else {
            throw new InvalidConfigurationException("Type " + principalBuilderClass.getName() + " is not " +
                    "an instance of " + org.apache.kafka.common.security.auth.PrincipalBuilder.class.getName() + " or " +
                    KafkaPrincipalBuilder.class.getName());
        }

        if (builder instanceof Configurable)
            ((Configurable) builder).configure(configs);

        return builder;
    }

...

Code Block
languagejava
linenumberstruecollapsetrue
protected void configurePrincipalBuilder(Map<String, ?> configs) {
    // implementation
}

...