Status

Discussion thread: https://lists.apache.org/thread/cvwknd5fhohj0wfv8mfwn70jwpjvxrjj
Vote thread:
JIRA:
Release: 1.17

Motivation

As Flink clusters using Kerberos-authenticated services (talking to HDFS, HBase, or Kerberos-enabled Kafka, or even running on YARN) scale to hundreds of nodes, we will inevitably hit a scalability challenge with Kerberos authentication. In Spark Streaming this became a limitation at around 200-node clusters and led to job and cluster outages due to authentication requests being denied by the Kerberos Key Distribution Center (KDC). We can build on the implementation and lessons learned by the Spark community before this becomes an issue.

There are basically two ways to provide Kerberos authentication credentials to Flink:

  • Using ticket cache (kinit): Each component that uses the ticket-granting ticket (TGT) is independently responsible for renewing it. If the TGT is not renewed, authentication fails and workloads stop. Because of this, the ticket cache may be good enough for batch workloads, while a keytab is the better option for streaming workloads.
  • Keytab: Each component that uses a keytab renews the ticket-granting ticket (TGT) from the keytab automatically. Since the TGT can always be renewed, a keytab is suitable for both batch and streaming workloads (see the configuration sketch below).
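For reference, the two credential sources map to existing Flink security options. A minimal sketch in Java follows; the same keys are usually set in flink-conf.yaml, and the keytab path and principal below are placeholders:

import org.apache.flink.configuration.Configuration;

public class KerberosCredentialsSketch {

  public static void main(String[] args) {
    // Option 1: ticket cache (kinit) - Flink picks up the TGT from the user's ticket cache.
    Configuration ticketCacheConf = new Configuration();
    ticketCacheConf.setString("security.kerberos.login.use-ticket-cache", "true");

    // Option 2: keytab - Flink can always re-login from the keytab, so the TGT stays valid.
    Configuration keytabConf = new Configuration();
    keytabConf.setString("security.kerberos.login.keytab", "/path/to/flink.keytab");      // placeholder path
    keytabConf.setString("security.kerberos.login.principal", "flink-user@EXAMPLE.COM");  // placeholder principal
  }
}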

The current delegation token implementation does the following:

  • Flink obtains delegation tokens for the following services when “security.kerberos.fetch.delegation-token” is set to true (default: true); the HDFS part is sketched after this list:
    • HDFS
    • HBase, if the appropriate jar is on the classpath

  • Flink injects the delegation tokens into all YARN containers, both into the AM and into the executor containers (the injection happens at two separate places in the code).
  • YARN renews the tokens appropriately when their expiry date is reached.
  • YARN is not able to renew tokens once their max lifetime is reached; please see the YARN documentation for further details. Since YARN cannot renew tokens past the max lifetime (which is somewhere between 72 hours and 7 days), Flink workloads are going to stop.
  • Flink never re-obtains delegation tokens afterwards.
  • When both delegation tokens and Kerberos credentials are provided, the delegation tokens are used no matter whether they are still valid, with no fallback to the Kerberos credentials.
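For context, obtaining HDFS delegation tokens boils down to the standard Hadoop FileSystem call sketched below. This is a simplified illustration, not the exact Flink code; the class and method around the call are made up for the example:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.Credentials;

// Simplified illustration of how HDFS delegation tokens are obtained.
public class HdfsTokenObtainSketch {

  public static Credentials obtainHdfsTokens(String renewer) throws Exception {
    Credentials credentials = new Credentials();
    FileSystem fileSystem = FileSystem.get(new Configuration());
    // Asks the NameNode for delegation tokens and adds them to the credentials.
    fileSystem.addDelegationTokens(renewer, credentials);
    return credentials;
  }
}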

Later on, folks realized the problem described in the last point of the list above and made a couple of changes to hack around it:

  • The HDFS part was resolved in #3776 in such a way that when a keytab is used the delegation token is abandoned, but all other use cases still suffer from the mentioned issue. Even with the HDFS use case fixed, another serious issue is triggered: when the cluster is big enough (we’ve seen 200+ nodes, which definitely qualifies), all nodes bomb the KDC with login requests and eventually the KDC may collapse.
  • There was a similar issue with HBase, which was resolved in a similar way in #16466.

The main intention is to implement proper delegation token management which includes:

  • All deployment mode support
  • Pluggable token provider support
  • Initial token obtain
  • Token re-obtain when needed (token renewal is not planned, in order to keep complexity as low as possible; please see Spark’s design)
  • Secure propagation of tokens to TaskManagers

Since we’ve struggled a lot in Spark with how the delegation token framework should be shaped, the main intention is to adapt the knowledge I’ve gathered there and create a similar framework (considering Flink’s actual state and capabilities).

More details can be found in Spark’s security readme file and in the “Support Kafka delegation token in Structured Streaming” implementation proposal.

Proposed Change

The following changes are planned:

  • Create a DelegationTokenManager class which is responsible for obtaining and re-obtaining delegation tokens for all external systems.
  • Create a DelegationTokenProvider interface (planned to be public to make the mechanism extensible); all classes which implement this interface are loaded by the DelegationTokenManager with a service loader (see the sketch after this list). All providers are loaded independently, so one provider must not have any effect on the other providers.
  • Move all Hadoop FS delegation token related logic into HadoopFSDelegationTokenProvider.
  • Move all HBase delegation token related logic into HBaseDelegationTokenProvider.
  • Store all obtained tokens in a place which the JobManager can reach.
  • Propagate all obtained tokens to the TaskManagers. In order to do that, a new method named updateDelegationTokens needs to be added to TaskExecutorGateway.
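A rough sketch of the provider-loading step follows. The class structure and error handling are illustrative assumptions; only the DelegationTokenProvider interface comes from this proposal:

import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;

import org.apache.flink.configuration.Configuration;

// Illustrative sketch of how DelegationTokenManager could discover providers.
public class DelegationTokenManager {

  private final Map<String, DelegationTokenProvider> providers = new HashMap<>();

  public DelegationTokenManager(Configuration configuration) {
    for (DelegationTokenProvider provider : ServiceLoader.load(DelegationTokenProvider.class)) {
      try {
        provider.init(configuration);
        providers.put(provider.serviceName(), provider);
      } catch (Throwable t) {
        // Providers are loaded independently: a failing provider is skipped (and logged)
        // so that it has no effect on the other providers.
      }
    }
  }
}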

From a high-level perspective the functionality is going to look like the following:

  • DelegationTokenManager is instantiated in the ResourceManager.
  • It loads all DelegationTokenProvider instances with Java’s java.util.ServiceLoader.
  • It asks each DelegationTokenProvider instance whether a token is required.
  • If a token is required, it obtains one or more tokens from that DelegationTokenProvider.
  • The tokens are stored locally and, through the TaskExecutorGateway, they are sent to all registered TaskManagers.
  • Newly registered TaskManagers receive the initial tokens in the RegistrationResponse.
  • DelegationTokenManager starts a renewal thread which re-obtains tokens and propagates them the same way as it did initially (see the sketch below).
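Continuing the sketch above, the obtain/propagate/re-obtain cycle could look roughly as follows. The method names, the propagation call, and the interpretation of obtainToken’s return value as the next renewal timestamp are assumptions made for the sketch (the latter mirrors Spark’s design):

import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.security.Credentials;

// Illustrative continuation of the DelegationTokenManager sketch: obtain, propagate, re-obtain.
public class DelegationTokenRenewalLoop {

  private final ScheduledExecutorService renewalExecutor =
      Executors.newSingleThreadScheduledExecutor();
  private final Map<String, DelegationTokenProvider> providers;

  public DelegationTokenRenewalLoop(Map<String, DelegationTokenProvider> providers) {
    this.providers = providers;
  }

  void obtainTokensAndScheduleRenewal() {
    Credentials credentials = new Credentials();
    long earliestRenewal = Long.MAX_VALUE;

    for (DelegationTokenProvider provider : providers.values()) {
      if (provider.isTokenRequired()) {
        // Each provider adds its token(s) to the shared credentials and reports
        // when the next re-obtain should happen (assumed return value semantics).
        earliestRenewal = Math.min(earliestRenewal, provider.obtainToken(credentials));
      }
    }

    // Store the tokens locally and send them to all registered TaskManagers
    // through TaskExecutorGateway#updateDelegationTokens.
    propagateToTaskManagers(credentials);

    // Re-obtain and propagate again before the earliest token becomes unusable.
    // In practice the delay would be derived from the renewal-ratio / retry-wait options (see below).
    long delay = Math.max(0L, earliestRenewal - System.currentTimeMillis());
    renewalExecutor.schedule(this::obtainTokensAndScheduleRenewal, delay, TimeUnit.MILLISECONDS);
  }

  private void propagateToTaskManagers(Credentials credentials) {
    // Placeholder for the TaskExecutorGateway#updateDelegationTokens call.
  }
}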

Messages sent from the JobManager to the TaskManagers are not secured by default because the default value of security.ssl.internal.enabled is false. In order to guarantee the safe delivery of the tokens, this value must be set to true.
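For example (a minimal sketch; the key can equally be set in flink-conf.yaml):

import org.apache.flink.configuration.Configuration;

public class InternalSslSketch {

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Encrypt internal communication so the propagated delegation tokens cannot be read in transit.
    conf.setString("security.ssl.internal.enabled", "true");
  }
}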

One tricky detail is important: even if the YARN client sets delegation tokens at the initial stage, the AM must re-obtain tokens at startup, because otherwise an AM restart may fail (we’ve struggled with this in Spark). Please consider the following situation:

  • YARN client obtains delegation tokens and sets them on the container
  • AM starts and uses HDFS token for log aggregation
  • AM dies for whatever reason after the HDFS token max lifetime (typically 7 days) has passed
  • AM restarts with the old token
  • Log aggregation fails immediately because of the expired token

New or Changed Public Interfaces

New public API:

public interface DelegationTokenProvider {

  /** Name of the service; used in the security.kerberos.delegation.token.${serviceName}.enabled key. */
  String serviceName();

  /** Called once with the Flink configuration before the provider is used. */
  void init(Configuration configuration);

  /** Returns true if a delegation token is required for the configured service. */
  boolean isTokenRequired();

  /** Obtains token(s), adds them to the given credentials and returns their expiration time. */
  long obtainToken(Credentials credentials);

}


With this new API any custom delegation token provider can be implemented easily.
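For illustration, a hypothetical provider for an imaginary external service could look like the sketch below. The MyServiceClient call, the config key it checks, and the interpretation of the returned long as the next renewal timestamp are assumptions made only for the example; a real implementation would also be registered in a META-INF/services file named after the fully qualified DelegationTokenProvider interface so that the service loader can discover it.

import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;

// Hypothetical provider, shown only to illustrate how the proposed API could be implemented.
public class MyServiceDelegationTokenProvider implements DelegationTokenProvider {

  private Configuration configuration;

  @Override
  public String serviceName() {
    // Also used in the security.kerberos.delegation.token.${serviceName}.enabled key below.
    return "myservice";
  }

  @Override
  public void init(Configuration configuration) {
    this.configuration = configuration;
  }

  @Override
  public boolean isTokenRequired() {
    // Only obtain a token when the external service is actually configured.
    return configuration.getString("myservice.address", null) != null;
  }

  @Override
  public long obtainToken(Credentials credentials) {
    // Ask the external service for a delegation token (MyServiceClient is hypothetical).
    Token<?> token = MyServiceClient.obtainDelegationToken(configuration);
    credentials.addToken(new Text(serviceName()), token);
    // Assumed contract: return the time (in ms) when the token should be re-obtained.
    return System.currentTimeMillis() + 24 * 60 * 60 * 1000L;
  }
}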

New config parameters:

  • security.kerberos.relogin.period (default: 60000): How often (in ms) a keytab login is triggered automatically in order to always have a valid TGT.
  • security.kerberos.tokens.retry-wait (default: 3600000): How long (in ms) to wait before retrying to obtain new delegation tokens after a failure.
  • security.kerberos.tokens.renewal-ratio (default: 0.75): Ratio of the tokens’ expiration time after which new credentials should be re-obtained (see the sketch below).
  • security.kerberos.delegation.token.${serviceName}.enabled (default: true): Controls whether to obtain a delegation token for a specific provider when security is enabled. By default, delegation tokens for all supported providers are retrieved when those services are configured, but it’s possible to disable that behavior if it somehow conflicts with the application being run.
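To make the interplay of the retry-wait and renewal-ratio options concrete, here is a hedged sketch of how the scheduling delay could be derived from them; the formula mirrors Spark’s approach and is an assumption, not the final implementation:

// Illustrative helper: compute how long to wait before the next token obtain attempt.
public class RenewalDelaySketch {

  static long nextObtainDelayMillis(
      boolean obtainSucceeded,
      long tokenExpirationMillis,
      long nowMillis,
      double renewalRatio,   // security.kerberos.tokens.renewal-ratio
      long retryWaitMillis   // security.kerberos.tokens.retry-wait
  ) {
    if (!obtainSucceeded) {
      // After a failed obtain, retry after the configured wait time.
      return retryWaitMillis;
    }
    // Re-obtain once the configured ratio of the remaining token lifetime has elapsed.
    return Math.max(0L, (long) (renewalRatio * (tokenExpirationMillis - nowMillis)));
  }
}

With the defaults, a token that expires in 8 hours would be re-obtained roughly 6 hours after it was issued, and a failed attempt would be retried an hour later.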

Migration Plan and Compatibility

No migration is needed. When the feature is implemented, it will obtain/distribute/re-obtain delegation tokens by default if valid Kerberos credentials are provided. The configuration default values are defined in such a way that the user doesn’t have to touch anything; it works out of the box.

The current implementation contains the security.kerberos.fetch.delegation-token configuration option to turn off the token obtain functionality. This can be useful in use cases where delegation token handling happens outside of Flink (for example, when the workload is started from Oozie), as sketched below. It’s intended to keep this configuration for backward compatibility reasons.
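For example, when delegation tokens are managed outside of Flink, token fetching can be switched off like this (a minimal sketch; the key can equally be set in flink-conf.yaml):

import org.apache.flink.configuration.Configuration;

public class ExternalTokenHandlingSketch {

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Flink does not obtain delegation tokens itself; they are expected to be provided externally.
    conf.setString("security.kerberos.fetch.delegation-token", "false");
  }
}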

Test Plan

It’s planned to implement automated integration tests.

Rejected Alternatives

  • Instead of using the TaskExecutorGateway, tokens could be stored in a file as they are now, but over the years it became clear that this approach is error prone, including but not limited to the following reasons:
    • File can be unreachable
    • File can be corrupt
    • File must be well protected
    • TaskManager may not read the file in time
    • Etc…