...
Add a new field, DurationFilter, to the ListTransactionsRequest and bump the API's version to 1.
```json
{
  "apiKey": 66,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ListTransactionsRequest",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "StateFilters", "type": "[]string", "versions": "0+",
      "about": "The transaction states to filter by: if empty, all transactions are returned; if non-empty, then only transactions matching one of the filtered states will be returned"
    },
    { "name": "ProducerIdFilters", "type": "[]int64", "versions": "0+", "entityType": "producerId",
      "about": "The producerIds to filter by: if empty, all transactions will be returned; if non-empty, only transactions which match one of the filtered producerIds will be returned"
    },
    // Add a DurationFilter field
    { "name": "DurationFilter", "type": "int64", "versions": "1+",
      "about": "Return transactions running longer than this time duration, specified in milliseconds"
    }
  ]
}
```
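To make the intended semantics of DurationFilter concrete, the following is a minimal sketch of how a list of transactions could be narrowed down by running time. It is not the coordinator implementation: `TxnEntry`, `DurationFilterSketch` and `filterByDuration` are hypothetical names used only for illustration, and treating a negative filter value as "no filter" is an assumption, since the schema above does not define a default.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a coordinator-side transaction entry; not a Kafka class.
final class TxnEntry {
    final String transactionalId;
    final long startTimeMs;

    TxnEntry(String transactionalId, long startTimeMs) {
        this.transactionalId = transactionalId;
        this.startTimeMs = startTimeMs;
    }
}

final class DurationFilterSketch {
    // Keeps only the transactions that have been running strictly longer than
    // durationFilterMs at time nowMs.
    // Assumption: a negative durationFilterMs disables the filter.
    static List<TxnEntry> filterByDuration(List<TxnEntry> txns, long durationFilterMs, long nowMs) {
        List<TxnEntry> result = new ArrayList<>();
        for (TxnEntry txn : txns) {
            long runningForMs = nowMs - txn.startTimeMs;
            if (durationFilterMs < 0 || runningForMs > durationFilterMs) {
                result.add(txn);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long nowMs = 100_000L;
        List<TxnEntry> txns = Arrays.asList(
            new TxnEntry("txn-a", 10_000L),   // running for 90 s
            new TxnEntry("txn-b", 95_000L));  // running for 5 s
        // With a 60 s filter only txn-a qualifies.
        for (TxnEntry txn : filterByDuration(txns, 60_000L, nowMs)) {
            System.out.println(txn.transactionalId);
        }
    }
}
```

The current time is passed in as a parameter rather than read from the system clock so that the filter stays deterministic and easy to test.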
...
```java
public class TransactionDescription {
    ...
    private final OptionalLong transactionLastUpdateTimeMs;
    ...
    public TransactionDescription(
        int coordinatorId,
        TransactionState state,
        long producerId,
        int producerEpoch,
        long transactionTimeoutMs,
        OptionalLong transactionStartTimeMs,
        OptionalLong transactionLastUpdateTimeMs,
        Set<TopicPartition> topicPartitions
    ) {
        ...
        this.transactionLastUpdateTimeMs = transactionLastUpdateTimeMs;
        ...
    }
    ...
    public OptionalLong transactionLastUpdateTimeMs() {
        return transactionLastUpdateTimeMs;
    }
    ...
}
```
Fixing the TransactionsCommand tool
TransactionDescription is further utilized at org.apache.kafka.tools.TransactionsCommand.DescribeTransactionsCommand#execute to build a printable description of the transaction. This method will be changed to calculate transactionDuration as the difference between the current time and transactionLastUpdateTimeMs if state == COMPLETE_COMMIT || state == COMPLETE_ABORT.
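Reading the rule above literally, the calculation might look like the sketch below. `TransactionDurationSketch`, its reduced `State` enum and `durationMs` are hypothetical and are not the actual TransactionsCommand code. The branch for states other than COMPLETE_COMMIT and COMPLETE_ABORT is an assumption (duration measured from transactionStartTimeMs), because the text does not describe it.

```java
import java.util.OptionalLong;

final class TransactionDurationSketch {
    // Reduced, hypothetical stand-in for the client's TransactionState enum.
    enum State { ONGOING, PREPARE_COMMIT, PREPARE_ABORT, COMPLETE_COMMIT, COMPLETE_ABORT }

    // Returns the duration in milliseconds, or empty when the required timestamp is absent.
    static OptionalLong durationMs(
        State state,
        OptionalLong transactionStartTimeMs,
        OptionalLong transactionLastUpdateTimeMs,
        long nowMs
    ) {
        boolean completed = state == State.COMPLETE_COMMIT || state == State.COMPLETE_ABORT;
        if (completed) {
            // Rule from the text: current time minus transactionLastUpdateTimeMs.
            return transactionLastUpdateTimeMs.isPresent()
                ? OptionalLong.of(nowMs - transactionLastUpdateTimeMs.getAsLong())
                : OptionalLong.empty();
        }
        // Assumption: all other states are measured from the transaction start time.
        return transactionStartTimeMs.isPresent()
            ? OptionalLong.of(nowMs - transactionStartTimeMs.getAsLong())
            : OptionalLong.empty();
    }

    public static void main(String[] args) {
        OptionalLong start = OptionalLong.of(1_000L);
        OptionalLong lastUpdate = OptionalLong.of(4_000L);
        System.out.println(durationMs(State.COMPLETE_COMMIT, start, lastUpdate, 10_000L));
        System.out.println(durationMs(State.ONGOING, start, lastUpdate, 10_000L));
    }
}
```

Returning OptionalLong mirrors the accessor types on TransactionDescription, so a missing timestamp propagates as an empty value instead of a sentinel number.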
Command Line Tool Changes
...