...
Add a new field, DurationFilter
to the ListTransactionsRequest
and bump the API's version to 1
Code Block |
---|
{ "apiKey": 66, "type": "request", "listeners": ["zkBroker", "broker"], "name": "ListTransactionsRequest", "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ { "name": "StateFilters", "type": "[]string", "versions": "0+", "about": "The transaction states to filter by: if empty, all transactions are returned; if non-empty, then only transactions matching one of the filtered states will be returned" }, { "name": "ProducerIdFilters", "type": "[]int64", "versions": "0+", "entityType": "producerId", "about": "The producerIds to filter by: if empty, all transactions will be returned; if non-empty, only transactions which match one of the filtered producerIds will be returned" }, // Add a DurationFilter field { "name": "DurationFilter", "type": "long", "versions": "1+", "about": "Return transactions running longer than this time duration, specified in milliseconds" } ] } |
...
Code Block |
---|
@InterfaceStability.Evolving public class ListTransactionsOptions extends AbstractOptions<ListTransactionsOptions> { ... // return transactions open for more than this time duration specified in milliseconds Duration durationFilter; public ListTransactionsOptions durationFilter(Duration timeDuration) { this.durationFilter = timeDuration; return this; } public Duration durationFilter() { return this.durationFilter; } ... } |
ListTransactionsResponse
Version will be bumped to 1 to match request. No changes otherwise.
DescribeTransactionsResponse
Add a new field, TransactionLastUpdateTimeMs to DescribeTransactionsResponse and bump the API version to version 1
.
Broker will populate this field from txnLastUpdateTime contained at TransactionMetadata.
This field is updated at the broker every time the transaction's state changes.
...
TransactionDescription
is further utilized at org.apache.kafka.tools.TransactionsCommand.DescribeTransactionsCommand#execute
to build a printable description of the transaction. This method will be changed to calculate transactionDuration
as a difference between transactionStartTime
and transactionLastUpdateTimeMs
, if
state == COMPLETE_COMMIT || state == COMPLETE_ABORT
DescribeTransactionsRequest
Version will be bumped to 1 to match response. No changes otherwise.
Command Line Tool Changes
...
Compatibility, Deprecation, and Migration Plan
We do need to will bump API version for ListTransactionsRequest and DescribeTransactionsResponse from 0 to 1. Correspondingly, versions for ListTransactionsResponse and DescribeTransactionsRequest will also need to be bumped, although there are no changes in those message.
When using a new AdminClient to send durationFilter (greater than 0) to an older broker, AdminClient will fail to build the ListTransactionsRequest and throw an UnsupportedVersionException. This check will be made at ListTransactionsRequest.Builder.build(short version) method. A new AdminClient can still generate older version of ListTransactionsRequest when user sets durationFilter to 0 (or does not set a durationFilter).
...