...
Code Block |
---|
interface Admin {
  ListTransactionsResult listTransactions();
  ListTransactionsResult listTransactions(ListTransactionsOptions options);

  DescribeTransactionsResult describeTransactions(Collection<String> transactionalIds);
  DescribeTransactionsResult describeTransactions(Collection<String> transactionalIds, DescribeTransactionsOptions options);

  DescribeProducersResult describeProducers(Collection<TopicPartition> partitions);
  DescribeProducersResult describeProducers(Collection<TopicPartition> partitions, DescribeProducersOptions options);

  AbortTransactionResult abortTransaction(TransactionInfo info);
  AbortTransactionResult abortTransaction(TransactionInfo info, AbortTransactionOptions options);
} |
The sections below describe the supporting classes in more detail.
Listing Transactions
Code Block |
---|
class ListTransactionsOptions extends AbstractOptions<ListTransactionsOptions> {
ListTransactionsOptions setBrokerId(int brokerId);
OptionalInt brokerId();
}
class ListTransactionsResult {
KafkaFuture<Map<Integer, Collection<TransactionSummary>>> all();
KafkaFuture<Collection<TransactionSummary>> brokerResult(Integer brokerId);
static class TransactionSummary {
final String transactionalId;
final long producerId;
final String transactionState;
}
} |
Describing Transactions
Code Block |
---|
class DescribeTransactionsOptions extends AbstractOptions<DescribeTransactionsOptions> {
}
class DescribeTransactionsResult {
KafkaFuture<Map<String, TransactionState>> all();
KafkaFuture<TransactionState> partitionResult(String transactionalId);
static class TransactionState {
final String transactionState;
final int transactionTimeoutMs;
final long transactionStartTimeMs;
final long producerId;
final int producerEpoch;
final List<TopicPartition> addedPartitions;
}
} |
Describing Producers
Code Block |
---|
class DescribeProducersOptions extends AbstractOptions<DescribeProducersOptions> {
DescribeProducersOptions setBrokerId(int brokerId);
OptionalInt brokerId();
}
class DescribeProducersResult {
KafkaFuture<Map<TopicPartition, PartitionProducerState>> all();
KafkaFuture<PartitionProducerState> partitionResult(TopicPartition partition);
static class PartitionProducerState {
final List<ProducerState> activeProducers;
}
static class ProducerState {
final long producerId;
final int producerEpoch;
final int lastSequence;
final long lastTimestamp;
final OptionalLong currentTransactionStartOffset;
final int coordinatorEpoch;
}
} |
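As an illustration of how a caller might consume this state, the sketch below picks out the producers on a partition that currently have an open transaction. It is a minimal example under stated assumptions, not part of the proposal: `OpenTransactionFilter` is a hypothetical helper, its nested `ProducerState` is a stand-in that mirrors only a subset of the fields listed above, and it assumes that an empty `currentTransactionStartOffset` means the producer has no open transaction.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

// Hypothetical helper; not part of the proposed Admin API.
final class OpenTransactionFilter {

    // Stand-in mirroring a subset of the ProducerState fields listed above.
    static final class ProducerState {
        final long producerId;
        final int producerEpoch;
        final OptionalLong currentTransactionStartOffset;

        ProducerState(long producerId, int producerEpoch, OptionalLong currentTransactionStartOffset) {
            this.producerId = producerId;
            this.producerEpoch = producerEpoch;
            this.currentTransactionStartOffset = currentTransactionStartOffset;
        }
    }

    // Assumption: a present start offset means the producer has an open transaction.
    static List<ProducerState> withOpenTransaction(List<ProducerState> activeProducers) {
        List<ProducerState> open = new ArrayList<>();
        for (ProducerState producer : activeProducers) {
            if (producer.currentTransactionStartOffset.isPresent()) {
                open.add(producer);
            }
        }
        return open;
    }

    public static void main(String[] args) {
        List<ProducerState> producers = Arrays.asList(
            new ProducerState(134132L, 23, OptionalLong.of(550L)),
            new ProducerState(134938L, 5, OptionalLong.empty()));
        for (ProducerState producer : withOpenTransaction(producers)) {
            System.out.println(producer.producerId + " started at offset "
                + producer.currentTransactionStartOffset.getAsLong());
        }
    }
}
```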
Aborting Transactions
Code Block |
---|
class AbortTransactionOptions extends AbstractOptions<AbortTransactionOptions> {
}

abstract class TransactionInfo {
  final TopicPartition topicPartition;
  final long producerId;
  final int producerEpoch;

  AbortTransactionResult abortTransaction(TransactionInfo info);
  AbortTransactionResult abortTransaction(TransactionInfo info, AbortTransactionOptions options);
} |
The POJOs above just expose the state from the corresponding APIs. For example:
Code Block |
---|
class TransactionInfo {
  long producerId;
  int producerEpoch;
  TopicPartition topicPartition;

  // One and only one of the following fields must be set
  final OptionalLong transactionStartOffset;
  final OptionalInt coordinatorEpoch;
}

class AbortTransactionResult {
  KafkaFuture<Void> result();
} |
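The "one and only one" constraint on `transactionStartOffset` and `coordinatorEpoch` can be checked with a simple exclusive-or on field presence. The sketch below is one possible way to express it; the class `TransactionInfoCheck` and its methods are hypothetical and are not part of the proposed API.

```java
import java.util.OptionalInt;
import java.util.OptionalLong;

// Hypothetical validator; not part of the proposed API.
final class TransactionInfoCheck {

    // True only when exactly one of the two optional fields is present.
    static boolean exactlyOneSet(OptionalLong transactionStartOffset, OptionalInt coordinatorEpoch) {
        return transactionStartOffset.isPresent() != coordinatorEpoch.isPresent();
    }

    // Rejects the "both set" and "neither set" cases before a request would be built.
    static void validate(OptionalLong transactionStartOffset, OptionalInt coordinatorEpoch) {
        if (!exactlyOneSet(transactionStartOffset, coordinatorEpoch)) {
            throw new IllegalArgumentException(
                "Exactly one of transactionStartOffset and coordinatorEpoch must be set");
        }
    }

    public static void main(String[] args) {
        validate(OptionalLong.of(550L), OptionalInt.empty());  // start offset only: accepted
        validate(OptionalLong.empty(), OptionalInt.of(15));    // coordinator epoch only: accepted
        System.out.println(exactlyOneSet(OptionalLong.of(550L), OptionalInt.of(15))); // both set
    }
}
```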
Metrics
As discussed above, we will add a new gauge, `PartitionsWithLateTransactionsCount`, which is tracked in the `ReplicaManager` group (along with `UnderMinIsrPartitionCount`, `OfflineReplicaCount`, etc.). This metric records the number of partitions that have open transactions whose duration exceeds `transaction.max.timeout.ms` (plus 5 minutes).
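The threshold arithmetic behind the gauge can be sketched as follows. This is a minimal illustration, not the broker implementation: the class `LateTransactionGauge`, its method names, and the input map (partition name to the start timestamp of its earliest open transaction) are hypothetical. Only the threshold itself, `transaction.max.timeout.ms` plus 5 minutes, comes from the description above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the gauge computation; not the broker's actual code.
final class LateTransactionGauge {

    // Grace period added on top of transaction.max.timeout.ms.
    static final long GRACE_MS = TimeUnit.MINUTES.toMillis(5);

    // A transaction is "late" once its duration exceeds the max timeout plus the grace period.
    static boolean isLate(long txnStartMs, long nowMs, long maxTimeoutMs) {
        return nowMs - txnStartMs > maxTimeoutMs + GRACE_MS;
    }

    // Counts partitions whose earliest open transaction is late.
    static int countLatePartitions(Map<String, Long> earliestOpenTxnStartMs, long nowMs, long maxTimeoutMs) {
        int count = 0;
        for (long startMs : earliestOpenTxnStartMs.values()) {
            if (isLate(startMs, nowMs, maxTimeoutMs)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        long maxTimeoutMs = TimeUnit.MINUTES.toMillis(15); // assumed value of transaction.max.timeout.ms
        long now = System.currentTimeMillis();
        Map<String, Long> open = new HashMap<>();
        open.put("foo-0", now - TimeUnit.MINUTES.toMillis(30)); // open for 30 minutes: late
        open.put("foo-1", now - TimeUnit.MINUTES.toMillis(1));  // open for 1 minute: not late
        System.out.println(countLatePartitions(open, now, maxTimeoutMs));
    }
}
```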
...
--list: List transactions known to a transaction coordinator
--find-hanging: Find hanging transactions on a specific broker
--describe: Describe details specific to a particular transactionalId
--describe-producers: Describe active producer state for a given topic partition
--abort: Forcefully abort a transaction
Listing Transactions
The --list command allows a user to list transaction state from the transaction coordinators. It supports an optional --broker flag to select a specific node.
...
Code Block |
---|
> kafka-transactions.sh --find-hanging --broker 0 --bootstrap-server localhost:9092
Topic  Partition  ProducerId  ProducerEpoch  StartOffset  LastTimestamp  Duration
foo    0          134132      23             550          1598383537     30000

# Limit the search to a specific topic partition
> kafka-transactions.sh --find-hanging --broker 0 --topic foo --partition 0 --bootstrap-server localhost:9092
Topic  Partition  ProducerId  ProducerEpoch  StartOffset  LastTimestamp  Duration
foo    0          134132      23             550          1598383537     30000 |
Describing Transactions
The --describe command can be used to describe the state of a transaction when the TransactionalId is known. It uses the `DescribeTransactions` API.
Code Block |
---|
# Describe transaction state for my-txn-id
> kafka-transactions.sh --describe --transactional-id my-txn-id --bootstrap-server localhost:9092
ProducerId  ProducerEpoch  Coordinator  State    TimeoutMs  TopicPartitions
134132      24             0            Ongoing  5000       foo-0,foo-1 |
Describing Producers
The --describe-producers command is used to describe the producer state of a specific topic partition. Internally, it is a direct call to the `DescribeProducers` API. It takes an optional --broker argument to select a specific replica.
Code Block |
---|
# Describe producers on the leader of a topic partition
> kafka-transactions.sh --describe-producers --topic foo --partition 0 --bootstrap-server localhost:9092
ProducerId  ProducerEpoch  StartOffset  LastTimestamp  Duration  CoordinatorEpoch
134132      23             550          1598383537     30000     77
134938      5              439          1598383567     29970     64

# Describe producers for a specific replica of a topic partition
> kafka-transactions.sh --describe-producers --broker 0 --topic foo --partition 0 --bootstrap-server localhost:9092
ProducerId  ProducerEpoch  StartOffset  LastTimestamp  Duration  CoordinatorEpoch
134132      23             550          1598383537     30000     77
134938      5              439          1598383567     29970     64 |
Aborting Transactions
The --abort command can be used to abort an ongoing transaction for a topic partition. It has several required arguments:
...
Code Block |
---|
> kafka-transactions.sh --abort \
--topic foo \
--partition 0 \
--start-offset 550 \
--bootstrap-server localhost:9092
> kafka-transactions.sh --abort \
--topic foo \
--partition 0 \
--producer-id 134132 \
--producer-epoch 23 \
--coordinator-epoch 15 \
--bootstrap-server localhost:9092
|
Note that we are not providing an option to commit a transaction through this API.
...