Status

Current state: "Voting"

Discussion thread: here

Voting thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

  1. Officially support functionality that can be used (and is used by Flink) anyway via reflection.
  2. Remove the timeout for transactions participating in 2PC.

...

If the transaction is a participant of the 2PC protocol, we don’t limit the transaction timeout by transaction.max.timeout.ms any more; the transaction is never aborted automatically.

The admin client would support a new method to abort a transaction with a given transactional id.  The method would just execute InitProducerId.

...

Code Block
InitProducerIdRequest => TransactionalId TransactionTimeoutMs ProducerId Epoch Enable2Pc KeepPreparedTxn
 TransactionalId => NULLABLE_STRING
 TransactionTimeoutMs => INT32
 ProducerId => INT64
 Epoch => INT16
 Enable2Pc => BOOL  // NEW
 KeepPreparedTxn => BOOL  // NEW

InitProducerIdResponse => Error ProducerId Epoch OngoingTxnProducerId OngoingTxnEpoch
 Error => INT16
 ProducerId => INT64
 Epoch => INT16
 OngoingTxnProducerId => INT64  // NEW
 OngoingTxnEpoch => INT16  // NEW 

We’ll also bump ListTransactionsRequest to support listing only transactions that are running longer than a certain amount of time:

...

Note that the OngoingTxnProducerId and OngoingTxnEpoch can be set to -1 if there is no ongoing transaction.  In this case calling .completeTransaction would be a no-op.

Note that KeepPreparedTxn could be set to true even if Enable2Pc is false.

Note that the TransactionTimeoutMs value is ignored if Enable2Pc is specified.
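
To make the intent of these fields concrete, here is a simplified, illustrative sketch of the coordinator-side decision.  This is not actual broker code; all types are defined ad hoc for the example.

Code Block
languagejava
// Illustrative sketch (not broker code): how the coordinator is expected to treat
// the new InitProducerId flags.  All types here are made up for the example.
final class InitPidSemantics {
    record OngoingTxn(long producerId, short epoch) {}
    record Result(long ongoingTxnProducerId, short ongoingTxnEpoch, int effectiveTimeoutMs) {}

    static Result apply(boolean enable2Pc, boolean keepPreparedTxn,
                        int requestedTimeoutMs, OngoingTxn ongoing /* null if none */) {
        // Enable2Pc: the TransactionTimeoutMs value is ignored and the transaction
        // is never aborted automatically.
        int effectiveTimeoutMs = enable2Pc ? -1 : requestedTimeoutMs;

        // KeepPreparedTxn: keep the in-flight (prepared) transaction and report its
        // producer id and epoch back to the client.  When there is no ongoing
        // transaction, -1/-1 is returned and completeTransaction is a no-op.
        if (keepPreparedTxn && ongoing != null) {
            return new Result(ongoing.producerId(), ongoing.epoch(), effectiveTimeoutMs);
        }
        return new Result(-1L, (short) -1, effectiveTimeoutMs);
    }
}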

Persisted Data Format Changes

Note that this KIP is going to be implemented on top of KIP-890, so the TransactionLogValue will already have the NextProducerId tagged field to store an additional producer id; we just need to add the NextProducerEpoch tagged field to store an additional epoch.  The data format is changed in a way that makes it possible to downgrade to a version that supports KIP-915:

Code Block
languagejs
{
  "type": "data",
  "name": "TransactionLogValue",
  // Version 1 is the first flexible version.
  // KIP-915: bumping the version will no longer make this record backward compatible.
  // We suggest to add/remove only tagged fields to maintain backward compatibility.
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "ProducerId", "type": "int64", "versions": "0+",
      "about": "Producer id in use by the transactional id"},
    { "name": "ProducerEpoch", "type": "int16", "versions": "0+",
      "about": "Epoch associated with the producer id"},
    { "name": "PrevProducerId", "type": "int64", "default": -1, "taggedVersions": "1+", "tag": 0,
      "about": "Producer id in use by client when committing the transaction"},
    { "name": "NextProducerId", "type": "int64", "default": -1, "taggedVersions": "1+", "tag": 1,
      "about": "Producer id returned to the client in the epoch overflow case"},
    { "name": "NextProducerEpoch", "type": "int16", "default": -1, "taggedVersions": "1+", "tag": 2,  // New
      "about": "Producer epoch associated with the producer id returned to the client in the epoch overflow case"},
    { "name": "TransactionTimeoutMs", "type": "int32", "versions": "0+",
      "about": "Transaction timeout in milliseconds"},
    { "name": "TransactionStatus", "type": "int8", "versions": "0+",
      "about": "TransactionState the transaction is in"},
    { "name": "TransactionPartitions", "type": "[]PartitionsSchema", "versions": "0+", "nullableVersions": "0+",
      "about": "Set of partitions involved in the transaction", "fields": [
      { "name": "Topic", "type": "string", "versions": "0+"},
      { "name": "PartitionIds", "type": "[]int32", "versions": "0+"}]},
    { "name": "TransactionLastUpdateTimestampMs", "type": "int64", "versions": "0+",
      "about": "Time the transaction was last updated"},
    { "name": "TransactionStartTimestampMs", "type": "int64", "versions": "0+",
      "about": "Time the transaction was started"}
  ]
}

Note that for transactions with 2PC enabled the TransactionTimeoutMs would be set to -1.

Let's consider some examples of the state transitions and how the various producer ids and epochs are used.

Vanilla KIP-890 transaction case with epoch overflow:

  1. InitProducerId(false); TC STATE: Empty, ProducerId=42, ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=-1, NextProducerEpoch=-1; RESPONSE ProducerId=42, Epoch=MAX-1, OngoingTxnProducerId=-1, OngoingTxnEpoch=-1
  2. AddPartitionsToTxn; REQUEST: ProducerId=42, ProducerEpoch=MAX-1; TC STATE: Ongoing, ProducerId=42, ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=-1, NextProducerEpoch=-1
  3. Commit; REQUEST: ProducerId=42, ProducerEpoch=MAX-1; TC STATE: PrepareCommit, ProducerId=42, ProducerEpoch=MAX, PrevProducerId=-1, NextProducerId=85, NextProducerEpoch=0; RESPONSE ProducerId=85, Epoch=0
  4. (Transition in TC into CompleteCommit); TC STATE: CompleteCommit, ProducerId=85, ProducerEpoch=0, PrevProducerId=42, NextProducerId=-1, NextProducerEpoch=-1

The extra producer id info is there so that if the commit operation times out (and thus the client doesn't get the new ProducerId and ProducerEpoch) and the client retries with the previous ProducerId and ProducerEpoch, we can detect the retry and return success.  This logic is not new in this KIP; it's part of KIP-890.  Note that with vanilla KIP-890 transactions there are no cases when both NextProducerId and PrevProducerId are set – there is at most one of those extra fields in a given state.
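
As a rough illustration of that retry handling (simplified, not the actual coordinator logic), the check amounts to comparing the producer id on the retried request against the stored PrevProducerId:

Code Block
languagejava
// Simplified illustration of retry detection after the commit has completed
// (not actual coordinator code; the TxnState type is made up for the example).
final class CommitRetryCheck {
    record TxnState(long producerId, short producerEpoch, long prevProducerId) {}

    // A client that never received the commit response may retry with the pre-commit
    // producer id.  Because that id is remembered in PrevProducerId, the retry is
    // recognized and answered with success (plus the new producer id) instead of
    // being fenced.
    static boolean isRetryOfCompletedCommit(TxnState completeCommitState, long requestProducerId) {
        return requestProducerId == completeCommitState.prevProducerId();
    }
}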

2PC transaction case with two epoch overflows:

  1. InitProducerId(false); TC STATE: Empty, ProducerId=42, ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=-1, NextProducerEpoch=-1; RESPONSE ProducerId=42, Epoch=MAX-1, OngoingTxnProducerId=-1, OngoingTxnEpoch=-1.
  2. AddPartitionsToTxn; REQUEST: ProducerId=42, ProducerEpoch=MAX-1; TC STATE: Ongoing, ProducerId=42, ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=-1, NextProducerEpoch=-1
  3. (Transaction is prepared on the client, then client crashed)
  4. InitProducerId(true); TC STATE: Ongoing, ProducerId=42, ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=73, NextProducerEpoch=0; RESPONSE ProducerId=73, Epoch=0, OngoingTxnProducerId=42, OngoingTxnEpoch=MAX-1
  5. (crash the client)
  6. InitProducerId(true); TC STATE: Ongoing, ProducerId=42, ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=73, NextProducerEpoch=1; RESPONSE ProducerId=73, Epoch=1, OngoingTxnProducerId=42, OngoingTxnEpoch=MAX-1
  7. (crash the client a few times to drive the NextProducerEpoch to MAX-1)
  8. InitProducerId(true); TC STATE: Ongoing, ProducerId=42, ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=73, NextProducerEpoch=MAX; RESPONSE ProducerId=73, Epoch=MAX, OngoingTxnProducerId=42, OngoingTxnEpoch=MAX-1
  9. Commit; REQUEST: ProducerId=73, ProducerEpoch=MAX; TC STATE: PrepareCommit, ProducerId=42, ProducerEpoch=MAX, PrevProducerId=73, NextProducerId=85, NextProducerEpoch=0; RESPONSE ProducerId=85, Epoch=0
  10. (Transition in TC into CompleteCommit); TC STATE: CompleteCommit, ProducerId=85, ProducerEpoch=0, PrevProducerId=73, NextProducerId=-1, NextProducerEpoch=-1

This example highlights the following interesting details:

  • InitProducerId(true) may be issued multiple times (e.g. client gets into a crash loop).  The ProducerId and ProducerEpoch of the ongoing transaction always stay the same, but the NextProducerEpoch is always incremented.  Eventually, NextProducerEpoch may overflow, in which case we can allocate a new NextProducerId.
  • When a commit request is sent, it uses the latest ProducerId and ProducerEpoch.  We send out markers using the original ongoing transaction's ProducerId and ProducerEpoch + 1, but the next transaction will use the latest ProducerId and ProducerEpoch + 1 (this is what the response is going to contain).  It may happen (like in this example) that the latest ProducerEpoch is already at MAX, in which case we'd need to allocate a new ProducerId (see the minimal sketch after this list).  In order to support retries we store the previous ProducerId in the PrevProducerId field.  Thus, in such a situation, the PrepareCommit state can have three distinct producer ids:
    • ProducerId – this is used to send our commit markers
    • NextProducerId – this is the producer id to use for the next transaction
    • PrevProducerId – this is the producer id to avoid self-fencing on retries (i.e. if the commit request times out and the client retries with previous producer id, we can return success and new producer id)
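
The epoch-overflow rule referenced above can be summarized with a minimal sketch (illustrative only): when an epoch cannot be bumped any further, a newly allocated producer id with epoch 0 is used instead.

Code Block
languagejava
// Minimal sketch of the epoch-overflow rule (illustrative only): bumping past the
// maximum epoch switches to a newly allocated producer id with epoch 0.
final class EpochBump {
    record ProducerIdAndEpoch(long producerId, short epoch) {}

    static ProducerIdAndEpoch bump(ProducerIdAndEpoch current, long newlyAllocatedProducerId) {
        if (current.epoch() == Short.MAX_VALUE) {
            return new ProducerIdAndEpoch(newlyAllocatedProducerId, (short) 0);  // overflow case
        }
        return new ProducerIdAndEpoch(current.producerId(), (short) (current.epoch() + 1));
    }
}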

Metric Changes

A new metric will be added:

kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max  The max time a currently-open transaction has been open
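
For illustration only (the actual broker implementation may differ), the reported value corresponds to the age of the oldest transaction that is still open:

Code Block
languagejava
import java.util.Collection;

// Illustrative computation of active-transaction-open-time-max (not the actual broker code):
// the age, in milliseconds, of the oldest transaction that is still open.
final class OpenTransactionTime {
    static long activeTransactionOpenTimeMax(Collection<Long> openTxnStartTimestampsMs, long nowMs) {
        return openTxnStartTimestampsMs.stream()
                .mapToLong(start -> nowMs - start)
                .max()
                .orElse(0L);
    }
}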

...

transaction.two.phase.commit.enable  The default would be ‘false’.  If set to ‘true’, then the broker is informed that the client is participating in the two phase commit protocol and transactions that this client starts never expire.

transaction.timeout.ms  The semantics is not changed, but it would be an error to set transaction.timeout.ms when two.phase.commit.enable is set to ‘true’.
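
For example, a producer participating in 2PC might be configured as follows.  This is a minimal sketch: the bootstrap server and transactional id are placeholders, and transaction.timeout.ms is deliberately left unset because combining it with 2PC is an error.

Code Block
languagejava
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.StringSerializer;

// Minimal configuration sketch for a producer that participates in 2PC.
final class TwoPhaseCommitProducerConfig {
    static Producer<String, String> create() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");            // placeholder
        props.put("transactional.id", "external-coordinator-txn-1"); // placeholder
        props.put("transaction.two.phase.commit.enable", "true");    // new config from this KIP
        // transaction.timeout.ms is deliberately not set: combining it with 2PC is an error.
        return new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
    }
}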

Broker Configuration Changes

...

If the value is 'true' then the corresponding field is set in the InitProducerIdRequest and the KafkaProducer object is set into a state which only allows calling .commitTransaction, .abortTransaction, or .completeTransaction.

A new method will be added to KafkaProducer:

...

This would flush all the pending messages and transition the producer into a mode where only .commitTransaction, .abortTransaction, or .completeTransaction could be called (calling other methods, e.g. .send, in that mode would result in an IllegalStateException being thrown).  If the call is successful (all messages successfully got flushed to all partitions), the transaction is prepared.  If 2PC is not enabled, we return the INVALID_TXN_STATE error.

...

The method would compare the currently prepared transaction state and the state passed in the argument and either commit or abort the transaction.  If the producer is not in the prepared state (i.e. neither prepareTransaction nor initTransactions(true) was called), we return an INVALID_TXN_STATE error.
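
Putting the producer-side pieces together, an external transaction coordinator (e.g. Flink) could drive the protocol roughly as below.  This is only a sketch: the exact method signatures are elided above, and the PreparedTxnState type used here is an assumption made for illustration.

Code Block
languagejava
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch of a 2PC participant flow.  The PreparedTxnState type and the exact shape of
// prepareTransaction/completeTransaction are assumptions made for this illustration.
final class TwoPhaseCommitFlow {

    // Phase 1: produce and prepare; the returned state is persisted by the external coordinator.
    static PreparedTxnState produceAndPrepare(KafkaProducer<String, String> producer) {
        producer.initTransactions(false);   // fresh start, keepPreparedTxn=false
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("payments", "key", "value"));
        // Flushes all pending messages; afterwards only commitTransaction, abortTransaction,
        // or completeTransaction may be called on this producer.
        return producer.prepareTransaction();
    }

    // Phase 2 (possibly after a crash and restart): re-attach to the in-doubt transaction
    // and complete it according to the state persisted by the external coordinator.
    static void recoverAndComplete(KafkaProducer<String, String> producer,
                                   PreparedTxnState persistedState) {
        producer.initTransactions(true);    // keepPreparedTxn=true keeps the prepared transaction
        // Commits if the persisted state matches the currently prepared transaction and
        // aborts otherwise; a no-op if there was no ongoing transaction to keep.
        producer.completeTransaction(persistedState);
    }
}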

AdminClient API Changes

Class ListTransactionsOptions will support 2 new methods:

public ListTransactionsOptions runningLongerThanMs(long runningLongerThanMs) // set 

...

The Admin  interface will support a new method:

...
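
To illustrate how the new Admin pieces could fit together, here is a sketch that lists transactions open for more than an hour and force-terminates them.  The method name forceTerminateTransaction is an assumption here (mirroring the kafka-transactions.sh --forceTerminateTransaction command described below); runningLongerThanMs is the ListTransactionsOptions addition above.

Code Block
languagejava
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListTransactionsOptions;
import org.apache.kafka.clients.admin.TransactionListing;

// Sketch only: list transactions open longer than an hour and force-terminate them.
final class TerminateHangingTransactions {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder
        try (Admin admin = Admin.create(props)) {
            ListTransactionsOptions options = new ListTransactionsOptions()
                    .runningLongerThanMs(TimeUnit.HOURS.toMillis(1));  // new option from this KIP
            for (TransactionListing txn : admin.listTransactions(options).all().get()) {
                // Assumed name of the new Admin method; result handling omitted for brevity.
                admin.forceTerminateTransaction(txn.transactionalId());
            }
        }
    }
}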

A new value will be added to the enum AclOperation: TWO_PHASE_COMMIT ((byte) 15).  When InitProducerId comes with enable2Pc=true, it would have to have both WRITE and TWO_PHASE_COMMIT operations enabled on the transactional id resource.
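
For operators, granting the new permission can be sketched with the existing ACL Admin API; only the AclOperation.TWO_PHASE_COMMIT value is new, everything else shown below is the existing API.

Code Block
languagejava
import java.util.List;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

// Sketch: grant both WRITE and the new TWO_PHASE_COMMIT operation on a transactional id,
// which an InitProducerId request with enable2Pc=true would require.
final class Grant2PcAcl {
    static void grant(Admin admin, String transactionalId, String principal) throws Exception {
        ResourcePattern txnId =
                new ResourcePattern(ResourceType.TRANSACTIONAL_ID, transactionalId, PatternType.LITERAL);
        AclBinding write = new AclBinding(txnId,
                new AccessControlEntry(principal, "*", AclOperation.WRITE, AclPermissionType.ALLOW));
        AclBinding twoPhaseCommit = new AclBinding(txnId,
                new AccessControlEntry(principal, "*", AclOperation.TWO_PHASE_COMMIT, AclPermissionType.ALLOW)); // new enum value
        admin.createAcls(List.of(write, twoPhaseCommit)).all().get();
    }
}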

Command Line Tool Changes

The kafka-transactions.sh --list command is going to support a new flag --runningLongerThanMs that would take the number of milliseconds.  If this flag is provided, only transactions that have been running longer than the specified number of milliseconds would be listed.

The kafka-transactions.sh tool is going to support a new command --forceTerminateTransaction.  It has one required argument --transactionalId that would take the transactional id for the transaction to be terminated.

...

The broker can be downgraded to versions that support KIP-915.  The ProducerId and ProducerEpoch fields contain the ongoing transaction information, so an old broker would be able to properly commit or abort the transaction.

...

Having a Boolean flag and then a timeout value seems redundant; we really need just one or the other, so technically, instead of adding an independent flag, we could use a special timeout value to indicate that it’s a 2PC transaction.  This, however, would couple intent with a specific implementation; an explicit Boolean seems to reflect the intent better.

...

Disallowing keepPreparedTxn=true without 2PC

Without 2PC the notion of a “prepared” transaction is subverted by transaction.max.timeout.ms, so Kafka cannot promise to keep the transaction in-doubt until a decision is reached.  From a purity perspective, then, using keepPreparedTxn=true doesn’t reflect the semantics.

...

Now if either of those commits the transaction, it would have a mix of messages from two instances.  With the proper epoch bump, instance1 would get fenced at step 3.

Allow Specifying Client Timeout Even When Enable2Pc=true

Technically, we could still let the client control a transaction timeout that could exceed transaction.max.timeout.ms, but it seems more confusing than useful.