[This KIP proposal is joint work between Jason Gustafson, Flavio Junqueira, Apurva Mehta, Sriram, and Guozhang Wang]

 

Motivation

This document outlines a proposal for strengthening the message delivery semantics of Kafka. This builds on significant work which has been done previously, specifically, here and here.


Kafka currently provides at-least-once semantics: when tuned for reliability, users are guaranteed that every message write will be persisted at least once, without data loss. Duplicates may occur in the stream due to producer retries. For instance, the broker may crash between committing a message and sending an acknowledgment to the producer, causing the producer to retry and thus resulting in a duplicate message in the stream.

Users of messaging systems greatly benefit from the more stringent idempotent producer semantics: every message write will be persisted exactly once, without duplicates and without data loss, even in the event of client retries or broker failures. These stronger semantics not only make writing applications easier, they also expand the space of applications which can use a given messaging system.

However, idempotent producers don’t provide guarantees for writes across multiple TopicPartitions. For this, one needs stronger transactional guarantees, i.e. the ability to write to several TopicPartitions atomically. By atomically, we mean the ability to commit a set of messages across TopicPartitions as a unit: either all messages are committed, or none of them are.

Stream processing applications, which are pipelines of ‘consume-transform-produce’ tasks, absolutely require transactional guarantees when duplicate processing of the stream is unacceptable. As such, adding transactional guarantees to Kafka, a streaming platform, makes it much more useful not just for stream processing, but also for a variety of other applications.

In this document we present a proposal for bringing transactions to Kafka. We will only focus on the user-facing changes: the client API changes, the new configurations we will introduce, and the summary of guarantees. We also outline the basic data flow, which summarizes all the new RPCs we will introduce with transactions. The details of the design are presented in a separate document.

Public Interfaces

Producer API changes

The producer will get five new methods (initTransactions, beginTransaction, sendOffsetsToTransaction, commitTransaction, abortTransaction), with the send method updated to throw a new exception. These are detailed below:

KafkaProducer.java
public interface Producer<K,V> extends Closeable {
  
  /**
   * Needs to be called before any of the other transaction methods. Assumes that 
   * the transaction.app.id is specified in the producer configuration.
   *
   * This method does the following:
   *   1. Ensures any transactions initiated by previous instances of the producer
   *      are committed or rolled back.
   *   2. Gets the internal producer id and epoch, used in all future transactional
   *      messages issued by the producer.
   *
   * @throws IllegalStateException if the appId for the producer is not set 
   *         in the configuration.
   */
  void initTransactions() throws IllegalStateException;
  
  /**
   * Should be called before the start of each new transaction. 
   *
   * @throws ProducerFencedException if another producer with the same
   *         transaction.app.id is active. 
   */
  void beginTransaction() throws ProducerFencedException;
  
  /** 
   * Sends a list of consumed offsets to the consumer group coordinator, and also marks 
   * those offsets as part of the current transaction. These offsets will be considered 
   * consumed only if the transaction is committed successfully.
   *
   * This method should be used when you need to batch consumed and produced messages 
   * together, typically in a consume-transform-produce pattern.
   * 
   * @throws ProducerFencedException if another producer with the same
   *         transaction.app.id is active. 
   */
  void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, 
                                String consumerGroupId) throws ProducerFencedException;
  
  /**
   * Commits the ongoing transaction. 
   *
   * @throws ProducerFencedException if another producer with the same
   *         transaction.app.id is active. 
   */
  void commitTransaction() throws ProducerFencedException;
  
  /**
   * Aborts the ongoing transaction.
   *
   * @throws ProducerFencedException if another producer with the same
   *         transaction.app.id is active. 
   */
  void abortTransaction() throws ProducerFencedException;


  /**
   * Send the given record asynchronously and return a future which will eventually contain the response information.
   *
   * @param record The record to send
   * @return A future which will eventually contain the response information
   *
   * @throws UnrecognizedMessageException if the broker detects data loss, i.e. previous messages which we expect
   *         to be committed are detected to be missing. This is a fatal error.
   */
  public Future<RecordMetadata> send(ProducerRecord<K, V> record) throws UnrecognizedMessageException;


  /**
   * Send a record and invoke the given callback when the record has been acknowledged by the server
   *
   * @throws UnrecognizedMessageException if the broker detects data loss, i.e. previous messages which we expect
   *         to be committed are detected to be missing. This is a fatal error.
   */
  public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) throws UnrecognizedMessageException;
}

An Example Application

Here is a simple application which demonstrates the use of the APIs introduced above.

 

KafkaTransactionsExample.java
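import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// consumerConfig, producerConfig, CONSUMER_POLL_TIMEOUT, processRecords and
// getUncommittedOffsets are assumed to be defined elsewhere in the application.
public class KafkaTransactionsExample {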
 
  public static void main(String args[]) {
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);


    // Note that the ‘transaction.app.id’ configuration _must_ be specified in the 
    // producer config in order to use transactions.
    KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);

    // We need to initialize transactions once per producer instance. To use transactions, 
    // it is assumed that the application id is specified in the config with the key 
    // transaction.app.id.
    // 
    // This method will recover or abort transactions initiated by previous instances of a 
    // producer with the same app id. Any other transactional messages will report an error 
    // if initialization was not performed.
    //
    // The response indicates success or failure. Some failures are irrecoverable and will 
    // require a new producer instance. See the documentation for TransactionMetadata for a 
    // list of error codes.
    producer.initTransactions();
    
    while(true) {
      ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
      if (!records.isEmpty()) {
        // Start a new transaction. This will begin the process of batching the consumed 
        // records as well as any records produced as a result of processing the input 
        // records. 
        //
        // This throws ProducerFencedException if this producer is not able to initiate 
        // a new transaction because another producer with the same app id is active.
        producer.beginTransaction();
        
        // Process the input records and send them to the output topic(s).
        List<ProducerRecord<String, String>> outputRecords = processRecords(records);
        for (ProducerRecord<String, String> outputRecord : outputRecords) {
          producer.send(outputRecord); 
        }
        
        // To ensure that the consumed and produced messages are batched, we need to commit 
        // the offsets through the producer and not the consumer.
        //
        // If this fails, we should abort the transaction.
        // CONSUMER_GROUP_ID is assumed to hold the group id of the consumer above.
        producer.sendOffsetsToTransaction(getUncommittedOffsets(), CONSUMER_GROUP_ID);

        // Now that we have consumed, processed, and produced a batch of messages, let's 
        // commit the results.
        // If this does not report success, then the transaction will be rolled back.
        producer.commitTransaction();
      }
    } 
  }
}

New Configurations

Broker configs

 

transaction.app.id.timeout.ms

The maximum amount of time in ms that the transaction coordinator will wait without receiving any transaction status updates from a producer before proactively expiring its AppId.

max.transaction.timeout.ms

The maximum allowed timeout for transactions. If a client’s requested transaction timeout exceeds this, then the broker will return an error in response to the BeginTxnRequest. This prevents a client from setting too large a timeout, which can stall consumers reading from topics included in the transaction.

Producer configs

 

transaction.timeout.ms

The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction.

This config value will be sent to the transaction coordinator along with the BeginTxnRequest.

transaction.app.id

A unique and persistent way to identify a producer. This is used to ensure idempotency and to enable transaction recovery or rollback across producer sessions. This is optional: you will lose cross-session guarantees if this is blank.
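
As a rough illustration, the two producer settings above could be supplied like any other client configuration. This is only a sketch: the config keys are the ones named in this proposal, while the class name, broker address, AppId value, and timeout value are assumptions chosen for the example.

```java
import java.util.Properties;

// Hypothetical helper, not part of the proposal: builds a producer config
// that opts in to transactions using the keys proposed in this document.
class TransactionalProducerConfig {
    static Properties build() {
        Properties props = new Properties();
        // Placeholder broker address, for illustration only.
        props.put("bootstrap.servers", "localhost:9092");
        // Must be unique per logical producer and stable across restarts (assumed value).
        props.put("transaction.app.id", "order-processor-1");
        // Illustrative value; the broker rejects timeouts above max.transaction.timeout.ms.
        props.put("transaction.timeout.ms", "60000");
        return props;
    }
}
```

Leaving transaction.app.id unset would still allow idempotent production within a session, but not recovery across sessions.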

Consumer configs

 

isolation.level

Here are the possible values (the default is to consume all messages, i.e. read.uncommitted):

read.uncommitted: consume both committed and uncommitted messages in offset ordering.

read.committed: only consume non-transactional messages or committed transactional messages in offset order. In order to maintain offset ordering, this setting means that we will have to buffer messages in the consumer until we see all messages in a given transaction.
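
A consumer that should only see committed data might be configured along these lines. This is a sketch: the isolation.level key and the read.committed value are spelled as in this version of the proposal, and the class name, broker address, and group id are assumptions for the example.

```java
import java.util.Properties;

// Hypothetical helper, not part of the proposal: builds a consumer config
// that hides uncommitted transactional messages.
class ReadCommittedConsumerConfig {
    static Properties build() {
        Properties props = new Properties();
        // Placeholder broker address and group id, for illustration only.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "order-processor-group");
        // Value spelled as in this proposal.
        props.put("isolation.level", "read.committed");
        return props;
    }
}
```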

Proposed Changes

Summary of Guarantees

Idempotent Producer Guarantees

To implement idempotent producer semantics, we introduce the concepts of a producer id, henceforth called the PID, and sequence numbers for Kafka messages. Every new producer will be assigned a unique PID during initialization. The PID assignment is completely transparent to users and is never exposed by clients.


For a given PID, sequence numbers will start from zero and be monotonically increasing, with one sequence number per topic partition produced to. The sequence number will be incremented for every message sent by the producer. Similarly, the broker will increment the sequence number associated with the PID/TopicPartition pair for every message it commits for that TopicPartition. The broker will reject a message from a producer unless its sequence number is exactly one greater than the last committed message from that PID/TopicPartition pair.

This ensures that, even though a producer must retry requests upon failures, every message will be persisted in the log exactly once. Further, since each new instance of a producer is assigned a new, unique, PID, we can only guarantee idempotent production within a single producer session.
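
The sequence-number check described above can be sketched as follows. This is a simplified model rather than the broker implementation: the class, method, and enum names are hypothetical, state is kept in memory only, and the distinction between a duplicate and a gap is an assumption made for the example (the proposal only states that such messages are rejected).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical broker-side sketch; all names are illustrative only.
class SequenceTracker {
    enum Result { ACCEPTED, DUPLICATE, OUT_OF_ORDER }

    // Last committed sequence number per "PID/TopicPartition" key.
    private final Map<String, Integer> lastCommitted = new HashMap<>();

    Result tryAppend(long pid, String topicPartition, int sequence) {
        String key = pid + "/" + topicPartition;
        // Sequence numbers start at zero, so "nothing committed yet" is modeled as -1.
        int last = lastCommitted.getOrDefault(key, -1);
        if (sequence == last + 1) {
            lastCommitted.put(key, sequence);
            return Result.ACCEPTED;
        }
        // At or below the last committed number: a retry of a message already in the log.
        // Above last + 1: a gap, meaning an earlier message is missing.
        return sequence <= last ? Result.DUPLICATE : Result.OUT_OF_ORDER;
    }
}
```

With this check, a retried ProduceRequest whose first attempt was already committed is rejected instead of being appended a second time.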

Transactional Guarantees

At the core, transactional guarantees enable applications to produce to multiple TopicPartitions atomically, i.e. all writes to these TopicPartitions will succeed or fail as a unit.


Further, since consumer progress is recorded as a write to the offsets topic, the above capability is leveraged to enable applications to batch consumed and produced messages into a single atomic unit, i.e. a set of messages may be considered consumed only if the entire ‘consume-transform-produce’ cycle executed in its entirety.

Additionally, stateful applications will also be able to ensure continuity across multiple sessions of the application. In other words, Kafka can guarantee idempotent production and transaction recovery across application bounces.

To achieve this, we require that the application provides a unique id which is stable across all sessions of the application. For the rest of this document, we refer to such an id as the AppId. While there may be a 1-1 mapping between an AppId and the internal PID, the main difference is that the AppId is provided by users, and is what enables the idempotent guarantees across producer sessions described below.

When provided with such an AppId, Kafka will guarantee:
  1. Exactly one active producer with a given AppId. This is achieved by fencing off old generations when a new instance with the same AppId comes online.

  2. Transaction recovery across application sessions. If an application instance dies, the next instance can be guaranteed that any unfinished transactions have been completed (whether aborted or committed), leaving the new instance in a clean state prior to resuming work.

Key Concepts

To implement transactions, i.e. to ensure that a group of messages is produced and consumed atomically, we introduce several new concepts:

  1. We introduce a new entity called a Transaction Coordinator. Similar to the consumer group coordinator, each producer is assigned a transaction coordinator, and all the logic of assigning PIDs and managing transactions is done by the transaction coordinator.
  2. We introduce a new internal Kafka topic called the Transaction Log. Similar to the Consumer Offsets topic, the transaction log is a persistent and replicated record of every transaction. The transaction log is the state store for the transaction coordinator, with the snapshot of the latest version of the log encapsulating the current state of each active transaction.
  3. We introduce the notion of Control Messages. These are special messages written to user topics, processed by clients, but never exposed to users. They are used, for instance, to let brokers indicate to consumers if the previously fetched messages have been committed atomically or not. Control messages have been previously proposed here.
  4. We introduce a notion of AppId, to enable users to uniquely identify producers in a persistent way. Different instances of a producer with the same AppId will be able to resume (or abort) any transactions instantiated by the previous instance.
  5. We introduce the notion of a producer epoch, which enables us to ensure that there is only one legitimate active instance of a producer with a given AppId, and hence enables us to maintain transaction guarantees in the event of failures.

In addition to the new concepts above, we also introduce new request types, new versions of existing requests, and new versions of the core message format in order to support transactions. The details of all of these will be deferred to other documents.

Data Flow

 

In the diagram above, the sharp edged boxes represent distinct machines. The rounded boxes at the bottom represent Kafka TopicPartitions, and the diagonally rounded boxes represent logical entities which run inside brokers. 

Each arrow represents either an RPC, or a write to a Kafka topic. These operations occur in the sequence indicated by the numbers next to each arrow. The sections below are numbered to match the operations in the diagram above, and describe the operation in question.

1. Finding a transaction coordinator -- the GroupCoordinatorRequest

Since the transaction coordinator is at the center of assigning PIDs and managing transactions, the first thing a producer has to do is issue a GroupCoordinatorRequest to any broker to discover the location of its coordinator.

2. Getting a producer Id -- the InitPIDRequest

After discovering the location of its coordinator, the next step is to retrieve the producer’s PID. This is achieved by issuing an InitPIDRequest to the transaction coordinator.

2.1 When an AppId is specified

If the transaction.app.id configuration is set, this AppId is passed along with the InitPIDRequest, and the mapping to the corresponding PID is logged in the transaction log in step 2a. This enables us to return the same PID for the AppId to future instances of the producer, and hence enables recovering or aborting previously incomplete transactions.

In addition to returning the PID, the InitPIDRequest performs the following tasks:

  1. Bumps up the epoch of the PID, so that any previous zombie instance of the producer is fenced off and cannot move forward with its transaction.
  2. Recovers (rolls forward or rolls back) any transaction left incomplete by the previous instance of the producer.

The handling of the InitPIDRequest is synchronous. Once it returns, the producer can send data and start new transactions.
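
The AppId-to-PID mapping and the epoch bump can be sketched as below. This is a minimal in-memory model under stated assumptions: the class and method names are hypothetical, the epoch is modeled as an int, the mapping is not persisted to the transaction log, and recovery of incomplete transactions is omitted.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical coordinator-side sketch; all names are illustrative only.
class PidRegistry {
    static final class PidAndEpoch {
        final long pid;
        final int epoch;
        PidAndEpoch(long pid, int epoch) { this.pid = pid; this.epoch = epoch; }
    }

    private final Map<String, PidAndEpoch> byAppId = new HashMap<>();
    private long nextPid = 0;

    // Handles an InitPIDRequest that carries an AppId: a known AppId keeps
    // its PID and gets a bumped epoch, an unknown AppId gets a fresh PID.
    PidAndEpoch initPid(String appId) {
        PidAndEpoch previous = byAppId.get(appId);
        PidAndEpoch current = (previous == null)
                ? new PidAndEpoch(nextPid++, 0)
                : new PidAndEpoch(previous.pid, previous.epoch + 1);
        byAppId.put(appId, current);
        return current;
    }

    // A request is fenced when it carries an unknown PID or an epoch older
    // than the latest one registered for that AppId.
    boolean isFenced(String appId, long pid, int epoch) {
        PidAndEpoch latest = byAppId.get(appId);
        return latest == null || latest.pid != pid || epoch < latest.epoch;
    }
}
```

Once a new instance has called initPid, any request from the old instance still carries the previous epoch and is treated as coming from a zombie.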

2.2 When an AppId is not specified

If no AppId is specified in the configuration, a fresh PID is assigned, and the producer only enjoys idempotent semantics and transactional semantics within a single session.

3. Starting a Transaction – The BeginTxnRequest

The new KafkaProducer will have a KafkaProducer.beginTransaction method which has to be called to signal the start of a new transaction. Within this method, the producer will send a BeginTxnRequest to the transaction coordinator, which will record the start of the transaction in the transaction log as denoted in step 3a.

4. The consume-transform-produce loop

In this stage, the producer begins to consume-transform-produce the messages that comprise the transaction. This is a long phase and may consist of multiple requests.

4.1 AddPartitionsToTxnRequest

The producer sends this request to the transaction coordinator the first time a new TopicPartition is written to as part of a transaction. The addition of this TopicPartition to the transaction is logged by the coordinator in step 4.1a. We need this information so that we can write the commit or abort markers to each TopicPartition (see section 5.2 for details).
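
The "first time a new TopicPartition is written to" rule can be sketched on the producer side as follows. This is a simplified model: the class and method names are hypothetical, TopicPartitions are represented as plain strings, and the request is recorded in a list instead of being sent over the network.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical producer-side sketch; all names are illustrative only.
class TxnPartitionTracker {
    private final Set<String> partitionsInTxn = new HashSet<>();
    private final List<String> requestsSent = new ArrayList<>();

    // Called when a new transaction starts: no partition is registered yet.
    void begin() {
        partitionsInTxn.clear();
    }

    // Called before each write. Returns true if an AddPartitionsToTxnRequest
    // was needed, i.e. the partition is new to the current transaction.
    boolean beforeSend(String topicPartition) {
        if (partitionsInTxn.add(topicPartition)) {
            requestsSent.add("AddPartitionsToTxnRequest(" + topicPartition + ")");
            return true;
        }
        return false;
    }

    List<String> requestsSent() {
        return requestsSent;
    }
}
```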

4.2 ProduceRequest

The producer writes a batch of messages to the user’s TopicPartitions through one or more ProduceRequests (fired from the send method of the producer). These requests include the PID, epoch, and sequence number as denoted in 4.2a.

4.3 AddOffsetCommitsToTxnRequest

The producer has a new KafkaProducer.sendOffsetsToTransaction API method, which enables the batching of consumed and produced messages. This method takes a Map<TopicPartition, OffsetAndMetadata> and a groupId argument.

The sendOffsetsToTransaction method sends an AddOffsetCommitsToTxnRequest with the groupId to the transaction coordinator, from which it can deduce the TopicPartition for this consumer group in the internal __consumer_offsets topic. The transaction coordinator logs the addition of this topic partition to the transaction log in step 4.3a.

4.4 TxnOffsetCommitRequest

Also as part of sendOffsetsToTransaction, the producer will send a TxnOffsetCommitRequest to the consumer coordinator to persist the offsets in the __consumer_offsets topic (step 4.4a). The consumer coordinator validates that the producer is allowed to make this request (and is not a zombie) by using the PID and producer epoch which are sent as part of this request.

The consumed offsets are not visible externally until the transaction is committed, the process for which we will discuss now.

5. Committing or Aborting a Transaction

Once the data has been written, the user must call the new commitTransaction or abortTransaction methods of the KafkaProducer. These methods will begin the process of committing or aborting the transaction respectively.

5.1 EndTxnRequest

When a producer is finished with a transaction, the newly introduced KafkaProducer.commitTransaction or KafkaProducer.abortTransaction must be called. The former makes the data produced in step 4 available to downstream consumers. The latter effectively erases the produced data from the log: it will never be accessible to the user, i.e. downstream consumers will read and discard the aborted messages.

Regardless of which producer method is called, the producer issues an EndTxnRequest to the transaction coordinator, with additional data indicating whether the transaction is to be committed or aborted. Upon receiving this request, the coordinator:

  1. Writes a PREPARE_COMMIT or PREPARE_ABORT message to the transaction log (step 5.1a).
  2. Begins the process of writing the control messages known as COMMIT (or ABORT) markers to the user logs through the UpdateTxnRequest (see section 5.2 below).
  3. Finally writes the COMMITTED (or ABORTED) message to the transaction log (see 5.3 below).

5.2 UpdateTxnRequest

This request is issued by the transaction coordinator to the leader of each TopicPartition which is part of the transaction. Upon receiving this request, each broker will write a COMMIT(PID) or ABORT(PID) control message to the log. (step 5.2a)

This message indicates to consumers that messages with the given PID must be delivered to the user all together, or not at all. As such, the consumer will buffer messages which have a PID until it reads a corresponding COMMIT or ABORT message, at which point it will deliver or drop the messages respectively.
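
The buffering behavior described above can be sketched as follows. This is a deliberately simplified model: the class and method names are hypothetical, message values are plain strings, non-transactional messages are not handled, and the offset-ordering requirement of the read.committed isolation level is not modeled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical consumer-side sketch; all names are illustrative only.
class ReadCommittedBuffer {
    // Messages received but not yet resolved, grouped by producer id.
    private final Map<Long, List<String>> pending = new HashMap<>();
    // Messages that have been handed to the user.
    private final List<String> delivered = new ArrayList<>();

    // A transactional message is held back until its marker arrives.
    void onMessage(long pid, String value) {
        pending.computeIfAbsent(pid, k -> new ArrayList<>()).add(value);
    }

    // COMMIT(PID): deliver everything buffered for that PID.
    void onCommitMarker(long pid) {
        List<String> batch = pending.remove(pid);
        if (batch != null) {
            delivered.addAll(batch);
        }
    }

    // ABORT(PID): drop everything buffered for that PID.
    void onAbortMarker(long pid) {
        pending.remove(pid);
    }

    List<String> delivered() {
        return delivered;
    }
}
```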

Note that, if the __consumer_offsets topic is one of the TopicPartitions in the transaction, the commit (or abort) marker is also written to its log, and the consumer coordinator is notified that it needs to materialize these offsets in the case of a commit or ignore them in the case of an abort (step 5.2a on the left).

5.3 Writing the final Commit or Abort Message

After all the commit or abort markers are written to the data logs, the transaction coordinator writes the final COMMITTED or ABORTED message to the transaction log, indicating that the transaction is complete (step 5.3 in the diagram). At this point, most of the messages pertaining to the transaction in the transaction log can be removed.

We only need to retain the PID of the completed transaction along with a timestamp, so we can eventually remove the AppId->PID mapping for the producer. See the Expiring PIDs section below.
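
The order of coordinator actions described in sections 5.1 through 5.3 can be summarized in a small sketch. It only models the ordering: the class and method names are hypothetical, TopicPartitions are plain strings, and writes and RPCs are recorded as strings instead of being performed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical coordinator-side sketch; all names are illustrative only.
class EndTxnSketch {
    // Returns, in order, the actions taken when an EndTxnRequest is handled.
    static List<String> endTransaction(long pid, boolean commit, List<String> partitions) {
        List<String> steps = new ArrayList<>();
        // 5.1: record the decision in the transaction log first.
        steps.add("txn-log: " + (commit ? "PREPARE_COMMIT" : "PREPARE_ABORT"));
        // 5.2: ask the leader of every partition in the transaction to write a marker.
        String marker = commit ? "COMMIT" : "ABORT";
        for (String tp : partitions) {
            steps.add("UpdateTxnRequest -> " + tp + ": " + marker + "(" + pid + ")");
        }
        // 5.3: only after all markers are written, record that the transaction is complete.
        steps.add("txn-log: " + (commit ? "COMMITTED" : "ABORTED"));
        return steps;
    }
}
```

Writing the PREPARE message before any marker means the decision is already in the transaction log while markers are still being written.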

Compatibility, Deprecation, and Migration Plan

We follow the same approach used in KIP-32. To upgrade from a previous message format version, users should:

  1. Upgrade the brokers once with the inter-broker protocol set to the previous deployed version.

  2. Upgrade the brokers again with an updated inter-broker protocol, but leaving the message format unchanged.

  3. Upgrade all or most clients.

  4. Restart the brokers, with the message format version set to the latest.

The reason for step 3 is to avoid the performance cost for down-converting messages to an older format, which effectively loses the “zero-copy” optimization. Ideally, all consumers are upgraded before the producers even begin writing to the new message format.

Test Plan

The new features will be tested through unit, integration, and system tests.

The integration tests will focus on ensuring that the basic guarantees (outlined in the Summary of Guarantees section) are satisfied across components.

The system tests will focus on ensuring that the guarantees are satisfied even with failing components, i.e. that the system works even when consumers, producers, and brokers are killed in various states.

We will also add to existing compatibility system tests to ensure that old clients can still talk to the new brokers with the new message format.

Rejected Alternatives

As mentioned earlier, we have a separate design document which explores the design space --including rejected alternatives-- as well as all the implementation details. The latter also includes the specifics of message format changes, new RPCs, error handling, etc. 

The design document is available here.
