You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 2 Next »

Status

Current stateUnder Discussion

Discussion thread: TBD

JIRA: TBD

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

 

Motivation

Today, a Streams application needs to do the following steps when committing its processing state:

  1. Flushing the produce to make sure all its sent messages have been acknowledged.

  2. Flushing the local persistent state store to make sure all changes have been written.

  3. Committing the consumed offsets for the input Kafka topics.

If a failure happens between step 1) and step 3) above, then upon resuming the application will re-consume these uncommitted messages again from the input Kafka topics and hence result in duplicated data; on the other hand, if we reserve the ordering of these steps then a failure happened in between, then upon resuming the application will not re-consume and process these messages again while the resulted messages may not be successfully appended to the output Kafka topics, hence causing data loss. The root cause of this issue is that, the above three steps client cannot be done atomically.


In this KIP, we proposed to enhance Streams with exactly once processing semantics based on KIP-98: Exactly-Once Delivery and Transactional Messaging. More specifically, we will modify the Streams client to leverage the following provided features in KIP-98:

  1. Idempotent Kafka Producer based on producer identifiers (PIDs) to eliminate duplicates.

  2. Transactional messaging on top of the idempotent producer with consumer committing offsets as part of the transaction.

  3. Read-committed Kafka Consumer to fetch committed message only.


In this KIP we will only focus on the user facing changes: the summary of guarantees, the public API changes, etc. The details of the design is presented in a separate document.


 

Summary of Guarantees

 

With Kafka Streams, each input record fetched from the source Kafka topics will only be processed exactly once: its processing effects, both in potential associated state store updates and resulted records sent to output Kafka topics, will be reflected only one, even under failures.

 


Public Interfaces

The only public interface changes proposed in this KIP is adding one more config to StreamsConfig:

 

 

processing.guarantee

Here are the possible values:


exactly_once: the processing of each record will be reflected exactly once in the application’s state even in the presence of failures.

at_least_once: the processing of each record will be reflected at least once in the application’s state even in the presence of failures.


Default: at_least_once

 

Proposed Changes

In this section, we will present only a very high level overview of the proposed changes to leverage on KIP-98 features and provide the above exactly once processing guarantees.

As mentioned earlier, we have a separate design document which dives into all the implementation details, interested reader are encouraged to read it here. Note that all these changes will be internal and are completely abstracted away from the users.


Transactionally committing a task

When a task needs to be committed, we transform step 3) mentioned in the Motivation section above with the producer.sendOffsetsToTransaction API, so that committing the offsets is wrapped as part of the producer transaction, which will either succeed or fail along with the sent messages within the transaction atomically.

In order to do that, we need to create one producer client per each task instead of one producer per thread shared among all hosted tasks.

On the consumer side, we will set the isolation.level config to read_committed to make sure that any consumed messages are from committed transactions.


Uncleanly shutting down a task

For any updates applied to the local state stores, they usually cannot be rolled back once the current ongoing transaction is determined to be rolled back.

In this case, we cannot reuse the local state stores any more when resuming the task. In this case, we need to indicate that the task was uncleanly shut down and hence we need to restore the states from changelogs, which stores transactional update records and any aborted updates will not be applied.


Better handling runtime errors

Today we throw many of the runtime exceptions to users, which are potentially captured in the user-customizable handler function. With the exactly-once semantics, we should now handle some of those non-fatal errors automatically by aborting the task’s ongoing transaction and then resuming from the last committed transaction.


Compatibility, Deprecation, and Migration Plan

The proposed change should be backward compatible. Users could simple swipe in the new jar as runtime dependency and restart their application to get the Exactly Once semantics.


  • No labels