THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
- Create a custom ClientCallbackHandler and configure the client to use it
- This handler should handle the `SaslExtensionsCallback` in his `handle()` method and attach custom extensions
Code Block @Override public void handle(Callback[] callbacks) { for (Callback callback : callbacks) { if (callback instanceof OAuthBearerTokenCallback) handleTokenCallback((OAuthBearerTokenCallback) callback); else if (callback instanceof SaslExtensionsCallback) { Map<String, String> extensions = new HashMap<>(); extensions.put("trace-id", "123"); ((SaslExtensionsCallback) callback).extensions(extensions); } else throw new UnsupportedCallbackException(callback); } }
- This handler should handle the `SaslExtensionsCallback` in his `handle()` method and attach custom extensions
The extensions can be fetched in the server-side through `OAuthBearerSaslServer#getNegotiatedProperty("trace-id")`
- A custom principal builder can then make use of the new extension
Code Block public class CustomPrincipalBuilder implements KafkaPrincipalBuilder { @Override public KafkaPrincipal build(AuthenticationContext context) { if (context instanceof SaslAuthenticationContext) { SaslServer saslServer = ((SaslAuthenticationContext) context).server(); String traceId = saslServer.getNegotiatedPropery("trace-id"); return new CustomPrincipal("", saslServer, traceId); } throw new Exception(); } }
...