...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

State stores are crucial in developing streaming applications and have proven their use within a single application. They let us store and retrieve state within our processors in a fault-tolerant way, and they also expose that data outside of our topology. A compacted topic (the changelog) makes the fault tolerance possible. This changelog is considered internal to the application; no other application should be using it.

However, by allowing other applications to use this changelog to build a read-only state store for lookups, we enable a very powerful paradigm that is needed to build event-driven architectures at scale. These architectures have multiple applications working together to accomplish specific business flows and, as such, need to share data. While messages can trigger the execution of the logic, the logic itself might need to look up other data.

Consider two applications, A and B. A writes state s to its state store. This state store is backed by a changelog topic that is internal to A. If B would like to look up s, there are two possible approaches.

...

Perform the same processing A does, interpreting the same events and, as a result, ending up with a copy of the state.

state store and is set as part of the internal logic within a topology. There is no way to pass in the compacted topic a state store should use.

With the introduction of tiered storage support, compacted topics can grow to the point where copying the data around becomes infeasible. Therefore, having a single compacted topic serve multiple state stores starts to make sense

...

.

Public Interfaces

org.apache.kafka.streams.Topology will be extended with an addReadOnlyStateStore method that allows the changelog topic to be passed in.

Proposed Changes

Concepts

Leader

Only one application can write to the state store and, as such, is allowed to send logs to the changelog. We call the state store within this application the leading state store, or Leader for short. Not much changes for the Leader: its operation and changelog topic name remain the same.

Follower

Other applications might have a read-only interest in the data stored in the state store. We call such a state store a Follower, since it just follows the state of the Leader.

State synchronisation between a leader and its followers is done through the changelog, allowing followers to materialize the state based on that topic. It also opens the door to allow changelog entries to be interpreted differently, ending up with a follower based on an existing changelog, but with a subset of the actual data.
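To make the Leader/Follower split concrete, here is a minimal, Kafka-free sketch. It assumes an in-memory list stands in for the compacted changelog topic, and the names `Changelog`, `LeaderStore` and `FollowerStore` are hypothetical. The Leader is the only writer; a Follower only materializes state from the changelog, and an optional key filter shows how a follower could keep just a subset of the data.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// One changelog entry; a null value is a tombstone (delete), as on a compacted topic.
record ChangelogRecord(String key, String value) {}

// Hypothetical in-memory stand-in for the compacted changelog topic (no Kafka involved).
class Changelog {
    private final List<ChangelogRecord> records = new ArrayList<>();

    void append(ChangelogRecord record) {
        records.add(record);
    }

    // Returns a copy of all records from the given offset onwards, like a consumer fetch.
    List<ChangelogRecord> readFrom(int offset) {
        return new ArrayList<>(records.subList(offset, records.size()));
    }
}

// The Leader is the only writer: it updates its own store and logs every change.
class LeaderStore {
    private final Map<String, String> store = new HashMap<>();
    private final Changelog changelog;

    LeaderStore(Changelog changelog) {
        this.changelog = changelog;
    }

    void put(String key, String value) {
        if (value == null) {
            store.remove(key);
        } else {
            store.put(key, value);
        }
        changelog.append(new ChangelogRecord(key, value));
    }
}

// A Follower never writes to the changelog; it only materializes state from it.
// The key filter lets a follower keep just a subset of the Leader's data.
class FollowerStore {
    private final Map<String, String> store = new HashMap<>();
    private final Changelog changelog;
    private final Predicate<String> keyFilter;
    private int nextOffset = 0;

    FollowerStore(Changelog changelog, Predicate<String> keyFilter) {
        this.changelog = changelog;
        this.keyFilter = keyFilter;
    }

    // Applies every record appended since the previous call.
    void catchUp() {
        for (ChangelogRecord r : changelog.readFrom(nextOffset)) {
            nextOffset++;
            if (!keyFilter.test(r.key())) {
                continue; // not part of this follower's subset
            }
            if (r.value() == null) {
                store.remove(r.key());
            } else {
                store.put(r.key(), r.value());
            }
        }
    }

    String get(String key) {
        return store.get(key);
    }
}
```

For example, a follower built with `k -> k.startsWith("user-")` ignores `order-*` keys written by the Leader, while still applying deletes (null values) for the keys it keeps.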

Changes

  • Introduce the concept of a leading state store and a following state store.

  • Extend the Topology API to allow a state store to be added with a custom compacted topic name

  • Keep state stores restoring from their changelog instead of pausing the restore consumer

Implementation Plan

Allow state stores to read from arbitrary compacted topics

```java
/**
 * Adds a state store that uses the given topic as its changelog topic.
 *
 * @param topic          the name of the compacted topic to use as the changelog
 * @param storeBuilder   the storeBuilder used to obtain this state store {@link StateStore} instance
 * @param processorNames the names of the processors that should be able to access the provided store
 * @return itself
 * @throws TopologyException if the state store has already been added
 */
public synchronized Topology addStateStore(final String topic,
                                           final StoreBuilder<?> storeBuilder,
                                           final String... processorNames) {
    // Register the store and connect it to the processors that use it.
    internalTopologyBuilder.addStateStore(storeBuilder, processorNames);
    // Use the given topic, instead of the generated changelog name, as the store's changelog.
    internalTopologyBuilder.connectSourceStoreAndTopic(storeBuilder.name(), topic);
    return this;
}
```
Allow state stores to continue listening for changes from their changelog

This might be as easy as removing calls to pauseChangelogsFromRestoreConsumer() within the enforceRestoreActive(), restoreChangelog() and initializeChangelogs() functions inside the StoreChangelogReader.
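The behavioural difference between pausing the restore consumer and continuing to restore can be illustrated with a minimal model. This is not `StoreChangelogReader`; `RestoringStore` and its `poll` method are hypothetical, and the changelog is assumed to be a plain list of key/value entries.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of a store fed by a restore consumer; NOT Kafka's StoreChangelogReader.
class RestoringStore {
    private final Map<String, String> store = new HashMap<>();
    private final boolean pauseAfterRestore; // true models the current behaviour
    private boolean paused = false;
    private int position = 0;

    RestoringStore(boolean pauseAfterRestore) {
        this.pauseAfterRestore = pauseAfterRestore;
    }

    // One poll of the changelog: apply every entry after the current position,
    // unless the restore consumer has already been paused.
    void poll(List<Map.Entry<String, String>> changelog) {
        if (paused) {
            return;
        }
        for (; position < changelog.size(); position++) {
            Map.Entry<String, String> entry = changelog.get(position);
            store.put(entry.getKey(), entry.getValue());
        }
        // Reaching the end of the changelog completes restoration; the current
        // behaviour pauses here, while the proposed one keeps polling.
        if (pauseAfterRestore) {
            paused = true;
        }
    }

    String get(String key) {
        return store.get(key);
    }
}
```

If an update is appended after the initial restoration, only the store created with `pauseAfterRestore = false` picks it up; the paused one keeps serving the stale value.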

The Leader writes changes directly to its underlying storage and also sends a record to the changelog. However, since restoring no longer stops, the Leader will also read that record back from the changelog, effectively applying the change twice. While this is harmless if the solution was engineered towards idempotency, it may cause issues otherwise. Therefore, it might make sense to block the log for the Leader.

The Follower, however, needs to receive this change, so it will interpret the record and apply it to its storage.
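The double-apply problem only matters when applying a changelog record is not idempotent. The sketch below assumes a hypothetical store whose changelog entries are deltas added to the current value; replaying a full-value put would be harmless. `DeltaStore` and `DeltaRecord` are illustrative names, and skipping replay on the Leader is just one possible way to block its log, not a settled design.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical changelog entry carrying a delta that is added to the current value.
record DeltaRecord(String key, long delta) {}

// A store whose changelog replay is NOT idempotent: replaying a delta adds it again.
class DeltaStore {
    private final Map<String, Long> counts = new HashMap<>();
    private int position = 0;

    // Leader write path: update the local store and log the change.
    void write(String key, long delta, List<DeltaRecord> changelog) {
        counts.merge(key, delta, Long::sum);
        changelog.add(new DeltaRecord(key, delta));
    }

    // Continuous restore. applyRecords == false models blocking the log for the
    // Leader: the position still advances, but its own writes are not re-applied.
    void restore(List<DeltaRecord> changelog, boolean applyRecords) {
        for (; position < changelog.size(); position++) {
            DeltaRecord r = changelog.get(position);
            if (applyRecords) {
                counts.merge(r.key(), r.delta(), Long::sum);
            }
        }
    }

    long get(String key) {
        return counts.getOrDefault(key, 0L);
    }
}
```

A Leader that writes a delta of 5 and then replays its own changelog ends up with 10, whereas a Leader that skips replay and a Follower that applies it both end up with 5.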

Prevent Followers from writing to the compacted topic

  • Extend the Topology API with an addReadOnlyStateStore() method that accepts the topic along with a store builder and processor names.

Implementation Plan

The addReadOnlyStateStore() method would be a wrapper around the following calls:

  • Create a source for the passed-in topic, using the passed-in key and value deserializers.
  • Create a processor based on the passed-in ProcessorSupplier.
  • Create a state store based on the passed-in store builder.

Logging on the passed-in StoreBuilder will be disabled to prevent changes to the compacted topic.
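The wrapper described above can be sketched against a deliberately simplified, hypothetical `MiniTopology` class rather than the real `Topology` API; key/value deserializers and the processor supplier are left out to keep the example self-contained. Node names follow the `<store_name>-readonly-log` and `<store_name>-processor` scheme proposed in this section.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified topology description used only for illustration;
// it is not org.apache.kafka.streams.Topology and ignores serdes and processor suppliers.
class MiniTopology {
    final List<String> nodes = new ArrayList<>();

    MiniTopology addSource(String name, String topic) {
        nodes.add("source " + name + " <- topic " + topic);
        return this;
    }

    MiniTopology addProcessor(String name, String parent) {
        nodes.add("processor " + name + " <- " + parent);
        return this;
    }

    MiniTopology addStateStore(String storeName, boolean loggingEnabled, String processorName) {
        nodes.add("store " + storeName + " (logging=" + loggingEnabled + ") -> " + processorName);
        return this;
    }

    // Sketch of the proposed wrapper: a source on the existing changelog topic,
    // a processor that applies records to the store, and the store itself with
    // logging disabled so the follower never writes to the leader's changelog.
    MiniTopology addReadOnlyStateStore(String storeName, String changelogTopic) {
        String sourceName = storeName + "-readonly-log";
        String processorName = storeName + "-processor";
        addSource(sourceName, changelogTopic);
        addProcessor(processorName, sourceName);
        return addStateStore(storeName, false, processorName);
    }
}
```

Calling `new MiniTopology().addReadOnlyStateStore("prices", "app-a-prices-changelog")` yields three nodes: the `prices-readonly-log` source, the `prices-processor` processor, and the `prices` store with logging disabled.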

The source name is constructed as <store_name>-readonly-log.

The processor name is constructed as <store_name>-processor.

Since a changelog topic is created with the application id in its name, a follower can check whether the changelog topic starts with its own application id. If it doesn't, it is not allowed to send a log.
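Kafka Streams names a store's changelog topic `<application.id>-<store name>-changelog`, which is what makes the application id check possible. A minimal sketch of that check follows; `ChangelogOwnership`, `changelogTopic` and `mayWriteTo` are hypothetical names.

```java
// Kafka Streams names a store's changelog topic "<application.id>-<store name>-changelog".
final class ChangelogOwnership {
    private ChangelogOwnership() {
    }

    // Builds the changelog topic name an application would use for one of its stores.
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Sketch of the proposed check: only an application whose id prefixes the topic
    // name may send logs to it; every other application is treated as a follower.
    static boolean mayWriteTo(String applicationId, String changelogTopic) {
        return changelogTopic.startsWith(applicationId + "-");
    }
}
```

A plain prefix test is ambiguous when one application id is a prefix of another followed by a dash: `orders` would also match `orders-v2-prices-changelog`. A stricter variant could compare the topic against the full name computed by `changelogTopic` for each of the application's stores.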

Compatibility, Deprecation, and Migration Plan

...

While this approach can work if there is a limited amount of data on the changelog topic, it becomes problematic as the amount of data grows. With the introduction of tiered storage support, it has become more likely for state to grow very large (into terabytes, possibly even petabytes).

Source-Processor approach

Create a source and processor that listen to the internal changelog of A and update the state store in B. B's state store has to have logging disabled to prevent an uncontrollable chain reaction of changes (especially if a third application, C, also requires access to s).

The problem with this approach is the lack of restoration support within the state store. Restoration is a separate phase that runs before the full topology is in effect. Once restored, a state store stops listening to its changelog and switches to a produce-only mode. Without restoration support, processors using the state store will have a hard time functioning, since they might read a stale version of s, or s might not be available at all yet.