...

This is an integrated server-client change, so brokers must be upgraded first, with `inter.broker.protocol.version` set to the latest version. Any produce request with a higher version than the broker supports will automatically be fenced. Because we will use a different transactional id for the same topic partitions (breaking EOS), old transaction information is lost if the coordinator location switches. Once the servers are upgraded, an existing application can opt into this new feature by upgrading to the latest client version.
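For reference, the broker-first step above amounts to a rolling upgrade followed by a protocol-version bump. A minimal `server.properties` fragment might look like the following (the version value shown is illustrative, assuming a release that supports this feature):

```properties
# server.properties (per broker)
# Step 1: roll out the new broker binaries while keeping the old protocol version.
# Step 2: once all brokers run the new code, bump the protocol version and roll again.
inter.broker.protocol.version=2.5
```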

To make the upgrade fully compatible with the current EOS transaction semantics, we need to be able to distinguish clients that make progress on the same input source but use different transactional ids. Two different types of clients could coexist within the same consumer group: imagine a Kafka Streams application where half of the instances use the old per-task producer API while the other half use the new consumer group API. This fencing can be done by leveraging `AddOffsetsToTxnRequest`, which contains both the consumer group id and the topic partitions. With this information, the group coordinator can build a reverse mapping from topic partition to `producer.id` and actively check it upon receiving each `AddOffsetsToTxnRequest`. If the stored `producer.id` doesn't match the one in the request, the group coordinator sends abort-transaction requests to all affected transaction coordinators, which fence the stored producer with a `ConcurrentProducerCommitException` and abort any ongoing transaction associated with it. This ensures a smooth upgrade without worrying about old pending transactions.
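The reverse-mapping check described above can be sketched as follows. This is a minimal illustration, not Kafka's actual coordinator code: the class and method names are hypothetical, and a plain `String` stands in for a real `TopicPartition`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the group coordinator's fencing check: a reverse
// mapping from topic partition to the producer.id that last registered it.
public class FencingSketch {
    private final Map<String, Long> partitionToProducerId = new HashMap<>();

    /**
     * Called when an AddOffsetsToTxnRequest arrives for the given partition.
     * Returns true if the caller may proceed; false means the previously
     * registered producer must be fenced (ConcurrentProducerCommitException)
     * and its ongoing transaction aborted.
     */
    public boolean checkAndRegister(String topicPartition, long producerId) {
        Long stored = partitionToProducerId.get(topicPartition);
        // Register (or re-register) the incoming producer as the current owner.
        partitionToProducerId.put(topicPartition, producerId);
        // A mismatch means an older producer still owned this partition: fence it.
        return stored == null || stored == producerId;
    }

    public static void main(String[] args) {
        FencingSketch coordinator = new FencingSketch();
        System.out.println(coordinator.checkAndRegister("topic-0", 100L)); // true: first owner
        System.out.println(coordinator.checkAndRegister("topic-0", 100L)); // true: same producer
        System.out.println(coordinator.checkAndRegister("topic-0", 200L)); // false: fence old producer
    }
}
```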

Besides an active fencing mechanism, we also need to guarantee 100% correctness during the upgrade: no input data may be processed twice, even though we can no longer distinguish clients by transactional id. The solution is to reject offset fetch requests from new clients with a `PendingTransactionException` while there are pending transactional offset commits, so that a new client starts from a clean state instead of relying on transactional id fencing. When a client receives a `PendingTransactionException`, it backs off and retries fetching the input offset until all pending transaction offsets are cleared. This is a trade-off between availability and correctness; in the worst case, availability suffers only up to the transaction timeout, which should be negligible. Compared with more elaborate fencing logic based on topic partition ownership, we believe this trade-off is a good deal. To remain compatible with current semantics during the upgrade, we would also proactively reject an old-version producer that tries to initialize a transaction on a topic partition owned by another producer. This may require a new request type that lets the group coordinator inform the relevant transaction coordinators to fence the old producer and make it crash immediately.
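The client-side back-off-and-retry loop described above can be sketched as follows. This is a hypothetical illustration under stated assumptions: the method and exception names are stand-ins (a `Supplier` simulates the broker round trip), not Kafka client APIs.

```java
import java.util.function.Supplier;

// Hypothetical sketch of the new client's behavior: on PendingTransactionException
// it backs off and re-fetches the committed input offset until the pending
// transactional offsets clear.
public class OffsetFetchRetry {
    // Illustrative stand-in for the broker's rejection while offsets are pending.
    static class PendingTransactionException extends RuntimeException {}

    /**
     * Retries offsetFetcher with a capped exponential backoff until the broker
     * stops throwing PendingTransactionException. In the worst case this waits
     * out the transaction timeout, trading a little availability for correctness.
     */
    public static long fetchCommittedOffset(Supplier<Long> offsetFetcher,
                                            long backoffMs, long maxBackoffMs)
            throws InterruptedException {
        while (true) {
            try {
                return offsetFetcher.get();
            } catch (PendingTransactionException e) {
                Thread.sleep(backoffMs);
                backoffMs = Math.min(backoffMs * 2, maxBackoffMs);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulate a broker that rejects the first two fetches, then succeeds.
        int[] attempts = {0};
        long offset = fetchCommittedOffset(() -> {
            if (attempts[0]++ < 2) throw new PendingTransactionException();
            return 42L;
        }, 1, 8);
        System.out.println(offset); // prints 42
    }
}
```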

Rejected Alternatives

  • Producer Pooling:
  • Producer support multiple transactional ids:
  • Tricky rebalance synchronization:

...