Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current stateUnder DiscussionApproved for 3.1.0

Discussion thread: here

JIRA: KAFKA-13202

...

Because the HTTP call made to the OAuth/OIDC provider may time out or transiently fail, there will be a retry mechanism that waits between waits between attempts. The number of attempts that are made (including the first attempt) are configured via the loginAttempts configuration setting. The retry will use an variable as it uses an exponential backoff approach; the first attempt to connect to the HTTP endpoint will be made immediately. If that first attempt fails, a second attempt will first wait a configurable number of milliseconds–loginRetryWaitMs–sasl.login.retry.backoff.ms–before trying again. If that second attempt fails, the wait time (loginRetryWaitMs) will be doubled before a third attempt. This pattern repeats as needed up to loginAttempts. However, there is also a configurable the maximum wait time between attempts–loginRetryMaxWaitMs–such that loginRetryWaitMs <= loginRetryMaxWaitMs, regardless of the number of attempts: min(currentRetryWaitMs, loginRetryMaxWaitMs)of sasl.login.retry.backoff.max.ms.

There are several configuration options for this callback handler:

...

. Sensitive configuration options and SASL extensions appear under the JAAS configuration (sasl.jaas.config) while the rest are top-level configuration.

The JAAS configuration options are:

  • clientId: supports OAuth clientcredentials grant type

  • clientSecret: supports OAuth's clientcredentials grant

  • scope: optional scope to reference in the call to the OAuth server

The top-level configuration options for the client login callback handler are:

  • sasl.oauthbearer.token.endpoint.url: OAuth issuer token endpoint URL
  • sasl.oauthbearer.scope.claim.name: optional scopeClaimName: optional override name of the scope claim; defaults to scope
  • subClaimNamesasl.oauthbearer.sub.claim.name: optional override name of the sub claim; defaults to sub
  • loginConnectTimeoutMssasl.login.connect.timeout.ms: optional value in milliseconds for HTTPS connect timeout; defaults to 10000
  • loginReadTimeoutMs: optional value sasl.login.read.timeout.ms: optional value in milliseconds for HTTPS read timeout; defaults to 10000
  • loginAttempts: optional number of attempts to make to connect to the OAuth/OIDC identity provider; defaults to 3
  • loginRetryWaitMssasl.login.retry.backoff.ms: optional value in milliseconds for the amount of time to wait between HTTPS call attempts; defaults to 250 100
  • sasl.login.retry.backoff.max.msloginRetryMaxWaitMs: optional value in milliseconds for the maximum wait between for HTTPS call attempts (as described above); defaults to 10000

...

sasl.login.callback.handler.class=...OAuthBearerLoginCallbackHandler
sasl.jaaslogin.config=connect...OAuthBearerLoginModule required \
tokenEndpointUri="timeout.ms=15000
sasl.oauthbearer.token.endpoint.url=https://myidp.example.com/oauth2/default/v1/token
"
sasl.jaas.config=...OAuthBearerLoginModule required \
clientId="abc123" \
clientSecret="S3cr3t!" \
scope="sales-pipeline" \
Extension_supportFeatureX="true" \
Extensionextension_organizationId="sales-emea" ;


In the above example, the OAuth provider’s tokenEndpointUri to retrieve an access token has been specified. The values for clientId and clientSecret as provider’s sasl.oauthbearer.token.endpoint.url has been specified as well as an override of the default for sasl.login.connect.timeout.ms. The values for clientId and clientSecret as provided by the OAuth provider for an “API” or “machine-to-machine” account are required in the JAAS configuration. The optional optional scope value will allow the inclusion of a scope parameter when requesting the token.

Notice that there are two is also a SASL extension configuration values in this example too: Extension_supportFeatureX and Extension_ extension_organizationId. These Extensions will be ignored during the OAuth token retrieval step, but will be passed to the broker through the existing SASL extension mechanism from KIP-342.

Once the login has occurred for this client, the returned access token can be reused by other connections from this client. While these additional connections will not issue the token retrieval HTTP call on the client, the broker will still validate the token repeatedly once for each time it is receivedsent by each client connection.

Per KIP-368, the OAuth token re-authentication logic from the existing implementation is automatically "inherited" by this implementation, so no additional work is needed to support that feature.

...

The extensions validation will be executed the same as in org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredValidatorCallbackHandler today.

A new key It's possible that a key ID (kid) could appear in the header of an incoming JWT access token . Code that can retrieve the JWKS from the OAuth provider on demand will be implemented. The common case will be that the key ID is one that has been accessed recently, so it shouldn’t need to reach out to the JWKS endpoint often. The code will need to have a means to expunge old JWKs that are no longer needed.that does not appear in the JWKS cached by a broker. 

Broker Configuration

The name of the implementation class will be org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler and it will accept instances of org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback and org.apache.kafka.common.security.oauthbearer.OAuthBearerExtensionsValidatorCallback. The fully-qualified class name will be provided to the broker's listener.name.<listener name>.oauthbearer.sasl.server.callback.handler.class configuration.

It may be that the names of the claims used by the OAuth provider differ from what is expected. For example, the security principal for which the token is valid is usually contained in the sub (subject) JWT claim. There may be cases where the value of that claim may not be valid or usable, and instead the value will need to be extracted from, for example, the email claim.

There are a handful of configuration several configuration options for this callback handler:. Since there are no sensitive configuration options, they are all in the top-level configuration. The configuration can be top-level or scoped to a specific listener with the listener prefix listener.name.<listener name>.oauthbearer. Here are the configuration options:

  • sasl.oauthbearer.jwks.endpoint.urljwksEndpointUri: OAuth issuer's JWK Set endpoint URI from URL from which to retrieve the set of JWKs managed managed by the provider; mutually exclusive with jwksFile

  • jwksEndpointRefreshIntervalMs: optional value in milliseconds for how often to refresh the JWKS from the URL pointed to by jwksEndpointUri. Only used when using jwksEndpointUri. Defaults to 3600000 (1 hour)

  • jwksFile: specifies a locally-accessible file name that holds a

    this can be a file://-based URL that points to a broker file system-accessible file-based copy of the JWKS data. This allows the JWKS data to be updated on the file system and refreshed on the broker when the file is updated, thus avoiding any HTTP(S) communication with the OAuth/OIDC provider

    ; mutually exclusive with jwksEndpointUri
  • subClaimName: name of the scope from which to extract the subject claim from the JWT; defaults to sub

  • scopeClaimName: name of the scope from which to extract the scope claim from the JWT; defaults to scope

  • clockSkew: optional value in seconds for the clock skew between the OAuth/OIDC provider and the broker. Only used when using jwksEndpointUri. Defaults to 30
  • expectedAudience: The (optional) comma-delimited setting for the broker to use to verify that the JWT was issued for one of the expected audiences. The JWT will be inspected for the standard OAuth aud claim and if this configuration option is set, the broker will match the value from JWT's aud claim to see if there is an exact match. If there is no match, the broker will reject the JWT and authentication will fail.
  • expectedIssuer: Optional setting for the broker to use to verify that the JWT was created by the expected issuer. The JWT will be inspected for the standard OAuth iss claim and if this configuration option is set, the broker will match the value from JWT's iss claim to see if there is an exact match. If there is no match, the broker will reject the JWT and authentication will fail.

Here's an example of the configuration as a part of a Java properties file:

...

  • sasl.oauthbearer.jwks.endpoint.refresh.interval.ms: optional value in milliseconds for how often to refresh the JWKS from the URL pointed to by sasl.oauthbearer.jwks.endpoint.url. Only used when using an HTTP(S)-based URL for sasl.oauthbearer.jwks.endpoint.url. Defaults to 3600000 (1 hour)

  • sasl.oauthbearer.jwks.endpoint.retry.backoff.ms: optional value in milliseconds for the amount of time to wait between HTTPS call attempts to retrieve the JWKS; only used when using an HTTP(S)-based URL for sasl.oauthbearer.jwks.endpoint.url ; defaults to 100
  • sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms: optional value in milliseconds for the maximum wait for HTTPS call attempts to retrieve the JWKS; only used when using an HTTP(S)-based URL for sasl.oauthbearer.jwks.endpoint.url ; defaults to 10000
  • sasl.oauthbearer.sub.claim.name: name of the scope from which to extract the subject claim from the JWT; defaults to sub

  • sasl.oauthbearer.scope.claim.name: name of the scope from which to extract the scope claim from the JWT; defaults to scope

  • sasl.oauthbearer.clock.skew.seconds: optional value in seconds for the clock skew between the OAuth/OIDC provider and the broker. Only used when using an HTTP(S)-based URL for sasl.oauthbearer.jwks.endpoint.url. Defaults to 30
  • sasl.oauthbearer.expected.audience: The (optional) comma-delimited setting for the broker to use to verify that the JWT was issued for one of the expected audiences. The JWT will be inspected for the standard OAuth aud claim and if this configuration option is set, the broker will match the value from JWT's aud claim to see if there is an exact match. If there is no match, the broker will reject the JWT and authentication will fail.
  • sasl.oauthbearer.expected.issuer: Optional setting for the broker to use to verify that the JWT was created by the expected issuer. The JWT will be inspected for the standard OAuth iss claim and if this configuration option is set, the broker will match the value from JWT's iss claim to see if there is an exact match. If there is no match, the broker will reject the JWT and authentication will fail.

Here's an example of the configuration as a part of a Java properties file:


listener.name.<listener name>.oauthbearer.sasl.server.callback.handler.class=...OAuthBearerValidatorCallbackHandler
listener.name.<listener name>.oauthbearer.sasl.jaas.config=...OAuthBearerLoginModule required;
sasl.oauthbearer.jwks.endpoint.url=https://myidp.example.com/oauth2/default/v1/keys
sasl.oauthbearer.scope.claim.name=scp


In the above configuration the broker points to the appropriate OAuth provider sasl.oauthbearer.jwks.endpoint.url to retrieve a the set of JWKs for validation. In this example, a non-default value for sasl.oauthbearer.scope.claim.name has been provided because the provider uses scp for the name of the scope claim in the JWT it produces.

JWKS Management Logic

The JSON Web Key Set (JWKS) is a JSON document provided by the OAuth/OIDC provider that lists the keys used to sign the JWTs it issues.

Here is a sample JWKS JSON document:

{
  "keys": [
    {
      "kty": "RSA",
      "alg": "RS256",
    "kid": "abc123",
      "use": "sig",
      "e": "AQAB",
    "n": "..."
    },
    {
      "kty": "RSA",
      "alg": "RS256",
    "kid": "def456",
      "use": "sig",
      "e": "AQAB",
    "n": "..."
    }
  ]
}

Without going into too much detail, the array of keys enumerates the key data that the provider is using to sign the JWT signature. The key ID (kid) is referenced by the JWT's header in order to match up the JWT's signing key with the key in the JWKS. During the validation step, the jose4j OAuth library will use the contents of the appropriate key in the JWKS to validate the signature.

Given that the JWKS is referenced by the JWT, the JWKS must be made available by the OAuth/OIDC provider so that a JWT can be validated.

The JWKS will be kept up-to-date in two main ways:

  1. Providing a JWKS URL. In this mode, the JWKS data will be retrieved from the OAuth provider via the configured URL on broker startup. All then-current keys will be cached on the broker (per the ‘max age’; the jose4j library has a means to keep these-up-to-date when they age out) for incoming requests. If an authentication request is received for a JWT that includes a kid that isn’t yet in the cache, the JWKS endpoint will be queried again on demand. However, we prefer polling via a background thread to hopefully pre-populate the cache with any forthcoming keys before any JWT requests that include them are received.

  2. Providing a JWKS file. On startup, the broker will load the JWKS file from the configured location. Any updates to the file will require a restart of the broker. The means by which the JWKS file is updated is left to the cluster administrator. In the event that an unknown JWT key is encountered, this implementation will simply issue an error and validation will fail.


Warning

If the the URL or file that is specified cannot be read, the broker will fail to start up. In the case of an HTTP(S)-based URL, the configured sasl.login.retry.backoff.ms and sasl.login.retry.backoff.max.ms values will be used to make attempts to connect to the remote OAuth provider.

It is also important that the JWKS is retrieved before the broker's ports are opened. Otherwise clients that connect to the broker before the JWKS is retrieved will experience spurious authentication failures (e.g. during broker restarts).


It is expected that an OAuth provider will publish a new JWKS well in advance of issuing any JWTs that contain those keys. However, in the case that a broker receives a JWT with a key ID that it doesn't have stored in its cached JWKS, the broker will need to remediate the issue:

  1. If the JWKS URL is HTTP(S)-based and if the broker hasn't already attempted to resolve the key ID, enqueue a background thread to reload the JWKS from the HTTP(S) endpoint. The broker will keep track of key ID resolution failures so it doesn't repeatedly attempt to do so. If the JWKS URL is file://-based, no remediation processing will occur.
  2. Send an authentication failure to the client. Since it is unknown at this point in processing of the key ID is valid-but-missing or if the key ID is just invalid, the broker will always issue an authentication error. Between the authentication failure delay mechanism and any client retry, there may be sufficient time for the broker to update the JWKS.

The common case will be that the key ID is one that has been published and accessed recently. The broker shouldn’t need to reach out to the JWKS endpoint in the on-demand fashion described above normally.


Warning

The broker will not perform any loading of the JWKS in the network thread. This needs to be performed either prior to opening the ports during startup (as described above) or performed in a background thread. 


The code will need to have a means to expunge old JWKS that are no longer needed. If there is a tangible benefit, perhaps the broker can keep track of recently removed/aged out key IDs so as to provide a more helpful message to the user

In the above configuration the broker points to the appropriate OAuth provider jwksEndpointUri to retrieve a the set of JWKs for validation. In this example, a non-default value for scopeClaimName has been provided because the provider uses scp for the name of the scope claim in the JWT it produces.

JWKS Management Logic

The JWKS will be kept up-to-date in two main ways:

...

Providing a JWKS URL. In this mode, the JWKS data will be retrieved from the OAuth provider via the configured URL on broker startup. All then-current keys will be cached on the broker (per the ‘max age’; the jose4j library has a means to keep these-up-to-date when they age out) for incoming requests. If an authentication request is received for a JWT that includes a kid that isn’t yet in the cache, the JWKS endpoint will be queried again on demand. However, we prefer polling via a background thread to hopefully pre-populate the cache with any forthcoming keys before any JWT requests that include them are received.

...

.

Broker-to-broker Support

The use of OAuth credentials for broker-to-broker communication will continue to be supported. As with the existing implementation, users can specify the protocols and implementations to use for broker-based communicationcommunication. This would require providing the appropriate configuration for both client login and broker validation.

Testing

In addition to unit and integration tests, there will be a standalone tool in the tools directory/module : named org.apache.kafka.tools.OAuthCompatibilityTestOAuthCompatibilityTool. This test can be run via the existing bin/kafka-run-class.sh script thusly: script thusly:


./bin/kafka-run-class.sh org.apache.kafka.tools.OAuthCompatibilityTool \
--client-id foo \
--client-secret bar \
--token-endpoint-url https://example.com/oauth2/v1/token \
--jwks-endpoint-url https://example.com/oauth2/v1/keysKAFKA_OPTS=""
export KAFKA_OPTS="$KAFKA_OPTS -DclientId=$client_id"
export KAFKA_OPTS="$KAFKA_OPTS -DclientSecret=$client_secret"
export KAFKA_OPTS="$KAFKA_OPTS -Dscope=api://$client_id/.default"
export KAFKA_OPTS="$KAFKA_OPTS -DtokenEndpointUri=$token_endpoint_uri"
export KAFKA_OPTS="$KAFKA_OPTS -DjwksEndpointUri=$jwks_uri"
export KAFKA_OPTS="$KAFKA_OPTS -DclockSkew=30"
export KAFKA_OPTS="$KAFKA_OPTS -DexpectedAudience=$expected_audience"
./bin/kafka-run-class.sh org.apache.kafka.tools.OAuthCompatibilityTest


As seen in the example invocation above, the various Kafka configuration options for the client and server broker properties are passed in via the existing KAFKA_OPTS environment variablecommand line options. The test will connect to remote systems as needed to authenticate, retrieve tokens, retrieve JWKS, and perform validation (nothing is "mocked"). The command line options can be listed using the standard --help command line option. 

This tool serves three basic purposes:

...

Additionally debugging can be selectively enabled using the standard tools-log4j.properties file if errors are detected by the tool. All configuration options supported by the client and server are also supported by the tool; it attempts to be as faithful as possible to the runtime logic to minimize discrepancies between the tool and the client and serverbroker.

Compatibility, Deprecation, and Migration Plan

...