...
We only need to retain the PID of the completed transaction along with a timestamp, so we can eventually remove the TransactionalId->PID mapping for the producer. See the Expiring PIDs section below.
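The retention rule above can be illustrated with a minimal sketch that assumes a simple time-based window. `CompletedTxn`, `PidExpiration.expire`, and `retentionMs` are hypothetical names chosen for illustration. The actual expiration policy is the one defined in the Expiring PIDs section, not this code.

```scala
// Hypothetical sketch: CompletedTxn, PidExpiration and retentionMs are
// illustrative names, not Kafka's transaction coordinator code.

// All that is retained for a completed transaction: its PID and a timestamp.
final case class CompletedTxn(pid: Long, completedAtMs: Long)

object PidExpiration {
  // Keep only TransactionalId -> PID mappings whose last transaction completed
  // within the (assumed) retention window; older mappings are dropped.
  def expire(
      mappings: Map[String, CompletedTxn],
      nowMs: Long,
      retentionMs: Long
  ): Map[String, CompletedTxn] =
    mappings.filter { case (_, txn) => nowMs - txn.completedAtMs <= retentionMs }
}
```

Because only the PID and a timestamp are kept per TransactionalId, expiring a mapping is a cheap filter over a small map.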
Authorization
It is desirable to control access to the transaction log to ensure that clients cannot intentionally or unintentionally interfere with each other’s transactions. In this work, we introduce a new resource type to represent the TransactionalId tied to transactional producers, and an associated error code for authorization failures.
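```scala
// ACL resource type representing the TransactionalId of a transactional
// producer; authorization failures on it are reported with this error code.
case object ProducerTransactionalId extends ResourceType {
  val name = "ProducerTransactionalId"
  val errorCode = Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.code
}
```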
The transaction coordinator handles each of the following requests: InitPID, BeginTxn, AddPartitionsToTxn, AddOffsetsToTxn, and EndTxn. Each request to the transaction coordinator includes the producer's TransactionalId, which can be used for authorization. Each of these requests mutates the transaction state of the producer, so all of them require Write access to the corresponding ProducerTransactionalId resource.
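The coordinator-side check can be sketched as follows, assuming ACLs are held in a simple in-memory set. `Acl` and `TxnCoordinatorAuth.authorize` are hypothetical names and are not Kafka's Authorizer API. The request names, the ProducerTransactionalId resource type, the Write requirement, and the TRANSACTIONAL_ID_AUTHORIZATION_FAILED error come from the text.

```scala
// Hypothetical sketch: Acl and TxnCoordinatorAuth are illustrative names,
// not Kafka's Authorizer API. ACLs are modeled as an in-memory set.
final case class Acl(principal: String, resourceType: String, resourceName: String, operation: String)

object TxnCoordinatorAuth {
  // The coordinator requests listed in the text; each one mutates txn state.
  val TxnRequests: Set[String] =
    Set("InitPID", "BeginTxn", "AddPartitionsToTxn", "AddOffsetsToTxn", "EndTxn")

  // Every coordinator request needs Write on the ProducerTransactionalId
  // resource named by the TransactionalId carried in the request.
  def authorize(
      principal: String,
      requestName: String,
      transactionalId: String,
      acls: Set[Acl]
  ): Either[String, Unit] = {
    require(TxnRequests.contains(requestName), s"not a coordinator request: $requestName")
    val needed = Acl(principal, "ProducerTransactionalId", transactionalId, "Write")
    if (acls.contains(needed)) Right(())
    else Left("TRANSACTIONAL_ID_AUTHORIZATION_FAILED")
  }
}
```

Since all five requests need the same permission, the check does not have to branch on the request type; the request name is only validated.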
Discussion on limitations of coordinator authorization
Although we can control access to the transaction log using the TransactionalId, we cannot prevent a malicious producer from hijacking the PID of another producer and writing data to the log. This would allow the attacker either to insert bad data into an active transaction or to fence the authorized producer by forcing an epoch bump. The malicious producer cannot finish a transaction, however, because the brokers do not allow clients to write control messages. Note also that the malicious producer would need Write permission on the same set of topics used by the legitimate producer, so topic ACLs combined with TransactionalId ACLs can still be used to protect sensitive topics. Future work can explore protecting the binding between TransactionalId and PID (e.g. through the use of message authentication codes).
Message Format
To add new fields such as the PID and epoch to produced messages for transactional messaging and de-duplication, we need to change Kafka's message format and bump its version (i.e. the "magic byte"). Specifically, we need to add the following fields to each message:
...
As the batch size increases, the overhead of the new format shrinks relative to the old format, because fields shared by the whole batch are written once per message set instead of being repeated in every message. The overhead per message in the old format is fixed at 34 bytes. In the new format, the message set overhead is 45 bytes, while the per-message overhead ranges from 14 to 25 bytes. This makes sending individual messages more costly, but the space is quickly recovered with even modest batching. For example, assuming a fixed message size of 1 KB with 100-byte keys, the overhead grows by only 17 bytes for each additional batched message, so the new format is already smaller than the old one at a batch size of 3:
Batch size (messages) | Old format overhead (bytes) | New format overhead (bytes) |
---|---|---|
1 | 34*1 = 34 | 45 + 1*17 = 62 |
3 | 34*3 = 102 | 45 + 3*17 = 96 |
10 | 34*10 = 340 | 45 + 10*17 = 215 |
50 | 34*50 = 1700 | 45 + 50*17 = 895 |
100 | 34*100 = 3400 | 45 + 100*17 = 1745 |
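The arithmetic behind the overhead comparison can be reproduced with a short sketch. It uses only the figures from the text (34 bytes per message in the old format; 45 bytes per message set plus 17 bytes per message in the new format, for the 1 KB value / 100-byte key example). The object and method names are illustrative.

```scala
// Illustrative names; the byte counts are the figures quoted in the text.
object BatchOverhead {
  val OldPerMessage = 34 // old format: fixed per-message overhead
  val NewPerBatch = 45   // new format: message set overhead
  val NewPerMessage = 17 // new format: per-message overhead in the example

  def oldOverhead(n: Int): Int = OldPerMessage * n
  def newOverhead(n: Int): Int = NewPerBatch + NewPerMessage * n

  // Smallest batch size at which the new format's total overhead is lower:
  // 45 + 17n < 34n  <=>  n > 45 / 17 (about 2.65), i.e. n = 3.
  def breakEven: Int = Iterator.from(1).find(n => newOverhead(n) < oldOverhead(n)).get

  def main(args: Array[String]): Unit =
    for (n <- Seq(1, 3, 10, 50, 100))
      println(f"$n%3d  old=${oldOverhead(n)}%5d  new=${newOverhead(n)}%5d")
}
```

Running `main` prints the same rows as the overhead table above. Past the break-even point, every additional message saves 34 - 17 = 17 bytes of overhead relative to the old format.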
Compatibility, Deprecation, and Migration Plan
...