
Status

Current state: Adopted

Discussion thread: https://www.mail-archive.com/dev@kafka.apache.org/msg111066.html

JIRA: KAFKA-10442

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Ideally, we would prefer a metric which takes transaction timeout into account. Although partition leaders do not know the transaction timeout associated with each transactional write, they do know the upper bound indicated by `transaction.max.timeout.ms`. Any transaction which has remained open longer than this timeout ought to be treated as suspicious. We propose to add a new metric, `PartitionsWithLateTransactionsCount`, which tracks the number of partitions that have open transactions with durations exceeding `transaction.max.timeout.ms`.

...

We expect that users will alert on positive values of `PartitionsWithLateTransactionsCount`. They can then use one of the APIs described below to find the affected topic partition. This gives users a simple alert criterion.

Note that it is possible to have transaction timeouts which are exactly equal to `transaction.max.timeout.ms`. To account for some extra latency and avoid spurious alerts, we will add 5 minutes of padding before a transaction is counted in `PartitionsWithLateTransactionsCount`.
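As a rough illustration of the padding rule above, the check could look like the following sketch. The class and method names are hypothetical and are not part of this proposal; the 15-minute value used in `main` is only an assumed setting for `transaction.max.timeout.ms`.

```java
import java.time.Duration;

/** Hypothetical helper for illustration; not part of the proposal or of Kafka itself. */
final class LateTransactionCheck {
    // Padding described above: 5 minutes on top of transaction.max.timeout.ms.
    static final long PADDING_MS = Duration.ofMinutes(5).toMillis();

    /**
     * Returns true if a transaction that started at txnStartTimeMs should be
     * counted as late at time nowMs, given the configured maximum timeout.
     */
    static boolean isLate(long txnStartTimeMs, long nowMs, long maxTimeoutMs) {
        long openDurationMs = nowMs - txnStartTimeMs;
        return openDurationMs > maxTimeoutMs + PADDING_MS;
    }

    public static void main(String[] args) {
        long maxTimeoutMs = Duration.ofMinutes(15).toMillis(); // assumed config value
        long start = 0L;
        // Open for exactly the maximum timeout: still inside the padding, not counted.
        System.out.println(isLate(start, Duration.ofMinutes(15).toMillis(), maxTimeoutMs));
        // Open for 21 minutes: exceeds 15 + 5 minutes, counted.
        System.out.println(isLate(start, Duration.ofMinutes(21).toMillis(), maxTimeoutMs));
    }
}
```

With the padding, a transaction whose timeout is exactly equal to the maximum does not trip the alert merely because of a small amount of extra latency.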

...

The DescribeProducers API will require `Read` permission on the associated topic. It is expected to be sent to any replica of a topic partition.

Request Schema

This request is expected to be sent to partition leaders.

Code Block
{
  "apiKey": 61,
  "type": "request",
  "name": "DescribeProducersRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "type": "[]TopicRequestData", "versions": "0+", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+",
        "about": "The indexes of the partitions to list producers for." }
    ]}
  ]
}

Response Schema

Code Block
{
  "apiKey": 61,
  "type": "response",
  "name": "DescribeProducersResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Topics", "type": "[]TopicResponse", "versions": "0+",
      "about": "Each topic in the response.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name" },
      { "name": "Partitions", "type": "[]PartitionResponse", "versions": "0+",
        "about": "Each partition in the response.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The partition error code, or 0 if there was no error." },
        { "name": "ActiveProducers", "type": "[]ProducerState", "versions": "0+", "fields": [
          { "name": "ProducerId", "type": "int64", "versions": "0+" },
          { "name": "ProducerEpoch", "type": "int32", "versions": "0+" },
          { "name": "LastSequence", "type": "int32", "versions": "0+" },
          { "name": "LastTimestamp", "type": "int64", "versions": "0+" },
          { "name": "CurrentTxnStartOffset", "type": "int64", "versions": "0+" },
          { "name": "CoordinatorEpoch", "type": "int32", "versions": "0+" }
        ]}
      ]}
    ]}
  ]
}


Possible errors:

...

The request includes a filter of the states that the user is interested in. For example, this allows us to list only the "Ongoing" transactions. We have also included a filter for the `ProducerId`. This is useful when trying to look up the `TransactionalId` in reverse, as we need to do to support the --find-hanging command below.

Code Block
{
  "apiKey": 66,
  "type": "request",
  "name": "ListTransactionsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "StatesFilter", "type": "[]string", "versions": "0+",
      "about": "The states of transactions we want to list."
    },
    { "name": "ProducerIdFilter", "type": "[]int64", "versions": "0+",
      "about": "Array of ProducerIds to limit the response to"
    }
  ]
}

Response Schema

Code Block
{
  "apiKey": 66,
  "type": "response",
  "name": "ListTransactionsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
        "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
      { "name": "ErrorCode", "type": "int16", "versions": "0+" },
      { "name": "TransactionStates", "type": "[]TransactionState", "versions": "0+", "fields": [
        { "name": "TransactionalId", "type": "string", "versions": "0+" },
        { "name": "ProducerId", "type": "int64", "versions": "0+" },
        { "name": "TransactionState", "type": "string", "versions": "0+" }
      ]}
  ]
}

...

The request schema is specified below:

Code Block
{
  "apiKey": 65,
  "type": "request",
  "name": "DescribeTransactionsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "TransactionalIds", "type": "[]string", "versions": "0+" }
  ]
}

...

The response schema is specified below:

Code Block
{
  "apiKey": 65,
  "type": "response",
  "name": "DescribeTransactionsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
        "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
      { "name": "TransactionStates", "type": "[]TransactionState", "versions": "0+", "fields": [
        { "name": "ErrorCode", "type": "int16", "versions": "0+" },
        { "name": "TransactionalId", "type": "string", "versions": "0+" },
        { "name": "TransactionState", "type": "int8", "versions": "0+" },
        { "name": "TransactionTimeoutMs", "type": "int32", "versions": "0+" },
        { "name": "TransactionStartTimeMs", "type": "int64", "versions": "0+" },
        { "name": "ProducerId", "type": "int64", "versions": "0+" },
        { "name": "ProducerEpoch", "type": "int32", "versions": "0+" },
        { "name": "TopicPartitions", "type": "[]TopicData", "versions": "0+", "fields": [
          { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+" }
        ]}
      ]}
  ]
}

...

  • NOT_COORDINATOR: If the coordinator receiving the request does not own a transactionalId in the request.
  • COORDINATOR_LOAD_IN_PROGRESS: The coordinator is in the process of loading its state.
  • COORDINATOR_NOT_AVAILABLE: If the coordinator receiving the request is being shut down.
  • TRANSACTIONAL_ID_NOT_FOUND (NEW): New error code which indicates that the requested TransactionalId could not be found.
  • TRANSACTIONAL_ID_AUTHORIZATION_FAILED: If the principal does not have Describe permission on one of the transactionalIds in the request.

...

Code Block
{
  "apiKey": 27,
  "type": "request",
  "name": "WriteTxnMarkersRequest",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "Markers", "type": "[]WritableTxnMarker", "versions": "0+",
      "about": "The transaction markers to be written.", "fields": [
      { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId",
        "about": "The current producer ID."},
      { "name": "ProducerEpoch", "type": "int16", "versions": "0+",
        "about": "The current epoch associated with the producer ID." },
      { "name": "TransactionResult", "type": "bool", "versions": "0+",
        "about": "The result of the transaction to write to the partitions (false = ABORT, true = COMMIT)." },
      { "name": "Topics", "type": "[]RequestTopic", "versions": "0+",
        "about": "Each topic that we want to write transaction marker(s) for.", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
          "about": "The topic name." },
        { "name": "PartitionIndexes", "type": "[]int32", "versions": "0",
          "about": "The indexes of the partitions to write transaction markers for." },

        // NEW: The array below replaces "PartitionIndexes" and adds "TxnStartOffset"
        //      as an optional field, which will only be included by the AdminClient
        { "name": "Partitions", "type": "[]RequestPartition", "versions": "1+", "fields": [
          { "name": "PartitionIndex", "type": "int32", "versions": "1+",
            "about": "The partition index." },
          { "name": "TxnStartOffset", "type": "int64", "versions": "1+", "taggedVersions": "1+", "tag": 0, 
            "about": "The start offset of the transaction that should be completed" }
        ]}
      ]},
      { "name": "CoordinatorEpoch", "type": "int32", "versions": "0+",
        "about": "Epoch associated with the transaction state partition hosted by this transaction coordinator" }
    ]}
  ]
}

...

There are no changes to the response schema, but its version will be bumped. Note that we are also enabling flexible version support. Note also that INVALID_TXN_STATE is now a possible error code when the `TxnStartOffset` is included in the request.

Code Block
{
  "apiKey": 27,
  "type": "response",
  "name": "WriteTxnMarkersResponse",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "Markers", "type": "[]WritableTxnMarkerResult", "versions": "0+",
      "about": "The results for writing markers.", "fields": [
      { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId",
        "about": "The current producer ID in use by the transactional ID." },
      { "name": "Topics", "type": "[]TopicResult", "versions": "0+",
        "about": "The results by topic.", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
          "about": "The topic name." },
        { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
          "about": "The results by partition.", "fields": [
          { "name": "PartitionIndex", "type": "int32", "versions": "0+",
            "about": "The partition index." },
          { "name": "ErrorCode", "type": "int16", "versions": "0+",
            "about": "The error code, or 0 if there was no error." }
        ]}
      ]}
    ]}
  ]
}

...

  • NOT_LEADER_OR_FOLLOWER: If the replica receiving the request is not the current leader of a topic partition
  • CLUSTER_AUTHORIZATION_FAILED: If the user does not have ClusterAction permission.
  • UNKNOWN_TOPIC_OR_PARTITION: If either the topic or partition is not known to exist.
  • INVALID_PRODUCER_EPOCH: If no cluster epoch is provided and the provided epoch does not exactly match the current 
  • INVALID_TXN_STATE (NEW): If there is no transaction in progress at the indicated offset

...

Code Block
interface Admin {

  ListTransactionsResult listTransactions();
  ListTransactionsResult listTransactions(ListTransactionsOptions options);
  
  DescribeTransactionsResult describeTransactions(Collection<String> transactionalIds);
  DescribeTransactionsResult describeTransactions(Collection<String> transactionalIds, DescribeTransactionsOptions options);

  DescribeProducersResult describeProducers(Collection<TopicPartition> partitions);
  DescribeProducersResult describeProducers(Collection<TopicPartition> partitions, DescribeProducersOptions options);

  AbortTransactionResult abortTransaction(TransactionInfo info);  
  AbortTransactionResult abortTransaction(TransactionInfo info, AbortTransactionOptions options);  

}

The sections below describe the supporting classes in more detail.

Listing Transactions

Code Block
class ListTransactionsOptions extends AbstractOptions<ListTransactionsOptions> {
  ListTransactionsOptions setBrokerId(int brokerId);
  OptionalInt brokerId();
}

class ListTransactionsResult {
  KafkaFuture<Map<Integer, Collection<TransactionSummary>>> all();
  KafkaFuture<Collection<TransactionSummary>> brokerResult(Integer brokerId);

  static class TransactionSummary {
    final String transactionalId;
    final long producerId;
    final String transactionState;
  }
}

Describing Transactions

Code Block
class DescribeTransactionsOptions extends AbstractOptions<DescribeTransactionsOptions> {
}

class DescribeTransactionsResult {
  KafkaFuture<Map<String, TransactionState>> all();
  KafkaFuture<TransactionState> partitionResult(String transactionalId);

  static class TransactionState {
    final String transactionState;
    final int transactionTimeoutMs;
    final long transactionStartTimeMs;
    final long producerId;
    final int producerEpoch;
    final List<TopicPartition> addedPartitions;
  }
}

Describing Producers

Code Block
class DescribeProducersOptions extends AbstractOptions<DescribeProducersOptions> {
  DescribeProducersOptions setBrokerId(int brokerId);
  OptionalInt brokerId();
}

class DescribeProducersResult {
  KafkaFuture<Map<TopicPartition, PartitionProducerState>> all();
  KafkaFuture<PartitionProducerState> partitionResult(TopicPartition partition);

  static class PartitionProducerState {
    final List<ProducerState> activeProducers;
  }

  static class ProducerState {
    final long producerId;
    final int producerEpoch;
    final int lastSequence;
    final long lastTimestamp;
    final OptionalLong currentTransactionStartOffset;
    final int coordinatorEpoch;
  }
}

Aborting Transactions

Code Block
class AbortTransactionOptions extends AbstractOptions<AbortTransactionOptions> {
}

abstract class TransactionInfo {
  final TopicPartition topicPartition;
  final long producerId;
  final int producerEpoch;

  // One and only one of the following fields must be set
  final OptionalLong transactionStartOffset;
  final OptionalInt coordinatorEpoch;
}

class AbortTransactionResult {
  KafkaFuture<Void> result();
}
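The "one and only one" constraint on `TransactionInfo` could be enforced with a check along the following lines. This is only a sketch: the `TransactionInfoCheck` class and its `validate` method are hypothetical and do not appear in the proposal.

```java
import java.util.OptionalInt;
import java.util.OptionalLong;

/** Hypothetical validation helper; the proposal does not define this class. */
final class TransactionInfoCheck {

    /**
     * Throws if both fields are set or neither is set, mirroring the
     * "one and only one" comment on TransactionInfo.
     */
    static void validate(OptionalLong transactionStartOffset, OptionalInt coordinatorEpoch) {
        if (transactionStartOffset.isPresent() == coordinatorEpoch.isPresent()) {
            throw new IllegalArgumentException(
                "Exactly one of transactionStartOffset and coordinatorEpoch must be set");
        }
    }

    public static void main(String[] args) {
        // Valid: only the start offset is provided.
        validate(OptionalLong.of(550L), OptionalInt.empty());
        // Valid: only the coordinator epoch is provided.
        validate(OptionalLong.empty(), OptionalInt.of(15));
        System.out.println("both single-field cases accepted");
    }
}
```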

Metrics

As discussed above, we will add a new gauge `PartitionsWithLateTransactionsCount`, which is tracked in the `ReplicaManager` group (along with `UnderMinIsrPartitionCount`, `OfflineReplicaCount`, etc.). This metric will record the number of partitions which have open transactions with durations exceeding `transaction.max.timeout.ms` (plus 5 minutes).

Command Line Tool

There will be a new tool: `kafka-transactions.sh`. The following commands will be supported:

  • --list: List transactions known to a transaction coordinator
  • --find-hanging: Find hanging transactions on a specific broker
  • --describe: Describe details specific to a particular transactionalId
  • --describe-producers: Describe active producer state for a given topic partition
  • --abort: Forcefully abort a transaction

Listing Transactions

The --list command allows a user to list transaction state from the transaction coordinators. It supports an optional --broker flag to select a specific node.

Code Block
> kafka-transactions.sh --list --bootstrap-server localhost:9092
TransactionalId  ProducerId  Coordinator  State
my-txn-id1       134132      0            Ongoing
my-txn-id2       134147      0            Ongoing
my-txn-id3       134191      1            PrepareCommit
my-txn-id4       134193      2            CompleteAbort

> kafka-transactions.sh --list --broker 0 --bootstrap-server localhost:9092
TransactionalId  ProducerId  Coordinator  State
my-txn-id1       134132      0            Ongoing
my-txn-id2       134147      0            Ongoing

Finding Hanging Transactions

The --find-hanging command automates the process of finding hanging transactions on a specific broker. It has one required argument, --max-transaction-timeout. Internally, it does the following:

  1. Use the `Metadata` API to find all of the partitions.
  2. Send `DescribeProducers` including each of the partitions found in the first step.
  3. Collect any open transactions which have been open longer than the provided --max-transaction-timeout.
  4. Use `ListTransactions` on each available coordinator to find the respective TransactionalId and coordinator for each ProducerId found in the step above.
    1. If no coordinator could be found, then the transaction is considered hanging.
    2. Otherwise, use `DescribeTransactions` to determine the state of the transaction and decide whether it should be considered hanging (e.g. if the epoch does not match or the partition is not included in the current transaction).

Here is an example:

Code Block
> kafka-transactions.sh --find-hanging --broker 0 --bootstrap-server localhost:9092
Topic  Partition  ProducerId  ProducerEpoch  StartOffset  LastTimestamp         Duration(s)
foo    0          134132      23             550          2020-09-17T23:02:23Z  30

# Limit the search to a specific topic partition
> kafka-transactions.sh --find-hanging --broker 0 --topic foo --partition 0
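
Step 3 of the procedure above (collecting open transactions older than --max-transaction-timeout) might be sketched as follows. The `ProducerState` class here is a simplified, hypothetical stand-in for an entry of the `DescribeProducers` response, and using `lastTimestamp` as the measure of how long a transaction has been open is an assumption of this sketch, not something the proposal specifies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

/** Hypothetical helper for illustration; not part of the proposed tool. */
final class HangingCandidateFilter {

    /** Simplified stand-in for one entry of the DescribeProducers response. */
    static final class ProducerState {
        final long producerId;
        final long lastTimestamp;                         // milliseconds since epoch
        final OptionalLong currentTransactionStartOffset; // empty if no open transaction

        ProducerState(long producerId, long lastTimestamp, OptionalLong currentTransactionStartOffset) {
            this.producerId = producerId;
            this.lastTimestamp = lastTimestamp;
            this.currentTransactionStartOffset = currentTransactionStartOffset;
        }
    }

    /** Keeps producers that have an open transaction older than the given timeout. */
    static List<ProducerState> candidates(List<ProducerState> states, long nowMs, long maxTransactionTimeoutMs) {
        List<ProducerState> result = new ArrayList<>();
        for (ProducerState state : states) {
            boolean hasOpenTransaction = state.currentTransactionStartOffset.isPresent();
            // Assumption: age is measured from the producer's last timestamp.
            boolean tooOld = nowMs - state.lastTimestamp > maxTransactionTimeoutMs;
            if (hasOpenTransaction && tooOld) {
                result.add(state);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long now = 1_000_000L;
        List<ProducerState> states = new ArrayList<>();
        states.add(new ProducerState(134132L, now - 30_000L, OptionalLong.of(550L)));
        states.add(new ProducerState(134938L, now - 90_000L, OptionalLong.empty()));
        states.add(new ProducerState(134999L, now - 1_000L, OptionalLong.of(700L)));
        // Only producer 134132 has an open transaction older than 10 seconds.
        for (ProducerState state : candidates(states, now, 10_000L)) {
            System.out.println(state.producerId);
        }
    }
}
```

The candidates produced this way would still need to be checked against the coordinators (step 4) before being reported as hanging.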

Describing Producers

The --describe-producers command is used to describe the producer state of a specific topic partition. Internally, it is a direct call to the `DescribeProducers` API. It takes an optional --broker argument to specify a specific replica.

Code Block
# Describe producers on the leader of a topic partition
> kafka-transactions.sh --describe-producers --topic foo --partition 0 --bootstrap-server localhost:9092
ProducerId  ProducerEpoch  StartOffset  LastTimestamp         Duration(s)  CoordinatorEpoch
134132      23             550          2020-09-17T23:02:23Z  30           77
134938      5              439          2020-09-17T23:01:23Z  90           64

# Describe producers for a specific replica of a topic partition
> kafka-transactions.sh --describe-producers --broker 0 --topic foo --partition 0 --bootstrap-server localhost:9092
ProducerId  ProducerEpoch  StartOffset  LastTimestamp         Duration(s)  CoordinatorEpoch
134132      23             550          2020-09-17T23:02:23Z  30           77
134938      5              439          2020-09-17T23:01:23Z  90           64

Describing Transactions

The --describe command can be used to describe the state of a transaction when the TransactionalId is known. It uses the `DescribeTransactions` API.

Code Block
# Describe transaction state for my-txn-id
> kafka-transactions.sh --describe --transactional-id my-txn-id --bootstrap-server localhost:9092
ProducerId ProducerEpoch Coordinator State 	 TimeoutMs TopicPartitions
134132     24            0           Ongoing 5000      foo-0,foo-1

Aborting Transactions

The --abort command can be used to abort an ongoing transaction for a topic partition. It has several required arguments:

  • --topic
  • --partition
  • --start-offset

Internally, the tool will first use `DescribeProducers` to validate that there is an open transaction beginning at that offset and collect the ProducerId and ProducerEpoch.

For compatibility with older brokers, we also support the ability to directly specify additional parameters. Although we cannot use the new APIs in this case to validate the state, this is still better than nothing for users who have hit this problem. Users then have no choice but to do the analysis themselves by dumping the log of a suspected topic partition. In this case, the command takes the following arguments:

  • --topic
  • --partition
  • --start-offset

...

  • --producer-id
  • --producer-epoch
  • --coordinator-epoch

As described above, the partition leader will allow an administrative abort only if the producer epoch matches the latest value and there is an open transaction beginning at the indicated offset. Users are required to use the --describe command to find this information.

Code Block
> kafka-transactions.sh --abort \
  --topic foo  \
  --partition 0  \
  --start-offset 550 \
  --bootstrap-server localhost:9092

Code Block
> kafka-transactions.sh --abort \
  --topic foo  \
  --partition 0  \
  --producer-id 134132 \
  --producer-epoch 23 \
  --coordinator-epoch 15 \
  --bootstrap-server localhost:9092

Note that we are not providing an option to commit a transaction through this API.

...

This proposal adds new tools to facilitate the recovery of hanging transactions. These tools will not generally be compatible with older versions since they rely on new APIs. However, the --abort command described above will be compatible with older brokers.

Rejected Alternatives

If there were a simple and safe way to automatically detect which transactions were left hanging, then an automatic recovery option might be preferable. We have considered two options: 

...