...
Current state: Under Discussion
Discussion thread: here
Pull Request thread: here & Pull Request
Vote thread: here
JIRA: KAFKA-15452 (ASF JIRA)
...
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * Instantiates the principal builder selected by
 * {@code BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG}.
 *
 * <p>Resolution order:
 * <ol>
 *   <li>No class configured, or {@code DefaultKafkaPrincipalBuilder} itself: the default builder is
 *       constructed directly with both helpers.</li>
 *   <li>A {@code KafkaPrincipalBuilder} implementation: created via {@code Utils.newInstance}, then
 *       handed the {@code SslPrincipalMapper} if it is an {@code SSLPrincipalBuilder} and the
 *       {@code KerberosShortNamer} if it is a {@code KerberosPrincipalBuilder}.</li>
 *   <li>A legacy {@code PrincipalBuilder}: created via the two-argument
 *       {@code createPrincipalBuilder} overload and wrapped with
 *       {@code DefaultKafkaPrincipalBuilder.fromOldPrincipalBuilder}.</li>
 * </ol>
 *
 * <p>The two injections are deliberately independent checks rather than an {@code else if}
 * chain, so a class that implements both interfaces receives both helpers. Injection happens
 * before {@code configure(configs)}, which is called last when the builder is
 * {@code Configurable}.
 *
 * @param configs            configuration map holding the principal builder class entry
 * @param transportLayer     forwarded to the legacy {@code PrincipalBuilder} adapter
 * @param authenticator      forwarded to the legacy {@code PrincipalBuilder} adapter
 * @param kerberosShortNamer short namer given to the default builder, to Kerberos-aware custom
 *                           builders and to the legacy adapter
 * @param sslPrincipalMapper mapper given to the default builder and to SSL-aware custom builders
 * @return the instantiated (and, if applicable, configured) principal builder
 * @throws InvalidConfigurationException if the configured class is neither a
 *         {@code KafkaPrincipalBuilder} nor a legacy {@code PrincipalBuilder}
 */
public static KafkaPrincipalBuilder createPrincipalBuilder(Map<String, ?> configs,
                                                           TransportLayer transportLayer,
                                                           Authenticator authenticator,
                                                           KerberosShortNamer kerberosShortNamer,
                                                           SslPrincipalMapper sslPrincipalMapper) {
    Class<?> principalBuilderClass = (Class<?>) configs.get(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG);
    final KafkaPrincipalBuilder builder;

    if (principalBuilderClass == null || principalBuilderClass == DefaultKafkaPrincipalBuilder.class) {
        builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer, sslPrincipalMapper);
    } else if (KafkaPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
        // SSLPrincipalBuilder and KerberosPrincipalBuilder are KafkaPrincipalBuilder subtypes,
        // so a single instantiation covers plain, SSL-aware and Kerberos-aware builders.
        builder = (KafkaPrincipalBuilder) Utils.newInstance(principalBuilderClass);
        if (builder instanceof SSLPrincipalBuilder) {
            ((SSLPrincipalBuilder) builder).buildSSLPrincipalBuilder(sslPrincipalMapper);
        }
        if (builder instanceof KerberosPrincipalBuilder) {
            ((KerberosPrincipalBuilder) builder).buildKerberosPrincipalBuilder(kerberosShortNamer);
        }
    } else if (org.apache.kafka.common.security.auth.PrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
        org.apache.kafka.common.security.auth.PrincipalBuilder oldPrincipalBuilder =
                createPrincipalBuilder(principalBuilderClass, configs);
        builder = DefaultKafkaPrincipalBuilder.fromOldPrincipalBuilder(
                authenticator, transportLayer, oldPrincipalBuilder, kerberosShortNamer);
    } else {
        throw new InvalidConfigurationException("Type " + principalBuilderClass.getName() + " is not "
                + "an instance of " + org.apache.kafka.common.security.auth.PrincipalBuilder.class.getName()
                + " or " + KafkaPrincipalBuilder.class.getName());
    }

    if (builder instanceof Configurable) {
        ((Configurable) builder).configure(configs);
    }
    return builder;
} |
These changes ensure that custom KafkaPrincipalBuilder implementations can now access SslPrincipalMapper and KerberosShortNamer, enabling support for mapping rules and improving the overall security configuration of Kafka brokers.
The introduced interfaces provide a clear and extensible way to handle different types of principals, making the solution more modular and maintainable.
Compatibility, Deprecation, and Migration Plan
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/**
 * Resolves and instantiates the principal builder named by
 * {@code BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG}.
 *
 * <p>With no class configured (or {@code DefaultKafkaPrincipalBuilder} itself) the default
 * builder is constructed directly. A custom {@code KafkaPrincipalBuilder} is created
 * reflectively, preferring a public {@code (KerberosShortNamer, SslPrincipalMapper)}
 * constructor. A legacy {@code PrincipalBuilder} is wrapped via
 * {@code DefaultKafkaPrincipalBuilder.fromOldPrincipalBuilder}. A {@code Configurable} result
 * is configured with {@code configs} before being returned.
 *
 * @param configs            configuration map holding the principal builder class entry
 * @param transportLayer     forwarded to the legacy {@code PrincipalBuilder} adapter
 * @param authenticator      forwarded to the legacy {@code PrincipalBuilder} adapter
 * @param kerberosShortNamer short namer passed to the default builder, custom builders that
 *                           declare the two-argument constructor, and the legacy adapter
 * @param sslPrincipalMapper mapper passed to the default builder and to custom builders that
 *                           declare the two-argument constructor
 * @return the instantiated (and, if applicable, configured) principal builder
 * @throws InvalidConfigurationException if the configured class is neither a
 *         {@code KafkaPrincipalBuilder} nor a legacy {@code PrincipalBuilder}
 * @throws RuntimeException if the two-argument constructor exists but cannot be invoked
 */
public static KafkaPrincipalBuilder createPrincipalBuilder(Map<String, ?> configs,
                                                           TransportLayer transportLayer,
                                                           Authenticator authenticator,
                                                           KerberosShortNamer kerberosShortNamer,
                                                           SslPrincipalMapper sslPrincipalMapper) {
    Class<?> principalBuilderClass = (Class<?>) configs.get(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG);
    boolean useDefault = principalBuilderClass == null
            || principalBuilderClass == DefaultKafkaPrincipalBuilder.class;

    KafkaPrincipalBuilder builder;
    if (useDefault) {
        builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer, sslPrincipalMapper);
    } else if (KafkaPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
        builder = instantiateCustomPrincipalBuilder(principalBuilderClass, kerberosShortNamer, sslPrincipalMapper);
    } else if (org.apache.kafka.common.security.auth.PrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
        org.apache.kafka.common.security.auth.PrincipalBuilder legacyBuilder =
                createPrincipalBuilder(principalBuilderClass, configs);
        builder = DefaultKafkaPrincipalBuilder.fromOldPrincipalBuilder(
                authenticator, transportLayer, legacyBuilder, kerberosShortNamer);
    } else {
        throw new InvalidConfigurationException("Type " + principalBuilderClass.getName() + " is not "
                + "an instance of " + org.apache.kafka.common.security.auth.PrincipalBuilder.class.getName()
                + " or " + KafkaPrincipalBuilder.class.getName());
    }

    if (builder instanceof Configurable) {
        ((Configurable) builder).configure(configs);
    }
    return builder;
}

/**
 * Instantiates a custom {@code KafkaPrincipalBuilder}, using its public
 * {@code (KerberosShortNamer, SslPrincipalMapper)} constructor when one is declared and
 * falling back to {@code Utils.newInstance} otherwise.
 */
private static KafkaPrincipalBuilder instantiateCustomPrincipalBuilder(Class<?> builderClass,
                                                                       KerberosShortNamer shortNamer,
                                                                       SslPrincipalMapper principalMapper) {
    final Constructor<?> helperAwareConstructor;
    try {
        helperAwareConstructor = builderClass.getConstructor(KerberosShortNamer.class, SslPrincipalMapper.class);
    } catch (NoSuchMethodException e) {
        // No two-argument constructor: keep supporting builders that predate it.
        return (KafkaPrincipalBuilder) Utils.newInstance(builderClass);
    }

    try {
        return (KafkaPrincipalBuilder) helperAwareConstructor.newInstance(shortNamer, principalMapper);
    } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
        throw new RuntimeException("Error instantiating custom KafkaPrincipalBuilder", e);
    }
} |
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
protected void configurePrincipalBuilder(Map<String, ?> configs) { // implementation } |
...