...

Although we can control access to the transaction log using the TransactionalId, we cannot prevent a malicious producer from hijacking the PID of another producer and writing data to the log. This would allow the attacker to either insert bad data into an active transaction or to fence the authorized producer by forcing an epoch bump. It is not possible for the malicious producer to finish a transaction, however, because the brokers do not allow clients to write control messages. Note also that the malicious producer would have to have Write permission to the same set of topics used by the legitimate producer, so it is still possible to use topic ACLs combined with TransactionalId ACLs to protect sensitive topics. Future work can explore protecting the binding between TransactionalId and PID (e.g. through the use of message authentication codes).

RPC Protocol Summary

We summarize all the new request / response pairs as well as modified requests in this section.

...

FetchRequest/Response

Sent by the consumer to partition leaders to fetch messages. We bump the API version to allow the consumer to specify the required isolation level. We also modify the response schema to include the list of aborted transactions within the range of fetched messages.

```text
// FetchRequest v4

FetchRequest => ReplicaId MaxWaitTime MinBytes IsolationLevel [TopicName [Partition FetchOffset MaxBytes]]
 ReplicaId => int32
 MaxWaitTime => int32
 MinBytes => int32
 IsolationLevel => int8 (READ_COMMITTED | READ_UNCOMMITTED)
 TopicName => string
 Partition => int32
 FetchOffset => int64
 MaxBytes => int32
```

```text
// FetchResponse v4

FetchResponse => ThrottleTime [TopicName [Partition ErrorCode HighwaterMarkOffset LastStableOffset AbortedTransactions MessageSetSize MessageSet]]
 ThrottleTime => int32
 TopicName => string
 Partition => int32
 ErrorCode => int16
 HighwaterMarkOffset => int64
 LastStableOffset => int64
 AbortedTransactions => [PID FirstOffset]
   PID => int64
   FirstOffset => int64
 MessageSetSize => int32
 MessageSet => bytes
```

When the consumer sends a request for an older version, the broker assumes the READ_UNCOMMITTED isolation level and converts the message set to the appropriate format before sending back the response. Hence zero-copy cannot be used. This conversion can be costly when compression is enabled, so it is important to update the client as soon as possible.
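The aborted-transaction list lets a READ_COMMITTED consumer drop aborted data on its own. The following is a minimal Python sketch of that filtering, not the actual consumer implementation. It assumes a hypothetical, already-decoded `Record` type (offset, PID, and whether the record is a COMMIT or ABORT control marker) in place of the real message-set format. Data from a PID is skipped from that PID's `FirstOffset` until its ABORT marker, control messages are never returned, and nothing at or beyond the `LastStableOffset` is returned.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class Record:
    offset: int
    pid: int
    is_control: bool = False       # True for a COMMIT/ABORT transaction marker
    marker: Optional[str] = None   # "COMMIT" or "ABORT" when is_control is True


def read_committed(records: List[Record],
                   aborted: List[Tuple[int, int]],
                   last_stable_offset: int) -> List[Record]:
    """Return the data records a READ_COMMITTED consumer may deliver.

    aborted: the AbortedTransactions list as (PID, FirstOffset) pairs.
    """
    pending = sorted(aborted, key=lambda t: t[1])  # process in FirstOffset order
    aborting = set()  # PIDs currently inside an aborted transaction
    visible = []
    i = 0
    for rec in records:
        if rec.offset >= last_stable_offset:
            break  # not yet known to be committed or aborted
        # Once we reach an aborted transaction's first offset, start skipping its PID.
        while i < len(pending) and pending[i][1] <= rec.offset:
            aborting.add(pending[i][0])
            i += 1
        if rec.is_control:
            if rec.marker == "ABORT":
                aborting.discard(rec.pid)  # the aborted transaction ends here
            continue  # control messages are never handed to the application
        if rec.pid not in aborting:
            visible.append(rec)
    return visible
```

For example, if PID 1 aborts a transaction starting at offset 0 while PID 2 commits one, only PID 2's data and any PID 1 data written after the ABORT marker are returned.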

We have also added the LSO to the fetch response. In READ_COMMITTED mode, the consumer uses the LSO instead of the high watermark to compute its lag.
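As a sketch of the lag computation (function and parameter names are illustrative, not part of any client API):

```python
def consumer_lag(position: int, high_watermark: int, last_stable_offset: int,
                 isolation_level: str) -> int:
    """Lag = distance from the consumer's position to the end it is allowed to read."""
    if isolation_level == "READ_COMMITTED":
        end = last_stable_offset   # only decided (committed or aborted) data is readable
    else:
        end = high_watermark       # READ_UNCOMMITTED reads up to the high watermark
    return max(0, end - position)
```

With an open transaction holding the LSO below the high watermark, a READ_COMMITTED consumer therefore reports less lag than a READ_UNCOMMITTED one at the same position.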

ProduceRequest/Response

Sent by the producer to partition leaders to produce messages. Instead of allowing the protocol to send multiple message sets for each partition, we modify the schema to allow only one message set per partition. This allows us to remove the message set size, since each message set already contains a field for its size. More importantly, since there is only one message set to be written to the log, partial produce failures are no longer possible: the full message set is either successfully written to the log (and replicated) or it is not.

...

We include the TransactionalId in order to ensure that producers using transactional messages (i.e. those with the transaction bit set in the attributes) are authorized to do so. If the client is not using transactions, this field should be null.
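To make the null case concrete, here is a sketch of encoding the leading ProduceRequest v3 fields in Python. It assumes the standard Kafka protocol primitives (big-endian integers, strings prefixed with an int16 length, and a length of -1 for a null nullable string); the helper names are hypothetical and the topic data is omitted.

```python
import struct
from typing import Optional


def encode_nullable_string(value: Optional[str]) -> bytes:
    # Nullable string: int16 length (-1 means null), followed by UTF-8 bytes.
    if value is None:
        return struct.pack(">h", -1)
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_produce_header(transactional_id: Optional[str],
                          required_acks: int, timeout_ms: int) -> bytes:
    """Encode TransactionalId RequiredAcks Timeout (the start of ProduceRequest v3)."""
    return (encode_nullable_string(transactional_id)   # null for non-transactional clients
            + struct.pack(">h", required_acks)        # RequiredAcks => int16
            + struct.pack(">i", timeout_ms))          # Timeout => int32
```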

...

```text
// ProduceRequest v3

ProduceRequest => TransactionalId
                  RequiredAcks
                  Timeout
                  [TopicName [Partition MessageSetSize MessageSet]]
 TransactionalId => nullableString
 RequiredAcks => int16
 Timeout => int32
 TopicName => string
 Partition => int32
 MessageSetSize => int32
 MessageSet => bytes
```

...

```text
// ProduceResponse v3
ProduceResponse => [TopicName [Partition ErrorCode Offset Timestamp]]
                   ThrottleTime
 TopicName => string
 Partition => int32
 ErrorCode => int16
 Offset => int64
 Timestamp => int64
 ThrottleTime => int32
```

Error codes:

  • DuplicateSequenceNumber [NEW]

  • InvalidSequenceNumber [NEW]

  • ProducerFenced [NEW]

  • UNSUPPORTED_FOR_MESSAGE_FORMAT
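This section does not spell out exactly when each new error is returned. One plausible reading of the names is sketched below in Python, assuming the broker tracks, per PID, the current epoch and the sequence number of the last appended message. The `ProducerState` type and the rule for a bumped epoch are assumptions, not part of this proposal.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ProducerState:
    epoch: int          # latest epoch seen for this PID
    last_sequence: int  # sequence number of the last message appended for this PID


def check_append(state: ProducerState, epoch: int, sequence: int) -> Optional[str]:
    """Return the error a broker might send for a message with this epoch and
    sequence number, or None if the message can be appended."""
    if epoch < state.epoch:
        return "ProducerFenced"           # a newer producer instance bumped the epoch
    if epoch > state.epoch:
        # Assumption: a producer restarts its sequence at 0 after an epoch bump.
        return None if sequence == 0 else "InvalidSequenceNumber"
    if sequence <= state.last_sequence:
        return "DuplicateSequenceNumber"  # already written, e.g. a retried request
    if sequence > state.last_sequence + 1:
        return "InvalidSequenceNumber"    # a gap: an earlier message is missing
    return None
```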

...

Sent by the client to search offsets by timestamp and to find the first and last offsets for a partition. In this proposal, we modify this request to also support retrieval of the last stable offset, which is needed by the consumer to implement seekToEnd() in READ_COMMITTED mode.

```text
// ListOffsetRequest v2

ListOffsetRequest => ReplicaId [TopicName [Partition Time]]
 ReplicaId => int32
 TopicName => string
 Partition => int32
 Time => int64
```

...

```text
ListOffsetResponse => [TopicName [PartitionOffsets]]
 TopicName => string
 PartitionOffsets => Partition ErrorCode Timestamp [Offset]
 Partition => int32
 ErrorCode => int16
 Timestamp => int64
 Offset => int64
```

The schema is exactly the same as version 1, but we now support a new sentinel timestamp in the request (-3) to retrieve the LSO.
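The sketch below shows how a broker might resolve the Time field. The -1 (latest) and -2 (earliest) sentinels are the existing ones and -3 is the new LSO sentinel. For simplicity, "latest" is resolved to the high watermark, as for consumer requests, and the time index is a hypothetical list of (timestamp, offset) pairs.

```python
from typing import List, Optional, Tuple

LATEST_TIMESTAMP = -1       # existing sentinel: end of the readable log
EARLIEST_TIMESTAMP = -2     # existing sentinel: log start offset
LAST_STABLE_TIMESTAMP = -3  # new sentinel: last stable offset (LSO)


def resolve_offset(time: int, log_start: int, high_watermark: int, lso: int,
                   index: List[Tuple[int, int]]) -> Optional[int]:
    """index: (timestamp, offset) pairs in offset order (hypothetical time index)."""
    if time == LAST_STABLE_TIMESTAMP:
        return lso              # what seekToEnd() needs in READ_COMMITTED mode
    if time == LATEST_TIMESTAMP:
        return high_watermark
    if time == EARLIEST_TIMESTAMP:
        return log_start
    # A real timestamp: first offset whose timestamp is >= the requested time.
    for ts, offset in index:
        if ts >= time:
            return offset
    return None                 # no message at or after that time
```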

FindCoordinatorRequest/Response

Sent by a client to any broker to find the corresponding coordinator. This is the same API that was previously used to find the group coordinator, but we have changed the name to reflect the more general usage (there is no group for transactional producers). We bump the version of the request and add a new field, CoordinatorType, indicating whether the coordinator being looked up is for a consumer group or for transactions. Request handling details can be found here.

...

```text
// FindCoordinatorRequest v2
FindCoordinatorRequest => TransactionalId CoordinatorType
 TransactionalId => string
 CoordinatorType => byte /* 0: consumer, 1: transaction */
```

```text
FindCoordinatorResponse => ErrorCode Coordinator
 ErrorCode => int16
 Coordinator => NodeId Host Port
   NodeId => int32
   Host => string
   Port => int32
```

Error codes:

  • Ok

  • CoordinatorNotAvailable

NodeId is the identifier of the coordinator's broker. The client uses it to identify the connection to that broker.
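As an illustration, a client could decode the response body and key its connections by NodeId as follows. This Python sketch handles only the ErrorCode NodeId Host Port fields of the response schema, assumes standard Kafka encoding (big-endian integers, int16-length-prefixed strings), and uses a `(host, port)` tuple as a stand-in for a real connection object.

```python
import struct
from typing import Dict, Tuple


def decode_find_coordinator_response(buf: bytes) -> Tuple[int, int, str, int]:
    """Decode ErrorCode, NodeId, Host and Port from a FindCoordinatorResponse body."""
    error_code, node_id, host_len = struct.unpack_from(">hih", buf, 0)
    pos = 8  # int16 + int32 + int16
    host = buf[pos:pos + host_len].decode("utf-8")
    (port,) = struct.unpack_from(">i", buf, pos + host_len)
    return error_code, node_id, host, port


# Connections keyed by broker NodeId, so two coordinators hosted on the same
# broker (e.g. a group coordinator and a transaction coordinator) share one entry.
connections: Dict[int, Tuple[str, int]] = {}


def coordinator_connection(buf: bytes) -> Tuple[int, Tuple[str, int]]:
    error_code, node_id, host, port = decode_find_coordinator_response(buf)
    if error_code != 0:  # 0 means Ok
        raise RuntimeError(f"FindCoordinator failed with error code {error_code}")
    return node_id, connections.setdefault(node_id, (host, port))
```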

InitPidRequest/Response

Sent by the producer to its transaction coordinator to get the assigned PID, increment its epoch, and fence any previous producers sharing the same TransactionalId. Request handling details can be found here.
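A minimal in-memory sketch of how the coordinator might handle InitPidRequest, returning the (PID, Epoch) pair of InitPidResponse. It is hypothetical: it ignores persistence to the transaction log, completion of any transaction left open by the previous producer, and epoch overflow.

```python
from dataclasses import dataclass
from typing import Dict, Tuple


@dataclass
class PidEntry:
    pid: int
    epoch: int


class TransactionCoordinatorSketch:
    """Hypothetical in-memory model of InitPidRequest handling."""

    def __init__(self) -> None:
        self.next_pid = 0
        self.by_txn_id: Dict[str, PidEntry] = {}

    def init_pid(self, transactional_id: str) -> Tuple[int, int]:
        entry = self.by_txn_id.get(transactional_id)
        if entry is None:
            # First time this TransactionalId is seen: assign a fresh PID at epoch 0.
            entry = PidEntry(pid=self.next_pid, epoch=0)
            self.next_pid += 1
            self.by_txn_id[transactional_id] = entry
        else:
            # Same TransactionalId again: keep the PID but bump the epoch, so any
            # older producer instance still using the previous epoch is fenced.
            entry.epoch += 1
        return entry.pid, entry.epoch
```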

```text
InitPidRequest => TransactionalId
 TransactionalId => string
```

...

```text
InitPidResponse => ErrorCode PID Epoch
 ErrorCode => int16
 PID => int64
 Epoch => int16
```

Error code:

  • Ok

  • NotCoordinatorForTransactionalId

  • CoordinatorNotAvailable

BeginTxnRequest/Response

...

Sent by the producer to its transaction coordinator to begin a new transaction. Request handling details can be found here.

```text
BeginTxnRequest => TransactionalId PID Epoch
 TransactionalId => string
 PID => int64
 Epoch => int32
```

```text
BeginTxnResponse => ErrorCode
 ErrorCode => int16
```

Error code:

  • Ok

  • ProducerFenced

  • InvalidPidMapping

  • NotCoordinatorForTransactionalId

  • CoordinatorNotAvailable

  • InvalidTxnRequest
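BeginTxnRequest and the transactional requests that follow all return ProducerFenced and InvalidPidMapping. One plausible way for the coordinator to derive those errors is sketched below. It assumes the coordinator stores the (PID, epoch) pair it last handed out for each TransactionalId; returning InvalidTxnRequest for an epoch newer than the stored one is our assumption.

```python
from typing import Dict, Optional, Tuple


def validate_txn_request(mapping: Dict[str, Tuple[int, int]], transactional_id: str,
                         pid: int, epoch: int) -> Optional[str]:
    """mapping: TransactionalId -> (PID, current epoch) as stored by the coordinator.
    Returns an error name, or None for Ok."""
    current = mapping.get(transactional_id)
    if current is None or current[0] != pid:
        return "InvalidPidMapping"   # PID is not the one assigned to this TransactionalId
    if epoch < current[1]:
        return "ProducerFenced"      # a newer instance has called InitPid since
    if epoch > current[1]:
        return "InvalidTxnRequest"   # assumption: an epoch the coordinator never issued
    return None
```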

AddPartitionsToTxnRequest/Response

...

Sent by the producer to its transaction coordinator to add one or more partitions to the ongoing transaction. Request handling details can be found here.
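The coordinator has to remember which partitions a transaction has touched so that it later knows where to send WriteTxnMarkerRequests. A hypothetical sketch of that bookkeeping, taking the [Topic [Partition]] part of the request as a dictionary:

```python
from typing import Dict, List, Set, Tuple

TopicPartition = Tuple[str, int]


class OngoingTxnSketch:
    """Hypothetical model of the partitions a coordinator tracks for one transaction."""

    def __init__(self) -> None:
        self.partitions: Set[TopicPartition] = set()

    def add_partitions(self, topics: Dict[str, List[int]]) -> List[TopicPartition]:
        """Apply the [Topic [Partition]] list of a request; return the newly added ones."""
        added = []
        for topic, parts in topics.items():
            for p in parts:
                tp = (topic, p)
                if tp not in self.partitions:  # re-adding a partition is a no-op
                    self.partitions.add(tp)
                    added.append(tp)
        return added

    def marker_targets(self) -> List[TopicPartition]:
        """Partitions that need a COMMIT or ABORT marker when the transaction ends."""
        return sorted(self.partitions)
```

Making the add idempotent means a producer can safely retry the request after a timeout.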

...

```text
AddPartitionsToTxnRequest => TransactionalId PID Epoch [Topic [Partition]]
 TransactionalId => string
 PID => int64
 Epoch => int32
 Topic => string
 Partition => int32
```

```text
AddPartitionsToTxnResponse => ErrorCode
 ErrorCode => int16
```

Error code:

  • Ok

  • ProducerFenced

  • InvalidPidMapping

  • NotCoordinatorForTransactionalId

  • CoordinatorNotAvailable

  • InvalidTxnRequest

AddOffsetsToTxnRequest/Response

Sent by the producer to its transaction coordinator to indicate that a consumer offset commit is being made as part of the ongoing transaction. Request handling details can be found here.
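AddOffsetsToTxnResponse returns a Coordinator, which presumably identifies the consumer group coordinator that the subsequent TxnOffsetCommitRequest is sent to. The sketch below assumes that group offsets live in the __consumer_offsets topic and that a group is mapped to a partition the way Kafka's group coordinator does it: the absolute value of Java's `String.hashCode()` of the group id (with `Integer.MIN_VALUE` mapped to 0), modulo the partition count (50 by default). It is an illustration of that mapping, not part of this proposal.

```python
def java_string_hash(s: str) -> int:
    """Java's String.hashCode() as a signed 32-bit int (assumes BMP-only characters)."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - (1 << 32) if h >= (1 << 31) else h


def offsets_partition_for(group_id: str, num_partitions: int = 50) -> int:
    """__consumer_offsets partition for a group (assumed to mirror Kafka's mapping)."""
    h = java_string_hash(group_id)
    # Like Kafka's Utils.abs: Integer.MIN_VALUE maps to 0, otherwise the absolute value.
    positive = 0 if h == -(1 << 31) else abs(h)
    return positive % num_partitions
```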

```text
AddOffsetsToTxnRequest => TransactionalId PID Epoch ConsumerGroupID
 TransactionalId => string
 PID => int64
 Epoch => int32
 ConsumerGroupID => string
```

```text
AddOffsetsToTxnResponse => ErrorCode Coordinator
 ErrorCode => int16
 Coordinator => NodeId Host Port
   NodeId => int32
   Host => string
   Port => int32
```

Error code:

  • Ok

  • ProducerFenced

  • InvalidPidMapping

  • NotCoordinatorForTransactionalId

  • CoordinatorNotAvailable

  • InvalidTxnRequest

EndTxnRequest/Response

Sent by producer to its transaction coordinator to prepare committing or aborting the current ongoing transaction. Request handling details can be found here.
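Ending a transaction is a two-phase process: the coordinator first records the commit or abort decision, and the transaction only completes once the markers have been written to every partition. A sketch of those transitions, with hypothetical state names:

```python
from enum import Enum


class TxnState(Enum):
    EMPTY = "Empty"
    ONGOING = "Ongoing"
    PREPARE_COMMIT = "PrepareCommit"
    PREPARE_ABORT = "PrepareAbort"
    COMPLETE_COMMIT = "CompleteCommit"
    COMPLETE_ABORT = "CompleteAbort"


def on_end_txn(state: TxnState, commit: bool) -> TxnState:
    """Phase 1: handle EndTxnRequest (Command: 1 = COMMIT, 0 = ABORT)."""
    if state is not TxnState.ONGOING:
        raise ValueError(f"InvalidTxnRequest: cannot end a transaction in state {state.value}")
    # The decision is recorded before any partition sees a marker.
    return TxnState.PREPARE_COMMIT if commit else TxnState.PREPARE_ABORT


def on_markers_written(state: TxnState) -> TxnState:
    """Phase 2: once every WriteTxnMarkerRequest has succeeded, the transaction is complete."""
    if state is TxnState.PREPARE_COMMIT:
        return TxnState.COMPLETE_COMMIT
    if state is TxnState.PREPARE_ABORT:
        return TxnState.COMPLETE_ABORT
    raise ValueError(f"no markers pending in state {state.value}")
```

Recording the decision first means a coordinator that fails over can finish writing the markers for a transaction in a prepare state.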

```text
EndTxnRequest => TransactionalId PID Epoch Command
 TransactionalId => string
 PID => int64
 Epoch => int32
 Command => boolean (0 means ABORT, 1 means COMMIT)
```

```text
EndTxnResponse => ErrorCode
 ErrorCode => int16
```

Error code:

  • Ok

  • ProducerFenced

  • InvalidPidMapping

  • CoordinatorNotAvailable

  • NotCoordinatorForTransactionalId

  • InvalidTxnRequest

WriteTxnMarkerRequest/Response

Sent by the transaction coordinator to the leaders of the partitions in the transaction to write a COMMIT or ABORT marker. Request handling details can be found here.
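Each marker closes one PID's transaction on one partition, which is what lets the last stable offset (LSO) advance. A sketch, assuming the LSO is the smaller of the high watermark and the first offset of the earliest transaction still open on the partition; the function names are hypothetical:

```python
from typing import Dict


def last_stable_offset(high_watermark: int, open_txn_first_offsets: Dict[int, int]) -> int:
    """open_txn_first_offsets: PID -> offset of the first message of its open transaction."""
    if not open_txn_first_offsets:
        return high_watermark
    return min(min(open_txn_first_offsets.values()), high_watermark)


def apply_marker(open_txn_first_offsets: Dict[int, int], pid: int) -> None:
    """A COMMIT or ABORT marker for `pid` closes that PID's open transaction."""
    open_txn_first_offsets.pop(pid, None)
```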

```text
WriteTxnMarkerRequest => PID Epoch Marker CoordinatorEpoch [Topic [Partition]]
 PID => int64
 Epoch => int32
 Marker => int8 (0 = COMMIT, 1 = ABORT)
 CoordinatorEpoch => int32
 Topic => string
 Partition => int32
```

```text
WriteTxnMarkerResponse => ErrorCode
 ErrorCode => int16
```

Error code:

  • Ok

TxnOffsetCommitRequest/Response

...

Sent by transactional producers to consumer group coordinator to commit offsets within a single transaction. Request handling details can be found here.

Note that, as with consumers, users will not be able to set the retention time explicitly. The default value (-1) is always used, which lets the broker determine the retention time.

...

```text
TxnOffsetCommitRequest => ConsumerGroupID
                          PID
                          Epoch
                          RetentionTime
                          OffsetAndMetadata
 ConsumerGroupID => string
 PID => int64
 Epoch => int32
 RetentionTime => int64
 OffsetAndMetadata => [TopicName [Partition Offset Metadata]]
   TopicName => string
   Partition => int32
   Offset => int64
   Metadata => string
```
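A sketch of encoding this request in Python, assuming the usual Kafka primitives (big-endian integers, int16-length-prefixed strings, int32-count-prefixed arrays). It encodes Epoch as int32 as written in this schema and always sends RetentionTime = -1 so the broker picks the retention; the function names are hypothetical.

```python
import struct
from typing import Dict, List, Tuple


def _string(s: str) -> bytes:
    data = s.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_txn_offset_commit(group_id: str, pid: int, epoch: int,
                             offsets: Dict[str, List[Tuple[int, int, str]]]) -> bytes:
    """offsets: TopicName -> [(Partition, Offset, Metadata)]."""
    out = _string(group_id) + struct.pack(">qi", pid, epoch)  # PID int64, Epoch int32
    out += struct.pack(">q", -1)                  # RetentionTime: always the -1 default
    out += struct.pack(">i", len(offsets))        # number of topics
    for topic, parts in offsets.items():
        out += _string(topic) + struct.pack(">i", len(parts))
        for partition, offset, metadata in parts:
            out += struct.pack(">iq", partition, offset) + _string(metadata)
    return out
```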

...

```text
TxnOffsetCommitResponse => [TopicName [Partition ErrorCode]]
 TopicName => string
 Partition => int32
 ErrorCode => int16
```

...

Error code:

  • ProducerFenced

Message Format

...