[This KIP proposal is a joint work between Jason Gustafson, Flavio Junqueira, Apurva Mehta, Sriram, and Guozhang Wang]

 



Status

Current state: Adopted

Discussion thread: http://search-hadoop.com/m/Kafka/uyzND1jwZrr7HRHf?subj=+DISCUSS+KIP+98+Exactly+Once+Delivery+and+Transactional+Messaging

JIRA: KAFKA-4815

Motivation

This document outlines a proposal for strengthening the message delivery semantics of Kafka. This builds on significant work which has been done previously, specifically, here and here.

...

In this document we present a proposal for bringing transactions to Kafka. We will only focus on the user facing changes: the client API changes, the new configurations we will introduce, and the summary of guarantees. We also outline the basic data flow, which summarizes all the new RPCs we will introduce with transactions. The design details are presented in a separate document.

...

An Example Application

Here is a simple application which demonstrates the use of the APIs introduced above.

...


Code Block
languagejava
titleKafkaTransactionsExample.java
linenumberstrue
public class KafkaTransactionsExample {
 
  public static void main(String args[]) {
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);


    // Note that the ‘transactional.id’ configuration _must_ be specified in the 
    // producer config in order to use transactions.
    KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);

    // We need to initialize transactions once per producer instance. To use transactions, 
    // it is assumed that the application id is specified in the config with the key 
    // transactional.id.
    // 
    // This method will recover or abort transactions initiated by previous instances of a 
    // producer with the same app id. Any other transactional messages will report an error 
    // if initialization was not performed.
    //
    // The response indicates success or failure. Some failures are irrecoverable and will 
    // require a new producer  instance. See the documentation for TransactionMetadata for a 
    // list of error codes.
    producer.initTransactions();
    
    while(true) {
      ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
      if (!records.isEmpty()) {
        // Start a new transaction. This will begin the process of batching the consumed
        // records as well as any records produced as a result of processing the input records.
        //
        // We need to check the response to make sure that this producer is able to initiate 
        // a new transaction.
        producer.beginTransaction();
        
        // Process the input records and send them to the output topic(s).
        List<ProducerRecord<String, String>> outputRecords = processRecords(records);
        for (ProducerRecord<String, String> outputRecord : outputRecords) {
          producer.send(outputRecord); 
        }
        
        // To ensure that the consumed and produced messages are batched, we need to commit 
        // the offsets through
        // the producer and not the consumer.
        //
        // If this returns an error, we should abort the transaction.
        
        producer.sendOffsetsToTransaction(getUncommittedOffsets());
        
     
        // Now that we have consumed, processed, and produced a batch of messages, let's 
        // commit the results.
        // If this does not report success, then the transaction will be rolled back.
        producer.commitTransaction();
      }
    } 
  }
}

New Configurations

Broker configs

 


transactional.id.timeout.ms

The maximum amount of time in ms that the transaction coordinator will wait before proactively expiring a producer's TransactionalId without receiving any transaction status updates from it.

Default is 604800000 (7 days). This allows periodic weekly producer jobs to maintain their ids.

max.transaction.timeout.ms

The maximum allowed timeout for transactions. If a client’s requested transaction time exceeds this, then the broker will return an InvalidTransactionTimeout error in InitPidRequest. This prevents a client from using too large a timeout, which can stall consumers reading from topics included in the transaction.

Default is 900000 (15 min). This is a conservative upper bound on the period of time needed to send a transaction of messages.


transaction.state.log.replication.factor

The number of replicas for the transaction state topic.

Default: 3

transaction.state.log.num.partitions

The number of partitions for the transaction state topic.

Default: 50

transaction.state.log.min.isr

The minimum number of in-sync replicas that each partition of the transaction state topic needs to have in order to be considered online.

Default: 2

transaction.state.log.segment.bytes

The segment size for the transaction state topic.

Default: 104857600 bytes.

Producer configs


 

enable.idempotence

Whether or not idempotence is enabled (false by default). If disabled, the producer will not set the PID field in produce requests and the current producer delivery semantics will be in effect. Note that idempotence must be enabled in order to use transactions.

When idempotence is enabled, we enforce that acks=all, retries > 1, and max.inflight.requests.per.connection=1. Without these values for these configurations, we cannot guarantee idempotence. If these settings are not explicitly overridden by the application, the producer will set acks=all, retries=Integer.MAX_VALUE, and max.inflight.requests.per.connection=1 when idempotence is enabled.
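
To make the above concrete, here is a minimal sketch (not taken from the KIP itself) of a producer configured for idempotent delivery; the bootstrap address and topic name are placeholders, and the explicit acks/retries/in-flight settings simply mirror the values described above.

Code Block
languagejava
titleIdempotentProducerSketch.java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class IdempotentProducerSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");   // placeholder address
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    // Enable idempotent delivery. The three settings below mirror the values the producer
    // would enforce anyway when idempotence is enabled; they are shown here for clarity.
    props.put("enable.idempotence", "true");
    props.put("acks", "all");
    props.put("retries", Integer.toString(Integer.MAX_VALUE));
    props.put("max.in.flight.requests.per.connection", "1");

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      producer.send(new ProducerRecord<>("my-topic", "key", "value"));   // placeholder topic
    }
  }
}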

transaction.timeout.ms

The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction.

This config value will be sent to the transaction coordinator along with the InitPidRequest. If this value is larger than the max.transaction.timeout.ms setting in the broker, the request will fail with an `InvalidTransactionTimeout` error.

Default is 60000 (1 minute). This ensures that a transaction will not block downstream consumption for more than a minute, which is generally allowable in real-time apps.

transactional.id

The TransactionalId to use for transactional delivery. This enables reliability semantics which span multiple producer sessions since it allows the client to guarantee that transactions using the same TransactionalId have been completed prior to starting any new transactions. If no TransactionalId is provided, then the producer is limited to idempotent delivery.

Note that enable.idempotence must be enabled if a TransactionalId is configured.

The default is empty, which means transactions cannot be used.
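
As an illustration, a transactional producer configuration might look like the following sketch; the id, address, and timeout are placeholder values, not recommendations.

Code Block
languagejava
titleTransactionalProducerConfigSketch.java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class TransactionalProducerConfigSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");   // placeholder address
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    // A stable id shared by successive instances of the same logical application.
    props.put("transactional.id", "my-app-id");          // placeholder id
    props.put("enable.idempotence", "true");             // required when a TransactionalId is configured
    props.put("transaction.timeout.ms", "60000");        // must not exceed the broker's max.transaction.timeout.ms

    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    // Recovers or aborts any transactions left by a previous instance with the same id and fences it off.
    producer.initTransactions();
    producer.close();
  }
}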

Consumer configs

 


isolation.level

Here are the possible values (default is read_uncommitted):

read_uncommitted: consume both committed and uncommitted messages in offset ordering.

read_committed: only consume non-transactional messages or committed transactional messages in offset order. In order to maintain offset ordering, this setting means that we will have to buffer messages in the consumer until we see all messages in a given transaction.
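
For illustration, a consumer that only reads committed data might be configured as in the sketch below; the group id, topic, and address are placeholders, not part of the KIP.

Code Block
languagejava
titleReadCommittedConsumerSketch.java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ReadCommittedConsumerSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");   // placeholder address
    props.put("group.id", "my-group");                   // placeholder group
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    // Only return non-transactional messages and messages from committed transactions.
    props.put("isolation.level", "read_committed");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("my-topic"));   // placeholder topic
      ConsumerRecords<String, String> records = consumer.poll(1000);
      for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.offset() + ": " + record.value());
      }
    }
  }
}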

Proposed Changes

Summary of Guarantees

Idempotent Producer Guarantees

To implement idempotent producer semantics, we introduce the concepts of a producer id, henceforth called the PID, and sequence numbers for Kafka messages. Every new producer will be assigned a unique PID during initialization. The PID assignment is completely transparent to users and is never exposed by clients.


For a given PID, sequence numbers will start from zero and be monotonically increasing, with one sequence number per topic partition produced to. The sequence number will be incremented by the producer on every message sent to the broker. The broker maintains in memory the sequence numbers it receives for each topic partition from every PID. The broker will reject a produce request if its sequence number is not exactly one greater than the last committed message from that PID/TopicPartition pair. Messages with a lower sequence number result in a duplicate error, which can be ignored by the producer. Messages with a higher number result in an out-of-sequence error, which indicates that some messages have been lost, and is fatal.

This ensures that, even though a producer must retry requests upon failures, every message will be persisted in the log exactly once. Further, since each new instance of a producer is assigned a new, unique, PID, we can only guarantee idempotent production within a single producer session.
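
The following is an illustrative sketch (purely hypothetical, not the broker's actual code) of the per-PID, per-partition sequence check described above.

Code Block
languagejava
titleSequenceCheckSketch.java
import java.util.HashMap;
import java.util.Map;

// Purely illustrative: a toy version of the per-PID, per-partition check described above.
// The actual broker logic lives in the partition leader and is also persisted with the log.
public class SequenceCheckSketch {
  enum Outcome { ACCEPT, DUPLICATE, OUT_OF_SEQUENCE }

  // Last accepted sequence number per PID, for a single topic partition.
  private final Map<Long, Integer> lastSequence = new HashMap<>();

  Outcome check(long pid, int sequence) {
    Integer last = lastSequence.get(pid);
    int expected = (last == null) ? 0 : last + 1;   // sequences start at zero for each PID/partition
    if (sequence == expected) {
      lastSequence.put(pid, sequence);
      return Outcome.ACCEPT;                        // exactly one greater than the last: append it
    } else if (sequence < expected) {
      return Outcome.DUPLICATE;                     // a retry of an already-appended message: ignore it
    } else {
      return Outcome.OUT_OF_SEQUENCE;               // a gap: messages were lost, fatal for this producer
    }
  }
}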

These idempotent producer semantics are potentially useful for stateless applications like metrics tracking and auditing.

Transactional Guarantees

At the core, transactional guarantees enable applications to produce to multiple TopicPartitions atomically, ie. all writes to these TopicPartitions will succeed or fail as a unit. 


Further, since consumer progress is recorded as a write to the offsets topic, the above capability is leveraged to enable applications to batch consumed and produced messages into a single atomic unit, ie. a set of messages may be considered consumed only if the entire ‘consume-transform-produce’ loop executed in its entirety.

Additionally, stateful applications will also be able to ensure continuity across multiple sessions of the application. In other words, Kafka can guarantee idempotent production and transaction recovery across application bounces.

To achieve this, we require that the application provides a unique id which is stable across all sessions of the application. For the rest of this document, we refer to such an id as the TransactionalId. While there may be a 1-1 mapping between a TransactionalId and the internal PID, the main difference is that the TransactionalId is provided by users, and is what enables the idempotent guarantees across producer sessions described below.

When provided with such a TransactionalId, Kafka will guarantee:
  1. Exactly one active producer with a given TransactionalId. This is achieved by fencing off old generations when a new instance with the same TransactionalId comes online.

  2. Transaction recovery across application sessions. If an application instance dies, the next instance can be guaranteed that any unfinished transactions have been completed (whether aborted or committed), leaving the new instance in a clean state prior to resuming work.

Note that the transactional guarantees mentioned here are from the point of view of the producer. On the consumer side, the guarantees are a bit weaker. In particular, we cannot guarantee that all the messages of a committed transaction will be consumed all together. This is for several reasons:

  1. For compacted topics, some messages of a transaction may be overwritten by newer versions.
  2. Transactions may straddle log segments. Hence when old segments are deleted, we may lose some messages in the first part of a transaction.
  3. Consumers may seek to arbitrary points within a transaction, hence missing some of the initial messages.
  4. Consumers may not consume from all the partitions which participated in a transaction, and hence will never be able to read all the messages that comprised the transaction.

Key Concepts

To implement transactions, ie. ensuring that a group of messages are produced and consumed atomically, we introduce several new concepts:

  1. We introduce a new entity called a Transaction Coordinator. Similar to the consumer group coordinator, each producer is assigned a transaction coordinator, and all the logic of assigning PIDs and managing transactions is done by the transaction coordinator.
  2. We introduce a new internal Kafka topic called the Transaction Log. Similar to the Consumer Offsets topic, the transaction log is a persistent and replicated record of every transaction. The transaction log is the state store for the transaction coordinator, with the snapshot of the latest version of the log encapsulating the current state of each active transaction.
  3. We introduce the notion of Control Messages. These are special messages written to user topics, processed by clients, but never exposed to users. They are used, for instance, to let brokers indicate to consumers if the previously fetched messages have been committed atomically or not. Control messages have been previously proposed here.
  4. We introduce a notion of TransactionalId, to enable users to uniquely identify producers in a persistent way. Different instances of a producer with the same TransactionalId will be able to resume (or abort) any transactions instantiated by the previous instance.
  5. We introduce the notion of a producer epoch, which enables us to ensure that there is only one legitimate active instance of a producer with a given TransactionalId, and hence enables us to maintain transaction guarantees in the event of failures.

In addition to the new concepts above, we also introduce new request types, new versions of existing requests, and new versions of the core message format in order to support transactions. The details of all of these will be deferred to other documents.

Data Flow

[Data flow diagram]

 

In the diagram above, the sharp edged boxes represent distinct machines. The rounded boxes at the bottom represent Kafka TopicPartitions, and the diagonally rounded boxes represent logical entities which run inside brokers. 

...

If no TransactionalId is specified in the configuration, a fresh PID is assigned, and the producer only enjoys idempotent semantics and transactional semantics within a single session.

3. Starting a Transaction – The beginTransaction() API

The new KafkaProducer will have a KafkaProducer.beginTransaction() method which has to be called to signal the start of a new transaction. The producer records local state indicating that the transaction has begun, but the transaction won’t begin from the coordinator’s perspective until the first record is sent.

4. The consume-transform-produce loop

...

The producer sends this request to the transaction coordinator the first time a new TopicPartition is written to as part of a transaction. The addition of this TopicPartition to the transaction is logged by the coordinator in step 4.1a. We need this information so that we can write the commit or abort markers to each TopicPartition (see section 5.2 for details). If this is the first partition added to the transaction, the coordinator will also start the transaction timer.

 

4.2 ProduceRequest

The producer writes a bunch of messages to the user’s TopicPartitions through one or more ProduceRequests (fired from the send method of the producer). These requests include the PID, epoch, and sequence number as denoted in 4.2a.

...

When a producer is finished with a transaction, the newly introduced KafkaProducer.commitTransaction or KafkaProducer.abortTransaction must be called. The former makes the data produced in 4 available to downstream consumers. The latter effectively erases the produced data from the log: it will never be accessible to the user, ie. downstream consumers will read and discard the aborted messages.
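
As a sketch of how an application might drive this decision (reusing the hypothetical records and processRecords helpers from the example application above, and assuming the usual client exception classes ProducerFencedException and KafkaException are imported):

Code Block
languagejava
titleCommitOrAbortSketch.java
// Fragment reusing the hypothetical 'records' and 'processRecords' helpers from the example above.
producer.beginTransaction();
try {
  for (ProducerRecord<String, String> output : processRecords(records)) {
    producer.send(output);
  }
  // Make the whole batch visible to read_committed consumers.
  producer.commitTransaction();
} catch (ProducerFencedException e) {
  // A newer instance with the same TransactionalId is active; this producer must be closed.
  producer.close();
} catch (KafkaException e) {
  // Any other error: abort so that read_committed consumers never see the partial output,
  // then the batch can be reprocessed.
  producer.abortTransaction();
}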

...

The transaction coordinator handles each of the following requests: InitPid, AddPartitionsToTxn, AddOffsetsToTxn, and EndTxn. Each request to the transaction coordinator includes the producer’s TransactionalId and can be used for authorization. Each of these requests mutates the transaction state of the producer, so they all require Write access to the corresponding TransactionalId resource. Additionally, the AddPartitionsToTxn API requires Write access to the topics corresponding to the included partitions, and the AddOffsetsToTxn API requires Read access to the group included in the request.

...

Code Block
languagetext
titleFetchRequest
// FetchRequest v4

FetchRequest => ReplicaId MaxWaitTime MinBytes IsolationLevel [TopicName [Partition FetchOffset MaxBytes]]
 ReplicaId => int32
 MaxWaitTime => int32
 MinBytes => int32
 IsolationLevel => int8 (READ_COMMITTED | READ_UNCOMMITTED)
 TopicName => string
 Partition => int32
 FetchOffset => int64
 MaxBytes => int32


Code Block
languagetext
titleFetchResponse
// FetchResponse v4

FetchResponse => ThrottleTime [TopicName [Partition ErrorCode HighwaterMarkOffset LastStableOffset AbortedTransactions MessageSetSize MessageSet]]
 ThrottleTime => int32
 TopicName => string
 Partition => int32
 ErrorCode => int16
 HighwaterMarkOffset => int64
 LastStableOffset => int64
 AbortedTransactions => [PID FirstOffset]
   PID => int64
   FirstOffset => int64
 MessageSetSize => int32

...

  • DuplicateSequenceNumber [NEW]

  • InvalidSequenceNumber [NEW]

  • InvalidProducerEpoch [NEW]

  • UNSUPPORTED_FOR_MESSAGE_FORMAT

...

Code Block
languagetext
titleInitPidRequest
InitPidRequest => TransactionalId TransactionTimeoutMs
 TransactionalId => String
 TransactionTimeoutMs => int32


Code Block
languagetext
titleInitPidResponse
InitPIDResponse => Error PID Epoch
 Error => Int16
 PID => Int64
 Epoch => Int16

...

  • Ok

  • NotCoordinatorForTransactionalId

  • CoordinatorNotAvailable

...

  • ConcurrentTransactions

  • InvalidTransactionTimeout

AddPartitionsToTxnRequest/Response

Sent by the producer to its transaction coordinator to add a partition to the current ongoing transaction. Request handling details can be found here.

Code Block
languagetext
titleAddPartitionsToTxnRequest
AddPartitionsToTxnRequest => TransactionalId PID Epoch [Topic [Partition]]
 TransactionalId => string
 PID => int64
 Epoch => int16
 Topic => string
 Partition => int32


Code Block
languagetext
titleAddPartitionsToTxnResponse
AddPartitionsToTxnResponse => ErrorCode
 ErrorCode => int16

Error code:

  • Ok

  • NotCoordinator

  • CoordinatorNotAvailable

  • CoordinatorLoadInProgress

  • InvalidPidMapping

  • InvalidTxnState

  • ConcurrentTransactions

  • GroupAuthorizationFailed

...

AddOffsetsToTxnRequest

Sent by the producer to its transaction coordinator to indicate that a consumer offset commit operation is called as part of the current ongoing transaction. Request handling details can be found here.

Code Block
languagetext
titleAddOffsetsToTxnRequest
AddOffsetsToTxnRequest => TransactionalId PID Epoch ConsumerGroupID
 TransactionalId => string
 PID => int64
 Epoch => int16
 ConsumerGroupID => string


Code Block
languagetext
titleAddOffsetsToTxnResponse
AddOffsetsToTxnResponse => ErrorCode
 ErrorCode: int16

Error code:

  • Ok

  • InvalidProducerEpoch

  • InvalidPidMapping

  • NotCoordinatorForTransactionalId

  • CoordinatorNotAvailable

  • ConcurrentTransactions

  • InvalidTxnRequest

...

EndTxnRequest/Response

Sent by the producer to its transaction coordinator to prepare committing or aborting the current ongoing transaction. Request handling details can be found here.

Code Block
languagetext
titleEndTxnRequest
EndTxnRequest => TransactionalId PID Epoch Command
 TransactionalId => string
 PID => int64
 Epoch => int16
 Command => boolean (false(0) means ABORT, true(1) means COMMIT)


Code Block
languagetext
titleEndTxnResponse
EndTxnResponse => ErrorCode
 ErrorCode => int16

Error code:

  • Ok

  • InvalidProducerEpoch

  • InvalidPidMapping

  • CoordinatorNotAvailable

  • ConcurrentTransactions

  • NotCoordinatorForTransactionalId

  • InvalidTxnRequest

...

WriteTxnMarkersRequest/Response

Sent by the transaction coordinator to the broker to commit the transaction. Request handling details can be found here.

Code Block
languagetext
titleWriteTxnMarkersRequest
WriteTxnMarkersRequest => [CoordinatorEpoch PID Epoch Marker [Topic [Partition]]]
 CoordinatorEpoch => int32
 PID => int64
 Epoch => int16
 Marker => boolean (false(0) means ABORT, true(1) means COMMIT)
 Topic => string
 Partition => int32

...




Code Block
languagetext
titleWriteTxnMarkersResponse
WriteTxnMarkersResponse => [PID [TopicName [Partition ErrorCode]]]
 PID => int64
 TopicName => string
 Partition => int32
 ErrorCode => int16

Error code:

  • Ok

TxnOffsetCommitRequest/Response

...

Code Block
languagetext
titleTxnOffsetCommitRequest
TxnOffsetCommitRequest   => ConsumerGroupID
                           PID
                           Epoch
                           RetentionTime
                           OffsetAndMetadata
 ConsumerGroupID => string
 PID => int64
 Epoch => int16
 RetentionTime => int64
 OffsetAndMetadata => [TopicName [Partition Offset Metadata]]
   TopicName => string
   Partition => int32
   Offset => int64
   Metadata => string

...

Code Block
languagetext
titleTxnOffsetCommitResponse
TxnOffsetCommitResponse => [TopicName [Partition ErrorCode]]
 TopicName => string
 Partition => int32
 ErrorCode => int16

Error code:

  • InvalidProducerEpoch

Note: The following is tangential to the TxnOffsetCommitRequest/Response: when an OffsetCommitRequest from a consumer fails with a retriable error, we return RetriableOffsetCommitException to the application callback. Previously, this RetriableOffsetCommitException would include the underlying exception. With the changes in KIP-98, we no longer include the underlying exception in the RetriableOffsetCommitException.

Message Format

In order to add new fields such as PID and epoch into the produced messages for transactional messaging and de-duplication, we need to change Kafka’s message format and bump up its version (i.e. the “magic byte”). More specifically, we need to add the following fields into each message:

...

Code Block
languagetext
MessageSet => 
  FirstOffset => int64
  Length => int32
  PartitionLeaderEpoch => int32 /* Added for KIP-101 */
  Magic => int8  /* bump up to “2” */
  CRC => int32 /* CRC32C which covers everything from Attributes on */
  Attributes => int16
  LastOffsetDelta => int32 {NEW}
  FirstTimestamp => int64 {NEW}
  MaxTimestamp => int64 {NEW}
  PID => int64 {NEW}
  ProducerEpoch => int16 {NEW}
  FirstSequence => int32 {NEW}
  Messages => [Message]

Message => {ALL FIELDS NEW}
  Length => varint
  Attributes => int8
  TimestampDelta => varint
  OffsetDelta => varint
  KeyLen => varint
  Key => data
  ValueLen => varint
  Value => data
  Headers => [Header] /* See KIP-82. Note the array uses a varint for the number of headers. */

Header => HeaderKey HeaderVal
  HeaderKeyLen => varint
  HeaderKey => string
  HeaderValueLen => varint
  HeaderValue => data

The ability to store some fields only at the message set level allows us to conserve space considerably when batching messages into a message set. For example, there is no need to write the PID within each message since it will always be the same for all messages within each message set. In addition, by separating the message level format and message set format, now we can also use variable-length types for the inner (relative) offsets and save considerably over a fixed 8-byte field size.

...

Message Set Attributes: The message set attributes are essentially the same as in the existing format, though we have added an additional byte for future use. In addition to the existing 3 bits used to indicate the compression codec  and 1 bit for timestamp type, we will use another bit to indicate that the message set is transactional (see Transaction Markers section). This lets consumers in READ_COMMITTED know whether a transaction marker is expected for a given message set.


The control flag indicates that the messages contained in the message set are not intended for application consumption (see below). 


Compression (3) | Timestamp type (1) | Transactional (1) | Control (1) | Unused (10)

 


Discussion on Maximum Message Size. The broker’s configuration max.message.size previously controlled the maximum size of a single uncompressed message or a compressed set of messages. With this design, it now controls the maximum message set size, compressed or not. In practice, the difference is minor because a single message can be written as a singleton message set, with the small increase in overhead mentioned above.

...

The length field of the message format is encoded as a signed variable-length integer (varint). Similarly, the offset delta and key length fields are encoded as varints as well. The message’s offset can then be calculated as the offset of the message set + offset delta.
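
As an illustration of why varints help, here is a generic zig-zag varint encoder; this is a sketch of the general technique, not code taken from the Kafka implementation.

Code Block
languagejava
titleVarintSketch.java
import java.io.ByteArrayOutputStream;

// A generic zig-zag varint encoder, shown only to illustrate why small relative offsets and
// lengths occupy one or two bytes instead of a fixed 4 or 8 bytes. Not the Kafka implementation.
public class VarintSketch {
  static byte[] encodeZigZagVarint(int value) {
    int v = (value << 1) ^ (value >> 31);   // zig-zag maps small negative/positive values to small codes
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    while ((v & 0xFFFFFF80) != 0) {
      out.write((v & 0x7F) | 0x80);         // low 7 bits with the continuation bit set
      v >>>= 7;
    }
    out.write(v & 0x7F);                    // final byte, continuation bit clear
    return out.toByteArray();
  }

  public static void main(String[] args) {
    System.out.println(encodeZigZagVarint(0).length);     // 1 byte
    System.out.println(encodeZigZagVarint(63).length);    // 1 byte
    System.out.println(encodeZigZagVarint(1000).length);  // 2 bytes
  }
}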

Message Attributes: In this format, we have also added a single byte for individual message attributes. Only message sets can be compressed, so there is no need to reserve any of these attributes for the compression type. All of the message-level attributes are available for future use:

Unused (8)

Control Messages

We use control messages to represent transaction markers. All messages contained in a batch with the control attribute set (see above) are considered control messages and follow a specific format. Each control message must have a non-null key, which is used to indicate the type of control message with the following schema:

...

  1. Having easy access to the offset of the first message allows us to stream messages to the user on demand. In the existing format, we only know the last offset in each message set, so we have to read the messages fully into memory in order to compute the offset of the first message to be returned to the user.

  2. As before, the message set header has a fixed size. This is important because it allows us to do in-place offset/timestamp assignment on the broker before writing to disk.

  3. We have removed the per-message CRC in this format. We hesitated initially to do so because of its use in some auditing applications for end-to-end validation. The problem is that it is not safe, even currently, to assume that the CRC seen by the producer will match that seen by the consumer. One case where it is not preserved is when the topic is configured to use the log append time. Another is when messages need to be up-converted prior to appending to the log. For these reasons, and to conserve space and save computation, we have removed the CRC and deprecated client usage of these fields.

  4. The message set CRC covers the header and message data. Alternatively, we could let it cover only the header, but if compressed data is corrupted, then decompression may fail with obscure errors. Additionally, that would require us to add the message-level CRC back to the message.

  5. The CRC32C polynomial is used for all CRC computations in the new format because optimised implementations are significantly faster  (i.e. if they use the CRC32 instruction introduced in SSE4.2).

  6. Individual messages within a message set have their full size (including header, key, and value) as the first field. This is designed to make deserialization efficient. As we do for the message set itself, we can read the size from the input stream, allocate memory accordingly, and do a single read up to the end of the message. This also makes it easier to skip over the messages if we are looking for a particular one, which potentially saves us from copying the key and value.

  7. We have not included a field for the size of the value in the message schema since it can be computed directly using the message size and the length of the header and key.

  8. We have used a variable length integer to represent timestamps. Our approach is to let the first message

...

As the batch size increases, the overhead of the new format grows smaller compared to the old format because of the eliminated redundancy. The overhead per message in the old format is fixed at 34 bytes. For the new format, the message set overhead is 53 bytes, while per-message overhead ranges from 6 to 25 bytes. This makes it more costly to send individual messages, but space is quickly recovered with even modest batching. For example, assuming a fixed message size of 1K with 100 byte keys and reasonably close timestamps, the overhead increases by only 7 bytes for each additional batched message (2 bytes for the message size, 1 byte for attributes, 2 bytes for timestamp delta, 1 byte for offset delta, and 1 byte for key size) :


 

Batch Size | Old Format Overhead | New Format Overhead
1          | 34*1 = 34           | 53 + 1*7 = 60
3          | 34*3 = 102          | 53 + 3*7 = 74
10         | 34*10 = 340         | 53 + 10*7 = 123
50         | 34*50 = 1700        | 53 + 50*7 = 403
100        | 34*100 = 3400       | 53 + 100*7 = 753

Metrics

As part of this work, we would need to expose new metrics to make the system operable. These would include:

  1. Number of live PIDs (a proxy for the size of the PID->Sequence map)
  2. Current LSO per partition (useful to detect stuck consumers and lost commit/abort markers).
  3. Number of active transactionalIds (proxy for the memory consumed by the transaction coordinator).

Compatibility, Deprecation, and Migration Plan

...

  1. Upgrade the brokers once with the inter-broker protocol set to the previous deployed version.

  2. Upgrade the brokers again with an updated inter-broker protocol, but leaving the message format unchanged.

  3. Notify clients that they can upgrade, BUT should not start using the idempotent / transactional message APIs yet.

  4. [When observed that most of the clients have upgraded] Restart the brokers, with the message format version set to the latest.

  5. Notify upgraded clients that they can now start using the idempotent / transactional message APIs.

The reason for step 3 is to avoid the performance cost of down-converting messages to an older format, which effectively loses the “zero-copy” optimization. Ideally, all consumers are upgraded before the producers even begin writing to the new message format.

Note: Since the old producer has long since been deprecated and the old consumer will be deprecated in 0.11.0, these clients will not support the new format. In order to avoid the conversion hit, users will have to upgrade to the new clients. It is possible to selectively enable the message format on topics which are already using the new clients.

Test Plan

Correctness

The new features will be tested through unit, integration, and system tests.

The integration tests will focus on ensuring that the basic guarantees (outlined in the Summary of Guarantees section) are satisfied across components.

The system tests will focus on ensuring that the guarantees are satisfied even with failing components, ie. that the system works even when consumers, producers, and brokers are killed in various states.

We will also add to existing compatibility system tests to ensure that old clients can still talk to the new brokers with the new message format.

Performance

This KIP introduces significant changes to the message format along with the new features.

We plan on introducing changes in a staged fashion, with the first change being to the message format. We will run our performance test suite on these message format changes and ensure that the performance impact of these changes is at worst minimal. Note that the message format changes are the only ones which can affect users who don't enable the idempotent producer and don't use transactions.

Then, we will benchmark the performance of the idempotent producer and the transactional producer separately. Finally, we will benchmark the consumer and broker performance when transactions are in use and read_committed mode is enabled. We will publish the results of all these benchmarks so that users can make informed decisions about when and how to use these features. 

Rejected Alternatives

As mentioned earlier, we have a separate design document which explores the design space --including rejected alternatives-- as well as all the implementation details. The latter also includes the specifics of message format changes, new RPCs, error handling, etc. 

The design document is available here.

...