
There are a few enhancements that we need to make to the offset manager and the compacted __consumer_offsets topic in order to support this. First, the compacted topic will now also contain transactional control records, so we will need to come up with an eviction strategy for them. Second, the offset manager needs to become transaction-aware: specifically, an offset fetch request should return an error if the group is associated with a pending transaction.
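As a rough illustration of the second point, a transaction-aware offset fetch might look like the sketch below. It assumes the offset manager tracks the set of groups that are associated with a pending transaction. `OffsetManager`, `pending_groups` and the `PENDING_TRANSACTION` error code are hypothetical names, not existing Kafka APIs.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional, Set, Tuple

TopicPartition = Tuple[str, int]

# Hypothetical error code; the design does not name one.
PENDING_TRANSACTION = "PENDING_TRANSACTION"


@dataclass
class OffsetManager:
    # group -> {topic-partition: committed offset}
    offsets: Dict[str, Dict[TopicPartition, int]] = field(default_factory=dict)
    # groups whose offsets are part of a pending (not yet committed or aborted) transaction
    pending_groups: Set[str] = field(default_factory=set)

    def fetch(self, group: str, tp: TopicPartition) -> Tuple[Optional[str], Optional[int]]:
        """Return (error, offset); refuse to answer while the group has a pending transaction."""
        if group in self.pending_groups:
            return PENDING_TRANSACTION, None
        return None, self.offsets.get(group, {}).get(tp)


manager = OffsetManager(offsets={"g1": {("orders", 0): 42}})
print(manager.fetch("g1", ("orders", 0)))  # (None, 42)
manager.pending_groups.add("g1")
print(manager.fetch("g1", ("orders", 0)))  # ('PENDING_TRANSACTION', None)
```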

Transaction coordinator

The transaction coordinator is the leader of a specific partition of the __transaction_control topic (i.e., the journal log). It is the point of entry for initiating, committing and aborting transactions. It maintains the following (in-memory) state:
  • A high-watermark (of the journal log) corresponding to the BEGIN record of the first in-flight transaction. The transaction coordinator can periodically checkpoint this high-watermark to ZooKeeper.
  • For all in-flight transactions that follow the HW in the journal log:
    • The list of topic-partitions (of the payload) of the transaction.
    • Timeout of the transaction.
    • The producer ID and generation associated with the transaction.
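A minimal sketch of this in-memory state, assuming the journal log is addressed by plain integer offsets. The class and field names are hypothetical. The HW is computed as the offset of the BEGIN record of the earliest in-flight transaction; falling back to the journal's end offset when nothing is in flight is an assumption of the sketch.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

TopicPartition = Tuple[str, int]


@dataclass
class InFlightTx:
    begin_offset: int                 # journal offset of this transaction's BEGIN record
    partitions: List[TopicPartition]  # data partitions of the payload
    timeout_ms: int
    producer_id: int
    generation: int


@dataclass
class CoordinatorState:
    in_flight: Dict[int, InFlightTx] = field(default_factory=dict)  # TxId -> state
    log_end_offset: int = 0  # next offset to be written to the journal log

    def high_watermark(self) -> int:
        """Offset of the BEGIN record of the earliest in-flight transaction.

        Assumption: with nothing in flight, the HW is the journal's log end offset.
        """
        if not self.in_flight:
            return self.log_end_offset
        return min(tx.begin_offset for tx in self.in_flight.values())
```

Because the HW only depends on the smallest BEGIN offset among in-flight transactions, completing any transaction other than the earliest one leaves it unchanged.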

We need to ensure that whatever retention policy is in place does not delete a log segment if the transactional HW is within that log segment. This could simply be an additional parameter to the retention policy: the segment containing that offset, and all segments following it, must not be deleted.
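If the retention policy's decision is expressed as "delete the N oldest segments", the extra parameter could simply cap N. The sketch below assumes segments are identified by their sorted base offsets; `cap_deletions` is a hypothetical helper, not part of Kafka's log manager.

```python
import bisect
from typing import List


def cap_deletions(base_offsets: List[int], would_delete: int, tx_hw: int) -> int:
    """Return how many of the oldest segments may actually be deleted.

    base_offsets: sorted base offsets of the log's segments, oldest first.
    would_delete: number of oldest segments selected by the normal retention policy.
    tx_hw: transactional HW; the segment containing it and all later segments are kept.
    """
    # Index of the segment containing tx_hw (last base offset <= tx_hw); -1 if none.
    containing = bisect.bisect_right(base_offsets, tx_hw) - 1
    return max(0, min(would_delete, containing))


print(cap_deletions([0, 100, 200, 300], would_delete=3, tx_hw=150))  # 1
```

With a transactional HW of 150, only the first segment (offsets 0 to 99) lies entirely before the HW, so at most one segment is deleted even though retention selected three.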

Transaction flow

(Assuming no failures at any step.)

InitPhase

  1. Producer: Determine which broker is the coordinator for its group.
  2. Producer: Send a BeginTransaction(producerId, generation, partitions...) request to the coordinator and await the response. We could provide a variation of this API that also includes a timeout. If the producer needs to commit its consumer state as part of the transaction, it will need to include the relevant partition of the __consumer_offsets topic in the BeginTransaction request.
  3. Coordinator: Generate a TxId.
  4. Coordinator: Append a BEGIN(TxId, producerId, generation, partitions...) record to the journal log and send the response.
  5. Producer: Read back the response (which will include the TxId).
  6. Coordinator (and followers): Update the in-memory state of in-flight transactions and the data partitions of the transaction.
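The coordinator side of the InitPhase can be sketched as follows, with an in-memory list standing in for the replicated journal partition. The counter used for TxId generation and the `timeout_ms` default are assumptions for illustration; the design does not say how TxIds are generated. The sketch also ignores replication, which in a real broker would have to complete before the response is sent.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

TopicPartition = Tuple[str, int]


@dataclass
class Coordinator:
    journal: List[tuple] = field(default_factory=list)        # stand-in for the journal partition
    in_flight: Dict[int, dict] = field(default_factory=dict)  # TxId -> in-memory state
    next_tx_id: int = 1  # hypothetical TxId generator: a simple counter

    def begin_transaction(self, producer_id: int, generation: int,
                          partitions: List[TopicPartition], timeout_ms: int = 60_000) -> int:
        # Generate a TxId.
        tx_id = self.next_tx_id
        self.next_tx_id += 1
        # Append BEGIN(TxId, producerId, generation, partitions...) to the journal log.
        begin_offset = len(self.journal)
        self.journal.append(("BEGIN", tx_id, producer_id, generation, tuple(partitions)))
        # Update the in-memory state (followers would do the same while replicating).
        self.in_flight[tx_id] = {
            "begin_offset": begin_offset,
            "partitions": list(partitions),
            "timeout_ms": timeout_ms,
            "producer_id": producer_id,
            "generation": generation,
        }
        # The response carries the TxId back to the producer.
        return tx_id


coordinator = Coordinator()
# Include a __consumer_offsets partition to commit consumer state in the same transaction.
tx = coordinator.begin_transaction(7, 0, [("orders", 0), ("__consumer_offsets", 12)])
print(tx, coordinator.journal[0][0])  # 1 BEGIN
```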

SendPhase

Producer: Send the transaction payloads (i.e., records) to the leader brokers of the data partitions. Each record will contain the TxId and TxCtl fields. The TxCtl field is only really required to mark the final commit (or abort). The producer request envelope will also include the producer ID and generation, but these will not be appended to the data logs.
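The split between what is appended to the data log (records carrying TxId and TxCtl) and what only travels in the request envelope (producer ID and generation) might be modeled like this. The types are hypothetical, and the `NONE` and `ABORTED` TxCtl values are assumptions; the design only names COMMITTED explicitly.

```python
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional


class TxCtl(Enum):
    NONE = 0       # assumed: ordinary payload record of a transaction
    COMMITTED = 1  # marks the final commit
    ABORTED = 2    # assumed: marks the final abort


@dataclass(frozen=True)
class Record:
    key: Optional[bytes]
    value: Optional[bytes]
    tx_id: Optional[int] = None
    tx_ctl: TxCtl = TxCtl.NONE


@dataclass(frozen=True)
class ProduceRequest:
    producer_id: int  # envelope only: seen by the broker, never appended
    generation: int   # envelope only
    records: List[Record]


def append_to_log(log: List[Record], request: ProduceRequest) -> None:
    """Append the payload records; the envelope fields are not written to the data log."""
    log.extend(request.records)


log: List[Record] = []
append_to_log(log, ProduceRequest(7, 0, [Record(b"k1", b"v1", tx_id=5),
                                         Record(b"k2", b"v2", tx_id=5)]))
```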

EndPhase (when the producer is ready to commit a transaction)

  1. Producer: Send an OffsetCommitRequest to commit the input state associated with the end of that transaction (i.e., the input position for the start of the next transaction).
  2. Producer: Send a CommitTransaction(TxId, producerId, generation) request to the coordinator and await the response. (A non-error response indicates to the producer that the transaction will be committed.)
  3. Coordinator: Append a PREPARE_COMMIT(TxId) record to the journal log and then send a response to the producer.
  4. Coordinator: Send a CommitTransaction(TxId, partitions...) request to the leader broker of each partition in the transaction payload.
  5. Leader brokers:
    1. If this is a leader for a partition of a topic other than __consumer_offsets: upon receiving a CommitTransaction(TxId, partition1, partition2, ...) request, it will append an empty (null) record (i.e., no key/value) to each of the respective partitions and set the record's TxId and TxCtl (to COMMITTED) fields. The leader broker will then respond to the coordinator.
    2. If this is a leader for a partition of __consumer_offsets: append a record to the partition with key set to G-LAST-COMMIT and value set to TxId. It should also set the TxId and TxCtl fields of the record. The broker will then respond to the coordinator.
  6. Coordinator: Append a COMMITTED(TxId) record to the journal log.
  7. Coordinator (and followers): Advance HW if possible (see above for details of the HW).
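The commit path (PREPARE_COMMIT, per-leader CommitTransaction requests, COMMITTED, HW advance) can be sketched in a single process, with method calls standing in for requests and no failure handling. `Broker`, `Coordinator` and the dictionary record layout are hypothetical. In particular, we assume here that the G in G-LAST-COMMIT is the consumer group ID and that it is made available to the leader along with the commit.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

TopicPartition = Tuple[str, int]
OFFSETS_TOPIC = "__consumer_offsets"


@dataclass
class Broker:
    logs: Dict[TopicPartition, List[dict]] = field(default_factory=dict)

    def commit_transaction(self, tx_id: int, partitions: List[TopicPartition], group: str) -> None:
        for tp in partitions:
            log = self.logs.setdefault(tp, [])
            if tp[0] == OFFSETS_TOPIC:
                # Assumed encoding of the G-LAST-COMMIT key, with G = consumer group.
                key, value = f"{group}-LAST-COMMIT", tx_id
            else:
                key, value = None, None  # empty (null) commit marker
            log.append({"key": key, "value": value, "tx_id": tx_id, "tx_ctl": "COMMITTED"})
        # Returning stands in for the response to the coordinator.


@dataclass
class Coordinator:
    leaders: Dict[TopicPartition, Broker]
    journal: List[tuple] = field(default_factory=list)
    in_flight: Dict[int, dict] = field(default_factory=dict)  # TxId -> {"begin_offset", "partitions"}

    def high_watermark(self) -> int:
        offsets = [tx["begin_offset"] for tx in self.in_flight.values()]
        return min(offsets) if offsets else len(self.journal)

    def commit(self, tx_id: int, group: str) -> int:
        self.journal.append(("PREPARE_COMMIT", tx_id))
        # (The non-error response to the producer would be sent here.)
        by_leader: Dict[int, Tuple[Broker, List[TopicPartition]]] = {}
        for tp in self.in_flight[tx_id]["partitions"]:
            broker = self.leaders[tp]
            by_leader.setdefault(id(broker), (broker, []))[1].append(tp)
        for broker, tps in by_leader.values():
            broker.commit_transaction(tx_id, tps, group)  # one request per leader
        self.journal.append(("COMMITTED", tx_id))
        del self.in_flight[tx_id]
        return self.high_watermark()  # the HW may now advance
```

Grouping partitions by leader means each leader receives a single CommitTransaction(TxId, partitions...) request covering all of its partitions in the transaction.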

Aborting a transaction

A transaction may be aborted by a producer due to failures while sending the transaction's payload records. A transaction may also be aborted by the coordinator when it has not been committed within a configurable timeout period.

  • Producer: Send an AbortTransaction(TxId) request to the coordinator and await the response. (A non-error response indicates to the producer that the transaction will be aborted.)
  • Coordinator: Append a PREPARE_ABORT(TxId) record to the journal log and then send a response to the producer.
  • Coordinator: Send an AbortTransaction(TxId, partitions...) request to the leader broker of each partition in the transaction payload. (The receiving brokers handle the abort similarly to the commit handling described above.)
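Coordinator-initiated aborts could be driven by a periodic sweep over the in-flight transactions. In this sketch, `send_abort` stands in for the AbortTransaction requests to the partition leaders, tracking a per-transaction `started_ms` is an assumption, and the final `ABORTED` journal record is assumed by analogy with COMMITTED; the design above only specifies PREPARE_ABORT.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Tuple

TopicPartition = Tuple[str, int]


@dataclass
class AbortSweeper:
    # Stand-in for AbortTransaction(TxId, partitions...) requests to the partition leaders.
    send_abort: Callable[[int, List[TopicPartition]], None]
    journal: List[tuple] = field(default_factory=list)
    # TxId -> {"started_ms": int, "timeout_ms": int, "partitions": [...]}
    in_flight: Dict[int, dict] = field(default_factory=dict)

    def abort(self, tx_id: int) -> None:
        """Abort path shared by producer-initiated and timeout-initiated aborts."""
        self.journal.append(("PREPARE_ABORT", tx_id))
        self.send_abort(tx_id, self.in_flight[tx_id]["partitions"])
        self.journal.append(("ABORTED", tx_id))  # assumed final record, by analogy with COMMITTED
        del self.in_flight[tx_id]

    def expire(self, now_ms: int) -> List[int]:
        """Abort every transaction that has not been committed within its timeout."""
        expired = [tx_id for tx_id, tx in self.in_flight.items()
                   if now_ms - tx["started_ms"] >= tx["timeout_ms"]]
        for tx_id in expired:
            self.abort(tx_id)
        return expired
```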

Failure cases (in basic transaction flow)

  • Timeout or error response when the producer sends BeginTransaction: the producer simply retries. (The producer does not have a TxId yet at this point, since the TxId is returned in the BeginTransaction response.)
  • Broker-side error while the producer is sending the data: the producer should abort (and subsequently redo) the transaction with a new TxId. If the producer does not abort the transaction, the coordinator will abort it after the transaction's timeout period. Redoing the transaction is required only for errors after which the request data may have been appended and replicated to the followers. For example, a producer request timeout would require a redo, while a NotLeaderForPartitionException would not.
  • Timeout or error response when the producer sends CommitTransaction(TxId): the producer simply retries (with the same TxId).
  • Producer failure while a transaction is pending: if the coordinator is in a position to detect a closed socket (when it sends the response to the BeginTransactionRequest), then it can proactively abort the transaction. Otherwise the transaction will be aborted after its timeout period.
  • Coordinator failure (i.e., the coordinator moves to another broker because leadership of a journal partition moves): the new coordinator will scan the log from the last checkpointed HW. For any transactions that were in PREPARE_COMMIT or PREPARE_ABORT, the new coordinator will redo the commit or abort. Note that transactions that are in-flight when a coordinator goes down don't necessarily need to be aborted: the producer can simply send its CommitTransactionRequest to the new coordinator.
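Recovery on a new coordinator amounts to replaying the journal from the checkpointed HW. The sketch below assumes journal records are tuples whose first two fields are the record type and the TxId, with list indices as offsets, and that an aborted transaction ends with an `ABORTED` record (an assumption; the abort flow only names PREPARE_ABORT). It returns the transactions that are still in flight and the commits or aborts that must be redone.

```python
from typing import Dict, List, Tuple


def recover(journal: List[tuple], checkpointed_hw: int) -> Tuple[Dict[int, tuple], List[Tuple[str, int]]]:
    """Replay the journal from the last checkpointed HW on a new coordinator.

    Returns (in_flight, redo): in_flight maps TxId -> BEGIN record for transactions
    with no final record yet; redo lists ("COMMIT" | "ABORT", TxId) actions whose
    CommitTransaction/AbortTransaction requests must be re-sent to the partition leaders.
    """
    in_flight: Dict[int, tuple] = {}
    prepared: Dict[int, str] = {}
    for record in journal[checkpointed_hw:]:
        kind, tx_id = record[0], record[1]
        if kind == "BEGIN":
            in_flight[tx_id] = record
        elif kind == "PREPARE_COMMIT":
            prepared[tx_id] = "COMMIT"
        elif kind == "PREPARE_ABORT":
            prepared[tx_id] = "ABORT"
        elif kind in ("COMMITTED", "ABORTED"):
            # Outcome is final; the BEGIN may lie before the checkpoint, hence pop(..., None).
            prepared.pop(tx_id, None)
            in_flight.pop(tx_id, None)
    redo = [(action, tx_id) for tx_id, action in prepared.items()]
    return in_flight, redo
```

Transactions in `in_flight` that are not in `redo` are simply left open, so their producers can send CommitTransactionRequest to the new coordinator as described above.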

Do transactions make sense for compacted topics?

Compacted topics discard earlier records with the same key during the compaction process. Is this legal if those records are part of a transaction? It is perhaps a bit weird but may not be too harmful since the rationale for using the compaction policy within a topic is to retain the latest update for keyed data.

Still, some messages from within a transaction can get compacted out while others remain. That is, a consumer that is lagging behind, at an offset within a compacted segment, may see only some of the messages from a transaction. So if that application was (say) updating some table and the messages in the transaction correspond to different keys, this scenario could result in an inconsistent view of the database. This is a caveat to keep in mind when using transactions with compacted topics.
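A tiny example of the partial-visibility problem, using classic keep-latest-per-key compaction over hypothetical (offset, key, value, TxId) tuples. Transaction 1 writes keys `a` and `b`; transaction 2 later overwrites `a`. After compaction, only `b` survives from transaction 1, so a lagging consumer never sees transaction 1 as a whole.

```python
from typing import List, Tuple

Message = Tuple[int, str, str, int]  # (offset, key, value, TxId)


def compact(messages: List[Message]) -> List[Message]:
    """Classic compaction: keep only the latest message for each key."""
    latest = {}
    for msg in messages:
        latest[msg[1]] = msg  # later offsets overwrite earlier ones
    return sorted(latest.values())  # back in offset order


log = [
    (0, "a", "1", 1),  # transaction 1
    (1, "b", "1", 1),  # transaction 1
    (2, "a", "2", 2),  # transaction 2 overwrites key a
]
survivors_of_tx1 = [m for m in compact(log) if m[3] == 1]
print(survivors_of_tx1)  # [(1, 'b', '1', 1)]
```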

Compacting topics with transactional messages

The transactional HW is the offset of the earliest pending transaction. The question is whether we allow messages from committed transactions after the transactional HW (or non-transactional messages) to participate in a round of compaction. I think the answer is yes. There are two cases to consider:

  • The transactional HW is in the active segment: In this case, there is no issue since only segments that have rolled over participate in the compaction process.
  • The transactional HW is in a dirty portion that has rolled over from the active segment and is thus eligible for compaction: In this case we have two options:
    • Do compaction as usual, except that messages that are part of a pending transaction need to be copied over to the "cleaned" log. The cleaner checkpoint should not be moved beyond the transactional HW: even if compaction can be done past the transactional HW, that portion should still be considered "dirty" for the next round.
    • Don't compact past the transactional HW.

Both of these approaches require that the transactional HW be maintained on the followers as well. This is not as bad as it sounds, since this information can be made available from the transaction coordinator module. The transactional HW only needs to be approximate: it could be slightly stale, at the expense of achieving slightly less compaction than possible.
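Both options can be sketched over a dirty section represented as hypothetical (offset, key, value, TxId-or-None) tuples, with the transactional HW given as a data-log offset, `dirty_end` as the offset just past the dirty section, and the set of pending TxIds assumed known. These helpers are illustrative only and ignore segment boundaries, tombstones and the cleaner's offset map.

```python
from typing import List, Optional, Set, Tuple

Message = Tuple[int, str, str, Optional[int]]  # (offset, key, value, TxId or None)


def _latest_per_key(messages: List[Message]) -> List[Message]:
    latest = {}
    for msg in messages:
        latest[msg[1]] = msg  # later offsets win
    return list(latest.values())


def clean_keep_pending(dirty: List[Message], pending: Set[int],
                       tx_hw: int, dirty_end: int) -> Tuple[List[Message], int]:
    """Option 1: compact as usual, but copy every message of a pending transaction.

    Returns (cleaned messages, new cleaner checkpoint). The checkpoint is capped at
    tx_hw, so everything past the HW is treated as dirty again in the next round.
    """
    settled = [m for m in dirty if m[3] not in pending]
    kept = _latest_per_key(settled) + [m for m in dirty if m[3] in pending]
    return sorted(kept), min(dirty_end, tx_hw)


def clean_before_hw(dirty: List[Message], tx_hw: int,
                    dirty_end: int) -> Tuple[List[Message], int]:
    """Option 2: don't compact past the transactional HW; copy that tail unchanged."""
    eligible = [m for m in dirty if m[0] < tx_hw]
    tail = [m for m in dirty if m[0] >= tx_hw]
    return sorted(_latest_per_key(eligible)) + tail, min(dirty_end, tx_hw)
```

Option 1 reclaims more space per round (here it would also compact committed messages beyond the HW), while option 2 is simpler because it never has to distinguish pending from settled messages.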