Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

 

Code Block
languagejava
titleKafkaTransactionsExample.java
linenumberstrue
public class KafkaTransactionsExample {
 
  public static void main(String args[]) {
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);


    // Note that the ‘transaction.app.id’ configuration _must_ be specified in the 
    // producer config in order to use transactions.
    KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);

    // We need to initialize transactions once per producer instance. To use transactions, 
    // it is assumed that the application id is specified in the config with the key 
    // transactions.app.id.
    // 
    // This method will recover or abort transactions initiated by previous instances of a 
    // producer with the same app id. Any other transactional messages will report an error 
    // if initialization was not performed.
    //
    // The response indicates success or failure. Some failures are irrecoverable and will 
    // require a new producer  instance. See the documentation for TransactionMetadata for a 
    // list of error codes.
    producer.initTransactions();
    
    while(true) {
      ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
      if (!records.isEmpty()) {
        // Start a new transaction. This will begin the process of batching the consumed 
        // records as well
        // as an records produced as a result of processing the input records. 
        //
        // We need to check the response to make sure that this producer is able to initiate 
        // a new transaction.
        producer.beginTransaction();
        
        // Process the input records and send them to the output topic(s).
        List<ProducerRecord<String, String>> outputRecords = processRecords(records);
        for (ProducerRecord<String, String> outputRecord : outputRecords) {
          producer.send(outputRecord); 
        }
        
        // To ensure that the consumed and produced messages are batched, we need to commit 
        // the offsets through
        // the producer and not the consumer.
        //
        // If this returns an error, we should abort the transaction.
        
        sendOffsetsResult = producer.sendOffsetsToTransaction(getUncommittedOffsets());
        
     
        // Now that we have consumed, processed, and produced a batch of messages, let's 
        // commit the results.
        // If this does not report success, then the transaction will be rolled back.
        producer.endTransaction();
      }
    } 
  }
}

New Configurations

Broker configs

 

transaction.app.id.timeout.ms

The maximum amount of time in ms that the transaction coordinator will wait before proactively expire a producer AppID without receiving any transaction status updates from it.

max.transaction.timeout.ms

The maximum allowed timeout for transactions. If a client’s requested transaction time exceed this, then the broker will return an error in BeginTxnRequest. This prevents a client from too large of a timeout, which can stall consumers reading from topics included in the transaction.

Producer configs

 

transaction.timeout.ms

The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction.

This config value will be sent to the transaction coordinator along with the BeginTxnRequest.

transaction.app.id

A unique and persistent way to identify a producer. This is used to ensure idempotency and to enable transaction recovery or rollback across producer sessions. This is optional: you will lose cross-session guarantees if this is blank.

Consumer configs

 

fetchisolation.modelevel

Here are the possible values (default is all):

allread.uncommitted: consume both committed and uncommitted messages in offset ordering.

read.committed: only consume non-transactional messages or committed transactional messages in offset order. In order to maintain offset ordering, this setting means that we will have to buffer messages in the consumer until we see all messages in a given transaction.

Proposed Changes

Summary of Guarantees

Idempotent Producer Guarantees

To implement idempotent producer semantics, we introduce the concepts of a producer id, henceforth called the PID, and sequence numbers for Kafka messages. Every new producer will be assigned a unique PID during initialization. The PID assignment is completely transparent to users and is never exposed by clients.


For a given PID, sequence numbers will start from zero and be monotonically increasing, with one sequence number per topic partition produced to. The sequence number will be incremented for every message sent by the producer. Similarly, the broker will increment the sequence number associated with the PID/TopicPartition pair for every message it commits for that TopicPartition. The broker will reject a message from a producer unless its sequence number is exactly one greater than the last committed message from that PID/TopicPartition pair.

This ensures that, even though a producer must retry requests upon failures, every message will be persisted in the log exactly once. Further, since each new instance of a producer is assigned a new, unique, PID, we can only guarantee idempotent production within a single producer session.

Transactional Guarantees

At the core, transactional guarantees enable applications to produce to multiple TopicPartitions atomically, ie. all writes to these TopicPartitions will succeed or fail as a unit.


Further, since consumer progress is recorded as a write to the offsets topic, the above capability is leveraged to enable applications to batch consumed and produced messages into a single atomic unit, ie. a set of messages may be considered consumed only if the entire ‘consume-transform-produce’ executed in its entirety.

Additionally, stateful applications will also be able to ensure continuity across multiple sessions of the application. In other words, Kafka can guarantee idempotent production and transaction recovery across application bounces.

To achieve this, we require that the application provides a unique id which is stable across all sessions of the application. For the rest of this document, we refer to such an id as the AppId. While there may be a 1-1 mapping between an AppId and the internal PID, the main difference is the the AppId is provided by users, and is what enables idempotent guarantees across producers sessions described below.

When provided with such an AppId, Kafka will guarantee:
  1. Exactly one active producer with a given AppId. This is achieved by fencing off old generations when a new instance with the same AppId comes online.

  2. Transaction recovery across application sessions. If an application instance dies, the next instance can be guaranteed that any unfinished transactions have been completed (whether aborted or committed), leaving the new instance in a clean state prior to resuming work.

Key Concepts

To implement transactions, ie. ensuring that a group of messages are produced and consumed atomically, we introduce several new concepts:

  1. We introduce a new entity called a Transaction Coordinator. Similar to the consumer group coordinator, each producer is assigned a transaction coordinator, and all the logic of assigning PIDs and managing transactions is done by the transaction coordinator.
  2. We introduce a new internal kafka topic called the Transaction Log. Similar to the Consumer Offsets topic, the transaction log is a persistent and replicated record of every transaction. The transaction log is the state store for the transaction coordinator, with the snapshot of the latest version of the log encapsulating the current state of each active transaction.
  3. We introduce the notion of Control Messages. These are special messages written to user topics, processed by clients, but never exposed to users. They are used, for instance, to let brokers indicate to consumers if the previously fetched messages have been committed atomically or not. Control messages have been previously proposed here.
  4. We introduce a notion of AppId, to enable users to uniquely identify producers in a persistent way. Different instances of a producer with the same AppId will be able to resume (or abort) any transactions instantiated by the previous instance.
  5. We introduce the notion of a producer epoch, which enables us to ensure that there is only one legitimate active instance of a producer with a given AppId, and hence enables us to maintain transaction guarantees in the event of failures.

...