Status
Current state: Adopted
Discussion thread: https://www.mail-archive.com/dev@kafka.apache.org/msg111066.html
JIRA:
...
This request is expected to be sent to partition leaders.
```json
{
  "apiKey": 61,
  "type": "request",
  "name": "DescribeProducersRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "type": "[]TopicRequestData", "versions": "0+", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+",
        "about": "The indexes of the partitions to list producers for." }
    ]}
  ]
}
```
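Because the `Topics` array nests partition indexes under each topic name, a client has to group the partitions it wants to describe by topic before building this request. The following is a minimal sketch of that grouping, not the Kafka client's code; `DescribeProducersRequestBuilder` and its nested records are hypothetical stand-ins for the real request and `TopicPartition` classes (records need Java 16+).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: groups partitions into the Topics array of DescribeProducersRequest.
final class DescribeProducersRequestBuilder {

    // Simplified stand-in for org.apache.kafka.common.TopicPartition.
    record TopicPartition(String topic, int partition) {}

    // Mirrors one TopicRequestData entry: a topic Name plus its PartitionIndexes.
    record TopicRequestData(String name, List<Integer> partitionIndexes) {}

    static List<TopicRequestData> build(List<TopicPartition> partitions) {
        // LinkedHashMap keeps topics in the order they were first seen.
        Map<String, List<Integer>> byTopic = new LinkedHashMap<>();
        for (TopicPartition tp : partitions) {
            byTopic.computeIfAbsent(tp.topic(), t -> new ArrayList<>()).add(tp.partition());
        }
        List<TopicRequestData> topics = new ArrayList<>();
        byTopic.forEach((name, indexes) -> topics.add(new TopicRequestData(name, indexes)));
        return topics;
    }
}
```

For example, the partitions `foo-0`, `bar-2`, `foo-1` become two entries: `foo` with indexes `[0, 1]` and `bar` with `[2]`.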
Response Schema
```json
{
  "apiKey": 61,
  "type": "response",
  "name": "DescribeProducersResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Topics", "type": "[]TopicResponse", "versions": "0+",
      "about": "Each topic in the response.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionResponse", "versions": "0+",
        "about": "Each partition in the response.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The partition error code, or 0 if there was no error." },
        { "name": "ActiveProducers", "type": "[]ProducerState", "versions": "0+", "fields": [
          { "name": "ProducerId", "type": "int64", "versions": "0+" },
          { "name": "ProducerEpoch", "type": "int32", "versions": "0+" },
          { "name": "LastSequence", "type": "int32", "versions": "0+" },
          { "name": "LastTimestamp", "type": "int64", "versions": "0+" },
          { "name": "CurrentTxnStartOffset", "type": "int64", "versions": "0+" },
          { "name": "CoordinatorEpoch", "type": "int32", "versions": "0+" }
        ]}
      ]}
    ]}
  ]
}
```
...
The request includes a filter on the transaction states that the user is interested in. For example, this allows us to list only "Ongoing" transactions. We have also included a filter on `ProducerId`. This is useful when we need to reverse-lookup the `TransactionalId` for a given producer, as the --find-hanging command below must do.
```json
{
  "apiKey": 66,
  "type": "request",
  "name": "ListTransactionsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "StatesFilter", "type": "[]string", "versions": "0+",
      "about": "The states of transactions we want to list." },
    { "name": "ProducerIdFilter", "type": "[]int64", "versions": "0+",
      "about": "Array of ProducerIds to limit the response to." }
  ]
}
```
Response Schema
```json
{
  "apiKey": 66,
  "type": "response",
  "name": "ListTransactionsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+" },
    { "name": "TransactionStates", "type": "[]TransactionState", "versions": "0+", "fields": [
      { "name": "TransactionalId", "type": "string", "versions": "0+" },
      { "name": "ProducerId", "type": "int64", "versions": "0+" },
      { "name": "TransactionState", "type": "string", "versions": "0+" }
    ]}
  ]
}
```
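The semantics of the two `ListTransactions` filters are not spelled out here, so the following sketch makes one assumption explicit: an empty filter places no restriction on that field. Under that assumption, a coordinator could apply `StatesFilter` and `ProducerIdFilter` to its transactions roughly as below. `ListTransactionsFilter` and `TransactionListing` are hypothetical names, not broker classes.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch of applying the ListTransactions filters on a coordinator.
// Assumption: an empty filter means "do not filter on this field".
final class ListTransactionsFilter {

    // Mirrors one TransactionState entry of ListTransactionsResponse.
    record TransactionListing(String transactionalId, long producerId, String transactionState) {}

    static List<TransactionListing> apply(List<TransactionListing> all,
                                          Set<String> statesFilter,
                                          Set<Long> producerIdFilter) {
        return all.stream()
            // Keep the transaction if no state filter is given or its state matches.
            .filter(t -> statesFilter.isEmpty() || statesFilter.contains(t.transactionState()))
            // Keep the transaction if no producer ID filter is given or its ID matches.
            .filter(t -> producerIdFilter.isEmpty() || producerIdFilter.contains(t.producerId()))
            .collect(Collectors.toList());
    }
}
```

Combining both filters (for example, state "Ongoing" and a single producer ID) is what the reverse lookup for --find-hanging needs.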
...
The request schema is specified below:
```json
{
  "apiKey": 65,
  "type": "request",
  "name": "DescribeTransactionsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "TransactionalIds", "type": "[]string", "versions": "0+" }
  ]
}
```
...
The response schema is specified below:
```json
{
  "apiKey": 65,
  "type": "response",
  "name": "DescribeTransactionsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "TransactionStates", "type": "[]TransactionState", "versions": "0+", "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+" },
      { "name": "TransactionalId", "type": "string", "versions": "0+" },
      { "name": "TransactionState", "type": "int8", "versions": "0+" },
      { "name": "TransactionTimeoutMs", "type": "int32", "versions": "0+" },
      { "name": "TransactionStartTimeMs", "type": "int64", "versions": "0+" },
      { "name": "ProducerId", "type": "int64", "versions": "0+" },
      { "name": "ProducerEpoch", "type": "int32", "versions": "0+" },
      { "name": "TopicPartitions", "type": "[]TopicData", "versions": "0+", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName" },
        { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+" }
      ]}
    ]}
  ]
}
```
...
```java
interface Admin {
  ListTransactionsResult listTransactions();
  ListTransactionsResult listTransactions(ListTransactionsOptions options);

  DescribeTransactionsResult describeTransactions(Collection<String> transactionalIds);
  DescribeTransactionsResult describeTransactions(Collection<String> transactionalIds, DescribeTransactionsOptions options);

  DescribeProducersResult describeProducers(Collection<TopicPartition> partitions);
  DescribeProducersResult describeProducers(Collection<TopicPartition> partitions, DescribeProducersOptions options);

  AbortTransactionResult abortTransaction(TransactionInfo info);
  AbortTransactionResult abortTransaction(TransactionInfo info, AbortTransactionOptions options);
}
```
The sections below describe the supporting classes in more detail.
Listing Transactions
```java
class ListTransactionsOptions extends AbstractOptions<ListTransactionsOptions> {
  ListTransactionsOptions setBrokerId(int brokerId);
  OptionalInt brokerId();
}

class ListTransactionsResult {
  KafkaFuture<Map<Integer, Collection<TransactionSummary>>> all();
  KafkaFuture<Collection<TransactionSummary>> brokerResult(Integer brokerId);

  static class TransactionSummary {
    final String transactionalId;
    final long producerId;
    final String transactionState;
  }
}
```
Describing Transactions
```java
class DescribeTransactionsOptions extends AbstractOptions<DescribeTransactionsOptions> {
}

class DescribeTransactionsResult {
  KafkaFuture<Map<String, TransactionState>> all();
  KafkaFuture<TransactionState> partitionResult(String transactionalId);

  static class TransactionState {
    final String transactionState;
    final int transactionTimeoutMs;
    final long transactionStartTimeMs;
    final long producerId;
    final int producerEpoch;
    final List<TopicPartition> addedPartitions;
  }
}
```
Describing Producers
```java
class DescribeProducersOptions extends AbstractOptions<DescribeProducersOptions> {
  DescribeProducersOptions setBrokerId(int brokerId);
  OptionalInt brokerId();
}

class DescribeProducersResult {
  KafkaFuture<Map<TopicPartition, PartitionProducerState>> all();
  KafkaFuture<PartitionProducerState> partitionResult(TopicPartition partition);

  static class PartitionProducerState {
    final List<ProducerState> activeProducers;
  }

  static class ProducerState {
    final long producerId;
    final int producerEpoch;
    final int lastSequence;
    final long lastTimestamp;
    final OptionalLong currentTransactionStartOffset;
    final int coordinatorEpoch;
  }
}
```
Aborting Transactions
```java
class AbortTransactionOptions extends AbstractOptions<AbortTransactionOptions> {
}

abstract class TransactionInfo {
  final TopicPartition topicPartition;
  final long producerId;
  final int producerEpoch;

  // One and only one of the following fields must be set
  final OptionalLong transactionStartOffset;
  final OptionalInt coordinatorEpoch;
}

class AbortTransactionResult {
  KafkaFuture<Void> result();
}
```
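`TransactionInfo` requires exactly one of `transactionStartOffset` and `coordinatorEpoch` to be set. A minimal sketch of enforcing that rule when the object is constructed follows; `TransactionInfoSketch` is a hypothetical, simplified class (the topic partition is flattened into two fields), not the proposed API.

```java
import java.util.OptionalInt;
import java.util.OptionalLong;

// Hypothetical, simplified version of TransactionInfo that enforces the
// "one and only one" rule at construction time.
final class TransactionInfoSketch {
    final String topic;
    final int partition;
    final long producerId;
    final int producerEpoch;
    final OptionalLong transactionStartOffset;
    final OptionalInt coordinatorEpoch;

    TransactionInfoSketch(String topic, int partition, long producerId, int producerEpoch,
                          OptionalLong transactionStartOffset, OptionalInt coordinatorEpoch) {
        // Reject the input if both fields are set, or if neither is.
        if (transactionStartOffset.isPresent() == coordinatorEpoch.isPresent()) {
            throw new IllegalArgumentException(
                "Exactly one of transactionStartOffset and coordinatorEpoch must be set");
        }
        this.topic = topic;
        this.partition = partition;
        this.producerId = producerId;
        this.producerEpoch = producerEpoch;
        this.transactionStartOffset = transactionStartOffset;
        this.coordinatorEpoch = coordinatorEpoch;
    }
}
```

Validating eagerly means a malformed abort request fails on the client before anything is sent to a broker.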
Metrics
As discussed above, we will add a new gauge, `PartitionsWithLateTransactionsCount`, which is tracked in the `ReplicaManager` group (along with `UnderMinIsrPartitionCount`, `OfflineReplicaCount`, etc.). This metric records the number of partitions that have open transactions whose durations exceed `transaction.max.timeout.ms` (plus 5 minutes).
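A minimal sketch of the gauge's computation follows. It assumes the broker can see the start time of every open transaction on each partition and reads "exceeding" as strictly greater than the threshold; `LateTransactionGauge` and its input map are hypothetical, and the real metric is tracked inside `ReplicaManager`.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the PartitionsWithLateTransactionsCount computation.
final class LateTransactionGauge {
    // The 5-minute grace period added on top of transaction.max.timeout.ms.
    static final long GRACE_MS = TimeUnit.MINUTES.toMillis(5);

    // openTxnStartTimesMs maps each partition to the start times (epoch ms)
    // of its currently open transactions.
    static int value(Map<String, List<Long>> openTxnStartTimesMs, long maxTimeoutMs, long nowMs) {
        long thresholdMs = maxTimeoutMs + GRACE_MS;
        int latePartitions = 0;
        for (List<Long> startTimes : openTxnStartTimesMs.values()) {
            // A partition is counted once, however many late transactions it has.
            if (startTimes.stream().anyMatch(start -> nowMs - start > thresholdMs)) {
                latePartitions++;
            }
        }
        return latePartitions;
    }
}
```

With `transaction.max.timeout.ms` at 900000 (15 minutes), a transaction is counted as late once it has been open for more than 20 minutes.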
...
- --list: List transactions known to a transaction coordinator
- --find-hanging: Find hanging transactions on a specific broker
- --describe: Describe details specific to a particular transactionalId
- --describe-producers: Describe active producer state for a given topic partition
- --abort: Forcefully abort a transaction
Listing Transactions
The --list command allows a user to list transaction state from the transaction coordinators. It supports an optional --broker flag to select a specific node.
...
```
> kafka-transactions.sh --find-hanging --broker 0 --bootstrap-server localhost:9092
Topic  Partition  ProducerId  ProducerEpoch  StartOffset  LastTimestamp         Duration(s)
foo    0          134132      23             550          2020-09-17T23:02:23Z  30

# Limit the search to a specific topic partition
> kafka-transactions.sh --find-hanging --broker 0 --topic foo --partition 0 --bootstrap-server localhost:9092
Topic  Partition  ProducerId  ProducerEpoch  StartOffset  LastTimestamp         Duration(s)
foo    0          134132      23             550          2020-09-17T23:02:23Z  30
```
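The exact detection rule is not given in this section, so the following is only one plausible way to combine the two APIs, not the tool's implementation: take the producers that `DescribeProducers` reports with an open transaction, keep those idle for longer than `transaction.max.timeout.ms` (measured from `LastTimestamp`, which is an assumption), and drop any whose producer ID the coordinator still reports as "Ongoing" through `ListTransactions` with a `ProducerIdFilter`. `HangingTransactionFinder` is a hypothetical name.

```java
import java.util.List;
import java.util.OptionalLong;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch of the --find-hanging check for one partition.
final class HangingTransactionFinder {

    // Mirrors the ProducerState fields returned by DescribeProducers.
    record ProducerState(long producerId, int producerEpoch, long lastTimestampMs,
                         OptionalLong currentTransactionStartOffset) {}

    // ongoingAtCoordinator: producer IDs that ListTransactions reports in state "Ongoing".
    static List<ProducerState> findHanging(List<ProducerState> producers,
                                           Set<Long> ongoingAtCoordinator,
                                           long maxTimeoutMs,
                                           long nowMs) {
        return producers.stream()
            // Only producers with an open transaction on this partition.
            .filter(p -> p.currentTransactionStartOffset().isPresent())
            // Idle for longer than the maximum allowed transaction timeout.
            .filter(p -> nowMs - p.lastTimestampMs() > maxTimeoutMs)
            // The coordinator does not know of an ongoing transaction for this producer.
            .filter(p -> !ongoingAtCoordinator.contains(p.producerId()))
            .collect(Collectors.toList());
    }
}
```

The reverse lookup on `ProducerId` matters here because `DescribeProducers` returns producer IDs, while coordinators index transactions by `TransactionalId`.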
Describing Transactions
The --describe command can be used to describe the state of a transaction when the TransactionalId is known. It uses the `DescribeTransactions` API.
```
# Describe transaction state for my-txn-id
> kafka-transactions.sh --describe --transactional-id my-txn-id --bootstrap-server localhost:9092
ProducerId  ProducerEpoch  Coordinator  State    TimeoutMs  TopicPartitions
134132      24             0            Ongoing  5000       foo-0,foo-1
```
Describing Producers
The --describe-producers command is used to describe the producer state of a specific topic partition. Internally, it is a direct call to the `DescribeProducers` API. It takes an optional --broker argument to target a specific replica.
```
# Describe producers on the leader of a topic partition
> kafka-transactions.sh --describe-producers --topic foo --partition 0 --bootstrap-server localhost:9092
ProducerId  ProducerEpoch  StartOffset  LastTimestamp         Duration(s)  CoordinatorEpoch
134132      23             550          2020-09-17T23:02:23Z  30           77
134938      5              439          2020-09-17T23:01:23Z  90           64

# Describe producers for a specific replica of a topic partition
> kafka-transactions.sh --describe-producers --broker 0 --topic foo --partition 0 --bootstrap-server localhost:9092
ProducerId  ProducerEpoch  StartOffset  LastTimestamp         Duration(s)  CoordinatorEpoch
134132      23             550          2020-09-17T23:02:23Z  30           77
134938      5              439          2020-09-17T23:01:23Z  90           64
```
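The section does not define how the `LastTimestamp` and `Duration(s)` columns are derived from the `DescribeProducers` response. The sketch below assumes that `LastTimestamp` is in epoch milliseconds and that `Duration(s)` is the time elapsed since `LastTimestamp`; both are our assumptions, and `ProducerDurationColumns` is a hypothetical name. Under them, with "now" at 2020-09-17T23:02:53Z, the timestamps 2020-09-17T23:02:23Z and 2020-09-17T23:01:23Z give durations of 30 and 90 seconds.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helpers for the LastTimestamp and Duration(s) columns.
// Assumes LastTimestamp is epoch milliseconds and Duration(s) is the
// time elapsed since LastTimestamp.
final class ProducerDurationColumns {
    static String lastTimestamp(long lastTimestampMs) {
        // Instant.toString() renders ISO-8601, e.g. 2020-09-17T23:02:23Z.
        return Instant.ofEpochMilli(lastTimestampMs).toString();
    }

    static long durationSeconds(long lastTimestampMs, long nowMs) {
        return Duration.ofMillis(nowMs - lastTimestampMs).getSeconds();
    }
}
```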
Aborting Transactions
The --abort command can be used to abort an ongoing transaction for a topic partition. It has several required arguments:
...
```
# Abort using the start offset of the transaction
> kafka-transactions.sh --abort \
    --topic foo \
    --partition 0 \
    --start-offset 550 \
    --bootstrap-server localhost:9092

# Abort using the producer ID, producer epoch and coordinator epoch
> kafka-transactions.sh --abort \
    --topic foo \
    --partition 0 \
    --producer-id 134132 \
    --producer-epoch 23 \
    --coordinator-epoch 15 \
    --bootstrap-server localhost:9092
```
Note that we are not providing an option to commit a transaction through this API.
...