...
The proposed changes are fully backward-compatible and do not require any deprecations or migration efforts. Existing custom KafkaPrincipalBuilder implementations will continue to work without any modifications. However, they can be updated to utilize the new functionality if desired.
Rejected Alternatives
Alternative-1: Changes in ChannelBuilders
This alternative would update the existing createPrincipalBuilder method in the ChannelBuilders class to check for an additional constructor:
```java
public static KafkaPrincipalBuilder createPrincipalBuilder(Map<String, ?> configs,
                                                           TransportLayer transportLayer,
                                                           Authenticator authenticator,
                                                           KerberosShortNamer kerberosShortNamer,
                                                           SslPrincipalMapper sslPrincipalMapper) {
    Class<?> principalBuilderClass = (Class<?>) configs.get(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG);
    KafkaPrincipalBuilder builder;

    if (principalBuilderClass == null || principalBuilderClass == DefaultKafkaPrincipalBuilder.class) {
        builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer, sslPrincipalMapper);
    } else if (KafkaPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
        try {
            Constructor<?> constructor = principalBuilderClass.getConstructor(KerberosShortNamer.class, SslPrincipalMapper.class);
            builder = (KafkaPrincipalBuilder) constructor.newInstance(kerberosShortNamer, sslPrincipalMapper);
        } catch (NoSuchMethodException e) {
            builder = (KafkaPrincipalBuilder) Utils.newInstance(principalBuilderClass);
        } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
            throw new RuntimeException("Error instantiating custom KafkaPrincipalBuilder", e);
        }
    } else if (org.apache.kafka.common.security.auth.PrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
        org.apache.kafka.common.security.auth.PrincipalBuilder oldPrincipalBuilder = createPrincipalBuilder(principalBuilderClass, configs);
        builder = DefaultKafkaPrincipalBuilder.fromOldPrincipalBuilder(authenticator, transportLayer, oldPrincipalBuilder, kerberosShortNamer);
    } else {
        throw new InvalidConfigurationException("Type " + principalBuilderClass.getName() + " is not " +
                "an instance of " + org.apache.kafka.common.security.auth.PrincipalBuilder.class.getName() + " or " +
                KafkaPrincipalBuilder.class.getName());
    }

    if (builder instanceof Configurable)
        ((Configurable) builder).configure(configs);

    return builder;
}
```
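This method passes the SslPrincipalMapper and KerberosShortNamer objects to the custom KafkaPrincipalBuilder if the class declares a matching two-argument constructor. If no such constructor exists, it falls back to the no-argument constructor.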
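The lookup-then-fallback step in Alternative-1 can be illustrated in isolation. The following is a minimal sketch that runs without Kafka on the classpath: `ShortNamer`, `Mapper`, `Builder`, `MapperAwareBuilder`, `LegacyBuilder`, and `create` are hypothetical stand-ins invented for this example (for KerberosShortNamer, SslPrincipalMapper, KafkaPrincipalBuilder, two custom implementations, and the factory method respectively), not Kafka classes.

```java
import java.lang.reflect.Constructor;

class PrincipalBuilderFactorySketch {

    // Hypothetical stand-ins for KerberosShortNamer and SslPrincipalMapper.
    static class ShortNamer { }
    static class Mapper { }

    // Hypothetical stand-in for the KafkaPrincipalBuilder interface.
    interface Builder {
        String describe();
    }

    // A custom builder that opts in by declaring the two-argument constructor.
    public static class MapperAwareBuilder implements Builder {
        private final Mapper mapper;

        public MapperAwareBuilder(ShortNamer shortNamer, Mapper mapper) {
            this.mapper = mapper;
        }

        @Override
        public String describe() {
            return mapper != null ? "with-mapper" : "without-mapper";
        }
    }

    // An existing builder that only has a no-argument constructor.
    public static class LegacyBuilder implements Builder {
        public LegacyBuilder() { }

        @Override
        public String describe() {
            return "legacy";
        }
    }

    // Prefer the (ShortNamer, Mapper) constructor; fall back to the
    // no-argument constructor when the class does not declare it.
    static Builder create(Class<? extends Builder> cls, ShortNamer shortNamer, Mapper mapper) {
        try {
            try {
                Constructor<? extends Builder> ctor = cls.getConstructor(ShortNamer.class, Mapper.class);
                return ctor.newInstance(shortNamer, mapper);
            } catch (NoSuchMethodException e) {
                return cls.getConstructor().newInstance();
            }
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Error instantiating custom builder", e);
        }
    }

    public static void main(String[] args) {
        ShortNamer shortNamer = new ShortNamer();
        Mapper mapper = new Mapper();
        System.out.println(create(MapperAwareBuilder.class, shortNamer, mapper).describe()); // prints "with-mapper"
        System.out.println(create(LegacyBuilder.class, shortNamer, mapper).describe());      // prints "legacy"
    }
}
```

One consequence of this design is that the contract lives in a constructor signature rather than in an interface, so the compiler cannot check it: a builder whose constructor parameters are declared in a different order or with different types silently takes the fallback path and never receives the mapper.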
Alternative-2: Changes in SslPrincipalMapper
An alternative workaround is to read the configuration and build a second SslPrincipalMapper inside the custom KafkaPrincipalBuilder implementation. However, this approach is less efficient, because the broker already builds a mapper from the same configuration, and it may lead to code duplication across custom implementations. The proposed solution addresses the issue without this duplicated work.
Alternative-3: Changes in SslChannelBuilder
This alternative would add a new method, configurePrincipalBuilder, to the SslChannelBuilder class. The method would configure the custom KafkaPrincipalBuilder with the SslPrincipalMapper and KerberosShortNamer objects:
```java
protected void configurePrincipalBuilder(Map<String, ?> configs) {
    // implementation
}
```
...