...
Code Block | ||||
---|---|---|---|---|
| ||||
public interface SSLPrincipalBuilder extends KafkaPrincipalBuilder {
    void buildSSLPrincipalBuilder(SslPrincipalMapper sslPrincipalMapper);
} |
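In the ChannelBuilders class, the createPrincipalBuilder method is updated so that a custom principal builder receives the helper object for its authentication mechanism. After instantiating the configured class, the method calls buildSSLPrincipalBuilder with the SslPrincipalMapper if the class implements SSLPrincipalBuilder, or buildKerberosPrincipalBuilder with the KerberosShortNamer if it implements KerberosPrincipalBuilder. The SSLPrincipalBuilder check runs first, so a class that implements both interfaces receives only the SslPrincipalMapper.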
Here is the updated code snippet:
Code Block | ||||
---|---|---|---|---|
| ||||
public static KafkaPrincipalBuilder createPrincipalBuilder(Map<String, ?> configs,
                                                           TransportLayer transportLayer,
                                                           Authenticator authenticator,
                                                           KerberosShortNamer kerberosShortNamer,
                                                           SslPrincipalMapper sslPrincipalMapper) {
    Class<?> principalBuilderClass = (Class<?>) configs.get(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG);
    final KafkaPrincipalBuilder builder;

    if (principalBuilderClass == null || principalBuilderClass == DefaultKafkaPrincipalBuilder.class) {
        // No custom class configured: the default builder takes both helpers directly.
        builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer, sslPrincipalMapper);
    } else if (SSLPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
        // Custom SSL-aware builder: pass it the SslPrincipalMapper.
        SSLPrincipalBuilder sslPrincipalBuilder = (SSLPrincipalBuilder) Utils.newInstance(principalBuilderClass);
        sslPrincipalBuilder.buildSSLPrincipalBuilder(sslPrincipalMapper);
        builder = sslPrincipalBuilder;
    } else if (KerberosPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
        // Custom Kerberos-aware builder: pass it the KerberosShortNamer.
        KerberosPrincipalBuilder kerberosPrincipalBuilder = (KerberosPrincipalBuilder) Utils.newInstance(principalBuilderClass);
        kerberosPrincipalBuilder.buildKerberosPrincipalBuilder(kerberosShortNamer);
        builder = kerberosPrincipalBuilder;
    } else if (KafkaPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
        // Any other KafkaPrincipalBuilder is instantiated without extra setup.
        builder = (KafkaPrincipalBuilder) Utils.newInstance(principalBuilderClass);
    } else if (org.apache.kafka.common.security.auth.PrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
        // Old PrincipalBuilder interface: wrap it in the default builder.
        org.apache.kafka.common.security.auth.PrincipalBuilder oldPrincipalBuilder =
            createPrincipalBuilder(principalBuilderClass, configs);
        builder = DefaultKafkaPrincipalBuilder.fromOldPrincipalBuilder(authenticator, transportLayer,
            oldPrincipalBuilder, kerberosShortNamer);
    } else {
        throw new InvalidConfigurationException("Type " + principalBuilderClass.getName() + " is not " +
            "an instance of " + org.apache.kafka.common.security.auth.PrincipalBuilder.class.getName() + " or " +
            KafkaPrincipalBuilder.class.getName());
    }

    if (builder instanceof Configurable)
        ((Configurable) builder).configure(configs);

    return builder;
} |
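To illustrate how a custom class would plug into the SSLPrincipalBuilder branch above, the following is a minimal, self-contained sketch. It does not depend on the Kafka jars: KafkaPrincipalBuilder and SslPrincipalMapper here are simplified stand-ins (the real KafkaPrincipalBuilder builds a KafkaPrincipal from an AuthenticationContext, and the real mapper applies configured mapping rules). CnPrincipalBuilder, PrincipalBuilderSketch and its create method are hypothetical names introduced only for this example, and create mirrors just two branches of the method above.

```java
import java.util.Objects;

// Hypothetical driver that mirrors only the SSLPrincipalBuilder and plain
// KafkaPrincipalBuilder branches of createPrincipalBuilder.
class PrincipalBuilderSketch {
    static KafkaPrincipalBuilder create(Class<?> builderClass, SslPrincipalMapper mapper) {
        if (!KafkaPrincipalBuilder.class.isAssignableFrom(builderClass)) {
            throw new IllegalArgumentException(builderClass.getName() + " is not a KafkaPrincipalBuilder");
        }
        final Object instance;
        try {
            // Reflective no-arg construction, standing in for Utils.newInstance.
            instance = builderClass.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + builderClass.getName(), e);
        }
        // SSL-aware builders get the mapper before they are handed out.
        if (instance instanceof SSLPrincipalBuilder) {
            ((SSLPrincipalBuilder) instance).buildSSLPrincipalBuilder(mapper);
        }
        return (KafkaPrincipalBuilder) instance;
    }

    public static void main(String[] args) {
        KafkaPrincipalBuilder builder = create(CnPrincipalBuilder.class, new SslPrincipalMapper());
        System.out.println(builder.build("CN=alice,OU=eng,O=example")); // prints "User:alice"
    }
}

// Simplified stand-in (assumption): the real interface takes an AuthenticationContext.
interface KafkaPrincipalBuilder {
    String build(String peerDistinguishedName);
}

// Simplified stand-in (assumption): returns the CN component of a distinguished name.
class SslPrincipalMapper {
    String getName(String distinguishedName) {
        for (String part : distinguishedName.split(",")) {
            String trimmed = part.trim();
            if (trimmed.startsWith("CN=")) {
                return trimmed.substring(3);
            }
        }
        return distinguishedName;
    }
}

// The proposed interface, declared against the stand-in types.
interface SSLPrincipalBuilder extends KafkaPrincipalBuilder {
    void buildSSLPrincipalBuilder(SslPrincipalMapper sslPrincipalMapper);
}

// Hypothetical custom builder that relies on the injected mapper.
class CnPrincipalBuilder implements SSLPrincipalBuilder {
    private SslPrincipalMapper mapper;

    @Override
    public void buildSSLPrincipalBuilder(SslPrincipalMapper sslPrincipalMapper) {
        this.mapper = Objects.requireNonNull(sslPrincipalMapper);
    }

    @Override
    public String build(String peerDistinguishedName) {
        if (mapper == null) {
            throw new IllegalStateException("buildSSLPrincipalBuilder was not called");
        }
        return "User:" + mapper.getName(peerDistinguishedName);
    }
}
```

In this sketch the custom class needs only a no-argument constructor; the mapper arrives through the interface method after instantiation, which is the same order of operations as in the SSLPrincipalBuilder branch of createPrincipalBuilder.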
Compatibility, Deprecation, and Migration Plan
...