Status

Current state: Accepted ([VOTE] KIP-129: Kafka Streams Exactly-Once Semantics)

Discussion thread: [DISCUSS] KIP-129: Kafka Streams Exactly-Once Semantics

JIRA: KAFKA-4923

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

  • We may be taking input from multiple sources and the ordering across these sources may be non-deterministic
  • Likewise we may be producing output to multiple destination topics
  • We may be aggregating or joining across multiple inputs using the managed state facilities the streams api provides
  • We may be looking up side information in a data system or calling out to a service that is updated out of band

Failure and restart, especially when combined with non-determinism and changes to the persistent state computed by the application, may result not only in duplicates but in incorrect results. For example, if one stage of processing is computing a count of the number of events seen, then a duplicate in an upstream processing stage may lead to an incorrect count downstream. Hence the phrase "exactly once processing" is a bit narrow, though it is correct to think of the guarantee as ensuring that the output would be the same as it would be if the stream processor saw each message exactly one time (as it would in a case where no failure occurred).
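The counting example can be made concrete with a small sketch. Nothing here uses Kafka: the class and method names are hypothetical, and the "failure" is simulated by re-emitting one record, as an upstream stage might do if it restarted before recording its progress.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only (not Kafka code): an upstream stage that may
// replay one record after a simulated restart, and a downstream counting stage.
final class DuplicateCountDemo {

    // Emits every input record once. If replayIndex is a valid index, the record
    // at that position is emitted a second time, simulating reprocessing after
    // a failure and restart. Pass -1 for a failure-free run.
    static List<String> upstream(List<String> input, int replayIndex) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < input.size(); i++) {
            out.add(input.get(i));
            if (i == replayIndex) {
                out.add(input.get(i)); // duplicate produced by the replay
            }
        }
        return out;
    }

    // Downstream stage: counts the events it sees.
    static int count(List<String> events) {
        return events.size();
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "c");
        System.out.println(count(upstream(input, -1))); // failure-free run
        System.out.println(count(upstream(input, 1)));  // "b" replayed once
    }
}
```

The failure-free run counts 3 events, while the run with one replayed record counts 4, so the downstream result is wrong even though the downstream stage itself never failed.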

Currently no stream processing layer addresses this problem when used with Kafka (or most other similar low-latency messaging layers). Some systems prevent duplication in the processing tier, but none handle duplicates in the import of streams into Kafka or the integration between the processing layer and Kafka. Even some of the systems that claim to provide strong semantics in their processing layer, discounting the integration with Kafka, have design limitations that lead to incorrect results in the presence of non-determinism. Unlike in batch processing, in stream processing non-determinism is extremely common. Simple things like the arrival order of data from different inputs, or lookups against data in an external DB or service that is being updated asynchronously, will introduce non-determinism to processing. Hence any semantic guarantee that doesn't handle non-determinism will be quite limited in its usefulness.

How can we provide a strong guarantee for systems to enable correct stream processing with streams coming from and going to Kafka? The features in KIP-98 provide a basis for fixing this. We can think of stream processing in Kafka as any application that performs the following activities:

...

The key problem in ensuring correct semantics is to guarantee that these actions either all happen together or none of them happen. If the application undergoes the state change but fails to produce output, or produces only part of its output, or does not record its local state modifications, then upon restart it will not resume in the same state and cannot be guaranteed to produce the output it would have produced in a non-failure scenario.

KIP-98 provides a mechanism for wrapping all four of these actions in a single atomic "transaction" that will either happen or not happen. This combination of features in Kafka (Kafka-based offset storage, transactional write capabilities, and compacted topics for journaling state changes) allows us to view the act of stream processing as a set of Kafka updates, and KIP-98 allows these updates to all occur transactionally. In this sense KIP-98 brings a kind of "closure" to the Kafka protocol and makes it a basis for stream processing.
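The all-or-nothing behaviour can be illustrated with a toy in-memory model. This is a sketch under stated assumptions, not the KIP-98 API and not how Kafka Streams is implemented: the class `AtomicStepSketch` and its methods are hypothetical, and the "committed" fields simply stand in for the output, the journaled state, and the stored input offset.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model, NOT the KIP-98 API: output records, state changes and
// input progress are buffered, then either applied together or dropped together.
final class AtomicStepSketch {
    // Committed view: what a restart (or a downstream reader) would observe.
    final List<String> output = new ArrayList<>();
    final Map<String, Integer> state = new HashMap<>();
    long committedOffset = 0;

    // Pending changes that belong to the open "transaction".
    private final List<String> pendingOutput = new ArrayList<>();
    private final Map<String, Integer> pendingState = new HashMap<>();
    private long pendingRecords = 0;

    // Processes one input record: updates a per-key count and emits a result.
    void process(String key) {
        int current = pendingState.getOrDefault(key, state.getOrDefault(key, 0));
        int updated = current + 1;
        pendingState.put(key, updated);
        pendingOutput.add(key + "=" + updated);
        pendingRecords++;
    }

    // Applies output, state changes and input progress as one unit.
    void commit() {
        output.addAll(pendingOutput);
        state.putAll(pendingState);
        committedOffset += pendingRecords;
        clearPending();
    }

    // Drops everything done since the last commit, as after a failure.
    void abort() {
        clearPending();
    }

    private void clearPending() {
        pendingOutput.clear();
        pendingState.clear();
        pendingRecords = 0;
    }

    public static void main(String[] args) {
        AtomicStepSketch task = new AtomicStepSketch();
        task.process("a");
        task.process("a");
        task.commit();
        task.process("a");
        task.abort(); // simulated failure: the third record leaves no trace
        System.out.println(task.output + " " + task.state + " " + task.committedOffset);
    }
}
```

After the abort, the committed output, state, and offset all still describe exactly two processed records, so reprocessing the third record from the committed offset produces the same result a failure-free run would have produced.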

This is a very general protocol level facility and could be wrapped up in any number of possible processing or application development layers, interfaces, or languages.

However, building a stream processing application using the raw transactions exposed in the producer and consumer is still some work. The goal of this KIP is to take advantage of this protocol-level capability to provide strong semantics in the presence of failure in Kafka's own streams api in a way that is transparent to the end user.

In this KIP we will only focus on the user-facing changes: the summary of guarantees, the public API changes, etc. The details of the design are presented in a separate document.

...

 

processing.guarantee

Here are the possible values:


exactly_once: the processing of each record will be reflected exactly once in the application’s state even in the presence of failures.

at_least_once: the processing of each record will be reflected at least once in the application’s state even in the presence of failures.


Default: at_least_once

 

The tradeoff between these modes is that in exactly_once mode the output is wrapped in a transaction and hence, for consumers running in read_committed mode, is not available until the task commits its output. The commit frequency is controlled by the existing config commit.interval.ms. This interval can be made arbitrarily small, down to processing each input in its own transaction, though the more frequent the commits, the higher the transaction overhead.
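As a minimal sketch of how these settings might be supplied, the snippet below builds plain `java.util.Properties` objects using the config names from this KIP. The class and method names are hypothetical, the commit interval of 100 ms is an arbitrary illustrative value rather than a recommendation, and `isolation.level` is the consumer-side setting from KIP-98 that selects read_committed mode. A real application would also need its usual settings, which are omitted here.

```java
import java.util.Properties;

// Hypothetical helper class: only assembles configuration values, it does not
// start a Streams application or a consumer.
final class ExactlyOnceConfigSketch {

    // Settings for the Streams application that should run in exactly_once mode.
    static Properties streamsProps() {
        Properties props = new Properties();
        props.setProperty("processing.guarantee", "exactly_once"); // default is at_least_once
        props.setProperty("commit.interval.ms", "100");            // illustrative value only
        return props;
    }

    // Settings for a downstream consumer that should only see committed output.
    static Properties downstreamConsumerProps() {
        Properties props = new Properties();
        props.setProperty("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps().getProperty("processing.guarantee"));
        System.out.println(downstreamConsumerProps().getProperty("isolation.level"));
    }
}
```

A smaller commit.interval.ms makes output visible to read_committed consumers sooner, at the cost of more frequent transactions.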

...

The proposed change should be backward compatible. Users can simply swap in the new jar as a runtime dependency and restart their application (after changing the processing.guarantee config) to get exactly-once semantics.

Rejected Alternatives

  • "producer per thread" design (including a discussion about pros/cons for both approaches)
    • https://docs.google.com/document/d/1CfOJaa6mdg5o7pLf_zXISV4oE0ZeMZwT_sG1QWgL4EE/edit
    • "Our overall proposal is, to implement KIP-129 as is as a “Stream EoS 1.0” version. The raised concerns are all valid, but hard to quantify at the moment. Implementing KIP-129, that provides a clean design, allows us to gain more insight in the performance implications. This enables us, to make an educated decision, if the “producer per task” model is performant enough (with or without any of the proposed improvements) or if a switch to a “producer per thread” model is mandatory. If we do decide to do the “producer per thread” design, the changes should be incremental and easily achievable."