...

 Each arrow represents either an RPC, or a write to a Kafka topic. These operations occur in the sequence indicated by the numbers next to each arrow. The sections below are numbered to match the operations in the diagram above, and describe the operation in question.

 1. Finding a transaction coordinator -- the GroupCoordinatorRequest

Since the transaction coordinator is at the center of assigning PIDs and managing transactions, the first thing a producer must do is issue a GroupCoordinatorRequest to any broker to discover the location of its coordinator.
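As context for how a coordinator is located: Kafka maps an identifier onto one partition of the coordinator's internal topic, and the leader of that partition is the coordinator. A minimal sketch of that mapping step (the partition count of 50 is Kafka's default for internal topics, assumed here; the class and method names are illustrative):

```java
// Sketch of mapping an id to a coordinator partition. The leader of the
// resulting partition of the internal topic serves as the coordinator.
public class CoordinatorPartitioner {
    public static int partitionFor(String id, int numPartitions) {
        // Masking the sign bit avoids the Math.abs(Integer.MIN_VALUE) pitfall.
        return (id.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```

Because every broker knows the partition leadership of the internal topic, any broker can answer the GroupCoordinatorRequest.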

 2. Getting a producer Id -- the InitPIDRequest

After discovering the location of its coordinator, the next step is to retrieve the producer’s PID. This is achieved by issuing an InitPIDRequest to the transaction coordinator.

 2.1 When an AppId is specified

...

2.2 When an AppId is not specified

 

If no AppId is specified in the configuration, a fresh PID is assigned, and the producer only enjoys idempotent semantics and transactional semantics within a single session.

 3. Starting a Transaction -- The BeginTxnRequest

The new KafkaProducer will have a KafkaProducer.beginTxn method, which must be called to signal the start of a new transaction. Within this method, the producer will send a BeginTxnRequest to the transaction coordinator, which will record the start of the transaction in the transaction log, as denoted in step 3a.

 

4. The consume-transform-produce loop

...

In this stage, the producer begins to consume-transform-produce the messages that comprise the transaction. This is a long phase and potentially spans multiple requests.

4.1 AddPartitionsToTxnRequest

 The producer sends this request to the transaction coordinator the first time a new TopicPartition is written to as part of a transaction. The addition of this TopicPartition to the transaction is logged by the coordinator in step 4.1a. We need this information so that we can write the commit or abort markers to each TopicPartition (see section 5.2 for details). 

4.2 ProduceRequest

 The producer writes a bunch of messages to the user’s TopicPartitions through one or more ProduceRequests (fired from the send method of the producer). These requests include the PID, epoch, and sequence number, as denoted in 4.2a.
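The PID and sequence number are what let the broker discard duplicate batches from producer retries. A sketch of that broker-side check, under the assumption of one monotonically increasing sequence per PID (names are illustrative, not Kafka's internals):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the broker-side duplicate check enabled by the PID and sequence
// number carried in each ProduceRequest.
public class SequenceChecker {
    private final Map<Long, Integer> lastSeq = new HashMap<>();

    /** Returns true if the batch is appended, false if it is rejected. */
    public boolean tryAppend(long pid, int sequence) {
        int expected = lastSeq.getOrDefault(pid, -1) + 1;
        if (sequence == expected) {
            lastSeq.put(pid, sequence);
            return true;    // in-order, new data: append to the log
        }
        return false;       // duplicate (or out-of-order) batch: do not append
    }
}
```

A retried batch carries the same sequence number as the original, so the second attempt fails the `expected` check and is not appended a second time.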

 

...

4.3 AddOffsetCommitsToTxnRequest

 The producer has a new KafkaProducer.sendOffsets API method, which enables the batching of consumed and produced messages. This method takes a Map<TopicPartition, OffsetAndMetadata> and a groupId argument.

 The sendOffsets method sends an AddOffsetCommitsToTxnRequest with the groupId to the transaction coordinator, from which it can deduce the TopicPartition for this consumer group in the internal __consumer-offsets topic. The transaction coordinator logs the addition of this topic partition to the transaction log in step 4.3a.

4.4 TxnOffsetCommitRequest

 Also as part of sendOffsets, the producer will send a TxnOffsetCommitRequest to the consumer coordinator to persist the offsets in the __consumer-offsets topic (step 4.4a). The consumer coordinator validates that the producer is allowed to make this request (and is not a zombie) by using the PID and producer epoch which are sent as part of this request.
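A sketch of that zombie-fencing check, under the assumption that the coordinator tracks the latest epoch seen per PID and rejects anything older (names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of zombie fencing: a request carrying an older epoch for the same
// PID comes from a stale ("zombie") producer instance and is rejected.
public class EpochFencer {
    private final Map<Long, Short> currentEpoch = new HashMap<>();

    public boolean validate(long pid, short epoch) {
        Short current = currentEpoch.get(pid);
        if (current != null && epoch < current) {
            return false;              // stale epoch: fence off the zombie
        }
        currentEpoch.put(pid, epoch);  // remember the latest epoch seen
        return true;
    }
}
```

When a producer with the same AppId restarts and receives a bumped epoch, the old instance's requests fail this check from then on.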

 The consumed offsets are not visible externally until the transaction is committed, the process for which we will discuss now. 

5. Committing or Aborting a Transaction

...

Once the data has been written, the user must call the new commitTxn or abortTxn methods of the KafkaProducer. These methods will begin the process of committing or aborting the transaction respectively. 

5.1 EndTxnRequest

 When a producer is finished with a transaction, the newly introduced KafkaProducer.commitTxn or KafkaProducer.abortTxn must be called. The former makes the data produced in step 4 available to downstream consumers. The latter effectively erases the produced data from the log: it will never be accessible to the user, i.e. downstream consumers will read and discard the aborted messages.
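Putting steps 3 through 5 together, the application-visible call sequence can be sketched as follows. TxnProducer here is a stand-in that merely records calls so the ordering is visible; it is not the real KafkaProducer, and the method names follow this proposal's beginTxn/sendOffsets/commitTxn naming:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the consume-transform-produce loop against the proposed API shape.
public class TxnFlow {
    static class TxnProducer {
        final List<String> calls = new ArrayList<>();
        void beginTxn()                       { calls.add("beginTxn"); }
        void send(String topic, String value) { calls.add("send:" + topic); }
        void sendOffsets(String groupId)      { calls.add("sendOffsets:" + groupId); }
        void commitTxn()                      { calls.add("commitTxn"); }
    }

    static List<String> run(TxnProducer producer, List<String> consumed) {
        producer.beginTxn();                      // step 3: BeginTxnRequest
        for (String msg : consumed) {
            producer.send("output", msg.toUpperCase()); // step 4.2: transform + produce
        }
        producer.sendOffsets("my-group");         // steps 4.3/4.4: offsets join the txn
        producer.commitTxn();                     // step 5: two-phase commit begins
        return producer.calls;
    }
}
```

An abort path would replace the final commitTxn with abortTxn; the preceding sequence is identical.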

...

  1. Writes a PREPARE_COMMIT or PREPARE_ABORT message to the transaction log (step 5.1a).
  2. Begins the process of writing the command messages known as COMMIT (or ABORT) markers to the user logs through the UpdateTxnRequest (see section 5.2 below).
  3. Finally, writes the COMMITTED (or ABORTED) message to the transaction log (see 5.3 below).
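The transaction-log messages in these steps imply a small state machine for each transaction. A sketch of the legal transitions, as an illustration of the two-phase protocol rather than the exact broker implementation:

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Sketch of the coordinator-side transaction states implied by steps 5.1a-5.3.
public class TxnState {
    enum State { ONGOING, PREPARE_COMMIT, PREPARE_ABORT, COMMITTED, ABORTED }

    static final Map<State, Set<State>> ALLOWED = new EnumMap<>(State.class);
    static {
        ALLOWED.put(State.ONGOING, EnumSet.of(State.PREPARE_COMMIT, State.PREPARE_ABORT));
        ALLOWED.put(State.PREPARE_COMMIT, EnumSet.of(State.COMMITTED));  // no turning back
        ALLOWED.put(State.PREPARE_ABORT, EnumSet.of(State.ABORTED));
        ALLOWED.put(State.COMMITTED, EnumSet.noneOf(State.class));       // terminal
        ALLOWED.put(State.ABORTED, EnumSet.noneOf(State.class));         // terminal
    }

    static boolean canTransition(State from, State to) {
        return ALLOWED.get(from).contains(to);
    }
}
```

The key property is that once PREPARE_COMMIT is durably logged, the only legal outcome is COMMITTED, which is what allows a failed coordinator to resume writing markers after recovery.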

 

5.2 UpdateTxnRequest

...

This request is issued by the transaction coordinator to the leader of each TopicPartition which is part of the transaction. Upon receiving this request, each broker will write a COMMIT(PID) or ABORT(PID) control message to the log (step 5.2a).

 This message indicates to consumers that messages with the given PID must be delivered to the user all together, or not at all. As such, the consumer will buffer messages which have a PID until it reads a corresponding COMMIT or ABORT message, at which point it will deliver or drop the messages respectively.
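That buffering behavior can be sketched as follows, assuming a per-PID buffer that is released on a COMMIT marker and discarded on an ABORT marker (an illustration of the consumer-side logic, not the actual consumer internals):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of a consumer buffering transactional messages per PID until the
// corresponding COMMIT or ABORT control message arrives.
public class TxnBuffer {
    private final Map<Long, List<String>> pending = new HashMap<>();
    private final List<String> delivered = new ArrayList<>();

    void onMessage(long pid, String value) {
        pending.computeIfAbsent(pid, k -> new ArrayList<>()).add(value);
    }

    void onMarker(long pid, boolean commit) {
        List<String> buffered = pending.remove(pid);
        if (commit && buffered != null) {
            delivered.addAll(buffered);   // COMMIT: release the whole batch
        }                                 // ABORT: buffered messages are dropped
    }

    List<String> delivered() { return delivered; }
}
```

This gives the all-or-nothing delivery described above: no message of a transaction reaches the user before its COMMIT marker, and none survives an ABORT.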

 Note that, if the __consumer-offsets topic is one of the TopicPartitions in the transaction, the commit (or abort) marker is also written to the log, and the consumer coordinator is notified that it needs to materialize these offsets in the case of a commit or ignore them in the case of an abort (step 5.2a on the left). 

5.3 Writing the final Commit or Abort Message

 After all the commit or abort markers are written to the data logs, the transaction coordinator writes the final COMMITTED or ABORTED message to the transaction log, indicating that the transaction is complete (step 5.3 in the diagram). At this point, most of the messages pertaining to the transaction in the transaction log can be removed.

 We only need to retain the PID of the completed transaction along with a timestamp, so we can eventually remove the AppId->PID mapping for the producer. See the Expiring PIDs section below.

 Compatibility, Deprecation, and Migration Plan

We follow the same approach used in KIP-32. To upgrade from a previous message format version, users should:

  1. Upgrade the brokers once with the inter-broker protocol set to the previously deployed version.

  2. Upgrade the brokers again with an updated inter-broker protocol, but leaving the message format unchanged.

  3. Upgrade all or most clients.

  4. Restart the brokers, with the message format version set to the latest.

The reason for step 3 is to avoid the performance cost for down-converting messages to an older format, which effectively loses the “zero-copy” optimization. Ideally, all consumers are upgraded before the producers even begin writing to the new message format.
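Concretely, steps 1, 2, and 4 are driven by two broker settings in server.properties; the version strings below are placeholders for the previously deployed version, not a recommendation:

```properties
# Step 1: keep both pinned to the previously deployed version (placeholder)
inter.broker.protocol.version=0.10.0
log.message.format.version=0.10.0

# Step 2: bump only the inter-broker protocol
# Step 4: finally bump log.message.format.version to the latest
```

Keeping log.message.format.version at the old value until step 4 is what lets not-yet-upgraded consumers read without broker-side down-conversion.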

Test Plan

The new features will be tested through unit, integration, and system tests.

 The integration tests will focus on ensuring that the basic guarantees (outlined in the Summary of Guarantees section) are satisfied across components.

 The system tests will focus on ensuring that the guarantees are satisfied even with failing components, i.e. that the system works even when consumers, producers, and brokers are killed in various states.

 We will also add to existing compatibility system tests to ensure that old clients can still talk to the new brokers with the new message format.

Rejected Alternatives

As mentioned earlier, we have a separate design document which explores the design space (including rejected alternatives) as well as all the implementation details. The latter cover the specifics of message format changes, new RPCs, error handling, etc.

 The design document is available here.