...
Add a new tagged field, TransactionLastUpdateTimeMs, to DescribeTransactionsResponse. The broker will populate this field from txnLastUpdateTimeMs in TransactionState.
The broker updates this value every time the transaction's state changes.
Code Block |
---|
{
  "apiKey": 65,
  "type": "response",
  "name": "DescribeTransactionsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "TransactionStates", "type": "[]TransactionState", "versions": "0+", "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+" },
      { "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId" },
      { "name": "TransactionState", "type": "string", "versions": "0+" },
      { "name": "TransactionTimeoutMs", "type": "int32", "versions": "0+" },
      { "name": "TransactionStartTimeMs", "type": "int64", "versions": "0+" },
      // New field to indicate the timestamp when transaction state was last changed
      { "name": "TransactionLastUpdateTimeMs", "type": "int64", "versions": "0+", "tag": 0, "taggedVersions": "0+" },
      { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId" },
      { "name": "ProducerEpoch", "type": "int16", "versions": "0+" },
      { "name": "Topics", "type": "[]TopicData", "versions": "0+",
        "about": "The set of partitions included in the current transaction (if active). When a transaction is preparing to commit or abort, this will include only partitions which do not have markers.",
        "fields": [
          { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true },
          { "name": "Partitions", "type": "[]int32", "versions": "0+" }
        ]
      }
    ]}
  ]
} |
Add transactionLastUpdateTimeMs to the TransactionDescription class. This object is used to build the ApiResult in the AdminClient, in org.apache.kafka.clients.admin.internals.DescribeTransactionsHandler#handleResponse, from the DescribeTransactionsResponse.
Code Block |
---|
package org.apache.kafka.clients.admin;

import java.util.OptionalLong;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

public class TransactionDescription {
    ...
    // New: timestamp of the last transaction state change, if known
    private final OptionalLong transactionLastUpdateTimeMs;
    ...
    public TransactionDescription(
        int coordinatorId,
        TransactionState state,
        long producerId,
        int producerEpoch,
        long transactionTimeoutMs,
        OptionalLong transactionStartTimeMs,
        OptionalLong transactionLastUpdateTimeMs,
        Set<TopicPartition> topicPartitions
    ) {
        ...
        this.transactionLastUpdateTimeMs = transactionLastUpdateTimeMs;
        ...
    }
    ...
    public OptionalLong transactionLastUpdateTimeMs() {
        return transactionLastUpdateTimeMs;
    }
    ...
} |
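To populate the new OptionalLong, the handler has to convert the raw int64 carried in the response. The sketch below illustrates one way this could look. It assumes that a negative value means "not set"; the schema does not declare a default for the field, so the sentinel is an assumption. The class and method names are hypothetical and are not part of the Kafka API.

```java
import java.util.OptionalLong;

// Hypothetical helper, not part of Kafka: shows how a raw timestamp from
// the response could be mapped to the OptionalLong held by TransactionDescription.
final class LastUpdateTimeMapping {
    private LastUpdateTimeMapping() { }

    // Assumption: a negative value means the timestamp is not available.
    static OptionalLong toOptionalTimestamp(long rawTimestampMs) {
        return rawTimestampMs < 0
            ? OptionalLong.empty()
            : OptionalLong.of(rawTimestampMs);
    }

    public static void main(String[] args) {
        System.out.println(toOptionalTimestamp(-1L));
        System.out.println(toOptionalTimestamp(1_700_000_000_000L));
    }
}
```

Keeping the value optional lets callers distinguish "no timestamp reported" from a real timestamp without relying on a magic number.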
Fixing the TransactionsCommand tool output
TransactionDescription is further utilized at org.apache.kafka.tools.TransactionsCommand.DescribeTransactionsCommand#execute to build a printable description of the transaction. This method will be changed to calculate transactionDuration as the difference between transactionLastUpdateTimeMs and transactionStartTime, instead of between the current time and transactionStartTime, if
state == COMPLETE_COMMIT || state == COMPLETE_ABORT
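The duration rule described above can be sketched as follows. This is an illustration only, not the actual TransactionsCommand code: the class, the local State enum, and the method signature are hypothetical. Falling back to the current time when the last-update timestamp is missing is also an assumption.

```java
import java.util.OptionalLong;

// Hypothetical stand-in for the tool's duration logic.
final class TransactionDurationSketch {
    private TransactionDurationSketch() { }

    // Minimal local enum for illustration; the real client enum has more states.
    enum State { ONGOING, COMPLETE_COMMIT, COMPLETE_ABORT }

    static OptionalLong durationMs(
        State state,
        OptionalLong startTimeMs,
        OptionalLong lastUpdateTimeMs,
        long currentTimeMs
    ) {
        // Without a start time there is nothing to measure.
        if (!startTimeMs.isPresent()) {
            return OptionalLong.empty();
        }
        boolean completed =
            state == State.COMPLETE_COMMIT || state == State.COMPLETE_ABORT;
        // Completed transactions end at their last state change.
        // Assumption: use the current time if that timestamp is unavailable.
        long endTimeMs = completed && lastUpdateTimeMs.isPresent()
            ? lastUpdateTimeMs.getAsLong()
            : currentTimeMs;
        return OptionalLong.of(endTimeMs - startTimeMs.getAsLong());
    }
}
```

With this rule, the reported duration of a committed or aborted transaction stops growing once the transaction completes, while an ongoing transaction is still measured against the current time.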
...