Status

Current state: Draft

Discussion thread: TBD

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka transactions use a two-phase model in which the transaction state is stored by the transaction coordinator. Currently, the producer makes this work by sending an AddPartitionsToTxn call each time it sees a new output partition. In practice, though, the set of output partitions for a producer does not change very often, and may even be deterministic over time, so updating that set repeatedly wastes network round trips.

Similarly for consumer offsets: a transactional producer is very unlikely to register more than one consumer group, which means the same consumer offset topic partition is added to the transaction over and over. It is desirable to save this round trip as well.

The other issue is the infinite retry mechanism for AddPartitionsToTxn when it serves as the starting request of a new transaction. Right now the EndTxn call returns as soon as the transaction commit message is materialized in the transaction log. However, the broker still needs to propagate this information to all affected parties to complete the transaction, which means there is a black-out period during which no new transaction can be started. During this time, all new AddPartitionsToTxn calls are rejected and asked to retry, which again creates many unnecessary network round trips.

Proposed Changes

Block Begin Transaction

We propose to make AddPartitionsToTxn a blocking call when another transaction is still pending. This could be done by holding the response callback in the purgatory until the transaction markers have been completed.
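
The blocking behavior described above can be illustrated with a minimal sketch. This is not broker code: the class and method names are hypothetical, a plain queue of futures stands in for the purgatory, and the string "NONE" stands in for a successful response. It only shows the idea of parking a response until marker completion instead of rejecting the request.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified model of the coordinator side; not Kafka's actual classes. */
final class BlockingAddPartitionsSketch {
    // True between "commit written to the transaction log" and "all markers written".
    private boolean markersPending = false;
    // Responses held back while markers are pending (a stand-in for the purgatory).
    private final Queue<CompletableFuture<String>> heldResponses = new ArrayDeque<>();

    /** EndTxn returns as soon as the commit is logged; markers are still outstanding. */
    synchronized void onEndTxnLogged() {
        markersPending = true;
    }

    /** Answers immediately when idle; otherwise parks the response instead of rejecting. */
    synchronized CompletableFuture<String> handleAddPartitionsToTxn() {
        if (!markersPending) {
            return CompletableFuture.completedFuture("NONE");
        }
        CompletableFuture<String> response = new CompletableFuture<>();
        heldResponses.add(response);
        return response;
    }

    /** Called once every transaction marker has been written: release parked responses. */
    synchronized void onMarkersComplete() {
        markersPending = false;
        CompletableFuture<String> response;
        while ((response = heldResponses.poll()) != null) {
            response.complete("NONE");
        }
    }
}
```

With this shape the producer sends a single request and simply waits for it, rather than retrying in a loop during the black-out period. A real implementation would presumably also need a timeout for held responses, which this sketch leaves out.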

Use Session Context to Record Sent Partitions

...

The goal is to reduce unnecessary bounces during the transaction marker completion.

Pre-register Output Partitions

Consider the latest EOS example below:

Code Block
linenumberstrue
  KafkaConsumer consumer = new KafkaConsumer<>(consumerConfig);
  // Recommend a smaller txn timeout, for example 10 seconds.
  producerConfig.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10000);
  KafkaProducer producer = new KafkaProducer(producerConfig);

  producer.initTransactions();
  while (true) {
    // Read some records from the consumer and collect the offsets to commit
    ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000)); // This will be the fencing point if there are pending offsets for the first time.
    Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);

    // Do some processing and build the records we want to produce
    List<ProducerRecord> processed = process(consumed);

    // Write the records and commit offsets under a single transaction
    producer.beginTransaction();
    for (ProducerRecord record : processed)
      producer.send(record);

    // Pass the entire consumer group metadata
    producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());

    producer.commitTransaction();
  }

As one can see, the developer could easily infer the output partitions from the set of output records and provide them to the producer up front. Advanced users could go even further and infer the output partitions before processing, directly from the consumed records.

Public Interfaces

We would bump the AddPartitionsToTxn API version by one to enable the blocking path on the broker.

We would change the beginTransaction API on KafkaProducer to let the user include the pre-registered partitions.

Code Block
languagejava
titleKafkaProducer.java
    /**
     * Should be called before the start of each new transaction. Note that prior to the first invocation
     * of this method, you must invoke {@link #initTransactions()} exactly one time.
     *
     * @param outputPartitions the set of output partitions to pre-register for this transaction
     *
     * @throws IllegalStateException if no transactional.id has been configured or if {@link #initTransactions()}
     *         has not yet been invoked
     * @throws ProducerFencedException if another producer with the same transactional.id is active
     * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
     *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
     * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
     *         transactional.id is not authorized. See the exception for more details
     * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
     */
    public void beginTransaction(Set<TopicPartition> outputPartitions) throws ProducerFencedException;


This API gives the user the flexibility to call it before record processing, in order to pipeline the work:

Code Block
linenumberstrue
    // Read some records from the consumer and collect the offsets to commit
    ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000)); // This will be the fencing point if there are pending offsets for the first time.
    Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);

    // --------- Call before process ---------- //
    // The application could infer the output partitions beforehand
    Set<TopicPartition> outputPartitions = outputPartitions(consumed);
    producer.beginTransaction(outputPartitions);

    // Do some processing and build the records we want to produce
    List<ProducerRecord> processed = process(consumed);

    // Write the records and commit offsets under a single transaction
    for (ProducerRecord record : processed)
      producer.send(record);

    // Pass the entire consumer group metadata
    producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());

    producer.commitTransaction();

Alternatively, the user could call it after processing, in order to build a larger produce batch:

Code Block
linenumberstrue
    // Read some records from the consumer and collect the offsets to commit
    ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000)); // This will be the fencing point if there are pending offsets for the first time.
    Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);

    // Do some processing and build the records we want to produce
    List<ProducerRecord> processed = process(consumed);

    // --------- Call after process ---------- //
    Set<TopicPartition> outputPartitions = outputPartitions(processed);
    producer.beginTransaction(outputPartitions);

    // Write the records and commit offsets under a single transaction
    for (ProducerRecord record : processed)
      producer.send(record);

    // Pass the entire consumer group metadata
    producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());

    producer.commitTransaction();
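
Both examples above rely on an outputPartitions helper that this page does not define. One possible shape for the "after processing" variant is sketched below. TopicPartition and OutRecord are simplified stand-ins for Kafka's TopicPartition and ProducerRecord, introduced only so the snippet compiles without the client library, and the helper itself is hypothetical. The sketch assumes every record carries an explicit partition; in the real client, ProducerRecord.partition() is null when the choice is left to the partitioner, and such a record cannot be pre-registered this way.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

/** Hypothetical helper; the nested types are stand-ins, not Kafka's classes. */
final class OutputPartitionsSketch {

    /** Stand-in for org.apache.kafka.common.TopicPartition. */
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TopicPartition)) return false;
            TopicPartition other = (TopicPartition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    /** Stand-in for the routing part of a ProducerRecord; partition may be null. */
    static final class OutRecord {
        final String topic;
        final Integer partition;

        OutRecord(String topic, Integer partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    /** Collects the distinct topic partitions the processed records will be written to. */
    static Set<TopicPartition> outputPartitions(List<OutRecord> processed) {
        Set<TopicPartition> result = new LinkedHashSet<>();
        for (OutRecord record : processed) {
            if (record.partition == null) {
                // Without an explicit partition the target is only known at send time.
                throw new IllegalArgumentException(
                    "Record for topic " + record.topic + " has no explicit partition");
            }
            result.add(new TopicPartition(record.topic, record.partition));
        }
        return result;
    }
}
```

An application that lets the partitioner pick partitions would need a different strategy, for example registering every partition of the output topics, at the cost of registering partitions that may end up unused.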

Compatibility, Deprecation, and Migration Plan

The new beginTransaction API and the blocking behavior shall only be available for new producers, so there should not be a compatibility issue. To enable the blocking behavior, the broker has to be on the latest version, while for partition pre-registration the broker only needs to be on 0.11, which supports the first version of EOS semantics.

Rejected Alternatives

We have thought about using a context session on the broker side to remember the partitions written to in the last transaction, and bumping the EndTxn request to include the partitions actually written so that the session gets updated.

Code Block
titleEndTxnRequest.json
{
  "apiKey": 26,
  "type": "request",
  "name": "EndTxnRequest",
  // Version 1 is the same as version 0.
  //
  // Version 2 adds the support for new PRODUCER_FENCED error code.
  //
  // Version 3 adds the Topics field.
  "validVersions": "0-3",
  "flexibleVersions": "none",
  "fields": [
    { "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId",
      "about": "The ID of the transaction to end." },
    { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId",
      "about": "The producer ID." },
    { "name": "ProducerEpoch", "type": "int16", "versions": "0+",
      "about": "The current epoch associated with the producer." },
    { "name": "Committed", "type": "bool", "versions": "0+",
      "about": "True if the transaction was committed, false if it was aborted." },
    // -------------- START NEW FIELD --------------
    { "name": "Topics", "type": "[]EndTxnTopic", "versions": "3+",
      "about": "The partitions actually written to in the current transaction.", "fields": [
      { "name": "Name", "type": "string", "versions": "3+", "mapKey": true, "entityType": "topicName",
        "about": "The name of the topic." },
      { "name": "Partitions", "type": "[]int32", "versions": "3+",
        "about": "The partition indexes written to in the transaction." }
    ]}
    // -------------- END NEW FIELD --------------
  ]
}

We would also change the beginTransaction API on KafkaProducer to let the user include the pre-registered partitions. The only exception is that the offset topic can still be added during the transaction session.

Code Block
languagejava
titleKafkaProducer.java
    /**
     * Should be called before the start of each new transaction. Note that prior to the first invocation
     * of this method, you must invoke {@link #initTransactions()} exactly one time.
     *
     * @throws IllegalStateException if no transactional.id has been configured or if {@link #initTransactions()}
     *         has not yet been invoked
     * @throws ProducerFencedException if another producer with the same transactional.id is active
     * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
     *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
     * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
     *         transactional.id is not authorized. See the exception for more details
     * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
     */
    public void beginTransaction(Set<TopicPartition> outputPartitions) throws ProducerFencedException;

This solution has less flexibility and makes re-initialization especially hard: a re-initialized producer does not know what the previous session looked like, so special logic would be needed to make sure it does not rely on pre-defined state that could be stale. It is also harder to understand.