Status

Current state: Draft

Discussion thread: here

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

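KIP-664 introduced tooling to detect, analyze and resolve hanging Kafka transactions. We propose to enhance this tooling to support additional operational scenarios, such as listing only long-running transactions and reporting when a transaction's state last changed.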

...

https://issues.apache.org/jira/browse/KAFKA-15546 reported a bug where the tooling reports an incorrect run duration for completed transactions. This gives us an opportunity to also expose the time of the last state change, which is useful when analyzing stale transactions.

Public Interfaces

  • ListTransactionsRequest 

Add a new tagged field, DurationFilter, to the ListTransactionsRequest.

Code Block
{
  "apiKey": 66,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ListTransactionsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "StateFilters", "type": "[]string", "versions": "0+",
      "about": "The transaction states to filter by: if empty, all transactions are returned; if non-empty, then only transactions matching one of the filtered states will be returned"
    },
    { "name": "ProducerIdFilters", "type": "[]int64", "versions": "0+", "entityType": "producerId",
      "about": "The producerIds to filter by: if empty, all transactions will be returned; if non-empty, only transactions which match one of the filtered producerIds will be returned"
    },
    { "name": "DurationFilter", "type": "int64", "versions": "0+", "tag": 10000, "taggedVersions": "0+",
      "about": "Return only transactions running longer than this duration, specified in milliseconds"
    }
  ]
}

...

  • DescribeTransactionsResponse 

Add a new tagged field, TransactionLastUpdateTimeMs, to the DescribeTransactionsResponse.

Code Block
{
  "apiKey": 65,
  "type": "response",
  "name": "DescribeTransactionsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
        "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
      { "name": "TransactionStates", "type": "[]TransactionState", "versions": "0+", "fields": [
        { "name": "ErrorCode", "type": "int16", "versions": "0+" },
        { "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId" },
        { "name": "TransactionState", "type": "string", "versions": "0+" },
        { "name": "TransactionTimeoutMs", "type": "int32", "versions": "0+" },
        { "name": "TransactionStartTimeMs", "type": "int64", "versions": "0+" },
        { "name": "TransactionLastUpdateTimeMs", "type": "int64", "versions": "0+", "tag": 10000, "taggedVersions": "0+" },
        { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId" },
        { "name": "ProducerEpoch", "type": "int16", "versions": "0+" },
        { "name": "Topics", "type": "[]TopicData", "versions": "0+",
          "about": "The set of partitions included in the current transaction (if active). When a transaction is preparing to commit or abort, this will include only partitions which do not have markers.",
          "fields": [
            { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true },
            { "name": "Partitions", "type": "[]int32", "versions": "0+" }
          ]
        }
      ]}
  ]
}

Command Line Tool Changes

The kafka-transactions.sh --list command will have a new option, --runningLongerThanMs, which returns only the transactions that have been running longer than the given duration in milliseconds.

Proposed Changes

We propose adding a duration filter to ListTransactionsOptions so that only transactions running longer than a given duration are listed.
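A minimal Java sketch of the intended filtering semantics follows. It assumes that a transaction matches when the time elapsed since its start time exceeds the filter value, and that a zero (unset) filter returns all transactions. `TxnListing` and `filterByDuration` are hypothetical names used only for illustration; they are not part of Kafka's coordinator or Admin API.

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch of the proposed DurationFilter semantics under stated assumptions.
// TxnListing and filterByDuration are hypothetical, not Kafka classes.
public class DurationFilterSketch {

    // Hypothetical minimal view of one transaction's coordinator metadata.
    record TxnListing(String transactionalId, long producerId, String state, long startTimeMs) {}

    // Keeps transactions whose elapsed time since start exceeds durationFilterMs.
    // Assumption: a non-positive filter (e.g. an unset tagged field defaulting to 0)
    // disables filtering.
    static List<TxnListing> filterByDuration(List<TxnListing> txns, long durationFilterMs, long nowMs) {
        if (durationFilterMs <= 0) {
            return txns;
        }
        return txns.stream()
                .filter(t -> nowMs - t.startTimeMs() > durationFilterMs)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        long nowMs = 100_000L;
        List<TxnListing> txns = List.of(
                new TxnListing("orders-app", 1L, "Ongoing", 10_000L),   // running for 90 s
                new TxnListing("billing-app", 2L, "Ongoing", 95_000L)); // running for 5 s
        // Comparable to --runningLongerThanMs 60000; prints "orders-app".
        filterByDuration(txns, 60_000L, nowMs)
                .forEach(t -> System.out.println(t.transactionalId()));
    }
}
```

In this sketch the duration filter composes with the existing StateFilters and ProducerIdFilters simply by being applied as one more predicate.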

We also propose adding the last update timestamp (TransactionLastUpdateTimeMs) to the DescribeTransactionsResponse. This field can be used to calculate the correct run duration of a completed transaction and can help when analyzing stale transactions.
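The sketch below illustrates how the new field could be used to compute run duration. It assumes that, for a transaction in the CompleteCommit or CompleteAbort state, the last state change marks the end of the transaction, so the duration is measured up to TransactionLastUpdateTimeMs rather than up to the current time. `runDurationMs` is a hypothetical helper, not an existing Kafka method.

```java
import java.util.Set;

// Hypothetical helper showing how TransactionLastUpdateTimeMs could fix the
// reported run duration of completed transactions.
public class RunDurationSketch {

    // Assumption: these coordinator states mean the transaction has finished.
    private static final Set<String> COMPLETED_STATES = Set.of("CompleteCommit", "CompleteAbort");

    static long runDurationMs(String state, long startTimeMs, long lastUpdateTimeMs, long nowMs) {
        // For a completed transaction, stop the clock at the last state change;
        // measuring up to "now" keeps growing after the transaction has ended.
        long endMs = COMPLETED_STATES.contains(state) ? lastUpdateTimeMs : nowMs;
        return endMs - startTimeMs;
    }

    public static void main(String[] args) {
        long nowMs = 1_000_000L;
        System.out.println(runDurationMs("CompleteCommit", 10_000L, 12_500L, nowMs)); // 2500
        System.out.println(runDurationMs("Ongoing", 10_000L, 12_500L, nowMs));        // 990000
    }
}
```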

Compatibility, Deprecation, and Migration Plan

Given that we are adding tagged fields to ListTransactionsRequest and DescribeTransactionsResponse, no version bump is needed and there is no compatibility issue. The enhanced functionality will work only when both the AdminClient and the Kafka broker are updated.
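For reference, the following sketch shows why tagged fields avoid a version bump. Under KIP-482, the tagged-field section of a flexible-version message is an unsigned-varint count followed, for each field, by an unsigned-varint tag, an unsigned-varint size and the field bytes, so a reader that does not recognize a tag can skip over it. The code is an illustrative re-implementation under that description, not Kafka's generated serialization code; `readInt64Tag` and the other helper names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.TreeMap;

// Illustrative encoder/decoder for a KIP-482 style tagged-field section.
public class TaggedFieldsSketch {

    static void writeUnsignedVarint(int value, ByteArrayOutputStream out) {
        // Emit 7 bits at a time, setting the high bit while more bytes follow.
        while ((value & 0xFFFFFF80) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    static int readUnsignedVarint(ByteBuffer buf) {
        int value = 0;
        int shift = 0;
        int b;
        do {
            b = buf.get() & 0xFF;
            value |= (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return value;
    }

    // Writes the count, then (tag, size, bytes) per field in ascending tag order.
    static byte[] writeTaggedFields(TreeMap<Integer, byte[]> fields) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeUnsignedVarint(fields.size(), out);
        for (Map.Entry<Integer, byte[]> e : fields.entrySet()) {
            writeUnsignedVarint(e.getKey(), out);
            writeUnsignedVarint(e.getValue().length, out);
            out.writeBytes(e.getValue());
        }
        return out.toByteArray();
    }

    // Returns the int64 stored under wantedTag, or defaultValue if it is absent.
    // Any other tag is skipped using its size, as an older reader would do.
    static long readInt64Tag(ByteBuffer buf, int wantedTag, long defaultValue) {
        long result = defaultValue;
        int count = readUnsignedVarint(buf);
        for (int i = 0; i < count; i++) {
            int tag = readUnsignedVarint(buf);
            int size = readUnsignedVarint(buf);
            if (tag == wantedTag && size == Long.BYTES) {
                result = buf.getLong();
            } else {
                buf.position(buf.position() + size);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        TreeMap<Integer, byte[]> fields = new TreeMap<>();
        // Tag 10000 carries DurationFilter = 60000 ms, as in the request schema.
        fields.put(10000, ByteBuffer.allocate(Long.BYTES).putLong(60_000L).array());
        byte[] encoded = writeTaggedFields(fields);

        // A reader that knows tag 10000 sees the filter: prints 60000.
        System.out.println(readInt64Tag(ByteBuffer.wrap(encoded), 10000, 0L));
        // A reader looking for a different tag skips it and keeps its default: prints 0.
        System.out.println(readInt64Tag(ByteBuffer.wrap(encoded), 42, 0L));
    }
}
```

A broker that predates this change behaves like the second reader: it skips DurationFilter and returns unfiltered results, which is why the new behavior requires an updated broker.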

Rejected Alternatives

An alternative to enhancing these tools is to enable debug logging and parse the coordinator logs to obtain information such as the completion time and run duration of a transaction. It is better to enhance the tools (and, in the case of DescribeTransactions, fix them) to provide a unified and convenient user experience.