Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

[This KIP proposal is a joint work between Jason GustafsonFlavio Junqueira,  Apurva Mehta, Sriram, and guozhang Wang]

 


Table of Contents

Status

Current stateAdopted

...

An Example Application

Here is an a simple application which demonstrates the use of the APIs introduced above.


 

Code Block
languagejava
titleKafkaTransactionsExample.java
linenumberstrue
public class KafkaTransactionsExample {
 
  public static void main(String args[]) {
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);


    // Note that the ‘transactional.id’ configuration _must_ be specified in the 
    // producer config in order to use transactions.
    KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);

    // We need to initialize transactions once per producer instance. To use transactions, 
    // it is assumed that the application id is specified in the config with the key 
    // transactional.id.
    // 
    // This method will recover or abort transactions initiated by previous instances of a 
    // producer with the same app id. Any other transactional messages will report an error 
    // if initialization was not performed.
    //
    // The response indicates success or failure. Some failures are irrecoverable and will 
    // require a new producer  instance. See the documentation for TransactionMetadata for a 
    // list of error codes.
    producer.initTransactions();
    
    while(true) {
      ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
      if (!records.isEmpty()) {
        // Start a new transaction. This will begin the process of batching the consumed 
        // records as well
        // as an records produced as a result of processing the input records. 
        //
        // We need to check the response to make sure that this producer is able to initiate 
        // a new transaction.
        producer.beginTransaction();
        
        // Process the input records and send them to the output topic(s).
        List<ProducerRecord<String, String>> outputRecords = processRecords(records);
        for (ProducerRecord<String, String> outputRecord : outputRecords) {
          producer.send(outputRecord); 
        }
        
        // To ensure that the consumed and produced messages are batched, we need to commit 
        // the offsets through
        // the producer and not the consumer.
        //
        // If this returns an error, we should abort the transaction.
        
        sendOffsetsResult = producer.sendOffsetsToTransaction(getUncommittedOffsets());
        
     
        // Now that we have consumed, processed, and produced a batch of messages, let's 
        // commit the results.
        // If this does not report success, then the transaction will be rolled back.
        producer.endTransactioncommitTransaction();
      }
    } 
  }
}

New Configurations

Broker configs

 


transactional.id.timeout.ms

The maximum amount of time in ms that the transaction coordinator will wait before proactively expire a producer TransactionalId without receiving any transaction status updates from it.

Default is 604800000 (7 days). This allows periodic weekly producer jobs to maintain their ids.

max.transaction.timeout.ms

The maximum allowed timeout for transactions. If a client’s requested transaction time exceed this, then the broker will return a InvalidTransactionTimeout error in InitPidRequest. This prevents a client from too large of a timeout, which can stall consumers reading from topics included in the transaction.

Default is 900000 (15 min). This is a conservative upper bound on the period of time a transaction of messages will need to be sent.

transaction.state.log.replication.factor

The number of replicas for the transaction state topic.

Default: 3

transaction.state.log.num.partitions

The number of partitions for the transaction state topic.

Default: 50

transaction.state.log.min.isr

The minimum number of insync replicas the each partition of the transaction state topic needs to have to be considered online.

Default: 2

transaction.state.log.segment.bytes

The segment size for the transaction state topic.

Default: 104857600 bytes.

Producer configs


 

enable.idempotence

Whether or not idempotence is enabled (false by default). If disabled, the producer will not set the PID field in produce requests and the current producer delivery semantics will be in effect. Note that idempotence must be enabled in order to use transactions.

When idempotence is enabled, we enforce that acks=all, retries > 1, and max.inflight.requests.per.connection=1. Without these values for these configurations, we cannot guarantee idempotence. If these settings are not explicitly overidden by the application, the producer will set acks=all, retries=Integer.MAX_VALUE, and max.inflight.requests.per.connection=1 when idempotence is enabled.

transaction.timeout.ms

The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction.

This config value will be sent to the transaction coordinator along with the InitPidRequest. If this value is larger than the max.transaction.timeout.ms setting in the broker, the request will fail with a `InvalidTransactionTimeout` error.

Default is 60000. This makes a transaction to not block downstream consumption more than a minute, which is generally allowable in real-time apps.

transactional.id

The TransactionalId to use for transactional delivery. This enables reliability semantics which span multiple producer sessions since it allows the client to guarantee that transactions using the same TransactionalId have been completed prior to starting any new transactions. If no TransactionalId is provided, then the producer is limited to idempotent delivery.

Note that enable.idempotence must be enabled if a TransactionalId is configured.

The default is empty, which means transactions cannot be used.

Consumer configs


 

isolation.level

Here are the possible values (default is read_uncommitted):

read_uncommitted: consume both committed and uncommitted messages in offset ordering.

read_committed: only consume non-transactional messages or committed transactional messages in offset order. In order to maintain offset ordering, this setting means that we will have to buffer messages in the consumer until we see all messages in a given transaction.

Proposed Changes

Summary of Guarantees

Idempotent Producer Guarantees

To implement idempotent producer semantics, we introduce the concepts of a producer id, henceforth called the PID, and sequence numbers for Kafka messages. Every new producer will be assigned a unique PID during initialization. The PID assignment is completely transparent to users and is never exposed by clients.


For a given PID, sequence numbers will start from zero and be monotonically increasing, with one sequence number per topic partition produced to. The sequence number will be incremented by the producer on every message sent to the broker. The broker maintains in memory the sequence numbers it receives for each topic partition from every PID. The broker will reject a produce request if its sequence number is not exactly one greater than the last committed message from that PID/TopicPartition pair. Messages with a lower sequence number result in a duplicate error, which can be ignored by the producer. Messages with a higher number result in an out-of-sequence error, which indicates that some messages have been lost, and is fatal.

This ensures that, even though a producer must retry requests upon failures, every message will be persisted in the log exactly once. Further, since each new instance of a producer is assigned a new, unique, PID, we can only guarantee idempotent production within a single producer session.

These idempotent producer semantics are potentially useful for stateless applications like metrics tracking and auditing.

Transactional Guarantees

At the core, transactional guarantees enable applications to produce to multiple TopicPartitions atomically, ie. all writes to these TopicPartitions will succeed or fail as a unit. 


Further, since consumer progress is recorded as a write to the offsets topic, the above capability is leveraged to enable applications to batch consumed and produced messages into a single atomic unit, ie. a set of messages may be considered consumed only if the entire ‘consume-transform-produce’ executed in its entirety.

Additionally, stateful applications will also be able to ensure continuity across multiple sessions of the application. In other words, Kafka can guarantee idempotent production and transaction recovery across application bounces.

To achieve this, we require that the application provides a unique id which is stable across all sessions of the application. For the rest of this document, we refer to such an id as the TransactionalId. While there may be a 1-1 mapping between an TransactionalId and the internal PID, the main difference is the the TransactionalId is provided by users, and is what enables idempotent guarantees across producers sessions described below.

When provided with such an TransactionalId, Kafka will guarantee:
  1. Exactly one active producer with a given TransactionalId. This is achieved by fencing off old generations when a new instance with the same TransactionalId comes online.

  2. Transaction recovery across application sessions. If an application instance dies, the next instance can be guaranteed that any unfinished transactions have been completed (whether aborted or committed), leaving the new instance in a clean state prior to resuming work.

Note that the transactional guarantees mentioned here are from the point of view of the producer. On the consumer side, the guarantees are a bit weaker. In particular, we cannot guarantee that all the messages of a committed transaction will be consumed all together. This is for several reasons:

  1. For compacted topics, some messages of a transaction maybe overwritten by newer versions.
  2. Transactions may straddle log segments. Hence when old segments are deleted, we may lose some messages in the first part of a transaction.
  3. Consumers may seek to arbitrary points within a transaction, hence missing some of the initial messages.
  4. Consumer may not consume from all the partitions which participated in a transaction. Hence they will never be able to read all the messages that comprised the transaction.

Key Concepts

To implement transactions, ie. ensuring that a group of messages are produced and consumed atomically, we introduce several new concepts:

  1. We introduce a new entity called a Transaction Coordinator. Similar to the consumer group coordinator, each producer is assigned a transaction coordinator, and all the logic of assigning PIDs and managing transactions is done by the transaction coordinator.
  2. We introduce a new internal kafka topic called the Transaction Log. Similar to the Consumer Offsets topic, the transaction log is a persistent and replicated record of every transaction. The transaction log is the state store for the transaction coordinator, with the snapshot of the latest version of the log encapsulating the current state of each active transaction.
  3. We introduce the notion of Control Messages. These are special messages written to user topics, processed by clients, but never exposed to users. They are used, for instance, to let brokers indicate to consumers if the previously fetched messages have been committed atomically or not. Control messages have been previously proposed here.
  4. We introduce a notion of TransactionalId, to enable users to uniquely identify producers in a persistent way. Different instances of a producer with the same TransactionalId will be able to resume (or abort) any transactions instantiated by the previous instance.
  5. We introduce the notion of a producer epoch, which enables us to ensure that there is only one legitimate active instance of a producer with a given TransactionalId, and hence enables us to maintain transaction guarantees in the event of failures.

...

When a producer is finished with a transaction, the newly introduced KafkaProducer.endTransaction commitTransaction or KafkaProducer.abortTransaction must be called. The former makes the data produced in 4 available to downstream consumers. The latter effectively erases the produced data from the log: it will never be accessible to the user, ie. downstream consumers will read and discard the aborted messages.

...

  • Ok

  • NotCoordinatorForTransactionalId

  • CoordinatorNotAvailable

  • ConcurrentTransactions

  • InvalidTransactionTimeout

...

Code Block
languagetext
titleAddPartitionsToTxnResponse
AddPartitionsToTxnResponse => ErrorCode
 ErrorCode: int16

Error code:

  • Ok

  • InvalidProducerEpochNotCoordinator

  • InvalidPidMappingCoordinatorNotAvailable

  • NotCoordinatorForTransactionalId

  • CoordinatorNotAvailable

  • CoordinatorLoadInProgress

  • InvalidPidMapping

  • InvalidTxnState

  • ConcurrentTransactions

  • GroupAuthorizationFailedInvalidTxnRequest

AddOffsetsToTxnRequest

Sent by the producer to its transaction coordinator to indicate a consumer offset commit operation is called as part of the current ongoing transaction. Request handling details can be found here.

...

Code Block
languagetext
titleAddOffsetsToTxnResponse
AddOffsetsToTxnResponse => ErrorCode
 ErrorCode: int16
 Coordinator => NodeId Host Port
   NodeId => int32
   Host => string
   Port => int32

Error code:

  • Ok

  • InvalidProducerEpoch

  • InvalidPidMapping

  • NotCoordinatorForTransactionalId

  • CoordinatorNotAvailable

  • ConcurrentTransactions

  • InvalidTxnRequest

EndTxnRequest/Response

...

Code Block
languagetext
titleEndTxnRequest
EndTxnRequest => TransactionalId PID Epoch Command
 TransactionalId => string
 PID => int64
 Epoch => int16
 Command => boolean (false(0) means ABORT, true(1) means COMMIT)


Code Block
languagetext
titleEndTxnResponse
EndTxnResponse => ErrorCode
 ErrorCode => int16

...

  • Ok

  • InvalidProducerEpoch

  • InvalidPidMapping

  • CoordinatorNotAvailable

  • ConcurrentTransactions

  • NotCoordinatorForTransactionalId

  • InvalidTxnRequest

...

WriteTxnMarkersRequest/Response

Sent by transaction coordinator to broker to commit the transaction. Request handling details can be found here.

Code Block
languagetext
titleWriteTxnMarkerRequestWriteTxnMarkersRequest
WriteTxnMarkerRequestWriteTxnMarkersRequest => [CoorinadorEpoch PID Epoch Marker CoordinatorEpoch [Topic [Partition]]]
 PIDCoordinatorEpoch => int64int32
 EpochPID => int16int64
 CoordinatorEpochEpoch => int32int16
 Marker => int8boolean (false(0) =means COMMITABORT, true(1) =means ABORTCOMMIT)
 Topic => string
 Partition => int32

...

Code Block
languagetext
titleWriteTxnMarkerResponseWriteTxnMarkersResponse
WriteTxnMarkerResponseWriteTxnMarkersResponse => ErrorCode
 ErrorCode [PID [TopicName [Partition ErrorCode]]]
 PID => int64
 TopicName => string
 Partition => int32
 ErrorCode => int16

Error code:

  • Ok

TxnOffsetCommitRequest/Response

...

Error code:

  • InvalidProducerEpoch

Note: The following is tangential to the TxnOffsetCommitRequest/Response: When an  OffsetCommitRequest from a consumer failed with a retriable error, we return RetriableOffsetCommitException to the application callback. Previously, this 'RetriableOffsetCommitException' would include the underlying exception. With the changes in KIP-98, we no longer include the underlying exception in the 'RetriableOffsetCommitException'.

Message Format

In order to add new fields such as PID and epoch into the produced messages for transactional messaging and de-duplication, we need to change Kafka’s message format and bump up its version (i.e. the “magic byte”). More specifically, we need to add the following fields into each message:

...

Code Block
languagetext
MessageSet => 
  FirstOffset => int64
  Length => int32
  PartitionLeaderEpoch => int32 /* Added for KIP-101 */
  Magic => int8  /* bump up to “2” */
  CRC => int32 /* CRC32C which covers everything from Attributes on */
  Attributes => int16
  LastOffsetDelta => int32 {NEW}
  FirstTimestamp => int64 {NEW}
  MaxTimestamp => int64 {NEW}
  PID => int64 {NEW}
  ProducerEpoch => int16 {NEW}
  FirstSequence => int32 {NEW}
  Messages => [Message]

Message => {ALL FIELDS NEW}
  Length => varint
  Attributes => int8
  TimestampDelta => varint
  OffsetDelta => varint
  KeyLen => varint
  Key => data
  ValueLen => varint
  Value => data

  Headers => [Header] /* See KIP-82. Note the array uses a varint for the number of headers. */
 
Header => HeaderKey HeaderVal
  HeaderKeyLen => varint
  HeaderKey => string
  HeaderValueLen => varint
  HeaderValue => data

 


The ability to store some fields only at the message set level allows us to conserve space considerably when batching messages into a message set. For example, there is no need to write the PID within each message since it will always be the same for all messages within each message set. In addition, by separating the message level format and message set format, now we can also use variable-length types for the inner (relative) offsets and save considerably over a fixed 8-byte field size.

...

Message Set Attributes: The message set attributes are essentially the same as in the existing format, though we have added an additional byte for future use. In addition to the existing 3 bits used to indicate the compression codec  and 1 bit for timestamp type, we will use another bit to indicate that the message set is transactional (see Transaction Markers section). This lets consumers in READ_COMMITTED know whether a transaction marker is expected for a given message set.


The control flag indicates that the messages contained in the message set are not intended for application consumption (see below). 


Compression (3)

Timestamp type (1)

Transactional (1)

Control(1)

Unused (

11

10)

 


Discussion on Maximum Message Size. The broker’s configuration max.message.size previously controlled the maximum size of a single uncompressed message or a compressed set of messages. With this design, it now controls the maximum message set size, compressed or not. In practice, the difference is minor because a single message can be written as a singleton message set, with the small increase in overhead mentioned above.

...

The length field of the message format is encoded as an unsigned a signed variable-length int, abbr. “uintVar”integer. Similarly the offset delta and key length fields are encoded as unitVar as well. The message’s offset can then be calculated as the offset of the message set + offset delta. 

Message Attributes: In this format, we have also added a single byte for individual message attributes. Only message sets can be compressed, so there is no need to reserve some of these attributes for the compression type.  The control flag indicates that the message is a control message, which means it is not intended for application consumption. The remaining bits are currently unused, though one could be used for KIP-87 (message tombstone marker).All of the message-level attributes are available for future use. 


Unused (8)

Control Messages

We use control messages to represent transaction markers. All messages contained in a batch with the control attribute set (see above) are considered control messages and follow a specific format. Each control message must

 

Control Flag (1)

Unused (5)

Control messages will always have a non-null key, which is used to indicate the type of control message type with the following schema:

...

As the batch size increases, the overhead of the new format grows smaller compared to the old format because of the eliminated redundancy. The overhead per message in the old format is fixed at 34 bytes. For the new format, the message set overhead is 53 bytes, while per-message overhead ranges from 6 to 25 bytes. This makes it more costly to send individual messages, but space is quickly recovered with even modest batching. For example, assuming a fixed message size of 1K with 100 byte keys and reasonably close timestamps, the overhead increases by only 7 bytes for each additional batched message (2 bytes for the message size, 1 byte for attributes, 2 bytes for timestamp delta, 1 byte for offset delta, and 1 byte for key size) :

...


Batch Size

Old Format Overhead

New Format Overhead

1

34*1 = 34

53 + 1*7 = 60

3

34*3 = 102

53 + 3*7 = 74

10

34*10 = 340

53 + 10*7 = 123

50

34*50 = 1700

53 + 50*7 = 403

100

34*100 = 3400

45

53 + 100*7 =

745

753

Metrics

As part of this work, we would need to expose new metrics to make the system operable. These would include:

  1. Number of live PIDs (a proxy for the size of the PID->Sequence map)
  2. Current LSO per partition (useful to detect stuck consumers and lost commit/abort markers).
  3. Number of active transactionalIds (proxy for the memory consumed by the transaction coordinator).

...

  1. Upgrade the brokers once with the inter-broker protocol set to the previous deployed version.

  2. Upgrade the brokers again with an updated inter-broker protocol, but leaving the message format unchanged.

  3. Notify clients that they can upgrade, BUT should not start using the idempotent / transactional message APIs yet.

  4. [When observed that most of the clients have upgraded] Restart the brokers, with the message format version set to the latest.

  5. Notify upgraded clients that they can now start using the idempotent / transactional message APIs.

The reason for step 3 is to avoid the performance cost for down-converting messages to an older format, which effectively loses the “zero-copy” optimization. Ideally, all consumers are upgraded before the producers even begin writing to the new message format.

Note: Since the old producer has long since been deprecated and the old consumer will be deprecated in 0.11.0, these clients will not support the new format. In order to avoid the conversion hit, users will have to upgrade to the new clients. It is possible to selectively enable the message format on topics which are already using the new clients.

Test Plan

Correctness

The new features will be tested through unit, integration, and system tests.

The integration tests will focus on ensuring that the basic guarantees (outlined in the Summary of Guarantees section) are satisfied across components.

The system tests will focus on ensuring that the guarantees are satisfied even with failing components, ie. that the system works even when consumers, producers, brokers are killed in various states.

We will also add to existing compatibility system tests to ensure that old clients can still talk to the new brokers with the new message format.

Performance

This KIP introduces significant changes to the message format along with the new features.

We plan on introducing changes in a staged fashion, with the first change being to the message format. We will run our performance test suite on these message format changes and ensure that there is a minimal performance impact thanks to these changes at worst. Note that the message format changes are the only ones which can affect users who don't enable the idempotent producer and don't use transactions.

Then, we will benchmark the performance of the idempotent producer and the transactional producer separately. Finally, we will benchmark the consumer and broker performance when transactions are in use and read_committed mode is enabled. We will publish the results of all these benchmarks so that users can make informed decisions about when and how to use these features. 

Rejected Alternatives

As mentioned earlier, we have a separate design document which explores the design space --including rejected alternatives-- as well as all the implementation details. The latter also includes the specifics of message format changes, new RPCs, error handling, etc. 

The design document is available here.

...