Comment: Changed state to accepted

...

This KIP dives deeper into the idea of Shareable State Stores: the ability to use the data within a state store across multiple applications without duplicating it at the topic level.

Status

Current state: Accepted

Discussion thread: https://lists.apache.org/thread/n0rbdpbwn9p92xd6mn5m73tlmbqp1627

JIRA: KAFKA-10892

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

...

State stores are crucial in developing streaming applications. They give us a fault-tolerant way to store and retrieve state within our processors, and they also expose that data outside of our topology. A compacted topic (the changelog) provides this fault tolerance. This changelog is considered internal to the

...

However, allowing other applications to use this changelog to build a read-only state store for lookups enables a very powerful paradigm, one that is needed to build event-driven architectures at scale. These architectures have multiple applications working together to accomplish specific business flows, so they need to share data. Messages can trigger the execution of the logic, but the logic itself might need to look up other data.

Consider two applications, A and B. A writes state s to its state store. This state store is backed by a changelog topic that is internal to A. If B wants to look up s, there are two possible approaches.

...

Perform the same processing as A, interpreting the same events A does, and as a result end up with a copy of the state.

state store and is set as part of the internal logic within a topology. There is no way to pass in an existing compacted topic for a state store to use.

With the introduction of tiered storage support, compacted topics can grow to the point where copying the data around becomes infeasible. Therefore, having a single compacted topic serve multiple state stores starts to make sense

...

.

Public Interfaces

org/apache/kafka/streams/Topology will be extended with an overloaded addReadOnlyStateStore method, allowing the changelog topic to be passed in.

Proposed Changes

Concepts

Leader

Only one application can write to the state store, and as such, only that application is allowed to send logs to the changelog. We call the state store within this application the leading state store, or Leader for short. Little changes for the Leader: its operation and its changelog topic name remain the same.

Follower

Other applications might have a read-only interest in the data stored in the state store. We call such a state store a Follower, since it simply follows the state of the Leader.

State synchronisation between a Leader and its Followers is done through the changelog, which lets Followers materialize the state from that topic. It also opens the door to interpreting changelog entries differently, yielding a Follower that is based on an existing changelog but holds only a subset of the actual data.
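The following minimal, stdlib-only sketch illustrates how a Follower could materialize only a subset of a Leader's changelog. It does not use the Kafka Streams API. The changelog is modeled as an in-memory list of key/value entries, where a null value is a tombstone, as on a compacted topic. The names ChangelogEntry, FollowerStore and replay are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical changelog entry: a key plus a value; a null value is a tombstone.
final class ChangelogEntry {
    final String key;
    final String value;

    ChangelogEntry(String key, String value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical Follower: replays a Leader's changelog into a local map,
// keeping only the keys accepted by its filter.
final class FollowerStore {
    private final Map<String, String> state = new HashMap<>();
    private final Predicate<String> keyFilter;

    FollowerStore(Predicate<String> keyFilter) {
        this.keyFilter = keyFilter;
    }

    // Apply entries in changelog order; a later entry for a key overwrites an earlier one.
    void replay(List<ChangelogEntry> changelog) {
        for (ChangelogEntry entry : changelog) {
            if (!keyFilter.test(entry.key)) {
                continue; // outside this Follower's subset
            }
            if (entry.value == null) {
                state.remove(entry.key); // tombstone deletes the key
            } else {
                state.put(entry.key, entry.value);
            }
        }
    }

    String get(String key) {
        return state.get(key);
    }

    int size() {
        return state.size();
    }

    // Sample changelog, as a Leader might have written it.
    static List<ChangelogEntry> sampleChangelog() {
        List<ChangelogEntry> log = new ArrayList<>();
        log.add(new ChangelogEntry("eu-1", "open"));
        log.add(new ChangelogEntry("us-1", "open"));
        log.add(new ChangelogEntry("eu-1", "closed"));
        log.add(new ChangelogEntry("eu-2", "open"));
        log.add(new ChangelogEntry("eu-2", null));
        return log;
    }
}
```

A Follower built with the filter `k -> k.startsWith("eu-")` ends up holding only `eu-1 = closed`. In this proposal, such interpretation logic would presumably live in the ProcessorSupplier passed as stateUpdateSupplier. The sketch only models the replay semantics.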

Changes

...

Extend the

...

Keep restoring state stores instead of pausing the restore consumer

Implementation Plan

Allow state stores to read from arbitrary compacted topics

...

topology API with the following method:

public <KIn, VIn> Topology addReadOnlyStateStore(final StoreBuilder<?> storeBuilder,
                                                 final String sourceName,
                                                 final TimestampExtractor timestampExtractor,
                                                 final Deserializer<KIn> keyDeserializer,
                                                 final Deserializer<VIn> valueDeserializer,
                                                 final String topic,
                                                 final String processorName,
                                                 final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier)

The method signature mirrors the one for adding global state stores (Topology#addGlobalStore). Similarly, there is an overload that does not take a TimestampExtractor:

public <KIn, VIn> Topology addReadOnlyStateStore(final StoreBuilder<?> storeBuilder,
                                                 final String sourceName,
                                                 final Deserializer<KIn> keyDeserializer,
                                                 final Deserializer<VIn> valueDeserializer,
                                                 final String topic,
                                                 final String processorName,
                                                 final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier)

The first method would be a wrapper around the following calls:

  • Create a source for the passed-in topic and key/value deserializers.
  • Create a processor based on the passed-in ProcessorSupplier.
  • Create a state store based on the passed-in StoreBuilder.

Logging on the passed-in StoreBuilder will be disabled so that the Follower's store cannot write changes to the compacted topic.
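The following stdlib-only model shows what disabling logging achieves. It is not the Kafka Streams StoreBuilder implementation. A hypothetical LoggableStore always writes a put to its local map. Only when logging is enabled does it also append the put to a shared in-memory "changelog" list. A Follower created with logging disabled can still materialize state locally without ever appending to the Leader's topic.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical store model, not the Kafka Streams StoreBuilder:
// with logging enabled, every put is also appended to a shared changelog.
final class LoggableStore {
    private final Map<String, String> local = new HashMap<>();
    private final List<String[]> changelog; // shared "topic" of {key, value} pairs
    private final boolean loggingEnabled;

    LoggableStore(List<String[]> changelog, boolean loggingEnabled) {
        this.changelog = changelog;
        this.loggingEnabled = loggingEnabled;
    }

    void put(String key, String value) {
        local.put(key, value); // local state is always updated
        if (loggingEnabled) {
            changelog.add(new String[] {key, value}); // only a logging (Leader) store appends
        }
    }

    String get(String key) {
        return local.get(key);
    }
}
```

If a Leader (logging enabled) writes one value and a Follower (logging disabled) applies that record through put(), the Follower's local map is updated while the shared changelog still holds exactly one record.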

Allow state stores to continue listening for changes from their changelog

This might be as easy as removing calls to pauseChangelogsFromRestoreConsumer() within the enforceRestoreActive(), restoreChangelog() and initializeChangelogs() functions inside the StoreChangelogReader.
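The following model shows why a Follower must keep consuming after its initial restore. A hypothetical RestoringFollower tracks its position in an in-memory changelog and can optionally pause once it has caught up, which mimics the pausing of the restore consumer. This is not StoreChangelogReader's code. It shows only that a paused reader never sees the Leader's later writes, while a reader that keeps polling does.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical Follower that reads a changelog from a tracked position.
final class RestoringFollower {
    private final Map<String, String> state = new HashMap<>();
    private final boolean pauseAfterRestore;
    private boolean paused = false;
    private int position = 0; // index of the next changelog record to read

    RestoringFollower(boolean pauseAfterRestore) {
        this.pauseAfterRestore = pauseAfterRestore;
    }

    // One poll: apply every record after the current position, unless paused.
    void poll(List<String[]> changelog) {
        if (paused) {
            return; // models a restore consumer paused after restoration completed
        }
        while (position < changelog.size()) {
            String[] record = changelog.get(position++);
            state.put(record[0], record[1]);
        }
        if (pauseAfterRestore) {
            paused = true; // stop reading once caught up
        }
    }

    String get(String key) {
        return state.get(key);
    }
}
```

Suppose the Leader appends `k = v2` after both followers have restored `k = v1`. The pausing follower keeps serving `v1`, while the one that keeps polling moves to `v2`.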

The Leader writes changes directly to the underlying storage and also sends a log to the changelog. However, since restoring no longer stops, the Leader will read that same log back from the changelog, effectively applying the change twice. This causes no issues if the update logic was engineered to be idempotent, but it might cause issues otherwise. Therefore, it might make sense to block the Leader from re-applying its own logs.

The Follower, however, needs to receive this change, so the change will be interpreted and applied to the Follower's storage.
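Whether the Leader's double application is harmful depends on how a record is applied. For key-value stores, a changelog record carries the latest value for its key, so applying it as a put twice leaves the same state. Applying an increment twice does not. The following stdlib-only sketch uses the hypothetical name ReplayDemo and contrasts the two cases. It illustrates the concern in the preceding paragraphs and is not a proposed implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical contrast between idempotent and non-idempotent re-application.
final class ReplayDemo {
    // Put semantics: the record holds the new value, so applying it twice is harmless.
    static long applyPutTwice(long newValue) {
        Map<String, Long> store = new HashMap<>();
        store.put("count", newValue); // direct write by the Leader
        store.put("count", newValue); // same record read back from the changelog
        return store.get("count");
    }

    // Delta semantics: the record holds an increment, so applying it twice double-counts.
    static long applyDeltaTwice(long start, long delta) {
        Map<String, Long> store = new HashMap<>();
        store.put("count", start);
        store.merge("count", delta, Long::sum); // direct write by the Leader
        store.merge("count", delta, Long::sum); // same record read back from the changelog
        return store.get("count");
    }
}
```

With put semantics, writing 5 twice still yields 5. With delta semantics, adding 5 twice to 0 yields 10. The second case is the situation where blocking the Leader's own logs would matter.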

Prevent Followers from writing to the compacted topic

...

.

Compatibility, Deprecation, and Migration Plan

The proposal is an addition to existing functionality and should not have any impact on existing users or on the way they have been using Kafka Streams.

Rejected Alternatives

Consider two applications, A and B. A writes state s to its state store. This state store is backed by a changelog topic that is internal to A. If B wants to look up s, there are two possible approaches.

Copy approach

B performs the same processing as A, interpreting the same events A does, and as a result ends up with a copy of the state. This also requires the state store in B to have its own internal changelog topic, effectively creating a copy of all state on a new topic.

This approach can work if there is a limited amount of data on the changelog topic, but it becomes problematic as the amount of data grows. With the introduction of tiered storage support, state has become more likely to grow very large (into the TB, possibly even PB

...

Source-Processor approach

...

)

...