Status

Current state: Draft

Discussion thread: TBD

JIRA:



Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka transactions use a two-phase model in which the transaction state is stored by the transaction coordinator. Today the producer keeps that state current by sending an AddPartitionsToTxn request each time it sees a new output partition. In practice, though, the set of output partitions for a producer rarely changes and is often deterministic over time, so re-registering that set in every transaction wastes network round trips.

The same applies to consumer offsets: a transactional producer is very unlikely to register more than one consumer group, which means the same consumer offsets topic partition is added again in every transaction. It would be good to save this round trip as well.

The other issue is the unbounded retry of AddPartitionsToTxn when it is the first request of a new transaction. Currently the EndTxn call returns as soon as the transaction commit message is materialized in the transaction log. The broker, however, still needs to propagate this information to all affected parties to complete the transaction, so there is a blackout period during which no new transaction can be started and the producer can only keep retrying.

Proposed Changes

We propose to make AddPartitionsToTxn a blocking call while another transaction is still pending. We would also like to retain a transaction session, so that the producer does not need to call AddPartitionsToTxn every time a new partition appears. In addition, we will support a pre-registration API for applications that already know which partitions they will write to.
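The session idea above can be illustrated with a minimal producer-side sketch. Everything in it is hypothetical: the class `TxnSessionSketch`, the `Tp` record (a stand-in for `org.apache.kafka.common.TopicPartition`) and the method names are not Kafka code. It also assumes that the session is replaced by the set of partitions written in the last transaction, which this proposal does not yet specify.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch of a retained transaction session; not part of Kafka. */
class TxnSessionSketch {
    /** Stand-in for org.apache.kafka.common.TopicPartition so the sketch has no dependencies. */
    record Tp(String topic, int partition) {}

    // Partitions assumed to be known to the coordinator from the previous transaction.
    private final Set<Tp> sessionPartitions = new HashSet<>();
    // Partitions written in the transaction currently in progress.
    private final Set<Tp> writtenInCurrentTxn = new HashSet<>();

    /** Records a write and returns true if it would still need an AddPartitionsToTxn round trip. */
    boolean recordWrite(Tp tp) {
        writtenInCurrentTxn.add(tp);
        return !sessionPartitions.contains(tp);
    }

    /**
     * Ends the transaction and returns the partitions actually written.
     * Assumption: the written set replaces the session for the next transaction.
     */
    Set<Tp> endTransaction() {
        Set<Tp> written = new HashSet<>(writtenInCurrentTxn);
        sessionPartitions.clear();
        sessionPartitions.addAll(written);
        writtenInCurrentTxn.clear();
        return written;
    }
}
```

Under these assumptions, a producer that writes to the same partitions in every transaction pays the registration round trip only in its first transaction.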

Public Interfaces

We will bump the EndTxn request version to include the partitions actually written in the transaction, so that the session gets updated.

EndTxnRequest.json
{
  "apiKey": 26,
  "type": "request",
  "name": "EndTxnRequest",
  // Version 1 is the same as version 0.
  // 
  // Version 2 adds the support for new PRODUCER_FENCED error code.
  //
  // Version 3 adds the Topics field, which lists the partitions written in the transaction.
  "validVersions": "0-3",
  "flexibleVersions": "none",
  "fields": [
    { "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId",
      "about": "The ID of the transaction to end." },
    { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId",
      "about": "The producer ID." },
    { "name": "ProducerEpoch", "type": "int16", "versions": "0+",
      "about": "The current epoch associated with the producer." },
    { "name": "Committed", "type": "bool", "versions": "0+",
      "about": "True if the transaction was committed, false if it was aborted." },
    // -------------- START NEW FIELD --------------
    { "name": "Topics", "type": "[]EndTxnTopic", "versions": "3+",
      "about": "The partitions actually written in the current transaction.", "fields": [
      { "name": "Name", "type": "string", "versions": "3+", "mapKey": true, "entityType": "topicName",
        "about": "The name of the topic." },
      { "name": "Partitions", "type": "[]int32", "versions": "3+",
        "about": "The partition indexes written in the transaction." }
    ]}
    // -------------- END NEW FIELD --------------
  ]
}
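To fill the new `Topics` field, the producer would have to group the partitions it wrote to by topic name. The following is a minimal sketch of that grouping step only. The class `EndTxnTopicsSketch`, the `Tp` record and `groupByTopic` are hypothetical names, and the sketch does not use Kafka's generated request classes.

```java
import java.util.Collection;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical helper that shapes written partitions like the proposed Topics field. */
class EndTxnTopicsSketch {
    /** Stand-in for org.apache.kafka.common.TopicPartition so the sketch has no dependencies. */
    record Tp(String topic, int partition) {}

    /** Groups partitions by topic name; duplicates collapse and output is sorted for stable results. */
    static SortedMap<String, SortedSet<Integer>> groupByTopic(Collection<Tp> written) {
        SortedMap<String, SortedSet<Integer>> topics = new TreeMap<>();
        for (Tp tp : written) {
            topics.computeIfAbsent(tp.topic(), name -> new TreeSet<>()).add(tp.partition());
        }
        return topics;
    }
}
```

Each map entry corresponds to one `EndTxnTopic` element: the key is `Name` and the value is `Partitions`.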


We would also extend the beginTransaction API on KafkaProducer so that users can pass in the partitions to pre-register. The only exception is the offsets topic, which can still be added during the transaction session.

KafkaProducer.java
    /**
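     * Should be called before the start of each new transaction. Note that prior to the first invocation
     * of this method, you must invoke {@link #initTransactions()} exactly one time.
     *
     * @param outputPartitions the partitions to pre-register for this transaction; the offsets topic
     *        may still be added during the transaction session
     *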
     * @throws IllegalStateException if no transactional.id has been configured or if {@link #initTransactions()}
     *         has not yet been invoked
     * @throws ProducerFencedException if another producer with the same transactional.id is active
     * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
     *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
     * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
     *         transactional.id is not authorized. See the exception for more details
     * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
     */
    public void beginTransaction(Set<TopicPartition> outputPartitions) throws ProducerFencedException;
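As a rough illustration of how an application might use the pre-registration overload, the sketch below models it with an in-memory fake. Nothing here is the real client: `TransactionalProducer`, `FakeProducer`, the `Tp` record and the simplified `send` signature are all hypothetical and exist only to show which writes would still need a separate AddPartitionsToTxn request.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical model of the proposed pre-registration flow; not the real KafkaProducer. */
class PreRegistrationSketch {
    /** Stand-in for org.apache.kafka.common.TopicPartition so the sketch has no dependencies. */
    record Tp(String topic, int partition) {}

    /** Hypothetical slice of the producer API; send is simplified for illustration. */
    interface TransactionalProducer {
        void beginTransaction(Set<Tp> outputPartitions);
        void send(Tp partition, String value);
        void commitTransaction();
    }

    /** In-memory fake that counts writes to partitions that were not pre-registered. */
    static class FakeProducer implements TransactionalProducer {
        private final Set<Tp> registered = new HashSet<>();
        int addPartitionsRequests = 0;

        @Override
        public void beginTransaction(Set<Tp> outputPartitions) {
            registered.clear();
            registered.addAll(outputPartitions);
        }

        @Override
        public void send(Tp partition, String value) {
            // Only the first write to an unregistered partition costs a registration request.
            if (registered.add(partition)) {
                addPartitionsRequests++;
            }
        }

        @Override
        public void commitTransaction() {
            registered.clear();
        }
    }
}
```

In this model, an application that passes its full output set to `beginTransaction` never triggers a registration request during the transaction, while a write to a partition outside that set is still accepted at the cost of one request.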

 


Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
