Status
Current state: Adopted
Discussion thread: https://www.mail-archive.com/dev@kafka.apache.org/msg111066.html
...
This request is expected to be sent to partition leaders.
Code Block |
---|
{ "apiKey": 61, "type": "request", "name": "DescribeProducersRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "Topics", "type": "[]TopicRequestData", "versions": "0+", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", "about": "The topic name." }, { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+", "about": "The indexes of the partitions to list producers for." } ]} ] } |
Response Schema
Code Block |
---|
{ "apiKey": 61, "type": "response", "name": "DescribeProducersResponse", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "Topics", "type": "[]TopicResponse", "versions": "0+", "about": "Each topic in the response.", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", "about": "The topic name." }, { "name": "Partitions", "type": "[]PartitionResponse", "versions": "0+", "about": "Each partition in the response.", "fields": [ { "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The partition error code, or 0 if there was no error." }, { "name": "ActiveProducers", "type": "[]ProducerState", "versions": "0+", "fields": [ { "name": "ProducerId", "type": "int64", "versions": "0+" }, { "name": "ProducerEpoch", "type": "int32", "versions": "0+" }, { "name": "LastSequence", "type": "int32", "versions": "0+" }, { "name": "LastTimestamp", "type": "int64", "versions": "0+" }, { "name": "TxnStartOffset", "type": "int64", "versions": "0+" }, { "name": "CoordinatorEpoch", "type": "int32", "versions": "0+" } ]} ]} ]} ] } |
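As an illustration of how a client might read the `ActiveProducers` array, the following Python sketch picks out the producers that appear to have an open transaction. It assumes the response has already been decoded into dictionaries keyed by the field names above, and that a `TxnStartOffset` of -1 means no transaction is open; that sentinel is an assumption, since the schema does not state it. The helper name `open_transactions` and the `LastSequence` and `LastTimestamp` values in the sample data are hypothetical.

```python
from typing import Dict, List

# Assumption: -1 marks "no open transaction"; the schema itself does not say so.
NO_OPEN_TXN = -1


def open_transactions(partition: Dict) -> List[Dict]:
    """Return the ActiveProducers entries that appear to have an open transaction."""
    if partition.get("ErrorCode", 0) != 0:
        # A non-zero partition error code means the producer list cannot be trusted.
        return []
    return [
        p for p in partition.get("ActiveProducers", [])
        if p["TxnStartOffset"] != NO_OPEN_TXN
    ]


# Illustrative, hand-written partition entry (not real broker output).
partition = {
    "PartitionIndex": 0,
    "ErrorCode": 0,
    "ActiveProducers": [
        {"ProducerId": 134132, "ProducerEpoch": 23, "LastSequence": 10,
         "LastTimestamp": 1600383743000, "TxnStartOffset": 550, "CoordinatorEpoch": 77},
        {"ProducerId": 134938, "ProducerEpoch": 5, "LastSequence": 4,
         "LastTimestamp": 1600383683000, "TxnStartOffset": 439, "CoordinatorEpoch": 64},
        {"ProducerId": 135000, "ProducerEpoch": 1, "LastSequence": 0,
         "LastTimestamp": 1600383600000, "TxnStartOffset": -1, "CoordinatorEpoch": 64},
    ],
}

print([p["ProducerId"] for p in open_transactions(partition)])  # [134132, 134938]
```

Whether an open transaction is actually hanging cannot be decided from this response alone; the sketch only narrows the list of candidates.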
...
The request includes a filter for the states that the user is interested in. For example, this allows us to list only the "Ongoing" transactions. We have also included a filter for the `ProducerId`. This is useful when trying to reverse-lookup the `TransactionalId`, as we need to do to support the --find-hanging command below.
Code Block |
---|
{ "apiKey": 66, "type": "request", "name": "ListTransactionsRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "StatesFilter", "type": "[]string", "versions": "0+", "about": "The states of transactions we want to list." }, { "name": "ProducerIdFilter", "type": "[]int64", "versions": "0+", "about": "Array of ProducerIds to limit the response to" } ] } |
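The two filters can be sketched in Python as follows. This is a minimal illustration and not the coordinator's implementation: it assumes that an empty filter matches everything and that both filters must match when both are set. The function name `filter_transactions`, the transactional IDs, and the "Empty" state in the sample data are hypothetical.

```python
from typing import Dict, Iterable, List, Set


def filter_transactions(
    transactions: Iterable[Dict],
    states_filter: Set[str],
    producer_id_filter: Set[int],
) -> List[Dict]:
    """Apply StatesFilter and ProducerIdFilter to a list of transaction entries.

    Assumption: an empty filter matches all entries, and an entry must
    satisfy both filters to be returned.
    """
    return [
        t for t in transactions
        if (not states_filter or t["TransactionState"] in states_filter)
        and (not producer_id_filter or t["ProducerId"] in producer_id_filter)
    ]


# Illustrative entries shaped like the TransactionStates response field.
transactions = [
    {"TransactionalId": "txn-a", "ProducerId": 134132, "TransactionState": "Ongoing"},
    {"TransactionalId": "txn-b", "ProducerId": 134938, "TransactionState": "Empty"},
]

# List only the ongoing transactions.
ongoing = filter_transactions(transactions, {"Ongoing"}, set())
print([t["TransactionalId"] for t in ongoing])  # ['txn-a']

# Reverse lookup: find the TransactionalId for a known ProducerId.
match = filter_transactions(transactions, set(), {134938})
print([t["TransactionalId"] for t in match])  # ['txn-b']
```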
Response Schema
Code Block |
---|
{ "apiKey": 66, "type": "response", "name": "ListTransactionsResponse", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "ErrorCode", "type": "int16", "versions": "0+" }, { "name": "TransactionStates", "type": "[]TransactionState", "versions": "0+", "fields": [ { "name": "TransactionalId", "type": "string", "versions": "0+" }, { "name": "ProducerId", "type": "int64", "versions": "0+" }, { "name": "TransactionState", "type": "string", "versions": "0+" } ]} ] } |
...
The request schema is specified below:
Code Block |
---|
{ "apiKey": 65, "type": "request", "name": "DescribeTransactionsRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "TransactionalIds", "type": "[]string", "versions": "0+" } ] } |
...
The response schema is specified below:
Code Block |
---|
{ "apiKey": 65, "type": "response", "name": "DescribeTransactionsResponse", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "TransactionStates", "type": "[]TransactionState", "versions": "0+", "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+" }, { "name": "TransactionalId", "type": "string", "versions": "0+" }, { "name": "TransactionState", "type": "int8", "versions": "0+" }, { "name": "TransactionTimeoutMs", "type": "int32", "versions": "0+" }, { "name": "TransactionStartTimeMs", "type": "int64", "versions": "0+" }, { "name": "ProducerId", "type": "int64", "versions": "0+" }, { "name": "ProducerEpoch", "type": "int32", "versions": "0+" }, { "name": "TopicPartitions", "type": "[]TopicData", "versions": "0+", "fields": [ { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+" } ]} ]} ] } |
...
Code Block |
---|
> kafka-transactions.sh --find-hanging --broker 0 --bootstrap-server localhost:9092 Topic Partition ProducerId ProducerEpoch StartOffset LastTimestamp Duration(s) foo 0 134132 23 550 2020-09-17T23:02:23Z 30 # Limit the search to a specific topic partition > kafka-transactions.sh --find-hanging --broker 0 --topic foo --partition 0 --bootstrap-server localhost:9092 Topic Partition ProducerId ProducerEpoch StartOffset LastTimestamp Duration(s) foo 0 134132 23 550 2020-09-17T23:02:23Z 30000 |
Describing Transactions
...
Code Block |
---|
# Describe producers on the leader of a topic partition > kafka-transactions.sh --describe-producers --topic foo --partition 0 --bootstrap-server localhost:9092 ProducerId ProducerEpoch StartOffset LastTimestamp Duration(s) CoordinatorEpoch 134132 23 550 2020-09-17T23:02:23Z 30 77 134938 5 439 2020-09-17T23:01:23Z 90 64 # Describe producers for a specific replica of a topic partition > kafka-transactions.sh --describe-producers --broker 0 --topic foo --partition 0 --bootstrap-server localhost:9092 ProducerId ProducerEpoch StartOffset LastTimestamp Duration(s) CoordinatorEpoch 134132 23 550 2020-09-17T23:02:23Z 30 77 134938 5 439 2020-09-17T23:01:23Z 90 64 |
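The `LastTimestamp` and `Duration(s)` columns could plausibly be derived from the `LastTimestamp` field of the `DescribeProducers` response. The Python sketch below shows one way to do that. It assumes that `LastTimestamp` is in epoch milliseconds and that the duration is measured from `LastTimestamp` to the time the command runs; neither is confirmed by the schema. The function name `format_row` is hypothetical.

```python
from datetime import datetime, timezone
from typing import Tuple


def format_row(last_timestamp_ms: int, now_ms: int) -> Tuple[str, int]:
    """Return (ISO-8601 UTC timestamp, elapsed whole seconds).

    Assumption: both arguments are epoch milliseconds.
    """
    ts = datetime.fromtimestamp(last_timestamp_ms / 1000, tz=timezone.utc)
    duration_s = (now_ms - last_timestamp_ms) // 1000
    return ts.strftime("%Y-%m-%dT%H:%M:%SZ"), duration_s


# Build an example timestamp, then pretend the tool runs 30 seconds later.
last_ts_ms = int(datetime(2020, 9, 17, 23, 2, 23, tzinfo=timezone.utc).timestamp() * 1000)
now_ms = last_ts_ms + 30_000

print(format_row(last_ts_ms, now_ms))  # ('2020-09-17T23:02:23Z', 30)
```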
Aborting Transactions
The --abort command can be used to abort an ongoing transaction for a topic partition. It has several required arguments:
...