Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
interface Admin {

  ListTransactionsResult listTransactions();
  ListTransactionsResult listTransactions(ListTransactionsOptions options);
  
  DescribeTransactionsResult describeTransactions(Collection<String> transactionalIds);
  DescribeTransactionsResult describeTransactions(Collection<String> transactionalIds, DescribeTransactionsOptions options);

  DescribeProducersResult describeProducers(Collection<TopicPartition> partitions);
  DescribeProducersResult describeProducers(Collection<TopicPartition> partitions, DescribeProducersOptions options) DescribeProducersOptions options);

  AbortTransactionResult abortTransaction(TransactionInfo info);  
  AbortTransactionResult abortTransaction(TransactionInfo info, AbortTransactionOptions options);  
}

The sections below describe the supporting classes in more detail.

Listing Transactions

Code Block
class ListTransactionsOptions extends AbstractOptions<ListTransactionsOptions> {
  ListTransactionsOptions setBrokerId(int brokerId);
  OptionalInt brokerId();
}

class DescribeTransactionsResult {
  KafkaFuture<Map<Integer, Collection<TransactionSummary>> all();
  KafkaFuture<Collection<TransactionSummary>> brokerResult(Integer brokerId);

  static class TransactionSummary {
    final String transactionalId;
    final long producerId;
    final String transactionState;
  }
}

Describing Transactions

Code Block
class DescribeTransactionsOptions extends AbstractOptions<DescribeTransactionsOptions> {
}

class DescribeTransactionsResult {
  KafkaFuture<Map<String, TransactionState>> all();
  KafkaFuture<TransactionState> partitionResult(String transactionalId);

  static class TransactionState {
    final String transactionState;
    final int transactionTimeoutMs;
    final long transactionStartTimeMs;
    final long producerId;
    final int producerEpoch;
    final List<TopicPartition> addedPartitions;
  }
}

Describing Producers

Code Block
class DescribeProducersOptions extends AbstractOptions<DescribeProducersOptions> {
  DescribeProducersOptions setBrokerId(int brokerId);
  OptionalInt brokerId();
}

class DescribeProducersResult {
  KafkaFuture<Map<TopicPartition, PartitionProducerState>> all();
  KafkaFuture<PartitionProducerState> partitionResult(TopicPartition partition);

  static class PartitionProducerState {
    final List<ProducerState> activeProducers;
  }

  static class ProducerState {
    final long producerId;
    final int producerEpoch;
    final int lastSequence;
    final long lastTimestamp;
    final OptionalLong currentTransactionStartOffset;
    final int coordinatorEpoch;
  }
}

Aborting Transactions

Code Block
class AbortTransactionOptions extends AbstractOptions<AbortTransactionOptions> {
}

abstract class TransactionInfo {
  final TopicPartition topicPartition;
  final long ProducerId;
  final int producerEpoch;

  AbortTransactionResult abortTransaction(TransactionInfo info);  
  AbortTransactionResult abortTransaction(TransactionInfo info, AbortTransactionOptions options);  

}

The POJOs above just expose the state from the corresponding APIs. For example:

Code Block
class TransactionInfo {
  long producerId;
  int producerEpoch;
  long transactionStartOffset;
  TopicPartition topicPartition;
}// One and only one of the following fields must be set
  final OptionalLong transactionStartOffset;
  final OptionalInt coordinatorEpoch;
}

class AbortTransactionResult {
  KafkaFuture<Void> result();
}

Metrics

As discussed above, we will add  new gauge `PartitionsWithLateTransactionsCount`, which is tracked in the `ReplicaManager` group (along with `UnderMinIsrPartitionCount`, `OfflineReplicaCount`, etc.). This metric will record the number of partitions which have open transactions with durations exceeding `transaction.max.timeout.ms` (plus 5 minutes).

...

  • --list: List transactions known to a transaction coordinator
  • --find-hanging: Find hanging transactions on a specific broker
  • --describe: Describe details specific to a particular transactionalId
  • --describe-producers: Describe active producer state for a given topic partition
  • --abort: Forcefully abort a transaction

Listing Transactions

The --list command allows a user to list transaction state from the transaction coordinators. It supports an optional --broker flag to select a specific node.

...

Code Block
> kafka-transactions.sh --find-hanging --broker 0 --bootstrap-server localhost:9092
Topic	Partition	ProducerId	ProducerEpoch	StartOffset	LastTimestamp	Duration
foo		0			134132		23				550			1598383537		30000

# Limit the search to a specific topic partition
> kafka-transactions.sh --find-hanging --broker 0 --topic foo --partition 0 --bootstrap-server localhost:9092
Topic	Partition	ProducerId	ProducerEpoch	StartOffset	LastTimestamp	Duration
foo		0			134132		23				550			1598383537		30000

Describing Transactions

The --describe command can be used to describe the state of a transaction when the TransactionalId is known. It uses the `DescribeTransactions` API.

Code Block
# Describe transaction state for my-txn-id
> kafka-transaction.sh --describe --transactional-id my-txn-id --bootstrap-server localhost:9092
ProducerId ProducerEpoch Coordinator State 	 TimeoutMs TopicPartitions
134132     24            0           Ongoing 5000      foo-0,foo-1

Describing Producers

The --describe-producers command is used to describe the producer state of a specific topic partition. Internally, it is a direct call to the `DescribeProducers` API. It takes an optional --broker argument to specify a specific replica.

Code Block
# Describe producers on the leader of a topic partition
> kafka-transaction.sh --describe-producers --topic foo --partition 0 --bootstrap-server localhost:9092
ProducerId	ProducerEpoch	StartOffset	LastTimestamp	Duration	CoordinatorEpoch
134132		23				550			1598383537		30000   	77
134938		5				439			1598383567		29970   	64

# Describe producers for a specific replica of a topic partition
> kafka-transaction.sh --describe-producers --broker 0 --topic foo --partition 0 --bootstrap-server localhost:9092
ProducerId	ProducerEpoch	StartOffset	LastTimestamp	Duration	CoordinatorEpoch
134132		23				550			1598383537		30000   	77
134938		5				439			1598383567		29970   	64

Aborting Transactions

The --abort command can be used to abort an ongoing transaction for a topic partition. It has several required arguments:

...

Code Block
> kafka-transactions.sh --abort \
  --topic foo  \
  --partition 0  \
  --start-offset 550 \
  --bootstrap-server localhost:9092

> kafka-transactions.sh --abort \
  --topic foo  \
  --partition 0  \
  --producer-id 134132 \
  --producer-epoch 23 \
  --coordinator-epoch 15 \
  --bootstrap-server localhost:9092

Note that we are not providing an option to commit a transaction through this API.

...