

Kafka currently provides at-least-once messaging guarantees. Duplicates can arise due to either producer retries or consumer restarts after failure. One way to provide exactly-once messaging semantics is to implement an idempotent producer. This has been covered at length in the proposal for an Idempotent Producer. An alternative and more general approach is to support transactional messaging. This can enable use-cases such as replicated logging for transactional data services in addition to the classic idempotent producer use-cases.

...

  1. Producer: Determine which broker is the coordinator for its group.
  2. Producer: Send a BeginTransaction(producerId, generation, partitions...) request to the coordinator and await the response. We could provide a variation of this API that also includes a time-out. If the producer needs to commit its consumer state as part of the transaction, it will need to include the relevant partition of the __consumer_offsets topic in the BeginTransaction request.
  3. Broker: Generate a TxId.
  4. Coordinator: Append a BEGIN(TxId, producerId, generation, partitions...) record to the journal log and send response.
  5. Producer: Read back response (which will include the TxId).
  6. Coordinator (and followers): Update in-memory state of in-flight transactions and data partitions of the transaction.
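
The coordinator side of the steps above can be sketched roughly as follows. This is only an illustration under stated assumptions: the `Coordinator` class, its `journal` list, its `in_flight` map, and the counter-based TxId are hypothetical stand-ins, since the proposal does not specify how TxIds are generated or how the journal is stored.

```python
import itertools


class Coordinator:
    """Toy in-memory stand-in for the transaction coordinator (hypothetical)."""

    def __init__(self):
        self.journal = []      # stands in for the replicated journal log
        self.in_flight = {}    # TxId -> metadata of the in-flight transaction
        self._tx_ids = itertools.count(1)

    def begin_transaction(self, producer_id, generation, partitions):
        # Step 3: generate a TxId (a plain counter here; an assumption).
        tx_id = next(self._tx_ids)
        # Step 4: append a BEGIN record to the journal before responding.
        self.journal.append(
            ("BEGIN", tx_id, producer_id, generation, tuple(partitions))
        )
        # Step 6: update in-memory state with the transaction and its partitions.
        self.in_flight[tx_id] = {
            "producer_id": producer_id,
            "generation": generation,
            "partitions": set(partitions),
        }
        # Steps 4 and 5: the response carries the TxId back to the producer.
        return {"tx_id": tx_id}


coordinator = Coordinator()
response = coordinator.begin_transaction(
    "producer-1", 0, ["orders-0", "__consumer_offsets-3"]
)
```

Appending to the journal before updating the in-memory state mirrors the ordering in the list: the journal is the durable record, and the in-memory map can be rebuilt from it.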

...

  • Timeout or error response when producer sends BeginTransaction(TxId): producer simply retries (with same TxId).
  • Broker-side error while the producer is sending the data: The producer should abort (and subsequently redo) the transaction (with a new TxId). If the producer does not abort the transaction, the coordinator will abort it after the transaction's timeout period. Redoing the transaction is required only for errors where the request data may already have been appended and replicated to the follower. For example, a producer request timeout would require a redo, while a NotLeaderForPartitionException would not.
  • Timeout or error response when producer sends CommitTransaction(TxId): producer simply retries the transaction (with same TxId). (However, see section on idempotence further down.)
  • Producer failure while a transaction is pending: if the coordinator is in a position to detect a closed socket (when it sends the response to the BeginTransactionRequest) then it can proactively abort the transaction. Otherwise the transaction will be aborted after its timeout period.
  • Coordinator failure, i.e., the coordinator moves to another broker because leadership of a journal partition moves: The new coordinator will scan the log from the last checkpointed HW. If any transactions were in PREPARE_COMMIT or PREPARE_ABORT, the new coordinator will redo the COMMIT or ABORT. Note that transactions that are in flight when a coordinator goes down do not necessarily need to be aborted; the producer can simply send its CommitTransactionRequest to the new coordinator.
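
As a rough illustration of the coordinator-failure case, the sketch below replays a journal from a checkpoint and separates transactions that must be redone from those that are merely in flight. The `recover` function, the `(record_type, tx_id)` record shape, and the `COMMITTED` / `ABORTED` completion markers are assumptions made for this example; the proposal only names BEGIN, PREPARE_COMMIT, and PREPARE_ABORT explicitly.

```python
def recover(journal, checkpoint_index=0):
    """Replay journal records from the checkpoint.

    Returns (in_flight, to_redo): transactions still open, and transactions
    whose COMMIT or ABORT the new coordinator has to redo.
    """
    state = {}
    for record_type, tx_id in journal[checkpoint_index:]:
        if record_type in ("COMMITTED", "ABORTED"):
            # Hypothetical completion markers: the transaction is finished.
            state.pop(tx_id, None)
        else:
            # Remember the latest known state of the transaction.
            state[tx_id] = record_type

    to_redo = {
        tx: s for tx, s in state.items()
        if s in ("PREPARE_COMMIT", "PREPARE_ABORT")
    }
    in_flight = {tx: s for tx, s in state.items() if s == "BEGIN"}
    return in_flight, to_redo


journal = [
    ("BEGIN", 1),
    ("BEGIN", 2),
    ("PREPARE_COMMIT", 1),
    ("BEGIN", 3),
    ("PREPARE_ABORT", 3),
    ("ABORTED", 3),
]
in_flight, to_redo = recover(journal)
```

In this example, transaction 1 would be redone (its COMMIT was prepared but not completed), transaction 2 stays in flight so its producer can still commit against the new coordinator, and transaction 3 needs no further work.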

...

  • Consumer configuration will specify whether it consumes in READ_COMMITTED or READ_UNCOMMITTED mode.
  • READ_UNCOMMITTED: The only change is that the consumer should ignore the transactional control messages.
  • In order to support READ_COMMITTED, the consumer will need to maintain a PendingTransactionMap [TxId-Partition] -> messages. This map buffers messages for each transaction/partition combination. It should ideally have the capability of (seamlessly) spilling to disk if it hits a (configurable) memory limit.
  • The consumer will maintain an internal iterator to iterate over messages as they arrive. It will buffer messages as long as they are part of a pending transaction but will expose them to the application iterator as soon as it reads a COMMIT for the transaction (or discard them if it reads an ABORT).
  • READ_COMMITTED:
    • When the consumer iterator implementation encounters a transactional control message for a partition, it does not return the message to the application. Instead, it begins to buffer the messages of that transaction in the PendingTransactionMap.
    • Offset management: The consumer's offset state for each partition will now be an offset tuple. The first offset is the start offset of the earliest pending transaction in that partition. The second offset is the offset of the internal iterator.
    • We may want to maintain one more offset, which is the internal offset of the transaction that we are currently processing. This may be required with long transactions, i.e., a consumer may be in the middle of processing a transaction and fail to call poll in time.
  • Consumer failure
    • If a consumer fails it can delete any state that spilt over to disk (if applicable) and start with a new map.
    • If the previous check-pointed state was (o1, o2) it will resume fetching from o1 and ignore any committed transactions between o1 and o2.
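
The READ_COMMITTED buffering and the offset tuple described above can be sketched as follows. This is a minimal in-memory illustration, not the consumer's actual design: the `ReadCommittedBuffer` class, the `on_message` signature, and the message kinds `DATA` / `COMMIT` / `ABORT` are hypothetical, it assumes each transactional data message carries its TxId, and spilling to disk is left out.

```python
from collections import defaultdict


class ReadCommittedBuffer:
    """Toy PendingTransactionMap: (TxId, partition) -> buffered payloads."""

    def __init__(self):
        self.pending = defaultdict(list)  # (tx_id, partition) -> payloads
        self.start_offsets = {}           # (tx_id, partition) -> first offset
        self.position = {}                # partition -> internal iterator offset

    def on_message(self, partition, offset, kind, tx_id=None, payload=None):
        """Feed one fetched message; return payloads now visible to the app."""
        self.position[partition] = offset + 1
        if tx_id is None:
            return [payload]              # non-transactional: expose directly
        key = (tx_id, partition)
        if kind == "DATA":
            self.start_offsets.setdefault(key, offset)
            self.pending[key].append(payload)
            return []                     # buffered until COMMIT or ABORT
        # Control message: never returned to the application itself.
        self.start_offsets.pop(key, None)
        buffered = self.pending.pop(key, [])
        return buffered if kind == "COMMIT" else []

    def checkpoint(self, partition):
        """Offset tuple (o1, o2) for the partition."""
        o2 = self.position.get(partition, 0)
        starts = [
            off for (_, p), off in self.start_offsets.items() if p == partition
        ]
        o1 = min(starts) if starts else o2
        return (o1, o2)


buf = ReadCommittedBuffer()
buf.on_message(0, 10, "DATA", tx_id=7, payload="a")
buf.on_message(0, 11, "DATA", tx_id=8, payload="x")
visible = buf.on_message(0, 12, "COMMIT", tx_id=7)
```

After a failure, a consumer restarting from a checkpoint `(o1, o2)` would begin with an empty buffer, refetch from `o1`, and skip transactions whose COMMIT lies between `o1` and `o2`, since those were already delivered.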

...

Transaction support should be sufficient to achieve idempotence. However, a producer will need to buffer the messages of a transaction until it receives the commit response for that transaction. We can avoid this buffering by incorporating sequence numbers in the message header, along with other pieces of the original idempotent producer proposal. In fact, we would probably want some form of the idempotent producer to be available stand-alone, to avoid aborting long-running transactions that run into (say) a temporary network glitch during the data send phase.
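
To illustrate how sequence numbers in the message header could remove duplicates from retries, here is a generic deduplication sketch. It is not taken from the idempotent producer proposal, whose details are not reproduced here; the `PartitionLog` class, the per-producer `last_seq` map, and the assumption that each producer sends increasing sequence numbers per partition are all hypothetical.

```python
class PartitionLog:
    """Toy partition log that drops retried messages by sequence number."""

    def __init__(self):
        self.messages = []
        self.last_seq = {}  # producer_id -> highest sequence number appended

    def append(self, producer_id, seq, payload):
        """Append unless already seen; return True if the message was appended."""
        if seq <= self.last_seq.get(producer_id, -1):
            # Duplicate caused by a retry: acknowledge without appending.
            return False
        self.messages.append(payload)
        self.last_seq[producer_id] = seq
        return True


log = PartitionLog()
log.append("producer-1", 0, "a")
log.append("producer-1", 0, "a")  # retry after a lost acknowledgement
log.append("producer-1", 1, "b")
```

With a check like this on the broker, a producer could resend after a transient network error without buffering the whole transaction for a redo.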