...
In KAFKA-9144, we encountered a bug which could lead to a transaction being left in a hanging state. When this happens, the last stable offset (LSO) cannot advance, which means that consumers using the `read_committed` isolation level will be stuck. Furthermore, if the topic is compacted, the cleaner cannot clean beyond this offset, which can lead to unbounded log growth. Today there are no good recovery options for users who hit this problem. The only practical option is to wait for the data from the hanging transaction to be removed from the log once retention time has been reached. For a compacted topic, the options are even more scarce: in some cases, users have simply deleted and recreated topics to get around this bug.
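The sketch below makes the failure mode concrete. It assumes the simplified rule that the LSO is the smaller of the high watermark and the first offset of the earliest still-open transaction. The function `last_stable_offset` and its inputs are hypothetical illustrations, not broker code.

```python
from typing import Iterable


def last_stable_offset(high_watermark: int, open_txn_start_offsets: Iterable[int]) -> int:
    """Simplified LSO: read_committed consumers may not read past the first
    offset of any transaction that is still open, nor past the high watermark."""
    starts = list(open_txn_start_offsets)
    if not starts:
        return high_watermark
    return min(high_watermark, min(starts))


# A transaction that began at offset 550 and never completes pins the LSO at 550,
# however far the high watermark advances.
print(last_stable_offset(1_000, [550]))      # 550
print(last_stable_offset(1_000_000, [550]))  # 550
print(last_stable_offset(1_000, []))         # 1000
```

A hanging transaction never leaves the set of open transactions, so the minimum never moves. This is why both `read_committed` consumers and the log cleaner stall at the same offset.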
In this proposal, we aim to address this problem by providing the tools to recover from hanging transactions. As part of this, we want to improve visibility into transactional state. The group coordinator can be introspected, but there are currently no APIs that let us introspect the state of the transaction coordinator or the producer state associated with each topic partition. Hence we want to add these APIs so that hanging transactions and other exactly-once semantics (EOS) problems can be diagnosed more easily.
...
We expect that users will alert on positive values of `PartitionsWithLateTransactionsCount`. They can then use `MaxActiveTransactionDuration` or one of the APIs described below to find the topic partition.
Note that it is possible to have transaction timeouts which are exactly equal to `transaction.max.timeout.ms`. To account for some extra latency and avoid spurious alerts, we will add 5 minutes of padding before a transaction is counted in `PartitionsWithLateTransactionsCount`.
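The padding rule can be written as a simple predicate. This is a sketch under stated assumptions:

- Durations are in milliseconds.
- "Late" means strictly greater than the timeout plus the padding.
- Each partition's longest open transaction duration is available as a map.

The helpers `is_late` and `count_late_partitions` are hypothetical. `count_late_partitions` only stands in for how the `PartitionsWithLateTransactionsCount` metric might be derived.

```python
from typing import Mapping

# Padding from the proposal: 5 minutes before a transaction counts as late.
LATE_TRANSACTION_PADDING_MS = 5 * 60 * 1000


def is_late(txn_duration_ms: int, transaction_max_timeout_ms: int) -> bool:
    """Late only once the duration exceeds the maximum allowed timeout plus the
    padding, so a transaction whose timeout equals transaction.max.timeout.ms
    does not trigger a spurious alert."""
    return txn_duration_ms > transaction_max_timeout_ms + LATE_TRANSACTION_PADDING_MS


def count_late_partitions(
    max_active_duration_ms: Mapping[str, int], transaction_max_timeout_ms: int
) -> int:
    """Hypothetical stand-in for PartitionsWithLateTransactionsCount: counts
    partitions whose longest open transaction is late."""
    return sum(
        1
        for duration in max_active_duration_ms.values()
        if is_late(duration, transaction_max_timeout_ms)
    )


max_timeout = 15 * 60 * 1000  # broker default for transaction.max.timeout.ms (900000)
durations = {"foo-0": 30_000, "foo-1": 15 * 60 * 1000, "bar-0": 21 * 60 * 1000}
print(count_late_partitions(durations, max_timeout))  # 1
```

Only `bar-0` is counted here. `foo-1` has been open for exactly the maximum timeout, which the padding is meant to tolerate.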
Analysis
Hanging transactions are the result of inconsistent state between the replicas and the transaction coordinator. Today it is difficult to analyze a suspected hanging transaction because there is little visibility into either the producer state maintained by each replica or the transaction state on the coordinator. We propose to add three new APIs to address this gap:
...
There will be a new tool, `kafka-transactions.sh`. The following commands will be supported:
- `--list`: List the transactions known to a transaction coordinator
- `--find-hanging`: Find hanging transactions on a specific broker
...
- `--describe`: Describe details specific to a particular `TransactionalId`
- `--abort`: Forcefully abort a transaction
Listing Transactions
The `--list` command allows a user to list transaction state from the transaction coordinators. It supports an optional `--coordinator` flag to select a specific node.
```
> kafka-transactions.sh --list --bootstrap-server localhost:9092
TransactionalId  ProducerId  Coordinator  State
my-txn-id1       134132      0            Ongoing
my-txn-id2       134147      0            Ongoing
my-txn-id3       134191      1            PrepareCommit
my-txn-id4       134193      2            CompleteAbort

> kafka-transactions.sh --list --coordinator 0 --bootstrap-server localhost:9092
TransactionalId  ProducerId  Coordinator  State
my-txn-id1       134132      0            Ongoing
my-txn-id2       134147      0            Ongoing
```
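One way to read `--list` with and without `--coordinator` is as a merge over per-coordinator listings, followed by an optional filter. The sketch assumes each coordinator reports only the transactions it owns, as the example output suggests. `TransactionListing` and `list_transactions` are hypothetical names, not the tool's implementation.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class TransactionListing:
    transactional_id: str
    producer_id: int
    coordinator: int
    state: str


def list_transactions(
    per_coordinator: Dict[int, List[TransactionListing]],
    coordinator: Optional[int] = None,
) -> List[TransactionListing]:
    """Merges the listings from every coordinator, or returns only those from
    the node selected with --coordinator, sorted by TransactionalId."""
    if coordinator is not None:
        selected = per_coordinator.get(coordinator, [])
    else:
        selected = [t for listings in per_coordinator.values() for t in listings]
    return sorted(selected, key=lambda t: t.transactional_id)


listings = {
    0: [TransactionListing("my-txn-id1", 134132, 0, "Ongoing"),
        TransactionListing("my-txn-id2", 134147, 0, "Ongoing")],
    1: [TransactionListing("my-txn-id3", 134191, 1, "PrepareCommit")],
    2: [TransactionListing("my-txn-id4", 134193, 2, "CompleteAbort")],
}
for t in list_transactions(listings, coordinator=0):
    print(t.transactional_id, t.producer_id, t.coordinator, t.state)
```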
...
Finding Hanging Transactions
The `--find-hanging` command is used to automate the process of finding hanging transactions on a specific broker. It has one required argument, `--max-transaction-timeout`.
Internally, it will do the following:
- Use the `Metadata` API to find all of the partitions.
- Send `DescribeProducers` including each of the partitions found in the first step.
- Collect any open transactions which have been open longer than the provided `--max-transaction-timeout`.
- Use `ListTransactions` on each available coordinator to find the respective `TransactionalId` and coordinator for each `ProducerId` found in the step above.
- If no coordinator could be found, then the transaction is considered hanging.
- Otherwise, use `DescribeTransactions` to determine the state of the transaction and decide whether it should be considered hanging (e.g. if the epoch does not match or the partition is not included in the current transaction).
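The steps above can be sketched as a pure function over results already fetched from `DescribeProducers`, `ListTransactions`, and `DescribeTransactions`. The data classes, field names, and `find_hanging` are hypothetical; they model only the fields the steps mention. The hanging checks shown are the two examples given above: an epoch mismatch, or a partition missing from the current transaction. Treating a missing `DescribeTransactions` result as hanging is an added assumption.

```python
from dataclasses import dataclass
from typing import Dict, FrozenSet, List, Tuple


@dataclass(frozen=True)
class OpenTransaction:
    """An open transaction as reported by DescribeProducers for one partition."""
    topic_partition: str
    producer_id: int
    producer_epoch: int
    start_offset: int
    duration_ms: int


@dataclass(frozen=True)
class CoordinatorTransaction:
    """Transaction state as reported by DescribeTransactions."""
    producer_epoch: int
    state: str
    topic_partitions: FrozenSet[str]


def find_hanging(
    open_txns: List[OpenTransaction],
    max_transaction_timeout_ms: int,
    listed: Dict[int, Tuple[str, int]],            # ProducerId -> (TransactionalId, coordinator)
    described: Dict[str, CoordinatorTransaction],  # TransactionalId -> coordinator state
) -> List[OpenTransaction]:
    hanging: List[OpenTransaction] = []
    for txn in open_txns:
        # Only transactions open longer than the timeout are candidates.
        if txn.duration_ms <= max_transaction_timeout_ms:
            continue
        # No coordinator knows this ProducerId: considered hanging.
        entry = listed.get(txn.producer_id)
        if entry is None:
            hanging.append(txn)
            continue
        transactional_id, _coordinator = entry
        state = described.get(transactional_id)
        # The coordinator's view disagrees with the partition's view
        # (a missing description is also treated as hanging; an assumption).
        if (
            state is None
            or state.producer_epoch != txn.producer_epoch
            or txn.topic_partition not in state.topic_partitions
        ):
            hanging.append(txn)
    return hanging


open_txns = [
    OpenTransaction("foo-0", 1001, 3, 550, 30 * 60_000),  # epoch bumped at coordinator
    OpenTransaction("foo-1", 1002, 5, 120, 30 * 60_000),  # consistent with coordinator
    OpenTransaction("bar-0", 1003, 1, 10, 30 * 60_000),   # unknown to every coordinator
    OpenTransaction("bar-1", 1004, 2, 77, 30_000),        # not open long enough
]
listed = {1001: ("txn-a", 0), 1002: ("txn-b", 0)}
described = {
    "txn-a": CoordinatorTransaction(4, "Ongoing", frozenset({"foo-0"})),
    "txn-b": CoordinatorTransaction(5, "Ongoing", frozenset({"foo-1"})),
}
for t in find_hanging(open_txns, 15 * 60_000, listed, described):
    print(t.topic_partition, t.producer_id, t.start_offset)
```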
Here is an example:
```
> kafka-transactions.sh --find-hanging --broker
```
```
# Describe open transactions for foo-0
> kafka-transactions.sh --describe --topic foo --partition 0 --bootstrap-server localhost:9092
Topic  Partition  ProducerId  ProducerEpoch  StartOffset  LastTimestamp  Duration
foo    0          134132      23             550          1598383537     30000

# Limit the search to a specific topic partition
> kafka-transactions.sh --find-hanging --broker 0 --topic foo --partition 0 --bootstrap-server localhost:9092
Topic  Partition  ProducerId  ProducerEpoch  StartOffset  LastTimestamp  Duration
foo    0          134132      23             550          1598383537     30000
```
Describing Transactions
The `--describe` command can be used to describe the state of a transaction when the `TransactionalId` is known. It uses the `DescribeTransactions` API.
```
9092
ProducerId  ProducerEpoch  LastSequence  CurrentTxnStartOffset  LastTimestamp  Duration
134132      23             9838          550                    1598383537     1000

# Describe transaction state for my-txn-id
> kafka-transactions.sh --describe --transactional-id my-txn-id --bootstrap-server localhost:9092
ProducerId  ProducerEpoch  Coordinator  State    TimeoutMs  TopicPartitions
134132      24             0            Ongoing  5000       foo-0,foo-1
```
...
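The `--abort` command can be used to abort an ongoing transaction for a topic partition. It has several required arguments:
- `--topic`: the topic containing the transaction
- `--partition`: the partition containing the transaction
- `--producer-id`: the `ProducerId` of the transaction
- `--producer-epoch`: the `ProducerEpoch` of the transaction
- `--transaction-start-offset`: the offset at which the transaction began
Internally, the tool will first use `DescribeProducers` to validate that there is an open transaction beginning at that offset and collect the `ProducerId` and `ProducerEpoch`.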
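The validation step can be sketched as a lookup over the `DescribeProducers` results for the partition. The sketch assumes each result carries the producer's ID, its epoch, and the start offset of its current transaction, or `None` if it has no open transaction. `ProducerState` and `resolve_abort_target` are hypothetical names.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class ProducerState:
    producer_id: int
    producer_epoch: int
    current_txn_start_offset: Optional[int]  # None if no open transaction


def resolve_abort_target(
    producers: List[ProducerState], start_offset: int
) -> Tuple[int, int]:
    """Returns (ProducerId, ProducerEpoch) for the transaction that began at
    start_offset, or raises if no open transaction begins there."""
    for p in producers:
        # An offset holds a single record batch from a single producer,
        # so at most one open transaction can begin at a given offset.
        if p.current_txn_start_offset == start_offset:
            return p.producer_id, p.producer_epoch
    raise ValueError(f"no open transaction begins at offset {start_offset}")


producers = [ProducerState(134132, 23, 550), ProducerState(134147, 5, None)]
print(resolve_abort_target(producers, 550))  # (134132, 23)
```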
As described above, the partition leader will allow an administrative abort only if the producer epoch matches the latest value and there is an open transaction beginning at the indicated offset. Users are required to use the `--describe` command to find this information.
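On the partition leader, the acceptance rule stated above reduces to two comparisons. This is a minimal sketch that assumes the leader tracks, per `ProducerId`, the latest epoch and the start offset of any open transaction. `ProducerTable` and `can_admin_abort` are hypothetical names, not broker code.

```python
from typing import Dict, Optional, Tuple

# ProducerId -> (latest ProducerEpoch, start offset of open transaction or None)
ProducerTable = Dict[int, Tuple[int, Optional[int]]]


def can_admin_abort(
    table: ProducerTable, producer_id: int, producer_epoch: int, txn_start_offset: int
) -> bool:
    """Accept an administrative abort only if the epoch is the latest one the
    leader has seen for this producer and an open transaction begins at the
    indicated offset."""
    entry = table.get(producer_id)
    if entry is None:
        return False
    latest_epoch, open_txn_start = entry
    return producer_epoch == latest_epoch and open_txn_start == txn_start_offset


table: ProducerTable = {134132: (23, 550), 134147: (5, None)}
print(can_admin_abort(table, 134132, 23, 550))  # True
print(can_admin_abort(table, 134132, 22, 550))  # False: stale epoch
print(can_admin_abort(table, 134147, 5, 550))   # False: no open transaction
```

Requiring both the epoch and the start offset means a stale or mistyped request is rejected rather than aborting a newer transaction from the same producer.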
```
> kafka-transactions.sh --abort \
    --topic foo \
    --partition 0 \
    --producer-id 134132 \
    --producer-epoch 23 \
    --transaction-start-offset 550 \
    --bootstrap-server localhost:9092
```
...