Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
linenumberstrue
collapsetrue
public static KafkaPrincipalBuilder createPrincipalBuilder(Map<String, ?> configs,
                                                               TransportLayer transportLayer,
                                                               Authenticator authenticator,
                                                               KerberosShortNamer kerberosShortNamer,
                                                               SslPrincipalMapper sslPrincipalMapper) {
        Class<?> principalBuilderClass = (Class<?>) configs.get(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG);
        KafkaPrincipalBuilder builder;

        if (principalBuilderClass == null || principalBuilderClass == DefaultKafkaPrincipalBuilder.class) {
            builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer, sslPrincipalMapper);
        } else if (KafkaPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
            try {
                Constructor<?> constructor = principalBuilderClass.getConstructor(KerberosShortNamer.class, SslPrincipalMapper.class);
                builder = (KafkaPrincipalBuilder) constructor.newInstance(kerberosShortNamer, sslPrincipalMapper);
            } catch (NoSuchMethodException e) {
                builder = (KafkaPrincipalBuilder) Utils.newInstance(principalBuilderClass);
            } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
                throw new RuntimeException("Error instantiating custom KafkaPrincipalBuilder", e);
            }
        } else if (org.apache.kafka.common.security.auth.PrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
            org.apache.kafka.common.security.auth.PrincipalBuilder oldPrincipalBuilder =
                    createPrincipalBuilder(principalBuilderClass, configs);
            builder = DefaultKafkaPrincipalBuilder.fromOldPrincipalBuilder(authenticator, transportLayer,
                    oldPrincipalBuilder, kerberosShortNamer);
        } else {
            throw new InvalidConfigurationException("Type " + principalBuilderClass.getName() + " is not " +
                    "an instance of " + org.apache.kafka.common.security.auth.PrincipalBuilder.class.getName() + " or " +
                    KafkaPrincipalBuilder.class.getName());
        }

        if (builder instanceof Configurable)
            ((Configurable) builder).configure(configs);

        return builder;
    }

This method In this will pass the custom KafkaPrincipalBuilder with SslPrincipalMapper and kerberosShortNamer objects if Constructor is available.

This approach is solving a problem for a certain Principal type and not all of the principals have to use KerberosShortNamer, it is related if the credentials are provided by the Kerberos authentication.

Alternative-2: Changes in SslPrincipalMapper

...