...
Ideally, we would prefer a metric which takes the transaction timeout into account. Although partition leaders do not know the transaction timeout associated with each transactional write, they do know the upper bound indicated by `transaction.max.timeout.ms`. Any transaction which has remained open longer than this timeout ought to be treated as suspicious. We propose two new metrics:
...
to add a new metric
...
`PartitionsWithLateTransactionsCount`, which tracks the number of partitions which have open transactions with durations exceeding `transaction.max.timeout.ms`.
...
We expect that users will alert on positive values of `PartitionsWithLateTransactionsCount`. They can then use `MaxActiveTransactionDuration` or one of the APIs described below to find the topic partition. This gives users a simple alert criterion.
Note that it is possible to have transaction timeouts which are exactly equal to `transaction.max.timeout.ms`. To account for some extra latency and avoid spurious alerts, we will add 5 minutes of padding before a transaction is counted in `PartitionsWithLateTransactionsCount`.
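As a rough illustration of the rule above, the following sketch counts partitions whose earliest open transaction has been open longer than `transaction.max.timeout.ms` plus the 5 minutes of padding. The function name, the shape of the input mapping, and the 15-minute timeout value are assumptions made for this example; they are not part of the proposal.

```python
from typing import Mapping, Optional

# Assumed for illustration: 15 minutes for transaction.max.timeout.ms.
TRANSACTION_MAX_TIMEOUT_MS = 15 * 60 * 1000
# Padding described in the text above.
LATE_TRANSACTION_PADDING_MS = 5 * 60 * 1000


def partitions_with_late_transactions_count(
    first_open_txn_start_ms: Mapping[str, Optional[int]],
    now_ms: int,
    max_timeout_ms: int = TRANSACTION_MAX_TIMEOUT_MS,
    padding_ms: int = LATE_TRANSACTION_PADDING_MS,
) -> int:
    """Count partitions whose earliest open transaction is late.

    `first_open_txn_start_ms` is a hypothetical input: it maps a partition
    name to the start timestamp of its earliest open transaction, or None
    when the partition has no open transaction.
    """
    threshold_ms = max_timeout_ms + padding_ms
    return sum(
        1
        for start_ms in first_open_txn_start_ms.values()
        if start_ms is not None and now_ms - start_ms > threshold_ms
    )
```

With these values, a transaction only counts as late once it has been open for more than 20 minutes, so a transaction whose timeout is exactly equal to the maximum still has room to be aborted normally before an alert fires.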
...
The `DescribeProducers` API will require `Read` permission on the associated topic.
Request Schema
This request is expected to be sent to partition leaders.
...
Code Block |
---|
{
  "apiKey": NN,
  "type": "response",
  "name": "DescribeProducersResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Topics", "type": "[]TopicResponse", "versions": "0+",
      "about": "Each topic in the response.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name" },
      { "name": "Partitions", "type": "[]PartitionResponse", "versions": "0+",
        "about": "Each partition in the response.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The partition error code, or 0 if there was no error." },
        { "name": "ActiveProducers", "type": "[]ProducerState", "versions": "0+", "fields": [
          { "name": "ProducerId", "type": "int64", "versions": "0+" },
          { "name": "ProducerEpoch", "type": "int32", "versions": "0+" },
          { "name": "LastSequence", "type": "int32", "versions": "0+" },
          { "name": "LastTimestamp", "type": "int64", "versions": "0+" },
          { "name": "CurrentTxnStartTimestamp", "type": "int64", "versions": "0+" },
          { "name": "CoordinatorEpoch", "type": "int32", "versions": "0+" }
        ]}
      ]}
    ]}
  ]
} |
Possible errors:
- NOT_LEADER_OR_FOLLOWER: If the replica receiving the request is not the current leader of a topic partition
- TOPIC_AUTHORIZATION_FAILED: If the user does not have Describe access on a requested topic.
- UNKNOWN_TOPIC_OR_PARTITION: If either the topic or partition is not known to exist.
...
The request includes a filter of the states that the user is interested in. As an example, this allows us to filter only the "Ongoing" transactions. We have also included a filter for the `ProducerId`. This is useful when trying to reverse lookup the `TransactionalId`, as we need to do to support the `--find-hanging` command below.
Code Block |
---|
{
  "apiKey": 16,
  "type": "request",
  "name": "ListTransactionsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "StatesFilter", "type": "[]string", "versions": "0+",
      "about": "The states of transactions we want to list." },
    { "name": "ProducerIdFilter", "type": "[]int64", "versions": "0+",
      "about": "Array of ProducerIds to limit the response to" }
  ]
} |
Response Schema
Code Block |
---|
{
  "apiKey": NN,
  "type": "response",
  "name": "ListTransactionsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+" },
    { "name": "TransactionStates", "type": "[]TransactionState", "versions": "0+", "fields": [
      { "name": "TransactionalId", "type": "string", "versions": "0+" },
      { "name": "ProducerId", "type": "int64", "versions": "0+" },
      { "name": "TransactionState", "type": "string", "versions": "0+" }
    ]}
  ]
} |
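To make the two request filters concrete, here is a minimal sketch of how a coordinator might apply `StatesFilter` and `ProducerIdFilter` to the transactions it knows about. The `TransactionListing` type and the `list_transactions` function are hypothetical names used only for this example, and the sketch assumes that an empty filter places no restriction on the corresponding field, which the schema above does not state explicitly.

```python
from dataclasses import dataclass
from typing import Iterable, List, Sequence


@dataclass(frozen=True)
class TransactionListing:
    """One entry of TransactionStates in the response (illustrative type)."""
    transactional_id: str
    producer_id: int
    state: str


def list_transactions(
    transactions: Iterable[TransactionListing],
    states_filter: Sequence[str] = (),
    producer_id_filter: Sequence[int] = (),
) -> List[TransactionListing]:
    # Assumption: an empty filter matches everything for that field.
    states = set(states_filter)
    producer_ids = set(producer_id_filter)
    return [
        txn
        for txn in transactions
        if (not states or txn.state in states)
        and (not producer_ids or txn.producer_id in producer_ids)
    ]
```

Filtering by a single `ProducerId` is the reverse lookup mentioned above: given a producer found on a partition, it returns the matching `TransactionalId`, if the coordinator has one.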
...
- NOT_COORDINATOR: If the coordinator receiving the request does not own a transactionalId in the request.
- COORDINATOR_LOAD_IN_PROGRESS: The coordinator is in the process of loading its state.
- COORDINATOR_NOT_AVAILABLE: If the coordinator receiving the request is being shut down.
- TRANSACTIONAL_ID_NOT_FOUND (NEW): New error code which indicates that the requested `TransactionalId` could not be found.
- TRANSACTIONAL_ID_AUTHORIZATION_FAILED: If the principal does not have Describe permission on one of the transactionalIds in the request.
...
Code Block |
---|
{
  "apiKey": 27,
  "type": "request",
  "name": "WriteTxnMarkersRequest",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "Markers", "type": "[]WritableTxnMarker", "versions": "0+",
      "about": "The transaction markers to be written.", "fields": [
      { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId",
        "about": "The current producer ID."},
      { "name": "ProducerEpoch", "type": "int16", "versions": "0+",
        "about": "The current epoch associated with the producer ID." },
      { "name": "TransactionResult", "type": "bool", "versions": "0+",
        "about": "The result of the transaction to write to the partitions (false = ABORT, true = COMMIT)." },
      { "name": "Topics", "type": "[]RequestTopic", "versions": "0+",
        "about": "Each topic that we want to write transaction marker(s) for.", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
          "about": "The topic name." },
        { "name": "PartitionIndexes", "type": "[]int32", "versions": "0",
          "about": "The indexes of the partitions to write transaction markers for." },
        // NEW: The array below replaces "PartitionIndexes" and adds "TxnStartOffset"
        // as an optional field, which will only be included by the AdminClient
        { "name": "Partitions", "type": "[]RequestPartition", "versions": "1+", "fields": [
          { "name": "PartitionIndex", "type": "int32", "versions": "1+",
            "about": "The partition index." },
          { "name": "TxnStartOffset", "type": "int64", "versions": "1+", "taggedVersions": "1+", "tag": 0,
            "about": "The start offset of the transaction that should be completed" }
        ]}
      ]},
      { "name": "CoordinatorEpoch", "type": "int32", "versions": "0+",
        "about": "Epoch associated with the transaction state partition hosted by this transaction coordinator" }
    ]}
  ]
} |
...
There are no changes to the response schema, but its version will be bumped. Note that we are also enabling flexible version support. Note also that INVALID_TXN_STATE is now a possible error code when the `TxnStartOffset` is included in the request.
Code Block |
---|
{
"apiKey": 27,
"type": "response",
"name": "WriteTxnMarkersResponse",
"validVersions": "0-1",
"flexibleVersions": "1+",
"fields": [
{ "name": "Markers", "type": "[]WritableTxnMarkerResult", "versions": "0+",
"about": "The results for writing markers.", "fields": [
{ "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId",
"about": "The current producer ID in use by the transactional ID." },
{ "name": "Topics", "type": "[]TopicResult", "versions": "0+",
"about": "The results by topic.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The topic name." },
{ "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
"about": "The results by partition.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." }
]}
]}
]}
]
} |
...
- NOT_LEADER_OR_FOLLOWER: If the replica receiving the request is not the current leader of a topic partition
- CLUSTER_AUTHORIZATION_FAILED: If the user does not have `ClusterAction` permission.
- UNKNOWN_TOPIC_OR_PARTITION: If either the topic or partition is not known to exist.
- INVALID_PRODUCER_EPOCH: If no cluster epoch is provided and the provided epoch does not exactly match the current
- INVALID_TXN_STATE (NEW): If there is no transaction in progress at the indicated offset
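The new INVALID_TXN_STATE case can be pictured with a small sketch. It is one possible reading of "no transaction in progress at the indicated offset": the check passes only if the producer has an open transaction on the partition whose first offset equals `TxnStartOffset`. The function name and the `open_txn_start_offsets` mapping are hypothetical and are not taken from the broker code.

```python
from typing import Mapping, Optional

NONE = "NONE"
INVALID_TXN_STATE = "INVALID_TXN_STATE"


def check_txn_start_offset(
    open_txn_start_offsets: Mapping[int, int],
    producer_id: int,
    txn_start_offset: Optional[int],
) -> str:
    """Return the error name for the TxnStartOffset check on one partition.

    `open_txn_start_offsets` maps a producer ID to the start offset of its
    currently open transaction on the partition (illustrative input).
    """
    if txn_start_offset is None:
        # The tagged field was omitted, so there is nothing extra to validate.
        return NONE
    if open_txn_start_offsets.get(producer_id) != txn_start_offset:
        # No open transaction for this producer starts at the given offset.
        return INVALID_TXN_STATE
    return NONE
```

Because `TxnStartOffset` is an optional tagged field, requests that omit it skip this check entirely in the sketch.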
...
As discussed above, we will add a few new gauges to help detect hanging transactions. The table below summarizes the metrics that are relevant to detecting hanging transactions, including `LastStableOffsetLag`, which already exists.
...
add new gauge `PartitionsWithLateTransactionsCount`, which is tracked in the `ReplicaManager` group (along with `UnderMinIsrPartitionCount`, `OfflineReplicaCount`, etc.). This metric will record the number of partitions which have open transactions with durations exceeding `transaction.max.timeout.ms` (plus 5 minutes).
Command Line Tool
There will be a new tool: `kafka-transactions.sh`. The following commands will be supported:
- `--list`: List transactions known to a transaction coordinator
- `--find-hanging`: Find hanging transactions on a specific broker
- `--describe`: Describe details specific to a particular transactionalId
- `--describe-producers`: Describe active producer state for a given topic partition
- `--abort`: Forcefully abort a transaction
...
The `--list` command allows a user to list transaction state from the transaction coordinators. It supports an optional `--broker` flag to select a specific node.
Code Block |
---|
> kafka-transactions.sh --list --bootstrap-server localhost:9092
TransactionalId  ProducerId  Coordinator  State
my-txn-id1       134132      0            Ongoing
my-txn-id2       134147      0            Ongoing
my-txn-id3       134191      1            PrepareCommit
my-txn-id4       134193      2            CompleteAbort
> kafka-transactions.sh --list --broker 0 --bootstrap-server localhost:9092
TransactionalId  ProducerId  Coordinator  State
my-txn-id1       134132      0            Ongoing
my-txn-id2       134147      0            Ongoing |
...
Code Block |
---|
# Describe transaction state for my-txn-id
> kafka-transactions.sh --describe --transactional-id my-txn-id --bootstrap-server localhost:9092
ProducerId  ProducerEpoch  Coordinator  State    TimeoutMs  TopicPartitions
134132      24             0            Ongoing  5000       foo-0,foo-1 |
...
Describing Producers
The `--describe-producers` command can be used to describe the producer state of a specific topic partition. Internally, it is a direct call to the `DescribeProducers` API. It takes an optional `--broker` argument to specify a specific replica.
Code Block |
---|
# Describe producers on the leader of a topic partition
> kafka-transactions.sh --describe-producers --topic foo --partition 0 --bootstrap-server localhost:9092
ProducerId ProducerEpoch StartOffset LastTimestamp Duration CoordinatorEpoch
134132 23 550 1598383537 30000 77
134938 5 439 1598383567 29970 64
# Describe producers for a specific replica of a topic partition
> kafka-transactions.sh --describe-producers --broker 0 --topic foo --partition 0 --bootstrap-server localhost:9092
ProducerId ProducerEpoch StartOffset LastTimestamp Duration CoordinatorEpoch
134132 23 550 1598383537 30000 77
134938 5 439 1598383567 29970 64 |
Aborting Transactions
The `--abort` command can be used to abort an ongoing transaction for a topic partition. It has several required arguments:
...
For compatibility with older brokers, we also support the ability to directly specify additional parameters. Although we cannot use the new APIs in this case to validate the state, this is still better than nothing for users who have hit this problem. Users then have no choice but to do the analysis themselves by dumping the log of a suspected topic partition. In this case, the command takes the following arguments:
--topic
--partition
--producer-id
--producer-epoch
--coordinator-epoch
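The arguments listed above line up with the fields of a version 0 `WriteTxnMarkersRequest`. The sketch below shows that mapping as a plain dictionary; `build_abort_marker_request` is a hypothetical helper written for this example and says nothing about how the tool is actually implemented.

```python
from typing import Any, Dict


def build_abort_marker_request(
    topic: str,
    partition: int,
    producer_id: int,
    producer_epoch: int,
    coordinator_epoch: int,
) -> Dict[str, Any]:
    """Lay out a version 0 WriteTxnMarkersRequest body that aborts one transaction."""
    return {
        "Markers": [
            {
                "ProducerId": producer_id,
                "ProducerEpoch": producer_epoch,
                # false = ABORT, true = COMMIT
                "TransactionResult": False,
                # Version 0 uses "PartitionIndexes"; there is no TxnStartOffset,
                # so an older broker cannot validate the transaction state.
                "Topics": [{"Name": topic, "PartitionIndexes": [partition]}],
                "CoordinatorEpoch": coordinator_epoch,
            }
        ]
    }
```

For example, `build_abort_marker_request("foo", 0, 134132, 23, 77)` corresponds to the first producer shown in the `--describe-producers` output earlier.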
...