...

Apache Curator is used to interact with ZooKeeper when Flink runs in HA mode (link to the code). The current setup is missing several configuration options that could be useful in certain Flink deployments.

We want to ensure that the relevant options available in Apache Curator are configurable by Flink users, so that they have every mechanism needed for Flink to interact with ZooKeeper. The features listed below could be critical for adopting Flink with ZooKeeper in cloud environments. For example, it is currently not possible to use ZooKeeper with an authorization mechanism together with Flink.

Public Interfaces

Several new options should be exposed under the high-availability.zookeeper configuration.

| Proposed option | Configuration type | Motivation |
|---|---|---|
| high-availability.zookeeper.client.authorization | Map<String, String> (ConfigOptions#mapType()) | Ability to fully utilise the ZooKeeper setup of a given environment. For example, in certain cases ZooKeeper requires additional authentication/authorization information, such as a list of valid names for the ensemble, to prevent accidentally connecting to the wrong ensemble. |
| high-availability.zookeeper.client.max-close-wait-ms | Integer (ConfigOptions#intType()) | Lets the user adjust to different network speeds. |
| high-availability.zookeeper.client.simulated-session-expiration-percent | Integer (ConfigOptions#intType()) | Additional checking for session expiration beyond what ZooKeeper provides. |

...

An issue arises from a type mismatch between the Flink configuration parameter high-availability.zookeeper.client.authorization and the corresponding Curator method call. The Curator method expects a list of AuthInfo (see the javadoc for the method with signature authorization(List<AuthInfo>)), while the Flink option declared with ConfigOptions#mapType() provides a different Java type, Map<String, String>. To resolve this, we suggest the following conversion: each entry of type Map.Entry<String, String> is transformed into an AuthInfo object using the constructor AuthInfo(String, byte[]). The value of entry.getKey() serves as the String scheme, while the value of entry.getValue() is first converted to a byte[] using the String#getBytes() method. That byte array is then passed as the byte[] auth parameter when creating the AuthInfo.
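The conversion described above could look roughly like the following sketch. It is only an illustration under stated assumptions: the nested AuthInfo class is a hypothetical stand-in that mirrors the AuthInfo(String, byte[]) constructor so the example compiles without Curator on the classpath, the names AuthInfoConversion and toAuthInfos are invented for this example, and the explicit UTF-8 charset is an assumption (the proposal only names String#getBytes()).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class AuthInfoConversion {

    // Hypothetical stand-in for Curator's AuthInfo, mirroring only the
    // AuthInfo(String scheme, byte[] auth) constructor named in the proposal.
    static final class AuthInfo {
        final String scheme;
        final byte[] auth;

        AuthInfo(String scheme, byte[] auth) {
            this.scheme = scheme;
            this.auth = auth;
        }
    }

    // Turns the Map<String, String> read from the Flink option into the
    // list of AuthInfo objects that the Curator builder expects.
    static List<AuthInfo> toAuthInfos(Map<String, String> authorization) {
        List<AuthInfo> result = new ArrayList<>(authorization.size());
        for (Map.Entry<String, String> entry : authorization.entrySet()) {
            // Key -> scheme, value -> auth bytes.
            // Assumption: UTF-8 is used instead of the platform default charset.
            byte[] auth = entry.getValue().getBytes(StandardCharsets.UTF_8);
            result.add(new AuthInfo(entry.getKey(), auth));
        }
        return result;
    }

    public static void main(String[] args) {
        // Example values are made up for illustration.
        Map<String, String> configured = new LinkedHashMap<>();
        configured.put("digest", "user:password");

        for (AuthInfo info : toAuthInfos(configured)) {
            System.out.println(info.scheme + " -> " + info.auth.length + " bytes");
        }
    }
}
```

In an actual integration, the resulting list would be handed to the Curator builder's authorization method instead of being printed. One consequence of using a map is that each scheme can appear at most once in the configuration.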

Compatibility, Deprecation, and Migration Plan

...