Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Add a new tagged field, TransactionLastUpdateTimeMs . Broker will populate this field from txnLastUpdateTimeMs from TransactionState. This field is updated at the broker every time of transaction's state changes.

Code Block
{
  "apiKey": 65,
  "type": "response",
  "name": "DescribeTransactionsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
        "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
      { "name": "TransactionStates", "type": "[]TransactionState", "versions": "0+", "fields": [
        { "name": "ErrorCode", "type": "int16", "versions": "0+" },
        { "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId" },
        { "name": "TransactionState", "type": "string", "versions": "0+" },
        { "name": "TransactionTimeoutMs", "type": "int32", "versions": "0+" },
        { "name": "TransactionStartTimeMs", "type": "int64", "versions": "0+" },
		// New field to indicate the timestamp when transaction state was last changed
        { "name": "TransactionLastUpdateTimeMs", "type": "int64", "versions": "0+", "tag": 0, "taggedVersions": "0+" },
        { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId" },
        { "name": "ProducerEpoch", "type": "int16", "versions": "0+" },
        { "name": "Topics", "type": "[]TopicData", "versions": "0+",
          "about": "The set of partitions included in the current transaction (if active). When a transaction is preparing to commit or abort, this will include only partitions which do not have markers.",
          "fields": [
            { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true },
            { "name": "Partitions", "type": "[]int32", "versions": "0+" }
          ]
        }
      ]}
  ]
}

Add transactionLastUpdateTimeMs to TransactionDescription  class that . This object is used to build APIResult  at AdminClient org.apache.kafka.clients.admin.internals.DescribeTransactionsHandler#handleResponse  from DescribeTransactionsResponse 

Code Block
package org.apache.kafka.clients.admin;

public class TransactionDescription {
    ...
    private final OptionalLong transactionLastUpdateTimeMs;
	...

    public TransactionDescription(
        int coordinatorId,
        TransactionState state,
        long producerId,
        int producerEpoch,
        long transactionTimeoutMs,
        OptionalLong transactionStartTimeMs,
		OptionalLong transactionLastUpdateTimeMs,
        Set<TopicPartition> topicPartitions
    ) {
		...
        this.transactionLastUpdateTimeMs = transactionLastUpdateTimeMs;
        ...
    }
	...
 	public OptionalLong transactionLastUpdateTimeMs() {
        return transactionLastUpdateTimeMs;
    }
	...
}

Fixing the TransactionsCommand tool output

TransactionDescription  is further utilized at org.apache.kafka.tools.TransactionsCommand.DescribeTransactionsCommand#execute to build a printable description of the transaction. This method will be changed to calculate transactionDuration  as a difference between current time transactionStartTime  and transactionLastUpdateTimeMs , if state == COMPLETE_COMMIT || state == COMPLETE_ABORT

...