[This KIP proposal is joint work between Jason Gustafson, Flavio Junqueira, Apurva Mehta, Sriram, and Guozhang Wang]
Status
Current state: Adopted
...
An Example Application
Here is a simple application which demonstrates the use of the APIs introduced above.
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaTransactionsExample {

  public static void main(String[] args) {
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);

    // Note that the ‘transactional.id’ configuration _must_ be specified in the
    // producer config in order to use transactions.
    KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);

    // We need to initialize transactions once per producer instance. To use transactions,
    // it is assumed that the application id is specified in the config with the key
    // transactional.id.
    //
    // This method will recover or abort transactions initiated by previous instances of a
    // producer with the same app id. Any other transactional messages will report an error
    // if initialization was not performed.
    //
    // The response indicates success or failure. Some failures are irrecoverable and will
    // require a new producer instance. See the documentation for TransactionMetadata for a
    // list of error codes.
    producer.initTransactions();

    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
      if (!records.isEmpty()) {
        // Start a new transaction. This will begin the process of batching the consumed
        // records as well as any records produced as a result of processing the input records.
        //
        // We need to check the response to make sure that this producer is able to initiate
        // a new transaction.
        producer.beginTransaction();

        // Process the input records and send them to the output topic(s).
        List<ProducerRecord<String, String>> outputRecords = processRecords(records);
        for (ProducerRecord<String, String> outputRecord : outputRecords) {
          producer.send(outputRecord);
        }

        // To ensure that the consumed and produced messages are batched, we need to commit
        // the offsets through the producer and not the consumer.
        //
        // If this returns an error, we should abort the transaction.
        producer.sendOffsetsToTransaction(getUncommittedOffsets());

        // Now that we have consumed, processed, and produced a batch of messages, let's
        // commit the results.
        // If this does not report success, then the transaction will be rolled back.
        producer.commitTransaction();
      }
    }
  }
} |
New Configurations
Broker configs
transactional.id.timeout.ms | The maximum amount of time in ms that the transaction coordinator will wait before proactively expiring a producer's TransactionalId when it has not received any transaction status updates from it. Default is 604800000 (7 days). This allows periodic weekly producer jobs to maintain their ids. |
max.transaction.timeout.ms | The maximum allowed timeout for transactions. If a client’s requested transaction timeout exceeds this, the broker will return an InvalidTransactionTimeout error in response to the InitPidRequest. This prevents a client from setting too large a timeout, which can stall consumers reading from topics included in the transaction. Default is 900000 (15 min). This is a conservative upper bound on the time needed to send the messages of a transaction. |
transaction.state.log.replication.factor | The number of replicas for the transaction state topic. Default: 3 |
transaction.state.log.num.partitions | The number of partitions for the transaction state topic. Default: 50 |
transaction.state.log.min.isr | The minimum number of in-sync replicas that each partition of the transaction state topic needs to have to be considered online. Default: 2 |
transaction.state.log.segment.bytes | The segment size for the transaction state topic. Default: 104857600 bytes. |
Producer configs
enable.idempotence | Whether or not idempotence is enabled (false by default). If disabled, the producer will not set the PID field in produce requests and the current producer delivery semantics will be in effect. Note that idempotence must be enabled in order to use transactions. When idempotence is enabled, we enforce that acks=all, retries > 1, and max.in.flight.requests.per.connection=1. Without these values for these configurations, we cannot guarantee idempotence. If these settings are not explicitly overridden by the application, the producer will set acks=all, retries=Integer.MAX_VALUE, and max.in.flight.requests.per.connection=1 when idempotence is enabled. |
transaction.timeout.ms | The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction. This config value will be sent to the transaction coordinator along with the InitPidRequest. If this value is larger than the max.transaction.timeout.ms setting in the broker, the request will fail with an `InvalidTransactionTimeout` error. Default is 60000. This keeps a transaction from blocking downstream consumption for more than a minute, which is generally acceptable in real-time apps. |
transactional.id | The TransactionalId to use for transactional delivery. This enables reliability semantics which span multiple producer sessions, since it allows the client to guarantee that transactions using the same TransactionalId have been completed prior to starting any new transactions. If no TransactionalId is provided, then the producer is limited to idempotent delivery. The default is empty, which means transactions cannot be used. |
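As a rough illustration of how the producer settings above fit together, the following sketch assembles them into a `java.util.Properties` object and checks the requested transaction timeout against the broker's default `max.transaction.timeout.ms`. The class name, the `build` helper, the example TransactionalId and the `localhost:9092` address are hypothetical; the client-side check only mirrors the broker-side rejection described above and is not part of the proposal. A real producer would also need serializer settings, which are omitted here.

```java
import java.util.Properties;

final class TransactionalProducerConfig {
    // Broker default for max.transaction.timeout.ms (15 minutes), taken from the broker config table.
    static final long BROKER_MAX_TXN_TIMEOUT_MS = 900_000L;

    // Hypothetical helper: builds the producer settings needed for transactional delivery.
    static Properties build(String transactionalId, long txnTimeoutMs) {
        if (transactionalId == null || transactionalId.isEmpty()) {
            // An empty transactional.id limits the producer to idempotent delivery.
            throw new IllegalArgumentException("transactional.id must be set to use transactions");
        }
        if (txnTimeoutMs > BROKER_MAX_TXN_TIMEOUT_MS) {
            // The broker would answer such a request with InvalidTransactionTimeout.
            throw new IllegalArgumentException("transaction.timeout.ms exceeds max.transaction.timeout.ms");
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("transactional.id", transactionalId);
        props.setProperty("enable.idempotence", "true"); // required for transactions
        props.setProperty("transaction.timeout.ms", Long.toString(txnTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("order-processor-1", 60_000L);
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Failing fast on the client is only a convenience in this sketch; the broker remains the authority on the maximum timeout.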
Consumer configs
isolation.level | Here are the possible values (default is read_uncommitted): read_uncommitted: consume both committed and uncommitted messages in offset ordering. read_committed: only consume non-transactional messages or committed transactional messages in offset order. In order to maintain offset ordering, this setting means that we will have to buffer messages in the consumer until we see all messages in a given transaction. |
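The buffering that read_committed implies can be pictured with a small in-memory model. This is a simplified sketch and not the consumer's actual implementation: the `Entry` type, the convention that a negative PID marks a non-transactional message, and the list standing in for a partition log are all assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class ReadCommittedBuffer {
    enum Kind { DATA, COMMIT, ABORT }

    // Hypothetical log entry; a negative pid marks a non-transactional message.
    static final class Entry {
        final long pid;
        final Kind kind;
        final String value;
        Entry(long pid, Kind kind, String value) {
            this.pid = pid;
            this.kind = kind;
            this.value = value;
        }
    }

    private static final class Buffered {
        final Entry entry;
        Boolean committed; // null while the owning transaction is still open
        Buffered(Entry entry, Boolean committed) {
            this.entry = entry;
            this.committed = committed;
        }
    }

    // Returns the values a read_committed reader would see, in offset order.
    static List<String> readCommitted(List<Entry> log) {
        List<String> delivered = new ArrayList<>();
        Deque<Buffered> buffer = new ArrayDeque<>();
        for (Entry e : log) {
            if (e.kind == Kind.DATA) {
                // Non-transactional messages are decided immediately.
                buffer.addLast(new Buffered(e, e.pid < 0 ? Boolean.TRUE : null));
            } else {
                boolean committed = e.kind == Kind.COMMIT;
                for (Buffered b : buffer) {
                    if (b.committed == null && b.entry.pid == e.pid) {
                        b.committed = committed;
                    }
                }
            }
            // Release the longest decided prefix so delivery stays in offset order.
            while (!buffer.isEmpty() && buffer.peekFirst().committed != null) {
                Buffered head = buffer.pollFirst();
                if (head.committed) {
                    delivered.add(head.entry.value);
                }
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<Entry> log = new ArrayList<>();
        log.add(new Entry(1, Kind.DATA, "a1"));
        log.add(new Entry(-1, Kind.DATA, "n1"));
        log.add(new Entry(1, Kind.COMMIT, null));
        System.out.println(readCommitted(log));
    }
}
```

Note how a non-transactional message that follows an open transaction is held back until that transaction is decided; this is the cost of preserving offset order.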
Proposed Changes
Summary of Guarantees
Idempotent Producer Guarantees
To implement idempotent producer semantics, we introduce the concepts of a producer id, henceforth called the PID, and sequence numbers for Kafka messages. Every new producer will be assigned a unique PID during initialization. The PID assignment is completely transparent to users and is never exposed by clients.
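To make the role of the PID and sequence numbers concrete, here is a minimal sketch of how a broker could use them to discard duplicates for a single partition. It assumes that sequence numbers start at 0 and increase by one per message for each PID; the class name, the `Result` values and the exact acceptance rule are illustrative assumptions, not the behavior specified by this proposal.

```java
import java.util.HashMap;
import java.util.Map;

final class SequenceDeduplicator {
    enum Result { APPENDED, DUPLICATE, OUT_OF_ORDER }

    // Last sequence number appended per PID, for one partition.
    private final Map<Long, Integer> lastSequence = new HashMap<>();

    Result tryAppend(long pid, int sequence) {
        int last = lastSequence.getOrDefault(pid, -1);
        if (sequence <= last) {
            return Result.DUPLICATE;    // already in the log, e.g. a retried send
        }
        if (sequence != last + 1) {
            return Result.OUT_OF_ORDER; // a gap means an earlier message is missing
        }
        lastSequence.put(pid, sequence);
        return Result.APPENDED;
    }

    public static void main(String[] args) {
        SequenceDeduplicator partition = new SequenceDeduplicator();
        System.out.println(partition.tryAppend(7L, 0)); // first message from PID 7
        System.out.println(partition.tryAppend(7L, 0)); // retry of the same message
    }
}
```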
Transactional Guarantees
At the core, transactional guarantees enable applications to produce to multiple TopicPartitions atomically, i.e., all writes to these TopicPartitions will succeed or fail as a unit.
- Exactly one active producer with a given TransactionalId. This is achieved by fencing off old generations when a new instance with the same TransactionalId comes online.
- Transaction recovery across application sessions. If an application instance dies, the next instance can be guaranteed that any unfinished transactions have been completed (whether aborted or committed), leaving the new instance in a clean state prior to resuming work.
Note that the transactional guarantees mentioned here are from the point of view of the producer. On the consumer side, the guarantees are a bit weaker. In particular, we cannot guarantee that all the messages of a committed transaction will be consumed all together. This is for several reasons:
- For compacted topics, some messages of a transaction may be overwritten by newer versions.
- Transactions may straddle log segments. Hence when old segments are deleted, we may lose some messages in the first part of a transaction.
- Consumers may seek to arbitrary points within a transaction, hence missing some of the initial messages.
- Consumers may not consume from all the partitions which participated in a transaction. Hence they will never be able to read all the messages that comprised the transaction.
Key Concepts
To implement transactions, i.e., ensuring that a group of messages is produced and consumed atomically, we introduce several new concepts:
- We introduce a new entity called a Transaction Coordinator. Similar to the consumer group coordinator, each producer is assigned a transaction coordinator, and all the logic of assigning PIDs and managing transactions is done by the transaction coordinator.
- We introduce a new internal Kafka topic called the Transaction Log. Similar to the Consumer Offsets topic, the transaction log is a persistent and replicated record of every transaction. The transaction log is the state store for the transaction coordinator, with the snapshot of the latest version of the log encapsulating the current state of each active transaction.
- We introduce the notion of Control Messages. These are special messages written to user topics, processed by clients, but never exposed to users. They are used, for instance, to let brokers indicate to consumers if the previously fetched messages have been committed atomically or not. Control messages have been previously proposed here.
- We introduce a notion of TransactionalId, to enable users to uniquely identify producers in a persistent way. Different instances of a producer with the same TransactionalId will be able to resume (or abort) any transactions instantiated by the previous instance.
- We introduce the notion of a producer epoch, which enables us to ensure that there is only one legitimate active instance of a producer with a given TransactionalId, and hence enables us to maintain transaction guarantees in the event of failures.
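The interplay between TransactionalId and producer epoch can be sketched as follows. This is a toy model under stated assumptions: the `EpochFencing` class, its method names, and the rule that every initialization bumps the epoch by one are hypothetical and only illustrate how an older instance becomes distinguishable from the newest one.

```java
import java.util.HashMap;
import java.util.Map;

final class EpochFencing {
    // Latest epoch handed out per TransactionalId (coordinator-side state in this model).
    private final Map<String, Short> currentEpoch = new HashMap<>();

    // Models producer initialization: each new instance receives a higher epoch.
    short init(String transactionalId) {
        short next = (short) (currentEpoch.getOrDefault(transactionalId, (short) -1) + 1);
        currentEpoch.put(transactionalId, next);
        return next;
    }

    // Only the holder of the latest epoch counts as the legitimate active instance.
    boolean isCurrent(String transactionalId, short epoch) {
        Short current = currentEpoch.get(transactionalId);
        return current != null && current == epoch;
    }

    public static void main(String[] args) {
        EpochFencing coordinator = new EpochFencing();
        short oldInstance = coordinator.init("payments-app");
        short newInstance = coordinator.init("payments-app"); // restart with the same id
        System.out.println("old instance still valid: " + coordinator.isCurrent("payments-app", oldInstance));
        System.out.println("new instance valid: " + coordinator.isCurrent("payments-app", newInstance));
    }
}
```

In this model a request carrying a stale epoch would be the case that the error lists below name InvalidProducerEpoch.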
...
When a producer is finished with a transaction, the newly introduced KafkaProducer.commitTransaction or KafkaProducer.abortTransaction must be called. The former makes the data produced in 4 available to downstream consumers. The latter effectively erases the produced data from the log: it will never be accessible to the user, i.e., downstream consumers will read and discard the aborted messages.
...
Ok
NotCoordinatorForTransactionalId
CoordinatorNotAvailable
ConcurrentTransactions
InvalidTransactionTimeout
...
Code Block | ||||
---|---|---|---|---|
| ||||
AddPartitionsToTxnResponse => ErrorCode
  ErrorCode: int16 |
Error code:
Ok
InvalidProducerEpoch
NotCoordinator
NotCoordinatorForTransactionalId
CoordinatorNotAvailable
CoordinatorLoadInProgress
InvalidPidMapping
InvalidTxnState
ConcurrentTransactions
GroupAuthorizationFailed
InvalidTxnRequest
AddOffsetsToTxnRequest
Sent by the producer to its transaction coordinator to indicate a consumer offset commit operation is called as part of the current ongoing transaction. Request handling details can be found here.
...
Code Block | ||||
---|---|---|---|---|
| ||||
AddOffsetsToTxnResponse => ErrorCode
ErrorCode: int16
Coordinator => NodeId Host Port
NodeId => int32
Host => string
Port => int32 |
Error code:
Ok
InvalidProducerEpoch
InvalidPidMapping
NotCoordinatorForTransactionalId
CoordinatorNotAvailable
ConcurrentTransactions
InvalidTxnRequest
EndTxnRequest/Response
...
Code Block | ||||
---|---|---|---|---|
| ||||
EndTxnRequest => TransactionalId PID Epoch Command
  TransactionalId => string
  PID => int64
  Epoch => int16
  Command => boolean (false(0) means ABORT, true(1) means COMMIT) |
Code Block | ||||
---|---|---|---|---|
| ||||
EndTxnResponse => ErrorCode
  ErrorCode => int16 |
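The EndTxnRequest layout can be illustrated by serializing its four fields with a `ByteBuffer`. This sketch covers only the request body and makes assumptions that the schema above does not spell out: strings are written as an int16 length followed by UTF-8 bytes, integers are big-endian, and the boolean occupies one byte. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class EndTxnRequestCodec {
    // Encodes TransactionalId, PID, Epoch and Command in schema order (body only, no request header).
    static ByteBuffer encode(String transactionalId, long pid, short epoch, boolean commit) {
        byte[] id = transactionalId.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(2 + id.length + 8 + 2 + 1);
        buf.putShort((short) id.length);   // assumed int16 length prefix for strings
        buf.put(id);                       // TransactionalId => string
        buf.putLong(pid);                  // PID => int64
        buf.putShort(epoch);               // Epoch => int16
        buf.put((byte) (commit ? 1 : 0));  // Command => boolean: 0 = ABORT, 1 = COMMIT
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer request = encode("txn-1", 42L, (short) 3, true);
        System.out.println("encoded body size in bytes: " + request.remaining());
    }
}
```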
...
Ok
InvalidProducerEpoch
InvalidPidMapping
CoordinatorNotAvailable
ConcurrentTransactions
NotCoordinatorForTransactionalId
InvalidTxnRequest
...
WriteTxnMarkersRequest/Response
Sent by transaction coordinator to broker to commit the transaction. Request handling details can be found here.
Code Block | ||||
---|---|---|---|---|
| ||||
WriteTxnMarkersRequest => [CoordinatorEpoch PID Epoch Marker [Topic [Partition]]]
  CoordinatorEpoch => int32
  PID => int64
  Epoch => int16
  Marker => boolean (false(0) means ABORT, true(1) means COMMIT)
  Topic => string
  Partition => int32 |
...
Code Block | ||||
---|---|---|---|---|
| ||||
WriteTxnMarkersResponse => [PID [TopicName [Partition ErrorCode]]]
  PID => int64
  TopicName => string
  Partition => int32
  ErrorCode => int16 |
Error code:
Ok
TxnOffsetCommitRequest/Response
...
Error code:
- InvalidProducerEpoch
Note: The following is tangential to the TxnOffsetCommitRequest/Response: When an OffsetCommitRequest from a consumer failed with a retriable error, we return RetriableOffsetCommitException to the application callback. Previously, this 'RetriableOffsetCommitException' would include the underlying exception. With the changes in KIP-98, we no longer include the underlying exception in the 'RetriableOffsetCommitException'.
Message Format
In order to add new fields such as PID and epoch into the produced messages for transactional messaging and de-duplication, we need to change Kafka’s message format and bump up its version (i.e. the “magic byte”). More specifically, we need to add the following fields into each message:
...
Code Block | ||
---|---|---|
| ||
MessageSet =>
  FirstOffset => int64
  Length => int32
  PartitionLeaderEpoch => int32 /* Added for KIP-101 */
  Magic => int8 /* bump up to “2” */
  CRC => int32 /* CRC32C which covers everything from Attributes on */
  Attributes => int16
  LastOffsetDelta => int32 {NEW}
  FirstTimestamp => int64 {NEW}
  MaxTimestamp => int64 {NEW}
  PID => int64 {NEW}
  ProducerEpoch => int16 {NEW}
  FirstSequence => int32 {NEW}
  Messages => [Message]
Message => {ALL FIELDS NEW}
  Length => varint
  Attributes => int8
  TimestampDelta => varint
  OffsetDelta => varint
  KeyLen => varint
  Key => data
  ValueLen => varint
  Value => data
  Headers => [Header] /* See KIP-82. Note the array uses a varint for the number of headers. */
Header => HeaderKey HeaderValue
  HeaderKeyLen => varint
  HeaderKey => string
  HeaderValueLen => varint
  HeaderValue => data |
The ability to store some fields only at the message set level allows us to conserve space considerably when batching messages into a message set. For example, there is no need to write the PID within each message since it will always be the same for all messages within each message set. In addition, by separating the message level format and message set format, now we can also use variable-length types for the inner (relative) offsets and save considerably over a fixed 8-byte field size.
...
Message Set Attributes: The message set attributes are essentially the same as in the existing format, though we have added an additional byte for future use. In addition to the existing 3 bits used to indicate the compression codec and 1 bit for timestamp type, we will use another bit to indicate that the message set is transactional (see Transaction Markers section). This lets consumers in READ_COMMITTED know whether a transaction marker is expected for a given message set.
Compression (3) | Timestamp type (1) | Transactional (1) | Control (1) | Unused (10) |
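The attribute layout can be read as a set of bit masks over the 16-bit field. The sketch below assumes the fields are packed starting from the least-significant bit in the order listed (compression in bits 0 to 2, timestamp type in bit 3, transactional in bit 4, control in bit 5); that bit placement and all names are assumptions made for illustration.

```java
final class BatchAttributes {
    static final int COMPRESSION_MASK = 0x07;    // bits 0-2: compression codec
    static final int TIMESTAMP_TYPE_MASK = 0x08; // bit 3: timestamp type
    static final int TRANSACTIONAL_MASK = 0x10;  // bit 4: message set is transactional
    static final int CONTROL_MASK = 0x20;        // bit 5: message set holds control messages

    static short pack(int compressionCodec, boolean logAppendTime, boolean transactional, boolean control) {
        int attrs = compressionCodec & COMPRESSION_MASK;
        if (logAppendTime) attrs |= TIMESTAMP_TYPE_MASK;
        if (transactional) attrs |= TRANSACTIONAL_MASK;
        if (control) attrs |= CONTROL_MASK;
        return (short) attrs; // the remaining 10 bits stay zero (unused)
    }

    static int compressionCodec(short attrs) { return attrs & COMPRESSION_MASK; }
    static boolean isTransactional(short attrs) { return (attrs & TRANSACTIONAL_MASK) != 0; }
    static boolean isControl(short attrs) { return (attrs & CONTROL_MASK) != 0; }

    public static void main(String[] args) {
        short attrs = pack(2, false, true, false);
        System.out.println("transactional=" + isTransactional(attrs) + ", control=" + isControl(attrs));
    }
}
```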
Discussion on Maximum Message Size. The broker’s configuration max.message.size previously controlled the maximum size of a single uncompressed message or a compressed set of messages. With this design, it now controls the maximum message set size, compressed or not. In practice, the difference is minor because a single message can be written as a singleton message set, with the small increase in overhead mentioned above.
...
The length field of the message format is encoded as a signed variable-length integer. Similarly, the offset delta and key length fields are encoded as varints. The message’s offset can then be calculated as the offset of the message set + offset delta.
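As an illustration of why variable-length fields save space, the sketch below encodes an integer using a zigzag mapping followed by base-128 encoding, the scheme used by Protocol Buffers. That particular scheme is an assumption here, since the text only states that the fields are variable-length integers; the class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;

final class VarintSketch {
    // Zigzag maps small negative and positive values to small unsigned values,
    // which are then written 7 bits at a time, least-significant group first.
    static byte[] encodeSigned(int value) {
        int zigzag = (value << 1) ^ (value >> 31);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((zigzag & ~0x7F) != 0) {
            out.write((zigzag & 0x7F) | 0x80); // high bit set: more bytes follow
            zigzag >>>= 7;
        }
        out.write(zigzag);
        return out.toByteArray();
    }

    // The absolute offset is the message set's first offset plus the per-message delta.
    static long absoluteOffset(long firstOffset, int offsetDelta) {
        return firstOffset + offsetDelta;
    }

    public static void main(String[] args) {
        System.out.println("bytes for delta 5: " + encodeSigned(5).length);
        System.out.println("absolute offset: " + absoluteOffset(1000L, 5));
    }
}
```

Under this scheme, deltas between -64 and 63 fit in one byte, compared with the fixed 8-byte offset of the previous format.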
Message Attributes: In this format, we have also added a single byte for individual message attributes. Only message sets can be compressed, so there is no need to reserve some of these attributes for the compression type. All of the message-level attributes are available for future use.
Unused (8) |
Control Messages
We use control messages to represent transaction markers. All messages contained in a batch with the control attribute set (see above) are considered control messages and follow a specific format. Each control message must
Control Flag (1)
Unused (5)
Control messages will always have a non-null key, which indicates the type of control message with the following schema:
...
As the batch size increases, the overhead of the new format grows smaller compared to the old format because of the eliminated redundancy. The overhead per message in the old format is fixed at 34 bytes. For the new format, the message set overhead is 53 bytes, while per-message overhead ranges from 6 to 25 bytes. This makes it more costly to send individual messages, but space is quickly recovered with even modest batching. For example, assuming a fixed message size of 1K with 100 byte keys and reasonably close timestamps, the overhead increases by only 7 bytes for each additional batched message (2 bytes for the message size, 1 byte for attributes, 2 bytes for timestamp delta, 1 byte for offset delta, and 1 byte for key size):
...
Batch Size | Old Format Overhead | New Format Overhead |
1 | 34*1 = 34 | 53 + 1*7 = 60 |
3 | 34*3 = 102 | 53 + 3*7 = 74 |
10 | 34*10 = 340 | 53 + 10*7 = 123 |
50 | 34*50 = 1700 | 53 + 50*7 = 403 |
100 | 34*100 = 3400 | 53 + 100*7 = 753 |
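The figures in the table follow directly from the constants quoted in the text, and a short sketch makes it easy to check other batch sizes. The 7-byte per-message figure holds only under the stated example assumptions (1K messages, 100 byte keys, close timestamps); the class and method names are hypothetical.

```java
final class FormatOverhead {
    static final int OLD_PER_MESSAGE = 34; // fixed per-message overhead, old format
    static final int NEW_PER_SET = 53;     // per-message-set overhead, new format
    static final int NEW_PER_MESSAGE = 7;  // per-message overhead in the example scenario

    static int oldFormat(int batchSize) {
        return OLD_PER_MESSAGE * batchSize;
    }

    static int newFormat(int batchSize) {
        return NEW_PER_SET + NEW_PER_MESSAGE * batchSize;
    }

    // Smallest batch size at which the new format has less overhead than the old one.
    static int breakEvenBatchSize() {
        int n = 1;
        while (newFormat(n) >= oldFormat(n)) {
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        for (int n : new int[] {1, 3, 10, 50, 100}) {
            System.out.println(n + " | " + oldFormat(n) + " | " + newFormat(n));
        }
        System.out.println("break-even batch size: " + breakEvenBatchSize());
    }
}
```

With these constants the new format already wins at a batch size of 2 (67 bytes versus 68), which is what "modest batching" amounts to in this scenario.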
Metrics
As part of this work, we would need to expose new metrics to make the system operable. These would include:
- Number of live PIDs (a proxy for the size of the PID->Sequence map)
- Current LSO per partition (useful to detect stuck consumers and lost commit/abort markers).
- Number of active transactionalIds (proxy for the memory consumed by the transaction coordinator).
...
1. Upgrade the brokers once with the inter-broker protocol set to the previous deployed version.
2. Upgrade the brokers again with an updated inter-broker protocol, but leaving the message format unchanged.
3. Notify clients that they can upgrade, BUT should not start using the idempotent / transactional message APIs yet.
4. [When observed that most of the clients have upgraded] Restart the brokers, with the message format version set to the latest.
5. Notify upgraded clients that they can now start using the idempotent / transactional message APIs.
The reason for step 3 is to avoid the performance cost for down-converting messages to an older format, which effectively loses the “zero-copy” optimization. Ideally, all consumers are upgraded before the producers even begin writing to the new message format.
Note: Since the old producer has long since been deprecated and the old consumer will be deprecated in 0.11.0, these clients will not support the new format. In order to avoid the conversion hit, users will have to upgrade to the new clients. It is possible to selectively enable the message format on topics which are already using the new clients.
Test Plan
Correctness
The new features will be tested through unit, integration, and system tests.
The integration tests will focus on ensuring that the basic guarantees (outlined in the Summary of Guarantees section) are satisfied across components.
The system tests will focus on ensuring that the guarantees are satisfied even with failing components, i.e., that the system works even when consumers, producers, and brokers are killed in various states.
We will also add to existing compatibility system tests to ensure that old clients can still talk to the new brokers with the new message format.
Performance
This KIP introduces significant changes to the message format along with the new features.
We plan on introducing changes in a staged fashion, with the first change being to the message format. We will run our performance test suite on these message format changes and ensure that they have, at worst, a minimal performance impact. Note that the message format changes are the only ones which can affect users who don't enable the idempotent producer and don't use transactions.
Then, we will benchmark the performance of the idempotent producer and the transactional producer separately. Finally, we will benchmark the consumer and broker performance when transactions are in use and read_committed mode is enabled. We will publish the results of all these benchmarks so that users can make informed decisions about when and how to use these features.
Rejected Alternatives
As mentioned earlier, we have a separate design document which explores the design space --including rejected alternatives-- as well as all the implementation details. The latter also includes the specifics of message format changes, new RPCs, error handling, etc.
The design document is available here.
...