...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
- Reduce unnecessary downtime due to task restoration and global application revocation.
- Better auto scaling experience for KStream applications.
- Stretch goal: better workload balance across KStream instances.
Background
Today Streams embeds a full-fledged Consumer client, which hard-codes a ConsumerCoordinator inside. Streams then injects a StreamsPartitionAssignor into the consumer's pluggable PartitionAssignor interface, and inside the StreamsPartitionAssignor we also have a TaskAssignor interface whose default implementation is StickyTaskAssignor. The Streams partition assignment logic today sits in the latter two classes. Hence the hierarchy today is:
Code Block |
---|
KafkaConsumer -> ConsumerCoordinator -> StreamsPartitionAssignor -> StickyTaskAssignor. |
StreamsPartitionAssignor uses the subscription / assignment metadata byte array field to encode additional information for sticky partitions. More specifically on subscription:
Code Block |
---|
[{member1: {List<Topic> topics1, subscription1}}, {member2: {List<Topic> topics2, subscription2}}, ........ ] |
The subscription metadata bytes (version 4) are:
Code Block |
---|
SubscriptionInfo = {Int versionId, Int latestSupportedVersionId, clientUUID, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, HostInfo endPoint} |
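To make the byte-array encoding concrete, here is a minimal sketch of how the SubscriptionInfo fields listed above could be packed into a ByteBuffer. The class SubscriptionInfoSketch, the two-integer TaskId, the "host:port" string form of endPoint, and the field order and sizes are all assumptions made for illustration; this is not the actual Streams wire format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.UUID;

// Hypothetical sketch: names and byte layout are illustrative assumptions,
// not the real Kafka Streams classes or wire format.
final class SubscriptionInfoSketch {

    // Assumed shape of a task id: a topic group id plus a partition number.
    static final class TaskId {
        final int topicGroupId;
        final int partition;

        TaskId(int topicGroupId, int partition) {
            this.topicGroupId = topicGroupId;
            this.partition = partition;
        }
    }

    // Packs the fields in the order they appear in the SubscriptionInfo listing.
    static ByteBuffer encode(int versionId, int latestSupportedVersionId, UUID clientUUID,
                             Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String endPoint) {
        byte[] endPointBytes = endPoint.getBytes(StandardCharsets.UTF_8);
        int size = 4 + 4 + 16                       // two version ints + 128-bit UUID
                + 4 + 8 * prevTasks.size()          // task count + (groupId, partition) pairs
                + 4 + 8 * standbyTasks.size()
                + 4 + endPointBytes.length;         // length-prefixed endpoint string
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(versionId);
        buf.putInt(latestSupportedVersionId);
        buf.putLong(clientUUID.getMostSignificantBits());
        buf.putLong(clientUUID.getLeastSignificantBits());
        putTasks(buf, prevTasks);
        putTasks(buf, standbyTasks);
        buf.putInt(endPointBytes.length);
        buf.put(endPointBytes);
        buf.flip(); // switch from writing to reading: position 0, limit = bytes written
        return buf;
    }

    private static void putTasks(ByteBuffer buf, Set<TaskId> tasks) {
        buf.putInt(tasks.size());
        for (TaskId task : tasks) {
            buf.putInt(task.topicGroupId);
            buf.putInt(task.partition);
        }
    }
}
```

Because the version fields come first in this sketch, a receiver could read versionId before deciding how to parse the remaining bytes.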
And on assignment:
Code Block |
---|
[{member1: {activePartitions1, assignmentInfo1}}, {member2: {activePartitions2, assignmentInfo2}}, ........ ] |
The assignment metadata bytes (version 4) are:
Code Block |
---|
AssignmentInfo = {Int versionId, Int latestSupportedVersionId, List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks, Map<HostInfo, Set<TopicPartition>> partitionsByHost, Int errorCode} |
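As a rough illustration of how a member might consume the decoded AssignmentInfo, the sketch below models the fields as plain Java collections and looks up which host advertises a given partition via partitionsByHost. The class AssignmentInfoSketch, the use of strings in place of TaskId, TopicPartition and HostInfo, and the convention that errorCode 0 means "no error" are all assumptions for illustration only.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical, simplified model of the assignment metadata. Plain strings stand in
// for TaskId ("0_1"), TopicPartition ("topic-0") and HostInfo ("host:port").
final class AssignmentInfoSketch {
    final int versionId;
    final int latestSupportedVersionId;
    final List<String> activeTasks;
    final Map<String, Set<String>> standbyTasks;      // task -> partitions
    final Map<String, Set<String>> partitionsByHost;  // host -> partitions
    final int errorCode;

    AssignmentInfoSketch(int versionId, int latestSupportedVersionId,
                         List<String> activeTasks,
                         Map<String, Set<String>> standbyTasks,
                         Map<String, Set<String>> partitionsByHost,
                         int errorCode) {
        this.versionId = versionId;
        this.latestSupportedVersionId = latestSupportedVersionId;
        this.activeTasks = activeTasks;
        this.standbyTasks = standbyTasks;
        this.partitionsByHost = partitionsByHost;
        this.errorCode = errorCode;
    }

    // Returns the first host whose advertised partitions include the given one.
    Optional<String> hostFor(String topicPartition) {
        for (Map.Entry<String, Set<String>> entry : partitionsByHost.entrySet()) {
            if (entry.getValue().contains(topicPartition)) {
                return Optional.of(entry.getKey());
            }
        }
        return Optional.empty();
    }

    // Assumption: a zero error code means the assignment succeeded.
    boolean hasError() {
        return errorCode != 0;
    }
}
```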
Proposed Changes
Terminology
...