Status
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-13202
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The convergence on the use of OAuth/OIDC for authorization and authentication of Internet-based services is well underway. Using a standard set of technologies lets organizations choose from a range of OAuth/OIDC-compatible providers instead of defining, developing, and managing identity, security, and policy infrastructure on their own. Organizations can communicate with these providers over standard protocols (defined in RFCs), using code written against mature open source libraries in popular programming languages. There is already a rich (and growing!) ecosystem of tools that integrate with the providers. Although selecting a provider and implementing an OAuth-compliant suite of applications and services is not a trivial task, these standards give an organization flexibility.
When it comes to Kafka, the work done via KIP-255 (OAuth Authentication via SASL/OAUTHBEARER) introduced a framework that allowed for integration with OAuth-compliant providers. With this framework in place, Kafka clients could now pass a JWT access token to a broker when initializing the connection as a means of authentication. This means Kafka can start to leverage these standards for authorization and authentication for which many current and prospective customers are asking.
However, the KIP for the existing implementation quickly notes that:
OAuth 2 is a flexible framework with multiple ways of doing the same thing.
Because of the high degree of flexibility required by different organizations as well as differences in implementations by various OAuth providers, “supporting OAuth” isn't as straightforward as one might hope. While flexibility is one of OAuth's strengths it also presents one of its greatest challenges for adopters. Because of OAuth's flexibility, the organizational requirements, and the minor differences in provider implementation, the exact means and logic by which a JWT access token is retrieved and validated may vary on a case-by-case basis. In fact, a generalized, out-of-the-box OAuth implementation was specifically not included in KIP-255 because:
Anything beyond simple unsecured use cases requires significant functionality that is available via multiple open source libraries, and there is no need to duplicate that functionality here. It is also not desirable for the Kafka project to define a specific open source JWT/JWS/JWE library dependency; better to allow installations to use the library that they feel most comfortable with.
The implementation of KIP-255 provided a concrete example implementation that allowed clients to provide an unsecured JWT access token to the broker when initializing the connection. As the KIP states, it is intended only for “development scenarios.”
The KIP sums up this intentionally missing functionality by directing organizations wishing to adopt OAuth within Kafka:
Production use cases will require writing an implementation of AuthenticateCallbackHandler that can handle an instance of OAuthBearerTokenCallback
So the exact implementation is left up to each organization. To fill the gap in the Apache Kafka project, a handful of open source Java implementations have been provided by the community. These implementations can be included in the class path of an OAuth-enabled Kafka client and configured appropriately to achieve the goal.
This project will provide a concrete implementation of the interfaces defined in KIP-255 to allow Kafka to connect to an OpenID Connect (OIDC) identity provider for authentication and token retrieval. While KIP-255 provides an unsecured JWT example for development, this will fill in the gap and provide a production-grade implementation.
Public Interfaces
No changes to the public interface are anticipated; the implementation will leverage the existing AuthenticateCallbackHandler API.
Proposed Changes
The OAuth/OIDC work will allow out-of-the-box configuration by any Apache Kafka user to connect to an external identity provider service (e.g. Okta, Auth0, Azure, etc.). The code will implement the standard OAuth client_credentials grant type.
The proposed change is largely composed of a pair of AuthenticateCallbackHandler implementations: one to log in on the client and one to validate on the broker.
As seen in this diagram, the login callback is executed on the client and the validate callback is executed on the broker.
Login Callback Handler (Client)
The login callback handler is invoked as part of the standard Kafka client login process. It is in this callback handler that the necessary HTTPS calls will be made to the OAuth provider using the provided configuration.
Token Retrieval Logic
Here is an example call to retrieve a JWT access token using curl and jq:
CLIENT_ID=abc123
CLIENT_SECRET='S3cr3t!'
URL=https://myidp.example.com/oauth2/default/v1/token
SCOPE=sales-pipeline
base_64_string=$(echo -n "$CLIENT_ID:$CLIENT_SECRET" | base64)
access_token=$(curl \
--silent \
--request POST \
--url $URL \
--header "accept: application/json" \
--header "authorization: Basic $base_64_string" \
--header "cache-control: no-cache" \
--header "content-type: application/x-www-form-urlencoded" \
--data "grant_type=client_credentials&scope=$SCOPE" | jq -r .access_token)
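The curl call above could be expressed in Java roughly as follows. This is a minimal sketch using only the JDK's java.net.http API (Java 11+), not the proposed handler itself; the class name TokenRequestSketch and its method names are hypothetical.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper (not part of Kafka): builds the same request as the curl call.
final class TokenRequestSketch {

    // HTTP Basic credentials are base64("clientId:clientSecret").
    static String basicAuthHeader(String clientId, String clientSecret) {
        String raw = clientId + ":" + clientSecret;
        return "Basic " + Base64.getEncoder().encodeToString(raw.getBytes(StandardCharsets.UTF_8));
    }

    // Form body for the client_credentials grant; the scope parameter is optional.
    static String formBody(String scope) {
        String body = "grant_type=client_credentials";
        if (scope != null && !scope.isEmpty()) {
            body += "&scope=" + URLEncoder.encode(scope, StandardCharsets.UTF_8);
        }
        return body;
    }

    // Assembles the POST request; sending it and parsing the JSON response is left out.
    static HttpRequest build(String tokenEndpointUri, String clientId, String clientSecret, String scope) {
        return HttpRequest.newBuilder(URI.create(tokenEndpointUri))
            .header("Accept", "application/json")
            .header("Authorization", basicAuthHeader(clientId, clientSecret))
            .header("Cache-Control", "no-cache")
            .header("Content-Type", "application/x-www-form-urlencoded")
            .POST(HttpRequest.BodyPublishers.ofString(formBody(scope)))
            .build();
    }
}
```

The request would then be sent with an HttpClient and the access_token field read from the JSON response, which is the part jq handles in the shell version.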
The OIDC specification requires the access token to be in the JWT format. The client will perform parsing and some basic structural validation of the JWT access token contents.
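As an illustration of what basic structural validation might involve, the sketch below splits a serialized JWT into its three dot-separated sections and base64url-decodes the header and payload. The exact checks the client will perform are not specified here, so the rules in this example (exactly three non-empty sections) and the class name JwtStructureSketch are assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper: structural checks only, no signature verification.
final class JwtStructureSketch {

    // Returns {headerJson, payloadJson}; throws IllegalArgumentException on malformed input.
    static String[] decodeHeaderAndPayload(String jwt) {
        String[] parts = jwt.split("\\.", -1);
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected 3 dot-separated sections but found " + parts.length);
        }
        for (String part : parts) {
            if (part.isEmpty()) {
                throw new IllegalArgumentException("JWT sections must not be empty");
            }
        }
        // JWTs use the URL-safe base64 alphabet; the JDK decoder accepts missing padding.
        Base64.Decoder decoder = Base64.getUrlDecoder();
        String header = new String(decoder.decode(parts[0]), StandardCharsets.UTF_8);
        String payload = new String(decoder.decode(parts[1]), StandardCharsets.UTF_8);
        return new String[] {header, payload};
    }
}
```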
Client Configuration
The name of the implementation class will be org.apache.kafka.common.security.oauthbearer.internals.secured.OAuthBearerLoginCallbackHandler and it will accept instances of org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback and org.apache.kafka.common.security.auth.SaslExtensionsCallback. The fully-qualified class name will be provided to the client's sasl.login.callback.handler.class configuration.
There are several configuration options for this callback handler:
- tokenEndpointUri: OAuth issuer token endpoint URI
- clientId: supports the OAuth client_credentials grant type
- clientSecret: supports the OAuth client_credentials grant type
- scope: optional scope to reference in the call to the OAuth server
- scopeClaimName: optional override name of the scope claim; defaults to scope
- subClaimName: optional override name of the sub claim; defaults to sub
- loginConnectTimeoutMs: optional value in milliseconds for HTTPS connect timeout; defaults to 10000
- loginReadTimeoutMs: optional value in milliseconds for HTTPS read timeout; defaults to 10000
- loginAttempts: optional number of attempts to make to connect to the OAuth/OIDC identity provider; defaults to 3
- loginRetryWaitMs: optional value in milliseconds to wait between HTTPS call attempts; the wait uses exponential backoff for up to the number of attempts defined by loginAttempts, but never more than loginRetryMaxWaitMs; defaults to 250
- loginRetryMaxWaitMs: optional value in milliseconds for the maximum amount of time to wait between HTTPS call attempts; defaults to 10000
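To make the retry options concrete, here is one way a capped exponential backoff could be computed. The doubling formula is an assumption, since the exact backoff schedule is not spelled out above; the 250 ms initial wait and 10000 ms cap in the usage note are the defaults from the list. The class and parameter names are hypothetical.

```java
// Hypothetical helper: capped exponential backoff between login attempts.
final class RetryBackoffSketch {

    // Wait before retry number `attempt` (1 = first retry):
    // initialWaitMs * 2^(attempt - 1), never exceeding maxWaitMs.
    static long waitMs(int attempt, long initialWaitMs, long maxWaitMs) {
        long wait = initialWaitMs;
        // Stop doubling once the cap is reached, which also avoids overflow.
        for (int i = 1; i < attempt && wait < maxWaitMs; i++) {
            wait *= 2;
        }
        return Math.min(wait, maxWaitMs);
    }
}
```

With the defaults, the first three retries would wait 250, 500, and 1000 milliseconds under this formula, and any later retry would be capped at 10000 milliseconds.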
Here's an example of the configuration as a part of a Java properties file:
sasl.login.callback.handler.class=...OAuthBearerLoginCallbackHandler
sasl.jaas.config=...OAuthBearerLoginModule required \
tokenEndpointUri="https://myidp.example.com/oauth2/default/v1/token" \
clientId="abc123" \
clientSecret="S3cr3t!" \
scope="sales-pipeline" \
Extension_supportFeatureX="true" \
Extension_organizationId="sales-emea" ;
In the above example the OAuth provider’s tokenEndpointUri to retrieve an access token has been specified. The values for clientId and clientSecret as provided by the OAuth provider for an “API” or “machine-to-machine” account are required. The optional scope value will allow the inclusion of a scope parameter when requesting the token.
Notice that there are two SASL extension configuration values in this example too: Extension_supportFeatureX and Extension_organizationId. These will be ignored during the OAuth token retrieval step, but will be passed to the broker through the existing SASL extension mechanism from KIP-342.
Validator Callback Handler (Broker)
The validation callback handler is invoked as part of the SASL Kafka server authentication process when a JWT is received. This callback handler will parse and validate the JWT using its contents along with provided configuration.
Validation Logic
The basic overview of the JWT validation that will be performed is:
- Parse the JWT into separate header, payload, and signature sections
- Base-64 decode the header and payload
- Extract the necessary claims for validation
- Ensure the encoding algorithm isn't none and matches the expected algorithm
- Match the key ID (kid) specified in the JWT header to a JWK ID from the JWK Set
- Verify the signature section of the JWT against the header and payload sections of the original encoded JWT access token, using the public key from the JWK
- Extract the scope, iat, exp, and sub claims as these are needed by the OAuthBearerToken object to be passed to the OAuthBearerValidatorCallback
- Optional claim validation that ensures that issuer, audience, or other claims match a given value
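The algorithm check and signature check in the steps above can be sketched with the JDK's java.security API. This example assumes RS256 (RSA with SHA-256) is the expected algorithm and takes the header's alg value as an already-parsed argument, because JSON parsing needs a library outside the JDK. It shows standard JWS signature verification and is not the proposed handler's code; the class name JwtSignatureSketch is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.PublicKey;
import java.security.Signature;
import java.util.Base64;

// Hypothetical helper: verifies an RS256-signed JWT against a public key.
final class JwtSignatureSketch {

    static boolean verifyRs256(String jwt, String headerAlg, PublicKey key) throws GeneralSecurityException {
        // Reject "none" and any algorithm other than the expected one.
        if (!"RS256".equals(headerAlg)) {
            return false;
        }
        String[] parts = jwt.split("\\.", -1);
        if (parts.length != 3) {
            return false;
        }
        // The signature covers the encoded header and payload exactly as received.
        byte[] signingInput = (parts[0] + "." + parts[1]).getBytes(StandardCharsets.US_ASCII);
        byte[] signature = Base64.getUrlDecoder().decode(parts[2]);
        Signature verifier = Signature.getInstance("SHA256withRSA");
        verifier.initVerify(key);
        verifier.update(signingInput);
        return verifier.verify(signature);
    }
}
```

In the flow described above, the public key passed in would be the one from the JWK whose ID matches the kid in the JWT header.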
The extensions validation will be executed the same as in org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredValidatorCallbackHandler today.
A new key ID (kid) could appear in the header of an incoming JWT access token. Code that can retrieve the JWKS from the OAuth provider on demand will be implemented. The common case will be that the key ID is one that has been accessed recently, so it shouldn’t need to reach out to the JWKS endpoint often. The code will need to have a means to expunge old JWKs that are no longer needed.
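One possible shape for such a cache is sketched below. The fetcher stands in for the HTTPS call to the JWKS endpoint, and the expunge policy shown (drop any cached key that the provider no longer publishes) is an assumption rather than the proposed design. The class name JwksCacheSketch is hypothetical.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical helper: caches keys by kid and refetches on a cache miss.
final class JwksCacheSketch<K> {

    // Stands in for retrieving and parsing the JWKS from the provider.
    private final Supplier<Map<String, K>> fetcher;
    private final Map<String, K> keysById = new ConcurrentHashMap<>();

    JwksCacheSketch(Supplier<Map<String, K>> fetcher) {
        this.fetcher = fetcher;
    }

    // Syncs the cache with the provider's current key set; keys that are
    // no longer published are expunged.
    synchronized void refresh() {
        Map<String, K> latest = fetcher.get();
        keysById.keySet().retainAll(latest.keySet());
        keysById.putAll(latest);
    }

    // Common case: the kid is cached. An unknown kid triggers one on-demand refresh.
    Optional<K> find(String kid) {
        K key = keysById.get(kid);
        if (key == null) {
            refresh();
            key = keysById.get(kid);
        }
        return Optional.ofNullable(key);
    }
}
```

A real implementation would likely also limit how often unknown key IDs can trigger a refresh, so that tokens carrying made-up kid values cannot force repeated calls to the provider.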
Broker Configuration
The name of the implementation class will be org.apache.kafka.common.security.oauthbearer.internals.secured.OAuthBearerValidatorCallbackHandler and it will accept instances of org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback and org.apache.kafka.common.security.oauthbearer.OAuthBearerExtensionsValidatorCallback. The fully-qualified class name will be provided to the broker's listener.name.<listener name>.oauthbearer.sasl.server.callback.handler.class configuration.
It may be that the names of the claims used by the OAuth provider differ from what is expected. For example, the security principal for which the token is valid is usually contained in the sub (subject) JWT claim. There may be cases where the value of that claim is not valid or usable, and instead the value will need to be extracted from, for example, the email claim.
There are a handful of configuration options for this callback handler:
- jwksEndpointUri: OAuth issuer's JWK Set endpoint URI from which to retrieve the set of JWKs managed by the provider; mutually exclusive with jwksFile
- jwksEndpointRefreshIntervalMs: optional value in milliseconds for how often to refresh the JWKS from the URL pointed to by jwksEndpointUri. Only used when using jwksEndpointUri. Defaults to 3600000 (1 hour)
- jwksFile: specifies a locally-accessible file name that holds a file-based copy of the JWKS data. This allows the JWKS data to be updated on the file system and refreshed on the broker when the file is updated, thus avoiding any HTTP communication with the OAuth/OIDC provider; mutually exclusive with jwksEndpointUri
- principalClaimName: name of the claim from which to extract the principal (subject) from the JWT; defaults to sub
- scopeClaimName: name of the claim from which to extract the scope from the JWT; defaults to scope
- clockSkew: optional value in seconds for the clock skew between the OAuth/OIDC provider and the broker. Only used when using jwksEndpointUri. Defaults to 30
- expectedAudience
- expectedIssuer
Here's an example of the configuration as a part of a Java properties file:
listener.name.<listener name>.oauthbearer.sasl.server.callback.handler.class=o.a.k...OAuthBearerValidatorCallbackHandler
sasl.jaas.config=o.a.k...OAuthBearerLoginModule required \
jwksEndpointUri="https://myidp.example.com/oauth2/default/v1/keys" \
scopeClaimName="scp" ;
In the above configuration the broker points to the appropriate OAuth provider jwksEndpointUri to retrieve the set of JWKs for validation. In this example, a non-default value for scopeClaimName has been provided because the provider uses scp for the name of the scope claim in the JWT it produces.
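The effect of the claim-name overrides can be illustrated with a small lookup over already-parsed claims. This sketch assumes the claims are available as a Map and that a scope claim may arrive either as a space-delimited string or as a list of strings; both are assumptions about the provider's output, and the class name ClaimLookupSketch is hypothetical.

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: reads the principal and scope from configurable claim names.
final class ClaimLookupSketch {

    // Looks up the principal under the configured claim name (e.g. "sub" or "email").
    static String principal(Map<String, Object> claims, String principalClaimName) {
        Object value = claims.get(principalClaimName);
        if (!(value instanceof String) || ((String) value).isEmpty()) {
            throw new IllegalArgumentException("missing or empty claim: " + principalClaimName);
        }
        return (String) value;
    }

    // Looks up the scope under the configured claim name (e.g. "scope" or "scp").
    static Set<String> scopes(Map<String, Object> claims, String scopeClaimName) {
        Object value = claims.get(scopeClaimName);
        Set<String> result = new LinkedHashSet<>();
        if (value instanceof String) {
            // Space-delimited form, e.g. "read write".
            for (String s : ((String) value).trim().split("\\s+")) {
                if (!s.isEmpty()) {
                    result.add(s);
                }
            }
        } else if (value instanceof Collection) {
            // List form, e.g. ["read", "write"].
            for (Object o : (Collection<?>) value) {
                result.add(String.valueOf(o));
            }
        }
        return result; // empty when the claim is absent
    }
}
```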
JWKS Management Logic
The JWKS will be kept up-to-date in two main ways:
- Providing a JWKS URL. In this mode, the JWKS data will be retrieved from the OAuth provider via the configured URL on broker startup. All then-current keys will be cached on the broker (per the ‘max age’; the jose4j library has a means to keep these up to date when they age out) for incoming requests. If an authentication request is received for a JWT that includes a kid that isn’t yet in the cache, the JWKS endpoint will be queried again on demand. However, we prefer polling via a background thread to hopefully pre-populate the cache with any forthcoming keys before any JWT requests that include them are received.
- Providing a JWKS file. On startup, the broker will load the JWKS file from a configured location and will watch the file for updates, allowing for dynamic configuration updates. The means by which the JWKS file is updated is left to the cluster administrator. In the event that an unknown JWT key is encountered, this implementation will simply issue an error and validation will fail.
Broker-to-broker Support
The use of OAuth credentials for broker-to-broker communication will continue to be supported. As with the existing implementation, users can specify the protocols and implementations to use for broker-based communication.
Compatibility, Deprecation, and Migration Plan
Users can continue to use the OAuthBearerUnsecuredLoginCallbackHandler and/or other AuthenticateCallbackHandler implementations. Users will need to update both clients and brokers in order to use the new functionality.
Rejected Alternatives
Removing OAuthBearerUnsecuredLoginCallbackHandler
Although this change will provide an out-of-the-box implementation of an AuthenticateCallbackHandler that can be used to communicate with OAuth/OIDC, the existing unsecured implementation is still usable for development and testing. Given that its non-secure status is in its name and the documentation, it shouldn’t need to be removed or deprecated at this time.
Standalone Plugin
Technically the new implementation could be developed and shipped as a plugin separate from the main Apache Kafka project. However, community adoption would be improved by an in-tree solution. Including this inside Apache Kafka doesn’t preclude clients from using alternative AuthenticateCallbackHandler implementations, if desired.