Status
Current state: Under Discussion
Discussion thread: TBD
JIRA: TBD
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
KIP-98 added the following capabilities to Apache Kafka:
- An idempotent producer based on producer identifiers (PIDs) to eliminate duplicates
- Cross-partition transactions for writes and offset commits
- Consumer support for fetching only committed messages
This proposal makes use of these capabilities to strengthen the semantics of Kafka's Streams API for stream processing.
The critical question for a stream processing system is "does my stream processing application get the right answer, even if one of the instances crashes in the middle of processing?". The challenge in ensuring this is resuming the work being carried out by the failed instances in exactly the same state as before the crash.
A simple example of this in Kafka land would be a stream processor which took input from some topic, transformed it, and produced output to a new output topic. In this case "getting the right answer" means neither missing any input messages nor producing any duplicate output. This is often called "exactly once semantics" or "exactly once delivery".
In reality the failure scenarios are far more complex than this simple case of producing duplicates:
- We may be taking input from multiple sources and the ordering across these sources may be non-deterministic
- Likewise we may be producing output to multiple destination topics
- We may be aggregating or joining across multiple inputs using the managed state facilities the streams api provides
- We may be looking up side information in a data system or calling out to a service that is updated out of band
Duplicates or non-determinism, when combined with changes to the persistent state computed by the application, may result not only in duplicates but in incorrect results. Hence the phrase "exactly once processing".
Currently no stream processing layer addresses this problem when used with Kafka (or most other similar low-latency messaging layers). Some systems prevent duplication in the processing tier, but none handle duplicates in the import of streams into Kafka or in the integration between the processing layer and Kafka. Even some of the systems that claim to provide strong semantics in their processing layer, discounting the integration with Kafka, have bugs in the presence of non-determinism. In stream processing, non-determinism (say, due to looking up data in a DB or service that is updated asynchronously, or due to the arrival order of data) is so common as to render a guarantee that holds only for fully deterministic systems quite limited in value.
The features in KIP-98 provide a basis for fixing this. We can think of stream processing in Kafka as any application that performs the following activities:
- Reading input from one or more input topics
- Making changes to local state, which, however it is laid out on disk or in memory, can be journaled to a compacted Kafka topic
- Producing output to one or more output topics
- Updating the consumer offset to mark the input as processed
The key problem in ensuring correct semantics is to guarantee that these actions all happen together or not at all. If the application undergoes the state change but fails to produce output, or produces only part of its output, then upon restart it will not pick up in the same state.
KIP-98 provides a mechanism for wrapping all four of these actions in a single atomic "transaction" that will either happen or not happen. This combination of features in Kafka—Kafka-based offset storage, transactional write capabilities, and compacted topics for journaling state changes—allows us to view the act of stream processing as a set of Kafka updates, and KIP-98 allows these updates to all occur transactionally. In this sense KIP-98 brings a kind of "closure" to the Kafka protocol and makes it a basis for stream processing.
However, building a stream processing application using raw transactions in the producer and consumer still requires a fair amount of work. The goal of this KIP is to take advantage of this protocol-level capability to provide strong semantics in the presence of failures in Kafka's own Streams API, in a way that is transparent to the end programmer.
In this KIP we will only focus on the user-facing changes: the summary of guarantees, the public API changes, etc. The details of the design are presented in a separate document.
Summary of Guarantees
With Kafka Streams, each input record fetched from the source Kafka topics will be processed exactly once: its processing effects, both the potential associated state store updates and the resulting records sent to output Kafka topics, will be reflected exactly once, even under failures.
Public Interfaces
The only public interface change proposed in this KIP is adding one more config to StreamsConfig:
processing.guarantee (default: at_least_once). Possible values:
- exactly_once: the processing of each record will be reflected exactly once in the application’s state, even in the presence of failures.
- at_least_once: the processing of each record will be reflected at least once in the application’s state, even in the presence of failures.
The tradeoff between these modes is that in exactly_once mode the output will be wrapped in a transaction and hence, to consumers running in read_committed mode, will not be available until that task commits its output. The frequency of commits is controlled by the existing config commit.interval.ms. This can be made arbitrarily small, down to processing each input record in its own transaction, though as commits become more frequent the overhead of the transactions will be higher.
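For illustration only, here is a minimal sketch of how an application might opt into the new mode. It assumes the config is set by its string name, and the application id, topic names, broker address, and commit interval are placeholders, not anything mandated by this KIP.

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class ExactlyOnceExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-eos-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Opt into the proposed exactly-once mode; the default remains at_least_once.
        props.put("processing.guarantee", "exactly_once");
        // Transactions are committed every commit.interval.ms; smaller values mean
        // lower latency for read_committed consumers but more transaction overhead.
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);

        // A trivial pass-through topology: read from one topic, write to another.
        KStreamBuilder builder = new KStreamBuilder();
        builder.stream("input-topic").to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }
}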
Proposed Changes
In this section, we present only a very high-level overview of the proposed changes that leverage the KIP-98 features to provide the above exactly-once processing guarantee.
As mentioned earlier, we have a separate design document which dives into all the implementation details; interested readers are encouraged to read it here. Note that all these changes will be internal and are completely abstracted away from users.
Transactionally committing a task
When a task needs to be committed, we need to write the offsets using the producer.sendOffsetsToTransaction API, so that committing the offsets is wrapped as part of the producer transaction, which will either succeed or fail along with the sent messages within the transaction atomically.
In order to do that, we need to create one producer client per task, instead of one producer per thread shared among all hosted tasks.
On the consumer side, we will set the isolation.level config to read_committed to make sure that any consumed messages are from committed transactions.
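To make the intended mechanism concrete, the following is a hedged sketch of a consume-transform-produce transaction using the raw KIP-98 producer and consumer APIs. Kafka Streams performs the equivalent steps internally per task; the broker address, topics, group id, and transactional.id shown here are placeholders for illustration only.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

public class TransactionalTaskSketch {
    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "my-eos-app");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Only read messages from committed transactions.
        consumerProps.put("isolation.level", "read_committed");
        // Offsets are committed through the producer transaction, not by the consumer.
        consumerProps.put("enable.auto.commit", "false");

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // One transactional producer per task; the id must be stable across restarts.
        producerProps.put("transactional.id", "my-eos-app-task-0_0");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {

            consumer.subscribe(Collections.singletonList("input-topic"));
            producer.initTransactions();

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                if (records.isEmpty()) continue;

                producer.beginTransaction();
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (ConsumerRecord<String, String> record : records) {
                    // "Process" the record and produce to the output topic.
                    producer.send(new ProducerRecord<>("output-topic", record.key(), record.value()));
                    offsets.put(new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1));
                }
                // Commit the input offsets as part of the same transaction, so the
                // output and the consumed position become visible atomically.
                producer.sendOffsetsToTransaction(offsets, "my-eos-app");
                producer.commitTransaction();
            }
        }
    }
}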
Uncleanly shutting down a task
Updates applied to the local state stores usually cannot be rolled back once the current ongoing transaction is determined to be aborted. In that case, we cannot reuse the local state stores when resuming the task; instead, we need to indicate that the task was uncleanly shut down and restore its state from the changelogs, which store the transactional update records so that any aborted updates will never be applied.
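As a rough illustration of why this replay is safe (this is not the actual restoration code inside Kafka Streams; the broker address, changelog layout, and string types are assumptions for the sketch), a read_committed consumer replaying a changelog partition only ever applies committed updates:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ChangelogRestoreSketch {
    // Rebuild an in-memory key-value store from its compacted changelog topic.
    static Map<String, String> restore(String changelogTopic, int partition) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // read_committed ensures records from aborted transactions are never returned,
        // so the restored store reflects only successfully committed updates.
        props.put("isolation.level", "read_committed");

        Map<String, String> store = new HashMap<>();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition(changelogTopic, partition);
            consumer.assign(Collections.singletonList(tp));
            consumer.seekToBeginning(Collections.singletonList(tp));
            long end = consumer.endOffsets(Collections.singletonList(tp)).get(tp);
            while (consumer.position(tp) < end) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    store.put(record.key(), record.value());
                }
            }
        }
        return store;
    }
}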
Better handling of runtime errors
Today we throw many of the runtime exceptions to users, which are potentially captured in the user-customizable handler function. With the exactly-once semantics, we should now handle some of those non-fatal errors automatically by aborting the task’s ongoing transaction and then resuming from the last committed transaction.
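A hedged sketch of the abort-and-retry pattern this implies is shown below; the structure and names are illustrative rather than the actual Streams implementation.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.ProducerFencedException;

public class TransactionErrorHandlingSketch {
    // Attempt to process a batch inside a transaction; on a non-fatal error,
    // abort so the input can be re-processed from the last committed offsets.
    static <K, V> void processWithRetry(KafkaProducer<K, V> producer, Runnable processBatch) {
        try {
            producer.beginTransaction();
            processBatch.run();              // send output records and offsets
            producer.commitTransaction();
        } catch (ProducerFencedException fatal) {
            // Another instance with the same transactional.id took over: stop this task.
            producer.close();
            throw fatal;
        } catch (KafkaException nonFatal) {
            // Roll back everything produced in this transaction; the task can then
            // resume from the last committed transaction instead of surfacing the error.
            producer.abortTransaction();
        }
    }
}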
Compatibility, Deprecation, and Migration Plan
The proposed change should be backward compatible. Users can simply swap in the new jar as a runtime dependency and restart their application (plus change the processing.guarantee config) to get the exactly-once semantics.