Comment: Set API key numbers for the new RPCs

Status

Current state: Adopted

Discussion thread: https://www.mail-archive.com/dev@kafka.apache.org/msg111066.html

JIRA: KAFKA-10442

...

This request is expected to be sent to partition leaders.

Code Block
{
  "apiKey": 61,
  "type": "request",
  "name": "DescribeProducersRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "type": "[]TopicRequestData", "versions": "0+", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+",
        "about": "The indexes of the partitions to list producers for." }
    ]}
  ]
}
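
The request nests partition indexes under each topic rather than listing topic-partition pairs individually. The sketch below illustrates that grouping with a plain Python dictionary. The helper name `build_describe_producers_request` and the sample topics are hypothetical, and the sorting is only there to make the output deterministic; this is not the broker or client implementation.

```python
from collections import defaultdict


def build_describe_producers_request(partitions):
    """Group (topic, partition) pairs into the request's Topics array.

    Hypothetical helper: it only mirrors the field names of the schema above.
    """
    by_topic = defaultdict(list)
    for topic, partition in partitions:
        by_topic[topic].append(partition)
    # Sort topics and indexes so the result is deterministic.
    return {
        "Topics": [
            {"Name": name, "PartitionIndexes": sorted(indexes)}
            for name, indexes in sorted(by_topic.items())
        ]
    }


# Illustrative input: three topic-partitions spread over two topics.
request = build_describe_producers_request([("foo", 1), ("bar", 0), ("foo", 0)])
print(request)
```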

Response Schema

Code Block
{
  "apiKey": 61,
  "type": "response",
  "name": "DescribeProducersResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Topics", "type": "[]TopicResponse", "versions": "0+",
      "about": "Each topic in the response.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name" },
      { "name": "Partitions", "type": "[]PartitionResponse", "versions": "0+",
        "about": "Each partition in the response.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The partition error code, or 0 if there was no error." },
        { "name": "ActiveProducers", "type": "[]ProducerState", "versions": "0+", "fields": [
          { "name": "ProducerId", "type": "int64", "versions": "0+" },
          { "name": "ProducerEpoch", "type": "int32", "versions": "0+" },
          { "name": "LastSequence", "type": "int32", "versions": "0+" },
          { "name": "LastTimestamp", "type": "int64", "versions": "0+" },
          { "name": "CurrentTxnStartOffset", "type": "int64", "versions": "0+" },
          { "name": "CoordinatorEpoch", "type": "int32", "versions": "0+" }
        ]}
      ]}
    ]}
  ]
}
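
Because the response nests producers under partitions and partitions under topics, a caller that wants one row per producer has to flatten it. The sketch below shows one way to do that over a response decoded into Python dictionaries. The function name `flatten_active_producers`, the choice to skip partitions with a non-zero `ErrorCode`, and the sample values are assumptions made for illustration.

```python
def flatten_active_producers(response):
    """Return one row per active producer, tagged with its topic and partition.

    Assumption: partitions that report a non-zero ErrorCode are skipped.
    """
    rows = []
    for topic in response["Topics"]:
        for partition in topic["Partitions"]:
            if partition["ErrorCode"] != 0:
                continue
            for producer in partition["ActiveProducers"]:
                rows.append({
                    "Topic": topic["Name"],
                    "PartitionIndex": partition["PartitionIndex"],
                    **producer,  # copy the producer state fields as-is
                })
    return rows


# Illustrative response: partition 1 carries a made-up non-zero error code.
response = {
    "ThrottleTimeMs": 0,
    "Topics": [{
        "Name": "foo",
        "Partitions": [
            {"PartitionIndex": 0, "ErrorCode": 0,
             "ActiveProducers": [{"ProducerId": 134132, "ProducerEpoch": 23}]},
            {"PartitionIndex": 1, "ErrorCode": 3, "ActiveProducers": []},
        ],
    }],
}
rows = flatten_active_producers(response)
print(rows)
```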

...

The request includes a filter for the transaction states that the user is interested in. For example, this makes it possible to list only the "Ongoing" transactions. The request also includes a filter for the `ProducerId`. This is useful for a reverse lookup of the `TransactionalId`, which is needed to support the --find-hanging command below.

Code Block
{
  "apiKey": 66,
  "type": "request",
  "name": "ListTransactionsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "StatesFilter", "type": "[]string", "versions": "0+",
      "about": "The states of transactions we want to list."
    },
    { "name": "ProducerIdFilter", "type": "[]int64", "versions": "0+",
      "about": "Array of ProducerIds to limit the response to"
    }
  ]
}

Response Schema

Code Block
{
  "apiKey": 66,
  "type": "response",
  "name": "ListTransactionsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
        "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
      { "name": "ErrorCode", "type": "int16", "versions": "0+" },
      { "name": "TransactionStates", "type": "[]TransactionState", "versions": "0+", "fields": [
        { "name": "TransactionalId", "type": "string", "versions": "0+" },
        { "name": "ProducerId", "type": "int64", "versions": "0+" },
        { "name": "TransactionState", "type": "string", "versions": "0+" }
      ]}
  ]
}
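
The two request filters can be read as independent predicates over the listed transactions. The sketch below illustrates that reading in Python. It assumes that an empty filter matches everything and that a transaction must satisfy both filters when both are set; the schemas above do not state these semantics, so treat them as assumptions. The function names and the sample data (including the `CompleteCommit` state and the transactional IDs) are hypothetical.

```python
def matches_filters(txn, states_filter, producer_id_filter):
    """Check one transaction against StatesFilter and ProducerIdFilter.

    Assumption: an empty filter places no restriction on the result.
    """
    state_ok = not states_filter or txn["TransactionState"] in states_filter
    producer_ok = not producer_id_filter or txn["ProducerId"] in producer_id_filter
    return state_ok and producer_ok


def list_transactions(transactions, states_filter=(), producer_id_filter=()):
    """Return the transactions that pass both filters."""
    return [
        txn for txn in transactions
        if matches_filters(txn, states_filter, producer_id_filter)
    ]


# Illustrative coordinator state.
transactions = [
    {"TransactionalId": "txn-a", "ProducerId": 134132, "TransactionState": "Ongoing"},
    {"TransactionalId": "txn-b", "ProducerId": 134938, "TransactionState": "CompleteCommit"},
]

ongoing = list_transactions(transactions, states_filter=["Ongoing"])
by_producer = list_transactions(transactions, producer_id_filter=[134938])
print([t["TransactionalId"] for t in ongoing])
print([t["TransactionalId"] for t in by_producer])
```

Filtering by `ProducerId` is the reverse-lookup case: given a producer ID seen on a partition, the matching entry yields its `TransactionalId`.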

...

The request schema is specified below:

Code Block
{
  "apiKey": 65,
  "type": "request",
  "name": "DescribeTransactionsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "TransactionalIds", "type": "[]string", "versions": "0+" }
  ]
}

...

The response schema is specified below:

Code Block
{
  "apiKey": 65,
  "type": "response",
  "name": "DescribeTransactionsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
        "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
      { "name": "TransactionStates", "type": "[]TransactionState", "versions": "0+", "fields": [
        { "name": "ErrorCode", "type": "int16", "versions": "0+" },
        { "name": "TransactionalId", "type": "string", "versions": "0+" },
        { "name": "TransactionState", "type": "int8", "versions": "0+" },
        { "name": "TransactionTimeoutMs", "type": "int32", "versions": "0+" },
        { "name": "TransactionStartTimeMs", "type": "int64", "versions": "0+" },
        { "name": "ProducerId", "type": "int64", "versions": "0+" },
        { "name": "ProducerEpoch", "type": "int32", "versions": "0+" },
        { "name": "TopicPartitions", "type": "[]TopicData", "versions": "0+", "fields": [
          { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+" }
        ]}
      ]}
  ]
}

...

Code Block
> kafka-transactions.sh --find-hanging --broker 0 --bootstrap-server localhost:9092
Topic  Partition  ProducerId  ProducerEpoch  StartOffset  LastTimestamp         Duration(s)
foo    0          134132      23             550          2020-09-17T23:02:23Z  30

# Limit the search to a specific topic partition
> kafka-transactions.sh --find-hanging --broker 0 --topic foo --partition 0 --bootstrap-server localhost:9092
Topic  Partition  ProducerId  ProducerEpoch  StartOffset  LastTimestamp         Duration(s)
foo    0          134132      23             550          2020-09-17T23:02:23Z  30

Describing Transactions

...

Code Block
# Describe producers on the leader of a topic partition
> kafka-transactions.sh --describe-producers --topic foo --partition 0 --bootstrap-server localhost:9092
ProducerId  ProducerEpoch  StartOffset  LastTimestamp         Duration(s)  CoordinatorEpoch
134132      23             550          2020-09-17T23:02:23Z  30           77
134938      5              439          2020-09-17T23:01:23Z  90           64

# Describe producers for a specific replica of a topic partition
> kafka-transactions.sh --describe-producers --broker 0 --topic foo --partition 0 --bootstrap-server localhost:9092
ProducerId  ProducerEpoch  StartOffset  LastTimestamp         Duration(s)  CoordinatorEpoch
134132      23             550          2020-09-17T23:02:23Z  30           77
134938      5              439          2020-09-17T23:01:23Z  90           64

Aborting Transactions

The --abort command can be used to abort an ongoing transaction for a topic partition. It has several required arguments:

...