...

Current state: Under Discussion

Discussion thread: https://lists.apache.org/thread/n0rbdpbwn9p92xd6mn5m73tlmbqp1627

JIRA: KAFKA-10892

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

State stores have proven their value within a single application: they allow us to store and retrieve state within our processors in a fault-tolerant way. A compacted topic (the changelog) is what makes this fault tolerance possible. This changelog is considered internal to the application; no other application should be using it.

However, allowing other applications to use this changelog to build a read-only state store for lookups enables a very powerful paradigm, one that is needed to build event-driven architectures at scale. These architectures have multiple applications working together to accomplish specific business flows and, as such, need to share data. While messages can be used to trigger the execution of the logic, the logic itself might need to look up other data.

Consider two applications, A and B. A writes state s to its state store. This state store is backed by a changelog topic, one that’s internal to A. If B wants to look up s, there are two possible approaches to take.

  1. Perform the same processing as A, interpreting the same events A does, and as a result end up with a copy of the state.

  2. Create a source and a processor that listen to A’s internal changelog and update the state store in B.

...

Proposed Changes

Concepts

Leader

Only one application can write to the state store and, as such, is allowed to send records to the changelog. We call the state store within this application the leading state store, or Leader for short. Not much changes for the Leader: its operation and its changelog topic name remain the same.

Follower

Other applications might have a read-only interest in the data stored in the state store. We call the state store in such an application a Follower, since it simply follows the state of the Leader.

State synchronisation between a leader and its followers is done through the changelog, allowing followers to materialize the state from that topic. It also opens the door to interpreting changelog entries differently, resulting in a follower that is based on an existing changelog but holds only a subset of the actual data.
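To make the leader/follower split concrete, here is a minimal in-memory sketch in plain Java. It assumes the changelog can be modeled as an append-only list and uses hypothetical class names (ChangelogRecord, Changelog, LeaderStore, FollowerStore); none of them are Kafka Streams APIs. Only the leader has write methods, while the follower materializes its state by polling the changelog and can keep just the keys that match a filter, which is the "subset of the actual data" case described above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** One changelog entry (hypothetical); a null value is a tombstone (delete). */
final class ChangelogRecord {
    final String key;
    final String value;

    ChangelogRecord(String key, String value) {
        this.key = key;
        this.value = value;
    }
}

/** In-memory stand-in for a compacted changelog topic: an append-only log. */
final class Changelog {
    private final List<ChangelogRecord> records = new ArrayList<>();

    void append(ChangelogRecord record) {
        records.add(record);
    }

    /** Returns every record from the given offset up to the current end. */
    List<ChangelogRecord> readFrom(int offset) {
        return new ArrayList<>(records.subList(offset, records.size()));
    }
}

/** The single writer: updates its own storage and logs every change. */
final class LeaderStore {
    private final Map<String, String> storage = new HashMap<>();
    private final Changelog changelog;

    LeaderStore(Changelog changelog) {
        this.changelog = changelog;
    }

    void put(String key, String value) {
        storage.put(key, value);
        changelog.append(new ChangelogRecord(key, value));
    }

    void delete(String key) {
        storage.remove(key);
        changelog.append(new ChangelogRecord(key, null)); // tombstone
    }
}

/** Read-only view materialized from the leader's changelog; it never writes. */
final class FollowerStore {
    private final Map<String, String> storage = new HashMap<>();
    private final Changelog changelog;
    private final Predicate<String> keyFilter; // which keys this follower keeps
    private int nextOffset = 0;

    FollowerStore(Changelog changelog, Predicate<String> keyFilter) {
        this.changelog = changelog;
        this.keyFilter = keyFilter;
    }

    /** Applies all records appended since the last poll. */
    void poll() {
        for (ChangelogRecord record : changelog.readFrom(nextOffset)) {
            nextOffset++;
            if (!keyFilter.test(record.key)) {
                continue; // outside the subset this follower materializes
            }
            if (record.value == null) {
                storage.remove(record.key);
            } else {
                storage.put(record.key, record.value);
            }
        }
    }

    String get(String key) {
        return storage.get(key);
    }
}
```

For example, a follower created with the filter key -> key.startsWith("customer-") ignores a record for "order-7" while still applying puts and tombstones for customer keys.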

Changes

  • Introduce the concept of a leading state store and a following state store.

  • Extend the Topology API to allow a state store to be added with a custom compacted topic name.

  • Keep state stores restoring from their changelog instead of pausing the restore consumer.

...

Allow state stores to continue listening for changes from their changelog

This might be as simple as removing the calls to pauseChangelogsFromRestoreConsumer() in the enforceRestoreActive(), restoreChangelog() and initializeChangelogs() methods of the StoreChangelogReader.

The Leader writes changes directly to the underlying storage and also sends a record to the changelog. However, since restoring no longer stops, the Leader will also read that record back from the changelog, effectively applying the change twice. While this causes no issues if the store was engineered for idempotency, it might cause issues otherwise. Therefore, it might make sense for the Leader to block its own changelog records.

The Follower, however, needs to receive this change, so it interprets the record and applies it to its storage.
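The sketch below illustrates this restore behavior under simplifying assumptions: the restore consumer is never paused, so every changelog record reaches onChangelogRecord(), and a hypothetical StoreRole decides what to do with it. During the initial restore both roles apply records; afterwards the Leader skips them because it already applied the change through its own write path, while the Follower keeps applying them. StoreRole and ContinuouslyRestoringStore are illustrative names only and do not exist in Kafka Streams.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical roles from this proposal; not an existing Kafka Streams type. */
enum StoreRole { LEADER, FOLLOWER }

final class ContinuouslyRestoringStore {
    private final StoreRole role;
    private final Map<String, Long> storage = new HashMap<>();
    private boolean initialRestoreDone = false;
    private int appliedFromChangelog = 0;

    ContinuouslyRestoringStore(StoreRole role) {
        this.role = role;
    }

    /** Marks the end of the initial restore phase; reading continues afterwards. */
    void finishInitialRestore() {
        initialRestoreDone = true;
    }

    /** Local write path (which would also log the change); only the Leader may use it. */
    void put(String key, long value) {
        if (role != StoreRole.LEADER) {
            throw new IllegalStateException("followers are read-only");
        }
        storage.put(key, value);
    }

    /**
     * Called for every record read from the changelog. After the initial restore,
     * the Leader skips records: it already applied them in put(), so applying them
     * again would do the same work twice.
     */
    void onChangelogRecord(String key, long value) {
        if (role == StoreRole.LEADER && initialRestoreDone) {
            return;
        }
        storage.put(key, value);
        appliedFromChangelog++;
    }

    Long get(String key) {
        return storage.get(key);
    }

    int appliedFromChangelog() {
        return appliedFromChangelog;
    }
}
```

Because put() in this sketch is an upsert, applying the same record twice would leave the same value; skipping the Leader’s own records mainly avoids redundant work and protects stores whose apply step is not idempotent.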

Prevent Followers from writing to the compacted topic

Since a changelog topic is created with the application id in its name, a store can check whether the changelog topic name starts with its own application id. If it does not, the store is a Follower and is not allowed to write to the changelog.
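A minimal sketch of this ownership check, assuming the default Kafka Streams changelog naming <application.id>-<storeName>-changelog. ChangelogWriteGuard and its methods are hypothetical names for illustration, not part of Kafka Streams.

```java
/** Hypothetical write guard sketched from this proposal; not a Kafka Streams class. */
final class ChangelogWriteGuard {
    private ChangelogWriteGuard() {}

    /** Default changelog naming: {@code <application.id>-<storeName>-changelog}. */
    static String defaultChangelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    /**
     * Prefix check as described above. Including the "-" separator stops "app"
     * from matching "app2-...", but "app" still matches topics of an "app-x".
     */
    static boolean mayWriteToChangelog(String applicationId, String changelogTopic) {
        return changelogTopic.startsWith(applicationId + "-");
    }

    /** Stricter variant: only this store's exact default topic is writable. */
    static boolean isOwnChangelog(String applicationId, String storeName, String changelogTopic) {
        return defaultChangelogTopic(applicationId, storeName).equals(changelogTopic);
    }
}
```

A bare startsWith(applicationId) would also let "app" write to "app2-orders-changelog"; the separator avoids that case, but application ids such as "app" and "app-a" remain ambiguous under a prefix check, which is why the exact-name comparison may be the safer choice.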

Compatibility, Deprecation, and Migration Plan

The proposal is an addition to existing functionality and should not have any impact on existing users or the way they have been using Kafka Streams.

Rejected Alternatives

Consider two applications, A and B. A writes state s to its state store. This state store is backed by a changelog topic, one that’s internal to A. If B wants to look up s, there are two possible approaches to take.

Copy approach

B performs the same processing as A, interpreting the same events A does, and as a result ends up with a copy of the state. This also requires the state store in B to have its own internal changelog topic, effectively creating a copy of all state on a new topic.

While this approach can work if there is a limited amount of data on the changelog topic, it becomes problematic as the amount of data grows. With the introduction of tiered storage support, it has become more likely for state to grow very large (into terabytes, possibly even petabytes).

Source-Processor approach

B creates a source and a processor that listen to A’s internal changelog and update the state store in B. B’s state store must have logging disabled to prevent an uncontrollable chain reaction of changes (especially if an application C also needs access to s).

The problem with this approach is the lack of restoring support within the state store. Restoring is a separate phase that runs before the full topology is in effect. Once restored, a state store stops listening to its changelog and switches to a produce-only mode. Without restoring support, processors using the state store will have a hard time functioning, since they might read a stale version of s, or not have s available at all yet.
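The timing problem can be illustrated with a small simulation. It assumes that every changelog record in the input was already in A’s changelog when B started, and it models B’s lookups and A’s changelog records as a single arrival-ordered list; Event and LookupTimingDemo are hypothetical names. interleaved() mimics the source-processor approach, where records are handled strictly in arrival order, while restoreFirst() mimics a restore phase that catches up on the changelog before any lookup runs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Either a record from A's changelog or a lookup by B (hypothetical model). */
final class Event {
    final boolean fromChangelog;
    final String key;
    final String value;

    private Event(boolean fromChangelog, String key, String value) {
        this.fromChangelog = fromChangelog;
        this.key = key;
        this.value = value;
    }

    static Event changelog(String key, String value) {
        return new Event(true, key, value);
    }

    static Event lookup(String key) {
        return new Event(false, key, null);
    }
}

final class LookupTimingDemo {
    private LookupTimingDemo() {}

    /** Source-processor approach: a lookup sees only records that arrived before it. */
    static List<String> interleaved(List<Event> events) {
        Map<String, String> store = new HashMap<>();
        List<String> results = new ArrayList<>();
        for (Event e : events) {
            if (e.fromChangelog) {
                store.put(e.key, e.value);
            } else {
                results.add(store.get(e.key)); // may be null or stale
            }
        }
        return results;
    }

    /** Restore phase: apply all existing changelog records before any lookup. */
    static List<String> restoreFirst(List<Event> events) {
        Map<String, String> store = new HashMap<>();
        for (Event e : events) {
            if (e.fromChangelog) {
                store.put(e.key, e.value);
            }
        }
        List<String> results = new ArrayList<>();
        for (Event e : events) {
            if (!e.fromChangelog) {
                results.add(store.get(e.key));
            }
        }
        return results;
    }
}
```

With a lookup that arrives before the record for s, interleaved() yields null where restoreFirst() yields "v1"; with the sequence v1, lookup, v2, interleaved() yields the stale "v1" instead of "v2".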