Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current stateIn DiscussionAccepted

Discussion thread: here (Vote thread: here)

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Add a new field, DurationFilter  to the ListTransactionsRequest and bump the API's version to 1

Code Block
{
  "apiKey": 66,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ListTransactionsRequest",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "StateFilters", "type": "[]string", "versions": "0+",
      "about": "The transaction states to filter by: if empty, all transactions are returned; if non-empty, then only transactions matching one of the filtered states will be returned"
    },
    { "name": "ProducerIdFilters", "type": "[]int64", "versions": "0+", "entityType": "producerId",
      "about": "The producerIds to filter by: if empty, all transactions will be returned; if non-empty, only transactions which match one of the filtered producerIds will be returned"
    },
	// Add a DurationFilter field
	{ "name": "DurationFilter", "type": "long", "versions": "1+",
	  "about": "Return transactions running longer than this time duration, specified in milliseconds"
	}
  ]
}

...

Code Block
@InterfaceStability.Evolving
public class ListTransactionsOptions extends AbstractOptions<ListTransactionsOptions> {
...
// return transactions open for more than this time duration specified in milliseconds
Duration durationFilter;

public ListTransactionsOptions durationFilter(Duration timeDuration) {
	this.durationFilter = timeDuration;
	return this;
}

public Duration durationFilter() {
	return this.durationFilter;
}
...
}
  • ListTransactionsResponse

Version will be bumped to 1 to match request. No changes otherwise.

  • DescribeTransactionsResponse 

Add a new tagged field, TransactionLastUpdateTimeMs to DescribeTransactionsResponse and bump the version to 1.

Broker will populate this field from txnLastUpdateTime txnLastUpdateTimestamp  contained at TransactionMetadata. This field is updated at the broker every time the transaction's state changes.

Code Block
{
  "apiKey": 65,
  "type": "response",
  "name": "DescribeTransactionsResponse",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
      { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
        "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
      { "name": "TransactionStates", "type": "[]TransactionState", "versions": "0+", "fields": [
        { "name": "ErrorCode", "type": "int16", "versions": "0+" },
        { "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId" },
        { "name": "TransactionState", "type": "string", "versions": "0+" },
        { "name": "TransactionTimeoutMs", "type": "int32", "versions": "0+" },
        { "name": "TransactionStartTimeMs", "type": "int64", "versions": "0+" },
		// New field to indicate the timestamp when transaction state was last changed
        { "name": "TransactionLastUpdateTimeMs", "type": "int64", "versions": "0+", "tag": 0, "taggedVersions": "0+" 1+" },
        { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId" },
        { "name": "ProducerEpoch", "type": "int16", "versions": "0+" },
        { "name": "Topics", "type": "[]TopicData", "versions": "0+",
          "about": "The set of partitions included in the current transaction (if active). When a transaction is preparing to commit or abort, this will include only partitions which do not have markers.",
          "fields": [
            { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true },
            { "name": "Partitions", "type": "[]int32", "versions": "0+" }
          ]
        }
      ]}
  ]
}

Add transactionLastUpdateTimeMs to TransactionDescription  class. This object is used to build APIResult  at org.apache.kafka.clients.admin.internals.DescribeTransactionsHandler#handleResponse  from DescribeTransactionsResponse .

We will also add an overloaded public constructor to this class to incorporate the value for transactionLastUpdateTimeMs  field.

Code Block
package org.apache.kafka.clients.admin;

public class TransactionDescription {
    ...
    private final OptionalLong transactionLastUpdateTimeMs;
	...

    public TransactionDescription(
        int coordinatorId,
        TransactionState state,
        long producerId,
        int producerEpoch,
        long transactionTimeoutMs,
        OptionalLong transactionStartTimeMs,
        Set<TopicPartition> topicPartitions
    ) {
		new TransactionDescription(coordinatorId, state, producerId, producerEpoch, transactionTimeoutMs, transactionStartTimeMs, OptionalLong.empty(), topicPartitions);
    }

    // new overloaded public constructor
    public TransactionDescription(
        int coordinatorId,
        TransactionState state,
        long producerId,
        int producerEpoch,
        long transactionTimeoutMs,
        OptionalLong transactionStartTimeMs,
		OptionalLong transactionLastUpdateTimeMs,
        Set<TopicPartition> topicPartitions
    ) {
		...
        this.transactionLastUpdateTimeMs = transactionLastUpdateTimeMs;
        ...
    }
	...
 	public OptionalLong transactionLastUpdateTimeMs() {
        return transactionLastUpdateTimeMs;
    }
	...
}

...

TransactionDescription  is further utilized at org.apache.kafka.tools.TransactionsCommand.DescribeTransactionsCommand#execute to build a printable description of the transaction. This method will be changed to calculate transactionDuration  as a difference between transactionStartTime  and transactionLastUpdateTimeMs , if state == COMPLETE_COMMIT || state == COMPLETE_ABORT

  • DescribeTransactionsRequest

Version will be bumped to 1 to match response. No changes otherwise.

Command Line Tool Changes

...

If TransactionLastUpdateTimeMs field is present in DescribeTransactionsResponse, transaction duration (for completed transactions) will be printed as the difference of TransactionLastUpdateTimeMs  and transactionStartTime. On the other hand, if this field is not present, transaction duration value cannot be determined correctly (for completed transactions) and we will print -1.

...

Compatibility, Deprecation, and Migration Plan

Since we are adding tagged field to DescribeTransactionsResponse , this does not need a version bump and there is no compatibility issue. The enhanced functionality will work only when both AdminClient  and Kafka broker are updated.

We do need to We will bump API version for ListTransactionsRequest and DescribeTransactionsResponse from 0 to 1.

When using a new AdminClient to send durationFilter (value greater than 0) to an older broker, AdminClient will fail to build the ListTransactionsRequest and throw an UnsupportedVersionException. This check will be made at ListTransactionsRequest.Builder.build(short version) method. A new AdminClient can still generate older version of ListTransactionsRequest when user sets durationFilter to 0 (or does not set a durationFilter).

...