  • We may be taking input from multiple sources, and the ordering across these sources may be non-deterministic
  • Likewise, we may be producing output to multiple destination topics
  • We may be aggregating or joining across multiple inputs using the managed state facilities the Streams API provides
  • We may be looking up side information in a data system or calling out to a service that is updated out of band

Failure and restart, especially when combined with non-determinism and changes to the persistent state computed by the application, may result not only in duplicates but in incorrect results. For example, if one stage of processing is computing a count of the number of events seen, then a duplicate in an upstream processing stage may lead to an incorrect count downstream. Hence the phrase "exactly once processing" is a bit narrow, though it is correct to think of the guarantee as ensuring that the output would be the same as it would be if the stream processor saw each message exactly one time (as it would in a case where no failure occurred).
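The counting example can be made concrete with a small simulation. This is only an illustrative sketch: the class and method names are hypothetical, the "topics" are in-memory lists, and the crash is modeled as the upstream stage writing some output without committing its input offset.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical two-stage pipeline: stage 1 forwards events, stage 2 counts them.
class DuplicateCountDemo {

    // Stage 1 forwards every input event downstream. It "crashes" after
    // forwarding crashAfter events but before committing its input offset,
    // so on restart it re-reads the input from offset 0.
    // crashAfter == 0 models a run with no failure.
    static List<String> forwardWithRestart(List<String> input, int crashAfter) {
        List<String> downstream = new ArrayList<>();
        for (int i = 0; i < crashAfter; i++) {
            downstream.add(input.get(i)); // output written, offset never committed
        }
        // Restart: the committed offset is still 0, so everything is replayed.
        for (int i = 0; i < input.size(); i++) {
            downstream.add(input.get(i));
        }
        return downstream;
    }

    // Stage 2: counts the events it sees.
    static int count(List<String> events) {
        return events.size();
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "c");
        System.out.println(count(forwardWithRestart(input, 0))); // prints 3
        System.out.println(count(forwardWithRestart(input, 2))); // prints 5
    }
}
```

The downstream stage has no way to tell the two replayed events from new ones, so its count is wrong even though every input was processed at least once.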

Currently no stream processing layer addresses this problem when used with Kafka (or most other similar low-latency messaging layers). Some systems prevent duplication in the processing tier, but none handle duplicates in the import of streams into Kafka or the integration between the processing layer and Kafka. Even some of the systems that claim to provide strong semantics in their processing layer, discounting the integration with Kafka, have design limitations that lead to incorrect results in the presence of non-determinism. Unlike in batch processing, in stream processing non-determinism is extremely common. Simple things like the arrival order of data from different inputs, or lookups against data in an external DB or service that is being updated asynchronously, will introduce non-determinism to processing. Hence any semantic guarantee that doesn't handle non-determinism will be quite limited in its usefulness.

How can we provide a strong guarantee for systems to enable correct stream processing with streams coming from and going to Kafka? The features in KIP-98 provide a basis for fixing this. We can think of stream processing in Kafka as any application that performs the following activities:


The key problem in ensuring correct semantics is to guarantee that these actions either all happen together or do not happen at all. If the application undergoes the state change but fails to produce output, or produces only part of its output, or does not record local state modifications, then upon restart it will not pick up in the same state and cannot be guaranteed to produce the output it would have produced in a non-failure scenario.

KIP-98 provides a mechanism for wrapping all four of these actions in a single atomic "transaction" that will either happen or not happen. This combination of features in Kafka (Kafka-based offset storage, transactional write capabilities, and compacted topics for journaling state changes) allows us to view the act of stream processing as a set of Kafka updates, and KIP-98 allows these updates to all occur transactionally. In this sense KIP-98 brings a kind of "closure" to the Kafka protocol and makes it a basis for stream processing.
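To illustrate why atomicity gives the same result as a failure-free run, here is a toy in-memory model. It is a sketch under stated assumptions, not the KIP-98 protocol: the class name, fields, and the `failAt` parameter are hypothetical, and a "commit" is modeled as applying the state change, the output record, and the offset advance in one step.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory stand-in for the Kafka-side effects of one processing step.
class AtomicStepDemo {
    final List<String> output = new ArrayList<>();      // stands in for an output topic
    final Map<String, Integer> state = new HashMap<>(); // stands in for journaled state
    int committedOffset = 0;                            // stands in for stored offsets

    // Processes input one record per "transaction", counting records per key.
    // When the step for offset failAt is first attempted, it is aborted before
    // commit and then retried; pass -1 for a run with no failure.
    void run(List<String> input, int failAt) {
        boolean alreadyFailed = false;
        while (committedOffset < input.size()) {
            String key = input.get(committedOffset);

            // Stage all effects; nothing is visible to anyone yet.
            int newCount = state.getOrDefault(key, 0) + 1;
            String record = key + "=" + newCount;
            int nextOffset = committedOffset + 1;

            if (committedOffset == failAt && !alreadyFailed) {
                alreadyFailed = true; // simulated crash: staged effects are discarded
                continue;             // restart from the last committed offset
            }

            // Commit: state, output, and offset become visible together.
            state.put(key, newCount);
            output.add(record);
            committedOffset = nextOffset;
        }
    }

    public static void main(String[] args) {
        List<String> input = List.of("a", "b", "a");
        AtomicStepDemo clean = new AtomicStepDemo();
        clean.run(input, -1);
        AtomicStepDemo restarted = new AtomicStepDemo();
        restarted.run(input, 1);
        System.out.println(clean.output.equals(restarted.output)); // prints true
    }
}
```

Because the aborted step leaves no trace in the state, the output, or the offset, the retry starts from exactly the state a failure-free run would have had at that point.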

This is a very general protocol level facility and could be wrapped up in any number of possible processing or application development layers, interfaces, or languages.

However, building a stream processing application using the raw transactions exposed in the producer and consumer is still some work. The goal of this KIP is to take advantage of this protocol-level capability to provide strong semantics in the presence of failure in Kafka's own Streams API, in a way that is transparent to the end user.

In this KIP we will focus only on the user-facing changes: the summary of guarantees, the public API changes, etc. The details of the design are presented in a separate document.


The only public interface change proposed in this KIP is the addition of one config to StreamsConfig:



Here are the possible values:

exactly_once: the processing of each record will be reflected exactly once in the application’s state even in the presence of failures.

at_least_once: the processing of each record will be reflected at least once in the application’s state even in the presence of failures.

Default: at_least_once
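
As a rough sketch of how an application might select a guarantee: the config key does not appear in the text above, so the example assumes `processing.guarantee`, the name used in released Kafka Streams. The helper class, application id, and broker address are hypothetical, and plain `java.util.Properties` is used so the snippet runs without the Kafka client libraries.

```java
import java.util.Properties;

class GuaranteeConfigDemo {
    // Assumed key name; it is not given in the text above.
    static final String PROCESSING_GUARANTEE = "processing.guarantee";
    static final String DEFAULT_GUARANTEE = "at_least_once";

    // Builds streams properties with an explicit processing guarantee.
    static Properties streamsProps(String guarantee) {
        if (!guarantee.equals("exactly_once") && !guarantee.equals("at_least_once")) {
            throw new IllegalArgumentException("Unknown guarantee: " + guarantee);
        }
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");    // hypothetical application id
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
        props.put(PROCESSING_GUARANTEE, guarantee);
        return props;
    }

    // Returns the configured guarantee, falling back to the documented default.
    static String effectiveGuarantee(Properties props) {
        return props.getProperty(PROCESSING_GUARANTEE, DEFAULT_GUARANTEE);
    }

    public static void main(String[] args) {
        System.out.println(effectiveGuarantee(streamsProps("exactly_once"))); // prints exactly_once
        System.out.println(effectiveGuarantee(new Properties()));             // prints at_least_once
    }
}
```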
