Status

Current state: In Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KIP-664 introduced tooling to detect, analyze, and resolve hanging Kafka transactions. We propose to enhance this tooling for two operationally useful scenarios: reducing the size of the response to ListTransactionsRequest, and allowing users to build observability and automation around long-running transactions.

A JIRA issue reported a bug where the tooling reports an incorrect run duration for completed transactions. This gives us an opportunity to add the time of the last state change, which is useful when analyzing stale transactions.

Public Interfaces

  • ListTransactionsRequest 

Add a new field, DurationFilter, to the ListTransactionsRequest and bump the API version from 0 to 1.

{
  "apiKey": 66,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ListTransactionsRequest",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "StateFilters", "type": "[]string", "versions": "0+",
      "about": "The transaction states to filter by: if empty, all transactions are returned; if non-empty, then only transactions matching one of the filtered states will be returned"
    },
    { "name": "ProducerIdFilters", "type": "[]int64", "versions": "0+", "entityType": "producerId",
      "about": "The producerIds to filter by: if empty, all transactions will be returned; if non-empty, only transactions which match one of the filtered producerIds will be returned"
    },
    // Add a DurationFilter field
    { "name": "DurationFilter", "type": "int64", "versions": "1+",
      "about": "Return transactions running longer than this time duration, specified in milliseconds"
    }
  ]
}

Add durationFilter to the ListTransactionsOptions class used by AdminClient to pass filters on to the Kafka broker.

@InterfaceStability.Evolving
public class ListTransactionsOptions extends AbstractOptions<ListTransactionsOptions> {
...
// Return only transactions open for longer than this duration, specified in milliseconds
private long durationFilter;

public ListTransactionsOptions durationFilter(long timeDuration) {
	this.durationFilter = timeDuration;
	return this;
}

public long durationFilter() {
	return this.durationFilter;
}
...
}
  • DescribeTransactionsResponse 

Add a new tagged field, TransactionLastUpdateTimeMs 

{
  "apiKey": 65,
  "type": "response",
  "name": "DescribeTransactionsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
        "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
      { "name": "TransactionStates", "type": "[]TransactionState", "versions": "0+", "fields": [
        { "name": "ErrorCode", "type": "int16", "versions": "0+" },
        { "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId" },
        { "name": "TransactionState", "type": "string", "versions": "0+" },
        { "name": "TransactionTimeoutMs", "type": "int32", "versions": "0+" },
        { "name": "TransactionStartTimeMs", "type": "int64", "versions": "0+" },
        // New field to indicate the timestamp when the transaction state was last changed
        { "name": "TransactionLastUpdateTimeMs", "type": "int64", "versions": "0+", "tag": 0, "taggedVersions": "0+" },
        { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId" },
        { "name": "ProducerEpoch", "type": "int16", "versions": "0+" },
        { "name": "Topics", "type": "[]TopicData", "versions": "0+",
          "about": "The set of partitions included in the current transaction (if active). When a transaction is preparing to commit or abort, this will include only partitions which do not have markers.",
          "fields": [
            { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true },
            { "name": "Partitions", "type": "[]int32", "versions": "0+" }
          ]
        }
      ]}
  ]
}

Command Line Tool Changes

The kafka-transactions.sh --list command will have a new option, --runningLongerThanMs, to return only transactions that have been running longer than the given duration in milliseconds.

Proposed Changes

We propose to add a durationFilter to ListTransactionsOptions so that only transactions running longer than a given duration are listed.
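
The filter semantics described above could be sketched as follows. This is only an illustration, not the coordinator's actual code: the class and method names are hypothetical, and treating a non-positive filter as "no filtering" and a negative start time as "not running" are assumptions made for the sketch.

```java
// Hypothetical sketch of the duration filter predicate; not Kafka source code.
final class DurationFilterSketch {

    /**
     * Returns true if a transaction should be included in the listing.
     *
     * @param txnStartTimeMs   transaction start timestamp in ms (assumed negative when nothing is running)
     * @param nowMs            current time in ms
     * @param durationFilterMs requested minimum run duration in ms (assumed: values <= 0 disable the filter)
     */
    static boolean matchesDurationFilter(long txnStartTimeMs, long nowMs, long durationFilterMs) {
        if (durationFilterMs <= 0) {
            return true; // no filter requested, so every transaction matches
        }
        if (txnStartTimeMs < 0) {
            return false; // assumption: no running transaction, so it cannot be long running
        }
        // "Running longer than" is read here as a strict comparison.
        return nowMs - txnStartTimeMs > durationFilterMs;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long startedTenMinutesAgo = now - 10 * 60 * 1000L;
        // Ask for transactions running longer than five minutes.
        System.out.println(matchesDurationFilter(startedTenMinutesAgo, now, 5 * 60 * 1000L));
    }
}
```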

We also propose to add the last update timestamp (TransactionLastUpdateTimeMs) to the DescribeTransactionsResponse. This field can be used to calculate the correct run duration for a completed transaction and can help in analyzing stale transactions.
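
As a rough illustration of how a tool might use the two timestamps, the sketch below computes a run duration. The class, method, and `completed` flag are hypothetical names introduced for this example; how the real tool decides that a transaction is completed is not specified here.

```java
// Hypothetical sketch of a run-duration calculation; not the actual tool code.
final class RunDurationSketch {

    /**
     * @param completed        whether the transaction has already finished (assumed to be derived from its state)
     * @param startTimeMs      TransactionStartTimeMs from the response
     * @param lastUpdateTimeMs TransactionLastUpdateTimeMs from the response
     * @param nowMs            current time in ms
     * @return run duration in milliseconds
     */
    static long runDurationMs(boolean completed, long startTimeMs, long lastUpdateTimeMs, long nowMs) {
        // For a completed transaction the clock stops at the last state change;
        // measuring against "now" would keep inflating the duration.
        long endMs = completed ? lastUpdateTimeMs : nowMs;
        return endMs - startTimeMs;
    }

    public static void main(String[] args) {
        System.out.println(runDurationMs(true, 1_000L, 4_000L, 10_000L));
        System.out.println(runDurationMs(false, 1_000L, 4_000L, 10_000L));
    }
}
```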

Compatibility, Deprecation, and Migration Plan

Since we are adding a tagged field to DescribeTransactionsResponse, this does not need a version bump and there is no compatibility issue. The enhanced functionality will work only when both the AdminClient and the Kafka broker are updated.

We do need to bump the API version for ListTransactionsRequest from 0 to 1. If a new AdminClient sends a durationFilter greater than 0 to an older broker, ListTransactionsRequest will fail to build on the client side. This requires a check in the ListTransactionsRequest.Builder.build(short version) method. A new AdminClient can still generate the older version of ListTransactionsRequest when durationFilter is set to 0.
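
The client-side check could look roughly like the sketch below. It is standalone and does not use Kafka classes: the class name, method name, and constant are hypothetical, and a JDK exception stands in for whatever the real builder would throw (Kafka's UnsupportedVersionException would be a natural choice).

```java
// Hypothetical, standalone sketch of the version check; not the actual Builder code.
final class ListTransactionsVersionCheckSketch {

    // Assumption for this sketch: DurationFilter exists from request version 1 onward.
    static final short DURATION_FILTER_MIN_VERSION = 1;

    /**
     * Fails if a duration filter is requested but the negotiated request
     * version is too old to carry it; otherwise does nothing.
     */
    static void checkVersion(short version, long durationFilterMs) {
        if (durationFilterMs > 0 && version < DURATION_FILTER_MIN_VERSION) {
            throw new UnsupportedOperationException(
                "Duration filter requires ListTransactionsRequest version "
                    + DURATION_FILTER_MIN_VERSION + " or later, but got version " + version);
        }
    }

    public static void main(String[] args) {
        checkVersion((short) 1, 60_000L); // new broker, filter set: accepted
        checkVersion((short) 0, 0L);      // old broker, no filter: accepted
        try {
            checkVersion((short) 0, 60_000L); // old broker, filter set: rejected
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Failing on the client keeps the caller from silently receiving an unfiltered list from a broker that does not understand the filter.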

Rejected Alternatives

An alternative to enhancing these tools is to enable debug logging and parse the coordinator logs for information such as the completion time and run duration of a transaction. It is better to enhance the tools (and, in the case of DescribeTransactions, fix them) to provide a unified and convenient user experience.

We considered adding DurationFilter as a tagged field to ListTransactionsRequest without bumping the API version. This approach has a usability downside when a new client talks to an older broker. The new client can send durationFilter and assume that the returned transactions have been running longer than the specified duration. It may then build follow-up actions on these long-running transactions, such as aborting them. This is dangerous if the broker supports only the older version of the API and does not recognize the new field: the broker will simply return all transactions. The client therefore cannot safely build automated follow-ups on the transactions returned by such a ListTransactionsRequest.
