Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

This architecture does not scale well as the number of input partitions increases. Every producer come with separate memory buffers, a separate thread, separate network connections. This limits the performance of the producer since we cannot effectively use the output of multiple tasks to improve batching. It also causes unneeded load on brokers since there are more concurrent transactions and more redundant metadata management.

...

It's strongly recommended to read the detailed design doc

...

 for better

...

understanding the internal changes

...

. This KIP only presents high level ideas.

Proposed Changes

The root of the problem is that transaction coordinators have no knowledge of consumer group semantics. They simply do not understand that partitions can be moved between processes. Let's take a look at a sample exactly-once use case, which is quoted from KIP-98

...

Transaction coordinator uses the initTransactions API currently in order to fence producers using the same transactional Id and to ensure that previous transactions have been completed. We propose to switch this guarantee on group coordinator instead.   In the above template, we call consumer.poll() to get data, but and internally for the very first time we start doing so, we need consumer needs to know the input topic offset. This is done by a FetchOffset call to group coordinator. With transactional processing, there could be offsets that are "pending", I.E they are part of some ongoing transaction. Upon receiving FetchOffset request, broker will export offset position to the "latest stable offset" (LSO), which is the largest offset that has already been committed. Since we rely on unique transactional.id to revoke stale transaction, we believe any pending transaction will be aborted when producer calls initTransaction again. During normal use case such as Kafka Streams, we will also explicitly close producer to send out a EndTransaction request to make sure we start from clean state.


It This approach is no longer safe to do so when we allow topic partitions to move around transactional producers, since transactional coordinator doesn't know about partition assignment and producer won't call initTransaction again during its lifecyclelife cycle. Omitting pending offsets and proceed could introduce duplicate processing. The proposed solution proposed solution is to reject FetchOffset request by sending out PendingTransactionException out a new exception called PendingTransactionException to new client when there is pending transactional offset commits, so that old transaction will eventually expire due to transaction timeout. timeoutAfter expiration, and txn transaction coordinator will take care of writing abort transaction markers and failure records, etc. Since it would be an unknown exception for bump the producer epoch. For old consumers, we will choose to send a COORDINATOR_LOAD_IN_PROGRESS exception to let it retry, too.   When client receives PendingTransactionException or COORDINATOR_LOAD_IN_PROGRESS, it will back-off and retry getting input offset until all the pending transaction offsets are cleared. This is a trade-off between availability and correctness, and in this case the . The worst case for availability loss is just waiting for transaction timeout when the last generation producer wasn’t shut down gracefully, which should be rare.

Below is the new approach we introduce here.discussed:


Note that the current default transaction.timeout is set to one minute, which is too long for Kafka Streams EOS use cases. It is because Considering the default commit interval was set to only 100 msmilliseconds, and we would first doom to hit session timeout if we don't actively commit offsets during that tight window. So we suggest to shrink the transaction timeout to be the same default value as session timeout (10 seconds), to reduce the potential performance loss for offset fetch delay when some instance instances accidentally crashescrash.

Public Interfaces

The main addition of this KIP is a new variant of the current initTransactions API which gives us access to the consumer group states, such as member state and generation.id.

Code Block
interface Producer {
  /**
   * This API shall be called for consumer group aware transactional producers.
   */
  void initTransactions(Consumer<byte[], byte[]> consumer); // NEW

  /**
   * No longer need to pass in the consumer group id in a case where we already get access to the consumer state.
   */
  void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets) throws ProducerFencedException, IllegalGenerationException; // NEW
}

Shrinking transactional.timeout

We shall set `transaction.timout.ms` default to 10000 ms (10 seconds).

Fencing zombie

A zombie process may invoke InitProducerId after falling out of the consumer group. In order to distinguish zombie requests, we need to leverage group coordinator to fence out of generation client.

...

  1. Broker must be upgraded to 2.4 first. This means the `inter.broker.protocol.version` (IBP) has to be set to the latest. Any produce request with higher version will automatically get fenced because of no support.
  2. Upgrade the stream application binary and choose to set UPGRADE_FROM_CONFIG config to 2.3 or lower. Do the first rolling bounce, and make sure the group is stable.
  3. Remove that config to make application point to actual kafka Kafka client version 2.4. Do the second rolling bounce and now the application officially starts using new thread producer for EOS.

...