Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • The transactional HW is in the active segment: In this case, there is no issue since only segments that have rolled over participate in the compaction process.
  • The transactional HW is in a dirty portion that has rolled over from the active segment and is thus eligible for compaction: In this case we have two options:
    • Do compaction as usual except that messages that are part of a pending transaction need to be copied over to the "cleaned" log. The cleaner checkpoint should not be moved beyond the transactional HW. i.e., even if compaction can be done past the transactional HW that portion should still be considered "dirty" for the next round.
    • Don't compact past the transactional HW.
    • Both of these approaches require that the transactional HW be maintained on the followers as well. This is not as bad as it sounds since this information can be made  available from the transaction coordinator module. The transactional HW only needs to be approximate. i.e., it could be slightly stale at the expense of achieving slightly less compaction than possible.

 

 

Pipelining

The above transaction flow contains a number of points where the producer may need to block:

  • Response for BeginTransaction request.
  • Response for CommitTransaction request.

So in the absence of pipelining, long transactions are good for producer throughput, but require more buffering in consumers. Short transactions require less buffering in consumers, but adversely impact producer throughput (which may be an acceptable penalty to pay to achieve transactional messaging).

It is possible to support pipelining to some degree (courtesy Raul).  Earlier versions of this write-up describe a concept of transaction batches.  However, that does not actually benefit consumers since transactions in the batch are not exposed by the transaction coordinator until the producer commits the entire batch. It will reduce some load on the transaction coordinator and improve producer throughput but it is probably not worth the effort.

Fencing (a.k.a single-writer requirement)

Some use-cases have the problem of a processor soft-failing in the middle of the send-phase of the transaction (say, due to a long GC) and having a new processor with the same group and producer ID taking over and retrying the transaction. While the retry is in-flight the failed processor may recover and resume its send-phase of the earlier attempt.

This can be addressed by including the producer ID and a generation along with each control record and payload record of the transaction. The producer should store its generation along with the producer's state. When a producer starts up to perform a series of transactions it should increment the generation. The transaction coordinator  and leader brokers of the payload partitions can keep track of the current producer generation of pending transactions and reject any requests that come from a producer with an older generation. The producer ID needs to be included in the control records that the transaction coordinator appends to the __transaction_control journal log but it does not need to be included in the records of the actual data logs.

One nuance to this is when a producer is starting up for the first time and obtains its group state which will be empty and therefore sets its generation to zero. If it soft-fails at that point and a fail-over producer repeats the same process, we could end up with two producers with the same ID and generation. I think this can be addressed simply by having the transaction coordinator ensure that for a given producerId-generation combination, there can be only one producer connection. If it detects this condition, it can close both connections and abort any transaction that may have been initiated. (The leader brokers should also keep track of in-flight              transactions, their associated producerIDs-generations and do the same.  Since the abort from the coordinator can arrive before the producer actually stops sending data the broker can need reject those producer requests since it does not correspond to any valid pending transaction.)

Transactional consumer

  • Consumer configuration will specify whether or not it will consume in READ_COMMITTED or READ_UNCOMMITTED mode.
  • READ_UNCOMMITTED: The only change is that the consumer should ignore the transactional control messages.- In order to support READ_COMMITTED, the consumer will need to maintain a PendingTransactionMap [TxId-Partition] -> messages. This map buffers messages for each transaction/partition combination. It should ideally have the capability of (seamlessly) spilling to disk if it hits a (configurable) memory limit.- The consumer will maintain an internal iterator to iterate over messages as they arrive. It will buffer messages as long as they are part of a pending transaction but   will expose them to the application iterator as soon as it reads a COMMIT for the transaction (or discard them if it reads an ABORT).
  • READ_COMMMITTED:
    • When the consumer iterator implementation encounters a transactional control message for a partition, it does not return the message to the application. Instead, it begins to buffer those messages in the PendingTransactionMap.
    • Offset management: The consumer's offset state for each partition will now be an offset tuple. The first offset is the start offset of the earliest pending transaction in that partition. The second offset is the offset of the internal iterator.
    • We may want to maintain one more offset - which is the internal offset of the transaction that we are currently processing.  This may be required with long transactions - i.e., a consumer may be in the middle of processing a transaction and fail to call poll in time.)
  • Consumer failure
    • If a consumer fails it can delete any state that spilt over to disk (if applicable) and start with a new map.
    • If the previous check-pointed state was (o1, o2) it will resume fetching from o1 and ignore any committed transactions between o1 and o2.

Idempotence

Transaction support should be sufficient to achieve idempotence. However, a producer will need to buffer messages of the transaction until it receives the commit response for that transaction. We can avoid this need to buffer by incorporating sequence numbers in the message header and incorporating pieces of the original idempotent producer proposal.