...

public interface SSLPrincipalBuilder extends KafkaPrincipalBuilder {
    void buildSSLPrincipalBuilder(SslPrincipalMapper sslPrincipalMapper);
}
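
As an illustration, a custom builder implementing this interface might look like the sketch below. The Kafka types are replaced by minimal stand-ins so that the example compiles on its own. The class CustomSslPrincipalBuilder, the simplified build(String) signature, and the CN-only mapping rule are hypothetical and are not part of the proposal.

```java
class SslPrincipalBuilderSketch {

    // Stand-in for Kafka's SslPrincipalMapper (assumption: reduced to a single method).
    interface SslPrincipalMapper {
        String getName(String distinguishedName);
    }

    // Stand-in for KafkaPrincipalBuilder. The real build() takes an
    // AuthenticationContext and returns a KafkaPrincipal; strings are used here for brevity.
    interface KafkaPrincipalBuilder {
        String build(String distinguishedName);
    }

    // The interface proposed above.
    interface SSLPrincipalBuilder extends KafkaPrincipalBuilder {
        void buildSSLPrincipalBuilder(SslPrincipalMapper sslPrincipalMapper);
    }

    // Hypothetical custom builder: stores the mapper it is handed and applies it in build().
    static class CustomSslPrincipalBuilder implements SSLPrincipalBuilder {
        private SslPrincipalMapper mapper;

        @Override
        public void buildSSLPrincipalBuilder(SslPrincipalMapper sslPrincipalMapper) {
            this.mapper = sslPrincipalMapper;
        }

        @Override
        public String build(String distinguishedName) {
            // Fall back to the raw distinguished name if no mapper was supplied.
            String name = mapper == null ? distinguishedName : mapper.getName(distinguishedName);
            return "User:" + name;
        }
    }

    static String demo() {
        // Hypothetical mapping rule: keep only the CN value of the distinguished name.
        SslPrincipalMapper cnOnly = dn -> dn.replaceAll("^CN=([^,]+).*$", "$1");
        CustomSslPrincipalBuilder builder = new CustomSslPrincipalBuilder();
        builder.buildSSLPrincipalBuilder(cnOnly);
        return builder.build("CN=alice,OU=eng,O=example");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the sketch is that the builder does not construct the mapper itself; it only receives the one the broker has already configured.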


In the ChannelBuilders class, the createPrincipalBuilder method is updated to support custom principal builders for different authentication mechanisms. When the configured class implements SSLPrincipalBuilder or KerberosPrincipalBuilder, the method instantiates it and calls the corresponding interface method to pass in the SslPrincipalMapper or the KerberosShortNamer.


Here is the updated code snippet:

public static KafkaPrincipalBuilder createPrincipalBuilder(Map<String, ?> configs,
                                                           TransportLayer transportLayer,
                                                           Authenticator authenticator,
                                                           KerberosShortNamer kerberosShortNamer,
                                                           SslPrincipalMapper sslPrincipalMapper) {
    Class<?> principalBuilderClass = (Class<?>) configs.get(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG);
    final KafkaPrincipalBuilder builder;

    if (principalBuilderClass == null || principalBuilderClass == DefaultKafkaPrincipalBuilder.class) {
        // No custom class configured: use the default builder.
        builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer, sslPrincipalMapper);
    } else if (SSLPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
        // Custom SSL builder: hand it the configured SslPrincipalMapper.
        SSLPrincipalBuilder sslPrincipalBuilder = (SSLPrincipalBuilder) Utils.newInstance(principalBuilderClass);
        sslPrincipalBuilder.buildSSLPrincipalBuilder(sslPrincipalMapper);
        builder = sslPrincipalBuilder;
    } else if (KerberosPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
        // Custom Kerberos builder: hand it the configured KerberosShortNamer.
        KerberosPrincipalBuilder kerberosPrincipalBuilder = (KerberosPrincipalBuilder) Utils.newInstance(principalBuilderClass);
        kerberosPrincipalBuilder.buildKerberosPrincipalBuilder(kerberosShortNamer);
        builder = kerberosPrincipalBuilder;
    } else if (KafkaPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
        // Any other KafkaPrincipalBuilder: instantiate without extra setup.
        builder = (KafkaPrincipalBuilder) Utils.newInstance(principalBuilderClass);
    } else if (org.apache.kafka.common.security.auth.PrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
        // Old PrincipalBuilder interface: wrap it in the default builder.
        org.apache.kafka.common.security.auth.PrincipalBuilder oldPrincipalBuilder =
                createPrincipalBuilder(principalBuilderClass, configs);
        builder = DefaultKafkaPrincipalBuilder.fromOldPrincipalBuilder(authenticator, transportLayer,
                oldPrincipalBuilder, kerberosShortNamer);
    } else {
        throw new InvalidConfigurationException("Type " + principalBuilderClass.getName() + " is not " +
                "an instance of " + org.apache.kafka.common.security.auth.PrincipalBuilder.class.getName() + " or " +
                KafkaPrincipalBuilder.class.getName());
    }

    if (builder instanceof Configurable)
        ((Configurable) builder).configure(configs);

    return builder;
}
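
One consequence of the else-if chain in the method above may be worth noting: because the SSLPrincipalBuilder check comes first, a class that implements both SSLPrincipalBuilder and KerberosPrincipalBuilder would only receive the SslPrincipalMapper. The following minimal sketch reproduces that branch order with stand-in interfaces. DualBuilder, the Object-typed parameters, and the create helper are hypothetical simplifications, not Kafka code.

```java
class DispatchOrderSketch {

    // Stand-ins for the Kafka interfaces (assumption: parameters reduced to Object).
    interface KafkaPrincipalBuilder { }

    interface SSLPrincipalBuilder extends KafkaPrincipalBuilder {
        void buildSSLPrincipalBuilder(Object sslPrincipalMapper);
    }

    interface KerberosPrincipalBuilder extends KafkaPrincipalBuilder {
        void buildKerberosPrincipalBuilder(Object kerberosShortNamer);
    }

    // Hypothetical builder that implements both mechanism-specific interfaces.
    static class DualBuilder implements SSLPrincipalBuilder, KerberosPrincipalBuilder {
        boolean sslConfigured;
        boolean kerberosConfigured;

        @Override
        public void buildSSLPrincipalBuilder(Object sslPrincipalMapper) {
            sslConfigured = true;
        }

        @Override
        public void buildKerberosPrincipalBuilder(Object kerberosShortNamer) {
            kerberosConfigured = true;
        }
    }

    // Mirrors the branch order of createPrincipalBuilder for the custom-class cases only.
    static KafkaPrincipalBuilder create(Class<?> cls) {
        Object instance;
        try {
            instance = cls.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + cls.getName(), e);
        }
        if (SSLPrincipalBuilder.class.isAssignableFrom(cls)) {
            ((SSLPrincipalBuilder) instance).buildSSLPrincipalBuilder(new Object());
        } else if (KerberosPrincipalBuilder.class.isAssignableFrom(cls)) {
            ((KerberosPrincipalBuilder) instance).buildKerberosPrincipalBuilder(new Object());
        } else if (!KafkaPrincipalBuilder.class.isAssignableFrom(cls)) {
            throw new IllegalArgumentException(cls.getName() + " is not a KafkaPrincipalBuilder");
        }
        return (KafkaPrincipalBuilder) instance;
    }

    public static void main(String[] args) {
        DualBuilder dual = (DualBuilder) create(DualBuilder.class);
        System.out.println("ssl=" + dual.sslConfigured + ", kerberos=" + dual.kerberosConfigured);
    }
}
```

If builders that serve both mechanisms need to be supported, one option would be to instantiate the class once and apply independent instanceof checks instead of an else-if chain; whether that is desirable is a design decision for the proposal.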

Compatibility, Deprecation, and Migration Plan

...