...

Timestamp Barrier divides the unbounded streaming data in an ETL Topology into multiple bounded data sets, and each bounded data set can be seen as a big transaction in stream processing. A transaction in stream processing has the following characteristics

  1. Each transaction consists of multiple operations and tables; each sink operation commits data to a table according to the Timestamp Barrier. The transaction is committed after all the operations are committed.
  2. There is a sequential relationship between transactions when processing data; they commit data to the same table serially.
  3. There are three states in a table for a specific transaction: PreCommit, Commit and Snapshot (see the sketch after this list)
    1. PreCommit: The sink has committed data to the table according to the Timestamp Barrier, but the related transaction is still processing and not committed. The committed data in the table may be rolled back if the job fails.
    2. Commit: The transaction related to a specific Timestamp Barrier is committed, but the data in the tables may still be rolled back if jobs fail.
    3. Snapshot: The transaction related to a specific Timestamp Barrier is committed and all the tables have created snapshots for the transaction. The data in the tables won't be rolled back even when jobs fail.
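
The following is a minimal, illustrative sketch of these three per-table states as a Java enum; the enum and its name are assumptions for this document and not part of the actual Table Store API.

```java
/** Illustrative only: the state of one table for a specific Timestamp Barrier transaction. */
public enum TableTransactionState {
    /** Sink has committed data for the Timestamp Barrier, but the transaction is still processing; data may be rolled back on failure. */
    PRE_COMMIT,
    /** The transaction for the Timestamp Barrier is committed, but the data may still be rolled back on failure. */
    COMMIT,
    /** A snapshot has been created for the transaction; the data will not be rolled back even when jobs fail. */
    SNAPSHOT
}
```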

The key points of a transaction are as follows

  1. If the records in an "epoch" (Timestamp Barrier) have been completely written to a table, we say the transaction is PROCESSED in that table.
  2. If the table creates a snapshot for the records in an "epoch", we say the transaction is WRITTEN in that table.
  3. If a transaction is PROCESSED in all tables, we say the transaction is PRECOMMIT.
  4. If a transaction is WRITTEN in all tables, we say the transaction is COMMIT (see the sketch after this list).
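
A hypothetical helper that makes the PROCESSED/WRITTEN vs. PRECOMMIT/COMMIT relationship explicit; the class, enum and method names are assumptions for illustration, not existing APIs.

```java
import java.util.Collection;

/** Hypothetical sketch: derives the transaction-level status from per-table progress. */
public class TransactionStatus {

    /** Progress of one transaction (one Timestamp Barrier) in a single table. */
    public enum TableProgress { PROCESSING, PROCESSED, WRITTEN }

    /** A transaction is PRECOMMIT once it is PROCESSED (or already WRITTEN) in every table. */
    public static boolean isPreCommit(Collection<TableProgress> perTable) {
        return perTable.stream()
                .allMatch(p -> p == TableProgress.PROCESSED || p == TableProgress.WRITTEN);
    }

    /** A transaction is COMMIT once it is WRITTEN (snapshotted) in every table. */
    public static boolean isCommit(Collection<TableProgress> perTable) {
        return perTable.stream().allMatch(p -> p == TableProgress.WRITTEN);
    }
}
```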

When a job fails, the records that are not WRITTEN in a table will be "rolled back". Following the example above, suppose the data in the tables is as follows

...

Read Uncommitted refers to querying table data of uncommitted transactions. Some tables in a transaction may have PROCESSED data according to the Timestamp Barrier, which can already be read by a query, while the remaining tables are not PROCESSED and the transaction is not PRECOMMIT yet. For example

  1. The data PROCESSED in user_item_price is: (user1, item1, 2500).
  2. The data not yet PROCESSED in user_item_amount is: (user1, item1, 100).
  3. The result of the user's query will be (user1, item1, 2500, 100, 25), which is not a consistent result (see the sketch below).
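
A tiny sketch, using the made-up values from the example above, of why mixing a PROCESSED table with a not-yet-PROCESSED one breaks consistency: the price comes from the new epoch while the amount still comes from the old one.

```java
public class ReadUncommittedExample {
    public static void main(String[] args) {
        long price = 2500;   // user_item_price: already PROCESSED for the new epoch
        long amount = 100;   // user_item_amount: still holds the old epoch's value

        // Read Uncommitted joins the two tables and mixes epochs:
        double avg = (double) price / amount;
        System.out.printf("(user1, item1, %d, %d, %s)%n", price, amount, avg);
        // Prints (user1, item1, 2500, 100, 25.0) -- neither the old consistent
        // result (1000, 100, 10) nor the new one (2500, 300, 8.33333).
    }
}
```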

...

Read Committed refers to querying table data of PRECOMMIT transactions only; it is the default consistency in MetaService. When a transaction is PRECOMMIT, the data in all tables is PROCESSED, so a query can read consistent data according to a specific transaction. For example

  1. Transaction T is not PRECOMMIT: the query result is (user1, item1, 1000, 100, 10)
  2. Transaction T is PRECOMMIT: the query result is (user1, item1, 2500, 300, 8.33333)

Read Committed doesn't support Repeatable Read, which means that when jobs fail after transaction T is PRECOMMIT, the data in the tables will be rolled back and the query result will fall back from (user1, item1, 2500, 300, 8.33333) to (user1, item1, 1000, 100, 10)
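
A hypothetical sketch of how a Read Committed query could be resolved: MetaService is asked for the latest PRECOMMIT transaction, and every table is read as of that transaction. The interface and method names (latestPreCommitTransaction, readAsOf) are assumptions, not the real MetaService API.

```java
import java.util.List;

/** Hypothetical sketch of Read Committed resolution; not the actual MetaService API. */
public class ReadCommittedSketch {

    interface MetaServiceClient {
        /** Latest transaction that is PRECOMMIT, i.e. PROCESSED in all of the given tables. */
        long latestPreCommitTransaction(List<String> tables);
    }

    interface TableReader {
        /** Reads a table as of the data produced up to (and including) the given transaction. */
        Object readAsOf(String table, long transactionId);
    }

    static void query(MetaServiceClient meta, TableReader reader) {
        List<String> tables = List.of("user_item_price", "user_item_amount");
        // All tables are read at the same PRECOMMIT transaction, so the join is consistent;
        // note the data may still be rolled back if jobs fail (no Repeatable Read).
        long txn = meta.latestPreCommitTransaction(tables);
        Object price = reader.readAsOf("user_item_price", txn);
        Object amount = reader.readAsOf("user_item_amount", txn);
        // ... join price and amount to produce (user, item, price, amount, price/amount)
    }
}
```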

...

Repeatable Read only reads data of a specific transaction that is WRITTEN (snapshotted) in the tables. The snapshots in a table won't be rolled back even when jobs fail, so a query can read a committed transaction from the snapshots of the tables. For example

  1. Transaction T is PROCESSED, but the related snapshots in the tables have not been created yet: the query result is (user1, item1, 1000, 100, 10)
  2. When the related snapshots in the tables have been created, the query result is (user1, item1, 2500, 300, 8.33333)
  3. Snapshots in persistent storage won't be rolled back even when jobs fail, so the query result will always be (user1, item1, 2500, 300, 8.33333); this is Repeatable Read

If Repeatable Read only reads data of a COMMIT transaction, the data will be consistent; otherwise, the data in a query may come from different transactions.
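
A hypothetical counterpart for Repeatable Read: snapshot ids are pinned once from the latest COMMIT transaction and reused for every read, so repeating the query returns the same result even after failures. Again, the interface and method names are illustrative assumptions.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of Repeatable Read over pinned snapshots; not a real API. */
public class RepeatableReadSketch {

    interface MetaServiceClient {
        /** Snapshot id per table for the latest COMMIT transaction (WRITTEN in all tables). */
        Map<String, Long> snapshotsOfLatestCommit(List<String> tables);
    }

    interface TableReader {
        Object readSnapshot(String table, long snapshotId);
    }

    static void query(MetaServiceClient meta, TableReader reader) {
        List<String> tables = List.of("user_item_price", "user_item_amount");
        // Snapshots are pinned once; because snapshots are never rolled back,
        // re-running the reads below always returns the same result.
        Map<String, Long> pinned = meta.snapshotsOfLatestCommit(tables);
        Object price = reader.readSnapshot("user_item_price", pinned.get("user_item_price"));
        Object amount = reader.readSnapshot("user_item_amount", pinned.get("user_item_amount"));
        // ... join price and amount here
    }
}
```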

Reuse Data In State

After aligning data with the Timestamp Barrier, join operators in jobs can keep Delta State in memory and join data in shared state as follows

...

  • Stateless operator. The operator completely ignores the timestamp barrier: it processes every input record and outputs the result just as it does today. It does not need to align data with the Timestamp Barrier, but when it has received the Timestamp Barrier from all its inputs, it should broadcast the barrier to downstream tasks. 
  • Stateful operator and temporal operator. Records within the same Timestamp Barrier are out of order, while there is a sequential relationship between timestamp barriers. The operator executes its computation when it has collected the timestamp barrier from all its inputs and then broadcasts the barrier to downstream tasks. It should either
    • process the records immediately as before, if the business doesn't require ordering, or
    • buffer the records internally like the current windowed/temporal operators do, if the business requires ordering. Records in each "epoch" (as demarcated by timestamp barriers) will be processed after the previous "epoch" is finished, just like pre-aggregation. This means the operator computes and outputs results for a timestamp barrier based on the result of the previous timestamp barrier.
  • Sink operator. The sink streams output results to Table Store and commits the results when it has collected all the timestamp barriers. The source of a downstream ETL job can prefetch data from Table Store, but should only produce data after the upstream sink has committed (see the sketch after this list).
    • If the external system requires ordered writes (such as a Kafka topic or an append-only store), the sink has to buffer the writes until the "timestamp barrier" arrives.
    • Some sinks might support writing data simultaneously to different "epochs", for example writing files bucketed by epoch. Each bucket/epoch can then be committed independently.
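
A minimal sketch, under assumed types and method names, of the buffering sink described above: records are buffered per epoch, and each epoch's bucket is written and committed when its timestamp barrier arrives.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative sink: buffers records per epoch and commits each epoch independently. */
public class EpochBufferingSink<T> {

    private final Map<Long, List<T>> buffersByEpoch = new HashMap<>();

    /** Each record is buffered under the epoch (Timestamp Barrier) it belongs to. */
    public void processRecord(long epoch, T record) {
        buffersByEpoch.computeIfAbsent(epoch, e -> new ArrayList<>()).add(record);
    }

    /** When the barrier for an epoch arrives, flush and commit that epoch's bucket. */
    public void processTimestampBarrier(long epoch) {
        List<T> bucket = buffersByEpoch.remove(epoch);
        if (bucket != null) {
            write(epoch, bucket);   // e.g. write files bucketed by epoch
            commit(epoch);          // each bucket/epoch is committed independently
        }
    }

    private void write(long epoch, List<T> records) { /* write to the external store */ }

    private void commit(long epoch) { /* report the commit, e.g. to MetaService */ }
}
```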

2. Timestamp Barrier across ETL jobs

...