Status

Current state: Under discussion

...

Code Block
interface Producer {
  /**
   * This API shall be called for consumer group aware transactional producers.
   */
  void initTransactions(Consumer<byte[], byte[]> consumer); // NEW

  /**
   * No longer needs to pass in the consumer group id, since we already have access to the consumer state.
   */
  void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets) throws ProducerFencedException, IllegalGenerationException; // NEW
}

Here we introduce an intermediate data structure, `GroupAssignment`, to make future evolution easier in case we need to add more identification info during the transaction init stage. There are two main differences in the behavior of this API compared to the pre-existing `initTransactions`:

  • The first is that it is safe to call this API multiple times. In fact, it is required to be invoked after every consumer group rebalance or dynamic assignment. 
  • The second is that it is safe to call after receiving a `ProducerFencedException`. If a producer is fenced, all that is needed is to rejoin the associated consumer group and call this new `initTransactions` API, as sketched below.
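
To make the second point concrete, here is a minimal sketch of the recovery path after fencing (the re-subscription step and the `topics` variable are assumptions about application structure, not part of the proposed API):

Code Block
languagejava
try {
  producer.beginTransaction();
  // ... send records and commit offsets as usual ...
  producer.commitTransaction();
} catch (ProducerFencedException e) {
  // Fencing is no longer fatal: rejoin the consumer group (re-subscribing
  // forces a rebalance on the next poll) and re-initialize transactions.
  consumer.unsubscribe();
  consumer.subscribe(topics);
  producer.initTransactions(consumer); // safe to call again after fencing
}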

The new thread producer API is tightly coupled with the consumer group, so we choose to define a new producer config, `transactional.group.id`.

Code Block
languagejava
titleProducerConfig.java
public static final String TRANSACTIONAL_GROUP_ID = "transactional.group.id";

This config value will automatically be set to the consumer group id in a Kafka Streams app. Plain EOS users need to manually set it to the consumer group id if they are not following the recommended practice below of calling `initTransactions(consumer)`.
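
For example, a plain EOS producer that is not using `initTransactions(consumer)` could be configured roughly as follows (the broker address and group name are placeholders):

Code Block
languagejava
Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Must match the group.id of the consumer whose offsets this producer commits.
producerConfig.put(ProducerConfig.TRANSACTIONAL_GROUP_ID, "my-consumer-group");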

We could effectively unset the `transactional.id` config in Kafka Streams because we no longer use it for revoking ongoing transactions. Instead we would stick to the consumer group id when we rely on group membership. To enable this, we need to extend the InitProducerId API to support consumer-group-aware initialization.
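
To illustrate, one possible shape of the extended request, with field names as assumptions rather than the final schema:

Code Block
InitProducerIdRequest => TransactionalId TransactionTimeoutMs TransactionalGroupId
  TransactionalId      => NULLABLE_STRING  // existing field; may be unset when the group id is used
  TransactionTimeoutMs => INT32            // existing field
  TransactionalGroupId => NULLABLE_STRING  // NEW (illustrative): the consumer group id used for fencing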

...

Code Block
linenumberstrue
  KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerConfig);
  KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerConfig);

  // Will access consumer internal state. Only called once in the app's life cycle after the first rebalance.
  // Note that this is a blocking call until the consumer successfully joins the group.
  producer.initTransactions(consumer);
  while (true) {
    // Read some records from the consumer and collect the offsets to commit
    ConsumerRecords<byte[], byte[]> consumed = consumer.poll(Duration.ofMillis(5000)); // This will be the fencing point if there are pending offsets for the first time.
    Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);

    // Do some processing and build the records we want to produce
    List<ProducerRecord<byte[], byte[]>> processed = process(consumed);

    // Write the records and commit offsets under a single transaction
    producer.beginTransaction();
    for (ProducerRecord<byte[], byte[]> record : processed)
      producer.send(record);

    producer.sendOffsetsToTransaction(consumedOffsets);

    producer.commitTransaction();
  }

The main points are the following:

  1. A producer calling the new initTransactions will block until the consumer successfully joins the group and gets a valid generation for the first time.
  2. The generation.id will be used for group coordinator fencing.
  3. We no longer need to close the producer after a rebalance.

Compatibility, Deprecation, and Migration Plan

This is a server-client integrated change, and the broker must be upgraded first, with `inter.broker.protocol.version` (IBP) set to the latest. Any produce request with a higher version will automatically be fenced because of lack of support. For the case of Kafka Streams, we will apply an automated approach: upon application initialization, we use the admin client to send an ApiVersionRequest to the bootstrap servers to learn their current version. If all members' responses indicate that they support the KIP-447 features, we shall initialize the application with the thread producer when EOS is on; otherwise we shall use the old producer-per-task semantics.
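
A sketch of this detection flow (`fetchMaxInitProducerIdVersions`, `KIP_447_MIN_VERSION`, and `ProducerMode` are hypothetical names for illustration, not existing Kafka APIs):

Code Block
languagejava
// Hypothetical sketch: ask every broker which InitProducerId version it
// supports, then pick the producer mode accordingly.
Map<String, Short> maxVersions = fetchMaxInitProducerIdVersions(adminClient);
boolean allSupport447 = maxVersions.values().stream()
    .allMatch(v -> v >= KIP_447_MIN_VERSION);
// Use the single thread producer only when every broker supports the new
// protocol; otherwise fall back to the old producer-per-task semantics.
ProducerMode mode = allSupport447 ? ProducerMode.PER_THREAD : ProducerMode.PER_TASK;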

In the tricky case where a user attempts to downgrade, we shall throw an exception and ask the application user to restart their clients to use the old API.

Some key observations are:

  1. The user must be utilizing both a consumer and a producer as a complete EOS application,
  2. The user needs to store transactional offsets inside the Kafka group coordinator, not in any external system, for the sake of fencing,
  3. The user needs to call the new producer.initTransactions(consumer), which passes in a consumer struct for state access during initialization.

The producer no longer needs to call sendOffsetsToTransaction(offsets, consumerGroupId), because we will be able to access the consumer group id internally; instead, just pass the offsets as a single parameter, as shown in the example above.

It’s extremely hard to preserve two types of stream clients within the same application due to the difficulty of state machine reasoning and fencing. The same principle applies to the design of the upgrade path: one should never allow task producers and thread producers under the same application group.

Following the above principle, Kafka Streams uses version probing to solve the upgrade problem. The step-by-step guide is:

  1. The broker must be upgraded to 2.4 first, meaning the `inter.broker.protocol.version` (IBP) has to be set to the latest. Any produce request with a higher version will automatically be fenced because of lack of support.
  2. Upgrade the stream application binary and set the UPGRADE_FROM_CONFIG config to 2.3 or lower (see the config sketch below). Do the first rolling bounce, and make sure the group is stable.
  3. Remove that config to make the application point to the actual Kafka client version 2.4. Do the second rolling bounce; the application now officially starts using the new thread producer for EOS.
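
In code, the two bounces differ only in the upgrade config; a minimal sketch using the existing Streams config key (the version value shown is illustrative):

Code Block
languagejava
Properties streamsConfig = new Properties();
// First rolling bounce: run the new binary but keep the old protocol behavior.
streamsConfig.put(StreamsConfig.UPGRADE_FROM_CONFIG, "2.3");
// ... bounce every instance and wait until the group is stable ...

// Second rolling bounce: drop the config so the application switches to the
// new thread producer for EOS.
streamsConfig.remove(StreamsConfig.UPGRADE_FROM_CONFIG);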

The reason for doing two rolling bounces is that the old transactional producer doesn’t have access to the consumer generation, so the group coordinator has no effective way to fence old zombies. By doing the first rolling bounce, the task producer also opts into accessing the consumer state and sends the TxnOffsetCommitRequest with the generation. With this foundational change, it is much safer to execute step 3.

A minor thing to take care of here is that we haven't implemented the ApiVersion request under KIP-117. We shall fill this API gap during the implementation.
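
For illustration, the offset commit could carry the generation roughly as follows (field names are assumptions, not the final wire format):

Code Block
TxnOffsetCommitRequest => TransactionalId GroupId ProducerId ProducerEpoch GenerationId MemberId [TopicPartitionOffsets]
  GenerationId => INT32   // NEW (illustrative): lets the group coordinator reject commits from zombie members
  MemberId     => STRING  // NEW (illustrative): the consumer member id, checked together with the generation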

Rejected Alternatives

  • Producer Pooling:
  • Producer support multiple transactional ids:
  • Tricky rebalance synchronization:
  • We could use the admin client to fetch the inter.broker.protocol version on start to choose which type of producer to use. This approach, however, is harder than we expected, because brokers may be on different versions, and if we need the user to handle the tricky behavior during an upgrade, it would actually be unfavorable. So a hard-coded config is the better option we have at hand.
  • We considered leveraging the transaction coordinator to remember the assignment info for each transactional producer; however, this means we would be copying the state data into two separate locations, which could easily go out of sync. We choose to still use the group coordinator to do the generation and partition fencing in this case.

...