Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Note: In this KIP, we use the terminology "processor" to indicate a combination of consumer, local state store and user-specified processing logic.

Goals

 

In addition to ensuring correctness, we also aim to achieve the following goals to optimize the efficiency of this KIP.

...

2) When we expand processors of a given stateful processing job, some key will be assigned to the newly-added processors. And it would be necessary to copy the state for the corresponding keys to these processors before these processors can consume messages and generate output. However, we should not have to copy state between existing processors.

3) In order to keep processing new messages in near real-time when we are copying state to the newly-added processor, the newly-added processor should only start to consume "new" messages and generate output "after" its local state is caught up. Before its local state is caught up, the existing processor should continue to process new messages and generate output.

4) In order to avoid disrupting existing use-case and keep backward compatibility, the 3-tuple (topic, partition, offset) should uniquely identify the same message before and after the expansion. And a given message should be consumed exactly once by bootstrapping consumers after the partition expansion. This goal suggests that we should not delete existing message or copy any message in the topic.

...

3) The processing job remembers the initial number of processors in the job. This can be supported in core Kafka by remembering the initial number of consumers in the consumer group.

 4) The partition -> processor mapping is determined using linear hashing. This is where we need the initial number of processors in the job.

It can be proven that the above requirements would keep the same key -> processor mapping regardless of how we change the partition number of the input topics, as long as we don't change the number of processors of the processing job.

 

We skip the prove proof here. Reader can thinking about how the key -> processor mapping changes in the following example:

...

Support processor expansion

...

Here we provide the high level idea of how this can be supported. Most of the logic is expected to be implemented in the stream processing layer.

Note: It is required that, at any given time, the number of partitions of the input topic >= the number of processors of the processing job. In other words, we should always expand partitions of the input topic before expanding processors of the processing job.

When a new processor is added to the processing job, the processor should be able to know the following information:

- The set S1 of partitions that the new processor should be consuming after its local state is caught up.

- The latest offsets T1 of partitions in the set S1 that have been consumed by the existing processors in the job. This information should be periodically retrieved over time.

- The set S2 of partitions that may contain messages with the same key as the messages in S1. S2 includes all partitions in S1. Set S2 can be determined using the logic added in KIP-253.

- The latest offsets T2 of partitions in the set S2 that has been used to generate the local state store.

 

Initally offsets T2 will be 0 for all partitions in the set S2 because the local state store is empty. Offsets T1 will keep growing as the existing processors keep consuming messages from partitions in the set S1. The new processor should keep reading messages from partitions in S2 to re-build local state until offsets T2 have "almost" caught up with offsets T1 of the existing processors for all partitions in S1. Note that this follows the design in KIP-253 such that, before any message can be read by the new processor from the newly-added partitions in S1, all prior messages with the same key (as any message in partitions S1) must have been consumed in order.

Also note that, while the new processor is catching up, the existing procesor will still be consuming all new messages of all partitions of the input topic and no rebalance has been triggered yet. And new processor uses the messages to re-build local state without generating any output.

After offsets T2 have "almost" caught up with offsets T1, we should trigger rebalance to re-assign the partitions in S1 from the existing procesor to the new processor. The existing processor should commit offset, generate output and stop consuming messages from partitions in S1. The new processor should first consume messages from offset T2 to offset T1 to finish re-building the local state. Then it can consume messages starting from offsets T1 and genreate output for these messages.

 

Compatibility, Deprecation, and Migration Plan

TODO

 

Rejected Alternatives

TODO