Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

KIP-664 introduced tooling to detect, analyze and resolve hanging Kafka transactions. We propose to enhance this tooling to serve some operationally useful scenarios .KIP-939 will be introducing Kafka’s support for distributed two phase commit transactions. Unlike regular transactional writes, Kafka’s transaction coordinator does not enforce a maximum time duration after which it will abort a distributed transaction. This opens up a set of transactions that can legitimately run beyond the maximum configured time limit. Tooling should be enhanced to provide detection for such like reducing the size of response for ListTransactionsRequest and allowing users to build some observability / automation around long running transactions.

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-15546
reported a bug where tooling reports incorrect run duration for completed transactions. This gives us an opportunity to add information about the time of last state change which can be useful to analyze stale transactions.

...

Code Block
{
  "apiKey": 65,
  "type": "response",
  "name": "DescribeTransactionsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
        "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
      { "name": "TransactionStates", "type": "[]TransactionState", "versions": "0+", "fields": [
        { "name": "ErrorCode", "type": "int16", "versions": "0+" },
        { "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId" },
        { "name": "TransactionState", "type": "string", "versions": "0+" },
        { "name": "TransactionTimeoutMs", "type": "int32", "versions": "0+" },
        { "name": "TransactionStartTimeMs", "type": "int64", "versions": "0+" },
		// New field to indicate the timestamp when transaction state was last changed
        { "name": "TransactionLastUpdateTimeMs", "type": "int64", "versions": "0+", "tag": 100000, "taggedVersions": "0+" },
        { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId" },
        { "name": "ProducerEpoch", "type": "int16", "versions": "0+" },
        { "name": "Topics", "type": "[]TopicData", "versions": "0+",
          "about": "The set of partitions included in the current transaction (if active). When a transaction is preparing to commit or abort, this will include only partitions which do not have markers.",
          "fields": [
            { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true },
            { "name": "Partitions", "type": "[]int32", "versions": "0+" }
          ]
        }
      ]}
  ]
}

...