...
```
InitProducerIdRequest => TransactionalId TransactionTimeoutMs ProducerId Epoch Enable2Pc KeepPreparedTxn
  TransactionalId => NULLABLE_STRING
  TransactionTimeoutMs => INT32
  ProducerId => INT64
  Epoch => INT16
  Enable2Pc => BOOL // NEW
  KeepPreparedTxn => BOOL // NEW

InitProducerIdResponse => Error ProducerId Epoch OngoingTxnProducerId OngoingTxnEpoch
  Error => INT16
  ProducerId => INT64
  Epoch => INT16
  OngoingTxnProducerId => INT64 // NEW
  OngoingTxnEpoch => INT16 // NEW
```
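The new request and response fields can be modeled as plain Java records. This is a minimal sketch, not Kafka's generated message classes. The following names are hypothetical:

- the class and record names
- the NO_PRODUCER_ID and NO_EPOCH constants
- the hasOngoingTxn() helper

The -1 sentinel follows this KIP's note that OngoingTxnProducerId and OngoingTxnEpoch are -1 when there is no ongoing transaction. MAX is assumed to be Short.MAX_VALUE.

```java
// Hypothetical plain-record model of the InitProducerId fields added by this KIP;
// these are NOT Kafka's generated message classes.
public final class InitProducerIdSketch {
    // Sentinels used when there is no ongoing transaction.
    static final long NO_PRODUCER_ID = -1L;
    static final short NO_EPOCH = -1;

    record InitProducerIdRequest(
            String transactionalId,   // NULLABLE_STRING
            int transactionTimeoutMs, // INT32
            long producerId,          // INT64
            short epoch,              // INT16
            boolean enable2Pc,        // NEW
            boolean keepPreparedTxn)  // NEW
    {}

    record InitProducerIdResponse(
            short error,
            long producerId,
            short epoch,
            long ongoingTxnProducerId, // NEW, -1 if there is no ongoing transaction
            short ongoingTxnEpoch)     // NEW, -1 if there is no ongoing transaction
    {
        boolean hasOngoingTxn() {
            return ongoingTxnProducerId != NO_PRODUCER_ID;
        }
    }

    public static void main(String[] args) {
        short max = Short.MAX_VALUE; // assumption: "MAX" is the int16 maximum
        // A response with a kept prepared transaction (values from the 2PC walkthrough).
        var kept = new InitProducerIdResponse((short) 0, 73L, (short) 0, 42L, (short) (max - 1));
        System.out.println(kept.hasOngoingTxn()); // true
        // A response with no ongoing transaction.
        var empty = new InitProducerIdResponse((short) 0, 42L, (short) (max - 1), NO_PRODUCER_ID, NO_EPOCH);
        System.out.println(empty.hasOngoingTxn()); // false
    }
}
```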
We’ll also bump ListTransactionsRequest to support listing only transactions that have been running longer than a certain amount of time:

```
ListTransactionsRequest => StatesFilter ProducerIdFilter RunningLongerThanMs
  StatesFilter => []STRING
  ProducerIdFilter => []INT64
  RunningLongerThanMs => INT64 // NEW
```
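The following minimal Java sketch shows one way a coordinator could apply the RunningLongerThanMs filter. It rests on three assumptions:

- a transaction's running time is measured from its start timestamp
- "longer than" is a strict comparison
- the input holds only currently open transactions

The TxnListing record and the filter method are hypothetical.

```java
import java.util.List;

public final class RunningLongerThanFilter {
    // Hypothetical view of one open transaction as the coordinator might list it.
    record TxnListing(String transactionalId, String state, long startTimestampMs) {}

    // Keep only transactions running strictly longer than runningLongerThanMs.
    // Assumes every entry is currently open, so its start timestamp is meaningful.
    static List<TxnListing> filter(List<TxnListing> txns, long runningLongerThanMs, long nowMs) {
        return txns.stream()
                .filter(t -> nowMs - t.startTimestampMs() > runningLongerThanMs)
                .toList();
    }

    public static void main(String[] args) {
        long now = 100_000L;
        List<TxnListing> txns = List.of(
                new TxnListing("short-lived", "Ongoing", now - 5_000L),
                new TxnListing("stuck-2pc", "Ongoing", now - 90_000L));
        // Only "stuck-2pc" has been running longer than 60 seconds.
        System.out.println(filter(txns, 60_000L, now));
    }
}
```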
Note that OngoingTxnProducerId and OngoingTxnEpoch can be set to -1 if there is no ongoing transaction. In this case, calling .completeTransaction would be a no-op.
Note that KeepPreparedTxn could be set to true even if Enable2Pc is false.
Persisted Data Format Changes
Note that this KIP is going to be implemented on top of KIP-890, so the TransactionLogValue record will already have the NextProducerId tagged field to store an additional producer id. We just need to add a NextProducerEpoch tagged field to store an additional epoch. Because the new field is tagged and the record version is not bumped, it is still possible to downgrade to a version that supports KIP-915:
```json
{
  "type": "data",
  "name": "TransactionLogValue",
  // Version 1 is the first flexible version.
  // KIP-915: bumping the version will no longer make this record backward compatible.
  // We suggest to add/remove only tagged fields to maintain backward compatibility.
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "ProducerId", "type": "int64", "versions": "0+",
      "about": "Producer id in use by the transactional id"},
    { "name": "ProducerEpoch", "type": "int16", "versions": "0+",
      "about": "Epoch associated with the producer id"},
    { "name": "PrevProducerId", "type": "int64", "default": -1,
      "taggedVersions": "1+", "tag": 0,
      "about": "Producer id in use by client when committing the transaction"},
    { "name": "NextProducerId", "type": "int64", "default": -1,
      "taggedVersions": "1+", "tag": 1,
      "about": "Producer id returned to the client in the epoch overflow case"},
    { "name": "NextProducerEpoch", "type": "int16", "default": -1,
      "taggedVersions": "1+", "tag": 2, // New
      "about": "Producer epoch associated with the producer id returned to the client in the epoch overflow case"},
    { "name": "TransactionTimeoutMs", "type": "int32", "versions": "0+",
      "about": "Transaction timeout in milliseconds"},
    { "name": "TransactionStatus", "type": "int8", "versions": "0+",
      "about": "TransactionState the transaction is in"},
    { "name": "TransactionPartitions", "type": "[]PartitionsSchema", "versions": "0+", "nullableVersions": "0+",
      "about": "Set of partitions involved in the transaction", "fields": [
      { "name": "Topic", "type": "string", "versions": "0+"},
      { "name": "PartitionIds", "type": "[]int32", "versions": "0+"}]},
    { "name": "TransactionLastUpdateTimestampMs", "type": "int64", "versions": "0+",
      "about": "Time the transaction was last updated"},
    { "name": "TransactionStartTimestampMs", "type": "int64", "versions": "0+",
      "about": "Time the transaction was started"}
  ]
}
```
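Adding tagged fields allows a downgrade because a reader skips any tag it does not recognize instead of failing. The sketch below illustrates this with a simplified in-memory representation: a map from tag number to raw bytes, not Kafka's actual wire encoding. The class name is hypothetical. The choice of a reader that knows tags 0 and 1 but not tag 2 is also only an illustration.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.TreeMap;

public final class TaggedFieldSkipSketch {
    // Tagged fields an illustrative older reader understands, plus a count of skipped tags.
    record KnownTaggedFields(long prevProducerId, long nextProducerId, int skippedTags) {}

    // Simplified stand-in for tagged-field decoding: keys are tags, values are raw bytes.
    // The hypothetical older reader knows tag 0 (PrevProducerId) and tag 1 (NextProducerId) only.
    static KnownTaggedFields readAsOlderReader(Map<Integer, byte[]> taggedFields) {
        long prev = -1L; // schema default when the tag is absent
        long next = -1L;
        int skipped = 0;
        for (Map.Entry<Integer, byte[]> e : taggedFields.entrySet()) {
            switch (e.getKey()) {
                case 0 -> prev = ByteBuffer.wrap(e.getValue()).getLong();
                case 1 -> next = ByteBuffer.wrap(e.getValue()).getLong();
                default -> skipped++; // unknown tag (e.g. NextProducerEpoch, tag 2): skip, do not fail
            }
        }
        return new KnownTaggedFields(prev, next, skipped);
    }

    static byte[] int64(long v) { return ByteBuffer.allocate(8).putLong(v).array(); }

    static byte[] int16(short v) { return ByteBuffer.allocate(2).putShort(v).array(); }

    public static void main(String[] args) {
        Map<Integer, byte[]> tagged = new TreeMap<>();
        tagged.put(1, int64(73L));       // NextProducerId
        tagged.put(2, int16((short) 5)); // NextProducerEpoch, written by a newer broker
        System.out.println(readAsOlderReader(tagged));
    }
}
```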
Let's consider some examples of the state transitions and how the various producer ids and epochs are used.
Vanilla KIP-890 transaction case with epoch overflow:
- InitProducerId(false); TC STATE: Empty, ProducerId=42, ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=-1, NextProducerEpoch=-1; RESPONSE ProducerId=42, Epoch=MAX-1, OngoingTxnProducerId=-1, OngoingTxnEpoch=-1
- AddPartitionsToTxn; REQUEST: ProducerId=42, ProducerEpoch=MAX-1; TC STATE: Ongoing, ProducerId=42, ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=-1, NextProducerEpoch=-1
- Commit; REQUEST: ProducerId=42, ProducerEpoch=MAX-1; TC STATE: PrepareCommit, ProducerId=42, ProducerEpoch=MAX, PrevProducerId=-1, NextProducerId=85, NextProducerEpoch=0; RESPONSE ProducerId=85, Epoch=0
- (Transition in TC into CompleteCommit); TC STATE: CompleteCommit, ProducerId=85, ProducerEpoch=0, PrevProducerId=42, NextProducerId=-1, NextProducerEpoch=-1
The extra producer id info is there to handle a commit that times out. In that case the client doesn't get the new ProducerId and ProducerEpoch, and it retries with the previous ProducerId and ProducerEpoch. The extra info lets us detect the retry and return success. This logic is not new in this KIP; it is part of KIP-890. Note that with vanilla KIP-890 transactions there is no case in which both NextProducerId and PrevProducerId are set: a given state has at most one of those extra fields.
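The vanilla walkthrough can be sketched in Java under the following assumptions:

- MAX is Short.MAX_VALUE.
- An epoch of MAX-1 forces a new producer id for the next transaction, as in the example.
- Producer id allocation is a stand-in LongSupplier.
- The retry check compares only the producer id.

The intermediate PrepareCommit state is not modeled. All names are hypothetical; this is not the coordinator's actual code.

```java
import java.util.function.LongSupplier;

public final class VanillaCommitSketch {
    static final short MAX = Short.MAX_VALUE; // assumption: "MAX" is the int16 maximum

    // Subset of the TC state used in the walkthrough (transaction status omitted).
    record TxnMeta(long producerId, short producerEpoch, long prevProducerId) {}

    // State after the commit completes, plus the id/epoch returned to the client.
    record CommitResult(TxnMeta completed, long responseProducerId, short responseEpoch) {}

    // Assumption: at MAX-1 the producer id cannot be reused for the next transaction.
    static boolean exhausted(short epoch) {
        return epoch >= MAX - 1;
    }

    static CommitResult commit(TxnMeta ongoing, LongSupplier newProducerIds) {
        if (exhausted(ongoing.producerEpoch())) {
            // Markers go out with (producerId, MAX); the next transaction gets a fresh id at
            // epoch 0, and the old id is kept in PrevProducerId to recognize a retried commit.
            long newPid = newProducerIds.getAsLong();
            return new CommitResult(new TxnMeta(newPid, (short) 0, ongoing.producerId()), newPid, (short) 0);
        }
        // No overflow: the epoch is simply bumped and no extra producer id is stored.
        short bumped = (short) (ongoing.producerEpoch() + 1);
        return new CommitResult(new TxnMeta(ongoing.producerId(), bumped, -1L), ongoing.producerId(), bumped);
    }

    // Simplified retry check: a commit carrying PrevProducerId retries the one that completed,
    // so the coordinator can answer with success and the current id/epoch.
    static boolean isRetryOfCompletedCommit(TxnMeta completed, long requestProducerId) {
        return completed.prevProducerId() != -1L && requestProducerId == completed.prevProducerId();
    }

    public static void main(String[] args) {
        TxnMeta ongoing = new TxnMeta(42L, (short) (MAX - 1), -1L);
        CommitResult result = commit(ongoing, () -> 85L);
        System.out.println(result.responseProducerId() + "/" + result.responseEpoch()); // 85/0
        System.out.println(isRetryOfCompletedCommit(result.completed(), 42L));          // true
    }
}
```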
2PC transaction case with two epoch overflows:
- InitProducerId(false); TC STATE: Empty, ProducerId=42, ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=-1, NextProducerEpoch=-1; RESPONSE ProducerId=42, Epoch=MAX-1, OngoingTxnProducerId=-1, OngoingTxnEpoch=-1.
- AddPartitionsToTxn; REQUEST: ProducerId=42, ProducerEpoch=MAX-1; TC STATE: Ongoing, ProducerId=42, ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=-1, NextProducerEpoch=-1
- (The transaction is prepared on the client, then the client crashes)
- InitProducerId(true); TC STATE: Ongoing, ProducerId=42, ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=73, NextProducerEpoch=0; RESPONSE ProducerId=73, Epoch=0, OngoingTxnProducerId=42, OngoingTxnEpoch=MAX-1
- (crash the client)
- InitProducerId(true); TC STATE: Ongoing, ProducerId=42, ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=73, NextProducerEpoch=1; RESPONSE ProducerId=73, Epoch=1, OngoingTxnProducerId=42, OngoingTxnEpoch=MAX-1
- (crash the client a few more times; each InitProducerId(true) increments NextProducerEpoch until it reaches MAX-1 in the step below)
- InitProducerId(true); TC STATE: Ongoing, ProducerId=42, ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=73, NextProducerEpoch=MAX-1; RESPONSE ProducerId=73, Epoch=MAX-1, OngoingTxnProducerId=42, OngoingTxnEpoch=MAX-1
- Commit; REQUEST: ProducerId=73, ProducerEpoch=MAX-1; TC STATE: PrepareCommit, ProducerId=42, ProducerEpoch=MAX, PrevProducerId=73, NextProducerId=85, NextProducerEpoch=0; RESPONSE ProducerId=85, Epoch=0
- (Transition in TC into CompleteCommit); TC STATE: CompleteCommit, ProducerId=85, ProducerEpoch=0, PrevProducerId=73, NextProducerId=-1, NextProducerEpoch=-1
This example highlights the following interesting details:
- InitProducerId(true) may be issued multiple times (e.g. when the client gets into a crash loop). The ProducerId and ProducerEpoch of the ongoing transaction always stay the same, but NextProducerEpoch is incremented each time. Eventually, NextProducerEpoch may overflow, in which case we can allocate a new NextProducerId.
- When a commit request is sent, it uses the latest ProducerId and ProducerEpoch. We send out markers using the ongoing transaction's original ProducerId and ProducerEpoch, but the next transaction will use the latest ProducerId and ProducerEpoch + 1 (this is what the response is going to contain). It may happen (like in this example) that the latest ProducerEpoch is already at MAX-1, so ProducerEpoch + 1 would reach MAX. In that case we need to allocate a new ProducerId. To support retries, we store the previous ProducerId in PrevProducerId. Thus, in such a situation, the PrepareCommit state can have three distinct producer ids:
- ProducerId – used to send out the commit markers
- NextProducerId – the producer id to use for the next transaction
- PrevProducerId – the producer id used to avoid self-fencing on retries (i.e. if the commit request times out and the client retries with the previous producer id, we can return success and the new producer id)
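The 2PC walkthrough above can be replayed with a small Java model of the TC state. This is a minimal sketch under these assumptions:

- MAX is Short.MAX_VALUE, and an epoch at MAX-1 or above cannot be handed out again, matching the examples.
- The first InitProducerId(true) treats the ongoing transaction's own pair as the "latest" pair.
- A commit without overflow stores no PrevProducerId.
- Producer id allocation is a stand-in LongSupplier.

All names are hypothetical.

```java
import java.util.function.LongSupplier;

public final class TwoPcStateSketch {
    static final short MAX = Short.MAX_VALUE; // assumption: "MAX" is the int16 maximum
    static final long NO_ID = -1L;
    static final short NO_EPOCH = -1;

    // TC state fields from the walkthrough; the transaction status itself is omitted.
    record TxnState(long producerId, short producerEpoch,
                    long prevProducerId, long nextProducerId, short nextProducerEpoch) {}

    // Assumption: an epoch at MAX-1 or above cannot be bumped for further client use.
    static boolean exhausted(short epoch) {
        return epoch >= MAX - 1;
    }

    // InitProducerId(true): the ongoing ProducerId/ProducerEpoch stay untouched; only the
    // "next" pair is bumped, or re-allocated when its epoch would overflow.
    static TxnState initKeepPrepared(TxnState s, LongSupplier newIds) {
        boolean hasNext = s.nextProducerId() != NO_ID;
        // Assumption: on the first call the "latest" pair is the ongoing transaction's own pair.
        long latestId = hasNext ? s.nextProducerId() : s.producerId();
        short latestEpoch = hasNext ? s.nextProducerEpoch() : s.producerEpoch();
        if (exhausted(latestEpoch)) {
            return new TxnState(s.producerId(), s.producerEpoch(), s.prevProducerId(), newIds.getAsLong(), (short) 0);
        }
        return new TxnState(s.producerId(), s.producerEpoch(), s.prevProducerId(), latestId, (short) (latestEpoch + 1));
    }

    // Commit sent with the latest (Next*) pair; assumes InitProducerId(true) already set it.
    static TxnState prepareCommit(TxnState s, LongSupplier newIds) {
        short markerEpoch = (short) (s.producerEpoch() + 1); // markers use the ongoing producer id
        if (exhausted(s.nextProducerEpoch())) {
            // Three distinct ids: markers (ProducerId), retries (PrevProducerId), next txn (NextProducerId).
            return new TxnState(s.producerId(), markerEpoch, s.nextProducerId(), newIds.getAsLong(), (short) 0);
        }
        // Assumption: without overflow the next transaction reuses the latest id with epoch + 1.
        return new TxnState(s.producerId(), markerEpoch, NO_ID, s.nextProducerId(), (short) (s.nextProducerEpoch() + 1));
    }

    // CompleteCommit: the next pair becomes current; PrevProducerId is kept for retries.
    static TxnState completeCommit(TxnState s) {
        return new TxnState(s.nextProducerId(), s.nextProducerEpoch(), s.prevProducerId(), NO_ID, NO_EPOCH);
    }

    public static void main(String[] args) {
        long[] ids = {73L, 85L};
        int[] used = {0};
        LongSupplier alloc = () -> ids[used[0]++]; // hypothetical producer id allocator
        TxnState s = new TxnState(42L, (short) (MAX - 1), NO_ID, NO_ID, NO_EPOCH);
        s = initKeepPrepared(s, alloc); // Next = (73, 0)
        while (s.nextProducerEpoch() < MAX - 1) {
            s = initKeepPrepared(s, alloc); // crash loop drives NextProducerEpoch up to MAX-1
        }
        s = prepareCommit(s, alloc); // ProducerId=42, Epoch=MAX, Prev=73, Next=(85, 0)
        System.out.println(s);
        System.out.println(completeCommit(s)); // ProducerId=85, Epoch=0, Prev=73
    }
}
```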
Metric Changes
A new metric will be added:
kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max: the max time a currently-open transaction has been open
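The metric's value can be pictured as a maximum over the currently open transactions. In this minimal sketch, the method name is hypothetical. Reporting 0 when no transaction is open is an assumption.

```java
import java.util.Collection;
import java.util.List;

public final class OpenTimeMaxSketch {
    // Max time (ms) any currently-open transaction has been open.
    // Assumption: 0 is reported when no transaction is open.
    static long activeTransactionOpenTimeMax(Collection<Long> openTxnStartTimestampsMs, long nowMs) {
        return openTxnStartTimestampsMs.stream()
                .mapToLong(start -> nowMs - start)
                .max()
                .orElse(0L);
    }

    public static void main(String[] args) {
        long now = 1_000_000L;
        System.out.println(activeTransactionOpenTimeMax(List.of(now - 2_000L, now - 600_000L), now)); // 600000
    }
}
```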
...
The method would compare the currently prepared transaction state with the state passed in the argument and either commit or abort the transaction. If the producer is not in the prepared state (i.e. neither prepareTransaction nor initTransactions(true) was called), we return an INVALID_TXN_STATE error.
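The commit-or-abort decision can be sketched as follows. This is a minimal sketch under several assumptions:

- The compared state is a (producer id, epoch) pair.
- The PreparedState record and the Outcome enum are hypothetical.
- IllegalStateException stands in for the INVALID_TXN_STATE error.
- The NO_OP outcome reflects this KIP's note that completing is a no-op when the ongoing transaction fields are -1.

```java
public final class CompleteTransactionSketch {
    // Hypothetical identity of a prepared transaction; NONE mirrors the -1 sentinels.
    record PreparedState(long producerId, short epoch) {
        static final PreparedState NONE = new PreparedState(-1L, (short) -1);
    }

    enum Outcome { COMMIT, ABORT, NO_OP }

    /**
     * @param inPreparedState whether the producer is in the prepared state
     * @param current         the producer's currently prepared (ongoing) transaction state
     * @param expected        the state passed in by the application
     */
    static Outcome completeTransaction(boolean inPreparedState, PreparedState current, PreparedState expected) {
        if (!inPreparedState) {
            // Stands in for the INVALID_TXN_STATE error.
            throw new IllegalStateException("INVALID_TXN_STATE");
        }
        if (current.equals(PreparedState.NONE)) {
            return Outcome.NO_OP; // no ongoing transaction to complete
        }
        return current.equals(expected) ? Outcome.COMMIT : Outcome.ABORT;
    }

    public static void main(String[] args) {
        PreparedState prepared = new PreparedState(42L, (short) 7);
        System.out.println(completeTransaction(true, prepared, new PreparedState(42L, (short) 7))); // COMMIT
        System.out.println(completeTransaction(true, prepared, new PreparedState(42L, (short) 6))); // ABORT
        System.out.println(completeTransaction(true, PreparedState.NONE, prepared));               // NO_OP
    }
}
```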
AdminClient API Changes
Class ListTransactionsOptions will support two new methods:
public ListTransactionsOptions runningLongerThanMs(long runningLongerThanMs) // set
...
The Admin interface will support a new method:
...
Command Line Tool Changes
The kafka-transactions.sh --list command is going to support a new flag, --runningLongerThanMs, that takes a number of milliseconds. If this flag is provided, only transactions that have been running longer than the specified number of milliseconds are listed.

The kafka-transactions.sh tool is going to support a new command, --forceTerminateTransaction. It has one required argument, --transactionalId, that takes the transactional id of the transaction to be terminated.
...
The broker can be downgraded to versions that support KIP-915. The ProducerId and ProducerEpoch fields contain the ongoing transaction information, so an old broker would be able to properly commit or abort the transaction.
...