Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
{
  "apiKey": 66,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ListTransactionsRequest",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "StateFilters", "type": "[]string", "versions": "0+",
      "about": "The transaction states to filter by: if empty, all transactions are returned; if non-empty, then only transactions matching one of the filtered states will be returned"
    },
    { "name": "ProducerIdFilters", "type": "[]int64", "versions": "0+", "entityType": "producerId",
      "about": "The producerIds to filter by: if empty, all transactions will be returned; if non-empty, only transactions which match one of the filtered producerIds will be returned"
    },
	// Add a DurationFilter field
	{ "name": "DurationFilter", "type": "long", "versions": "1+",
			  "about": "Return trsanactions running longer than this time duration, specified in milliseconds"
		}
  ]
}

Add DurationFilter  to the ListTransactionsOptions  class used by AdminClient  to pass on filters to the Kafka broker.

Code Block
@InterfaceStability.Evolving
public class ListTransactionsOptions extends AbstractOptions<ListTransactionsOptions> {
...
// return transactions open for more than this time duration specified in millisceondsmilliseconds
Duration durationFilter;

public ListTransactionsOptions durationFilter(long timeDuration) {
	this.durationFilter = timeDuration;
	return this;
}

public long durationFilter() {
	return this.durationFilter;
}
...
}

...

Code Block
{
  "apiKey": 65,
  "type": "response",
  "name": "DescribeTransactionsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
        "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
      { "name": "TransactionStates", "type": "[]TransactionState", "versions": "0+", "fields": [
        { "name": "ErrorCode", "type": "int16", "versions": "0+" },
        { "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId" },
        { "name": "TransactionState", "type": "string", "versions": "0+" },
        { "name": "TransactionTimeoutMs", "type": "int32", "versions": "0+" },
        { "name": "TransactionStartTimeMs", "type": "int64", "versions": "0+" },
		// New field to indicate the timestamp when transaction state was last changed
        { "name": "TransactionLastUpdateTimeMs", "type": "int64", "versions": "0+", "tag": 10000, "taggedVersions": "0+" },
        { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId" },
        { "name": "ProducerEpoch", "type": "int16", "versions": "0+" },
        { "name": "Topics", "type": "[]TopicData", "versions": "0+",
          "about": "The set of partitions included in the current transaction (if active). When a transaction is preparing to commit or abort, this will include only partitions which do not have markers.",
          "fields": [
            { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true },
            { "name": "Partitions", "type": "[]int32", "versions": "0+" }
          ]
        }
      ]}
  ]
}

...

An alternative to enhancing these tools is to enable debug logging and parse through coordinator logs to get information like completion time and run duration for a transaction. Its better to enhance the tools (fix in case of DescribeTransactions ) to provide a unified and convenient user experience.

We considered adding DurationFilter  as a tagged field to `ListTransactionsRequest` and not bump the API version. This approach had a down side in terms of usability when a new client is talking to an older broker. New client can send durationFilter  and assume that the returned transactions are running longer than the specified duration. It can further try to build follow up actions on these long running transactions like aborting them. This is dangerous if the broker supports older version of the API and does not recognize the new field. Broker will simply return all transactions. Therefore client can not build automated follow ups on the transactions returned by such ListTransactionsRequest