Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejs
{
  "type": "data",
  "name": "TransactionLogValue",
  // Version 1 is the first flexible version.
  // KIP-915: bumping the version will no longer make this record backward compatible.
  // We suggest to add/remove only tagged fields to maintain backward compatibility.
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "ProducerId", "type": "int64", "versions": "0+",
      "about": "Producer id in use by the transactional id"},
    { "name": "ProducerEpoch", "type": "int16", "versions": "0+",
      "about": "Epoch associated with the producer id"},
    { "name": "PrevProducerId", "type": "int64", "default": -1, "taggedVersions": "1+", "tag": 0,
      "about": "Producer id in use by client when committing the transaction"},
    { "name": "NextProducerId", "type": "int64", "default": -1, "taggedVersions": "1+", "tag": 1,
      "about": "Producer id returned to the client in the epoch overflow case"},
    { "name": "NextProducerEpoch", "type": "int16", "default": -1, "taggedVersions": "1+", "tag": 2,  // New
      "about": "Producer epoch associated with the producer id returned to the client in the epoch overflow case"},
    { "name": "TransactionTimeoutMs", "type": "int32", "versions": "0+",
      "about": "Transaction timeout in milliseconds"},
    { "name": "TransactionStatus", "type": "int8", "versions": "0+",
      "about": "TransactionState the transaction is in"},
    { "name": "TransactionPartitions", "type": "[]PartitionsSchema", "versions": "0+", "nullableVersions": "0+",
      "about": "Set of partitions involved in the transaction", "fields": [
      { "name": "Topic", "type": "string", "versions": "0+"},
      { "name": "PartitionIds", "type": "[]int32", "versions": "0+"}]},
    { "name": "TransactionLastUpdateTimestampMs", "type": "int64", "versions": "0+",
      "about": "Time the transaction was last updated"},
    { "name": "TransactionStartTimestampMs", "type": "int64", "versions": "0+",
      "about": "Time the transaction was started"}
  ]
}

Note that for transactions with 2PC enabled the TransactionTimeoutMs would be set to -1.

Let's consider some examples of the state transitions and how the various producer ids and epochs are used.

...

  1. InitProducerId(false); TC STATE: Empty, ProducerId=42, ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=-1, NextProducerEpoch=-1; RESPONSE ProducerId=42, Epoch=MAX-1, OngoingTxnProducerId=-1, OngoingTxnEpoch=-1.
  2. AddPartitionsToTxn; REQUEST: ProducerId=42, ProducerEpoch=MAX-1; TC STATE: Ongoing, ProducerId=42, ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=-1, NextProducerEpoch=-1
  3. (Transaction is prepared on the client, then client crashed)
  4. InitProducerId(true); TC STATE: Ongoing, ProducerId=42, ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=73, NextProducerEpoch=0; RESPONSE ProducerId=73, Epoch=0, OngoingTxnProducerId=42, OngoingTxnEpoch=MAX-1
  5. (crash the client)
  6. InitProducerId(true); TC STATE: Ongoing, ProducerId=42, ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=73, NextProducerEpoch=1; RESPONSE ProducerId=73, Epoch=1, OngoingTxnProducerId=42, OngoingTxnEpoch=MAX-1
  7. (crash the client a few times to drive the NextProducerEpoch to MAX-1)
  8. InitProducerId(true); TC STATE: Ongoing, ProducerId=42, ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=73, NextProducerEpoch=MAX; RESPONSE ProducerId=73, Epoch=MAX-1, OngoingTxnProducerId=42, OngoingTxnEpoch=MAX-1
  9. Commit; REQUEST: ProducerId=73, ProducerEpoch=MAX-1; TC STATE: PrepareCommit, ProducerId=42, ProducerEpoch=MAX, PrevProducerId=73, NextProducerId=85, NextProducerEpoch=0; RESPONSE ProducerId=85, Epoch=0
  10. (Transition in TC into CompleteCommit); TC STATE: CompleteCommit, ProducerId=85, ProducerEpoch=0, PrevProducerId=73, NextProducerId=-1, NextProducerEpoch=-1

...

  • InitProducerId(true) may be issued multiple times (e.g. client gets into a crash loop).  The ProducerId and ProducerEpoch of the ongoing transaction always stay the same, but the NextProducerEpoch is always incremented.  Eventually, NextProducerEpoch may overflow, in which case we can allocate a new NextProducerId.
  • When a commit request is sent, it uses the latest ProducerId and ProducerEpoch.  We send out markers using the original ongoing transaction's ProducerId and ProducerEpoch + 1, but the next transaction will use the latest ProducerId and ProducerEpoch + 1 (this is what the response is going to contain).  It may happen (like in this example) that the latest ProducerEpoch is already at MAX, in which case we'd need to allocate a new ProducerId.  In order to support retries we store the previous ProducerId in the PrevProducerId.  Thus in such situation the PrepareCommit state can have three distinct producer ids:
    • ProducerId – this is used to send our commit markers
    • NextProducerId – this is the producer id to use for the next transaction
    • PrevProducerId – this is the producer id to avoid self-fencing on retries (i.e. if the commit request times out and the client retries with previous producer id, we can return success and new producer id)

...

If the value is 'true' then the corresponding field is set in the InitProducerIdRequest and the KafkaProducer object is set into a state which only allows calling calling  .commitTransaction, .abortTransaction, or .abortTransactioncompleteTransaction.

New method will be added to KafkaProducer:

...

This would flush all the pending messages and transition the producer into a mode where only .commitTransaction, .abortTransaction, or .completeTransaction could could be called (calling other methods,  e.g. .send , in that mode would result in IllegalStateException being thrown).  If the call is successful (all messages successfully got flushed to all partitions) the transaction is prepared.  If the 2PC is not enabled, we return the INVALID_TXN_STATE error.

...