...

 

Code Block (java): KafkaTransactionsExample.java
public class KafkaTransactionsExample {
 
  public static void main(String[] args) {
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);


    // Note that the ‘transactional.id’ configuration _must_ be specified in the 
    // producer config in order to use transactions.
    KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);

    // We need to initialize transactions once per producer instance. To use transactions, 
    // it is assumed that the application id is specified in the config with the key 
    // transactional.id.
    // 
    // This method will recover or abort transactions initiated by previous instances of a 
    // producer with the same app id. Any other transactional messages will report an error 
    // if initialization was not performed.
    //
    // The response indicates success or failure. Some failures are irrecoverable and will
    // require a new producer instance. See the documentation for TransactionMetadata for a
    // list of error codes.
    producer.initTransactions();
    
    while(true) {
      ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
      if (!records.isEmpty()) {
        // Start a new transaction. This will begin the process of batching the consumed
        // records as well as any records produced as a result of processing the input
        // records.
        //
        // We need to check the response to make sure that this producer is able to initiate 
        // a new transaction.
        producer.beginTransaction();
        
        // Process the input records and send them to the output topic(s).
        List<ProducerRecord<String, String>> outputRecords = processRecords(records);
        for (ProducerRecord<String, String> outputRecord : outputRecords) {
          producer.send(outputRecord); 
        }
        
        // To ensure that the consumed and produced messages are batched, we need to commit
        // the offsets through the producer and not the consumer.
        //
        // If this fails, we should abort the transaction. CONSUMER_GROUP_ID, like the other
        // constants and helpers in this example, is assumed to be defined elsewhere.
        producer.sendOffsetsToTransaction(getUncommittedOffsets(), CONSUMER_GROUP_ID);

        // Now that we have consumed, processed, and produced a batch of messages, let's 
        // commit the results.
        // If this does not report success, then the transaction will be rolled back.
        producer.commitTransaction();
      }
    } 
  }
}

New Configurations

Broker configs

 

transactional.id.timeout.ms

The maximum amount of time in ms that the transaction coordinator will wait without receiving any transaction status updates from a producer before proactively expiring that producer's TransactionalId.

Default is 604800000 (7 days). This allows periodic weekly producer jobs to maintain their ids.

max.transaction.timeout.ms

The maximum allowed timeout for transactions. If a client’s requested transaction timeout exceeds this, then the broker will return an InvalidTransactionTimeout error in response to the InitPidRequest. This prevents a client from setting too large a timeout, which can stall consumers reading from topics included in the transaction.

Default is 900000 (15 min). This is a conservative upper bound on the time needed to send the messages of a transaction.

transaction.state.log.replication.factor

The number of replicas for the transaction state topic.

Default: 3

transaction.state.log.num.partitions

The number of partitions for the transaction state topic.

Default: 50

transaction.state.log.segment.bytes

The segment size for the transaction state topic.

Default: 104857600 bytes.

Producer configs

 

enable.idempotence

Whether or not idempotence is enabled (false by default). If disabled, the producer will not set the PID field in produce requests and the current producer delivery semantics will be in effect. Note that idempotence must be enabled in order to use transactions.

transaction.timeout.ms

The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction.

This config value will be sent to the transaction coordinator along with the InitPidRequest. If this value is larger than the max.transaction.timeout.ms setting in the broker, the request will fail with an `InvalidTransactionTimeout` error.

Default is 60000 (1 min). This keeps a transaction from blocking downstream consumption for more than a minute, which is generally acceptable in real-time applications.

transactional.id

The TransactionalId to use for transactional delivery. This enables reliability semantics which span multiple producer sessions since it allows the client to guarantee that transactions using the same TransactionalId have been completed prior to starting any new transactions. If no TransactionalId is provided, then the producer is limited to idempotent delivery.

Note that enable.idempotence must be enabled if a TransactionalId is configured.

The default is empty, which means transactions cannot be used.
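As a rough illustration, the producer settings described above might be assembled as in the sketch below. The class name, the helper methods, and the example values are hypothetical; only the three config keys and the rule that a configured TransactionalId requires enable.idempotence come from this proposal.

```java
import java.util.Properties;

// Hypothetical helper: only the config keys are taken from this proposal.
class TransactionalProducerConfigSketch {

    // Builds producer properties for transactional delivery.
    static Properties transactionalConfig(String transactionalId, int transactionTimeoutMs) {
        Properties props = new Properties();
        // Idempotence must be enabled in order to use transactions.
        props.setProperty("enable.idempotence", "true");
        props.setProperty("transactional.id", transactionalId);
        props.setProperty("transaction.timeout.ms", Integer.toString(transactionTimeoutMs));
        return props;
    }

    // Mirrors the rule that a configured TransactionalId requires enable.idempotence.
    static boolean isValid(Properties props) {
        String transactionalId = props.getProperty("transactional.id", "");
        boolean idempotent =
            Boolean.parseBoolean(props.getProperty("enable.idempotence", "false"));
        return transactionalId.isEmpty() || idempotent;
    }
}
```

A configuration without a transactional.id passes this check regardless of enable.idempotence, which matches the statement that such a producer is simply limited to non-transactional delivery.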

Consumer configs

 

isolation.level

Here are the possible values (default is read_uncommitted):

read_uncommitted: consume both committed and uncommitted messages in offset order.

read_committed: only consume non-transactional messages or committed transactional messages, in offset order. To maintain offset order, the consumer has to buffer messages until it has seen all messages in a given transaction.
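The buffering implied by read_committed can be sketched as follows. This is a simplified model under stated assumptions, not the consumer implementation: the log is an in-memory list, a negative PID stands for a non-transactional message, each PID has at most one open transaction at a time, and all class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of read_committed delivery over a single partition.
class ReadCommittedSketch {
    enum Kind { DATA, COMMIT, ABORT }

    static final class Entry {
        final Kind kind;
        final long pid;      // negative means non-transactional in this sketch
        final String value;
        boolean resolved;    // true once the message is known to be deliverable

        Entry(Kind kind, long pid, String value) {
            this.kind = kind;
            this.pid = pid;
            this.value = value;
            this.resolved = kind == Kind.DATA && pid < 0;
        }
    }

    // Returns the values a read_committed consumer would see, in offset order.
    static List<String> readCommitted(List<Entry> log) {
        List<String> delivered = new ArrayList<>();
        List<Entry> buffered = new ArrayList<>(); // undelivered data, in offset order
        for (Entry e : log) {
            if (e.kind == Kind.DATA) {
                buffered.add(e);
            } else if (e.kind == Kind.COMMIT) {
                // Commit marker: the PID's buffered messages become deliverable.
                for (Entry b : buffered) {
                    if (b.pid == e.pid) b.resolved = true;
                }
            } else {
                // Abort marker: drop the PID's still-open messages.
                buffered.removeIf(b -> b.pid == e.pid && !b.resolved);
            }
            // Deliver from the front until a message of an open transaction is hit.
            while (!buffered.isEmpty() && buffered.get(0).resolved) {
                delivered.add(buffered.remove(0).value);
            }
        }
        return delivered;
    }
}
```

Note that a non-transactional message written after the start of an open transaction is held back too, since delivering it early would break offset order.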

Proposed Changes

Summary of Guarantees

Idempotent Producer Guarantees

To implement idempotent producer semantics, we introduce the concepts of a producer id, henceforth called the PID, and sequence numbers for Kafka messages. Every new producer will be assigned a unique PID during initialization. The PID assignment is completely transparent to users and is never exposed by clients.


For a given PID, sequence numbers will start from zero and be monotonically increasing, with one sequence number per topic partition produced to. The sequence number will be incremented by the producer on every message sent to the broker. The broker maintains in memory the sequence numbers it receives for each topic partition from every PID. The broker will reject a produce request if its sequence number is not exactly one greater than the last committed message from that PID/TopicPartition pair. Messages with a lower sequence number result in a duplicate error, which can be ignored by the producer. Messages with a higher number result in an out-of-sequence error, which indicates that some messages have been lost, and is fatal.

This ensures that, even though a producer must retry requests upon failures, every message will be persisted in the log exactly once. Further, since each new instance of a producer is assigned a new, unique, PID, we can only guarantee idempotent production within a single producer session.
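The sequence-number check described above can be sketched as follows. This is a minimal in-memory model, assuming a string key per PID/TopicPartition pair; the class, enum, and method names are hypothetical and do not correspond to broker code.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal model of the broker-side sequence check for idempotent produce requests.
class SequenceTrackerSketch {
    enum Result { ACCEPTED, DUPLICATE, OUT_OF_SEQUENCE }

    // Last accepted sequence number per "PID/TopicPartition" key.
    private final Map<String, Integer> lastSequence = new HashMap<>();

    Result tryAppend(long pid, String topicPartition, int sequence) {
        String key = pid + "/" + topicPartition;
        // Sequence numbers start from zero, so the first expected value is 0.
        int expected = lastSequence.getOrDefault(key, -1) + 1;
        if (sequence < expected) {
            return Result.DUPLICATE;        // already persisted; the producer can ignore this
        }
        if (sequence > expected) {
            return Result.OUT_OF_SEQUENCE;  // some messages were lost; fatal for the producer
        }
        lastSequence.put(key, sequence);
        return Result.ACCEPTED;
    }
}
```

Because a rejected request leaves the stored sequence number untouched, a retry of an already persisted message is reported as a duplicate rather than appended twice.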

Transactional Guarantees

At the core, transactional guarantees enable applications to produce to multiple TopicPartitions atomically, i.e., all writes to these TopicPartitions will succeed or fail as a unit.

Further, since consumer progress is recorded as a write to the offsets topic, the above capability is leveraged to enable applications to batch consumed and produced messages into a single atomic unit, i.e., a set of messages may be considered consumed only if the whole ‘consume-transform-produce’ cycle executed in its entirety.

Additionally, stateful applications will also be able to ensure continuity across multiple sessions of the application. In other words, Kafka can guarantee idempotent production and transaction recovery across application bounces.

To achieve this, we require that the application provides a unique id which is stable across all sessions of the application. For the rest of this document, we refer to such an id as the TransactionalId. While there may be a 1-1 mapping between a TransactionalId and the internal PID, the main difference is that the TransactionalId is provided by users, and is what enables the idempotent guarantees across producer sessions described below.

When provided with such a TransactionalId, Kafka will guarantee:
  1. Exactly one active producer with a given TransactionalId. This is achieved by fencing off old generations when a new instance with the same TransactionalId comes online.

  2. Transaction recovery across application sessions. If an application instance dies, the next instance can be guaranteed that any unfinished transactions have been completed (whether aborted or committed), leaving the new instance in a clean state prior to resuming work.

Note that the transactional guarantees mentioned here are from the point of view of the producer. On the consumer side, the guarantees are a bit weaker. In particular, we cannot guarantee that all the messages of a committed transaction will be consumed all together. This is for several reasons:

  1. For compacted topics, some messages of a transaction may be overwritten by newer versions.
  2. Transactions may straddle log segments. Hence when old segments are deleted, we may lose some messages in the first part of a transaction.
  3. Consumers may seek to arbitrary points within a transaction, hence missing some of the initial messages.
  4. Consumers may not consume from all the partitions which participated in a transaction. Hence they will never be able to read all the messages that comprised the transaction.

Key Concepts

To implement transactions, i.e., ensuring that a group of messages is produced and consumed atomically, we introduce several new concepts:

  1. We introduce a new entity called a Transaction Coordinator. Similar to the consumer group coordinator, each producer is assigned a transaction coordinator, and all the logic of assigning PIDs and managing transactions is done by the transaction coordinator.
  2. We introduce a new internal Kafka topic called the Transaction Log. Similar to the Consumer Offsets topic, the transaction log is a persistent and replicated record of every transaction. The transaction log is the state store for the transaction coordinator, with the snapshot of the latest version of the log encapsulating the current state of each active transaction.
  3. We introduce the notion of Control Messages. These are special messages written to user topics, processed by clients, but never exposed to users. They are used, for instance, to let brokers indicate to consumers if the previously fetched messages have been committed atomically or not. Control messages have been previously proposed here.
  4. We introduce the notion of a TransactionalId, to enable users to uniquely identify producers in a persistent way. Different instances of a producer with the same TransactionalId will be able to resume (or abort) any transactions initiated by the previous instance.
  5. We introduce the notion of a producer epoch, which enables us to ensure that there is only one legitimate active instance of a producer with a given TransactionalId, and hence enables us to maintain transaction guarantees in the event of failures.
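As a rough sketch of how a producer epoch can fence off an older producer instance, consider the model below. It assumes that the coordinator keeps one PID per TransactionalId and increments the epoch each time a new instance initializes; that behavior, along with every class and method name here, is an assumption made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical coordinator-side bookkeeping for TransactionalId -> (PID, epoch).
class EpochFencingSketch {

    static final class ProducerIdentity {
        final long pid;
        final short epoch;

        ProducerIdentity(long pid, short epoch) {
            this.pid = pid;
            this.epoch = epoch;
        }
    }

    private final Map<String, ProducerIdentity> byTransactionalId = new HashMap<>();
    private long nextPid = 0;

    // Called when a producer instance initializes with the given TransactionalId.
    // Assumption: the PID is kept and the epoch is bumped for a known TransactionalId.
    ProducerIdentity init(String transactionalId) {
        ProducerIdentity current = byTransactionalId.get(transactionalId);
        ProducerIdentity next = (current == null)
            ? new ProducerIdentity(nextPid++, (short) 0)
            : new ProducerIdentity(current.pid, (short) (current.epoch + 1));
        byTransactionalId.put(transactionalId, next);
        return next;
    }

    // A request is fenced if it does not carry the latest PID and epoch
    // registered for its TransactionalId.
    boolean isFenced(String transactionalId, long pid, short epoch) {
        ProducerIdentity current = byTransactionalId.get(transactionalId);
        return current == null || current.pid != pid || epoch < current.epoch;
    }
}
```

In this model, once a second instance calls init with the same TransactionalId, any request still carrying the first instance's epoch is rejected, which leaves exactly one legitimate active producer.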

...

If no TransactionalId is specified in the configuration, a fresh PID is assigned, and the producer only enjoys idempotent semantics and transactional semantics within a single session.

3. Starting a Transaction – The beginTransaction() API

The new KafkaProducer will have a KafkaProducer.beginTransaction() method which has to be called to signal the start of a new transaction. The producer records local state indicating that the transaction has begun, but the transaction won’t begin from the coordinator’s perspective until the first record is sent.

4. The consume-transform-produce loop

...

The producer sends this request to the transaction coordinator the first time a new TopicPartition is written to as part of a transaction. The addition of this TopicPartition to the transaction is logged by the coordinator in step 4.1a. We need this information so that we can write the commit or abort markers to each TopicPartition (see section 5.2 for details). If this is the first partition added to the transaction, the coordinator will also start the transaction timer.
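The coordinator-side bookkeeping described above might look roughly like the following sketch. It keeps state in memory only (the proposal logs it to the transaction log), represents partitions as strings, and uses hypothetical class and method names throughout.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical per-transaction state kept by the transaction coordinator.
class TransactionPartitionsSketch {
    private final Set<String> partitions = new LinkedHashSet<>();
    private long timerStartMs = -1; // -1 means the transaction timer is not running

    // Handles one partition from an AddPartitionsToTxn request.
    // Returns true if the partition was newly added to the transaction.
    boolean addPartition(String topicPartition, long nowMs) {
        boolean added = partitions.add(topicPartition);
        if (added && partitions.size() == 1) {
            timerStartMs = nowMs; // the first partition starts the transaction timer
        }
        return added;
    }

    // True once the transaction has been open longer than the given timeout.
    boolean timedOut(long nowMs, long transactionTimeoutMs) {
        return timerStartMs >= 0 && nowMs - timerStartMs > transactionTimeoutMs;
    }

    // The partitions that will need a commit or abort marker.
    Set<String> partitionsForMarkers() {
        return partitions;
    }
}
```

Keeping the full set of partitions is what later allows the coordinator to write a commit or abort marker to every TopicPartition that took part in the transaction.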

 

4.2 ProduceRequest

The producer writes a bunch of messages to the user’s TopicPartitions through one or more ProduceRequests (fired from the send method of the producer). These requests include the PID, epoch, and sequence number as denoted in 4.2a.

...

The transaction coordinator handles each of the following requests: InitPid, AddPartitionsToTxn, AddOffsetsToTxn, and EndTxn. Each request to the transaction coordinator includes the producer’s TransactionalId and can be used for authorization. Each of these requests mutates the transaction state of the producer, so they all require Write access to the corresponding ProducerTransactionalId resource. Additionally, the AddPartitionsToTxn API requires Write access to the topics corresponding to the included partitions, and the AddOffsetsToTxn API requires Read access to the group included in the request.

...

Code Block (text): InitPidRequest
InitPidRequest => TransactionalId TransactionTimeoutMs
 TransactionalId => string
 TransactionTimeoutMs => int32

Code Block (text): InitPidResponse
InitPidResponse => Error PID Epoch
 Error => int16
 PID => int64
 Epoch => int16
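To make the field layout above concrete, here is a small sketch that serializes the InitPidRequest body and parses an InitPidResponse body. It assumes big-endian integers and a string encoded as an int16 length followed by UTF-8 bytes; the schema above only says "string", so that encoding is an assumption, request headers are omitted, and the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical codec for the InitPid request and response bodies shown above.
class InitPidCodecSketch {

    static final class Response {
        final short error;
        final long pid;
        final short epoch;

        Response(short error, long pid, short epoch) {
            this.error = error;
            this.pid = pid;
            this.epoch = epoch;
        }
    }

    // Layout: TransactionalId (assumed int16 length + UTF-8 bytes), TransactionTimeoutMs (int32).
    static ByteBuffer encodeRequest(String transactionalId, int transactionTimeoutMs) {
        byte[] id = transactionalId.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(2 + id.length + 4);
        buf.putShort((short) id.length);
        buf.put(id);
        buf.putInt(transactionTimeoutMs);
        buf.flip(); // make the buffer readable from the start
        return buf;
    }

    // Layout: Error (int16), PID (int64), Epoch (int16).
    static Response decodeResponse(ByteBuffer buf) {
        short error = buf.getShort();
        long pid = buf.getLong();
        short epoch = buf.getShort();
        return new Response(error, pid, epoch);
    }
}
```

ByteBuffer is big-endian by default, so no explicit byte-order call is needed under the assumption stated above.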

...

  • Ok

  • NotCoordinatorForTransactionalId

  • CoordinatorNotAvailable

BeginTxnRequest/Response

Sent by the producer to its transaction coordinator to begin a new transaction. Request handling details can be found here.

Code Block (text): BeginTxnRequest
BeginTxnRequest => TransactionalId PID Epoch TransactionTimeoutMs
 TransactionalId => string
 PID => int64
 Epoch => int32
 TransactionTimeoutMs => int32

Code Block (text): BeginTxnResponse
BeginTxnResponse => ErrorCode
 ErrorCode => int16

Error codes:

  • Ok

  • ProducerFenced

  • InvalidPidMapping

  • NotCoordinatorForTransactionalId

  • CoordinatorNotAvailable

  • InvalidTxnRequest

  • InvalidTransactionTimeout

...