...

Support processor expansion

 

Here we provide the high-level idea of how this can be supported. Most of the logic is expected to be implemented in the stream processing layer.

Note: At any given time, the number of partitions of the input topic must be >= the number of processors of the processing job. In other words, we should always expand the partitions of the input topic before expanding the processors of the processing job.
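The ordering constraint in the note can be written as a simple precondition check. The following is a minimal sketch and not part of the proposal; the function name `can_add_processors` and its parameters are hypothetical.

```python
def can_add_processors(num_partitions: int, current_processors: int, to_add: int = 1) -> bool:
    """Return True if adding `to_add` processors keeps partitions >= processors.

    Hypothetical helper: the counts would come from the input topic metadata
    and from the processing job's membership, neither of which is modeled here.
    """
    if num_partitions < 0 or current_processors < 0 or to_add < 0:
        raise ValueError("counts must be non-negative")
    return num_partitions >= current_processors + to_add
```

If the check fails, the partitions of the input topic would need to be expanded first, and the processor added afterwards.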

When a new processor is added to the processing job, the processor should be able to know the following information:

- The set S1 of partitions that the new processor should be consuming after its local state is caught up.

...

Initially, offsets T2 will be 0 for all partitions in the set S2 because the local state store is empty. Offsets T1 will keep growing as the existing processors keep consuming messages from partitions in the set S1. The new processor should keep reading messages from partitions in S2 to re-build local state until offsets T2 have "almost" caught up with offsets T1 of the existing processors for all partitions in S1. Note that this follows the design in KIP-253 such that, before any message can be read by the new processor from the newly-added partitions in S1, all prior messages with the same key (as any message in partitions S1) must have been consumed in order.
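The text leaves "almost" unspecified. One possible reading is a per-partition lag bound, sketched below. This is an illustration under assumptions, not the proposed implementation: it assumes T1 and T2 can be represented as maps from partition id to offset over the same partitions, and the `max_lag` threshold and the function name are hypothetical.

```python
from typing import Dict


def almost_caught_up(t1: Dict[int, int], t2: Dict[int, int], max_lag: int = 100) -> bool:
    """Return True when the new processor's offsets are within `max_lag`
    of the existing processors' offsets on every partition tracked in t1.

    t1: offsets of the existing processors, keyed by partition id (assumed shape).
    t2: offsets up to which the new processor has rebuilt local state.
        A partition missing from t2 is treated as offset 0 (empty state store).
    max_lag: hypothetical threshold standing in for "almost".
    """
    return all(t1[p] - t2.get(p, 0) <= max_lag for p in t1)
```

Because T1 keeps growing while the new processor reads, a check like this would be re-evaluated periodically rather than once.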

Also note that, while the new processor is catching up, the existing processor will still be consuming all new messages of all partitions of the input topic, and no rebalance has been triggered yet. The new processor uses the messages only to re-build local state, without generating any output.

After offsets T2 have "almost" caught up with offsets T1, we should trigger a rebalance to re-assign the partitions in S1 from the existing processor to the new processor. The existing processor should commit offsets, generate output, and stop consuming messages from partitions in S1. The new processor should first consume messages from offsets T2 to offsets T1 to finish re-building the local state. Then it can consume messages starting from offsets T1 and generate output for these messages.
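The two phases the new processor goes through after the rebalance can be sketched as follows. This is a simplified, single-partition illustration under assumptions: the log is modeled as an in-memory list, and `take_over`, `apply_to_state`, and `emit` are hypothetical names rather than Kafka APIs.

```python
from typing import Callable, List


def take_over(
    log: List[str],
    t2: int,
    t1: int,
    apply_to_state: Callable[[str], None],
    emit: Callable[[str], None],
) -> int:
    """Simulate the new processor taking over one partition.

    log: messages of the partition, indexed by offset (in-memory stand-in).
    t2:  offset up to which the new processor has already rebuilt state.
    t1:  offset committed by the existing processor before it stopped.
    Returns the next offset to consume.
    """
    if not 0 <= t2 <= t1 <= len(log):
        raise ValueError("expected 0 <= t2 <= t1 <= len(log)")

    # Phase 1: replay offsets [t2, t1) to finish rebuilding local state.
    # The existing processor already produced output for these messages,
    # so nothing is emitted here.
    for offset in range(t2, t1):
        apply_to_state(log[offset])

    # Phase 2: from t1 onward, update state and generate output.
    for offset in range(t1, len(log)):
        apply_to_state(log[offset])
        emit(log[offset])

    return len(log)
```

For example, with a five-message log, `t2 = 1` and `t1 = 3`, the messages at offsets 1 and 2 only update state, while the messages at offsets 3 and 4 update state and produce output.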

...