Table of Contents

Status

Current state: Adopted (2.6.0)

Discussion thread: here

JIRA: KAFKA-8587

...

This architecture does not scale well as the number of input partitions increases. Every producer comes with separate memory buffers, a separate thread, and separate network connections. This limits the performance of the producer, since we cannot effectively use the output of multiple tasks to improve batching. It also causes unneeded load on brokers, since there are more concurrent transactions and more redundant metadata management.


It's strongly recommended to read the detailed design doc to better understand the internal changes. This KIP only presents the high-level ideas.

Proposed Changes

The root of the problem is that transaction coordinators have no knowledge of consumer group semantics. They simply do not understand that partitions can be moved between processes. Let's take a look at a sample exactly-once use case, quoted from KIP-98:

...

Code Block
interface Producer {
   /**
     * Sends a list of specified offsets to the consumer group coordinator, and also marks
     * those offsets as part of the current transaction. These offsets will be considered
     * committed only if the transaction is committed successfully. The committed offset should
     * be the next message your application will consume, i.e. lastProcessedMessageOffset + 1.
     * <p>
     * This method should be used when you need to batch consumed and produced messages
     * together, typically in a consume-transform-produce pattern. Thus, the specified
     * {@code consumerGroupId} should be the same as config parameter {@code group.id} of the used
     * {@link KafkaConsumer consumer}. Note, that the consumer should have {@code enable.auto.commit=false}
     * and should also not commit offsets manually (via {@link KafkaConsumer#commitSync(Map) sync} or
     * {@link KafkaConsumer#commitAsync(Map, OffsetCommitCallback) async} commits).
     *
     * This API won't deprecate the existing {@link KafkaProducer#sendOffsetsToTransaction(Map, String) sendOffsets} API, as standalone-mode
     * EOS applications still rely on it. If the broker doesn't support the new underlying transactional API, the caller will be automatically
     * downgraded to ignore the consumer metadata, and a warning shall be logged.
	 *
     * @throws IllegalStateException if no transactional.id has been configured or no transaction has been started
     * @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active
     * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
     *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
     * @throws org.apache.kafka.common.errors.UnsupportedForMessageFormatException  fatal error indicating the message
     *         format used for the offsets topic on the broker does not support transactions
     * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
     *         transactional.id is not authorized. See the exception for more details
     * @throws org.apache.kafka.common.errors.IllegalGenerationException if the passed in consumer metadata has illegal generation
     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if the passed in consumer metadata has a fenced group.instance.id
     * @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any
     *         other unexpected error
     */ 
   void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, ConsumerGroupMetadata consumerGroupMetadata) throws ProducerFencedException, IllegalGenerationException, FencedInstanceIdException; // NEW
}
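
For context, here is a minimal consume-transform-produce sketch against the new overload, in the spirit of the KIP-98 use case referenced above. The bootstrap servers, topic names, group.id, and transactional.id are illustrative placeholders, and error handling (e.g., aborting the transaction on abortable exceptions) is omitted for brevity.

Code Block
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ConsumeTransformProduce {

    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                  // placeholder group.id
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");           // offsets committed via the transaction
        consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");     // only read committed upstream data
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-txn-id");         // placeholder transactional.id
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {

            consumer.subscribe(Collections.singletonList("input-topic"));
            producer.initTransactions();

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                if (records.isEmpty())
                    continue;

                producer.beginTransaction();
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (ConsumerRecord<String, String> record : records) {
                    producer.send(new ProducerRecord<>("output-topic", record.key(), record.value()));
                    // The committed offset is the next message to consume, i.e. lastProcessedMessageOffset + 1.
                    offsets.put(new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1));
                }

                // Pass the live group metadata so the group coordinator can fence zombies
                // via the consumer generation / group.instance.id rather than transactional.id alone.
                producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                producer.commitTransaction();
            }
        }
    }
}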

...

Code Block
public class StreamsConfig {

    public static final String EXACTLY_ONCE_BETA = "exactly_once_beta";

}
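
Assuming the new constant is, like the existing "exactly_once" value, supplied via the processing.guarantee config, a minimal sketch of a Streams application opting into the thread-producer model could look as follows; the application id, topics, and bootstrap servers are made up for illustration.

Code Block
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class EosBetaExample {

    public static void main(String[] args) {
        // Trivial pass-through topology, just to have something to run.
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-eos-app");            // illustrative name
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // brokers must be on 2.5+
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Switch from "exactly_once" (producer per task) to "exactly_once_beta" (producer per thread).
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA);

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}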

Compatibility, Deprecation, and Migration Plan

It's extremely hard to support two types of stream clients within the same application due to the difficulty of reasoning about the state machine and fencing. The same principle applies to the design of the upgrade path: task producers and thread producers should never be allowed under the same application group.

Additionally, we remove the task-level metrics "commit-latency-max" and "commit-latency-avg", because committing a task no longer happens as a single per-task sequence of steps (such as flushing, committing offsets/transactions, and writing checkpoint files). Instead, the individual steps are executed as phases over all tasks, and committing offsets and transactions is unified into one step for all tasks at once. Because those metrics can no longer be collected in a useful way, we cannot deprecate them and remove them in a future release, but need to remove them directly without a deprecation period.

Given the new fetch-offset fencing mechanism, to opt into the new producer-per-thread model, this new fencing mechanism must first be enabled before the existing producer-side fencing can be disabled.

Following the above principle, two rounds of rolling bounces for all Kafka Streams instances are required, as follows:

  1. Brokers must be upgraded to 2.5 first.
  2. Upgrade the stream application binary and keep the PROCESSING_GUARANTEE setting at "exactly_once". Do the first rolling bounce, and make sure the group is stable with every instance on the 2.6 binary.
  3. Upgrade the PROCESSING_GUARANTEE setting to "exactly_once_beta" and do a second rolling bounce to start using the new thread producer for EOS.

The reason for doing two rolling bounces is that the old (i.e., 2.5) transactional producer doesn't have access to the consumer generation, so the group coordinator doesn't have an effective way to fence old zombies. After the first rolling bounce, the task producer opts into accessing the consumer state and sends TxnOffsetCommitRequest with the generation. With this foundational change, it is much safer to execute step 3.

Note that the above recommended upgrade path is for users who need the consistency guarantee. For users who don't worry about consistency, steps 2 & 3 could be combined into a single rolling bounce with the Kafka client library upgrade. The application should resume work without problems.

Of course, enabling producer per thread is optional, and thus users can also just stay with the "exactly_once" configuration (or with "at_least_once" if exactly-once semantics are not used to begin with). In both cases, it is not required to upgrade the brokers to 2.5, and a single rolling-bounce upgrade of the Kafka Streams applications is sufficient.

Note: It is also possible to switch back from "exactly_once_beta" to "exactly_once" with a single round of rolling bounces.

A downgrade from 2.6 "exactly_once_beta" to 2.5 (which only supports "exactly_once") requires two rolling bounces (i.e., follow the upgrade path in reverse order).

Non-stream EOS Upgrade

Non-stream EOS users would require the following steps:

...

As stated in the upgrade path, the thread producer shall not be enabled if the broker version is too old, even when running with the Kafka Streams 2.6 binary. If you enable "exactly_once_beta" against pre-2.5 brokers, Kafka Streams will raise an error.

Rejected Alternatives

  • We could use the admin client to fetch the inter.broker.protocol version on startup to choose which type of producer to use. This approach, however, is harder than we expected, because brokers may be on different versions, and requiring users to handle the tricky behavior during an upgrade would actually be unfavorable. So a hard-coded config is the better option we have at hand.
  • We considered leveraging the transaction coordinator to remember the assignment info for each transactional producer; however, this would copy the state data into two separate locations that could easily go out of sync. We chose to still use the group coordinator to do the generation and partition fencing in this case.
  • We once decided to use a transactional.group.id config to replace transactional.id and consolidate all the transaction states for an application under one transaction coordinator. This is no longer needed once we rely on the group coordinator to do the fencing, and we could safely re-implement it any time in the future with a new upgrade procedure.

...