Status

Current state: Adopted (2.6.0)

Discussion thread:   here

JIRA: KAFKA-8587

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Exactly-once semantics (EOS) provide transactional message processing guarantees. Producers can write to multiple partitions atomically so that either all writes succeed or all writes fail. This can be used in the context of stream processing frameworks, such as Kafka Streams, to ensure exactly-once processing between topics.

...

This architecture does not scale well as the number of input partitions increases. Every producer comes with separate memory buffers, a separate thread, and separate network connections. This limits the performance of the producer since we cannot effectively use the output of multiple tasks to improve batching. It also causes unneeded load on brokers since there are more concurrent transactions and more redundant metadata to manage.


It is strongly recommended to read the detailed design doc for a better understanding of the internal changes. This KIP only presents the high-level ideas.

Proposed Changes

The root of the problem is twofold:

...

that transaction coordinators have no knowledge of consumer group semantics. They simply do not understand that partitions can be moved between processes.

...

Let's take a look at a sample exactly-once use case, adapted from KIP-98 (helper methods and constants such as getUncommittedOffsets() and CONSUMER_GROUP_ID are placeholders):

Code Block
languagejava
titleKafkaTransactionsExample.java
linenumberstrue
public class KafkaTransactionsExample {

  public static void main(String[] args) {
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);

    KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);
    producer.initTransactions();

    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
      if (!records.isEmpty()) {
        producer.beginTransaction();

        List<ProducerRecord<String, String>> outputRecords = processRecords(records);
        for (ProducerRecord<String, String> outputRecord : outputRecords) {
          producer.send(outputRecord);
        }

        // Commit the consumed offsets as part of the same transaction
        // (CONSUMER_GROUP_ID is a placeholder for the consumer's group.id)
        producer.sendOffsetsToTransaction(getUncommittedOffsets(), CONSUMER_GROUP_ID);

        producer.commitTransaction();
      }
    }
  }
}

As one can see, the first thing a producer does when starting up is to register its identity through the initTransactions API. The transaction coordinator leverages this step to fence producers using the same transactional.id and to ensure that previous transactions have completed. In the above template, we call consumer.poll() to get data, and the very first time we do so, the consumer needs to know the input topic offset. This is done by a FetchOffset call to the group coordinator. With transactional processing, there could be offsets that are "pending", i.e. they are part of some ongoing transaction. Upon receiving a FetchOffset request, the broker will only expose the offset position up to the "latest stable offset" (LSO), which is the largest offset that has already been committed, when the consumer's isolation.level is `read_committed`. Since we rely on a unique transactional.id to fence stale transactions, any pending transaction will be aborted when the producer calls initTransactions again. In normal use cases such as Kafka Streams, we will also explicitly close the producer to send out an EndTransaction request to make sure we start from a clean state.



This approach is no longer safe when we allow topic partitions to move between transactional producers, since the transaction coordinator doesn't know about partition assignment and the producer won't call initTransactions again during its life cycle. Ignoring pending offsets and proceeding could introduce duplicate processing. The proposed solution is to reject the FetchOffset request by sending a new exception called PendingTransactionException to new clients when there are pending transactional offset commits, so that the old transaction will eventually expire due to the transaction timeout. After expiration, the transaction coordinator will take care of writing abort markers and bumping the producer epoch. When a client receives PendingTransactionException, it will back off and retry fetching the input offset until all the pending transactional offsets are cleared. This is a trade-off between availability and correctness. The worst case for availability loss is just waiting for the transaction timeout when the last-generation producer wasn't shut down gracefully, which should be rare.

Below is the new approach we discussed:


[Figure: new offset fetch flow with pending-transaction fencing]

Note that the current default transaction.timeout.ms is set to one minute, which is too long for Kafka Streams EOS use cases. Considering that the default commit interval is only 100 milliseconds, a crashed instance that left a pending transaction behind could block other members on the offset fetch for up to that full minute. So we suggest shrinking the transaction timeout on Kafka Streams to the same default value as the session timeout (10 seconds), to reduce the potential performance loss from offset-fetch delays when some instances accidentally crash.
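
As a concrete illustration (a sketch only, assuming the suggested 10-second value; once this KIP is adopted, Kafka Streams applies the shrunken default itself), an application can already override the producer-level transaction timeout through the Streams producer prefix:

Code Block
languagejava
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-eos-app");          // placeholder application id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder bootstrap servers
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

// Shrink the producer-level transaction.timeout.ms to the suggested 10 seconds,
// matching the default consumer session timeout.
props.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 10000);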

Public Interfaces

The main addition of this KIP is a new variant of the current sendOffsetsToTransaction API which gives us access to the consumer group state, such as generation.id, member.id and group.instance.id.

Code Block
interface Producer {
   /**
     * Sends a list of specified offsets to the consumer group coordinator, and also marks
     * those offsets as part of the current transaction. These offsets will be considered
     * committed only if the transaction is committed successfully. The committed offset should
     * be the next message your application will consume, i.e. lastProcessedMessageOffset + 1.
     * <p>
     * This method should be used when you need to batch consumed and produced messages
     * together, typically in a consume-transform-produce pattern. Thus, the specified
     * {@code consumerGroupId} should be the same as config parameter {@code group.id} of the used
     * {@link KafkaConsumer consumer}. Note, that the consumer should have {@code enable.auto.commit=false}
     * and should also not commit offsets manually (via {@link KafkaConsumer#commitSync(Map) sync} or
     * {@link KafkaConsumer#commitAsync(Map, OffsetCommitCallback) async} commits).
     *
     * This API won't deprecate the existing {@link KafkaProducer#sendOffsetsToTransaction(Map, String) sendOffsets} API as standalone
     * mode EOS applications are still relying on it. If the broker doesn't support the new underlying transactional API, the caller will crash.
	 *
     * @throws IllegalStateException if no transactional.id has been configured or no transaction has been started
     * @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active
     * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
     *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
     * @throws org.apache.kafka.common.errors.UnsupportedForMessageFormatException  fatal error indicating the message
     *         format used for the offsets topic on the broker does not support transactions
     * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
     *         transactional.id is not authorized. See the exception for more details
     * @throws org.apache.kafka.common.errors.IllegalGenerationException if the passed in consumer metadata has illegal generation
     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if the passed in consumer metadata has a fenced group.instance.id
     * @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any
     *         other unexpected error
     */ 
   void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, ConsumerGroupMetadata consumerGroupMetadata) throws ProducerFencedException, IllegalGenerationException, FencedInstanceIdException; // NEW
}

Shrink transaction.timeout.ms

We shall set the `transaction.timeout.ms` default to 10000 ms (10 seconds) on Kafka Streams. For non-Streams users, we highly recommend doing the same if you want to use the new semantics.

Shrink the abort-timed-out-transaction scheduler interval

We shall set the `transaction.abort.timed.out.transaction.cleanup.interval.ms` default to 10000 ms (10 seconds) on the broker side (previously one minute). This config controls the scheduled interval for purging expired transactions, which needs to run more frequently to time out zombie transactions promptly.

Offset Fetch Request 

We will add a new error code for the consumer to wait for pending transaction clearance. In order to be able to return the corresponding exceptions to old/new clients, we shall also bump the OffsetFetch protocol version.

Code Block
PENDING_TRANSACTION(85, "There are pending transactions for the offset topic that need to be cleared", PendingTransactionException::new),

In the meantime, this offset fetch back-off should only be applied to EOS use cases, not to general offset fetch use cases such as admin client access. We shall also define a flag within the offset fetch request so that we only trigger the back-off logic when the request is involved in EOS, i.e. on isolation level read_committed.

Code Block
OffsetFetchRequest => Partitions GroupId WaitTransaction
  Partitions          => List<TopicPartition>
  GroupId             => String
  WaitTransaction     => Boolean // NEW

The PendingTransactionException could also be surfaced to all the public APIs in Consumer:

Code Block
languagejava
titleKafkaConsumer.java
    /**
     * Get the offset of the <i>next record</i> that will be fetched (if a record with that offset exists).
     * This method may issue a remote call to the server if there is no current position for the given partition.
     * <p>
     * This call will block until either the position could be determined or an unrecoverable error is
     * encountered (in which case it is thrown to the caller).
     *
     * @param partition The partition to get the position for
     * @param timeout   The maximum duration of the method
     *
     * @return The current position of the consumer (that is, the offset of the next record to be fetched)
     * @throws IllegalArgumentException if the provided TopicPartition is not assigned to this consumer
     * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if no offset is currently defined for
     *             the partition
     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
     *             function is called
     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
     *             this function is called
     * @throws org.apache.kafka.common.errors.TimeoutException if time spent blocking exceeds the {@code timeout}
     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
     *             configured groupId. See the exception for more details
+    * @throws org.apache.kafka.common.errors.PendingTransactionException if there are other pending transactional commits
     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
     */
    long position(TopicPartition partition, Duration timeout);

    /**
     * Get the last committed offset for the given partition (whether the commit happened by this process or
     * another). This offset will be used as the position for the consumer in the event of a failure.
     * <p>
     * This call will block to do a remote call to get the latest committed offsets from the server.
     *
     * @param partition The partition to check
     * @param timeout   The maximum duration of the method
     *
     * @return The last committed offset and metadata or null if there was no prior commit
     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
     *             function is called
     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
     *             this function is called
     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
     *             configured groupId. See the exception for more details
+    * @throws org.apache.kafka.common.errors.PendingTransactionException if there are other pending transactional commits
     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
     * @throws org.apache.kafka.common.errors.TimeoutException if time spent blocking exceeds the {@code timeout}
     */
    OffsetAndMetadata committed(TopicPartition partition, final Duration timeout);

    /**
     * Fetch data for the topics or partitions specified using one of the subscribe/assign APIs. It is an error to not have
     * subscribed to any topics or partitions before polling for data.
     * <p>
     * On each poll, consumer will try to use the last consumed offset as the starting offset and fetch sequentially. The last
     * consumed offset can be manually set through {@link #seek(TopicPartition, long)} or automatically set as the last committed
     * offset for the subscribed list of partitions
     *
     * <p>
     * This method returns immediately if there are records available. Otherwise, it will await the passed timeout.
     * If the timeout expires, an empty record set will be returned. Note that this method may block beyond the
     * timeout in order to execute custom {@link ConsumerRebalanceListener} callbacks.
     *
     *
     * @param timeout The maximum time to block (must not be greater than {@link Long#MAX_VALUE} milliseconds)
     *
     * @return map of topic to records since the last fetch for the subscribed list of topics and partitions
     *
     * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if the offset for a partition or set of
     *             partitions is undefined or out of range and no offset reset policy has been configured
     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
     *             function is called
     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
     *             this function is called
     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
     * @throws org.apache.kafka.common.errors.AuthorizationException if caller lacks Read access to any of the subscribed
     *             topics or to the configured groupId. See the exception for more details
     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. invalid groupId or
     *             session timeout, errors deserializing key/value pairs, your rebalance callback thrown exceptions,
     *             or any new error cases in future versions)
     * @throws java.lang.IllegalArgumentException if the timeout value is negative
     * @throws java.lang.IllegalStateException if the consumer is not subscribed to any topics or manually assigned any
     *             partitions to consume from
     * @throws java.lang.ArithmeticException if the timeout is greater than {@link Long#MAX_VALUE} milliseconds.
     * @throws org.apache.kafka.common.errors.InvalidTopicException if the current subscription contains any invalid
     *             topic (per {@link org.apache.kafka.common.internals.Topic#validate(String)})
     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker.
+    * @throws org.apache.kafka.common.errors.PendingTransactionException if there are other pending transactional commits
     */
    @Override
    public ConsumerRecords<K, V> poll(final Duration timeout) {
        return poll(time.timer(timeout), true);
    }
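
To illustrate what this means for applications (an illustrative sketch only; the back-off loop below is hypothetical application code, not part of this KIP's API changes), an EOS consumer that hits the new exception would simply retry until the pending transactional offsets are committed or expired:

Code Block
languagejava
// Illustrative handling of the proposed PendingTransactionException.
ConsumerRecords<String, String> records;
while (true) {
  try {
    records = consumer.poll(Duration.ofMillis(500));
    break;  // position resolved; no pending transactional offsets remain
  } catch (PendingTransactionException e) {
    // Offsets for some assigned partitions are part of an ongoing transaction;
    // back off and retry, bounded in the worst case by transaction.timeout.ms.
  }
}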

Fence Zombie

A zombie process may invoke InitProducerId after falling out of the consumer group. In order to distinguish zombie requests, we need to leverage the group coordinator to fence out-of-sync clients.

To give the transactional producer access to consumer state, the consumer will expose some of its internal state through a new API as an opaque struct. This is already done by KIP-429, and we just showcase the high-level structure here for convenience.

Code Block
languagejava
ConsumerGroupMetadata {
  public String groupId();
 
  public int generationId();
 
  public String memberId();
 
  public Optional<String> groupInstanceId();
}

As we can see, the exposed metadata contains member.id, group.instance.id and generation.id, which are essentially the identifiers we use in the normal offset commit protocol. To be able to keep track of the latest metadata information, we will add a top-level API to the consumer:

Code Block
languagejava
titleConsumer.java
ConsumerGroupMetadata groupMetadata();

This allows EOS users to fetch refreshed group metadata as needed.


To pass this information to the broker, member.id, group.instance.id and generation.id fields shall be added to `TxnOffsetCommitRequest`, which makes txn offset commit fencing consistent with normal offset commit fencing.

Code Block
TxnOffsetCommitRequest => TransactionalId GroupId ProducerId ProducerEpoch Offsets GenerationId
  TransactionalId     => String
  GroupId             => String
  ProducerId  		  => int64		
  ProducerEpoch       => int16
  Offsets  	          => Map<TopicPartition, CommittedOffset>
  GenerationId        => int32, default -1 // NEW
  MemberId			  => nullable String // NEW
  GroupInstanceId 	  => nullable String // NEW

If one of the fields does not match on the server side, the client will be fenced immediately (a conceptual sketch of this check follows the scenario below). An edge case is defined as:

Code Block
1. Client A tries to commit offsets for topic partition P1, but hasn't had the chance to do the txn offset commit before a long GC.
2. Client A gets out of sync and becomes a zombie due to session timeout, and the group rebalances.
3. Another client B is assigned P1.
4. Client B doesn't see pending offsets because A hasn't committed anything, so it proceeds with potentially `pending` input data.
5. Client A comes back online and continues trying to do the txn commit. Here, if we have generation.id, we will catch it!
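
Conceptually, the coordinator-side validation is just a comparison of the passed-in group metadata against the coordinator's current view. Below is a rough sketch with hypothetical helper names (isFencedInstance, currentGenerationId), not the actual broker implementation:

Code Block
languagejava
// Hypothetical sketch of the group coordinator's TxnOffsetCommit validation.
void validateTxnOffsetCommit(String groupId, String memberId,
                             Optional<String> groupInstanceId, int generationId) {
  if (groupInstanceId.isPresent() && isFencedInstance(groupId, groupInstanceId.get(), memberId))
    throw new FencedInstanceIdException("group.instance.id is owned by another member");
  // member.id is validated against the current group membership in the same way.
  if (generationId != currentGenerationId(groupId))
    throw new IllegalGenerationException("stale generation; the group has rebalanced");
  // All identifiers match the coordinator's view: accept the transactional offset commit.
}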


And here is a recommended usage example for non-Streams EOS users:

Code Block
linenumberstrue
  KafkaConsumer consumer = new KafkaConsumer<>(consumerConfig);
  // Recommend a smaller txn timeout, for example 10 seconds.
  producerConfig.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10000);
  KafkaProducer producer = new KafkaProducer(producerConfig);

  producer.initTransactions();	
  while (true) {
    // Read some records from the consumer and collect the offsets to commit
    ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000)); // This will be the fencing point if there are pending offsets for the first time.
    Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);

    // Do some processing and build the records we want to produce
    List<ProducerRecord> processed = process(consumed);

    // Write the records and commit offsets under a single transaction
    producer.beginTransaction();
    for (ProducerRecord record : processed)
      producer.send(record);

	// Pass the entire consumer group metadata
    producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());

    producer.commitTransaction();
  }

Some key observations are:

  1. The user must be utilizing both consumer and producer as a complete EOS application.
  2. The user needs to store transactional offsets inside the Kafka group coordinator, not in any external system, for the sake of fencing.
  3. The producer needs to call sendOffsetsToTransaction(offsets, groupMetadata) to be able to fence properly.

For Kafka Streams, which needs to be backward compatible with older broker versions, we add a new value for the config parameter PROCESSING_GUARANTEE that we call "exactly_once_beta". Hence, users with new broker versions can opt into the new feature (i.e., using a single producer per thread for EOS instead of a producer per task).

Code Block
public class StreamsConfig {

    public static final String EXACTLY_ONCE_BETA = "exactly_once_beta";

}
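
For example, a Streams application on 2.6 running against 2.5+ brokers would opt in as follows (a sketch; application id and bootstrap servers are placeholders):

Code Block
languagejava
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-eos-app");        // placeholder application id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder bootstrap servers

// Opt into the single-producer-per-thread EOS model; requires brokers on 2.5 or newer.
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA);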

Additionally, we remove the task-level metrics "commit-latency-max" and "commit-latency-avg". Committing a task consists of multiple steps (like flushing, committing offsets/transactions, and writing checkpoint files), and these steps are no longer executed on a per-task level; instead, the individual phases are split out over all tasks, and committing offsets and transactions is unified into one step for all tasks at once. Because those metrics cannot be collected in a useful way any longer, we cannot deprecate them and remove them in a future release, but need to remove them directly without a deprecation period.

Compatibility, Deprecation, and Migration Plan

Given the new offset-fetch fencing mechanism, to opt into the new producer-per-thread model, the new fencing mechanism must be enabled first, before the existing producer-side fencing can be disabled.

Following the above principle, two rounds of rolling bounces for all Kafka Streams instances are required, as follows:

  1. Brokers must be upgraded to 2.5 first.
  2. Upgrade the Streams application binary and keep the PROCESSING_GUARANTEE setting at "exactly_once". Do the first rolling bounce, and make sure the group is stable with every instance on the 2.6 binary.
  3. Switch the PROCESSING_GUARANTEE setting to "exactly_once_beta" and do a second rolling bounce to start using the new thread producer for EOS.

The reason for doing two rolling bounces is that the old (i.e., 2.5) transactional producer doesn't have access to the consumer generation, so the group coordinator doesn't have an effective way to fence old zombies. By doing the first rolling bounce, the task producers opt into accessing the consumer state and send TxnOffsetCommitRequest with the generation. With this foundational change, it is safe to execute step 3.

Note that the above recommended upgrade path is for users who need the consistency guarantee. For users who don't worry about consistency, steps 2 & 3 could be combined into a single rolling bounce with the Kafka client library upgrade. The application should resume work without problems.

Of course, enabling producer per thread is optional, and thus users can also just stay with the "exactly_once" configuration (or with "at_least_once" if exactly-once semantics are not used to begin with). For both cases, it is not required to upgrade the brokers to 2.5, and a single rolling-bounce upgrade of the Kafka Streams applications is sufficient.

Note: It is also possible to switch back from "exactly_once_beta" to "exactly_once" with a single round of rolling bounces.

A downgrade from 2.6 "exactly_once_beta" to 2.5 (which only supports "exactly_once") requires two rolling bounces (i.e., follow the upgrade path in reverse order).

Non-Streams EOS Upgrade

As for non-Streams users, they would require the following steps:

  1. Same as Streams, upgrade the brokers to 2.5 first.
  2. Change `sendOffsetsToTransaction` to the new version. Note that the service would crash if the broker version detected through the txn offset commit protocol is lower than 2.5.
  3. Rolling bounce the application.

Since we couldn't predict all the implementation cases, this upgrade guide is not guaranteed to be safe for online operation, and there would be a risk of state inconsistency/data loss.

Working with Older Brokers

As stated in the upgrade path, if the broker version is too old, we shall not enable the thread producer even when running with Kafka Streams 2.6. If you enable "exactly_once_beta" against pre-2.5 brokers, Kafka Streams will raise an error.

Rejected Alternatives

  • We could use the admin client to fetch the inter.broker.protocol.version on start to choose which type of producer to use. This approach is harder than we expected, however, because brokers may be on different versions, and if we need users to handle the tricky behavior during upgrade, it would actually be unfavorable. So a hard-coded config is the better option we have at hand.
  • We considered leveraging the transaction coordinator to remember the assignment info for each transactional producer; however, this means we are copying the state data into 2 separate locations which could easily go out of sync. We choose to still use the group coordinator to do the generation and partition fencing in this case.
  • We once decided to use a transactional.group.id config to replace transactional.id, and to consolidate all the transaction states of an application under one transaction coordinator. This is no longer needed once we rely on the group coordinator to do the fencing, and we could safely re-implement it any time in the future with a new upgrade procedure.

Our proposal is to make transaction coordinators aware of consumer group assignments. Rather than distributing the transactional state by routing every transactional Id to a separate coordinator, we will use the consumer group id to identify a single transaction coordinator which is responsible for managing the state. This gives the coordinator the ability to understand which partitions are being consumed by each member of the group. This can then be used to safely coordinate assignment changes.

We use the initTransactions API currently in order to fence producers using the same transactional Id and to ensure that previous transactions have been completed. We will add a new initTransactions API which accepts the set of assigned partitions and the associated consumer group Id. This will be passed to the transaction coordinator in the InitProducerId call, and will be stored with the other transaction state.

Essentially the problem we are trying to solve is making the coordinator aware of the dependencies between processes that come as a result of partition reassignment. When handling the InitProducerId request, the coordinator will use the previous partition assignment of the consumer group to check which transactions need to be completed before it is safe to begin processing. The coordinator will then ensure that only one producer for each assigned partition is allowed to make progress at any time.

Public Interfaces

The main addition of this KIP is a new variant of the current initTransactions API which provides the set of partitions that were assigned in the latest rebalance.

Code Block
interface Producer {
  /**
   * Initialize transactional state for the producer with the partitions assigned
   * in the consumer group rebalance. This call ensures that any transactions
   * involving committed offsets from the set of input partitions must be completed
   * before this call returns. 
   *
   * Unlike the no-arg initTransactions() API, this can be called multiple times
   * on the same instance. Typically it should be called immediately after receiving
   * a new partition assignment from the group coordinator.
   */
  void initTransactions(Set<TopicPartition> inputPartitions, String consumerGroupId, int generationId);
}

There are two main differences in the behavior of this API and the pre-existing `initTransactions`:

  • The first is that it is safe to call this API multiple times. In fact, it is required to be invoked after every consumer group rebalance. 
  • The second is that it is safe to call after receiving a `ProducerFencedException`. If a producer is fenced, all that is needed is to rejoin the associated consumer group and call this new `initTransactions` API.

The key to this proposal is allowing a single transaction coordinator to see the assignments of all members in the group. It can then maintain the invariant that only one producer is allowed to make progress at any time for a particular input partition. To enable this, we need two protocol changes. First we need to update the FindCoordinator API to support lookup of the transaction coordinator using the consumer group Id. Second, we need to extend the InitProducerId API to support consumer group aware initialization.

The schema for FindCoordinator does not need to change, but we need to add a new coordinator type

Code Block
FindCoordinatorRequest => CoordinatorKey CoordinatorType
  CoordinatorKey => STRING
  CoordinatorType => INT8 // 0 -> Consumer group coordinator, 1 -> Transaction coordinator, 2 -> Transaction "group" coordinator

Below we provide the new InitProducerId schema:

Code Block
InitProducerIdRequest => TransactionalId TransactionTimeoutMs ConsumerGroupId AssignedPartitions
  TransactionalId => NullableString
  TransactionTimeoutMs => Int64
  ConsumerGroupId => NullableString         // NEW
  ConsumerGroupGeneration => Int32          // NEW
  AssignedPartitions => [Topic [Partition]] // NEW
    Topic => String
    Partition => Int32

InitProducerIdResponse => ThrottleTimeMs ErrorCode ProducerId ProducerEpoch
  ThrottleTimeMs => Int64
  ErrorCode => Int16
  ProducerId => Int64
  ProducerEpoch => Int16

The new InitProducerId API accepts either a user-configured transactional Id or a consumer group Id and the set of assigned partitions. When a consumer group is provided, the transaction coordinator will check whether there are any ongoing transactions that include the assigned partitions. If there are, these transactions will be aborted and the corresponding producers will be fenced by bumping their epochs. Once transactions are complete, the call will return.

Fencing

A zombie process may invoke InitProducerId after falling out of the consumer group. In order to distinguish zombie requests, we include the consumer group generation. Once the coordinator observes a generation bump for a group, it will refuse to handle requests from the previous generation. The only thing other group members can do is call InitProducerId themselves. This in fact would be the common case since transactions will usually be completed before a consumer joins a rebalance.

In order to pass the group generationId to `initTransaction`, we need to expose it from the consumer. We propose to add an overload to onPartitionsAssigned in the consumer's rebalance listener interface:

Code Block
public interface ConsumerRebalanceListener {
  default void onPartitionsAssigned(Collection<TopicPartition> partitions, int generationId) {
    onPartitionsAssigned(partitions);
  }
}

With this proposal, the transactional id is no longer needed for proper fencing, but the coordinator still needs a way to identify producers individually as they are executing new transactions. There are two options: continue using the transactional id or use the producer id which is generated by the coordinator in InitProducerId. Either way, the main challenge is authorization. We currently use the transaction Id to authorize transactional operations. In this KIP, we will keep this model unchanged. The producer must still provide a transactional Id, but now the only requirement is that it is defined uniquely for each producer in the application. It is no longer tied to exactly once guarantees.

We also need to change the on-disk format for transaction state in order to persist both the consumer group id and the assigned partitions. We propose to use a separate record type in order to store the group assignment. Transaction state records will not change.

Code Block
Key => GroupId TransactionalId
  GroupId => String
  TransactionalId => String

Value => GenerationId AssignedPartitions
  GenerationId => Int32
  AssignedPartitions => [Topic [Partition]]
    Topic => String
    Partition => Int32

To be able to upgrade Kafka Streams applications to leverage this new feature, a new config shall be introduced to control the producer upgrade decision:

Code Block
languagejava
titleStreamsConfig.java
public static final String CONSUMER_GROUP_AWARE_TRANSACTION = "consumer.group.aware.transaction"; // default to false

When this is set to true and exactly-once is turned on, the Kafka Streams application will choose to use a single producer per thread.
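
For illustration, under this design an application would opt in roughly as follows (a sketch only; the flag and its default come from the config proposed above):

Code Block
languagejava
Properties props = new Properties();
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

// Proposed opt-in flag from this design; defaults to false.
props.put(StreamsConfig.CONSUMER_GROUP_AWARE_TRANSACTION, true);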

Compatibility fencing

To fence an old producer accessing the same topic partition, we will introduce a new exception type:

Code Block
languagejava
titleErrors.java
CONCURRENT_PRODUCER_COMMIT(85, "This producer attempted to commit offset to a topic partition which is owned by another producer in this generation.", ConcurrentProducerCommitException::new),

And to avoid concurrent processing during the upgrade, we also want to introduce an exception to let the consumer back off:

Code Block
languagejava
titleErrors.java
PENDING_TRANSACTION(85, "Could not consume from this topic partition due to pending transactions going on.", PendingTransactionException::new),

We will discuss this in more detail in the Compatibility section.

Example

Below we provide an example of a simple read-process-write loop with consumer group-aware EOS processing.

Code Block
linenumberstrue
  String groupId = "group";
  Set<String> topics = buildSubscription();  
  KafkaConsumer consumer = new KafkaConsumer(buildConsumerConfig(groupId));
  KafkaProducer producer = new KafkaProducer(buildProducerConfig());

  consumer.subscribe(topics, new ConsumerRebalanceListener() {
      public void onPartitionsAssigned(Collection<TopicPartition> partitions, int generationId) {
        // On assignment, call initTransactions() in order to ensure any
        // transactions involving committed offsets from the assigned partitions
        // have been completed
        producer.initTransactions(partitions, groupId, generationId);
      }
  });

  while (true) {
    // Read some records from the consumer and collect the offsets to commit
    ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000));
    Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);

    // Do some processing and build the records we want to produce
    List<ProducerRecord> processed = process(consumed);

    // Write the records and commit offsets under a single transaction
    producer.beginTransaction();
    for (ProducerRecord record : processed)
      producer.send(record);
    producer.sendOffsetsToTransaction(consumedOffsets, groupId);
    producer.commitTransaction();
  }

The main points are the following:

  1. The new initTransactions API is used in the ConsumerRebalanceListener passed to subscribe.
  2. We no longer need to close the producer after a rebalance. We can call initTransactions multiple times.

Compatibility, Deprecation, and Migration Plan

This is a server-client integrated change, and it's required to upgrade the brokers first, with `inter.broker.protocol.version` set to the latest. Any produce request with a higher version will automatically get fenced because of missing support.

To make the upgrade completely compatible with current EOS transaction semantics, we need to be able to distinguish clients that are making progress on the same input source but use different transactional ids. It is possible to have two different types of clients within the same consumer group: imagine a Kafka Streams application where half of the instances are using the old task producer API, while the other half use the new consumer group API. This fencing could be done by leveraging `AddOffsetsToTxnRequest`, which contains a consumer group id and topic partitions. The group coordinator could build a reverse mapping from topic partition to producer.id. In this way, when we upgrade to the new API, the group coordinator will actively check this map upon receiving `AddOffsetsToTxnRequest`. If the stored `producer.id` doesn't match the one defined in the request, the coordinator would send `ConcurrentProducerCommitException` to the stored producer by aborting any ongoing transaction associated with it. This ensures a smooth upgrade without worrying about old pending transactions.

Besides an active fencing mechanism, we also need to ensure 100% correctness during the upgrade. This means no input data should be processed twice, even though we can no longer distinguish clients by transactional id. The solution is to reject the offset fetch request by sending PendingTransactionException to new clients when there are pending transactional offset commits, so that a new client starts from a clean state instead of relying on transactional id fencing. When a client receives PendingTransactionException, it will back off and retry fetching the input offset until all the pending transaction offsets are cleared. This is a trade-off between availability and correctness, and in this case the worst case for availability is just hitting the transaction timeout, which should be trivial. Compared with other fancy fencing logic based on topic partition ownership, we believe this trade-off is a good deal.

Rejected Alternatives

  • Producer Pooling:
  • Producer support multiple transactional ids:
  • Tricky rebalance synchronization: