...

In KAFKA-9144, we encountered a bug which could lead to a transaction being left in a hanging state. When this happens, the last stable offset (LSO) is unable to advance, which means that consumers using the read_committed isolation level will be stuck. Furthermore, if the topic is compacted, the cleaner is unable to clean beyond this offset, which can lead to unbounded growth. Today there are no good recovery options for users who hit this problem. The only practical option is to wait for the data from the hanging transaction to be removed from the log once the retention time has been reached. In the case of a compacted topic, the options are even more limited; some users have simply deleted and recreated their topics to get around this bug.

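To see why a single hanging transaction blocks read_committed consumers, consider a simplified model of the LSO: it is the first offset of the earliest transaction that is still open on the partition, or the high watermark if no transaction is open. The Python sketch below assumes exactly that model; the function and its parameters are illustrative and do not correspond to broker code.

```python
from typing import Iterable


def last_stable_offset(high_watermark: int, open_txn_start_offsets: Iterable[int]) -> int:
    """Simplified LSO model (illustrative only): bounded by the high watermark
    and by the first offset of the earliest transaction that has not yet been
    committed or aborted."""
    # With no open transactions, min() falls back to the high watermark.
    earliest_open = min(open_txn_start_offsets, default=high_watermark)
    return min(earliest_open, high_watermark)


# A transaction that began at offset 550 and never completes pins the LSO
# at 550, however far the high watermark advances.
print(last_stable_offset(1_000, [550]))      # 550
print(last_stable_offset(1_000_000, [550]))  # 550
print(last_stable_offset(1_000_000, []))     # 1000000
```

Since the cleaner is likewise unable to clean beyond this offset, the same pinned value explains why a compacted topic keeps growing.
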
In this proposal, we aim to address this problem by providing the tools to recover from hanging transactions. As part of this, we want to improve visibility into transactional state. Unlike the group coordinator, the transaction coordinator currently has no APIs that let us introspect its state, and there are likewise no APIs for inspecting the producer state associated with each topic partition. Hence we want to add these APIs so that hanging transactions and any other EOS-related problems can be diagnosed more easily.

...

We expect that users will alert on positive values of `PartitionsWithLateTransactionsCount`. They can then use `MaxActiveTransactionDuration` or one of the APIs described below to find the topic partition.

Note that it is possible to have transaction timeouts which are exactly equal to `transaction.max.timeout.ms`. To account for some extra latency and avoid spurious alerts, we will add 5 minutes of padding before a transaction is counted in `PartitionsWithLateTransactionsCount`.

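The padding rule can be expressed as a simple predicate. The sketch below is a minimal model, not broker code: it assumes durations in milliseconds and the broker default of 900000 ms (15 minutes) for `transaction.max.timeout.ms`, and `is_late` and `late_partition_count` are hypothetical helpers.

```python
# Assumption: the broker default for transaction.max.timeout.ms (15 minutes).
DEFAULT_TRANSACTION_MAX_TIMEOUT_MS = 15 * 60 * 1000
# The 5 minutes of padding described above.
LATE_TRANSACTION_PADDING_MS = 5 * 60 * 1000


def is_late(open_duration_ms: int,
            max_timeout_ms: int = DEFAULT_TRANSACTION_MAX_TIMEOUT_MS) -> bool:
    """Hypothetical helper: True once a transaction has been open longer than
    transaction.max.timeout.ms plus the padding."""
    return open_duration_ms > max_timeout_ms + LATE_TRANSACTION_PADDING_MS


def late_partition_count(open_durations_by_partition: dict,
                         max_timeout_ms: int = DEFAULT_TRANSACTION_MAX_TIMEOUT_MS) -> int:
    """Count partitions with at least one late transaction, in the spirit of
    PartitionsWithLateTransactionsCount."""
    return sum(
        1
        for durations in open_durations_by_partition.values()
        if any(is_late(d, max_timeout_ms) for d in durations)
    )


# foo-0 has a transaction open for 21 minutes; foo-1 has one open for exactly
# the maximum timeout, which the padding keeps from being counted.
print(late_partition_count({"foo-0": [21 * 60 * 1000], "foo-1": [15 * 60 * 1000]}))  # 1
```

With the default timeout, a transaction therefore has to stay open for more than 20 minutes before it is reported.
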
Analysis

Hanging transactions are the result of an inconsistent state between the replicas and the transaction coordinator. Today it is not easy to analyze a suspected hanging transaction because there is little visibility into either the producer state maintained by each replica or the transaction state of the coordinator. We propose to add three new APIs to address this gap:

...

There will be a new tool, `kafka-transactions.sh`. The following commands will be supported:

  • --list: List the transactions known to a transaction coordinator
  • --describe: Describe details specific to a particular transactionalId
  • --find-hanging: Find hanging transactions on a specific broker
  • --abort: Forcefully abort a transaction

...

Listing Transactions

The --list command allows a user to list transaction state from the transaction coordinators. It supports an optional --coordinator flag to select a specific node.

Code Block
> kafka-transactions.sh --list --bootstrap-server localhost:9092
TransactionalId  ProducerId  Coordinator  State
my-txn-id1       134132      0            Ongoing
my-txn-id2       134147      0            Ongoing
my-txn-id3       134191      1            PrepareCommit
my-txn-id4       134193      2            CompleteAbort

> kafka-transactions.sh --list --coordinator 0 --bootstrap-server localhost:9092
TransactionalId  ProducerId  Coordinator  State
my-txn-id1       134132      0            Ongoing
my-txn-id2       134147      0            Ongoing

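The effect of the optional --coordinator flag can be modeled as follows: without it, the tool asks every coordinator for its transactions and merges the results; with it, only the selected node is queried. This is a sketch that assumes the per-coordinator listings are already available in memory; `TransactionListing` and `list_transactions` are hypothetical names, not the Admin client API.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class TransactionListing:
    """One row of --list output (hypothetical type)."""
    transactional_id: str
    producer_id: int
    coordinator: int
    state: str


def list_transactions(by_coordinator: Dict[int, List[TransactionListing]],
                      coordinator: Optional[int] = None) -> List[TransactionListing]:
    """Without a coordinator, merge the listings from every coordinator;
    with one, return only that node's listings."""
    nodes = sorted(by_coordinator) if coordinator is None else [coordinator]
    result: List[TransactionListing] = []
    for node in nodes:
        result.extend(by_coordinator.get(node, []))
    return result


# The data from the example output above, grouped by coordinator.
LISTINGS = {
    0: [TransactionListing("my-txn-id1", 134132, 0, "Ongoing"),
        TransactionListing("my-txn-id2", 134147, 0, "Ongoing")],
    1: [TransactionListing("my-txn-id3", 134191, 1, "PrepareCommit")],
    2: [TransactionListing("my-txn-id4", 134193, 2, "CompleteAbort")],
}

for listing in list_transactions(LISTINGS, coordinator=0):
    print(listing.transactional_id, listing.state)
```

Running this prints the two Ongoing transactions owned by coordinator 0, matching the second example above.
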
...

Finding Hanging Transactions

The --find-hanging command is used to automate the process of finding hanging transactions on a specific broker. It has one required argument, --max-transaction-timeout. Internally, it will do the following:

  1. Use the `Metadata` API to find all of the partitions.
  2. Send `DescribeProducers` including each of the partitions found in the first step.
  3. Collect any open transactions which have been open longer than the provided --max-transaction-timeout.
  4. Use `ListTransactions` on each available coordinator to find the respective TransactionalId and coordinator for each ProducerId found in the step above.
    1. If no coordinator could be found, then the transaction is considered hanging.
    2. Otherwise, use `DescribeTransactions` to determine the state of the transaction and decide whether it should be considered hanging (e.g. if the epoch does not match or the partition is not included in the current transaction).

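The steps above can be sketched in Python. This is a simplified model, not the tool's implementation: it assumes the results of the `Metadata`/`DescribeProducers` round (steps 1 and 2), `ListTransactions` and `DescribeTransactions` have already been collected into in-memory structures, that durations and the timeout are in milliseconds, and that a transactional ID which `DescribeTransactions` cannot find is treated as hanging. All type and function names are hypothetical.

```python
from dataclasses import dataclass
from typing import Dict, FrozenSet, List, Tuple


@dataclass(frozen=True)
class OpenTxn:
    """An open transaction reported by DescribeProducers (steps 1-2)."""
    topic: str
    partition: int
    producer_id: int
    producer_epoch: int
    start_offset: int
    duration_ms: int


@dataclass(frozen=True)
class TxnListing:
    """A ListTransactions entry: which coordinator owns a transactional ID."""
    transactional_id: str
    coordinator: int


@dataclass(frozen=True)
class TxnDescription:
    """A DescribeTransactions result for one transactional ID."""
    producer_id: int
    producer_epoch: int
    state: str
    topic_partitions: FrozenSet[Tuple[str, int]]


def find_hanging(open_txns: List[OpenTxn],
                 max_transaction_timeout_ms: int,
                 listings: Dict[int, TxnListing],
                 descriptions: Dict[str, TxnDescription]) -> List[OpenTxn]:
    hanging = []
    for txn in open_txns:
        # Step 3: only transactions open longer than the timeout are candidates.
        if txn.duration_ms <= max_transaction_timeout_ms:
            continue
        # Step 4: map the ProducerId to a TransactionalId via ListTransactions.
        listing = listings.get(txn.producer_id)
        if listing is None:
            # Step 4.1: no coordinator knows about this producer.
            hanging.append(txn)
            continue
        # Step 4.2: compare against the coordinator's view of the transaction.
        # (Assumption: a missing description is also treated as hanging.)
        desc = descriptions.get(listing.transactional_id)
        if (desc is None
                or desc.producer_epoch != txn.producer_epoch
                or (txn.topic, txn.partition) not in desc.topic_partitions):
            hanging.append(txn)
    return hanging


open_txns = [
    OpenTxn("foo", 0, 134132, 23, 550, 30 * 60 * 1000),  # coordinator is at epoch 24
    OpenTxn("foo", 1, 134132, 24, 600, 30 * 60 * 1000),  # matches the coordinator
    OpenTxn("bar", 0, 999999, 1, 10, 30 * 60 * 1000),    # unknown to every coordinator
    OpenTxn("bar", 1, 134147, 5, 20, 1_000),             # not open long enough
]
listings = {134132: TxnListing("my-txn-id", 0)}
descriptions = {"my-txn-id": TxnDescription(134132, 24, "Ongoing",
                                            frozenset({("foo", 0), ("foo", 1)}))}
for txn in find_hanging(open_txns, 15 * 60 * 1000, listings, descriptions):
    print(txn.topic, txn.partition, txn.start_offset)
```

In this example foo-0 is flagged because of the epoch mismatch and bar-0 because no coordinator knows its ProducerId, while bar-1 is skipped as too recent.
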
Here is an example:

Code Block
# Find hanging transactions on broker 0
> kafka-transactions.sh --find-hanging --broker 0 --bootstrap-server localhost:9092
Topic  Partition  ProducerId  ProducerEpoch  StartOffset  LastTimestamp  Duration
foo    0          134132      23             550          1598383537     30000

# Limit the search to a specific topic partition
> kafka-transactions.sh --find-hanging --broker 0 --topic foo --partition 0 --bootstrap-server localhost:9092
Topic  Partition  ProducerId  ProducerEpoch  StartOffset  LastTimestamp  Duration
foo    0          134132      23             550          1598383537     30000

Describing Transactions

The --describe command can be used to describe the state of a transaction when the TransactionalId is known. It uses the `DescribeTransactions` API.

Code Block
...
ProducerId  ProducerEpoch  LastSequence  CurrentTxnStartOffset  LastTimestamp  Duration
134132      23             9838          550                    1598383537     1000

# Describe transaction state for my-txn-id
> kafka-transactions.sh --describe --transactional-id my-txn-id --bootstrap-server localhost:9092
ProducerId  ProducerEpoch  Coordinator  State    TimeoutMs  TopicPartitions
134132      24             0            Ongoing  5000       foo-0,foo-1

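The producer state and transaction state shown above can be cross-checked by a script, which is essentially the epoch comparison in step 4.2 of --find-hanging. A sketch, assuming the output is exactly as shown (whitespace-aligned columns, no spaces inside values); the format is illustrative and may change, and `parse_table` is a hypothetical helper.

```python
from typing import Dict, List

PRODUCER_OUTPUT = """\
ProducerId  ProducerEpoch  LastSequence  CurrentTxnStartOffset  LastTimestamp  Duration
134132      23             9838          550                    1598383537     1000
"""

TRANSACTION_OUTPUT = """\
ProducerId  ProducerEpoch  Coordinator  State    TimeoutMs  TopicPartitions
134132      24             0            Ongoing  5000       foo-0,foo-1
"""


def parse_table(output: str) -> List[Dict[str, str]]:
    """Split whitespace-aligned tool output into one dict per row.
    Assumes no column value contains whitespace."""
    rows = [line.split() for line in output.strip().splitlines()]
    header, body = rows[0], rows[1:]
    return [dict(zip(header, row)) for row in body]


producer = parse_table(PRODUCER_OUTPUT)[0]
txn = parse_table(TRANSACTION_OUTPUT)[0]
same_producer = producer["ProducerId"] == txn["ProducerId"]
epoch_mismatch = producer["ProducerEpoch"] != txn["ProducerEpoch"]
print(same_producer, epoch_mismatch)  # True True
```

Here the partition's producer epoch (23) is behind the coordinator's (24) for the same ProducerId, which is the kind of mismatch step 4.2 treats as a sign of a hanging transaction.
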
...

The --abort command can be used to abort an ongoing transaction for a topic partition. It has several required arguments:

  • --topic: The topic containing the open transaction
  • --partition: The partition containing the open transaction
  • --start-offset: The offset at which the open transaction begins

Internally, the tool will first use `DescribeProducers` to validate that there is an open transaction beginning at that offset and to collect its ProducerId and ProducerEpoch.

As described above, the partition leader will allow an administrative abort only if the producer epoch matches the latest value and there is an open transaction beginning at the indicated offset. The start offset of a hanging transaction can be taken from the StartOffset column of the --find-hanging output.

Code Block
> kafka-transactions.sh --abort \
  --topic foo \
  --partition 0 \
  --start-offset 550 \
  --bootstrap-server localhost:9092

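The lookup the tool performs before sending the abort can be sketched as follows. This assumes the `DescribeProducers` result for the partition is available as a list of producer states; `ProducerState` and `resolve_abort_target` are hypothetical names used only for illustration.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class ProducerState:
    """One producer entry from DescribeProducers for a single partition."""
    producer_id: int
    producer_epoch: int
    current_txn_start_offset: Optional[int]  # None if no transaction is open


def resolve_abort_target(producers: List[ProducerState], start_offset: int) -> Tuple[int, int]:
    """Find the producer whose open transaction begins at start_offset and
    return (producer_id, producer_epoch) for the abort request."""
    for state in producers:
        if state.current_txn_start_offset == start_offset:
            return state.producer_id, state.producer_epoch
    raise ValueError(f"no open transaction begins at offset {start_offset}")


producers = [ProducerState(134132, 23, 550), ProducerState(134147, 5, None)]
print(resolve_abort_target(producers, 550))  # (134132, 23)
```

Because at most one transaction can begin at a given offset, the lookup yields a single producer; the leader then still performs its own check that this epoch is the latest before accepting the abort.
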
...