
Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KIP-253 ensures that messages with the same key from the same producer are always delivered in the order that they are produced. However, for stateful stream processing jobs, after we change the number of partitions of the input topic or the number of processors, messages with a given key may go to a different processor that does not have the state necessary to process them. This may cause the stateful stream processing job to output incorrect results.

In this KIP we propose a solution that allows users to expand the number of partitions of the input topic as well as the number of processors of a stateful processing job while still ensuring output correctness.

Note: In this KIP, we use the term "processor" to mean the combination of a consumer, a local state store and user-specified processing logic.

Goals

 

In addition to ensuring correctness, we also aim to achieve the following goals so that the proposed solution is efficient.

1) When we expand partitions of input topics, we should not change the key -> processor mapping, so that no state has to be moved between the existing processors. This in turn would guarantee correctness because KIP-253 ensures that messages with the same key can be delivered in the order that they are produced.

2) When we expand the processors of a given stateful processing job, some keys will be assigned to the newly-added processors, and it will be necessary to copy the state for the corresponding keys to these processors before they can consume messages and generate output. However, we should not have to copy state between existing processors.

3) In order to avoid disrupting existing use-cases and to keep backward compatibility, the 3-tuple (topic, partition, offset) should uniquely identify the same message before and after the expansion, and a given message should be consumed exactly once by bootstrapping consumers after the partition expansion. This goal suggests that we should not delete any existing message or copy any message in the topic.

Public Interface

TODO

 

Proposed Changes

Support partition expansion

Here we describe how to support partition expansion while keeping the same key -> processor mapping.

Here are the requirements. We skip the details for now (e.g. how to remember the initial number of consumers in the consumer group).

1) The stateful processing job starts with the same number of processors as the initial number of partitions of the input topics.

2) At any given time the number of partitions of the input topic >= the number of processors of the processing job. In other words, we always expand partitions of the input topic before expanding processors of the processing job.

3) The processing job remembers the initial number of processors in the job. This can be supported in core Kafka by remembering the initial number of consumers in the consumer group.

 4) The partition -> processor mapping is determined using linear hashing. This is where we need the initial number of processors in the job.

It can be proven that the above requirements would keep the same key -> processor mapping regardless of how we change the number of partitions of the input topics, as long as we don't change the number of processors of the processing job.
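For intuition, here is a minimal sketch of a linear-hashing key -> partition function of the kind KIP-253 relies on. The class and method names, the use of String keys and Java's hashCode() are illustrative assumptions; the actual hashing and partitioning algorithm are defined by KIP-253, not by this sketch.

```java
public final class LinearHashingPartitioner {

    /**
     * Sketch of a linear-hashing key -> partition function. Assumes the caller
     * knows both the current and the initial number of partitions of the topic.
     */
    static int partitionFor(String key, int numPartitions, int initialPartitions) {
        int hash = key.hashCode() & 0x7fffffff;   // non-negative hash of the key
        // "base" = largest power-of-two multiple of the initial partition count
        // that is still <= the current partition count.
        int base = initialPartitions;
        while (base * 2 <= numPartitions) {
            base *= 2;
        }
        int splitPointer = numPartitions - base;  // partitions already split at this level
        int partition = hash % base;
        if (partition < splitPointer) {
            // This partition has already been split, so the key may now live in
            // the newly added partition at index (partition + base).
            partition = hash % (base * 2);
        }
        return partition;
    }

    public static void main(String[] args) {
        // With 10 initial partitions, expanding to 15 only moves keys out of
        // partitions 0-4 (into partitions 10-14); keys in partitions 5-9 never move.
        for (String key : new String[] {"user-1", "user-2", "user-3"}) {
            System.out.printf("%s: 10 partitions -> %d, 15 partitions -> %d%n",
                    key, partitionFor(key, 10, 10), partitionFor(key, 15, 10));
        }
    }
}
```

The important property of this scheme is that a key only ever moves from a partition to the new partition created by splitting it; it never moves between pre-existing partitions.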

 

We skip the proof here. The reader can think about how the key -> processor mapping changes in the following example (a code sketch after the example illustrates the resulting mapping):

- The input topic initially has 10 partitions and the processing job initially has 10 processors.

- Expand the number of partitions of the input topic to 15.

- Expand the number of processors of the processing job to 15.

- Expand the number of partitions of the input topic to 18.
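To make the example concrete, below is a rough sketch of the parent-walking partition -> processor mapping that requirement 4 suggests (the class and method names are made up for illustration and are not part of this KIP): a partition created by a split is mapped back to the partition it was split from, repeatedly, until the index falls below the current number of processors.

```java
public final class LinearHashingAssignment {

    /** Returns the partition that partition p was split from under linear hashing. */
    static int parentPartition(int p, int initialPartitions) {
        // Largest power-of-two multiple of the initial partition count that is <= p.
        int base = initialPartitions;
        while (base * 2 <= p) {
            base *= 2;
        }
        return p - base;
    }

    /**
     * Maps a partition to a processor by walking up the split tree until the
     * index is below the current number of processors. Because a new partition
     * only ever receives keys from its parent, the state for those keys already
     * lives on the processor this returns.
     */
    static int processorFor(int partition, int numProcessors, int initialPartitions) {
        int p = partition;
        while (p >= numProcessors) {
            p = parentPartition(p, initialPartitions);
        }
        return p;
    }

    public static void main(String[] args) {
        // The example above: 10 initial partitions/processors, partitions expanded
        // to 15, processors expanded to 15, partitions expanded again to 18.
        int initialPartitions = 10;
        int numProcessors = 15;
        for (int partition = 0; partition < 18; partition++) {
            System.out.printf("partition %2d -> processor %2d%n",
                    partition, processorFor(partition, numProcessors, initialPartitions));
        }
    }
}
```

With 15 processors, partitions 0-14 map to processors 0-14 directly, while partitions 15-17 added in the last step map back to processors 5-7, so no state has to be moved between existing processors.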

 

Support processor expansion

 

Here we provide the high-level idea of how this can be supported. Most of the logic is expected to be implemented in the stream processing layer.
