...

If the InitProducerId was asked to not abort the ongoing transaction, then the application is only allowed to call .commitTransaction, .abortTransaction, or .completeTransaction; calling other methods (e.g. .send) would fail.  This is because the reason to call InitProducerId without aborting the ongoing transaction is to complete a prepared transaction after the producer’s crash.

If the InitProducerId was asked to not abort the ongoing transaction and the transaction is indeed ongoing, we want to increment the epoch (and potentially allocate a new producerId, if the epoch is about to overflow) so that we can fence requests from previous incarnations of the transactional producer.  On the other hand, we want to preserve the producerId and epoch of the ongoing transaction, so that we could properly match it to the prepared transaction state stored in the database.  To support that, we need to persist two (producerId, epoch) pairs in the transaction state and return both of them in the InitProducerId response.

If the transaction is a participant of the 2PC protocol, we don’t limit the transactional timeout by the transaction.max.timeout.ms any more, so if the client passes max integer value, the broker can accept it.

The admin client would support a new method to abort a transaction with a given transactional id.  The method would just execute InitProducerId.

A new metric would be added to track ongoing transaction time.

...

We will bump the InitProducerId API.  The new request and response schemas are going to be the following:

Code Block
InitProducerIdRequest => TransactionalId TransactionTimeoutMs ProducerId Epoch Enable2Pc KeepPreparedTxn
 TransactionalId => NULLABLE_STRING
 TransactionTimeoutMs => INT32
 ProducerId => INT64
 Epoch => INT16
 Enable2Pc => BOOL  // NEW
 KeepPreparedTxn => BOOL  // NEW

Implementation note for InitProducerId API – even though we may not change the state when KeepPreparedTxn is specified, we still need to do a write (of the same state) to make sure we’re not getting data from a stale leader.

Code Block
InitProducerIdResponse => Error ProducerId Epoch OngoingTxnProducerId OngoingTxnEpoch
 Error => INT16
 ProducerId => INT64
 Epoch => INT16
 OngoingTxnProducerId => INT64  // NEW
 OngoingTxnEpoch => INT16  // NEW

We’ll also bump ListTransactionsRequest to support listing only transactions that are running longer than a certain amount of time:

Code Block
ListTransactionsRequest => StatesFilter ProducerIdFilter RunningLongerThanMs
 StatesFilter => []STRING
 ProducerIdFilter => []STRING
 RunningLongerThanMs => INT64 // NEW

Persisted Data Format Changes

We need to store an additional (producerId, epoch) pair in the TransactionLogValue record, so we add 2 new tagged fields without bumping the version; this makes it possible to downgrade to a version that supports KIP-915:

Code Block
languagejs
{
  "type": "data",
  "name": "TransactionLogValue",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "TransactionProducerId", "type": "int64", "versions": "0+",  // Renamed
      "about": "Producer id of the current / last transaction"},
    { "name": "TransactionProducerEpoch", "type": "int16", "versions": "0+",  // Renamed
      "about": "Epoch associated with the producer id of the current / last transaction"},
    { "name": "TransactionTimeoutMs", "type": "int32", "versions": "0+",
      "about": "Transaction timeout in milliseconds"},
    { "name": "TransactionStatus", "type": "int8", "versions": "0+",
      "about": "TransactionState the transaction is in"},
    { "name": "TransactionPartitions", "type": "[]PartitionsSchema", "versions": "0+", "nullableVersions": "0+",
      "about": "Set of partitions involved in the transaction", "fields": [
      { "name": "Topic", "type": "string", "versions": "0+"},
      { "name": "PartitionIds", "type": "[]int32", "versions": "0+"}]},
    { "name": "TransactionLastUpdateTimestampMs", "type": "int64", "versions": "0+",
      "about": "Time the transaction was last updated"},
    { "name": "TransactionStartTimestampMs", "type": "int64", "versions": "0+",
      "about": "Time the transaction was started"},
    { "name": "ProducerId", "type": "int64", "tag": 0, "taggedVersions": "1+",  // New
      "about": "Producer id in use by the transactional id." },
    { "name": "ProducerEpoch", "type": "int16", "tag": 1, "taggedVersions": "1+",  // New
      "about": "Producer id in use by the transactional id." }
  ]
}

Note that the (producerId, epoch) pair that corresponds to the ongoing transaction is going to be written into the existing ProducerId and ProducerEpoch fields (which are renamed to TransactionProducerId and TransactionProducerEpoch to reflect the semantics), so that downgrade is supported.

Metric Changes

A new metric will be added:

active-transaction-total-time-max The max time a currently-open transaction has been open

To calculate the metric, we’ll just walk through the ongoing transactions and record the max value.
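
For illustration, the computation could look like the following sketch (OngoingTxn and its fields are hypothetical stand-ins for the coordinator’s transaction metadata, not actual Kafka classes):

Code Block
languagejava
import java.util.List;

public final class ActiveTxnTimeMetricSketch {
    // Hypothetical stand-in for the coordinator's per-transaction metadata.
    public static final class OngoingTxn {
        final long startTimestampMs;
        public OngoingTxn(long startTimestampMs) { this.startTimestampMs = startTimestampMs; }
    }

    // Walk the ongoing transactions and record the max open time.
    public static long activeTransactionTotalTimeMax(List<OngoingTxn> ongoing, long nowMs) {
        long max = 0;
        for (OngoingTxn txn : ongoing) {
            max = Math.max(max, nowMs - txn.startTimestampMs);
        }
        return max;
    }
}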

Client Configuration Changes

transaction.two.phase.commit.enable The default would be ‘false’.  If set to ‘true’, the broker is informed that the client is participating in the two-phase commit protocol, and the transaction timeout can be set to values that exceed the transaction.max.timeout.ms setting on the broker (if the timeout is not set explicitly on the client and two-phase commit is set to ‘true’, the transaction never expires).

transaction.timeout.ms The semantics are not changed, but it can be set to values that exceed transaction.max.timeout.ms if transaction.two.phase.commit.enable is set to ‘true’.
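
For illustration, a client participating in 2PC could be configured like the following sketch (standard producer configs plus the new setting proposed here):

Code Block
languagejava
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class TwoPcProducerFactory {
    public static KafkaProducer<String, String> create() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
        // New client setting proposed by this KIP:
        props.put("transaction.two.phase.commit.enable", "true");
        // transaction.timeout.ms is deliberately left unset: with 2PC enabled,
        // the transaction then never expires on the broker side.
        return new KafkaProducer<>(props);
    }
}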

Broker Configuration Changes

transaction.two.phase.commit.enable The default would be ‘false’.  If it’s ‘false’, 2PC functionality is disabled even if the ACL is set; clients that attempt to use this functionality would receive the TRANSACTIONAL_ID_AUTHORIZATION_FAILED error.

KafkaProducer API Changes

A new KafkaProducer.PreparedTxnState class is going to be defined as follows:

Code Block
languagejava
public static class PreparedTxnState {
  public String toString();
  public PreparedTxnState(String serializedState);
  public PreparedTxnState();
}

The objects of this class can be serialized to / deserialized from a string value and can be written to / read from a database.  The implementation is going to store producerId and epoch.

A new overloaded method will be added to KafkaProducer:

public void initTransactions(boolean keepPreparedTxn) 

If the value is 'true' then the corresponding field is set in the InitProducerIdRequest and the KafkaProducer object is set into a state which only allows calling .commitTransaction, .abortTransaction, or .completeTransaction.

If the transaction.two.phase.commit.enable setting is set to ‘false’ but keepPreparedTxn is set to ‘true’ then the call fails with the INVALID_TXN_STATE error.

A new method will be added to KafkaProducer:

public PreparedTxnState prepareTransaction() 

This would flush all the pending messages and transition the producer into a mode where only .commitTransaction, .abortTransaction, or .completeTransaction could be called.  If the call is successful (all messages were successfully flushed to all partitions), the transaction is prepared.  If 2PC is not enabled, we return the INVALID_TXN_STATE error.

A new method would be added to KafkaProducer:

public void completeTransaction(PreparedTxnState preparedTxnState)

The method would compare the currently prepared transaction state and the state passed in the argument and either commit or abort the transaction.  If the producer is not in a prepared state (i.e. neither prepareTransaction nor initTransactions(true) was called), we return an INVALID_TXN_STATE error.
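
Putting the new methods together, the intended usage could look like the following sketch (the db* helpers are hypothetical stand-ins for the external transaction coordinator’s storage; error handling is omitted):

Code Block
languagejava
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TwoPcFlowSketch {
    // Hypothetical stand-ins for the external transaction coordinator's storage.
    static KafkaProducer.PreparedTxnState dbLoadPreparedState() { return null; }
    static void dbStorePreparedState(String serialized) { /* written within the DB txn */ }

    public static void run(KafkaProducer<String, String> producer) {
        // Keep any prepared transaction in doubt while initializing.
        producer.initTransactions(true);

        // Recovery: if a prepared state was persisted before a crash, complete it;
        // completeTransaction commits on a state match and aborts otherwise.
        KafkaProducer.PreparedTxnState stored = dbLoadPreparedState();
        if (stored != null) {
            producer.completeTransaction(stored);
        }

        // Normal operation: produce, prepare, persist the state, then decide.
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        KafkaProducer.PreparedTxnState prepared = producer.prepareTransaction();
        dbStorePreparedState(prepared.toString()); // atomically with the DB-side prepare
        producer.completeTransaction(prepared);    // states match, so this commits
    }
}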

AdminClient API Changes

Class ListTransactionsOptions will support 2 new methods:

public ListTransactionsOptions runningLongerThanMs(long runningLongerThanMs) // set 

public long runningLongerThanMs() // get 

The Admin interface will support a new method:

public TerminateTransactionResult forceTerminateTransaction(String transactionalId) 

TerminateTransactionResult just contains a KafkaFuture<Void> result() method.

NOTE that there is an existing abortTransaction method that is used to abort “hanging” transactions (an artifact of some gaps in the transaction protocol implementation that will be addressed in KIP-890; once part 1 of KIP-890 is implemented we won’t have “hanging” transactions).  “Hanging” transactions are not known to the Kafka transaction coordinator; they are just dangling messages in data partitions that cannot be aborted via the normal transaction protocol.  So abortTransaction actually needs information about data partitions so that it can go and insert markers directly there.

On the other hand, the forceTerminateTransaction method would operate on a well-formed but long-running transaction for a given transactional id.  Under the covers it would just use the InitProducerId call with keepPreparedTxn=false.
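
For example, an operator tool could combine the new listing filter with forced termination (a sketch that uses the new methods proposed above):

Code Block
languagejava
import java.util.Collection;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListTransactionsOptions;
import org.apache.kafka.clients.admin.TransactionListing;

public class TerminateLongRunningTxns {
    // Force-terminate every transaction open longer than the given threshold.
    public static void run(Admin admin, long thresholdMs) throws Exception {
        ListTransactionsOptions options =
            new ListTransactionsOptions().runningLongerThanMs(thresholdMs);
        Collection<TransactionListing> listings =
            admin.listTransactions(options).all().get();
        for (TransactionListing listing : listings) {
            admin.forceTerminateTransaction(listing.transactionalId()).result().get();
        }
    }
}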

ACL Changes

A new value will be added to the enum AclOperation: TWO_PHASE_COMMIT ((byte) 15).  When InitProducerId comes with enable2Pc=true, the client would have to have both WRITE and TWO_PHASE_COMMIT operations enabled on the transactional id resource.
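
For illustration, granting the new operation via the Java Admin API could look like the following sketch (AclOperation.TWO_PHASE_COMMIT being the new enum value proposed here):

Code Block
languagejava
import java.util.Collections;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class GrantTwoPhaseCommit {
    // Allow the principal to use 2PC on the given transactional id.
    public static void grant(Admin admin, String principal, String txnId) throws Exception {
        AclBinding binding = new AclBinding(
            new ResourcePattern(ResourceType.TRANSACTIONAL_ID, txnId, PatternType.LITERAL),
            new AccessControlEntry(principal, "*",
                AclOperation.TWO_PHASE_COMMIT, AclPermissionType.ALLOW));
        admin.createAcls(Collections.singleton(binding)).all().get();
    }
}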

Command Line Tool Changes

The kafka-transactions.sh --list command is going to support a new flag --runningLongerThanMs that would take the number of milliseconds.  If this flag is provided, only transactions that have been running longer than the specified number of milliseconds would be listed.

The kafka-transactions.sh tool is going to support a new command --forceTerminateTransaction.  It has one required argument --transactionalId that would take the transactional id for the transaction to be terminated.

The kafka-acls.sh tool is going to support a new --operation TwoPhaseCommit.

Compatibility, Deprecation, and Migration Plan

The proposal doesn't remove or update any existing functionality; it just adds new functionality that would only be executed if the new configurations and APIs are used.

The broker can be downgraded to versions that support KIP-915.  We write the TransactionProducerId and TransactionProducerEpoch that correspond to the ongoing transaction in place of the ProducerId and ProducerEpoch fields, so an old broker would be able to properly commit or abort the transaction.

Test Plan

The corresponding unit and integration tests will be added.

Rejected Alternatives

Explicit “prepare” RPC

Given that the 2PC protocol is defined in terms of “prepare” and “commit” phases it seems natural to just add a “prepare” RPC to Kafka.  The RPC would tell the Kafka transaction coordinator to transition the transaction into a new “prepared” state (note that our current state names are misleading – PREPARE_COMMIT is actually the “commit” phase).

There are some potential benefits of doing that:

  1. Transactions that haven’t reached “prepared” state can be aborted via timeout.
  2. New updates to “prepared” transactions can be rejected.
  3. InitProducerId would know to not abort prepared transactions.
  4. We could query for prepared transactions.

The disadvantage of an explicit “prepare” state is that we’d need to run a synchronous operation on the Kafka transaction coordinator topic and (if we want to support benefit 2) send “prepare” markers to all leaders (so that they could reject new updates).

On closer examination, the benefits don’t really eliminate any complexities:

  1. We still need tooling and operational support to handle transactions that are stuck in the “prepared” state.
  2. The external transaction coordinator would have to keep the state anyway and would know to not send any messages to prepared transactions.
  3. The external transaction coordinator would have to keep the state anyway and can pass this info to InitProducerId (via the keepPreparedTxn flag) instead of keeping this info in 2 places.
  4. The external transaction coordinator already has knowledge of prepared transactions.

So we decided to keep the “implicit prepare” the same way we have it in Kafka today and avoid extra synchronous operations that would just duplicate the state that is kept in the external transaction coordinator.

HeartBeat RPC

Potentially, it could be good to distinguish between abandoned transactions (i.e. a producer started a 2PC transaction and then died) and just long running transactions.  We could add a heartbeat RPC to distinguish between the 2 cases.

A HeartBeat RPC definitely sounds like a “good thing to do”.  It is not clear, though, in which cases we would need to handle these situations differently: a long running transaction is a concern regardless of whether it got abandoned or the application is still working on it (the application might also have a bug, etc.), so the operator would still need to investigate what’s going on with the application and determine if it’s safe to abort the transaction without violating “prepared” guarantees.

So it doesn’t seem to justify extra complexity, and we just add tooling for inspection of any long running transactions.

Using TransactionTimeoutMs=MAX_INT Instead of Enable2Pc

Having a Boolean flag and then a timeout value seems redundant; we really need just one or the other, so technically, instead of adding an independent flag, we could use a special timeout value to indicate that it’s a 2PC transaction.  This, however, would couple intent with a specific implementation; an explicit Boolean seems to reflect the intent better.

Allowing keepPreparedTxn=true without 2PC

Without 2PC, the notion of a “prepared” transaction is subverted by transaction.max.timeout.ms, so Kafka cannot promise to keep the transaction in doubt until a decision is reached.  So from a purity perspective, using keepPreparedTxn=true doesn’t reflect the semantics.

In practice, however, Flink already effectively has a way to keep a “prepared” transaction by using reflection, so if we want to support all the following properties:

  1. Get rid of reflection use in Flink and move to the public API in new versions of Flink.
  2. Support new versions of Flink in Kafka clusters that don’t want to grant 2PC privileges.

Then we need to enable using keepPreparedTxn=true even if 2PC is disabled.

Using Partition Offsets as PreparedTxnState

With KIP-890, we can identify transactions as {producerId, epoch}.  Without KIP-890, the only way to remember a prepared transaction state is to keep track of partitions and their offsets.  The first offsets of a transaction are already tracked on the partition leaders, so this information is present in the cluster in some form, but getting it requires calling the partition leaders, which brings complexity that doesn’t exist with KIP-890.

Another disadvantage is that the PreparedTxnState isn’t bounded if it tracks partitions and their offsets: some producers would produce data into topics with a large number of partitions, some producers would produce data into topics with a small number of partitions.  Thus, the solution could work with some configurations (e.g. it was tested with VARCHAR(255)) and break when topic-partition properties get changed.  On the other hand, {producerId, epoch} has a small and fixed size.

Support Multiple Concurrent Transactions Per Producer

Currently Kafka supports one transaction per producer, which may limit concurrency.  It should be in principle possible to implement support for multiple concurrent transactions, but it seems to be an independent large improvement that deserves its own KIP that should be proposed separately.  If such functionality is implemented in Kafka we could amend 2PC to work with multiple transactions.

Skip Epoch Bump When KeepPreparedTxn=true

Epoch bumping for a prepared transaction has some complexity (we need to keep another (producerId, epoch) pair), so to avoid this complexity we considered just keeping the original epoch when we need to keep the prepared transaction.  This actually works for fencing zombie messages, because a corresponding abort or commit would bump the epoch anyway with KIP-890.  However, if we have a split brain (two producer instances running concurrently with the same transactionalId) we could run into scenarios where they stomp on each other and commit incorrect data.  For example:

  1. (instance1) InitProducerId(keepPreparedTxn=true), got epoch=42
  2. (instance2) InitProducerId(keepPreparedTxn=true), got epoch=42
  3. (instance1) CommitTxn, epoch bumped to 43
  4. (instance2) CommitTxn, this is considered a retry, so it got epoch 43 as well
  5. (instance1) Produce messageA w/sequence 1
  6. (instance2) Produce messageB w/sequence 1, this is considered a duplicate
  7. (instance2) Produce messageC w/sequence 2
  8. (instance1) Produce messageB w/sequence 2, this is considered a duplicate

Now if either of those commits the transaction, it would have a mix of messages from the 2 instances.  With the proper epoch bump, instance1 would get fenced at step 3.