[This KIP proposal is a joint work between Jason Gustafson, Flavio Junqueira, Apurva Mehta, Sriram, and Guozhang Wang]
Motivation
This document outlines a proposal for strengthening the message delivery semantics of Kafka. This builds on significant work which has been done previously, specifically, here and here.
A little bit about transactions and streams
In the previous section, we mentioned the main motivation for transactions is to enable exactly once processing in Kafka Streams. It is worth digging into this use case a little more, as it motivates many of the tradeoffs in our design.
Recall that data transformation using Kafka Streams typically happens through multiple stream processors, each of which is connected by Kafka topics. This setup is known as a stream topology and is essentially a DAG where the stream processors are nodes and the connecting Kafka topics are edges. This pattern is typical of all streaming architectures. You can read more about the Kafka Streams architecture here.
As such, a transaction for Kafka streams would essentially encompass the input messages, the updates to the local state store, and the output messages. Including input offsets in a transaction motivates adding the ‘sendOffsets’ API to the Producer interface, described below. Further details will be presented in a separate KIP.
Further, stream topologies can get pretty deep --10 stages is not uncommon. If output messages are only materialized on transaction commits, then a topology which is N stages deep will take N x T to process its input, where T is the average time of a single transaction. So Kafka Streams requires speculative execution, where output messages can be read by downstream processors even before they are committed. Otherwise transactions would not be an option for serious streaming applications. This motivates the ‘read uncommitted’ consumer mode described later.
These are two specific instances where we chose to optimize for the streams use case. As the reader works through this document we encourage her to keep this use case in mind as it motivated large elements of the proposal.
Public Interfaces
Producer API changes
The producer will get five new methods (initTransactions, beginTransaction, sendOffsetsToTransaction, commitTransaction, abortTransaction), with the send method updated to throw a new exception. This is detailed below:
public interface Producer<K,V> extends Closeable {

  /**
   * Needs to be called before any of the other transaction methods. Assumes that
   * the transaction.app.id is specified in the producer configuration.
   *
   * This method does the following:
   *   1. Ensures any transactions initiated by previous instances of the producer
   *      are committed or rolled back.
   *   2. Gets the internal producer id and epoch, used in all future transactional
   *      messages issued by the producer.
   *
   * @throws IllegalStateException if the appId for the producer is not set
   *         in the configuration.
   */
  void initTransactions() throws IllegalStateException;

  /**
   * Should be called before the start of each new transaction.
   *
   * @throws ProducerFencedException if another producer with the same
   *         transaction.app.id is active.
   */
  void beginTransaction() throws ProducerFencedException;

  /**
   * Sends a list of consumed offsets to the consumer group coordinator, and also marks
   * those offsets as part of the current transaction. These offsets will be considered
   * consumed only if the transaction is committed successfully.
   *
   * This method should be used when you need to batch consumed and produced messages
   * together, typically in a consume-transform-produce pattern.
   *
   * @throws ProducerFencedException if another producer with the same
   *         transaction.app.id is active.
   */
  void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                                String consumerGroupId) throws ProducerFencedException;

  /**
   * Commits the ongoing transaction.
   *
   * @throws ProducerFencedException if another producer with the same
   *         transaction.app.id is active.
   */
  void commitTransaction() throws ProducerFencedException;

  /**
   * Aborts the ongoing transaction.
   *
   * @throws ProducerFencedException if another producer with the same
   *         transaction.app.id is active.
   */
  void abortTransaction() throws ProducerFencedException;

  /**
   * Send the given record asynchronously and return a future which will eventually
   * contain the response information.
   *
   * @param record The record to send
   * @return A future which will eventually contain the response information
   */
  public Future<RecordMetadata> send(ProducerRecord<K, V> record);

  /**
   * Send a record and invoke the given callback when the record has been acknowledged
   * by the server.
   */
  public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
}
The OutOfOrderSequence Exception
The producer will raise an OutOfOrderSequenceException if the broker detects data loss; in other words, if it receives a sequence number which is greater than the sequence it expected. This exception will be returned in the Future and passed to the Callback, if any. This is a fatal exception, and future invocations of producer methods like send, beginTransaction, commitTransaction, etc. will raise an IllegalStateException.
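For illustration, here is a minimal sketch of how an application might react to this error from the send callback. The helper class, the flag, and the exception's package are assumptions made for the example, not part of the proposal:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;

import java.util.concurrent.atomic.AtomicBoolean;

public class FatalErrorCheckExample {
    // Set from the send callback when the broker reports possible data loss.
    static final AtomicBoolean producerFailed = new AtomicBoolean(false);

    static void sendChecked(KafkaProducer<String, String> producer,
                            ProducerRecord<String, String> record) {
        producer.send(record, (RecordMetadata metadata, Exception exception) -> {
            if (exception instanceof OutOfOrderSequenceException) {
                // The broker received a sequence number larger than it expected, which
                // indicates possible data loss. This is fatal for this producer instance:
                // subsequent send/beginTransaction/commitTransaction calls will raise
                // IllegalStateException, so the application must close this producer and
                // create a new one.
                producerFailed.set(true);
            } else if (exception != null) {
                // Other failures may be retriable depending on the exception type.
                exception.printStackTrace();
            }
        });
    }
}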
An Example Application
Here is a simple application which demonstrates the use of the APIs introduced above.
public class KafkaTransactionsExample {
  public static void main(String args[]) {
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);

    // Note that the ‘transaction.app.id’ configuration _must_ be specified in the
    // producer config in order to use transactions.
    KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);

    // We need to initialize transactions once per producer instance. To use transactions,
    // it is assumed that the application id is specified in the config with the key
    // transaction.app.id.
    //
    // This method will recover or abort transactions initiated by previous instances of a
    // producer with the same app id. Any other transactional messages will report an error
    // if initialization was not performed.
    //
    // The response indicates success or failure. Some failures are irrecoverable and will
    // require a new producer instance. See the documentation for TransactionMetadata for a
    // list of error codes.
    producer.initTransactions();

    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
      if (!records.isEmpty()) {
        // Start a new transaction. This will begin the process of batching the consumed
        // records as well as any records produced as a result of processing the input
        // records.
        //
        // We need to check the response to make sure that this producer is able to initiate
        // a new transaction.
        producer.beginTransaction();

        // Process the input records and send them to the output topic(s).
        List<ProducerRecord<String, String>> outputRecords = processRecords(records);
        for (ProducerRecord<String, String> outputRecord : outputRecords) {
          producer.send(outputRecord);
        }

        // To ensure that the consumed and produced messages are batched, we need to commit
        // the offsets through the producer and not the consumer.
        //
        // If this raises an exception, we should abort the transaction.
        // CONSUMER_GROUP_ID is the group id used by the consumer above.
        producer.sendOffsetsToTransaction(getUncommittedOffsets(), CONSUMER_GROUP_ID);

        // Now that we have consumed, processed, and produced a batch of messages, let's
        // commit the results.
        //
        // If this does not report success, then the transaction will be rolled back.
        producer.commitTransaction();
      }
    }
  }
}
New Configurations
Broker configs
transaction.app.id.timeout.ms | The maximum amount of time in ms that the transaction coordinator will wait before proactively expiring a producer's AppId without having received any transaction status updates from it. |
max.transaction.timeout.ms | The maximum allowed timeout for transactions. If a client’s requested transaction time exceeds this, then the broker will return an error for the BeginTxnRequest. This prevents a client from using too large a timeout, which can stall consumers reading from topics included in the transaction. |
Producer configs
transaction.timeout.ms | The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction. This config value will be sent to the transaction coordinator along with the BeginTxnRequest. |
transaction.app.id | A unique and persistent way to identify a producer. This is used to ensure idempotency and to enable transaction recovery or rollback across producer sessions. This is optional: you will lose cross-session guarantees if this is blank. |
Consumer configs
isolation.level | Here are the possible values (default is read.uncommitted): read.uncommitted: consume both committed and uncommitted messages in offset ordering. read.committed: only consume non-transactional messages or committed transactional messages in offset order. In order to maintain offset ordering, this setting means that we will have to buffer messages in the consumer until we see all the messages in a given transaction. |
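To make the new settings concrete, here is a minimal configuration sketch using the names proposed above. The bootstrap servers, serializers, group id, and timeout values are placeholders rather than recommendations:

import java.util.Properties;

public class TransactionalConfigExample {
    public static void main(String[] args) {
        // Producer configuration (config names as proposed in this KIP; values are placeholders).
        Properties producerConfig = new Properties();
        producerConfig.put("bootstrap.servers", "broker1:9092");
        producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Persistent identity used for idempotence and cross-session transaction recovery.
        producerConfig.put("transaction.app.id", "my-transactional-app");
        // Must not exceed the broker's max.transaction.timeout.ms.
        producerConfig.put("transaction.timeout.ms", "60000");

        // Consumer configuration.
        Properties consumerConfig = new Properties();
        consumerConfig.put("bootstrap.servers", "broker1:9092");
        consumerConfig.put("group.id", "my-consumer-group");
        consumerConfig.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerConfig.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Only deliver committed transactional messages (plus non-transactional messages).
        consumerConfig.put("isolation.level", "read.committed");
    }
}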
Proposed Changes
Summary of Guarantees
Idempotent Producer Guarantees
To implement idempotent producer semantics, we introduce the concepts of a producer id, henceforth called the PID, and sequence numbers for Kafka messages. Every new producer will be assigned a unique PID during initialization, and every message it sends carries a sequence number, which the broker uses to detect duplicates and gaps for a given PID and topic partition. The PID assignment is completely transparent to users and is never exposed by clients.
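As a rough illustration (not the actual broker code), the per-PID, per-partition check could look like the following sketch; the class, method, and outcome names are assumptions made for exposition:

import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the per-partition sequence check described above.
class SequenceChecker {
    // Last sequence number appended for each PID on this partition.
    private final Map<Long, Integer> lastSequence = new HashMap<>();

    enum Outcome { APPEND, DUPLICATE, OUT_OF_ORDER_SEQUENCE }

    Outcome check(long pid, int sequence) {
        Integer last = lastSequence.get(pid);
        int expected = (last == null) ? 0 : last + 1;
        if (sequence == expected) {
            lastSequence.put(pid, sequence);
            return Outcome.APPEND;                 // accept and append to the log
        } else if (sequence < expected) {
            return Outcome.DUPLICATE;              // a retry of an already-appended message
        } else {
            return Outcome.OUT_OF_ORDER_SEQUENCE;  // gap: surfaced to the producer as
                                                   // OutOfOrderSequenceException
        }
    }
}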
Transactional Guarantees
At the core, transactional guarantees enable applications to produce to multiple TopicPartitions atomically, ie. all writes to these TopicPartitions will succeed or fail as a unit.
With a stable AppId, we additionally guarantee:
- Exactly one active producer with a given AppId. This is achieved by fencing off old generations when a new instance with the same AppId comes online.
- Transaction recovery across application sessions. If an application instance dies, the next instance can be guaranteed that any unfinished transactions have been completed (whether aborted or committed), leaving the new instance in a clean state prior to resuming work.
Note that the transactional guarantees mentioned here are from the point of view of the producer. On the consumer side, the guarantees are a bit weaker. In particular, we cannot guarantee that all the messages of a committed transaction will be consumed all together. This is for several reasons:
- For compacted topics, some messages of a transaction may be overwritten by newer versions.
- Transactions may straddle log segments. Hence when old segments are deleted, we may lose some messages in the first part of a transaction.
- Consumers may seek to arbitrary points within a transaction, hence missing some of the initial messages.
- Consumers may not consume from all the partitions which participated in a transaction, and hence will never be able to read all the messages that comprised the transaction.
Key Concepts
To implement transactions, ie. ensuring that a group of messages are produced and consumed atomically, we introduce several new concepts:
- We introduce a new entity called a Transaction Coordinator. Similar to the consumer group coordinator, each producer is assigned a transaction coordinator, and all the logic of assigning PIDs and managing transactions is done by the transaction coordinator.
- We introduce a new internal kafka topic called the Transaction Log. Similar to the Consumer Offsets topic, the transaction log is a persistent and replicated record of every transaction. The transaction log is the state store for the transaction coordinator, with the snapshot of the latest version of the log encapsulating the current state of each active transaction.
- We introduce the notion of Control Messages. These are special messages written to user topics, processed by clients, but never exposed to users. They are used, for instance, to let brokers indicate to consumers if the previously fetched messages have been committed atomically or not. Control messages have been previously proposed here.
- We introduce a notion of AppId, to enable users to uniquely identify producers in a persistent way. Different instances of a producer with the same AppId will be able to resume (or abort) any transactions instantiated by the previous instance.
- We introduce the notion of a producer epoch, which enables us to ensure that there is only one legitimate active instance of a producer with a given AppId, and hence enables us to maintain transaction guarantees in the event of failures.
In addition to the new concepts above, we also introduce new request types, new versions of existing requests, and new versions of the core message format in order to support transactions. The details of all of these will be deferred to other documents.
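To tie these concepts together, here is a purely illustrative sketch of the per-AppId state the transaction coordinator could materialize from the transaction log; the class, field, and state names are assumptions made for exposition, not the actual format (which is specified in the separate design document):

import java.util.HashSet;
import java.util.Set;

// Illustrative only: a sketch of the per-AppId state the transaction coordinator
// materializes from the transaction log.
class TransactionMetadataSketch {
    enum State { NOT_STARTED, ONGOING, PREPARE_COMMIT, PREPARE_ABORT, COMMITTED, ABORTED }

    final String appId;            // stable, user-provided identity
    final long pid;                // internal producer id mapped to the AppId
    int epoch;                     // bumped on every new producer instance, used for fencing
    State state = State.NOT_STARTED;
    // Partitions written to in the current transaction; needed so commit/abort
    // markers can be sent to every participating partition.
    final Set<String> partitions = new HashSet<>();

    TransactionMetadataSketch(String appId, long pid, int epoch) {
        this.appId = appId;
        this.pid = pid;
        this.epoch = epoch;
    }
}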
Data Flow
In the diagram above, the sharp edged boxes represent distinct machines. The rounded boxes at the bottom represent Kafka TopicPartitions, and the diagonally rounded boxes represent logical entities which run inside brokers.
Each arrow represents either an RPC, or a write to a Kafka topic. These operations occur in the sequence indicated by the numbers next to each arrow. The sections below are numbered to match the operations in the diagram above, and describe the operation in question.
1. Finding a transaction coordinator -- the GroupCoordinatorRequest
Since the transaction coordinator is at the center of assigning PIDs and managing transactions, the first thing a producer has to do is issue a GroupCoordinatorRequest to any broker to discover the location of its coordinator.
2. Getting a producer Id -- the InitPIDRequest
After discovering the location of its coordinator, the next step is to retrieve the producer’s PID. This is achieved by issuing an InitPIDRequest to the transaction coordinator.
2.1 When an AppId is specified
If the transaction.app.id configuration is set, this AppId is passed along with the InitPIDRequest, and the mapping to the corresponding PID is logged in the transaction log in step 2a. This enables us to return the same PID for the AppId to future instances of the producer, and hence enables recovering or aborting previously incomplete transactions.
In addition to returning the PID, the InitPIDRequest performs the following tasks:
- Bumps up the epoch of the PID, so that any previous zombie instance of the producer is fenced off and cannot move forward with its transaction.
- Recovers (rolls forward or rolls back) any transaction left incomplete by the previous instance of the producer.
The handling of the InitPIDRequest is synchronous. Once it returns, the producer can send data and start new transactions.
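Building on the illustrative TransactionMetadataSketch above, here is a rough sketch of how the coordinator might handle an InitPIDRequest when an AppId is present; the logic is a simplification and all names are assumptions:

import java.util.HashMap;
import java.util.Map;

// Illustrative only: coordinator-side handling of InitPIDRequest for a given AppId.
class TransactionCoordinatorSketch {
    private final Map<String, TransactionMetadataSketch> byAppId = new HashMap<>();
    private long nextPid = 0L;

    synchronized TransactionMetadataSketch handleInitPid(String appId) {
        TransactionMetadataSketch meta = byAppId.get(appId);
        if (meta == null) {
            // First time this AppId is seen: assign a fresh PID and log the mapping
            // to the transaction log (step 2a).
            meta = new TransactionMetadataSketch(appId, nextPid++, 0);
            byAppId.put(appId, meta);
        } else {
            // Returning instance: bump the epoch so any zombie with the old epoch is fenced.
            meta.epoch += 1;
            // Roll forward or roll back any transaction the previous instance left incomplete.
            if (meta.state == TransactionMetadataSketch.State.PREPARE_COMMIT) {
                // complete the commit (write markers, then COMMITTED) ...
            } else if (meta.state == TransactionMetadataSketch.State.ONGOING
                    || meta.state == TransactionMetadataSketch.State.PREPARE_ABORT) {
                // abort the transaction (write markers, then ABORTED) ...
            }
        }
        // The response carries the PID and the (possibly bumped) epoch back to the producer.
        return meta;
    }
}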
2.2 When an AppId is not specified
If no AppId is specified in the configuration, a fresh PID is assigned, and the producer only enjoys idempotent semantics and transactional semantics within a single session.
3. Starting a Transaction – The BeginTxnRequest
The new KafkaProducer will have a KafkaProducer.beginTransaction method which has to be called to signal the start of a new transaction. Within this method, the producer will send a BeginTxnRequest to the transaction coordinator, which will record the start of the transaction in the transaction log as denoted in step 3a.
4. The consume-transform-produce loop
In this stage, the producer begins to consume-transform-produce the messages that comprise the transaction. This is a long phase and is potentially comprised of multiple requests.
4.1 AddPartitionsToTxnRequest
The producer sends this request to the transaction coordinator the first time a new TopicPartition is written to as part of a transaction. The addition of this TopicPartition to the transaction is logged by the coordinator in step 4.1a. We need this information so that we can write the commit or abort markers to each TopicPartition (see section 5.2 for details).
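For intuition, here is a minimal sketch of the client-side bookkeeping that could decide when an AddPartitionsToTxnRequest needs to be sent; the class and method names are hypothetical:

import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

// Hypothetical: tracks which partitions have already been registered with the
// transaction coordinator for the current transaction.
class TxnPartitionTracker {
    private final Set<TopicPartition> inCurrentTxn = new HashSet<>();

    // Called from send(): returns true if the partition is new to this transaction,
    // in which case an AddPartitionsToTxnRequest must be sent before the produce.
    boolean maybeAddPartition(TopicPartition tp) {
        return inCurrentTxn.add(tp);
    }

    // Called when a transaction begins or completes.
    void reset() {
        inCurrentTxn.clear();
    }
}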
4.2 ProduceRequest
The producer writes messages to the user’s TopicPartitions through one or more ProduceRequests (fired from the send method of the producer). These requests include the PID, epoch, and sequence number, as denoted in 4.2a.
4.3 AddOffsetCommitsToTxnRequest
The producer has a new KafkaProducer.sendOffsetsToTransaction API method, which enables the batching of consumed and produced messages. This method takes a Map<TopicPartition, OffsetAndMetadata> and a groupId argument.
The sendOffsetsToTransaction method sends an AddOffsetCommitsToTxnRequest with the groupId to the transaction coordinator, from which it can deduce the TopicPartition for this consumer group in the internal __consumer-offsets topic. The transaction coordinator logs the addition of this topic partition to the transaction log in step 4.3a.
4.4 TxnOffsetCommitRequest
Also as part of sendOffsetsToTransaction, the producer will send a TxnOffsetCommitRequest to the consumer coordinator to persist the offsets in the __consumer-offsets topic (step 4.4a). The consumer coordinator validates that the producer is allowed to make this request (and is not a zombie) by using the PID and producer epoch which are sent as part of this request.
The consumed offsets are not visible externally until the transaction is committed, the process for which we will discuss now.
5. Committing or Aborting a Transaction
Once the data has been written, the user must call the new commitTransaction or abortTransaction methods of the KafkaProducer. These methods will begin the process of committing or aborting the transaction respectively.
5.1 EndTxnRequest
When a producer is finished with a transaction, the newly introduced KafkaProducer.commitTransaction or KafkaProducer.abortTransaction must be called. The former makes the data produced in step 4 available to downstream consumers. The latter effectively erases the produced data from the log: it will never be accessible to the user, ie. downstream consumers will read and discard the aborted messages.
Regardless of which producer method is called, the producer issues an EndTxnRequest to the transaction coordinator, with additional data indicating whether the transaction is to be committed or aborted. Upon receiving this request, the coordinator does the following (see the sketch after this list):
- Writes a PREPARE_COMMIT or PREPARE_ABORT message to the transaction log. (step 5.1a)
- Begins the process of writing the command messages known as COMMIT (or ABORT) markers to the user logs through the UpdateTxnRequest. (see section 5.2 below).
- Finally writes the COMMITTED (or ABORTED) message to transaction log. (see 5.3 below).
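Here is a condensed, purely illustrative sketch of that sequence of writes, reusing the classes sketched earlier; it only shows the ordering, not the real request handling or error paths:

// Illustrative only: the order of coordinator-side writes when completing a transaction.
class EndTxnHandlerSketch {
    void handleEndTxn(TransactionMetadataSketch meta, boolean commit) {
        // 5.1a: durably record the decision before touching the user logs.
        meta.state = commit ? TransactionMetadataSketch.State.PREPARE_COMMIT
                            : TransactionMetadataSketch.State.PREPARE_ABORT;
        appendToTransactionLog(meta);

        // 5.2: ask the leader of every participating partition to write a COMMIT(PID)
        // or ABORT(PID) control message (the UpdateTxnRequest).
        for (String partition : meta.partitions) {
            sendUpdateTxnRequest(partition, meta.pid, commit);
        }

        // 5.3: once all markers are written, record the final outcome in the transaction log.
        meta.state = commit ? TransactionMetadataSketch.State.COMMITTED
                            : TransactionMetadataSketch.State.ABORTED;
        appendToTransactionLog(meta);
    }

    // Stubs standing in for the real log append and RPC plumbing.
    void appendToTransactionLog(TransactionMetadataSketch meta) { }
    void sendUpdateTxnRequest(String partition, long pid, boolean commit) { }
}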
5.2 UpdateTxnRequest
This request is issued by the transaction coordinator to the leader of each TopicPartition which is part of the transaction. Upon receiving this request, each broker will write a COMMIT(PID) or ABORT(PID) control message to the log. (step 5.2a)
This message indicates to consumers whether the messages with the given PID must be delivered to the user or dropped. As such, a consumer in read.committed mode will buffer messages which have a PID until it reads a corresponding COMMIT or ABORT message, at which point it will deliver or drop the messages respectively.
Note that, if the __consumer-offsets topic is one of the TopicPartitions in the transaction, the commit (or abort) marker is also written to the log, and the consumer coordinator is notified that it needs to materialize these offsets in the case of a commit or ignore them in the case of an abort (step 5.2a on the left).
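To illustrate the buffering a read.committed consumer needs, here is a highly simplified sketch; interleaving with non-transactional messages, offset ordering, and the actual control message format are omitted, and all names are assumptions:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: buffering of transactional messages on one partition until the
// corresponding COMMIT or ABORT control message is seen.
class ReadCommittedBufferSketch<M> {
    private final Map<Long, List<M>> pendingByPid = new HashMap<>();

    // A transactional message is held until its transaction's outcome is known.
    void onTransactionalMessage(long pid, M message) {
        pendingByPid.computeIfAbsent(pid, k -> new ArrayList<>()).add(message);
    }

    // COMMIT marker: return everything buffered for this PID so it can be delivered.
    // ABORT marker: drop the buffered messages and return nothing.
    List<M> onControlMessage(long pid, boolean committed) {
        List<M> pending = pendingByPid.remove(pid);
        return (pending != null && committed) ? pending : List.of();
    }
}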
5.3 Writing the final Commit or Abort Message
After all the commit or abort markers are written to the data logs, the transaction coordinator writes the final COMMITTED or ABORTED message to the transaction log, indicating that the transaction is complete (step 5.3 in the diagram). At this point, most of the messages pertaining to the transaction in the transaction log can be removed.
We only need to retain the PID of the completed transaction along with a timestamp, so we can eventually remove the AppId->PID mapping for the producer. See the Expiring PIDs section below.
Compatibility, Deprecation, and Migration Plan
We follow the same approach used in KIP-32. To upgrade from a previous message format version, users should:
1. Upgrade the brokers once with the inter-broker protocol set to the previously deployed version.
2. Upgrade the brokers again with an updated inter-broker protocol, but leaving the message format unchanged.
3. Notify clients that they can upgrade, BUT should not start using the idempotent / transactional message APIs yet.
4. [When observed that most of the clients have upgraded] Restart the brokers, with the message format version set to the latest.
5. Notify upgraded clients that they can now start using the idempotent / transactional message APIs.
The reason for step 3 is to avoid the performance cost of down-converting messages to an older format, which effectively loses the “zero-copy” optimization. Ideally, all consumers are upgraded before the producers even begin writing to the new message format.
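As a concrete illustration of steps 2 and 4, the relevant broker settings in server.properties would look roughly as follows, assuming the cluster is currently on 0.10.2 and the new message format ships in 0.11.0 (the exact version strings are assumptions, not part of this KIP):

# Step 2: bump the inter-broker protocol but keep writing the old message format,
# so that not-yet-upgraded consumers avoid down-conversion.
inter.broker.protocol.version=0.11.0
log.message.format.version=0.10.2

# Step 4: once most clients have upgraded, switch to the new message format.
log.message.format.version=0.11.0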
Test Plan
The new features will be tested through unit, integration, and system tests.
The integration tests will focus on ensuring that the basic guarantees (outlined in the Summary of Guarantees section) are satisfied across components.
The system tests will focus on ensuring that the guarantees are satisfied even with failing components, ie. that the system works even when consumers, producers, and brokers are killed in various states.
We will also add to existing compatibility system tests to ensure that old clients can still talk to the new brokers with the new message format.
Rejected Alternatives
As mentioned earlier, we have a separate design document which explores the design space --including rejected alternatives-- as well as all the implementation details. The latter also includes the specifics of message format changes, new RPCs, error handling, etc.
The design document is available here.