...

JIRA: KAFKA-8019, KAFKA-7149, KAFKA-6145

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

  • Reduce unnecessary downtime due to task restoration and global application revocation.
  • Better auto scaling experience for KStream applications.
  • Stretch goal: better workload balance across KStream instances.

Background

Today Streams embeds a full-fledged Consumer client, which hard-codes a ConsumerCoordinator inside. Streams then injects a StreamsPartitionAssignor into the consumer's pluggable PartitionAssignor interface, and inside the StreamsPartitionAssignor we also have a TaskAssignor interface whose default implementation is StickyTaskAssignor. The Streams partition assignment logic today sits in the latter two classes. Hence the hierarchy today is:

Code Block
KafkaConsumer -> ConsumerCoordinator -> StreamsPartitionAssignor -> StickyTaskAssignor.
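To make the "sticky" part of StickyTaskAssignor concrete, here is a minimal Java sketch of the idea as we understand it: each task first goes back to a client that reported it among its previous tasks (capped at an even share, i.e. ceil(tasks / clients) per client), and whatever is left goes to the least-loaded client. The class and method names (`StickyAssignSketch`, `assign`) and the string task ids such as "0_1" are hypothetical; the real StickyTaskAssignor also weighs standby tasks and other factors that this sketch ignores.

```java
import java.util.*;

// Hypothetical sketch of the sticky idea behind StickyTaskAssignor.
// NOT Kafka's implementation: standby tasks and per-client capacity are ignored.
final class StickyAssignSketch {

    static Map<String, List<String>> assign(List<String> tasks,
                                            Map<String, Set<String>> prevTasksByClient) {
        if (prevTasksByClient.isEmpty()) {
            throw new IllegalArgumentException("need at least one client");
        }
        Map<String, List<String>> result = new TreeMap<>();
        for (String client : prevTasksByClient.keySet()) {
            result.put(client, new ArrayList<>());
        }
        // Even-share cap: no client ends up with more than ceil(tasks / clients).
        int capacity = (tasks.size() + result.size() - 1) / result.size();
        List<String> unassigned = new ArrayList<>();

        // Pass 1: give each task back to a previous owner that still has room.
        for (String task : tasks) {
            String owner = null;
            for (Map.Entry<String, Set<String>> e : prevTasksByClient.entrySet()) {
                if (e.getValue().contains(task) && result.get(e.getKey()).size() < capacity) {
                    owner = e.getKey();
                    break;
                }
            }
            if (owner != null) {
                result.get(owner).add(task);
            } else {
                unassigned.add(task);
            }
        }

        // Pass 2: hand each remaining task to the currently least-loaded client.
        for (String task : unassigned) {
            String least = null;
            for (String client : result.keySet()) {
                if (least == null || result.get(client).size() < result.get(least).size()) {
                    least = client;
                }
            }
            result.get(least).add(task);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> prev = new TreeMap<>();
        prev.put("clientA", Set.of("0_0", "0_1"));
        prev.put("clientB", Set.of("0_2"));
        prev.put("clientC", Set.of()); // newly joined instance with no history
        System.out.println(assign(List.of("0_0", "0_1", "0_2", "0_3"), prev));
    }
}
```

Running `main` prints `{clientA=[0_0, 0_1], clientB=[0_2], clientC=[0_3]}`: the previous owners keep their tasks and only the new task 0_3 moves to the newly joined clientC, which is the property that lets an instance skip state restoration for tasks it already hosted.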


StreamsPartitionAssignor uses the subscription and assignment metadata byte-array fields to encode additional information for sticky partitions. More specifically, on subscription:

Code Block
[{member1: {List<Topic> topics1, subscription1}}, {member2: {List<Topic> topics2, subscription2}}, ........ ]

The subscription metadata bytes (version 4) are:

Code Block
SubscriptionInfo = {Int versionId, Int latestSupportedVersionId, UUID clientUUID, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, HostInfo endPoint}
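The following Java sketch shows one way these version-4 fields could be packed into, and read back from, the subscription metadata byte array. The field order follows the SubscriptionInfo definition above, but the concrete layout (the UUID as two longs, each TaskId as two ints, task sets and the endpoint length-prefixed) and the representation of HostInfo as a `host:port` string are assumptions for illustration, not a copy of Kafka's wire format. `SubscriptionInfoSketch` and its nested records are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.*;

// Illustrative encoder/decoder for the version-4 SubscriptionInfo fields.
// The byte layout is an assumption, not Kafka's exact format.
final class SubscriptionInfoSketch {

    // Hypothetical stand-in for Streams' TaskId (topic group id + partition).
    record TaskId(int topicGroupId, int partition) {}

    // endPoint stands in for HostInfo, assumed to be serialized as "host:port".
    record SubscriptionInfo(int versionId, int latestSupportedVersionId, UUID clientUUID,
                            Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String endPoint) {}

    static ByteBuffer encode(SubscriptionInfo info) {
        byte[] endPoint = info.endPoint().getBytes(StandardCharsets.UTF_8);
        int size = 4 + 4                                // versionId, latestSupportedVersionId
                 + 16                                   // UUID as two longs
                 + 4 + 8 * info.prevTasks().size()      // count + (int, int) per task
                 + 4 + 8 * info.standbyTasks().size()
                 + 4 + endPoint.length;                 // length-prefixed endpoint
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(info.versionId());
        buf.putInt(info.latestSupportedVersionId());
        buf.putLong(info.clientUUID().getMostSignificantBits());
        buf.putLong(info.clientUUID().getLeastSignificantBits());
        writeTasks(buf, info.prevTasks());
        writeTasks(buf, info.standbyTasks());
        buf.putInt(endPoint.length);
        buf.put(endPoint);
        buf.flip(); // make the buffer readable from the start
        return buf;
    }

    static SubscriptionInfo decode(ByteBuffer buf) {
        int version = buf.getInt();
        int latest = buf.getInt();
        UUID uuid = new UUID(buf.getLong(), buf.getLong()); // args evaluate left to right
        Set<TaskId> prev = readTasks(buf);
        Set<TaskId> standby = readTasks(buf);
        byte[] endPoint = new byte[buf.getInt()];
        buf.get(endPoint);
        return new SubscriptionInfo(version, latest, uuid, prev, standby,
                new String(endPoint, StandardCharsets.UTF_8));
    }

    private static void writeTasks(ByteBuffer buf, Set<TaskId> tasks) {
        buf.putInt(tasks.size());
        for (TaskId t : tasks) {
            buf.putInt(t.topicGroupId());
            buf.putInt(t.partition());
        }
    }

    private static Set<TaskId> readTasks(ByteBuffer buf) {
        int n = buf.getInt();
        Set<TaskId> tasks = new HashSet<>();
        for (int i = 0; i < n; i++) {
            tasks.add(new TaskId(buf.getInt(), buf.getInt()));
        }
        return tasks;
    }

    public static void main(String[] args) {
        SubscriptionInfo in = new SubscriptionInfo(4, 4, UUID.randomUUID(),
                Set.of(new TaskId(0, 0), new TaskId(0, 1)), Set.of(new TaskId(1, 0)), "host1:8080");
        System.out.println(in.equals(decode(encode(in))));
    }
}
```

A round trip through `encode` and `decode` returns an equal `SubscriptionInfo`, so `main` prints `true`. Putting the version ids first is what lets a receiver read them before deciding how to interpret the rest of the bytes.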


And on assignment: 

Code Block
[{member1: {activePartitions1, assignmentInfo1}}, {member2: {activePartitions2, assignmentInfo2}}, ........ ]

The assignment metadata bytes (version 4) are:

Code Block
AssignmentInfo = {Int versionId, Int latestSupportedVersionId, List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks, Map<HostInfo, Set<TopicPartition>> partitionsByHost, Int errorCode}
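Because activeTasks is a List rather than a Set, the receiving member has to pair it with its assigned partitions. As we understand the current StreamsPartitionAssignor, the leader emits activeTasks in the order of the active partitions sorted by topic and then partition number, and each member sorts its assigned partitions the same way, so the i-th entry of activeTasks names the task owning the i-th partition; a task with several input partitions therefore appears several times. The sketch below rebuilds the task-to-partitions map under that assumption. `ActiveTaskMappingSketch` and its records are hypothetical minimal stand-ins for Kafka's TopicPartition and Streams' TaskId.

```java
import java.util.*;

// Sketch: rebuild task -> partitions from an assignment, assuming activeTasks
// is positionally aligned with the assigned partitions sorted by (topic, partition).
final class ActiveTaskMappingSketch {

    // Hypothetical stand-ins for Kafka's TopicPartition and Streams' TaskId.
    record TopicPartition(String topic, int partition) {}
    record TaskId(int topicGroupId, int partition) {}

    static final Comparator<TopicPartition> BY_TOPIC_THEN_PARTITION =
            Comparator.comparing(TopicPartition::topic).thenComparingInt(TopicPartition::partition);

    static Map<TaskId, Set<TopicPartition>> activeTaskPartitions(
            List<TopicPartition> assignedPartitions, List<TaskId> activeTasks) {
        if (assignedPartitions.size() != activeTasks.size()) {
            throw new IllegalArgumentException("activeTasks needs one entry per assigned partition");
        }
        // Sort a copy so the caller's list is left untouched.
        List<TopicPartition> sorted = new ArrayList<>(assignedPartitions);
        sorted.sort(BY_TOPIC_THEN_PARTITION);
        Map<TaskId, Set<TopicPartition>> result = new HashMap<>();
        for (int i = 0; i < sorted.size(); i++) {
            result.computeIfAbsent(activeTasks.get(i), k -> new HashSet<>()).add(sorted.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        // Two co-partitioned input topics: task 0_1 reads partition 1 of both.
        List<TopicPartition> assigned = List.of(
                new TopicPartition("orders", 1), new TopicPartition("clicks", 1));
        // Sorted order is clicks-1, orders-1; both map to task 0_1.
        List<TaskId> activeTasks = List.of(new TaskId(0, 1), new TaskId(0, 1));
        System.out.println(activeTaskPartitions(assigned, activeTasks));
    }
}
```

The positional encoding avoids repeating partition names inside AssignmentInfo, at the cost of both sides having to agree on the sort order.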

Proposed Changes

Terminology

...