

Status

Current state: Adopted

Discussion thread: https://www.mail-archive.com/dev@kafka.apache.org/msg111066.html

JIRA: KAFKA-10442

...

This request is expected to be sent to partition leaders.

Code Block
{
  "apiKey": 61,
  "type": "request",
  "name": "DescribeProducersRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      { "name": "Topics", "type": "[]TopicRequestData", "versions": "0+", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
          "about": "The topic name." },
        { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+",
          "about": "The indexes of the partitions to list producers for." }
       ]}
  ]
}

Response Schema

Code Block
{
  "apiKey": 61,
  "type": "response",
  "name": "DescribeProducersResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Topics", "type": "[]TopicResponse", "versions": "0+",
      "about": "Each topic in the response.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name" },
      { "name": "Partitions", "type": "[]PartitionResponse", "versions": "0+",
        "about": "Each partition in the response.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The partition error code, or 0 if there was no error." },
        { "name": "ActiveProducers", "type": "[]ProducerState", "versions": "0+", "fields": [
          { "name": "ProducerId", "type": "int64", "versions": "0+" },
          { "name": "ProducerEpoch", "type": "int32", "versions": "0+" },
          { "name": "LastSequence", "type": "int32", "versions": "0+" },
          { "name": "LastTimestamp", "type": "int64", "versions": "0+" },
          { "name": "CurrentTxnStartOffset", "type": "int64", "versions": "0+" },
          { "name": "CoordinatorEpoch", "type": "int32", "versions": "0+" }
        ]}
      ]}
    ]}
  ]
}
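Clients usually start from a flat collection of partitions, so they have to group it into the nested `Topics`/`PartitionIndexes` shape of `DescribeProducersRequest`. The following is a minimal sketch of that grouping step in plain Java. `TopicPartition` and `TopicRequest` here are hypothetical stand-in records (not the Kafka classes) for a partition and for one `TopicRequestData` entry; sorting topics and indexes is an illustrative choice, not something the schema requires.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

class DescribeProducersRequestBuilder {
    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    record TopicPartition(String topic, int partition) {}

    // Hypothetical stand-in for one entry of the request's "Topics" array.
    record TopicRequest(String name, List<Integer> partitionIndexes) {}

    // Groups partitions by topic name. TreeMap/TreeSet keep the output
    // deterministic and collapse duplicate partitions.
    static List<TopicRequest> buildTopics(Collection<TopicPartition> partitions) {
        Map<String, TreeSet<Integer>> byTopic = new TreeMap<>();
        for (TopicPartition tp : partitions) {
            byTopic.computeIfAbsent(tp.topic(), t -> new TreeSet<>()).add(tp.partition());
        }
        List<TopicRequest> topics = new ArrayList<>();
        for (Map.Entry<String, TreeSet<Integer>> e : byTopic.entrySet()) {
            topics.add(new TopicRequest(e.getKey(), new ArrayList<>(e.getValue())));
        }
        return topics;
    }

    public static void main(String[] args) {
        List<TopicRequest> topics = buildTopics(List.of(
            new TopicPartition("foo", 1),
            new TopicPartition("bar", 0),
            new TopicPartition("foo", 0)));
        // prints [TopicRequest[name=bar, partitionIndexes=[0]], TopicRequest[name=foo, partitionIndexes=[0, 1]]]
        System.out.println(topics);
    }
}
```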

...

The request includes a filter on the transaction states that the user is interested in; for example, this allows the user to list only "Ongoing" transactions. We have also included a filter on `ProducerId`. This is useful for reverse-looking up the `TransactionalId` that owns a given `ProducerId`, which is needed to support the --find-hanging command below.

Code Block
{
  "apiKey": 66,
  "type": "request",
  "name": "ListTransactionsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "StatesFilter", "type": "[]string", "versions": "0+",
      "about": "The states of transactions we want to list."
    },
    { "name": "ProducerIdFilter", "type": "[]int64", "versions": "0+",
      "about": "Array of ProducerIds to limit the response to"
    }
  ]
}
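To make the filter semantics concrete, here is a minimal sketch of how a coordinator might apply `StatesFilter` and `ProducerIdFilter` to its in-memory transaction state. It assumes (this is not stated above) that an empty filter matches every transaction and that the two filters are combined with AND. `TransactionSummary` is a hypothetical record mirroring one `TransactionStates` entry of the response.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class ListTransactionsFilter {
    // Hypothetical mirror of one TransactionStates entry in the response.
    record TransactionSummary(String transactionalId, long producerId, String transactionState) {}

    // Assumption: an empty filter matches everything, and an entry must
    // satisfy both filters (logical AND) to be returned.
    static List<TransactionSummary> apply(List<TransactionSummary> all,
                                          Set<String> statesFilter,
                                          Set<Long> producerIdFilter) {
        return all.stream()
            .filter(t -> statesFilter.isEmpty() || statesFilter.contains(t.transactionState()))
            .filter(t -> producerIdFilter.isEmpty() || producerIdFilter.contains(t.producerId()))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<TransactionSummary> all = List.of(
            new TransactionSummary("txn-a", 134132L, "Ongoing"),
            new TransactionSummary("txn-b", 134938L, "CompleteCommit"));
        // Reverse lookup: which transactionalId owns producerId 134132?
        // prints [TransactionSummary[transactionalId=txn-a, producerId=134132, transactionState=Ongoing]]
        System.out.println(apply(all, Set.of(), Set.of(134132L)));
    }
}
```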

Response Schema

Code Block
{
  "apiKey": 66,
  "type": "response",
  "name": "ListTransactionsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
        "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
      { "name": "ErrorCode", "type": "int16", "versions": "0+" },
      { "name": "TransactionStates", "type": "[]TransactionState", "versions": "0+", "fields": [
        { "name": "TransactionalId", "type": "string", "versions": "0+" },
        { "name": "ProducerId", "type": "int64", "versions": "0+" },
        { "name": "TransactionState", "type": "string", "versions": "0+" }
    ]}
  ]
}

...

The request schema is specified below:

Code Block
{
  "apiKey": 65,
  "type": "request",
  "name": "DescribeTransactionsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      { "name": "TransactionalIds", "type": "[]string", "versions": "0+" }
  ]
}

...

The response schema is specified below:

Code Block
{
  "apiKey": 65,
  "type": "response",
  "name": "DescribeTransactionsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
        "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
      { "name": "TransactionStates", "type": "[]TransactionState", "versions": "0+", "fields": [
        { "name": "ErrorCode", "type": "int16", "versions": "0+" },
        { "name": "TransactionalId", "type": "string", "versions": "0+" },
        { "name": "TransactionState", "type": "int8", "versions": "0+" },
        { "name": "TransactionTimeoutMs", "type": "int32", "versions": "0+" },
        { "name": "TransactionStartTimeMs", "type": "int64", "versions": "0+" },
        { "name": "ProducerId", "type": "int64", "versions": "0+" },
        { "name": "ProducerEpoch", "type": "int32", "versions": "0+" },
        { "name": "TopicPartitions", "type": "[]TopicData", "versions": "0+", "fields": [
          { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName" },
          { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+" }
        ]}
    ]}
  ]
}
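The Admin API below exposes a transaction's partitions as a flat `List<TopicPartition>` (`addedPartitions`), while this response nests partition indexes under each topic. A minimal sketch of the conversion follows, assuming each `TopicData` entry carries its topic name next to its partition indexes; both records are hypothetical stand-ins, not Kafka classes.

```java
import java.util.ArrayList;
import java.util.List;

class AddedPartitionsFlattener {
    // Hypothetical stand-ins for the response's TopicData entry and for
    // org.apache.kafka.common.TopicPartition.
    record TopicData(String name, List<Integer> partitionIndexes) {}
    record TopicPartition(String topic, int partition) {}

    // Expands each (topic, [indexes]) entry into one TopicPartition per index,
    // preserving the order in which the response listed them.
    static List<TopicPartition> flatten(List<TopicData> topics) {
        List<TopicPartition> result = new ArrayList<>();
        for (TopicData topic : topics) {
            for (int index : topic.partitionIndexes()) {
                result.add(new TopicPartition(topic.name(), index));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [TopicPartition[topic=foo, partition=0], TopicPartition[topic=foo, partition=1]]
        System.out.println(flatten(List.of(new TopicData("foo", List.of(0, 1)))));
    }
}
```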

...

Code Block
interface Admin {

  ListTransactionsResult listTransactions();
  ListTransactionsResult listTransactions(ListTransactionsOptions options);
  
  DescribeTransactionsResult describeTransactions(Collection<String> transactionalIds);
  DescribeTransactionsResult describeTransactions(Collection<String> transactionalIds, DescribeTransactionsOptions options);

  DescribeProducersResult describeProducers(Collection<TopicPartition> partitions);
  DescribeProducersResult describeProducers(Collection<TopicPartition> partitions, DescribeProducersOptions options);

  AbortTransactionResult abortTransaction(TransactionInfo info);  
  AbortTransactionResult abortTransaction(TransactionInfo info, AbortTransactionOptions options);  

}

The sections below describe the supporting classes in more detail.

Listing Transactions

Code Block
class ListTransactionsOptions extends AbstractOptions<ListTransactionsOptions> {
  ListTransactionsOptions setBrokerId(int brokerId);
  OptionalInt brokerId();
}

class ListTransactionsResult {
  KafkaFuture<Map<Integer, Collection<TransactionSummary>>> all();
  KafkaFuture<Collection<TransactionSummary>> brokerResult(Integer brokerId);

  static class TransactionSummary {
    final String transactionalId;
    final long producerId;
    final String transactionState;
  }
}

Describing Transactions

Code Block
class DescribeTransactionsOptions extends AbstractOptions<DescribeTransactionsOptions> {
}

class DescribeTransactionsResult {
  KafkaFuture<Map<String, TransactionState>> all();
  KafkaFuture<TransactionState> partitionResult(String transactionalId);

  static class TransactionState {
    final String transactionState;
    final int transactionTimeoutMs;
    final long transactionStartTimeMs;
    final long producerId;
    final int producerEpoch;
    final List<TopicPartition> addedPartitions;
  }
}

Describing Producers

Code Block
class DescribeProducersOptions extends AbstractOptions<DescribeProducersOptions> {
  DescribeProducersOptions setBrokerId(int brokerId);
  OptionalInt brokerId();
}

class DescribeProducersResult {
  KafkaFuture<Map<TopicPartition, PartitionProducerState>> all();
  KafkaFuture<PartitionProducerState> partitionResult(TopicPartition partition);

  static class PartitionProducerState {
    final List<ProducerState> activeProducers;
  }

  static class ProducerState {
    final long producerId;
    final int producerEpoch;
    final int lastSequence;
    final long lastTimestamp;
    final OptionalLong currentTransactionStartOffset;
    final int coordinatorEpoch;
  }
}

Aborting Transactions

Code Block
class AbortTransactionOptions extends AbstractOptions<AbortTransactionOptions> {
}

abstract class TransactionInfo {
  final TopicPartition topicPartition;
  final long producerId;
  final int producerEpoch;

  // One and only one of the following fields must be set
  final OptionalLong transactionStartOffset;
  final OptionalInt coordinatorEpoch;
}

class AbortTransactionResult {
  KafkaFuture<Void> result();
}
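The "one and only one" rule on `TransactionInfo` is easy to violate if callers set the fields directly. One way to enforce it, sketched below under the assumption that validation happens at construction time, is a private constructor behind validating factory methods. `TransactionInfoSketch` and its factory names are hypothetical, not part of the proposed API, and topic/partition are kept as plain fields to stay dependency-free.

```java
import java.util.OptionalInt;
import java.util.OptionalLong;

// Hypothetical, simplified version of TransactionInfo.
final class TransactionInfoSketch {
    final String topic;
    final int partition;
    final long producerId;
    final int producerEpoch;
    final OptionalLong transactionStartOffset;
    final OptionalInt coordinatorEpoch;

    private TransactionInfoSketch(String topic, int partition, long producerId, int producerEpoch,
                                  OptionalLong transactionStartOffset, OptionalInt coordinatorEpoch) {
        this.topic = topic;
        this.partition = partition;
        this.producerId = producerId;
        this.producerEpoch = producerEpoch;
        this.transactionStartOffset = transactionStartOffset;
        this.coordinatorEpoch = coordinatorEpoch;
    }

    // General factory: rejects the cases where both or neither optional field is set.
    static TransactionInfoSketch of(String topic, int partition, long producerId, int producerEpoch,
                                    OptionalLong transactionStartOffset, OptionalInt coordinatorEpoch) {
        if (transactionStartOffset.isPresent() == coordinatorEpoch.isPresent()) {
            throw new IllegalArgumentException(
                "Exactly one of transactionStartOffset or coordinatorEpoch must be set");
        }
        return new TransactionInfoSketch(topic, partition, producerId, producerEpoch,
            transactionStartOffset, coordinatorEpoch);
    }

    // Convenience factories that can only produce valid combinations.
    static TransactionInfoSketch withStartOffset(String topic, int partition, long producerId,
                                                 int producerEpoch, long startOffset) {
        return of(topic, partition, producerId, producerEpoch,
            OptionalLong.of(startOffset), OptionalInt.empty());
    }

    static TransactionInfoSketch withCoordinatorEpoch(String topic, int partition, long producerId,
                                                      int producerEpoch, int coordinatorEpoch) {
        return of(topic, partition, producerId, producerEpoch,
            OptionalLong.empty(), OptionalInt.of(coordinatorEpoch));
    }
}
```

For example, `TransactionInfoSketch.withCoordinatorEpoch("foo", 0, 134132L, 23, 15)` carries the same values as the second `--abort` invocation in the command-line section.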


Metrics

As discussed above, we will add a new gauge, `PartitionsWithLateTransactionsCount`, tracked in the `ReplicaManager` group (alongside `UnderMinIsrPartitionCount`, `OfflineReplicaCount`, etc.). This metric records the number of partitions that have open transactions whose durations exceed `transaction.max.timeout.ms` plus 5 minutes.
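The gauge definition reduces to a simple per-partition check. The sketch below assumes a hypothetical input map from partition name to the start timestamps of that partition's open transactions (the broker's actual bookkeeping is not described here) and counts a partition once if any of its transactions has been open longer than `transaction.max.timeout.ms` plus the 5-minute grace period.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

class LateTransactionsGauge {
    // Grace period added on top of transaction.max.timeout.ms.
    static final long GRACE_MS = TimeUnit.MINUTES.toMillis(5);

    // Hypothetical input: partition name -> start timestamps (ms) of its open transactions.
    static int partitionsWithLateTransactions(Map<String, List<Long>> openTxnStartTimesMs,
                                              long transactionMaxTimeoutMs,
                                              long nowMs) {
        final long thresholdMs = transactionMaxTimeoutMs + GRACE_MS;
        int count = 0;
        for (List<Long> starts : openTxnStartTimesMs.values()) {
            // A partition counts once, however many of its transactions are late.
            if (starts.stream().anyMatch(start -> nowMs - start > thresholdMs)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        long maxTimeoutMs = TimeUnit.MINUTES.toMillis(15); // Kafka's default transaction.max.timeout.ms
        long nowMs = TimeUnit.HOURS.toMillis(1);
        Map<String, List<Long>> open = Map.of(
            "foo-0", List.of(0L),                                     // open for 60 minutes: late
            "foo-1", List.of(nowMs - TimeUnit.MINUTES.toMillis(10)),  // open for 10 minutes
            "bar-0", List.<Long>of());                                // no open transactions
        System.out.println(partitionsWithLateTransactions(open, maxTimeoutMs, nowMs)); // prints 1
    }
}
```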

...

  • --list: List transactions known to a transaction coordinator
  • --find-hanging: Find hanging transactions on a specific broker
  • --describe: Describe details specific to a particular transactionalId
  • --describe-producers: Describe active producer state for a given topic partition
  • --abort: Forcefully abort a transaction

Listing Transactions

The --list command allows a user to list transaction state from the transaction coordinators. It supports an optional --broker flag to select a specific node.

...

Code Block
> kafka-transactions.sh --find-hanging --broker 0 --bootstrap-server localhost:9092
Topic  Partition  ProducerId  ProducerEpoch  StartOffset  LastTimestamp         Duration(s)
foo    0          134132      23             550          2020-09-17T23:02:23Z  30

# Limit the search to a specific topic partition
> kafka-transactions.sh --find-hanging --broker 0 --topic foo --partition 0 --bootstrap-server localhost:9092
Topic  Partition  ProducerId  ProducerEpoch  StartOffset  LastTimestamp         Duration(s)
foo    0          134132      23             550          2020-09-17T23:02:23Z  30000
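The --find-hanging check can be read as a join between two views: the open transactions a broker reports through `DescribeProducers`, and what the coordinators report for the same producer IDs (found via the `ProducerIdFilter` of `ListTransactions` and, presumably, `DescribeTransactions` for the partition list). Below is a minimal sketch of that join, assuming a transaction is reported as hanging when it has been open longer than a threshold and the coordinator does not list the partition as part of an ongoing transaction for that producer. `OpenTxn`, its `openSinceMs` field, and the `coordinatorView` map are hypothetical simplifications.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

class FindHangingSketch {
    // One open transaction seen on a partition replica (hypothetical subset of ProducerState).
    record OpenTxn(String partition, long producerId, int producerEpoch,
                   long startOffset, long openSinceMs) {}

    // coordinatorView: producerId -> partitions the coordinator lists in that
    // producer's ongoing transaction (assumed to be built from ListTransactions
    // and DescribeTransactions responses).
    static List<OpenTxn> findHanging(List<OpenTxn> openTxns,
                                     Map<Long, Set<String>> coordinatorView,
                                     long thresholdMs,
                                     long nowMs) {
        List<OpenTxn> hanging = new ArrayList<>();
        for (OpenTxn txn : openTxns) {
            boolean late = nowMs - txn.openSinceMs() > thresholdMs;
            Set<String> known = coordinatorView.getOrDefault(txn.producerId(), Set.of());
            // Late, and the coordinator does not account for this partition.
            if (late && !known.contains(txn.partition())) {
                hanging.add(txn);
            }
        }
        return hanging;
    }

    public static void main(String[] args) {
        List<OpenTxn> open = List.of(
            new OpenTxn("foo-0", 134132L, 23, 550L, 0L),
            new OpenTxn("foo-1", 134938L, 5, 439L, 0L));
        // The coordinator still tracks 134938's transaction on foo-1 but knows nothing about 134132.
        Map<Long, Set<String>> coordinatorView = Map.of(134938L, Set.of("foo-1"));
        long thresholdMs = TimeUnit.MINUTES.toMillis(20);
        System.out.println(findHanging(open, coordinatorView, thresholdMs, TimeUnit.HOURS.toMillis(1)));
    }
}
```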

Describing Transactions

The --describe command can be used to describe the state of a transaction when the TransactionalId is known. It uses the `DescribeTransactions` API.

Code Block
# Describe transaction state for my-txn-id
> kafka-transactions.sh --describe --transactional-id my-txn-id --bootstrap-server localhost:9092
ProducerId  ProducerEpoch  Coordinator  State    TimeoutMs  TopicPartitions
134132      24             0            Ongoing  5000       foo-0,foo-1

Describing Producers

The --describe-producers command is used to describe the producer state of a specific topic partition. Internally, it is a direct call to the `DescribeProducers` API. It takes an optional --broker argument to target a specific replica; without it, the request goes to the partition leader.

Code Block
# Describe producers on the leader of a topic partition
> kafka-transactions.sh --describe-producers --topic foo --partition 0 --bootstrap-server localhost:9092
ProducerId  ProducerEpoch  StartOffset  LastTimestamp         Duration(s)  CoordinatorEpoch
134132      23             550          2020-09-17T23:02:23Z  30           77
134938      5              439          2020-09-17T23:01:23Z  90           64

# Describe producers for a specific replica of a topic partition
> kafka-transactions.sh --describe-producers --broker 0 --topic foo --partition 0 --bootstrap-server localhost:9092
ProducerId  ProducerEpoch  StartOffset  LastTimestamp         Duration(s)  CoordinatorEpoch
134132      23             550          2020-09-17T23:02:23Z  30           77
134938      5              439          2020-09-17T23:01:23Z  90           64

Aborting Transactions

The --abort command can be used to abort an ongoing transaction for a topic partition. It has several required arguments:

...

Code Block
> kafka-transactions.sh --abort \
  --topic foo  \
  --partition 0  \
  --start-offset 550 \
  --bootstrap-server localhost:9092

> kafka-transactions.sh --abort \
  --topic foo  \
  --partition 0  \
  --producer-id 134132 \
  --producer-epoch 23 \
  --coordinator-epoch 15 \
  --bootstrap-server localhost:9092
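In the first `--abort` form only the start offset is given, but `TransactionInfo` also needs the producer ID and epoch. Presumably the tool looks these up with `DescribeProducers`, picking the active producer whose `currentTransactionStartOffset` matches. The sketch below shows only that matching step, using a hypothetical `ProducerState` record that mirrors a subset of the fields listed earlier.

```java
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;

class AbortTargetResolver {
    // Hypothetical subset of the ProducerState returned by DescribeProducers.
    record ProducerState(long producerId, int producerEpoch, OptionalLong currentTransactionStartOffset) {}

    // Finds the active producer whose open transaction begins at startOffset.
    static Optional<ProducerState> resolve(List<ProducerState> activeProducers, long startOffset) {
        return activeProducers.stream()
            .filter(p -> p.currentTransactionStartOffset().isPresent()
                && p.currentTransactionStartOffset().getAsLong() == startOffset)
            .findFirst();
    }

    public static void main(String[] args) {
        List<ProducerState> active = List.of(
            new ProducerState(134132L, 23, OptionalLong.of(550L)),
            new ProducerState(134938L, 5, OptionalLong.of(439L)));
        // prints "abort producerId=134132 epoch=23"
        resolve(active, 550L).ifPresentOrElse(
            p -> System.out.println("abort producerId=" + p.producerId() + " epoch=" + p.producerEpoch()),
            () -> System.out.println("no open transaction starts at that offset"));
    }
}
```

Returning `Optional.empty()` instead of guessing lets the tool fail clearly when the offset does not match any open transaction.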

Note that we are not providing an option to commit a transaction through this API.

...