...
We only need to retain the PID of the completed transaction along with a timestamp, so we can eventually remove the TransactionalId->PID mapping for the producer. See the Expiring PIDs section below.
Authorization
It is desirable to control access to the transaction log to ensure that clients cannot intentionally or unintentionally interfere with each other’s transactions. In this work, we introduce a new resource type to represent the TransactionalId tied to transactional producers, and an associated error code for authorization failures.
...
...
Discussion on limitations of coordinator authorization
Although we can control access to the transaction log using the TransactionalId, we cannot prevent a malicious producer from hijacking the PID of another producer and writing data to the log. This would allow the attacker to either insert bad data into an active transaction or to fence the authorized producer by forcing an epoch bump. It is not possible for the malicious producer to finish a transaction, however, because the brokers do not allow clients to write control messages. Note also that the malicious producer would have to have Write permission to the same set of topics used by the legitimate producer, so it is still possible to use topic ACLs combined with TransactionalId ACLs to protect sensitive topics. Future work can explore protecting the binding between TransactionalId and PID (e.g. through the use of message authentication codes).
RPC Protocol Summary
We summarize all the new request / response pairs as well as modified requests in this section.
FetchRequest/Response
Sent by the consumer to any partition leaders to fetch messages. We bump the API version to allow the consumer to specify the required isolation level. We also modify the response schema to include the list of aborted transactions included in the range of fetched messages.
```
// FetchRequest v4
FetchRequest => ReplicaId MaxWaitTime MinBytes [TopicName [Partition FetchOffset MaxBytes]]
  ReplicaId => int32
  MaxWaitTime => int32
  MinBytes => int32
  TopicName => string
  Partition => int32
  FetchOffset => int64
  MaxBytes => int32
  IsolationLevel => int8 (READ_COMMITTED | READ_UNCOMMITTED)
```
```
// FetchResponse v4
FetchResponse => ThrottleTime [TopicName [Partition ErrorCode HighwaterMarkOffset LastStableOffset AbortedTransactions MessageSetSize MessageSet]]
  ThrottleTime => int32
  TopicName => string
  Partition => int32
  ErrorCode => int16
  HighwaterMarkOffset => int64
  LastStableOffset => int64
  AbortedTransactions => [PID FirstOffset]
    PID => int64
    FirstOffset => int64
  MessageSetSize => int32
  MessageSet => bytes
```
When the consumer sends a fetch request with an older version, the broker assumes the READ_UNCOMMITTED isolation level and converts the message set to the format that version expects before sending the response. Because of this conversion, zero-copy transfer cannot be used. The conversion can be costly when compression is enabled, so it is important to update the client as soon as possible.
We have also added the last stable offset (LSO) to the fetch response. In READ_COMMITTED mode, the consumer uses the LSO instead of the high watermark to compute lag.
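As a rough illustration of how a READ_COMMITTED consumer might use the two new response fields, the sketch below filters a fetched batch against the AbortedTransactions list and computes lag from the LSO. The `Record` type, its `is_control` flag, and both function names are hypothetical and not part of this proposal. The sketch also assumes that records arrive in offset order and that the first control marker seen from an aborted PID closes that aborted transaction.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class Record:
    """Hypothetical in-memory view of one fetched message."""
    offset: int
    pid: int
    is_control: bool = False      # True for a commit/abort marker (assumed flag)
    value: Optional[bytes] = None


def filter_committed(records: List[Record],
                     aborted: List[Tuple[int, int]]) -> List[Record]:
    """Drop control markers and data written by aborted transactions.

    `aborted` holds (PID, FirstOffset) pairs, as in AbortedTransactions.
    """
    pending = sorted(aborted, key=lambda pair: pair[1])  # order by FirstOffset
    active_aborts = set()
    next_pending = 0
    visible = []
    for rec in records:
        # Activate every aborted transaction whose first offset has been reached.
        while next_pending < len(pending) and pending[next_pending][1] <= rec.offset:
            active_aborts.add(pending[next_pending][0])
            next_pending += 1
        if rec.is_control:
            # A marker ends the producer's current transaction; never shown to users.
            active_aborts.discard(rec.pid)
            continue
        if rec.pid in active_aborts:
            continue
        visible.append(rec)
    return visible


def consumer_lag(position: int, high_watermark: int,
                 last_stable_offset: int, read_committed: bool) -> int:
    """Lag is measured against the LSO in READ_COMMITTED, else the high watermark."""
    end = last_stable_offset if read_committed else high_watermark
    return max(0, end - position)
```

With a batch in which PID 1 aborted a transaction starting at offset 1 and its marker sits at offset 4, only the records from other producers, plus anything PID 1 writes after offset 4, would remain visible.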
ProduceRequest/Response
Sent by the producer to any brokers to produce messages. Instead of allowing the protocol to send multiple message sets for each partition, we modify the schema to allow only one message set for each partition. This allows us to remove the message set size since each message set already contains a field for the size. More importantly, since there is only one message set to be written to the log, partial produce failures are no longer possible. The full message set is either successfully written to the log (and replicated) or it is not.
We include the TransactionalId in order to ensure that producers using transactional messages (i.e. those with the transaction bit set in the attributes) are authorized to do so. If the client is not using transactions, this field should be null.
```
// ProduceRequest v3
ProduceRequest => TransactionalId
                  RequiredAcks
                  Timeout
                  [TopicName [Partition MessageSetSize MessageSet]]
  TransactionalId => nullableString
  RequiredAcks => int16
  Timeout => int32
  TopicName => string
  Partition => int32
  MessageSetSize => int32
  MessageSet => bytes
```
```
// ProduceResponse v3
ProduceResponse => [TopicName [Partition ErrorCode Offset Timestamp]]
                   ThrottleTime
  TopicName => string
  Partition => int32
  ErrorCode => int16
  Offset => int64
  Timestamp => int64
  ThrottleTime => int32
```
Error codes:
DuplicateSequenceNumber [NEW]
InvalidSequenceNumber [NEW]
ProducerFenced [NEW]
UNSUPPORTED_FOR_MESSAGE_FORMAT
Note that clients sending version 3 of the produce request MUST use the new message set format. The broker may still down-convert the message to an older format when writing to the log, depending on the internal message format specified.
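To make the nullable TransactionalId concrete, here is a minimal sketch of how the leading fields of a version 3 produce request could be serialized. It assumes the usual Kafka wire convention for nullable strings (a big-endian int16 length, with -1 standing for null, followed by the UTF-8 bytes). The function names are hypothetical, and the topic and message set portion of the request is left out.

```python
import struct
from typing import Optional


def encode_nullable_string(value: Optional[str]) -> bytes:
    """Encode a nullable string: int16 length (-1 for null) plus UTF-8 bytes."""
    if value is None:
        return struct.pack(">h", -1)
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_produce_header(transactional_id: Optional[str],
                          required_acks: int,
                          timeout_ms: int) -> bytes:
    """Encode TransactionalId, RequiredAcks (int16) and Timeout (int32)."""
    return (encode_nullable_string(transactional_id)
            + struct.pack(">hi", required_acks, timeout_ms))


# A non-transactional producer sends a null TransactionalId.
plain = encode_produce_header(None, -1, 30000)
# A transactional producer sends its configured id.
transactional = encode_produce_header("my-txn-id", -1, 30000)
```

The null case costs only the two length bytes, so non-transactional clients pay almost nothing for the extra field.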
ListOffsetRequest/Response
Sent by the client to search offsets by timestamp and to find the first and last offsets for a partition. In this proposal, we modify this request to also support retrieval of the last stable offset, which is needed by the consumer to implement seekToEnd() in READ_COMMITTED mode.
```
// ListOffsetRequest v2
ListOffsetRequest => ReplicaId [TopicName [Partition Time]]
  ReplicaId => int32
  TopicName => string
  Partition => int32
  Time => int64
```
```
ListOffsetResponse => [TopicName [PartitionOffsets]]
  PartitionOffsets => Partition ErrorCode Timestamp [Offset]
  Partition => int32
  ErrorCode => int16
  Timestamp => int64
  Offset => int64
```
The schema is exactly the same as version 1, but we now support a new sentinel timestamp in the request (-3) to retrieve the LSO.
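The sentinel handling can be sketched as follows. The value -3 comes from the text above, and -1 and -2 are the existing sentinels for the latest and earliest offsets. The constant names, the function, and the `offset_for_timestamp` callback are hypothetical, and the sketch assumes that "latest" resolves to the high watermark for an ordinary consumer.

```python
from typing import Callable

LATEST_TIMESTAMP = -1        # existing sentinel: latest offset
EARLIEST_TIMESTAMP = -2      # existing sentinel: earliest offset
LAST_STABLE_TIMESTAMP = -3   # new sentinel from this proposal (name is hypothetical)


def resolve_list_offset(time: int,
                        log_start_offset: int,
                        high_watermark: int,
                        last_stable_offset: int,
                        offset_for_timestamp: Callable[[int], int]) -> int:
    """Map the Time field of a ListOffsetRequest to an offset."""
    if time == LAST_STABLE_TIMESTAMP:
        return last_stable_offset
    if time == LATEST_TIMESTAMP:
        return high_watermark
    if time == EARLIEST_TIMESTAMP:
        return log_start_offset
    if time < 0:
        raise ValueError(f"unknown sentinel timestamp: {time}")
    # Any non-negative value is a real timestamp to search for.
    return offset_for_timestamp(time)
```

Under these assumptions, a READ_COMMITTED consumer implementing seekToEnd() would send -3 and position itself at the returned LSO.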
FindCoordinatorRequest
Sent by the client to any broker to find the corresponding coordinator. This is the same API that was previously used to find the group coordinator, but we have changed the name to reflect the more general usage (there is no group for transactional producers). We bump the version of the request and add a new field indicating the coordinator type, which can be either consumer or transaction. Request handling details can be found here.
```
// v2
FindCoordinatorRequest => TransactionalId CoordinatorType
  TransactionalId => string
  CoordinatorType => byte /* 0: consumer, 1: transaction */
```
```
FindCoordinatorResponse => ErrorCode Coordinator
  ErrorCode => int16
  Coordinator => NodeId Host Port
    NodeId => int32
    Host => string
    Port => int32
```
Error codes:
Ok
CoordinatorNotAvailable
The node id is the identifier of the broker. We use the coordinator id to identify the connection to the corresponding broker.
InitPidRequest
Sent by the producer to its transaction coordinator to get the assigned PID, increment its epoch, and fence any previous producers sharing the same TransactionalId. Request handling details can be found here.
```
InitPidRequest => TransactionalId
  TransactionalId => string
```
```
InitPidResponse => Error PID Epoch
  Error => int16
  PID => int64
  Epoch => int16
```
Error codes:
Ok
NotCoordinatorForTransactionalId
CoordinatorNotAvailable
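The assign, bump, and fence behaviour can be illustrated with a toy in-memory coordinator. This is only a sketch: the class and method names are hypothetical, the starting epoch of 0 is an assumption, and the real coordinator persists this state in the transaction log instead of a dictionary.

```python
from dataclasses import dataclass
from typing import Dict, Tuple


@dataclass
class PidEntry:
    pid: int
    epoch: int


class ToyTransactionCoordinator:
    """In-memory stand-in for the TransactionalId -> PID mapping."""

    def __init__(self) -> None:
        self._next_pid = 0
        self._entries: Dict[str, PidEntry] = {}

    def init_pid(self, transactional_id: str) -> Tuple[int, int]:
        """Return (PID, Epoch); a repeated call bumps the epoch."""
        entry = self._entries.get(transactional_id)
        if entry is None:
            entry = PidEntry(pid=self._next_pid, epoch=0)  # epoch 0 is assumed
            self._next_pid += 1
            self._entries[transactional_id] = entry
        else:
            entry.epoch += 1  # fences every producer holding an older epoch
        return entry.pid, entry.epoch

    def validate(self, transactional_id: str, pid: int, epoch: int) -> str:
        """Check the (PID, Epoch) carried by a later transactional request."""
        entry = self._entries.get(transactional_id)
        if entry is None or entry.pid != pid:
            return "InvalidPidMapping"
        if epoch < entry.epoch:
            return "ProducerFenced"
        return "Ok"
```

A producer that restarts with the same TransactionalId receives the same PID with a higher epoch, so requests from its earlier incarnation fail validation with ProducerFenced.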
BeginTxnRequest/Response
Sent by the producer to its transaction coordinator to begin a new transaction. Request handling details can be found here.
```
BeginTxnRequest => TransactionalId PID Epoch
  TransactionalId => string
  PID => int64
  Epoch => int32
```
```
BeginTxnResponse => ErrorCode
  ErrorCode => int16
```
Error codes:
Ok
ProducerFenced
InvalidPidMapping
NotCoordinatorForTransactionalId
CoordinatorNotAvailable
InvalidTxnRequest
AddPartitionsToTxnRequest/Response
Sent by the producer to its transaction coordinator to add partitions to the current ongoing transaction. Request handling details can be found here.
```
AddPartitionsToTxnRequest => TransactionalId PID Epoch [Topic [Partition]]
  TransactionalId => string
  PID => int64
  Epoch => int32
  Topic => string
  Partition => int32
```
```
AddPartitionsToTxnResponse => ErrorCode
  ErrorCode => int16
```
Error codes:
Ok
ProducerFenced
InvalidPidMapping
NotCoordinatorForTransactionalId
CoordinatorNotAvailable
InvalidTxnRequest
AddOffsetsToTxnRequest
Sent by the producer to its transaction coordinator to indicate that a consumer offset commit is part of the current ongoing transaction. Request handling details can be found here.
```
AddOffsetsToTxnRequest => TransactionalId PID Epoch ConsumerGroupID
  TransactionalId => string
  PID => int64
  Epoch => int32
  ConsumerGroupID => string
```
```
AddOffsetsToTxnResponse => ErrorCode
  ErrorCode => int16
  Coordinator => NodeId Host Port
    NodeId => int32
    Host => string
    Port => int32
```
Error codes:
Ok
ProducerFenced
InvalidPidMapping
NotCoordinatorForTransactionalId
CoordinatorNotAvailable
InvalidTxnRequest
EndTxnRequest/Response
Sent by the producer to its transaction coordinator to prepare to commit or abort the current ongoing transaction. Request handling details can be found here.
```
EndTxnRequest => TransactionalId PID Epoch Command
  TransactionalId => string
  PID => int64
  Epoch => int32
  Command => boolean (0 means ABORT, 1 means COMMIT)
```
```
EndTxnResponse => ErrorCode
  ErrorCode => int16
```
Error codes:
Ok
ProducerFenced
InvalidPidMapping
CoordinatorNotAvailable
NotCoordinatorForTransactionalId
InvalidTxnRequest
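The BeginTxn, AddPartitionsToTxn, and EndTxn requests can be read as transitions of a small per-producer state machine, with InvalidTxnRequest returned for a request that does not fit the current state. The sketch below is one possible reading and not the coordinator's actual implementation. The state names, the class, and the `markers_written` step are hypothetical.

```python
from enum import Enum
from typing import Iterable, Set, Tuple


class TxnState(Enum):
    EMPTY = "empty"                    # no transaction in progress
    ONGOING = "ongoing"                # BeginTxn accepted
    PREPARE_COMMIT = "prepare_commit"  # EndTxn(COMMIT) accepted
    PREPARE_ABORT = "prepare_abort"    # EndTxn(ABORT) accepted


class ToyTransaction:
    """Hypothetical per-TransactionalId state kept by the coordinator."""

    def __init__(self) -> None:
        self.state = TxnState.EMPTY
        self.partitions: Set[Tuple[str, int]] = set()

    def begin(self) -> str:
        if self.state is not TxnState.EMPTY:
            return "InvalidTxnRequest"
        self.state = TxnState.ONGOING
        return "Ok"

    def add_partitions(self, topic_partitions: Iterable[Tuple[str, int]]) -> str:
        if self.state is not TxnState.ONGOING:
            return "InvalidTxnRequest"
        self.partitions.update(topic_partitions)
        return "Ok"

    def end(self, commit: bool) -> str:
        if self.state is not TxnState.ONGOING:
            return "InvalidTxnRequest"
        self.state = TxnState.PREPARE_COMMIT if commit else TxnState.PREPARE_ABORT
        return "Ok"

    def markers_written(self) -> None:
        """Called once every partition has received its marker."""
        self.partitions.clear()
        self.state = TxnState.EMPTY
```

In this reading, the PID and epoch checks that produce ProducerFenced and InvalidPidMapping would run before any of these transitions.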
WriteTxnMarkerRequest/Response
Sent by the transaction coordinator to a broker to write the commit or abort marker for the transaction. Request handling details can be found here.
```
WriteTxnMarkerRequest => PID Epoch Marker CoordinatorEpoch [Topic [Partition]]
  PID => int64
  Epoch => int32
  Marker => int8 (0 = COMMIT, 1 = ABORT)
  CoordinatorEpoch => int32
  Topic => string
  Partition => int32
```
```
WriteTxnMarkerResponse => ErrorCode
  ErrorCode => int16
```
Error code:
Ok
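Because the request carries a nested [Topic [Partition]] array, a coordinator could plausibly send one WriteTxnMarkerRequest per broker covering every partition that broker leads. The sketch below groups a transaction's partitions that way. The one-request-per-leader batching, the `leaders` lookup table, and the dictionary layout are assumptions made for illustration. Only the field names and the marker values come from the schema.

```python
from typing import Dict, Iterable, Tuple

COMMIT = 0  # Marker values as defined in the request schema
ABORT = 1

TopicPartition = Tuple[str, int]


def build_marker_requests(pid: int,
                          epoch: int,
                          coordinator_epoch: int,
                          marker: int,
                          partitions: Iterable[TopicPartition],
                          leaders: Dict[TopicPartition, int]) -> Dict[int, dict]:
    """Group partitions by leader broker id into one request body per broker."""
    requests: Dict[int, dict] = {}
    for topic, partition in sorted(partitions):
        broker_id = leaders[(topic, partition)]
        request = requests.setdefault(broker_id, {
            "PID": pid,
            "Epoch": epoch,
            "Marker": marker,
            "CoordinatorEpoch": coordinator_epoch,
            "Topics": {},  # topic name -> list of partition ids
        })
        request["Topics"].setdefault(topic, []).append(partition)
    return requests
```

The returned dictionary is keyed by broker id, so each entry corresponds to one request to send.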
TxnOffsetCommitRequest/Response
Sent by transactional producers to the consumer group coordinator to commit offsets within a single transaction. Request handling details can be found here.
Note that, just as for consumers, users cannot set the retention time explicitly. The default value (-1) is always used, which lets the broker determine the retention time.
```
TxnOffsetCommitRequest => ConsumerGroupID
                          PID
                          Epoch
                          RetentionTime
                          OffsetAndMetadata
  ConsumerGroupID => string
  PID => int64
  Epoch => int32
  RetentionTime => int64
  OffsetAndMetadata => [TopicName [Partition Offset Metadata]]
    TopicName => string
    Partition => int32
    Offset => int64
    Metadata => string
```
```
TxnOffsetCommitResponse => [TopicName [Partition ErrorCode]]
  TopicName => string
  Partition => int32
  ErrorCode => int16
```
Error code:
ProducerFenced
Message Format
In order to add new fields such as PID and epoch into the produced messages for transactional messaging and de-duplication, we need to change Kafka’s message format and bump up its version (i.e. the “magic byte”). More specifically, we need to add the following fields into each message:
...