...

  1. Create a custom ClientCallbackHandler and configure the client to use it
    1. This handler should handle the `SaslExtensionsCallback` in its `handle()` method and attach the custom extensions


      Code Block
      @Override
      public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
      
          for (Callback callback : callbacks) {
              if (callback instanceof OAuthBearerTokenCallback)
                  handleTokenCallback((OAuthBearerTokenCallback) callback);
              else if (callback instanceof SaslExtensionsCallback) {
                  Map<String, String> extensions = new HashMap<>();
                  extensions.put("trace-id", "123");
                  // The released Kafka API takes the map wrapped in a SaslExtensions object
                  ((SaslExtensionsCallback) callback).extensions(new SaslExtensions(extensions));
              } else
                  throw new UnsupportedCallbackException(callback);
          }
      }
  2. The extensions can then be fetched on the server side through `OAuthBearerSaslServer#getNegotiatedProperty("trace-id")`


  3. A custom principal builder can then make use of the new extension

      Code Block
      public class CustomPrincipalBuilder implements KafkaPrincipalBuilder {
          @Override
          public KafkaPrincipal build(AuthenticationContext context) {
              if (context instanceof SaslAuthenticationContext) {
                  SaslServer saslServer = ((SaslAuthenticationContext) context).server();
              // getNegotiatedProperty returns Object, so the value is cast to String
              String traceId = (String) saslServer.getNegotiatedProperty("trace-id");
                  return new CustomPrincipal("", saslServer, traceId);
              }
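              // build() declares no checked exceptions, so an unchecked one is thrown here
              throw new IllegalArgumentException("Unsupported authentication context: " + context.getClass().getName());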
          }
      }
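
To make the path between steps 1 and 2 more concrete, the sketch below shows roughly how extensions could travel inside the SASL/OAUTHBEARER initial client response, following the key/value layout described in RFC 7628 (pairs separated by the 0x01 character). The class `OAuthBearerExtensionsSketch` and its two methods are hypothetical names used only for illustration; they are not Kafka classes, and the sketch does no validation of keys or values.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; not part of the Kafka code base.
class OAuthBearerExtensionsSketch {

    // RFC 7628 separates key/value pairs with the 0x01 control character.
    private static final String SEPARATOR = "\u0001";

    /** Builds an initial client response carrying the token and the extensions. */
    static byte[] buildInitialResponse(String token, Map<String, String> extensions) {
        StringBuilder message = new StringBuilder("n,,") // GS2 header, no authorization id
                .append(SEPARATOR)
                .append("auth=Bearer ").append(token);
        for (Map.Entry<String, String> entry : extensions.entrySet()) {
            message.append(SEPARATOR)
                   .append(entry.getKey()).append('=').append(entry.getValue());
        }
        // The message ends with two separators.
        message.append(SEPARATOR).append(SEPARATOR);
        return message.toString().getBytes(StandardCharsets.UTF_8);
    }

    /** Extracts every key/value pair except the "auth" pair that holds the token. */
    static Map<String, String> parseExtensions(byte[] response) {
        String message = new String(response, StandardCharsets.UTF_8);
        String[] parts = message.split(SEPARATOR);
        Map<String, String> extensions = new LinkedHashMap<>();
        // parts[0] is the GS2 header; the remaining parts are key=value pairs.
        for (int i = 1; i < parts.length; i++) {
            int eq = parts[i].indexOf('=');
            if (eq <= 0) {
                continue; // skip empty or malformed segments
            }
            String key = parts[i].substring(0, eq);
            if (!"auth".equals(key)) {
                extensions.put(key, parts[i].substring(eq + 1));
            }
        }
        return extensions;
    }
}
```

Under these assumptions, building a response with the extension `trace-id=123` and passing it to `parseExtensions` yields a map with the single entry `trace-id` mapped to `123`, which is the value the server would then expose as a negotiated property.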
      
      
      


...