You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 2 Next »

This KIP dives deeper into the idea of Shareable State Stores, the ability to use data within a state store across multiple applications without duplicating it on topic level.

Status

Current state: Under Discussion

Discussion thread: https://lists.apache.org/thread/n0rbdpbwn9p92xd6mn5m73tlmbqp1627

JIRA: Unable to render Jira issues macro, execution error.

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

State stores have proven their use within a single application, the allow us to have a fault-tolerant way of storing and retrieving state within our processors. A compacted topic(the changelog) is being used to make the fault-tolerant characteristic possible. This changelog is considered internal to the application; no other application should be using it.

However, by allowing other applications to use this changelog for building a readonly state store used for lookups, we enable a very powerful paradigm, needed to build event driven architectures at scale. These architectures will have multiple applications working together to accomplish specific business flows and as such, need to share data. While messages can be used to trigger the execution of the logic, the logic itself might want to lookup other data.

Consider two applications; A and B. A writes state s to its state store. This state store is backed by a changelog topic, one that’s internal to A. If B would like to lookup s, there are two possible approaches to take.

  1. Do similar processing A is doing to interpret the same events A does and as a result ending up with a copy of the state.

  2. Create a source and processor for listening to the internal changelog of A and update the state store in B.

Public Interfaces

org/apache/kafka/streams/Topology will be extended with an overloaded addStateStore method allowing the changelog topic to be passed in.

Proposed Changes

Concepts

Leader

Only one application can be writing to the state store and as such, is allowed to send logs to the changelog. We call this the state store within this application the leading statestore or Leader for short. Not much changes for the leader as its operation and changelog topic name will remain the same.

Follower

Other applications might have a readonly interest in the stored in the state store. We call such a state store a Follower, since they just follow the state of the leader.

State synchronisation between a leader and its followers is done through the changelog, allowing followers to materialize the state based on that topic. It also opens the door to allow changelog entries to be interpreted differently, ending up with a follower based on an existing changelog, but with a subset of the actual data.

Changes

  • Introduce the concept of a leading state store and a following state store.

  • Extend the Topology API to allow a state store to be added with a custom compacted topic name

  • Keep restoring on state stores instead of pausing the restoring consumer

Implementation Plan

Allow state stores to read from arbitrary compacted topics

/**
 * Adds a state store, but define the topic to use as a changelog topic.
 *
 * @param topic the changelog topic name
 * @param storeBuilder the storeBuilder used to obtain this state store {@link StateStore} instance
 * @param processorNames the names of the processors that should be able to access the provided store
 * @return itself
 * @throws TopologyException if state store supplier is already added
 */
public synchronized Topology addStateStore(final String topic, final StoreBuilder<?> storeBuilder,
                                           final String... processorNames) {
    internalTopologyBuilder.addStateStore(storeBuilder, processorNames);
    internalTopologyBuilder.connectSourceStoreAndTopic(storeBuilder.name(), topic);
    return this;
}
Allow state stores to continue listening for changes from their changelog

This might be as easy as removing calls to pauseChangelogsFromRestoreConsumer() within the enforceRestoreActive(), restoreChangelog() and initializeChangelogs() functions inside the StoreChangelogReader.

The Leader will make changes directly to the underlying storage as well as sending a log out on the changelog. However, since we do not stop restoring, even that log will be retrieved from the changelog actually applying the change twice. while this does not provide issues if the solution was engineered towards idempotency, it might provide issues. Therefor it might make sense to block a log for the Leader.

The Follower however needs to receive this change and therefor it will be interpreted and applied to the storage.

Prevent Followers from writing to the compacted topic

Since a changelog topic is created with the application id in it’s name, it would allow us to check in the follower if the changelog topic starts with our application id. If it doesn’t, we are not allowed to send a log.

Compatibility, Deprecation, and Migration Plan

The proposal is an addition to existing functionality and should not have any impact to existing users or the way they have been using Kafka Streams.

Rejected Alternatives

Consider two applications; A and B. A writes state s to its state store. This state store is backed by a changelog topic, one that’s internal to A. If B would like to lookup s, there are two possible approaches to take.

Copy approach

Do similar processing A is doing to interpret the same events A does and as a result ending up with a copy of the state. This would also require the state store in B to have its own internal changelog topic effectively creating a copy of all state on a new topic.

While this approach can work if there is a limited amount of data on the changelog topic, it becomes problematic when the amount of data grows. With the introduction of tiered storage support, it became more likely for state to grow beyond obvious numbers (into the TB, even possibly PB)

Source-Processor approach

Create a source and processor for listening to the internal changelog of A and update the state store in B. B’s state store has to have logging disabled in order to prevent an uncontrollable chain reaction of changes (especially if there is a C in play also requiring access to s).

Problem with this approach is the lack of having restoring support within the state store. Restoring is a separate phase before the full topology is in effect. Once restored, a state store stops listening to its changelog and switches to a produce-only mode. Without restoring support, processors using the state store will have a hard time functioning since they might be reading a stale version of s, or not even have s at all available yet.

  • No labels