...

When it comes to Kafka, the work done via KIP-255 (OAuth Authentication via SASL/OAUTHBEARER) introduced a framework that allowed for integration with OAuth-compliant providers. With this framework in place, Kafka clients could now pass a JWT access token to a broker when initializing the connection as a means of authentication. This means Kafka can start to leverage these standards for authorization and authentication, which many current and prospective customers have been asking for.

However, KIP-255, which covers the existing implementation, quickly notes that:

OAuth 2 is a flexible framework with multiple ways of doing the same thing.

...

The KIP sums up this intentionally omitted functionality by directing organizations wishing to adopt OAuth within Kafka as follows:

Production use cases will require writing an implementation of AuthenticateCallbackHandler that can handle an instance of OAuthBearerTokenCallback

The exact implementation is therefore left to each organization. To fill this gap in the Apache Kafka project, the community has provided a handful of open source Java implementations. These implementations can be included in the class path of an OAuth-enabled Kafka client and configured appropriately to provide OAuth authentication.

...

No changes to the public interface are anticipated; it will leverage the existing AuthenticateCallbackHandler API.

Proposed Changes

The OAuth/OIDC work will allow any Apache Kafka user to configure, out of the box, a connection to an external identity provider service (e.g. Okta, Auth0, or Azure). The code will implement the standard OAuth client_credentials grant type.

The proposed change is largely composed of a pair of AuthenticateCallbackHandler implementations: one to log in on the client and one to validate on the broker.


[Diagram: login callback handler on the client, validator callback handler on the broker]

As shown in the above diagram, the login callback is executed on the client and the validator callback is executed on the broker.

...

Here is an example call to retrieve a JWT access token using curl and jq:

CLIENT_ID=abc123
CLIENT_SECRET='S3cr3t!'
URL=https://myidp.example.com/oauth2/default/v1/token
SCOPE=sales-pipeline

# HTTP Basic credentials: base64 of "client_id:client_secret", with any line wrapping removed
base_64_string=$(printf '%s' "$CLIENT_ID:$CLIENT_SECRET" | base64 | tr -d '\n')

# POST a client_credentials grant request and extract access_token from the JSON response
access_token=$(curl \
  --silent \
  --request POST \
  --url "$URL" \
  --header "accept: application/json" \
  --header "authorization: Basic $base_64_string" \
  --header "cache-control: no-cache" \
  --header "content-type: application/x-www-form-urlencoded" \
  --data "grant_type=client_credentials&scope=$SCOPE" | jq -r .access_token)
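For comparison, the same client_credentials request can be assembled in Java with the JDK's java.net.http API (Java 11 or later). This is a minimal sketch, not the OAuthBearerLoginCallbackHandler implementation: the class TokenRequestSketch and its methods are hypothetical, the request is only built (not sent), and extracting access_token from the JSON response is left to a JSON library.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical sketch: builds the same client_credentials request as the curl example.
class TokenRequestSketch {

    // HTTP Basic credentials: "Basic " + base64("clientId:clientSecret")
    static String basicAuthHeader(String clientId, String clientSecret) {
        byte[] creds = (clientId + ":" + clientSecret).getBytes(StandardCharsets.UTF_8);
        return "Basic " + Base64.getEncoder().encodeToString(creds);
    }

    // Form-encoded body for the client_credentials grant; the scope parameter is optional
    static String formBody(String scope) {
        StringBuilder body = new StringBuilder("grant_type=client_credentials");
        if (scope != null && !scope.isEmpty()) {
            body.append("&scope=").append(URLEncoder.encode(scope, StandardCharsets.UTF_8));
        }
        return body.toString();
    }

    // Builds (but does not send) the POST request to the token endpoint
    static HttpRequest buildRequest(String tokenEndpointUri, String clientId,
                                    String clientSecret, String scope) {
        return HttpRequest.newBuilder(URI.create(tokenEndpointUri))
                .header("Accept", "application/json")
                .header("Authorization", basicAuthHeader(clientId, clientSecret))
                .header("Cache-Control", "no-cache")
                .header("Content-Type", "application/x-www-form-urlencoded")
                .POST(HttpRequest.BodyPublishers.ofString(formBody(scope)))
                .build();
    }
}
```

Sending the request could then be done with HttpClient.newBuilder().connectTimeout(...).build().send(request, HttpResponse.BodyHandlers.ofString()), where the connect timeout would be a natural place to apply a value such as loginConnectTimeoutMs.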

...

The name of the implementation class will be org.apache.kafka.common.security.oauthbearer.internals.secured.OAuthBearerLoginCallbackHandler and it will accept instances of org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback and org.apache.kafka.common.security.auth.SaslExtensionsCallback. The fully-qualified class name will be provided to the client's sasl.login.callback.handler.class configuration.

Because the HTTP call made to the OAuth/OIDC provider may time out or transiently fail, there will be a retry mechanism that waits between attempts. The number of attempts (including the first attempt) is configured via the loginAttempts configuration setting. The retry will use an exponential backoff approach: the first attempt to connect to the HTTP endpoint is made immediately. If that first attempt fails, the second attempt first waits a configurable number of milliseconds (loginRetryWaitMs) before trying again. If the second attempt fails, the wait time is doubled before a third attempt. This pattern repeats up to loginAttempts attempts. However, there is also a configurable maximum wait time between attempts (loginRetryMaxWaitMs), so the wait before any attempt is min(currentRetryWaitMs, loginRetryMaxWaitMs), regardless of the number of attempts.
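The backoff rule above can be sketched as follows. Only the three settings (loginAttempts, loginRetryWaitMs, loginRetryMaxWaitMs) come from this proposal; the class LoginRetrySketch, its Attempt interface, and the method names are hypothetical.

```java
// Hypothetical sketch of the login retry policy: exponential backoff capped at loginRetryMaxWaitMs.
class LoginRetrySketch {

    // One HTTP call to the token endpoint (hypothetical functional interface)
    interface Attempt<T> {
        T call() throws Exception;
    }

    // Milliseconds to wait before the given 1-based attempt; the first attempt runs immediately
    static long waitBeforeAttempt(int attempt, long loginRetryWaitMs, long loginRetryMaxWaitMs) {
        if (attempt <= 1) {
            return 0L;
        }
        long wait = Math.min(loginRetryWaitMs, loginRetryMaxWaitMs);
        // Double once per attempt after the second, capping each time so the value cannot overflow
        for (int i = 2; i < attempt; i++) {
            wait = Math.min(wait * 2, loginRetryMaxWaitMs);
        }
        return wait;
    }

    // Runs the attempt up to loginAttempts times, sleeping between failures; rethrows the last failure
    static <T> T callWithRetries(Attempt<T> attempt, int loginAttempts,
                                 long loginRetryWaitMs, long loginRetryMaxWaitMs) throws Exception {
        if (loginAttempts < 1) {
            throw new IllegalArgumentException("loginAttempts must be at least 1");
        }
        Exception last = null;
        for (int i = 1; i <= loginAttempts; i++) {
            long wait = waitBeforeAttempt(i, loginRetryWaitMs, loginRetryMaxWaitMs);
            if (wait > 0) {
                Thread.sleep(wait);
            }
            try {
                return attempt.call();
            } catch (Exception e) {
                last = e;
            }
        }
        throw last;
    }
}
```

With the defaults (250 ms initial wait, 10000 ms cap), the waits before attempts 2, 3, and 4 would be 250, 500, and 1000 ms; an eighth attempt would be capped at 10000 ms rather than 16000 ms.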

There are several configuration options for this callback handler:

  • tokenEndpointUri: OAuth issuer's token endpoint URI
  • clientId: OAuth client ID, used for the client_credentials grant type
  • clientSecret: OAuth client secret, used for the client_credentials grant type
  • scope: optional scope to reference in the call to the OAuth server
  • scopeClaimName: optional override name of the scope claim; defaults to scope
  • subClaimName: optional override name of the sub claim; defaults to sub
  • loginConnectTimeoutMs: optional value in milliseconds for HTTPS connect timeout; defaults to 10000
  • loginReadTimeoutMs: optional value in milliseconds for HTTPS read timeout; defaults to 10000
  • loginAttempts: optional number of attempts to make to connect to the OAuth/OIDC identity provider; defaults to 3
  • loginRetryWaitMs: optional value in milliseconds for the amount of time to wait between HTTPS call attempts; defaults to 250
  • loginRetryMaxWaitMs: optional value in milliseconds for the maximum wait between HTTPS call attempts (as described above); defaults to 10000

...

In the above example, the OAuth provider’s tokenEndpointUri for retrieving an access token has been specified. The clientId and clientSecret values issued by the OAuth provider for an “API” or “machine-to-machine” account are required. The optional scope value adds a scope parameter to the token request.

Notice that there are two SASL extension configuration values in this example too: Extension_supportFeatureX and Extension_organizationId. These will be ignored during the OAuth token retrieval step, but will be passed to the broker through the existing SASL extension mechanism from KIP-342.
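A minimal sketch of separating SASL extensions from the other login options, assuming the Extension_ prefix used in the example above and the KIP-342 extension rules as we understand them (names consist of letters only, auth is reserved, values are printable ASCII or whitespace). The class and method names are hypothetical; in the real handler, the extensions reach the broker via SaslExtensionsCallback.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

// Hypothetical sketch: pull SASL extensions out of the login options by their "Extension_" prefix.
class SaslExtensionsSketch {
    static final String PREFIX = "Extension_";
    // Naming rules as understood from KIP-342: letters-only names, printable-ASCII/whitespace values
    static final Pattern NAME = Pattern.compile("[A-Za-z]+");
    static final Pattern VALUE = Pattern.compile("[\\x21-\\x7E \t\r\n]+");
    static final String RESERVED_NAME = "auth";

    // Returns extension name -> value; options without the prefix are left for token retrieval
    static Map<String, String> extractExtensions(Map<String, String> options) {
        Map<String, String> extensions = new TreeMap<>();
        for (Map.Entry<String, String> option : options.entrySet()) {
            if (!option.getKey().startsWith(PREFIX)) {
                continue;
            }
            String name = option.getKey().substring(PREFIX.length());
            String value = option.getValue();
            if (!NAME.matcher(name).matches() || name.equals(RESERVED_NAME)) {
                throw new IllegalArgumentException("Invalid SASL extension name: " + name);
            }
            if (value == null || !VALUE.matcher(value).matches()) {
                throw new IllegalArgumentException("Invalid value for SASL extension " + name);
            }
            extensions.put(name, value);
        }
        return extensions;
    }
}
```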

Once the login has occurred for this client, the returned access token can be reused by other connections from this client.
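One way to picture this reuse is a per-client cache that hands out the current token until it nears expiry and only then calls the provider again. This is a simplified sketch, not Kafka's existing login refresh logic; TokenCacheSketch, its Token holder, and the refresh buffer are assumptions for illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

// Hypothetical sketch: reuse one access token per client until it nears expiry.
class TokenCacheSketch {

    static final class Token {
        final String value;
        final Instant expiresAt;

        Token(String value, Instant expiresAt) {
            this.value = value;
            this.expiresAt = expiresAt;
        }
    }

    private final Supplier<Token> fetcher;   // stands in for the HTTP call to the token endpoint
    private final Supplier<Instant> clock;   // injectable clock so the logic is testable
    private final Duration refreshBuffer;    // assumed safety margin before the token's expiry
    private Token cached;

    TokenCacheSketch(Supplier<Token> fetcher, Supplier<Instant> clock, Duration refreshBuffer) {
        this.fetcher = fetcher;
        this.clock = clock;
        this.refreshBuffer = refreshBuffer;
    }

    // Returns the cached token, fetching a new one only when none exists or it is about to expire
    synchronized String accessToken() {
        Instant refreshAt = cached == null ? null : cached.expiresAt.minus(refreshBuffer);
        if (refreshAt == null || !clock.get().isBefore(refreshAt)) {
            cached = fetcher.get();
        }
        return cached.value;
    }
}
```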

Validator Callback Handler (Broker)

...

  1. Parse the JWT into separate header, payload, and signature sections

  2. Base64url-decode the header and payload

  3. Extract the necessary claims for validation

  4. Match the key ID (kid) specified in the JWT header to a JWK ID from the JWK Set

  5. Ensure the signing algorithm (alg from the header) isn't none and matches the expected algorithm for the JWK ID

  6. Verify the signature section of the JWT against the original encoded header and payload sections, using the public key from the JWK

  7. Extract the scope, iat, exp, and sub claims, as these are needed by the OAuthBearerToken object to be passed to the OAuthBearerValidatorCallback

  8. Optionally, validate that the issuer, audience, or other claims match given values
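Steps 1, 2, 4, 5, and 6 can be sketched with only the JDK for an RS256-signed token. This is an illustration, not the proposed implementation (which this document expects to build on jose4j): JwtVerifySketch is hypothetical, the JWK Set is assumed to be already converted into a map from kid to PublicKey, and the regex-based header lookup stands in for a real JSON parser.

```java
import java.nio.charset.StandardCharsets;
import java.security.PublicKey;
import java.security.Signature;
import java.util.Base64;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of JWT signature validation for RS256 using only the JDK.
class JwtVerifySketch {

    // Naive lookup of a top-level string member; a real implementation would use a JSON parser
    static String stringMember(String json, String name) {
        Matcher m = Pattern.compile("\"" + Pattern.quote(name) + "\"\\s*:\\s*\"([^\"]*)\"").matcher(json);
        return m.find() ? m.group(1) : null;
    }

    // Returns the decoded payload JSON if every check passes; throws otherwise
    static String verifyRs256(String jwt, Map<String, PublicKey> keysByKid) throws Exception {
        // Step 1: split into header, payload, and signature sections
        String[] parts = jwt.split("\\.", -1);
        if (parts.length != 3) {
            throw new SecurityException("JWT must have exactly three sections");
        }
        // Step 2: base64url-decode the header and payload
        Base64.Decoder decoder = Base64.getUrlDecoder();
        String header = new String(decoder.decode(parts[0]), StandardCharsets.UTF_8);
        String payload = new String(decoder.decode(parts[1]), StandardCharsets.UTF_8);
        // Step 4: match the kid from the header to a key from the JWK Set
        String kid = stringMember(header, "kid");
        PublicKey key = kid == null ? null : keysByKid.get(kid);
        if (key == null) {
            throw new SecurityException("No JWK for kid " + kid);
        }
        // Step 5: reject "none" and anything other than the algorithm expected for this key
        String alg = stringMember(header, "alg");
        if (alg == null || alg.equalsIgnoreCase("none") || !alg.equals("RS256")) {
            throw new SecurityException("Unexpected alg " + alg);
        }
        // Step 6: verify the signature over the ASCII bytes of "<header>.<payload>"
        Signature verifier = Signature.getInstance("SHA256withRSA");
        verifier.initVerify(key);
        verifier.update((parts[0] + "." + parts[1]).getBytes(StandardCharsets.US_ASCII));
        if (!verifier.verify(decoder.decode(parts[2]))) {
            throw new SecurityException("Signature does not match");
        }
        return payload;
    }
}
```

Note that the signature is verified with the public key rather than recomputed: only the provider's private key can produce it.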

Extensions validation will be performed the same way as in org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredValidatorCallbackHandler today.

Because a new key ID (kid) could appear in the header of an incoming JWT access token, code will be implemented that can retrieve the JWKS from the OAuth provider on demand. In the common case, the key ID will be one that has been accessed recently, so the broker shouldn’t need to reach out to the JWKS endpoint often. The code will also need a means to expunge old JWKs that are no longer needed.
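The on-demand lookup and expunging described above could take roughly the following shape. KeyCacheSketch is hypothetical; its fetcher stands in for an HTTP call to the JWKS endpoint plus JWK parsing, and expunging keys that have gone unused for maxIdleMs is one assumed policy among several possible ones.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical sketch: cache JWKs by kid, refresh on an unknown kid, expunge idle keys.
class KeyCacheSketch<K> {
    private final Supplier<Map<String, K>> jwksFetcher; // stands in for fetching and parsing the JWKS
    private final LongSupplier clockMs;
    private final long maxIdleMs;                        // assumed expunge policy
    private final Map<String, K> keys = new HashMap<>();
    private final Map<String, Long> lastUsedMs = new HashMap<>();
    private int fetchCount = 0;

    KeyCacheSketch(Supplier<Map<String, K>> jwksFetcher, LongSupplier clockMs, long maxIdleMs) {
        this.jwksFetcher = jwksFetcher;
        this.clockMs = clockMs;
        this.maxIdleMs = maxIdleMs;
    }

    synchronized K keyFor(String kid) {
        long now = clockMs.getAsLong();
        expungeIdle(now);
        if (!keys.containsKey(kid)) {
            // Unknown kid: query the JWKS endpoint on demand
            refresh(now);
        }
        K key = keys.get(kid);
        if (key == null) {
            throw new IllegalStateException("No JWK found for kid " + kid);
        }
        lastUsedMs.put(kid, now);
        return key;
    }

    synchronized int fetchCount() {
        return fetchCount;
    }

    private void refresh(long now) {
        fetchCount++;
        for (Map.Entry<String, K> jwk : jwksFetcher.get().entrySet()) {
            keys.put(jwk.getKey(), jwk.getValue());
            lastUsedMs.putIfAbsent(jwk.getKey(), now);
        }
    }

    // Drops keys that have not been used within maxIdleMs
    private void expungeIdle(long now) {
        lastUsedMs.entrySet().removeIf(entry -> {
            boolean idle = now - entry.getValue() > maxIdleMs;
            if (idle) {
                keys.remove(entry.getKey());
            }
            return idle;
        });
    }
}
```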

...

The name of the implementation class will be org.apache.kafka.common.security.oauthbearer.internals.secured.OAuthBearerValidatorCallbackHandler and it will accept instances of org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback and org.apache.kafka.common.security.oauthbearer.OAuthBearerExtensionsValidatorCallback. The fully-qualified class name will be provided to the broker's listener.name.<listener name>.oauthbearer.sasl.server.callback.handler.class configuration.

The names of the claims used by the OAuth provider may differ from what is expected. For example, the security principal for which the token is valid is usually contained in the sub (subject) JWT claim. In some cases, the value of that claim may not be valid or usable, and the principal will instead need to be extracted from another claim, such as email.

There are a handful of configuration options for this callback handler:

  • jwksEndpointUri: OAuth issuer's JWK Set endpoint URI from which to retrieve the set of JWKs managed by the provider; mutually exclusive with jwksFile

  • jwksEndpointRefreshIntervalMs: optional value in milliseconds for how often to refresh the JWKS from the URL pointed to by jwksEndpointUri. Only used when using jwksEndpointUri. Defaults to 3600000 (1 hour)

  • jwksFile: specifies a locally-accessible file name that holds a file-based copy of the JWKS data. This allows the JWKS data to be updated on the file system and refreshed on the broker when the file is updated, thus avoiding any HTTP communication with the OAuth/OIDC provider; mutually exclusive with jwksEndpointUri

  • principalClaimName: name of the JWT claim from which to extract the principal (subject); defaults to sub

  • scopeClaimName: name of the JWT claim from which to extract the scope; defaults to scope

  • clockSkew: optional value in seconds for the clock skew between the OAuth/OIDC provider and the broker; defaults to 30
  • expectedAudience: Optional comma-delimited setting for the broker to use to verify that the JWT was issued for one of the expected audiences. The JWT will be inspected for the standard OAuth aud claim and, if this configuration option is set, the broker will check whether the value of the JWT's aud claim exactly matches one of the expected audiences. If there is no match, the broker will reject the JWT and authentication will fail.
  • expectedIssuer: Optional setting for the broker to use to verify that the JWT was created by the expected issuer. The JWT will be inspected for the standard OAuth iss claim and, if this configuration option is set, the broker will check whether the value of the JWT's iss claim exactly matches the expected issuer. If there is no match, the broker will reject the JWT and authentication will fail.
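The claim-related options above can be sketched as a small validator that operates on already-parsed claims. ClaimValidatorSketch is hypothetical. Treating aud as either a string or an array, and rejecting tokens whose exp has passed (allowing for clock skew), follows RFC 7519; rejecting a future iat and accepting the scope claim as either a space-delimited string or an array are assumptions about reasonable behavior and provider formats.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of claim extraction and validation on an already-parsed JWT payload.
class ClaimValidatorSketch {
    private final String principalClaimName;  // defaults to "sub" in the proposal
    private final String scopeClaimName;      // defaults to "scope"
    private final long clockSkewSeconds;      // defaults to 30
    private final Set<String> expectedAudiences = new HashSet<>(); // empty means "not checked"
    private final String expectedIssuer;      // null means "not checked"

    ClaimValidatorSketch(String principalClaimName, String scopeClaimName, long clockSkewSeconds,
                         String expectedAudience, String expectedIssuer) {
        this.principalClaimName = principalClaimName;
        this.scopeClaimName = scopeClaimName;
        this.clockSkewSeconds = clockSkewSeconds;
        this.expectedIssuer = expectedIssuer;
        if (expectedAudience != null) {
            for (String aud : expectedAudience.split(",")) {
                if (!aud.trim().isEmpty()) {
                    expectedAudiences.add(aud.trim());
                }
            }
        }
    }

    // Returns the principal name if all checks pass; throws SecurityException otherwise
    String validate(Map<String, Object> claims, long nowEpochSeconds) {
        Object principal = claims.get(principalClaimName);
        if (!(principal instanceof String) || ((String) principal).isEmpty()) {
            throw new SecurityException("Missing or empty claim " + principalClaimName);
        }
        Object exp = claims.get("exp");
        if (!(exp instanceof Number) || nowEpochSeconds - clockSkewSeconds >= ((Number) exp).longValue()) {
            throw new SecurityException("Token is missing exp or has expired");
        }
        Object iat = claims.get("iat");
        if (iat instanceof Number && ((Number) iat).longValue() > nowEpochSeconds + clockSkewSeconds) {
            throw new SecurityException("Token was issued in the future");
        }
        if (expectedIssuer != null && !expectedIssuer.equals(claims.get("iss"))) {
            throw new SecurityException("Unexpected issuer " + claims.get("iss"));
        }
        if (!expectedAudiences.isEmpty()
                && asStrings(claims.get("aud")).stream().noneMatch(expectedAudiences::contains)) {
            throw new SecurityException("No expected audience in " + claims.get("aud"));
        }
        return (String) principal;
    }

    // Scope claim as a set; accepts a space-delimited string or an array of strings
    Set<String> scopes(Map<String, Object> claims) {
        Object scope = claims.get(scopeClaimName);
        if (scope instanceof String) {
            return new HashSet<>(Arrays.asList(((String) scope).trim().split("\\s+")));
        }
        return new HashSet<>(asStrings(scope));
    }

    // aud (and some scope claims) may be a single string or an array of strings
    private static List<String> asStrings(Object value) {
        List<String> result = new ArrayList<>();
        if (value instanceof String) {
            result.add((String) value);
        } else if (value instanceof Collection) {
            for (Object item : (Collection<?>) value) {
                if (item instanceof String) {
                    result.add((String) item);
                }
            }
        }
        return result;
    }
}
```

Setting principalClaimName to email in this sketch would cover the case described earlier, where the sub claim is not usable.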

Here's an example of the configuration as part of a Java properties file:

...

In the above configuration, the broker points to the OAuth provider's jwksEndpointUri to retrieve the set of JWKs for validation. In this example, a non-default value for scopeClaimName has been provided because the provider uses scp as the name of the scope claim in the JWTs it produces.

...

  1. Providing a JWKS URL. In this mode, the JWKS data will be retrieved from the OAuth provider via the configured URL on broker startup. All then-current keys will be cached on the broker for incoming requests (per the ‘max age’; the jose4j library has a means to keep these up to date when they age out). If an authentication request is received for a JWT that includes a kid that isn’t yet in the cache, the JWKS endpoint will be queried again on demand. However, we prefer polling from a background thread so that, ideally, the cache is pre-populated with any forthcoming keys before JWTs that include them are received.

  2. Providing a JWKS file. On startup, the broker will load the JWKS file from a configured location and will watch the file for updates, allowing for dynamic configuration updates. The means by which the JWKS file is updated is left to the cluster administrator. If a JWT with an unknown key ID is encountered in this mode, the broker will simply report an error and validation will fail.
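For the JWKS file mode, the sketch below reloads the file whenever its last-modified time changes and can check for changes on a background thread. It deliberately swaps in modification-time polling instead of java.nio.file.WatchService for simplicity; JwksFileSketch is hypothetical, and parsing the JSON into keys is omitted.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: reload a JWKS file when its last-modified time changes.
class JwksFileSketch {
    private final Path jwksFile;
    private FileTime lastLoaded;   // modification time of the copy currently in memory
    private String jwksJson;       // raw JWKS JSON; parsing it into keys is omitted here

    JwksFileSketch(Path jwksFile) throws IOException {
        this.jwksFile = jwksFile;
        reloadIfModified();        // initial load on startup
    }

    // Re-reads the file if its modification time differs from the loaded copy; returns true on reload
    synchronized boolean reloadIfModified() throws IOException {
        FileTime modified = Files.getLastModifiedTime(jwksFile);
        if (modified.equals(lastLoaded)) {
            return false;
        }
        jwksJson = new String(Files.readAllBytes(jwksFile), StandardCharsets.UTF_8);
        lastLoaded = modified;
        return true;
    }

    synchronized String jwksJson() {
        return jwksJson;
    }

    // Polls for changes on a background thread; the caller must shut the executor down
    ScheduledExecutorService startPolling(long periodMs) {
        ScheduledExecutorService poller = Executors.newSingleThreadScheduledExecutor();
        poller.scheduleWithFixedDelay(() -> {
            try {
                reloadIfModified();
            } catch (IOException e) {
                // Keep serving the last successfully loaded copy if the file cannot be read
            }
        }, periodMs, periodMs, TimeUnit.MILLISECONDS);
        return poller;
    }
}
```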

...

Users can continue to use the OAuthBearerUnsecuredLoginCallbackHandler and/or other AuthenticateCallbackHandler implementations. Users will need to update both clients and brokers in order to use the new functionality.

Rejected Alternatives

Removing OAuthBearerUnsecuredLoginCallbackHandler

Although this change will provide an out-of-the-box implementation of an AuthenticateCallbackHandler that can be used to communicate with OAuth/OIDC providers, the existing unsecured implementation is still usable for development and testing. Given that its non-secure status is stated in its name and in the documentation, it shouldn’t need to be removed or deprecated at this time.

...

Technically, the new implementation could be developed and shipped as a plugin separate from the main Apache Kafka project. However, community adoption would be improved by an in-tree solution. Including this inside Apache Kafka doesn’t preclude clients from using alternative AuthenticateCallbackHandler implementations, if desired.