...

VerificationFailureRate – rate of verifications that failed, either through the AddPartitionsToTxn response or through errors in the manager.

Proposed Changes

Transaction Feature Version (0)

Similar to Metadata Version, we will introduce a transaction version using the features component introduced by KIP-584: Versioning scheme for features.

This version will gate two things: the ability to use flexible fields in transactional state records (work that was started by KIP-915: Txn and Group Coordinator Downgrade Foundation), and the ability to turn on changes 1 and 2 below.

Transaction Version will behave similarly to Metadata Version in that it will be dynamically changeable.

Bump Epoch on Each Transaction for New Clients (1)

In order to provide better guarantees around transactions, we should change semantics to ALWAYS bump the epoch upon the commit or abort of a transaction on newer clients (those with a higher produce version). This will allow us to uniquely identify a transaction with a producer ID and epoch. By being able to uniquely identify a transaction, we can better tell where one transaction ends and the next begins. This covers both the case where a message from a previous transaction incorrectly gets added to a new one and the hanging transaction case.

As we do now, we will ensure that any produce requests are using the correct epoch. Messages from previous transactions will be fenced because they will have an older epoch.

Upon the end of the transaction, the client sends the EndTxnRequest. When the transaction coordinator receives this, it will write the prepare commit message with a bumped epoch and send WriteTxnMarkerRequests with the bumped epoch. We write the bumped epoch for compatibility so if we downgrade or switch coordinators to an older version, the markers will still be written correctly. Finally, it will send the EndTxnResponse with the bumped epoch (and producer ID if we overflow the epoch) to the client. Newer clients will read this epoch and set it as their own -- using it for the next transaction.
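The epoch bump and the resulting fencing can be illustrated with a minimal sketch. It is a toy model, not the broker implementation: the `TxnState` class and its method names are hypothetical, a single object stands in for both the coordinator and the partition leader, and epoch overflow is left out.

```python
from dataclasses import dataclass


@dataclass
class TxnState:
    """Hypothetical stand-in for the server-side view of one producer."""
    producer_id: int
    epoch: int

    def accepts_produce(self, request_epoch: int) -> bool:
        # Only requests carrying the current epoch are accepted;
        # anything from an earlier transaction has an older epoch.
        return request_epoch == self.epoch

    def end_txn(self, request_epoch: int) -> int:
        # On commit or abort, always bump the epoch and return it,
        # mirroring the bumped epoch carried in EndTxnResponse.
        if request_epoch != self.epoch:
            raise ValueError("fenced: stale epoch in EndTxnRequest")
        self.epoch += 1  # overflow handling intentionally omitted
        return self.epoch


state = TxnState(producer_id=7, epoch=3)
old_epoch = 3
new_epoch = state.end_txn(old_epoch)  # client adopts this for the next transaction
```

After the call, a late produce request that still carries `old_epoch` is rejected, while requests using `new_epoch` are accepted.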


For more detail – consider the epoch bump cases for the transactional state records and EndTxn responses where we do and do not see epoch overflow:

Say we have producer ID x and epoch y. When we overflow epoch y we get producer ID z.

PREPARE
producerId: x
*previous/lastProducerId (tagged field): x
nextProducerId (tagged field): empty or z if y will overflow
producerEpoch: y + 1

  • Non-overflow: Return epoch y + 1 and producer ID x

  • Overflow: Return epoch 0 and producer ID z for overflow in EndTxnResponse

  • Keep the previous producer ID field as x. We use this field to signify that a new client + new server (with epoch bump) set the field.
  • If we retry and see epoch - 1 and producer ID x in the last seen fields, and we are issuing the same command (i.e., commit, not abort), we can return (with the new epoch)


COMPLETE
producerId: x or z if y overflowed
*previous/lastProducerId (tagged field): x
nextProducerId (tagged field): empty
producerEpoch: y + 1 or 0 if we overflowed

  • Non-overflow: Set the producer ID to x

  • Overflow: Set producer ID to z (from the nextProducerId field), set epoch to 0, and nextProducerId can go back to empty.

  • Keep the previous producer ID field to x.

  • If we retry and see epoch max - 1 + ID in last seen fields and are issuing the same command we can return (with current producer ID and epoch)
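The PREPARE and COMPLETE transitions above can be sketched as two small functions. This is an illustration under stated assumptions, not the coordinator code: the `TxnRecord` type and function names are hypothetical, `MAX_EPOCH` assumes a signed 16-bit epoch, and the overflow condition (`y + 1` reaching `MAX_EPOCH`) is an assumption about when "y will overflow" applies.

```python
from dataclasses import dataclass
from typing import Optional, Tuple

MAX_EPOCH = 32767  # assumption: epoch is a signed 16-bit value


@dataclass(frozen=True)
class TxnRecord:
    state: str
    producer_id: int
    prev_producer_id: int            # tagged field
    next_producer_id: Optional[int]  # tagged field, None means empty
    producer_epoch: int


def prepare(x: int, y: int, z: int) -> TxnRecord:
    """Write PREPARE with epoch y + 1; record z only if the epoch will overflow."""
    overflow = y + 1 >= MAX_EPOCH  # assumed overflow condition
    return TxnRecord("PREPARE", x, x, z if overflow else None, y + 1)


def end_txn_response(p: TxnRecord) -> Tuple[int, int]:
    """(producer ID, epoch) returned to the client in EndTxnResponse."""
    if p.next_producer_id is not None:
        return (p.next_producer_id, 0)
    return (p.producer_id, p.producer_epoch)


def complete(p: TxnRecord) -> TxnRecord:
    """Write COMPLETE: switch to z and epoch 0 on overflow, else keep x and y + 1."""
    if p.next_producer_id is not None:
        return TxnRecord("COMPLETE", p.next_producer_id, p.prev_producer_id, None, 0)
    return TxnRecord("COMPLETE", p.producer_id, p.prev_producer_id, None, p.producer_epoch)


normal = prepare(x=10, y=5, z=11)
overflowed = prepare(x=10, y=MAX_EPOCH - 1, z=11)
```

In both cases the previous producer ID stays at x, which is what lets a retried EndTxn be recognized.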

...

New clients will have a bumped produce version and receive a valid Transaction Version supporting the feature that indicates they do not have to send AddPartitionsToTxn requests. Before this, the client will need to continue to send AddPartitionsToTxn and AddOffsetCommitsToTxn requests. Clients should continue using the older version of request (v3).

...

New clients will also take advantage of clearly defined retriable and abortable errors. These changes will apply to both Produce and TxnOffsetCommit requests.

New clients will know that brokers support the new version with the newly created transaction feature.

Old Clients

Produce Requests

For the current Java client, we would return the already existing INVALID_TXN_STATE error with a message indicating the transaction was not ongoing. This error will result in the batch failing, the transaction aborting, and the sequence number being adjusted so as not to cause issues with out-of-order sequences. Non-Java clients should also have this handling, but if not, a comparable approach can be used. Some clients treat this error as fatal, and that is preferable to writing the state. In many cases, the producer issuing this request is a zombie and the abortable/fatal distinction should not matter.

...

There are slightly different semantics for TxnOffsetCommits and how they handle errors. INVALID_TXN_STATE and INVALID_PID_MAPPING are fatal for this request. All the retriable errors will be converted to COORDINATOR_NOT_AVAILABLE since NOT_ENOUGH_REPLICAS is not handled. These will not contain the unique error message since the API does not support that.
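As a rough sketch of the TxnOffsetCommit error handling just described: the function name and return shape are hypothetical, and the set of retriable errors is an assumption passed in by the caller (only NOT_ENOUGH_REPLICAS is named in the text).

```python
from typing import FrozenSet, Tuple

# Errors the text states are fatal for TxnOffsetCommit.
FATAL_ERRORS = frozenset({"INVALID_TXN_STATE", "INVALID_PID_MAPPING"})


def map_txn_offset_commit_error(
    error: str,
    retriable: FrozenSet[str] = frozenset({"NOT_ENOUGH_REPLICAS"}),  # assumed set
) -> Tuple[str, str]:
    """Return (error code sent to the client, how the client treats it)."""
    if error in FATAL_ERRORS:
        return (error, "fatal")
    if error in retriable:
        # Retriable errors are collapsed into one code the request already handles.
        return ("COORDINATOR_NOT_AVAILABLE", "retriable")
    return (error, "unchanged")
```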

...

New Servers

...

New servers will indicate their ability to support the new protocol with a new transaction feature version. This is similar to metadata version, but will be used for transactional features only. The transaction feature version will be used to signal 1) that flexible fields can be written in the transactional state topic and 2) that the new protocol (epoch bumps and add partitions optimization) can be used.

Consider a few scenarios in the course of an upgrade – at the beginning of a transaction, the client will determine whether it should use the old or new protocol based on the client itself and latest epoch TV it sees from the brokers it is connected to.

  • Any case with old client (new or old image/TV on brokers) → old protocol (client doesn’t support)

  • New client, old image and TV on brokers → old protocol (server TV is too low)

  • New client, new image but not new TV on brokers → old protocol (server TV is too low)

  • New client, new image + TV on at least one broker → new protocol (client supports and data broker has new TV – even if not all brokers have TV, it has the code to support)

Before requests, we will also check the latest epoch TV. If we see a new one, we will wait for the transaction to complete before upgrading. If we see a lower TV, we should abort the transaction and start again with the old protocol.
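The scenarios and the mid-transaction check above can be summarized in a small decision sketch. The function names, the string results, and the `required_tv` threshold are hypothetical; the text does not give a concrete TV number, so the value used in the example is only a placeholder.

```python
def choose_protocol(client_is_new: bool, latest_tv: int, required_tv: int) -> str:
    """Pick the protocol at the start of a transaction."""
    if not client_is_new:
        return "old"  # old client never supports the new protocol
    return "new" if latest_tv >= required_tv else "old"


def on_tv_check(current_protocol: str, latest_tv: int, required_tv: int) -> str:
    """Decide what to do when the TV is re-checked before a request."""
    supports_new = latest_tv >= required_tv
    if current_protocol == "old" and supports_new:
        # A newer TV appeared: finish this transaction first, then upgrade.
        return "upgrade_after_txn"
    if current_protocol == "new" and not supports_new:
        # TV dropped: abort and restart with the old protocol.
        return "abort_and_use_old"
    return "continue"


REQUIRED_TV = 2  # placeholder threshold, an assumption for this example
```

For example, `choose_protocol(True, 1, REQUIRED_TV)` yields the old protocol because the server TV is too low, and a later `on_tv_check("old", 2, REQUIRED_TV)` defers the upgrade until the current transaction completes.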

Old Servers

Attempting to send verifications to older brokers will be a no-op, we can just append as normal.

Downgrading a broker or switching to a broker without the new code for handling the new records should gracefully ignore the tagged fields and write the commit markers with the bumped epoch. This should not affect old brokers or consumers.

Attempting to use a new client with old server code that doesn't add partitions will result in the new client falling back to old behavior. It is possible that some brokers will have old code and some will have new code. In this case, we cannot ignore the add partitions calls for new clients. Instead, we should fail with an abortable error and signal to the client that the old protocol should be used, as described above.

Test Plan

Unit/Integration testing will be done to test the various hanging transaction scenarios described above. Tests will also be done to ensure client compatibility between various image and transaction feature versions.


Rejected Alternatives

Use Metadata Version to gate features

Introducing a separate feature gives greater control over the rollout process and over turning the feature off. We don't foresee a reason that would require turning off or downgrading, but it is better to have the option, along with the ability to not affect or block other features when making any changes. Creating a new feature paves the way for other areas to create their own features (e.g., Group Coordinator for KIP-848). This usage is consistent with the original intent of KIP-584.

In the overflow case, have a CompleteCommit record with the old producer ID + the new producer ID written in a pseudo InitProducerIdRecord

...