Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Status

...

Page properties


Discussion thread

...

...

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-21232

...

Release1.17


Motivation

As Flink clusters using Kerberos authenticated services (just talking to HDFS, HBase, Kerberos enabled Kafka or even running on YARN) scale to hundreds of nodes we will inevitably hit a scalability challenge with Kerberos authentication. In Spark Streaming this became a limitation at around 200 node clusters and led to job and cluster outages due to denied authentication by the Kerberos Key Distribution Center (KDC). We can build on the implementation and lessons learned by the Spark community before this becomes an issue.

...

The main intention is to implement proper delegation token management which includes:

  • YARN + K8S support (YARN comes first which is followed by K8S)All deployment mode support
  • Pluggable token provider support
  • Initial token obtain
  • Token re-obtain when needed (renew is not planned to be added in order to keep complexity as low as possible, please see Spark’s design)
  • Token secure propagation to task managers

...

More details can be found in Spark's security readme file and in Support kafka delegation token in Structured Streaming implementation proposal.

Proposed Proposed Change

The following changes are planned:

  • Create DelegationTokenManager class which is responsible for obtaining and re-obtaining delegation tokens for all external systems.
  • Create DelegationTokenProvider interface (planned to be public to make it extensive), all classes which implement this interface are loaded by DelegationTokenManager with service loader. All providers are loaded independently so one provider mustn’t have any effect on other providers.
  • Move all Hadoop FS delegation token related logic into HadoopFSDelegationTokenProvider.
  • Move all HBase delegation token related logic into HBaseDelegationTokenProvider.
  • Store all obtained tokens which JobManager can reach
  • Propagate all obtained tokens to TaskManagers. In order to do that, a new event functionality named updateDelegationTokens needs to be created with the name DelegationTokensUpdatedadded toTaskExecutorGateway.

From high level perspective the functionality is going to look like the following:

  • DelegationTokenManager is instantiated in ResourceManager.
  • It loads all DelegationTokenProvider instances with Java java.util.ServiceLoader.
  • It asks each DelegationTokenProvider instance whether a token is required.
  • If a token is required then obtains one or more for each DelegationTokenProvider.
  • Tokens are stored locally and DelegationTokensUpdated is through TaskExecutorGateway they’re going to be sent to all registered TaskManagers.
  • Newly registered TaskManagers are receiving the initial tokens in RegistrationResponse.
  • DelegationTokenManager starts a renewal thread which re-obtains and propagates the newly obtained tokens like it did initially.

Events sent from JobManager to the TaskManagers are not secure by default because security.ssl.internal.enabled default value is false. In order to guarantee the safe delivery of the tokens this value must be set to true.

One tricky thing detail is important. Even if the YARN client sets delegation tokens at the initial stage AM must re-obtain tokens at startup because otherwise AM restart may fail (we’ve struggled with this in Spark). Please consider the following situation:

...

Code Block
languagejava
public interface DelegationTokenProvider {

  String name(serviceName();

  void init(Configuration configuration);

  boolean isTokenRequired();

  long obtainToken(Credentials credentials: Credentials);

}


With this new API any custom delegation token provider can be implemented easily.

...

  • security.kerberos.relogin.period (default: 60000): The time in ms when keytab login happens automatically in order to always have a valid TGT.
  • security.kerberos.tokens.retry-wait (default: 3600000): The time in ms how long to wait before retrying to obtain new delegation tokens after a failure.
  • security.kerberos.tokens.renewal-ratio (default: 0.75): Ratio of the tokens's expiration time when new credentials should be re-obtained.
  • security.kerberos.tokensdelegation.token.${providerserviceName}.enabled (default: true): Controls whether to obtain delegation token for a specific provider when security is enabled. By default, delegation tokens for all supported providers are retrieved when those services are configured, but it's possible to disable that behavior if it somehow conflicts with the application being run.

...

Rejected Alternatives

  • Instead of DelegationTokensUpdated event TaskExecutorGateway tokens can be stored in file as it is now but during the years it became clear that it’s error prone including but not limited to the following reasons:
    • File can be unreachable
    • File can be corrupt
    • File must be well protected
    • TaskManager may not read the file in time
    • Etc…