Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Current state: Under Discussion

Discussion thread: here

Pull Request thread: here & Pull Request

Vote thread: here

JIRA:

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-15452

...

Code Block
languagejava
linenumberstrue
/**
 * Creates the {@link KafkaPrincipalBuilder} selected by
 * {@code BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG}.
 *
 * <p>Resolution order:
 * <ol>
 *   <li>No class configured, or {@code DefaultKafkaPrincipalBuilder} configured: a
 *       {@code DefaultKafkaPrincipalBuilder} is built from the supplied short namer and mapper.</li>
 *   <li>A {@code KafkaPrincipalBuilder} implementation: it is instantiated via
 *       {@code Utils.newInstance}; if it also implements {@code SSLPrincipalBuilder} and/or
 *       {@code KerberosPrincipalBuilder}, the matching helper is handed to it.</li>
 *   <li>A legacy {@code PrincipalBuilder}: it is wrapped through
 *       {@code DefaultKafkaPrincipalBuilder.fromOldPrincipalBuilder}.</li>
 * </ol>
 * A builder that is {@code Configurable} is configured with {@code configs} before being returned.
 *
 * @param configs            configuration map; read for the principal builder class and passed to
 *                           {@code configure} when the builder is {@code Configurable}
 * @param transportLayer     forwarded to the legacy-builder adapter only
 * @param authenticator      forwarded to the legacy-builder adapter only
 * @param kerberosShortNamer handed to the default builder, to {@code KerberosPrincipalBuilder}
 *                           implementations and to the legacy-builder adapter
 * @param sslPrincipalMapper handed to the default builder and to {@code SSLPrincipalBuilder}
 *                           implementations
 * @return the principal builder to use
 * @throws InvalidConfigurationException if the configured class is neither a
 *         {@code KafkaPrincipalBuilder} nor a legacy {@code PrincipalBuilder}
 */
public static KafkaPrincipalBuilder createPrincipalBuilder(Map<String, ?> configs,
                                                               TransportLayer transportLayer,
                                                               Authenticator authenticator,
                                                               KerberosShortNamer kerberosShortNamer,
                                                               SslPrincipalMapper sslPrincipalMapper) {
        Class<?> principalBuilderClass = (Class<?>) configs.get(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG);
        final KafkaPrincipalBuilder builder;

        if (principalBuilderClass == null || principalBuilderClass == DefaultKafkaPrincipalBuilder.class) {
            builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer, sslPrincipalMapper);
        } else if (KafkaPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
            // SSLPrincipalBuilder and KerberosPrincipalBuilder instances are assignable to
            // KafkaPrincipalBuilder, so this single branch covers them as well as plain builders.
            builder = (KafkaPrincipalBuilder) Utils.newInstance(principalBuilderClass);

            // The two checks are independent on purpose: a builder implementing both interfaces
            // must receive both helpers. An else-if chain would silently skip the second one.
            if (builder instanceof SSLPrincipalBuilder)
                ((SSLPrincipalBuilder) builder).buildSSLPrincipalBuilder(sslPrincipalMapper);
            if (builder instanceof KerberosPrincipalBuilder)
                ((KerberosPrincipalBuilder) builder).buildKerberosPrincipalBuilder(kerberosShortNamer);
        } else if (org.apache.kafka.common.security.auth.PrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
            // Legacy interface: adapt it to the KafkaPrincipalBuilder contract.
            org.apache.kafka.common.security.auth.PrincipalBuilder oldPrincipalBuilder =
                    createPrincipalBuilder(principalBuilderClass, configs);
            builder = DefaultKafkaPrincipalBuilder.fromOldPrincipalBuilder(authenticator, transportLayer,
                    oldPrincipalBuilder, kerberosShortNamer);
        } else {
            throw new InvalidConfigurationException("Type " + principalBuilderClass.getName() + " is not " +
                    "an instance of " + org.apache.kafka.common.security.auth.PrincipalBuilder.class.getName() + " or " +
                    KafkaPrincipalBuilder.class.getName());
        }

        if (builder instanceof Configurable)
            ((Configurable) builder).configure(configs);

        return builder;
    }


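With these changes, a custom KafkaPrincipalBuilder can obtain the broker's SslPrincipalMapper or KerberosShortNamer by implementing SSLPrincipalBuilder or KerberosPrincipalBuilder, respectively: the broker instantiates the configured class and passes the corresponding helper to buildSSLPrincipalBuilder or buildKerberosPrincipalBuilder. This lets custom builders apply the configured principal mapping rules, improving the overall security configuration of Kafka brokers.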

The introduced interfaces provide a clear and extensible way to handle different types of principals, making the solution more modular and maintainable.

Compatibility, Deprecation, and Migration Plan

...

Code Block
languagejava
linenumberstrue
collapsetrue
/**
 * Builds the {@link KafkaPrincipalBuilder} named by
 * {@code BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG}.
 *
 * <p>A custom {@code KafkaPrincipalBuilder} is created through its public
 * {@code (KerberosShortNamer, SslPrincipalMapper)} constructor when it declares one; otherwise it
 * is created through {@code Utils.newInstance}. Legacy {@code PrincipalBuilder} classes are
 * wrapped via {@code DefaultKafkaPrincipalBuilder.fromOldPrincipalBuilder}. The result is
 * configured with {@code configs} when it is {@code Configurable}.
 *
 * @throws InvalidConfigurationException if the configured class implements neither interface
 * @throws RuntimeException if the two-argument constructor exists but cannot be invoked
 */
public static KafkaPrincipalBuilder createPrincipalBuilder(Map<String, ?> configs,
                                                               TransportLayer transportLayer,
                                                               Authenticator authenticator,
                                                               KerberosShortNamer kerberosShortNamer,
                                                               SslPrincipalMapper sslPrincipalMapper) {
        Class<?> configuredClass = (Class<?>) configs.get(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG);
        boolean useDefault = configuredClass == null || configuredClass == DefaultKafkaPrincipalBuilder.class;
        KafkaPrincipalBuilder result;

        if (useDefault) {
            result = new DefaultKafkaPrincipalBuilder(kerberosShortNamer, sslPrincipalMapper);
        } else if (KafkaPrincipalBuilder.class.isAssignableFrom(configuredClass)) {
            // Step 1: look for the helper-aware constructor; its absence is not an error.
            Constructor<?> helperAwareCtor;
            try {
                helperAwareCtor = configuredClass.getConstructor(KerberosShortNamer.class, SslPrincipalMapper.class);
            } catch (NoSuchMethodException e) {
                helperAwareCtor = null;
            }

            // Step 2: instantiate through whichever route is available.
            if (helperAwareCtor == null) {
                result = (KafkaPrincipalBuilder) Utils.newInstance(configuredClass);
            } else {
                try {
                    result = (KafkaPrincipalBuilder) helperAwareCtor.newInstance(kerberosShortNamer, sslPrincipalMapper);
                } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
                    throw new RuntimeException("Error instantiating custom KafkaPrincipalBuilder", e);
                }
            }
        } else if (org.apache.kafka.common.security.auth.PrincipalBuilder.class.isAssignableFrom(configuredClass)) {
            // Legacy interface: create it, then adapt it.
            org.apache.kafka.common.security.auth.PrincipalBuilder legacyBuilder =
                    createPrincipalBuilder(configuredClass, configs);
            result = DefaultKafkaPrincipalBuilder.fromOldPrincipalBuilder(authenticator, transportLayer,
                    legacyBuilder, kerberosShortNamer);
        } else {
            throw new InvalidConfigurationException("Type " + configuredClass.getName() + " is not " +
                    "an instance of " + org.apache.kafka.common.security.auth.PrincipalBuilder.class.getName() + " or " +
                    KafkaPrincipalBuilder.class.getName());
        }

        if (result instanceof Configurable)
            ((Configurable) result).configure(configs);

        return result;
    }

...

Code Block
languagejava
linenumberstrue
collapsetrue
/**
 * Hook for configuring a principal builder from the supplied configuration map.
 *
 * <p>NOTE(review): the body is elided in this proposal, so the actual behaviour cannot be
 * determined from this snippet; confirm against the accompanying pull request.
 *
 * @param configs configuration map made available to the implementation
 */
protected void configurePrincipalBuilder(Map<String, ?> configs) {
    // implementation
}

...