Status

Current state: Draft

Discussion thread: here

JIRA: here

Released: 0.4.0

Motivation

DL can provide "at-least-once" delivery semantics but not "exactly-once" delivery semantics. That means a message can be delivered one or more times, and the reader will see duplicates unless it handles them.
The duplicates come from two places: the writer side (this assumes the write proxy is used, not the core library) and the reader side.
  • writer side: if the client attempts to write a record to the write proxies, gets a network error (e.g. a timeout), and then retries, the retry can result in duplicates.
  • reader side: if the reader reads a message from a stream and then crashes, it restarts from the last known position (DLSN). If the reader fails after processing a record but before recording the position, the processed record will be delivered again.
The reader problem can be addressed by using the sequence numbers of records and doing proper checkpointing. For example, a database can checkpoint the indexed data together with the sequence numbers of the records; Flink can checkpoint its state together with the sequence numbers.
The writer problem can be addressed by implementing an idempotent writer. However, an alternative and more powerful approach is to support transactions.
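As a minimal sketch of the reader-side checkpointing described above, the class below keeps the applied state and the last applied sequence number together, and skips any record that is redelivered after a restart. `CheckpointedReader` and its methods are hypothetical names used only for illustration; a plain `long` stands in for the record sequence number, and the in-memory list stands in for indexed data.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: state and the last applied sequence number are checkpointed together. */
final class CheckpointedReader {
    private final List<String> appliedState = new ArrayList<>(); // stands in for indexed data
    private long lastAppliedSeq = -1L;                           // checkpointed with the state

    /** Applies a record unless its sequence number is already covered by the checkpoint. */
    synchronized boolean apply(long sequenceNumber, String payload) {
        if (sequenceNumber <= lastAppliedSeq) {
            return false; // duplicate delivery after a restart; skip it
        }
        // Assumption: a real store would commit these two updates atomically.
        appliedState.add(payload);
        lastAppliedSeq = sequenceNumber;
        return true;
    }

    synchronized long lastAppliedSeq() {
        return lastAppliedSeq;
    }

    synchronized List<String> state() {
        return new ArrayList<>(appliedState);
    }
}
```

The point of the sketch is that de-duplication only works if the position is persisted in the same atomic step as the processed result.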
What does transaction mean?

A transaction means that a collection of records can be written transactionally within a stream or across multiple streams. The records are consumed by the reader together once the transaction is committed, or are never consumed by the reader if the transaction is aborted.
Transactions will provide the following guarantees:
  • The reader should not be exposed to records written from uncommitted transactions (mandatory)
  • The reader should consume the records in the transaction commit order rather than the record written order (mandatory)
  • No duplicated records within a transaction (mandatory)
  • Allow interleaving transactional writes and non-transactional writes (optional) 
Stream Transaction & Namespace Transaction

There will be two types of transactions: the stream-level transaction (local transaction) and the namespace-level transaction (global transaction).
A stream-level transaction is a transactional operation that writes records to one stream; a namespace-level transaction is a transactional operation that writes records to multiple streams.

Public Interfaces

TBD

Proposed Changes

Implementation Thoughts

  • A transaction consists of a begin control record, a series of data records, and a commit/abort control record.
  • The begin/commit/abort control records are written to a `commit` log stream, while the data records are written to normal data log streams.
  • For a stream-level transaction, the `commit` log stream is the same log stream as the data stream; for namespace-level transactions, it is a *system* stream (or multiple system streams).
  • The transaction code looks like the following:

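    // stream0, stream1 and stream2 identify the target streams; record is the record to write.
    Transaction txn = client.transaction();
    Future<DLSN> result1 = txn.write(stream0, record);
    Future<DLSN> result2 = txn.write(stream1, record);
    Future<DLSN> result3 = txn.write(stream2, record);
    // Committing (or aborting) resolves all of the write futures above together.
    Future<Pair<DLSN, DLSN>> result = txn.commit();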
If the txn is committed, all the write futures are satisfied with their written DLSNs. If the txn is aborted, all the write futures fail together. There is no partial-failure state.
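The all-or-nothing behaviour of the write futures can be illustrated with a small in-memory stand-in. This is only a sketch under stated assumptions: `SketchTransaction` is a hypothetical class, not the proposed `Transaction` interface (which is still TBD), a plain `long` stands in for a DLSN, and the assigned positions are simply the commit position plus the write's index.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical in-memory stand-in for a transaction; not the real DL API. */
final class SketchTransaction {
    private final List<CompletableFuture<Long>> pending = new ArrayList<>();
    private boolean finished = false;

    /** Buffers a write; the returned future stays incomplete until commit or abort. */
    synchronized CompletableFuture<Long> write(String stream, byte[] record) {
        if (finished) {
            throw new IllegalStateException("transaction already finished");
        }
        CompletableFuture<Long> future = new CompletableFuture<>();
        pending.add(future);
        return future;
    }

    /** Completes every write future with a position (a long stands in for a DLSN). */
    synchronized void commit(long commitPosition) {
        finished = true;
        for (int i = 0; i < pending.size(); i++) {
            pending.get(i).complete(commitPosition + i);
        }
    }

    /** Fails every write future with the same cause, so no write succeeds alone. */
    synchronized void abort(Throwable cause) {
        finished = true;
        for (CompletableFuture<Long> future : pending) {
            future.completeExceptionally(cause);
        }
    }
}
```

Because no future is completed before `commit` or `abort` is called, a caller can never observe a state where some writes succeeded and others failed.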

Write Flow

  1. The writer gets a transaction id from the owner of the `commit` log stream.
  2. The writer writes the begin control record (synchronously) with the transaction id.
  3. Each write within the same txn is assigned a local sequence number starting from 0. Readers later use the combination of transaction id and local sequence number to de-duplicate records.
  4. The commit/abort control record is written based on the results from step 2.
NOTE:
  • The application can supply a timeout when it begins a transaction (#begin()). The owner of the `commit` log stream can abort transactions that are not committed or aborted within their timeout.
  • All the log records can simply be retried, because duplicates can be removed at the reader side using the transaction id and local sequence number.
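The write flow above can be sketched with in-memory lists standing in for the `commit` log stream and a data log stream. Everything here is an assumption made for illustration: the class `WriteFlowSketch`, the `LogRecord` layout, and the use of a local counter in place of the `commit` log owner are hypothetical, and timeouts and retries are not modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of the write flow; record layout and names are assumptions. */
final class WriteFlowSketch {
    enum Kind { BEGIN, DATA, COMMIT, ABORT }

    static final class LogRecord {
        final Kind kind;
        final long txnId;
        final int localSeq; // -1 for control records
        final String payload;

        LogRecord(Kind kind, long txnId, int localSeq, String payload) {
            this.kind = kind;
            this.txnId = txnId;
            this.localSeq = localSeq;
            this.payload = payload;
        }
    }

    private final AtomicLong nextTxnId = new AtomicLong(0); // stands in for the commit log owner
    final List<LogRecord> commitLog = new ArrayList<>();
    final List<LogRecord> dataLog = new ArrayList<>();

    /** Runs steps 1-4 for one transaction and returns its transaction id. */
    long writeTransaction(List<String> payloads, boolean commit) {
        long txnId = nextTxnId.getAndIncrement();                  // step 1: obtain a txn id
        commitLog.add(new LogRecord(Kind.BEGIN, txnId, -1, null)); // step 2: begin control record
        int localSeq = 0;
        for (String payload : payloads) {                          // step 3: number each write
            dataLog.add(new LogRecord(Kind.DATA, txnId, localSeq++, payload));
        }
        Kind end = commit ? Kind.COMMIT : Kind.ABORT;              // step 4: commit or abort
        commitLog.add(new LogRecord(end, txnId, -1, null));
        return txnId;
    }
}
```

Each data record carries the pair (transaction id, local sequence number), which is what makes a blind retry of a write safe to de-duplicate later.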

Read Flow

  1. The reader can be configured to read uncommitted records or committed records only (by default it reads uncommitted records).
  2. If the reader is configured to read committed records only, the read-ahead cache is extended to maintain one additional map of pending committed records. This map is bounded, and records are dropped from it as read-ahead advances.
  3. When the reader hits a commit record, it rewinds to the begin record and starts reading from there. By leveraging the read-ahead cache and the pending committed records cache, this should work well for both short and long transactions.
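A simplified sketch of the committed-only read path follows. It buffers data records per transaction, drops retried duplicates by (transaction id, local sequence number), and releases a transaction's records only when its commit record is seen. `CommittedOnlyReader` and its callbacks are hypothetical names; the sketch is unbounded and does not model rewinding to the begin record or evicting entries as read-ahead advances.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical committed-only reader; the map stands in for the pending committed records. */
final class CommittedOnlyReader {
    // txnId -> (localSeq -> payload); TreeMap keeps records in local sequence order
    private final Map<Long, TreeMap<Integer, String>> pending = new HashMap<>();
    private final List<String> delivered = new ArrayList<>();

    /** Buffers a data record; a retried record with the same key is ignored. */
    void onData(long txnId, int localSeq, String payload) {
        pending.computeIfAbsent(txnId, id -> new TreeMap<>()).putIfAbsent(localSeq, payload);
    }

    /** Releases the transaction's records to the application, in local sequence order. */
    void onCommit(long txnId) {
        TreeMap<Integer, String> records = pending.remove(txnId);
        if (records != null) {
            delivered.addAll(records.values());
        }
    }

    /** Discards the transaction's records; they are never delivered. */
    void onAbort(long txnId) {
        pending.remove(txnId);
    }

    List<String> delivered() {
        return new ArrayList<>(delivered);
    }
}
```

Since records are released only in `onCommit`, delivery follows commit order rather than write order, which matches the guarantees listed earlier.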

Sequence Numbers

  1. We will add a fourth field to DLSN: the `local sequence number` within a transaction session. The new DLSN of a record in a transaction will be the DLSN of the commit control record plus the record's local sequence number.
  2. The sequence id will still be the position of the commit record plus the record's local sequence number. When the commit control record is written, the position advances by the total number of records written in the transaction.
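One way to picture the extended DLSN is a four-field value that orders records first by the commit record's position and then by local sequence number. This is a sketch only: `TxnDlsn` is a hypothetical class, it assumes the three existing DLSN fields are log segment sequence number, entry id, and slot id, and the `sequenceId` helper reflects one reading of the rule above.

```java
/** Hypothetical four-field position; the first three fields are assumed to mirror DLSN. */
final class TxnDlsn implements Comparable<TxnDlsn> {
    final long logSegmentSequenceNo;
    final long entryId;
    final long slotId;
    final long localSeq; // proposed fourth field: local sequence number within the txn

    TxnDlsn(long logSegmentSequenceNo, long entryId, long slotId, long localSeq) {
        this.logSegmentSequenceNo = logSegmentSequenceNo;
        this.entryId = entryId;
        this.slotId = slotId;
        this.localSeq = localSeq;
    }

    /** Derives a record's position from the commit control record's position. */
    static TxnDlsn forRecord(TxnDlsn commit, long localSeq) {
        return new TxnDlsn(commit.logSegmentSequenceNo, commit.entryId, commit.slotId, localSeq);
    }

    /** Sequence id of a record: the commit record's position plus its local sequence number. */
    static long sequenceId(long commitPosition, long localSeq) {
        return commitPosition + localSeq;
    }

    /** Orders by the three existing fields first, then by local sequence number. */
    @Override
    public int compareTo(TxnDlsn other) {
        int c = Long.compare(logSegmentSequenceNo, other.logSegmentSequenceNo);
        if (c != 0) return c;
        c = Long.compare(entryId, other.entryId);
        if (c != 0) return c;
        c = Long.compare(slotId, other.slotId);
        if (c != 0) return c;
        return Long.compare(localSeq, other.localSeq);
    }
}
```

With this ordering, all records of one transaction sort next to each other at the commit record's position, which is consistent with consuming records in commit order.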

Transaction Group & Namespace Transaction

Using a single log stream for namespace transactions can become a bottleneck, since all the begin/commit/abort control records have to go through that one log stream.
The idea of a 'transaction group' is to allow partitioning the writers into different transaction groups.
Clients can specify a `group-name` when starting a transaction. If no `group-name` is specified, the default `commit` log in the namespace is used for creating transactions.
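The fallback to the default `commit` log can be sketched as a small lookup. The class name `CommitLogResolver`, the stream name `commit`, and the `commit-<group-name>` naming scheme are all assumptions for illustration; the proposal does not define how group commit logs are named.

```java
import java.util.Optional;

/** Hypothetical resolver from an optional group name to a commit log stream name. */
final class CommitLogResolver {
    static final String DEFAULT_COMMIT_LOG = "commit"; // assumed default stream name
    static final String GROUP_PREFIX = "commit-";      // assumed naming scheme for groups

    /** Returns the group's commit log, or the default one when no group name is given. */
    static String resolve(Optional<String> groupName) {
        return groupName
                .filter(name -> !name.isEmpty())
                .map(name -> GROUP_PREFIX + name)
                .orElse(DEFAULT_COMMIT_LOG);
    }
}
```

Writers that pass different group names end up on different commit logs, so their control records no longer contend on a single stream.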

Compatibility, Deprecation, and Migration Plan

TBD

Test Plan

TBD

Rejected Alternatives

TBD
