Comment: Added section on transaction authorization

...

We only need to retain the PID of the completed transaction along with a timestamp, so we can eventually remove the TransactionalId->PID mapping for the producer. See the Expiring PIDs section below.

Authorization 

 

It is desirable to control access to the transaction log to ensure that clients cannot intentionally or unintentionally interfere with each other’s transactions. In this work, we introduce a new resource type to represent the TransactionalId tied to transactional producers, and an associated error code for authorization failures.

```scala
case object ProducerTransactionalId extends ResourceType {
  val name = "ProducerTransactionalId"
  val errorCode = Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.code
}
```

The transaction coordinator handles each of the following requests: InitPID, BeginTxn, AddPartitionsToTxn, AddOffsetsToTxn, and EndTxn. Each of these requests includes the producer’s TransactionalId, which can be used for authorization. Because each request mutates the transaction state of the producer, they all require Write access to the corresponding ProducerTransactionalId resource.
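The rule above can be illustrated with a small, self-contained sketch. It is a hypothetical model and not the coordinator's actual code: the `TxnAuthSketch` object, the `Acl` case class, the request case classes, and the `authorize` helper are all assumptions introduced for illustration. Only the request names, the Write requirement, and the error name come from the text.

```scala
// Hypothetical model of the authorization rule; these are not Kafka classes.
object TxnAuthSketch {
  sealed trait Operation
  case object Read extends Operation
  case object Write extends Operation

  // The coordinator requests named in the text; each carries a TransactionalId.
  sealed trait TxnRequest { def transactionalId: String }
  final case class InitPID(transactionalId: String) extends TxnRequest
  final case class BeginTxn(transactionalId: String) extends TxnRequest
  final case class AddPartitionsToTxn(transactionalId: String) extends TxnRequest
  final case class AddOffsetsToTxn(transactionalId: String) extends TxnRequest
  final case class EndTxn(transactionalId: String) extends TxnRequest

  // Assumed ACL shape: a principal may perform an operation on one TransactionalId.
  final case class Acl(principal: String, transactionalId: String, operation: Operation)

  // Error name taken from the resource type definition in the text.
  val Denied = "TRANSACTIONAL_ID_AUTHORIZATION_FAILED"

  // Every coordinator request mutates transaction state, so each one
  // requires Write on the TransactionalId it carries.
  def authorize(principal: String, request: TxnRequest, acls: Set[Acl]): Either[String, Unit] =
    if (acls.contains(Acl(principal, request.transactionalId, Write))) Right(())
    else Left(Denied)
}
```

In this sketch a principal holding only Read on a TransactionalId is rejected for every request type, since none of the coordinator requests is read-only.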

Discussion on limitations of coordinator authorization

Although we can control access to the transaction log using the TransactionalId, we cannot prevent a malicious producer from hijacking the PID of another producer and writing data to the log. This would allow the attacker either to insert bad data into an active transaction or to fence the authorized producer by forcing an epoch bump. It is not possible for the malicious producer to finish a transaction, however, because the brokers do not allow clients to write control messages. Note also that the malicious producer would need Write permission to the same set of topics used by the legitimate producer, so it is still possible to use topic ACLs combined with TransactionalId ACLs to protect sensitive topics. Future work can explore protecting the binding between TransactionalId and PID (e.g. through the use of message authentication codes).

Message Format

In order to add new fields such as PID and epoch into the produced messages for transactional messaging and de-duplication, we need to change Kafka’s message format and bump up its version (i.e. the “magic byte”). More specifically, we need to add the following fields into each message:

...

As the batch size increases, the overhead of the new format grows smaller compared to the old format because of the eliminated redundancy. The overhead per message in the old format is fixed at 34 bytes. For the new format, the message set overhead is 45 bytes, while per-message overhead ranges from 14 to 25 bytes. This makes it more costly to send individual messages, but space is quickly recovered with even modest batching. For example, assuming a fixed message size of 1 KB with 100-byte keys, the new format's overhead increases by only 17 bytes for each additional batched message, compared with 34 bytes in the old format:

 

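| Batch Size | Old Format Overhead (bytes) | New Format Overhead (bytes) |
|------------|-----------------------------|-----------------------------|
| 1          | 34*1 = 34                   | 45 + 1*17 = 62              |
| 3          | 34*3 = 102                  | 45 + 3*17 = 96              |
| 10         | 34*10 = 340                 | 45 + 10*17 = 215            |
| 50         | 34*50 = 1700                | 45 + 50*17 = 895            |
| 100        | 34*100 = 3400               | 45 + 100*17 = 1745          |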
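The arithmetic in the table can be reproduced with a short sketch. The object and function names below are hypothetical, and the 17-byte per-message figure is the value from this particular example (1 KB messages with 100-byte keys), not a constant of the format; actual per-message overhead ranges from 14 to 25 bytes.

```scala
// Illustrative only: reproduces the overhead figures from the example above.
object OverheadSketch {
  val OldPerMessage = 34    // old format: fixed bytes of overhead per message
  val NewBatchOverhead = 45 // new format: bytes of overhead per message set
  val NewPerMessage = 17    // new format: per-message bytes in this example

  def oldOverhead(batchSize: Int): Int = OldPerMessage * batchSize

  def newOverhead(batchSize: Int): Int = NewBatchOverhead + NewPerMessage * batchSize

  // Smallest batch size at which the new format has strictly less overhead.
  def breakEven: Int =
    Iterator.from(1).find(n => newOverhead(n) < oldOverhead(n)).get
}
```

Under these assumptions the new format is more expensive for batches of one or two messages and becomes cheaper from a batch size of three onward, which matches the second row of the table.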

 

Compatibility, Deprecation, and Migration Plan

...