...
Although we can control access to the transaction log using the TransactionalId, we cannot prevent a malicious producer from hijacking the PID of another producer and writing data to the log. This would allow the attacker to either insert bad data into an active transaction or to fence the authorized producer by forcing an epoch bump. It is not possible for the malicious producer to finish a transaction, however, because the brokers do not allow clients to write control messages. Note also that the malicious producer would have to have Write permission to the same set of topics used by the legitimate producer, so it is still possible to use topic ACLs combined with TransactionalId ACLs to protect sensitive topics. Future work can explore protecting the binding between TransactionalId and PID (e.g. through the use of message authentication codes).
RPC Protocol Summary
We summarize all the new request / response pairs as well as modified requests in this section.
...
FetchRequest/Response
Sent by the consumer to any partition leaders to fetch messages. We bump the API version to allow the consumer to specify the required isolation level. We also modify the response schema to include the list of aborted transactions included in the range of fetched messages.
Code Block | ||||
---|---|---|---|---|
| ||||
// FetchRequest v4
FetchRequest => ReplicaId MaxWaitTime MinBytes [TopicName [Partition FetchOffset MaxBytes]]
ReplicaId => int32
MaxWaitTime => int32
MinBytes => int32
TopicName => string
Partition => int32
FetchOffset => int64
MaxBytes => int32
IsolationLevel => int8 (READ_COMMITTED | READ_UNCOMMITTED) |
Code Block | ||||
---|---|---|---|---|
| ||||
// FetchResponse v4
FetchResponse => ThrottleTime [TopicName [Partition ErrorCode HighwaterMarkOffset LastStableOffset AbortedTransactions MessageSetSize MessageSet]]
ThrottleTime => int32
TopicName => string
Partition => int32
ErrorCode => int16
HighwaterMarkOffset => int64
LastStableOffset => int64
AbortedTransactions => [PID FirstOffset]
PID => int64
FirstOffset => int64
MessageSetSize => int32 |
When the consumer sends a request for an older version, the broker assumes the READ_UNCOMMITTED isolation level and converts the message set to the appropriate format before sending back the response. Hence zero-copy cannot be used. This conversion can be costly when compression is enabled, so it is important to update the client as soon as possible.
We have also added the LSO (last stable offset) to the fetch response. In READ_COMMITTED, the consumer will use this to compute lag instead of the high watermark.
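The lag computation described above can be sketched as follows. This is only an illustration: the `PartitionFetchState` and `consumer_lag` names are hypothetical, and the enum values are placeholders rather than the wire encoding of IsolationLevel.

```python
from dataclasses import dataclass
from enum import Enum


class IsolationLevel(Enum):
    # Placeholder values; the int8 wire encoding is not specified here.
    READ_UNCOMMITTED = "read_uncommitted"
    READ_COMMITTED = "read_committed"


@dataclass
class PartitionFetchState:
    """Hypothetical per-partition view built from a FetchResponse v4."""
    position: int            # next offset the consumer will read
    high_watermark: int      # HighwaterMarkOffset from the response
    last_stable_offset: int  # LastStableOffset (LSO) from the response


def consumer_lag(state: PartitionFetchState, level: IsolationLevel) -> int:
    """Lag is measured against the LSO in READ_COMMITTED, else the high watermark."""
    if level is IsolationLevel.READ_COMMITTED:
        end = state.last_stable_offset
    else:
        end = state.high_watermark
    return max(0, end - state.position)
```

With a position of 100, a high watermark of 150 and an LSO of 120, a READ_COMMITTED consumer would report a lag of 20 while a READ_UNCOMMITTED consumer would report 50.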
ProduceRequest/Response
Sent by the producer to any brokers to produce messages. Instead of allowing the protocol to send multiple message sets for each partition, we modify the schema to allow only one message set for each partition. This allows us to remove the message set size since each message set already contains a field for the size. More importantly, since there is only one message set to be written to the log, partial produce failures are no longer possible. The full message set is either successfully written to the log (and replicated) or it is not.
...
We include the TransactionalId in order to ensure that producers using transactional messages (i.e. those with the transaction bit set in the attributes) are authorized to do so. If the client is not using transactions, this field should be null.
...
Code Block | ||||
---|---|---|---|---|
| ||||
// ProduceRequest v3
ProduceRequest => TransactionalId RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]]
TransactionalId => nullableString
RequiredAcks => int16
Timeout => int32
Partition => int32
MessageSetSize => int32
MessageSet => bytes |
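As a rough illustration of the nullable TransactionalId field, the leading fields of the request could be serialized as below. The sketch assumes Kafka's usual primitive encodings (big-endian integers, strings prefixed by an int16 length, and a length of -1 for a null string), which this section does not restate; the function names are hypothetical.

```python
import struct


def encode_nullable_string(value):
    """Assumed nullableString encoding: int16 length, -1 meaning null."""
    if value is None:
        return struct.pack(">h", -1)
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_produce_header(transactional_id, required_acks, timeout_ms):
    """Encode TransactionalId, RequiredAcks (int16) and Timeout (int32).

    The topic/partition array that follows in the schema is omitted.
    """
    return encode_nullable_string(transactional_id) + struct.pack(
        ">hi", required_acks, timeout_ms
    )
```

A non-transactional producer would pass `None` for `transactional_id`, which serializes to the two bytes `0xFFFF`.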
...
Code Block | ||||
---|---|---|---|---|
| ||||
// ProduceResponse v3
ProduceResponse => [TopicName [Partition ErrorCode Offset Timestamp]] ThrottleTime
TopicName => string
Partition => int32
ErrorCode => int16
Offset => int64
Timestamp => int64
ThrottleTime => int32 |
Error codes:
DuplicateSequenceNumber [NEW]
InvalidSequenceNumber [NEW]
ProducerFenced [NEW]
UNSUPPORTED_FOR_MESSAGE_FORMAT
...
Sent by the client to search offsets by timestamp and to find the first and last offsets for a partition. In this proposal, we modify this request to also support retrieval of the last stable offset, which is needed by the consumer to implement seekToEnd() in READ_COMMITTED mode.
Code Block | ||||
---|---|---|---|---|
| ||||
// ListOffsetRequestV2
ListOffsetRequest => ReplicaId [TopicName [Partition Time]]
ReplicaId => int32
TopicName => string
Partition => int32
Time => int64 |
...
Code Block | ||||
---|---|---|---|---|
| ||||
ListOffsetResponse => [TopicName [PartitionOffsets]]
PartitionOffsets => Partition ErrorCode Timestamp [Offset]
Partition => int32
ErrorCode => int16
Timestamp => int64
Offset => int64 |
The schema is exactly the same as version 1, but we now support a new sentinel timestamp in the request (-3) to retrieve the LSO.
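A minimal sketch of how a client might pick the sentinel for seekToEnd() follows. Only the value -3 comes from this proposal; the -1 (latest) and -2 (earliest) sentinels are assumed from the existing protocol, and the function names are hypothetical.

```python
import struct

LATEST_TIMESTAMP = -1              # assumed existing sentinel: last offset
EARLIEST_TIMESTAMP = -2            # assumed existing sentinel: first offset
LAST_STABLE_OFFSET_TIMESTAMP = -3  # new sentinel from this proposal: the LSO


def seek_to_end_time(read_committed):
    """Choose the Time value a consumer would send to implement seekToEnd()."""
    return LAST_STABLE_OFFSET_TIMESTAMP if read_committed else LATEST_TIMESTAMP


def encode_partition_time(partition, time):
    """Encode one [Partition Time] entry: int32 partition, int64 time."""
    return struct.pack(">iq", partition, time)
```

In READ_COMMITTED mode the entry for partition 0 would carry a Time of -3, so the broker answers with the last stable offset rather than the log end.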
FindCoordinatorRequest/Response
Sent by client to any broker to find the corresponding coordinator. This is the same API that was previously used to find the group coordinator, but we have changed the name to reflect the more general usage (there is no group for transactional producers). We bump up the version of the request and add a new field indicating the group type, which can be either Consumer or Txn. Request handling details can be found here.
...
Code Block | ||||
---|---|---|---|---|
| ||||
// v2
FindCoordinatorRequest => TransactionalId CoordinatorType
TransactionalId => string
CoordinatorType => byte /* 0: consumer, 1: transaction */ |
Code Block | ||||
---|---|---|---|---|
| ||||
FindCoordinatorResponse => ErrorCode Coordinator
ErrorCode => int16
Coordinator => NodeId Host Port
NodeId => int32
Host => string
Port => int32 |
Error codes:
Ok
CoordinatorNotAvailable
The node id is the identifier of the broker. We use the coordinator id to identify the connection to the corresponding broker.
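The request and response above could be serialized along these lines. This is a sketch under the assumption that strings use an int16 length prefix and integers are big-endian, as elsewhere in the Kafka protocol; the helper names and the sample host are made up for illustration.

```python
import struct

COORDINATOR_TYPE_CONSUMER = 0     # from the schema comment above
COORDINATOR_TYPE_TRANSACTION = 1  # from the schema comment above


def encode_string(value):
    """Assumed string encoding: int16 length followed by UTF-8 bytes."""
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_find_coordinator_request(key, coordinator_type):
    """Encode the key (TransactionalId in the schema) and CoordinatorType byte."""
    return encode_string(key) + struct.pack(">b", coordinator_type)


def decode_find_coordinator_response(buf):
    """Decode ErrorCode, NodeId, Host and Port from a response body."""
    error_code, node_id, host_len = struct.unpack_from(">hih", buf, 0)
    offset = 8  # 2 (ErrorCode) + 4 (NodeId) + 2 (Host length)
    host = buf[offset:offset + host_len].decode("utf-8")
    (port,) = struct.unpack_from(">i", buf, offset + host_len)
    return {"error_code": error_code, "node_id": node_id, "host": host, "port": port}
```

A client would typically cache the returned node id and reuse the connection to that broker for subsequent transactional requests.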
InitPidRequest/Response
Sent by producer to its transaction coordinator to get the assigned PID, increment its epoch, and fence any previous producers sharing the same TransactionalId. Request handling details can be found here.
Code Block | ||||
---|---|---|---|---|
| ||||
InitPidRequest => TransactionalId
TransactionalId => string |
...
Code Block | ||||
---|---|---|---|---|
| ||||
InitPidResponse => Error PID Epoch
Error => int16
PID => int64
Epoch => int16 |
Error code:
Ok
NotCoordinatorForTransactionalId
CoordinatorNotAvailable
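The epoch bump and fencing behavior can be pictured with a toy in-memory model. This is a simplified sketch, not the coordinator's actual logic: the `ToyCoordinator` class is hypothetical, PID allocation is a plain counter, and the conditions attached to ProducerFenced and InvalidPidMapping (names taken from the error lists of the transactional requests in this section) are assumptions inferred from those names.

```python
import itertools


class ToyCoordinator:
    """Hypothetical in-memory model of the TransactionalId -> (PID, epoch) mapping."""

    def __init__(self):
        self._next_pid = itertools.count(1000)  # arbitrary starting PID
        self._state = {}  # transactional_id -> (pid, epoch)

    def init_pid(self, transactional_id):
        """Return (pid, epoch); re-initializing keeps the PID and bumps the epoch."""
        if transactional_id in self._state:
            pid, epoch = self._state[transactional_id]
            epoch += 1
        else:
            pid, epoch = next(self._next_pid), 0
        self._state[transactional_id] = (pid, epoch)
        return pid, epoch

    def check(self, transactional_id, pid, epoch):
        """Classify a later request from a producer (assumed semantics)."""
        current = self._state.get(transactional_id)
        if current is None or current[0] != pid:
            return "InvalidPidMapping"
        if epoch < current[1]:
            return "ProducerFenced"  # a newer instance has bumped the epoch
        return "Ok"
```

When a second producer instance initializes with the same TransactionalId, the first instance keeps its old epoch and every later request it sends is classified as fenced.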
BeginTxnRequest/Response
...
Sent by producer to its transaction coordinator to begin a new transaction. Request handling details can be found here.
Code Block | ||||
---|---|---|---|---|
| ||||
BeginTxnRequest => TransactionalId PID Epoch
TransactionalId => string
PID => int64
Epoch => int32 |
Code Block | ||||
---|---|---|---|---|
| ||||
BeginTxnResponse => ErrorCode
ErrorCode => int16 |
Error code:
Ok
ProducerFenced
InvalidPidMapping
NotCoordinatorForTransactionalId
CoordinatorNotAvailable
InvalidTxnRequest
AddPartitionsToTxnRequest/Response
...
Sent by producer to its transaction coordinator to add a partition to the current ongoing transaction. Request handling details can be found here.
...
Code Block | ||||
---|---|---|---|---|
| ||||
AddPartitionsToTxnRequest => TransactionalId PID Epoch [Topic [Partition]]
TransactionalId => string
PID => int64
Epoch => int32
Topic => string
Partition => int32 |
Code Block | ||||
---|---|---|---|---|
| ||||
AddPartitionsToTxnResponse => ErrorCode
ErrorCode => int16 |
Error code:
Ok
ProducerFenced
InvalidPidMapping
NotCoordinatorForTransactionalId
CoordinatorNotAvailable
InvalidTxnRequest
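The nested `[Topic [Partition]]` array in the request could be serialized as shown below. The sketch assumes the common Kafka array encoding (an int32 element count followed by the elements) and int16-prefixed strings, neither of which is restated in this section; the function names are hypothetical.

```python
import struct


def encode_string(value):
    """Assumed string encoding: int16 length followed by UTF-8 bytes."""
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_add_partitions_to_txn(transactional_id, pid, epoch, topic_partitions):
    """Encode AddPartitionsToTxnRequest; topic_partitions maps topic -> [partition]."""
    out = [encode_string(transactional_id), struct.pack(">qi", pid, epoch)]
    out.append(struct.pack(">i", len(topic_partitions)))  # outer array count
    for topic, partitions in topic_partitions.items():
        out.append(encode_string(topic))
        out.append(struct.pack(">i", len(partitions)))    # inner array count
        out.extend(struct.pack(">i", p) for p in partitions)
    return b"".join(out)
```

Batching several partitions into one request, as the array allows, avoids a round trip to the coordinator for each new partition the transaction touches.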
AddOffsetsToTxnRequest
Sent by the producer to its transaction coordinator to indicate that a consumer offset commit is part of the current ongoing transaction. Request handling details can be found here.
Code Block | ||||
---|---|---|---|---|
| ||||
AddOffsetsToTxnRequest => TransactionalId PID Epoch ConsumerGroupID
TransactionalId => string
PID => int64
Epoch => int32
ConsumerGroupID => string |
Code Block | ||||
---|---|---|---|---|
| ||||
AddOffsetsToTxnResponse => ErrorCode
ErrorCode => int16
Coordinator => NodeId Host Port
NodeId => int32
Host => string
Port => int32 |
Error code:
Ok
ProducerFenced
InvalidPidMapping
NotCoordinatorForTransactionalId
CoordinatorNotAvailable
InvalidTxnRequest
EndTxnRequest/Response
Sent by producer to its transaction coordinator to prepare committing or aborting the current ongoing transaction. Request handling details can be found here.
Code Block | ||||
---|---|---|---|---|
| ||||
EndTxnRequest => TransactionalId PID Epoch Command
TransactionalId => string
PID => int64
Epoch => int32
Command => boolean (0 means ABORT, 1 means COMMIT) |
Code Block | ||||
---|---|---|---|---|
| ||||
EndTxnResponse => ErrorCode
ErrorCode => int16 |
Error code:
Ok
ProducerFenced
InvalidPidMapping
CoordinatorNotAvailable
NotCoordinatorForTransactionalId
InvalidTxnRequest
WriteTxnMarkerRequest/Response
Sent by the transaction coordinator to a broker to write the commit or abort marker for the transaction. Request handling details can be found here.
Code Block | ||||
---|---|---|---|---|
| ||||
WriteTxnMarkerRequest => PID Epoch Marker CoordinatorEpoch [Topic [Partition]]
PID => int64
Epoch => int32
CoordinatorEpoch => int32
Marker => int8 (0 = COMMIT, 1 = ABORT)
Topic => string
Partition => int32 |
Code Block | ||||
---|---|---|---|---|
| ||||
WriteTxnMarkerResponse => ErrorCode
ErrorCode => int16 |
Error code:
Ok
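Note that the two schemas number the outcomes in opposite order: EndTxnRequest uses 0 for ABORT and 1 for COMMIT, while WriteTxnMarkerRequest uses 0 for COMMIT and 1 for ABORT. A coordinator translating one into the other should therefore map by meaning rather than copy the numeric value. A small sketch, with hypothetical enum and function names:

```python
from enum import IntEnum


class EndTxnCommand(IntEnum):
    """Command values from EndTxnRequest."""
    ABORT = 0
    COMMIT = 1


class TxnMarker(IntEnum):
    """Marker values from WriteTxnMarkerRequest."""
    COMMIT = 0
    ABORT = 1


def marker_for(command):
    """Translate an EndTxn command into the marker with the same meaning."""
    return TxnMarker[EndTxnCommand(command).name]
```

Looking the marker up by name keeps the translation correct even though the raw integers differ between the two requests.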
TxnOffsetCommitRequest/Response
...
Sent by transactional producers to consumer group coordinator to commit offsets within a single transaction. Request handling details can be found here.
Note that, just as with consumers, users cannot set the retention time explicitly. The default value (-1) is always used, which lets the broker determine the retention time.
...
Code Block | ||||
---|---|---|---|---|
| ||||
TxnOffsetCommitRequest => ConsumerGroupID PID Epoch RetentionTime OffsetAndMetadata
ConsumerGroupID => string
PID => int64
Epoch => int32
RetentionTime => int64
OffsetAndMetadata => [TopicName [Partition Offset Metadata]]
TopicName => string
Partition => int32
Offset => int64
Metadata => string |
...
Code Block | ||||
---|---|---|---|---|
| ||||
TxnOffsetCommitResponse => [TopicName [Partition ErrorCode]]
TopicName => string
Partition => int32
ErrorCode => int16 |
...
Error code:
- ProducerFenced
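A producer receiving this response needs the error code for each partition individually. The decoding could look roughly like this, assuming int32 array counts, int16-prefixed strings and big-endian integers; the function name is hypothetical and the numeric error code in the usage note is an arbitrary placeholder.

```python
import struct


def decode_txn_offset_commit_response(buf):
    """Decode [TopicName [Partition ErrorCode]] into {(topic, partition): error_code}."""
    offset = 0
    (topic_count,) = struct.unpack_from(">i", buf, offset)
    offset += 4
    result = {}
    for _ in range(topic_count):
        (name_len,) = struct.unpack_from(">h", buf, offset)
        offset += 2
        topic = buf[offset:offset + name_len].decode("utf-8")
        offset += name_len
        (partition_count,) = struct.unpack_from(">i", buf, offset)
        offset += 4
        for _ in range(partition_count):
            partition, error_code = struct.unpack_from(">ih", buf, offset)
            offset += 6  # int32 partition + int16 error code
            result[(topic, partition)] = error_code
    return result
```

For a response covering partitions 0 and 1 of a topic named `orders`, the result would be a dictionary such as `{("orders", 0): 0, ("orders", 1): 53}`, where a non-zero value marks the partition whose commit failed.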
Message Format
...