...

KafkaProducer.java
public interface Producer<K,V> extends Closeable {
  
  /**
   * Needs to be called before any of the other transaction methods. Assumes that 
   * transaction.app.id is specified in the producer configuration.
   *
   * This method does the following:
   *   1. Ensures any transactions initiated by previous instances of the producer
   *      are committed or rolled back.
   *   2. Gets the internal producer id and epoch, used in all future transactional
   *      messages issued by the producer.
   *
   * @throws IllegalStateException if the transaction.app.id for the producer is not set 
   *         in the configuration.
   */
  void initTransactions() throws IllegalStateException;
  
  /**
   * Should be called before the start of each new transaction. 
   *
   * @throws ProducerFencedException if another producer with the same
   *         transaction.app.id is active. 
   */
  void beginTransaction() throws ProducerFencedException;
  
  /** 
   * Sends a list of consumed offsets to the consumer group coordinator, and also marks 
   * those offsets as part of the current transaction. These offsets will be considered 
   * consumed only if the transaction is committed successfully.
   *
   * This method should be used when you need to batch consumed and produced messages 
   * together, typically in a consume-transform-produce pattern.
   * 
   * @throws ProducerFencedException if another producer with the same
   *         transaction.app.id is active. 
   */
  void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, 
                                String consumerGroupId) throws ProducerFencedException;
  
  /**
   * Commits the ongoing transaction. 
   *
   * @throws ProducerFencedException if another producer with the same
   *         transaction.app.id is active. 
   */
  void commitTransaction() throws ProducerFencedException;
  
  /**
   * Aborts the ongoing transaction.
   *
   * @throws ProducerFencedException if another producer with the same
   *         transaction.app.id is active. 
   */
  void abortTransaction() throws ProducerFencedException;


  /**
   * Send the given record asynchronously and return a future which will eventually contain the response information.
   *
   * @param record The record to send
   * @return A future which will eventually contain the response information
   *
   * @throws UnrecognizedMessageException if the broker detects data loss, i.e. previous messages which we expect
   *         to be committed are detected to be missing. This is a fatal error.
   */
  public Future<RecordMetadata> send(ProducerRecord<K, V> record) throws UnrecognizedMessageException;


  /**
   * Send a record and invoke the given callback when the record has been acknowledged by the server
   *
   * @throws UnrecognizedMessageException if the broker detects data loss, i.e. previous messages which we expect
   *         to be committed are detected to be missing. This is a fatal error.
   */
  public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) throws UnrecognizedMessageException;
}

...

 

KafkaTransactionsExample.java
public class KafkaTransactionsExample {
 
  public static void main(String[] args) {
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);


    // Note that the ‘transaction.app.id’ configuration _must_ be specified in the 
    // producer config in order to use transactions.
    KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);

    // We need to initialize transactions once per producer instance. To use transactions, 
    // it is assumed that the application id is specified in the config with the key 
    // transaction.app.id.
    // 
    // This method will recover or abort transactions initiated by previous instances of a 
    // producer with the same app id. Any other transactional API call will report an error 
    // if initialization was not performed.
    //
    // The response indicates success or failure. Some failures are irrecoverable and will 
    // require a new producer instance. See the documentation for TransactionMetadata for a 
    // list of error codes.
    producer.initTransactions();
    
    while(true) {
      ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
      if (!records.isEmpty()) {
        // Start a new transaction. This will begin the process of batching the consumed 
        // records as well as any records produced as a result of processing the input 
        // records. 
        //
        // We need to check the response to make sure that this producer is able to initiate 
        // a new transaction.
        producer.beginTransaction();
        
        // Process the input records and send them to the output topic(s).
        List<ProducerRecord<String, String>> outputRecords = processRecords(records);
        for (ProducerRecord<String, String> outputRecord : outputRecords) {
          producer.send(outputRecord); 
        }
        
        // To ensure that the consumed and produced messages are batched, we need to commit 
        // the offsets through the producer and not the consumer.
        //
        // If this fails, we should abort the transaction.
        producer.sendOffsetsToTransaction(getUncommittedOffsets(), CONSUMER_GROUP_ID);
        
        // Now that we have consumed, processed, and produced a batch of messages, let's 
        // commit the results. If this does not report success, then the transaction 
        // will be rolled back.
        producer.commitTransaction();
      }
    } 
  }
}

New Configurations

Broker configs

 

transaction.app.id.timeout.ms

The maximum amount of time in ms that the transaction coordinator will wait, without receiving any transaction status updates from a producer, before proactively expiring that producer's AppId.

max.transaction.timeout.ms

The maximum allowed timeout for transactions. If a client's requested transaction time exceeds this, then the broker will return an error in response to the BeginTxnRequest. This prevents a client from using too large a timeout, which can stall consumers reading from topics included in the transaction.
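
The relationship between the two timeouts is simple to state in code. The sketch below is a hypothetical illustration of the broker-side check, not actual broker code; the class and method names are invented for this example.

TimeoutValidationSketch.java
public class TimeoutValidationSketch {
  // Hypothetical broker-side check: reject a transaction begun with a requested
  // timeout larger than the broker's max.transaction.timeout.ms.
  public static void validate(long requestedTimeoutMs, long maxTransactionTimeoutMs) {
    if (requestedTimeoutMs > maxTransactionTimeoutMs)
      throw new IllegalArgumentException(
          "Requested transaction timeout " + requestedTimeoutMs
              + " ms exceeds max.transaction.timeout.ms = " + maxTransactionTimeoutMs + " ms");
  }
}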

Producer configs

 

transaction.timeout.ms

The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction.

This config value will be sent to the transaction coordinator along with the BeginTxnRequest.

transaction.app.id

A unique and persistent way to identify a producer. This is used to ensure idempotency and to enable transaction recovery or rollback across producer sessions. This is optional: you will lose cross-session guarantees if this is blank.
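
For concreteness, here is a minimal sketch of a producer configuration that sets both of the keys above. The literal config strings come from this document; the bootstrap and serializer settings, the class name, and the chosen values are illustrative assumptions.

ProducerConfigExample.java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class ProducerConfigExample {
  public static void main(String[] args) {
    Properties producerConfig = new Properties();
    // Standard settings (illustrative values).
    producerConfig.put("bootstrap.servers", "localhost:9092");
    producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // Proposed configs from this document (literal keys, since no constants exist yet):
    // a stable, unique id enabling fencing and transaction recovery across sessions ...
    producerConfig.put("transaction.app.id", "my-transactional-app");
    // ... and the transaction timeout sent to the coordinator with the BeginTxnRequest.
    producerConfig.put("transaction.timeout.ms", "60000");

    KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);
    producer.initTransactions();
  }
}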

Consumer configs

 

fetch.mode

Here are the possible values (default is all):

  • all: consume both committed and uncommitted messages in offset ordering.

  • committed: only consume non-transactional messages or committed transactional messages in offset order. In order to maintain offset ordering, this setting means that we will have to buffer messages in the consumer until we see all messages in a given transaction.
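
Similarly, a consumer that opts into committed-only reads might be configured as follows. The fetch.mode key is the proposed name from this section; the class name and the remaining settings are illustrative assumptions.

ConsumerConfigExample.java
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerConfigExample {
  public static void main(String[] args) {
    Properties consumerConfig = new Properties();
    // Standard settings (illustrative values).
    consumerConfig.put("bootstrap.servers", "localhost:9092");
    consumerConfig.put("group.id", "my-transactional-app-group");
    consumerConfig.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerConfig.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    // Proposed config from this document: deliver only non-transactional and
    // committed transactional messages, buffering open transactions as needed.
    consumerConfig.put("fetch.mode", "committed");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
  }
}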

Proposed Changes

Summary of Guarantees

Idempotent Producer Guarantees

To implement idempotent producer semantics, we introduce the concepts of a producer id, henceforth called the PID, and sequence numbers for Kafka messages. Every new producer will be assigned a unique PID during initialization. The PID assignment is completely transparent to users and is never exposed by clients.


For a given PID, sequence numbers will start from zero and be monotonically increasing, with one sequence number per topic partition produced to. For every acknowledged message the producer will increment the sequence number. Similarly, the broker will increment the sequence number associated with the PID/TopicPartition pair for every message it commits for that TopicPartition. The broker will reject a message from a producer unless its sequence number is exactly one greater than the last committed message from that PID/TopicPartition pair.

This ensures that, even though a producer must retry requests upon failures, every message will be persisted in the log exactly once. Further, since each new instance of a producer is assigned a new, unique PID, we can only guarantee idempotent production within a single producer session.
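
To make the acceptance rule concrete, the sketch below models the broker-side sequence check described above. The class, method, and key encoding are all hypothetical; this is an illustration of the comparison, not actual broker code.

SequenceCheckSketch.java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the broker-side rule: a message is accepted only
// if its sequence number is exactly one greater than the last committed
// sequence number for the same PID/TopicPartition pair.
public class SequenceCheckSketch {
  private final Map<String, Long> lastCommitted = new HashMap<>();

  public boolean tryAppend(long pid, String topic, int partition, long sequence) {
    String key = pid + "/" + topic + "-" + partition;
    long last = lastCommitted.getOrDefault(key, -1L); // sequences start from zero
    if (sequence != last + 1)
      return false;                   // gap or duplicate: reject the message
    lastCommitted.put(key, sequence); // exactly last + 1: accept and advance
    return true;
  }
}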

Transactional Guarantees

At the core, transactional guarantees enable applications to produce to multiple TopicPartitions atomically, i.e. all writes to these TopicPartitions will succeed or fail as a unit.


Further, since consumer progress is recorded as a write to the offsets topic, the above capability is leveraged to enable applications to batch consumed and produced messages into a single atomic unit, i.e. a set of messages may be considered consumed only if the ‘consume-transform-produce’ cycle executed in its entirety.

Additionally, stateful applications will also be able to ensure continuity across multiple sessions of the application. In other words, Kafka can guarantee idempotent production and transaction recovery across application bounces.

To achieve this, we require that the application provides a unique id which is stable across all sessions of the application. For the rest of this document, we refer to such an id as the AppId. While there may be a 1-1 mapping between an AppId and the internal PID, the main difference is that the AppId is provided by users, and is what enables the idempotent guarantees across producer sessions described below.

When provided with such an AppId, Kafka will guarantee:
  1. Exactly one active producer with a given AppId. This is achieved by fencing off old generations when a new instance with the same AppId comes online.

  2. Transaction recovery across application sessions. If an application instance dies, the next instance can be guaranteed that any unfinished transactions have been completed (whether aborted or committed), leaving the new instance in a clean state prior to resuming work.

Key Concepts

To implement transactions, i.e. ensuring that a group of messages are produced and consumed atomically, we introduce several new concepts:

  1. We introduce a new entity called a Transaction Coordinator. Similar to the consumer group coordinator, each producer is assigned a transaction coordinator, and all the logic of assigning PIDs and managing transactions is done by the transaction coordinator.
  2. We introduce a new internal Kafka topic called the Transaction Log. Similar to the Consumer Offsets topic, the transaction log is a persistent and replicated record of every transaction. The transaction log is the state store for the transaction coordinator, with the snapshot of the latest version of the log encapsulating the current state of each active transaction.
  3. We introduce the notion of Control Messages. These are special messages written to user topics, processed by clients, but never exposed to users. They are used, for instance, to let brokers indicate to consumers if the previously fetched messages have been committed atomically or not. Control messages have been proposed previously.
  4. We introduce a notion of AppId, to enable users to uniquely identify producers in a persistent way. Different instances of a producer with the same AppId will be able to resume (or abort) any transactions instantiated by the previous instance.
  5. We introduce the notion of a producer epoch, which enables us to ensure that there is only one legitimate active instance of a producer with a given AppId, and hence enables us to maintain transaction guarantees in the event of failures (see the sketch after this list).
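
As a rough illustration of items 4 and 5 above, the following sketch shows how a per-AppId epoch can fence stale producer instances. All names here are hypothetical; the actual coordinator logic is out of scope for this sketch.

FencingSketch.java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of epoch-based fencing: re-initializing with an AppId
// bumps the epoch, and requests carrying an older epoch are rejected.
public class FencingSketch {
  private final Map<String, Integer> currentEpoch = new HashMap<>();

  // Called when a new producer instance initializes with the given AppId.
  public int register(String appId) {
    return currentEpoch.merge(appId, 1, Integer::sum);
  }

  // Called on every transactional request from a producer.
  public void check(String appId, int requestEpoch) {
    Integer epoch = currentEpoch.get(appId);
    if (epoch == null || requestEpoch < epoch)
      throw new IllegalStateException("Producer fenced: stale epoch for " + appId);
  }
}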

...

The producer has a new KafkaProducer.sendOffsetsToTransaction API method, which enables the batching of consumed and produced messages. This method takes a Map<TopicPartition, OffsetAndMetadata> and a groupId argument.

The sendOffsetsToTransaction method sends an AddOffsetCommitsToTxnRequest with the groupId to the transaction coordinator, from which it can deduce the TopicPartition for this consumer group in the internal __consumer-offsets topic. The transaction coordinator logs the addition of this topic partition to the transaction log in step 4.3a.
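
To show how the arguments to sendOffsetsToTransaction might be assembled, here is a sketch that derives the offsets map from a consumed batch. The helper and class names are hypothetical; the offset + 1 convention follows Kafka's usual commit semantics, where the committed offset is that of the next record to read.

OffsetsToCommitExample.java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetsToCommitExample {

  // Hypothetical helper: build the map passed to sendOffsetsToTransaction
  // from a batch of consumed records.
  public static Map<TopicPartition, OffsetAndMetadata> offsetsToCommit(
      ConsumerRecords<String, String> records) {
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    for (TopicPartition partition : records.partitions()) {
      List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
      long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
      // Commit the offset of the next record to consume: last consumed + 1.
      offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
    }
    return offsets;
  }
}

Inside a transaction this would be invoked as producer.sendOffsetsToTransaction(offsetsToCommit(records), consumerGroupId).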

...

Once the data has been written, the user must call the new commitTransaction or abortTransaction methods of the KafkaProducer. These methods will begin the process of committing or aborting the transaction respectively.

...

When a producer is finished with a transaction, the newly introduced KafkaProducer.commitTransaction or KafkaProducer.abortTransaction must be called. The former makes the data produced in step 4 available to downstream consumers. The latter effectively erases the produced data from the log: it will never be accessible to the user, i.e. downstream consumers will read and discard the aborted messages.
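
A plausible way to tie these calls together is sketched below, assuming failures surface as exceptions as in the Producer interface above. The handling policy shown (close the producer when fenced, abort and reprocess otherwise) and the import paths are illustrative assumptions, not part of this proposal.

CommitOrAbortSketch.java
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;

public class CommitOrAbortSketch {

  // Illustrative handling: commit on success, abort on recoverable errors,
  // and stop the producer when it has been fenced by a newer instance.
  public static void produceAtomically(Producer<String, String> producer,
                                       List<ProducerRecord<String, String>> outputRecords,
                                       Map<TopicPartition, OffsetAndMetadata> offsets,
                                       String consumerGroupId) {
    try {
      producer.beginTransaction();
      for (ProducerRecord<String, String> record : outputRecords)
        producer.send(record);
      producer.sendOffsetsToTransaction(offsets, consumerGroupId);
      producer.commitTransaction();
    } catch (ProducerFencedException fatal) {
      // Another producer with the same transaction.app.id is active: this
      // instance must not continue.
      producer.close();
      throw fatal;
    } catch (Exception recoverable) {
      // Roll back both the produced messages and the offset additions; the
      // batch can then be reprocessed.
      producer.abortTransaction();
    }
  }
}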

...