Current state: Under discussion
Discussion thread:
Vote thread:
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
When a node takes over as the active controller, it reads the whole cluster state from ZooKeeper and then constructs full UpdateMetadata requests to be sent to all brokers in the cluster. The full state includes all partitions in the cluster along with the live nodes, so the total memory occupied by these UpdateMetadata requests is proportional to (number of partitions) * (number of brokers). In a large cluster with hundreds of thousands of partitions and 200 brokers, these UpdateMetadata requests can take up to 30 GB of RAM and can cause the JVM to run out of memory and crash.
The situation is exacerbated by the fact that when there are N brokers in a large cluster, not only does the controller construct N copies of UpdateMetadataRequestData objects, but also N serialized byte buffers for these objects.
This KIP tries to address the problem by making the UpdateMetadata requests cacheable so that a single copy can be used for all brokers. The end result is that the memory footprint will be proportional only to the number of partitions, regardless of how many brokers are in the cluster.
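To make the intended memory saving concrete, here is a minimal sketch (illustrative names, not Kafka's actual classes): the payload is serialized once, and every broker's send queue shares the same underlying read-only buffer instead of holding its own copy.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the caching idea: one serialized UpdateMetadata
// payload shared by all brokers, rather than one copy per broker.
public class CachedRequestSketch {
    static ByteBuffer serializeOnce(String metadataPayload) {
        // Serialize the cluster-wide payload a single time, read-only so no
        // per-broker consumer can mutate the shared bytes.
        return ByteBuffer.wrap(metadataPayload.getBytes(StandardCharsets.UTF_8))
                         .asReadOnlyBuffer();
    }

    public static void main(String[] args) {
        ByteBuffer cached = serializeOnce("partition-state-for-whole-cluster");
        Map<Integer, ByteBuffer> perBrokerSendQueue = new HashMap<>();
        for (int brokerId = 0; brokerId < 200; brokerId++) {
            // duplicate() creates an independent position/limit view over the
            // SAME backing bytes; no per-broker copy of the payload is made.
            perBrokerSendQueue.put(brokerId, cached.duplicate());
        }
        // Memory for the payload is O(partitions), not O(partitions * brokers).
        System.out.println(perBrokerSendQueue.size());
    }
}
```

This only works if the payload is byte-for-byte identical for every broker, which is exactly what the rest of this KIP sets out to achieve.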
This KIP bumps the versions of the 3 types of control requests in order to introduce a new field max_broker_epoch and remove the existing field broker_epoch:
- LeaderAndIsrRequest V5
- UpdateMetadataRequest V7
- StopReplicaRequest V4
Replacing the per broker epoch with cluster-wide maximum broker epoch
Currently all requests sent from the controller to brokers contain a broker_epoch field, which was introduced in KIP-380.
The broker epoch is used to tie a controller request to a particular epoch (incarnation) of a broker, so that a broker can fence off requests that were meant for a previous incarnation of itself.
As a result, the control requests sent to different brokers carry different broker epoch values.
This field turns out to be the only field that differs between the UpdateMetadata requests sent out for a single metadata change.
In order for all brokers to reuse the same UpdateMetadata request, we need a way to make the request payload identical for all of them.
Several options are available:
- cache only the common fields, and treat the broker-epoch field separately; or
- replace the per-broker broker-epoch field in the control requests with a new field whose value is the same for all brokers.
Between these two approaches, we feel the latter is more elegant, provided the new mechanism works.
Let us examine the case where a broker needs to fence off an obsolete request from the controller in the following figure:
The vertical lines pointing upward represent the timelines of the broker, the ZooKeeper cluster, and the controller, respectively.
The process involves message passing between the three entities, and the five messages are labeled with circled numbers 1 through 5.
Message 1: The broker has started and is sending a registration request to ZK when creating the /brokers/ids/<broker id> znode. On the ZK side, e0 represents the event of receiving this registration request and creating the znode.
Message 2: Three steps happen due to the creation of the new znode:
a. ZK notifies the controller that there has been a broker change.
b. The controller sends a request to read all brokers' endpoints and current epochs.
c. ZK sends back a response containing the brokers' epochs.
Message 2 represents the response sent by ZK in step c, and e1 represents the event of sending Message 2.
Message 3: The broker has restarted, and is sending a 2nd registration request to ZK, similar to Message 1. e2 represents the event of receiving this 2nd registration request.
Message 4: ZK replies to the broker with the Zxid that created the broker's znode at /brokers/ids/<broker id>.
Message 5: The controller sends a control request to the broker using a previously learned broker epoch that has since become obsolete. Upon receiving this control request, the broker should reject it.
In order for this fencing to work, ZK and the controller must work together to ensure that Message 5 carries enough information for the broker to establish the ordering e0 → e1 → e2, where → represents the "happens before" relationship.
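The fencing decision the broker makes in the Message 5 scenario can be sketched as a single comparison. This is a hedged illustration under the proposed max_broker_epoch scheme (method and variable names are hypothetical, not Kafka's API): a broker rejects a control request whose max_broker_epoch is smaller than the epoch the broker received at its latest registration, since that proves the request was built before the controller learned of event e2.

```java
// Hypothetical sketch of broker-side fencing with max_broker_epoch.
public class FencingSketch {
    static boolean accept(long requestMaxBrokerEpoch, long brokerOwnEpoch) {
        // If the controller has seen this broker's latest registration (e2),
        // the maximum over all broker epochs is >= this broker's own epoch.
        // A smaller value means the request predates e2 and must be fenced.
        return requestMaxBrokerEpoch >= brokerOwnEpoch;
    }

    public static void main(String[] args) {
        long staleEpoch = 100L;   // epoch the controller learned at e1
        long currentEpoch = 200L; // epoch assigned at the broker's restart (e2)
        System.out.println(accept(staleEpoch, currentEpoch));   // stale request: rejected
        System.out.println(accept(currentEpoch, currentEpoch)); // fresh request: accepted
    }
}
```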
The broker epoch field introduced in KIP-380 relies on the monotonically increasing property of zxid in Zookeeper:
Zxid: Every change to the ZooKeeper state receives a stamp in the form of a zxid (ZooKeeper Transaction Id). This exposes the total ordering of all changes to ZooKeeper. Each change will have a unique zxid, and if zxid1 is smaller than zxid2 then zxid1 happened before zxid2.
To represent the "happens before" relationship, we still rely on the monotonically increasing property of zxids. At the time of event e1, the event e2 has not happened yet. So at the time of e1, the requirement e0 → e1 → e2 can be translated to: zxid(e0) <= the zxid reported at the time of e1 < the zxid of any future transaction (including e2).
For instance, in the response message (Message 2), ZK could have reported its latest zxid, or it could have reported the value max(all brokers' zxids). There is an equivalent approach to ZK reporting the max(all brokers' zxids): ZK still reports the per-broker epochs in its reply in Message 2, while the controller calculates the max(all brokers' zxids) and uses the aggregated max in Message 5.
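The second, equivalent approach above amounts to a one-line aggregation on the controller side. A minimal sketch, with illustrative names (not the controller's actual data structures): ZK keeps returning per-broker epochs, and the controller computes the cluster-wide maximum to stamp on every outgoing control request.

```java
import java.util.Map;

// Hypothetical sketch: the controller aggregates the per-broker epochs
// (czxids) it read from ZK into a single cluster-wide max_broker_epoch.
public class MaxEpochSketch {
    static long maxBrokerEpoch(Map<Integer, Long> epochByBrokerId) {
        // The maximum epoch over all live brokers; -1 if none are registered.
        return epochByBrokerId.values().stream()
                .mapToLong(Long::longValue)
                .max()
                .orElse(-1L);
    }

    public static void main(String[] args) {
        // Broker id -> epoch learned from ZK at event e1.
        Map<Integer, Long> epochs = Map.of(1, 105L, 2, 240L, 3, 180L);
        System.out.println(maxBrokerEpoch(epochs)); // 240
    }
}
```

Because this maximum is the same for every destination broker, the serialized request payload becomes identical across brokers, which is what makes the caching in this KIP possible.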
The analysis above only considers the case of broker-side fencing.
Since the per-broker epoch has worked well for the other two use cases, we propose to keep the current mechanism unchanged there, by continuing to maintain the per-broker epochs in the controller's memory.
In summary, our proposed new approach works by replacing the per-broker broker_epoch field in control requests with a cluster-wide max_broker_epoch field, computed as the maximum of all brokers' epochs known to the controller.
By doing that, we not only satisfy the safety requirements, but also unify the payload of the UpdateMetadata requests sent to different brokers for the same metadata change.
More specifically, our proposed change is to bump the versions of LeaderAndIsrRequest, UpdateMetadataRequest, and StopReplicaRequest, removing the existing broker_epoch field and adding the new max_broker_epoch field.
Adopting the change in this KIP requires a two-pass deployment:
We considered the following alternatives: