Current state: Under Discussion

In KAFKA-9144, we encountered a bug which could lead to a transaction being left in a hanging state. When this happens, the last stable offset (LSO) is unable to advance which means that consumers using the read_committed isolation level will be stuck. Furthermore, if the topic is compacted, then the cleaner would be unable to clean beyond this offset which can lead to unbounded growth. Today there are no good recovery options for users who hit this problem. The only practical option is to wait for the data from the hanging transaction to get removed from the log once retention time has been reached. In the case of a compacted topic, options are even more scarce. 

In this proposal, we aim to address this problem by providing the tools to recover from hanging transactions. As a part of this, we want to improve visibility into transactional state. Unlike the group coordinator, there are no APIs currently which let us introspect the state of the transaction coordinator or the producer state associated with each topic partition. 

Proposed Changes

This proposal provides the tooling support needed to detect, analyze, and safely recover from a hanging transaction. 

Detection: The first issue to address is how a user can find topic partitions that may have hanging transactions. Preferably we want a metric so that alerts can be triggered proactively rather than waiting for users to complain. Today, Kafka exposes a partition-level "LastStableOffsetLag" metric which indicates how far behind the LSO is from the log end offset. When there is a hanging transaction, the LSO lag will tend to grow indefinitely. However, it is difficult to assign an alert threshold because it depends on the characteristics of the application (e.g. transaction duration and throughput).

Ideally, we would prefer a metric which takes transaction duration into account. Although partition leaders do not know the transaction timeout associated with each transactional write, they do know the upper bound indicated by ``. Any transaction which has remained open longer than this timeout ought to be treated as suspicious. We propose two new metrics:

  • MaxActiveTransactionDuration: At the partition level, we will add a new metric for the maximum duration of any open transaction
  • PartitionsWithLateTransactionsCount: A count of the number of partitions which have open transactions with durations exceeding ``.

We expect that users will alert on positive values of `PartitionsWithLateTransactionsCount`. They can then use `MaxActiveTransactionDuration` or one of the APIs described below to find the topic partition.
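
As a rough illustration of how the two gauges relate, the sketch below derives both values from the start times of open transactions. The class, method, and parameter names are hypothetical and are not part of the broker code; the sketch assumes the leader tracks a start timestamp for each open transaction and that `maxTimeoutMs` holds the configured upper bound on the transaction timeout.

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper, for illustration only.
final class LateTransactionMetrics {

    // Longest time (ms) that any open transaction on one partition has been open; 0 if none are open.
    static long maxActiveTransactionDurationMs(List<Long> openTxnStartTimesMs, long nowMs) {
        long max = 0L;
        for (long startMs : openTxnStartTimesMs) {
            max = Math.max(max, nowMs - startMs);
        }
        return max;
    }

    // Number of partitions whose longest open transaction exceeds the maximum timeout.
    static int partitionsWithLateTransactionsCount(
            Map<String, List<Long>> openTxnStartTimesByPartition, long nowMs, long maxTimeoutMs) {
        int count = 0;
        for (List<Long> startTimes : openTxnStartTimesByPartition.values()) {
            if (maxActiveTransactionDurationMs(startTimes, nowMs) > maxTimeoutMs) {
                count++;
            }
        }
        return count;
    }
}
```

Because the count only becomes positive once a transaction has outlived the maximum timeout, an alert on any positive value does not need tuning per application.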

Analysis: Hanging transactions are the result of an inconsistent state between the replicas and the transaction coordinator. Today it is not easy to analyze a hanging transaction when one is suspected, because there is little visibility into either the producer state maintained by each replica or the transaction state of the coordinator. We propose to add three new APIs to address this gap:

  • DescribeProducers: this request is sent to partition leaders to describe the state of the active idempotent/transactional producers.
  • ListTransactions: this request is sent to any broker to collect the current set of TransactionalId/ProducerId which have pending transactions.
  • DescribeTransactions: this request is sent to transaction coordinators to describe the transaction state in more detail (including the topic partitions included in the transaction).

The schemas for these APIs are provided in the "Public Interfaces" section below. Combined, they give the user a way to inspect ongoing transaction state when a specific topic partition is suspected to have a hanging transaction.

At a high level, we suggest the following workflow once a topic partition with a suspected hanging transaction has been identified:

  1. Use `DescribeProducers` to collect the set of ProducerIds which have transactions exceeding the max transaction timeout
  2. Send `ListTransactions` to the available brokers to find the TransactionalIds associated with these ProducerIds.
  3. Finally, use `DescribeTransactions` to validate the transaction state and ensure it is safe to abort.
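
The workflow above can be sketched over in-memory data as follows. This is only an outline: the maps stand in for the results of `DescribeProducers`, `ListTransactions`, and `DescribeTransactions`, and all class and method names are hypothetical. The step 3 check is an assumed criterion, not one fixed by this proposal: the open transaction is treated as hanging when the coordinator's view of the transaction does not include the partition.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, for illustration only.
final class HangingTransactionLookup {

    // Step 1: producer IDs whose open transaction started more than maxTimeoutMs ago.
    // The map models DescribeProducers output; a start time of -1 means no open transaction.
    static List<Long> lateProducerIds(Map<Long, Long> txnStartMsByProducerId, long nowMs, long maxTimeoutMs) {
        List<Long> late = new ArrayList<>();
        for (Map.Entry<Long, Long> e : txnStartMsByProducerId.entrySet()) {
            long startMs = e.getValue();
            if (startMs >= 0 && nowMs - startMs > maxTimeoutMs) {
                late.add(e.getKey());
            }
        }
        return late;
    }

    // Step 2: map each late producer ID to its TransactionalId, using ListTransactions
    // results merged across all brokers.
    static Map<Long, String> transactionalIdsFor(List<Long> producerIds,
                                                 Map<String, Long> producerIdByTransactionalId) {
        Map<Long, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : producerIdByTransactionalId.entrySet()) {
            if (producerIds.contains(e.getValue())) {
                result.put(e.getValue(), e.getKey());
            }
        }
        return result;
    }

    // Step 3 (assumed criterion): the transaction looks hanging on this partition when the
    // coordinator's DescribeTransactions view does not list the partition.
    static boolean looksHanging(List<String> coordinatorTopicPartitions, String partition) {
        return !coordinatorTopicPartitions.contains(partition);
    }
}
```

A producer ID with no matching TransactionalId on any coordinator is itself a strong hint that the partition state and the coordinator state have diverged.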

Recovery: The remaining problem to solve is how to safely abort a hanging transaction. We propose to extend the `WriteTxnMarkers` API so that it can be used by the Kafka AdminClient. Currently we use the coordinator epoch (which is the leader epoch of the associated __transaction_state partition) as a kind of concurrency control: partition leaders will not accept non-monotonic updates for a given `ProducerId`. We need to ensure that writes from the AdminClient do not interfere with this mechanism.

The approach we take here is to add to the request the start offset of the transaction that is intended to be aborted. The partition leader will accept the request only if there is an open transaction beginning at the offset specified in the request. Additionally, we require the producer epoch in the request to exactly match the latest value. This ensures that the transaction coordinator remains the only one that can bump the epoch of a transactional producer, so we cannot hit a case where the epoch in some partition's state gets ahead of the coordinator, which might cause worse failures. Finally, markers written from the AdminClient will set `coordinator_epoch` in the request to -1. This avoids race conditions with concurrent requests from existing coordinators and gives us the ability to tell that the source of the marker was the AdminClient.
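
The leader-side checks described above could look roughly like the sketch below. It is not the broker implementation: the class and method names are hypothetical, the use of -1 to mean "no open transaction" is an assumption of the sketch, and the order of the checks is our own choice. The result names are borrowed from the error codes listed for `WriteTxnMarkers` in this proposal.

```java
// Hypothetical validator, for illustration only.
final class AdminAbortValidator {

    // Markers written from the AdminClient carry this coordinator epoch, per the proposal.
    static final int ADMIN_COORDINATOR_EPOCH = -1;

    enum Result { OK, INVALID_PRODUCER_EPOCH, INVALID_TXN_STATE }

    // currentProducerEpoch: latest epoch the leader has recorded for the producer ID.
    // openTxnStartOffset: start offset of the producer's open transaction, or -1 if none (sketch convention).
    static Result validate(int coordinatorEpoch, int requestProducerEpoch, long requestTxnStartOffset,
                           int currentProducerEpoch, long openTxnStartOffset) {
        if (coordinatorEpoch != ADMIN_COORDINATOR_EPOCH) {
            // Coordinator-originated markers follow the existing epoch checks, not this path.
            throw new IllegalArgumentException("not an administrative marker");
        }
        // The epoch must match exactly, so an administrative abort can never bump the epoch.
        if (requestProducerEpoch != currentProducerEpoch) {
            return Result.INVALID_PRODUCER_EPOCH;
        }
        // There must be an open transaction beginning at exactly the requested offset.
        if (openTxnStartOffset < 0 || openTxnStartOffset != requestTxnStartOffset) {
            return Result.INVALID_TXN_STATE;
        }
        return Result.OK;
    }
}
```

Requiring both an exact epoch and an exact start offset means a stale abort request, issued after the producer has moved on to a new transaction, is rejected rather than aborting the wrong transaction.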

Public Interfaces


DescribeProducers

The DescribeProducers API will require `Describe` permission on the associated topic. It is expected to be sent to topic partition leaders.

Request Schema

This request is expected to be sent to partition leaders.

{
  "apiKey": NN,
  "type": "request",
  "name": "DescribeProducersRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      { "name": "Topics", "type": "[]TopicRequestData", "versions": "0+", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
          "about": "The topic name." },
        { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+",
          "about": "The indexes of the partitions to list producers for." }
      ]}
  ]
}

Response Schema

{
  "apiKey": NN,
  "type": "response",
  "name": "DescribeProducersResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Topics", "type": "[]TopicResponse", "versions": "0+",
      "about": "Each topic in the response.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name" },
      { "name": "Partitions", "type": "[]PartitionResponse", "versions": "0+",
        "about": "Each partition in the response.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The partition error code, or 0 if there was no error." },
        { "name": "ActiveProducers", "type": "[]ProducerState", "versions": "0+", "fields": [
          { "name": "ProducerId", "type": "int64", "versions": "0+" },
          { "name": "ProducerEpoch", "type": "int32", "versions": "0+" },
          { "name": "LastSequence", "type": "int32", "versions": "0+" },
          { "name": "LastTimestamp", "type": "int64", "versions": "0+" },
          { "name": "CurrentTxnStartTimestamp", "type": "int64", "versions": "0+" }
        ]}
      ]}
    ]}
  ]
}

Possible errors:

  • NOT_LEADER_OR_FOLLOWER: If the replica receiving the request is not the current leader of a topic partition
  • TOPIC_AUTHORIZATION_FAILED: If the user does not have Describe access on a requested topic.
  • UNKNOWN_TOPIC_OR_PARTITION: If either the topic or partition is not known to exist.


ListTransactions

This API is analogous to `ListGroups` for the group coordinator and requires `Describe` permission on the `TransactionalId` resource. That is, it will only include transactions for which the principal has `Describe` permission on the corresponding `TransactionalId`.

It is meant to provide a summary of the transactions managed by a given coordinator.

Request Schema

The request includes a filter of the states that the user is interested in. As an example, this allows us to list only the "Ongoing" transactions.

{
  "apiKey": NN,
  "type": "request",
  "name": "ListTransactionsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "StatesFilter", "type": "[]string", "versions": "0+",
      "about": "The states of transactions we want to list." }
  ]
}

Response Schema

{
  "apiKey": NN,
  "type": "response",
  "name": "ListTransactionsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
        "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
      { "name": "ErrorCode", "type": "int16", "versions": "0+" },
      { "name": "TransactionStates", "type": "[]TransactionState", "versions": "0+", "fields": [
        { "name": "TransactionalId", "type": "string", "versions": "0+" },
        { "name": "ProducerId", "type": "int64", "versions": "0+" },
        { "name": "TransactionState", "type": "string", "versions": "0+" }
      ]}
  ]
}

Possible Error Codes

  • COORDINATOR_LOAD_IN_PROGRESS: The coordinator is in the process of loading its state.
  • COORDINATOR_NOT_AVAILABLE: If the coordinator receiving the request is being shut down.


DescribeTransactions

The DescribeTransactions API will require Describe permission on the associated TransactionalId resource.

Request Schema

The DescribeTransactions request is sent to transaction coordinators. Similarly to the DescribeGroups API, we expect the AdminClient to use FindCoordinator first in order to find the right coordinator when a specific transactionalId is requested. Note that the array of transactional IDs is not nullable. The API does not provide a way to list the states of all TransactionalIds on a given broker.

The request schema is specified below:

{
  "apiKey": NN,
  "type": "request",
  "name": "DescribeTransactionsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      { "name": "TransactionalIds", "type": "[]string", "versions": "0+" }
  ]
}

Response Schema

The response schema is specified below:

{
  "apiKey": NN,
  "type": "response",
  "name": "DescribeTransactionsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
        "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
      { "name": "TransactionStates", "type": "[]TransactionState", "versions": "0+", "fields": [
        { "name": "ErrorCode", "type": "int16", "versions": "0+" },
        { "name": "TransactionalId", "type": "string", "versions": "0+" },
        { "name": "TransactionState", "type": "int8", "versions": "0+" },
        { "name": "TransactionTimeoutMs", "type": "int32", "versions": "0+" },
        { "name": "TransactionStartTimeMs", "type": "int64", "versions": "0+" },
        { "name": "ProducerId", "type": "int64", "versions": "0+" },
        { "name": "ProducerEpoch", "type": "int32", "versions": "0+" },
        { "name": "TopicPartitions", "type": "[]TopicData", "versions": "0+", "fields": [
          { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+" }
        ]}
      ]}
  ]
}

These fields are derived from the transaction log itself. The API is mainly just providing a more convenient way to access the state than dumping the log.

The transaction state is the same field included in the `ListTransactions` API. 

When there is no transaction in progress, the `TransactionStartTimeMs` field will be set to -1 and the set of topic partitions will be empty. Additionally we will reserve the status code of -1 to handle states which are unknown to the client.
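
A client reading these fields has to treat -1 as "no transaction in progress" rather than as a timestamp. The sketch below shows one way to do that when checking whether a transaction has outlived its own timeout; the class and method names are hypothetical, and only the -1 convention and the `TransactionStartTimeMs` and `TransactionTimeoutMs` fields come from this proposal.

```java
import java.util.OptionalLong;

// Hypothetical helper, for illustration only.
final class TransactionTiming {

    // Value of TransactionStartTimeMs when no transaction is in progress.
    static final long NO_TRANSACTION = -1L;

    // Time (ms) the transaction has been open, or empty if none is in progress.
    static OptionalLong openDurationMs(long transactionStartTimeMs, long nowMs) {
        if (transactionStartTimeMs == NO_TRANSACTION) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(nowMs - transactionStartTimeMs);
    }

    // True only when a transaction is in progress and has been open longer than its timeout.
    static boolean exceedsTimeout(long transactionStartTimeMs, int transactionTimeoutMs, long nowMs) {
        OptionalLong duration = openDurationMs(transactionStartTimeMs, nowMs);
        return duration.isPresent() && duration.getAsLong() > transactionTimeoutMs;
    }
}
```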

Possible Error Codes

  • NOT_COORDINATOR: If the coordinator receiving the request does not own a transactionalId in the request.
  • COORDINATOR_LOAD_IN_PROGRESS: The coordinator is in the process of loading its state.
  • COORDINATOR_NOT_AVAILABLE: If the coordinator receiving the request is being shut down.
  • TRANSACTIONAL_ID_NOT_FOUND: New error code which indicates that the requested TransactionalId could not be found.
  • TRANSACTIONAL_ID_AUTHORIZATION_FAILED: If the principal does not have Describe permission on one of the transactionalIds in the request.


WriteTxnMarkers

As described above, we are bumping the version of the `WriteTxnMarkers` API so that we can include the start offset of the transaction that should be administratively aborted. Transaction coordinators will leave this field unspecified.

Note that we are not changing the required permission for this API. It will still require ClusterAction: only administrators should be allowed to abort a transaction manually.

Request Schema

Below we define the changes to the request schema:

{
  "apiKey": 27,
  "type": "request",
  "name": "WriteTxnMarkersRequest",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "Markers", "type": "[]WritableTxnMarker", "versions": "0+",
      "about": "The transaction markers to be written.", "fields": [
      { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId",
        "about": "The current producer ID."},
      { "name": "ProducerEpoch", "type": "int16", "versions": "0+",
        "about": "The current epoch associated with the producer ID." },
      { "name": "TransactionResult", "type": "bool", "versions": "0+",
        "about": "The result of the transaction to write to the partitions (false = ABORT, true = COMMIT)." },
      { "name": "Topics", "type": "[]RequestTopic", "versions": "0+",
        "about": "Each topic that we want to write transaction marker(s) for.", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
          "about": "The topic name." },
        { "name": "PartitionIndexes", "type": "[]int32", "versions": "0",
          "about": "The indexes of the partitions to write transaction markers for." },
        { "name": "Partitions", "type": "[]RequestPartition", "versions": "1+", "fields": [
          { "name": "PartitionIndex", "type": "int32", "versions": "1+",
            "about": "The partition index." },
          { "name": "TxnStartOffset", "type": "int64", "versions": "1+", "taggedVersions": "1+", "tag": 0,
            "about": "The start offset of the transaction that should be completed" }
        ]}
      ]},
      { "name": "CoordinatorEpoch", "type": "int32", "versions": "0+",
        "about": "Epoch associated with the transaction state partition hosted by this transaction coordinator" }
    ]}
  ]
}

Response Schema

There are no changes to the response schema, but its version will be bumped. Note that we are also enabling flexible version support.

{
  "apiKey": 27,
  "type": "response",
  "name": "WriteTxnMarkersResponse",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "Markers", "type": "[]WritableTxnMarkerResult", "versions": "0+",
      "about": "The results for writing markers.", "fields": [
      { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId",
        "about": "The current producer ID in use by the transactional ID." },
      { "name": "Topics", "type": "[]TopicResult", "versions": "0+",
        "about": "The results by topic.", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
          "about": "The topic name." },
        { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
          "about": "The results by partition.", "fields": [
          { "name": "PartitionIndex", "type": "int32", "versions": "0+",
            "about": "The partition index." },
          { "name": "ErrorCode", "type": "int16", "versions": "0+",
            "about": "The error code, or 0 if there was no error." }
        ]}
      ]}
    ]}
  ]
}

Possible Error Codes

  • NOT_LEADER_OR_FOLLOWER: If the replica receiving the request is not the current leader of a topic partition
  • CLUSTER_AUTHORIZATION_FAILED: If the user does not have ClusterAction permission.
  • UNKNOWN_TOPIC_OR_PARTITION: If either the topic or partition is not known to exist.
  • INVALID_PRODUCER_EPOCH: If no coordinator epoch is provided and the provided producer epoch does not exactly match the current producer epoch.
  • INVALID_TXN_STATE: If there is no transaction in progress at the indicated offset

AdminClient APIs

The new APIs will be exposed from the AdminClient following the usual conventions:

interface Admin {

  ListTransactionsResult listTransactions();
  ListTransactionsResult listTransactions(ListTransactionsOptions options);
  DescribeTransactionsResult describeTransactions(Collection<String> transactionalIds);
  DescribeTransactionsResult describeTransactions(Collection<String> transactionalIds, DescribeTransactionsOptions options);

  DescribeProducersResult describeProducers(Collection<TopicPartition> partitions);
  DescribeProducersResult describeProducers(Collection<TopicPartition> partitions, DescribeProducersOptions options);

  AbortTransactionResult abortTransaction(TransactionInfo info);
  AbortTransactionResult abortTransaction(TransactionInfo info, AbortTransactionOptions options);
}


The POJOs above just expose the state from the corresponding APIs. For example:

class TransactionInfo {
  long producerId;
  int producerEpoch;
  long transactionStartOffset;
  TopicPartition topicPartition;
}


Metrics

As discussed above, we will add a few new gauges to help detect hanging transactions. The table below summarizes the metrics:

Metric    Group    Is New?

Command Line Tool

There will be a new tool: `` which supports the following commands:

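  • --list: List the transactions known to the transaction coordinators.
  • --describe: Describe the open transactions for a topic partition, or the state of a specific `TransactionalId`.
  • --abort: Abort an ongoing transaction for a topic partition.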

Examples are provided below:

Listing Transactions

The --list command also supports an optional --coordinator flag in order to query the transactions on a specific node.

> --list --bootstrap-server localhost:9092
TransactionalId  ProducerId  Coordinator  State
my-txn-id1       134132      0            Ongoing
my-txn-id2       134147      0            Ongoing
my-txn-id3       134191      1            PrepareCommit
my-txn-id4       134193      2            CompleteAbort

> --list --coordinator 0 --bootstrap-server localhost:9092
TransactionalId  ProducerId  Coordinator  State
my-txn-id1       134132      0            Ongoing
my-txn-id2       134147      0            Ongoing

Describing Transactions

The --describe command can be used to describe either the set of open transactions for a topic partition, or the state of a specific `TransactionalId`.

# Describe open transactions for foo-0
> --describe --topic foo --partition 0 --bootstrap-server localhost:9092
ProducerId ProducerEpoch LastSequence CurrentTxnStartOffset LastTimestamp Duration
134132     23            9838         550                   1598383537    1000

# Describe transaction state for my-txn-id
> --describe --transactional-id my-txn-id --bootstrap-server localhost:9092
ProducerId ProducerEpoch Coordinator State   TimeoutMs TopicPartitions
134132     24            0           Ongoing 5000      foo-0,foo-1

Aborting Transactions

The --abort command can be used to abort an ongoing transaction for a topic partition. It has several required arguments:

  • --topic:
  • --partition:
  • --producer-id:
  • --producer-epoch:
  • --transaction-start-offset:

As described above, the partition leader will allow an administrative abort only if the producer epoch matches the latest value and there is an open transaction beginning at the indicated offset. Users are required to use the --describe command to find this information.

> --abort \
  --topic foo  \
  --partition 0  \
  --producer-id 134132  \
  --producer-epoch 23  \
  --transaction-start-offset 550 \
  --bootstrap-server localhost:9092

Note that we are not providing an option to commit a transaction through this API.

Compatibility, Deprecation, and Migration Plan

This proposal adds new tools to facilitate the recovery of hanging transactions. These tools will not generally be compatible with older versions since they rely on new APIs.

Rejected Alternatives

If there were a simple and safe way to automatically detect which transactions were left hanging, then an automatic recovery option might be preferable. We have considered two options: 

Transaction State Reconciliation: A hanging transaction is the result of an inconsistent state between the transaction coordinator and the individual topic partitions which were included in a transaction. The replicas of a topic partition maintain some state about the ongoing transactions which affect that topic partition. They know which transactions are open, but not necessarily which ones have been left hanging. In order to detect the hanging transactions, there needs to be some way to reconcile this state with that of the transaction coordinator.

Unfortunately, there is a challenge here. The transaction coordinator mapping is done using the transactionalId of the producer, but the replicas of a topic partition only see the producerId. There is no way to reverse map from the producerId to the transactionalId, which means there is also no way for a replica to find the respective transaction coordinator. This means that reconciling the state of an open transaction requires querying all transaction coordinators, which suggests an expensive process and a lot of bookkeeping.

Transaction Timeout: Another way to detect a hanging transaction is through the transaction timeout. Although a replica has no way to know the precise transaction timeout that is used by a producer, it does know the upper bound given by the max transaction timeout. Any transaction which has been left open longer than this has potentially been left hanging by the transaction coordinator. The main problem is ruling out false positives. Another explanation for a transaction which has been left open is that the coordinator has been unable to send the WriteTxnMarkers request (e.g. because of a network partition). If we abort the transaction without verifying the state with the transaction coordinator, then we may violate the transactional guarantee.

Again, we emphasize that hanging transactions are not an expected state of the system, but the result of a bug. Even if automatic recovery could be done safely, there is a risk that it would cause the underlying bug to go unnoticed. 

