...

public static KafkaPrincipalBuilder createPrincipalBuilder(Map<String, ?> configs,
                                                           TransportLayer transportLayer,
                                                           Authenticator authenticator,
                                                           KerberosShortNamer kerberosShortNamer,
                                                           SslPrincipalMapper sslPrincipalMapper) {
    Class<?> principalBuilderClass = (Class<?>) configs.get(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG);
    final KafkaPrincipalBuilder builder;

    if (principalBuilderClass == null || principalBuilderClass == DefaultKafkaPrincipalBuilder.class) {
        // No custom class configured: the default builder already receives both helpers.
        builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer, sslPrincipalMapper);
    } else if (SSLPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
        // Custom builder that opts in to the SSL mapping rules.
        // Note: the first matching branch wins, so a class implementing both new
        // interfaces only receives the SslPrincipalMapper here.
        SSLPrincipalBuilder sslPrincipalBuilder = (SSLPrincipalBuilder) Utils.newInstance(principalBuilderClass);
        sslPrincipalBuilder.buildSSLPrincipalBuilder(sslPrincipalMapper);
        builder = sslPrincipalBuilder;
    } else if (KerberosPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
        // Custom builder that opts in to the Kerberos short-name rules.
        KerberosPrincipalBuilder kerberosPrincipalBuilder = (KerberosPrincipalBuilder) Utils.newInstance(principalBuilderClass);
        kerberosPrincipalBuilder.buildKerberosPrincipalBuilder(kerberosShortNamer);
        builder = kerberosPrincipalBuilder;
    } else if (KafkaPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
        // Existing custom builders that implement neither new interface are unchanged.
        builder = (KafkaPrincipalBuilder) Utils.newInstance(principalBuilderClass);
    } else if (org.apache.kafka.common.security.auth.PrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
        // Legacy PrincipalBuilder implementations are wrapped by the default builder.
        org.apache.kafka.common.security.auth.PrincipalBuilder oldPrincipalBuilder =
                createPrincipalBuilder(principalBuilderClass, configs);
        builder = DefaultKafkaPrincipalBuilder.fromOldPrincipalBuilder(authenticator, transportLayer,
                oldPrincipalBuilder, kerberosShortNamer);
    } else {
        throw new InvalidConfigurationException("Type " + principalBuilderClass.getName() + " is not " +
                "an instance of " + org.apache.kafka.common.security.auth.PrincipalBuilder.class.getName() + " or " +
                KafkaPrincipalBuilder.class.getName());
    }

    // Pass the broker configs on to builders that accept them.
    if (builder instanceof Configurable)
        ((Configurable) builder).configure(configs);

    return builder;
}


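With these changes, a custom KafkaPrincipalBuilder that implements SSLPrincipalBuilder or KerberosPrincipalBuilder receives the broker's SslPrincipalMapper or KerberosShortNamer when it is instantiated. The custom builder can therefore apply the configured principal mapping rules (ssl.principal.mapping.rules and sasl.kerberos.principal.to.local.rules), which were previously available only to DefaultKafkaPrincipalBuilder.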

The two new interfaces keep the SSL and Kerberos concerns separate: a custom builder opts in to only the helper it needs, and KafkaPrincipalBuilder implementations that implement neither interface are instantiated exactly as before.
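To make the opt-in mechanism concrete, here is a minimal, self-contained sketch of how a custom builder might consume the injected mapper. It is not the proposed implementation. Every type in it is a simplified stand-in declared locally so that it compiles without Kafka on the classpath; in particular, the real KafkaPrincipalBuilder builds a KafkaPrincipal from an AuthenticationContext rather than from a plain string. The names CustomSslPrincipalBuilder, wire and demo are hypothetical, and the sketch assumes that SSLPrincipalBuilder extends KafkaPrincipalBuilder, which the assignment in the factory method above implies.

```java
import java.util.Objects;

// Illustrative sketch only: all types below are simplified stand-ins,
// NOT the real Kafka classes of the same name.
final class PrincipalBuilderSketch {

    /** Stand-in for SslPrincipalMapper: maps a certificate DN to a short name. */
    interface SslPrincipalMapper {
        String getName(String distinguishedName);
    }

    /** Stand-in for KafkaPrincipalBuilder, reduced to DN in, principal string out. */
    interface KafkaPrincipalBuilder {
        String build(String peerDistinguishedName);
    }

    /** Assumed shape of the proposed opt-in interface. */
    interface SSLPrincipalBuilder extends KafkaPrincipalBuilder {
        void buildSSLPrincipalBuilder(SslPrincipalMapper sslPrincipalMapper);
    }

    /** Hypothetical custom builder that reuses the injected mapping rules. */
    static final class CustomSslPrincipalBuilder implements SSLPrincipalBuilder {
        private SslPrincipalMapper mapper;

        @Override
        public void buildSSLPrincipalBuilder(SslPrincipalMapper sslPrincipalMapper) {
            this.mapper = Objects.requireNonNull(sslPrincipalMapper);
        }

        @Override
        public String build(String peerDistinguishedName) {
            // Fall back to the raw DN if no mapper was ever injected.
            String name = mapper == null
                    ? peerDistinguishedName
                    : mapper.getName(peerDistinguishedName);
            return "User:" + name;
        }
    }

    /** Mirrors the SSLPrincipalBuilder branch of the factory: inject, then return. */
    static KafkaPrincipalBuilder wire(KafkaPrincipalBuilder builder, SslPrincipalMapper mapper) {
        if (builder instanceof SSLPrincipalBuilder) {
            ((SSLPrincipalBuilder) builder).buildSSLPrincipalBuilder(mapper);
        }
        return builder;
    }

    /** Builds a principal for a sample DN using a mapper that keeps only the CN. */
    static String demo() {
        // Roughly what a mapping rule such as RULE:^CN=(.*?),.*$/$1/ would do.
        SslPrincipalMapper cnOnly = dn -> dn.replaceFirst("^CN=(.*?),.*$", "$1");
        KafkaPrincipalBuilder builder = wire(new CustomSslPrincipalBuilder(), cnOnly);
        return builder.build("CN=alice,OU=eng,O=example");
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "User:alice"
    }
}
```

A builder that needs the Kerberos rules would follow the same pattern with KerberosPrincipalBuilder and KerberosShortNamer. The setter-style injection lets the factory keep instantiating builders through a no-argument constructor.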

Compatibility, Deprecation, and Migration Plan

...