...
```sql
-- create a catalog with MetaService
CREATE CATALOG my_catalog WITH (
    'type'='table-store',
    'warehouse'='file:/tmp/table_store',
    'metastore'='table-store',
    'uri'='http://<meta-service-host-name>:<port>',
    'consistency'='strongReadCommitted'
);

USE CATALOG my_catalog;

-- create three user shopping tables in my_catalog which will be managed by MetaService
CREATE TABLE shopping (
    userId BIGINT,
    itemId BIGINT,
    amount BIGINT,
    price DOUBLE
);

CREATE TABLE user_item_amount (
    userId BIGINT,
    itemId BIGINT,
    totalAmount BIGINT
);

CREATE TABLE user_item_price (
    userId BIGINT,
    itemId BIGINT,
    totalPrice DOUBLE
);
```
Users can create a source table and three streaming jobs. The jobs write data to the three tables.
```sql
-- create a shopping data generator table
CREATE TEMPORARY TABLE shopping_source (
    userId BIGINT,
    itemId BIGINT,
    amount BIGINT,
    price DOUBLE
) WITH (
    'connector' = 'datagen'
);

-- table store requires checkpoint interval in streaming mode
SET 'execution.checkpointing.interval' = '10 s';

-- write streaming data to shopping, user_item_amount and user_item_price tables
INSERT INTO shopping SELECT userId, itemId, amount, price FROM shopping_source;

INSERT INTO user_item_amount SELECT userId, itemId, sum(amount) FROM shopping GROUP BY userId, itemId;

INSERT INTO user_item_price SELECT userId, itemId, sum(price) FROM shopping GROUP BY userId, itemId;
```
The ETL Topology is as follows:
(draw.io diagram: the ETL Topology)
Users can query data from the three tables.
```sql
-- use tableau result mode
SET 'sql-client.execution.result-mode' = 'tableau';

-- switch to batch mode
RESET 'execution.checkpointing.interval';
SET 'execution.runtime-mode' = 'batch';

-- olap query the tables
SELECT
    T1.userId,
    T1.itemId,
    T1.totalAmount as amount,
    T2.totalPrice as price,
    T2.totalPrice / T1.totalAmount as avgPrice
FROM user_item_amount T1
JOIN user_item_price T2
ON T1.userId=T2.userId and T1.itemId=T2.itemId;
```
Since the data is streamed between jobs and tables, the results amount, price and avgPrice are not correct without a consistency guarantee; when MetaService guarantees data consistency, the results amount, price and avgPrice will be correct.
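To see why mismatched table versions break the query, here is a minimal Python sketch (not Flink or Table Store code; the version numbers stand in for table snapshots and are invented for illustration). Version 1 reflects only the first shopping record (amount=100, price=1000); version 2 additionally reflects (amount=200, price=1500).

```python
# Toy model: versioned contents of the two downstream tables.
user_item_amount = {1: 100, 2: 300}        # totalAmount per snapshot version
user_item_price = {1: 1000.0, 2: 2500.0}   # totalPrice per snapshot version

def avg_price(amount_version, price_version):
    """Join the two tables at possibly different versions."""
    return user_item_price[price_version] / user_item_amount[amount_version]

# Without a consistency guarantee, the query may see version 2 of one
# table but only version 1 of the other:
inconsistent = avg_price(amount_version=1, price_version=2)  # 2500 / 100 = 25.0

# With both tables pinned to the same committed version, avgPrice is correct:
consistent_v1 = avg_price(1, 1)  # 1000 / 100 = 10.0
consistent_v2 = avg_price(2, 2)  # 2500 / 300 ≈ 8.33

print(inconsistent, consistent_v1, consistent_v2)
```

The inconsistent read mixes the price of two shopping records with the amount of one, which is exactly the anomaly MetaService prevents.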
Query consistency information
...
Code Block | ||
---|---|---|
| ||
```sql
SELECT T.table_name
FROM __META_JOB_SOURCE S
JOIN __META_JOB_SINK T ON S.job_id=T.job_id
WHERE S.table_name='Table1';
```
Data Consistency Type
...
Query1:SELECT * FROM table1
Query2:SELECT * FROM table1 JOIN table2
Query3:SELECT * FROM table1 JOIN table2 JOIN table3
Strong Consistency
It guarantees strong data consistency among the queries above. A query reads the minimum committed version of all related tables, derived from the source tables and the dependencies between them, which ensures data consistency between related tables. For the examples above, Query1, Query2 and Query3 will read table1 and table2 at Min(table1 version, table2 version), and table3 at Min(table3 version).
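The min-version rule can be sketched in a few lines of Python (a toy model; the function name, the version numbers, and the grouping of related tables are all illustrative, not MetaService's actual API):

```python
# Latest committed version of each table (illustrative numbers).
committed = {"table1": 7, "table2": 5, "table3": 9}

# Tables that must be mutually consistent because they derive from the
# same upstream sources: table1 and table2 are related, table3 stands alone.
related_groups = [{"table1", "table2"}, {"table3"}]

def read_versions(tables):
    """Pick one read version per queried table: the minimum committed
    version within its related group, so every table in a group is read
    at the same consistent point."""
    versions = {}
    for t in tables:
        group = next(g for g in related_groups if t in g)
        versions[t] = min(committed[member] for member in group)
    return versions

print(read_versions({"table1", "table2", "table3"}))
# table1 and table2 are both read at version 5; table3 at version 9.
```

Because table2 has only committed version 5, table1 is also read at version 5 even though version 7 exists, keeping the join between them consistent.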
Weak Consistency
...
Timestamp Barrier divides the unbounded streaming data in the ETL Topology into multiple bounded data sets; each bounded data set can be seen as a big transaction in streaming processing. A transaction in streaming processing has the following characteristics:

- Each transaction consists of multiple operations on tables; each sink operation commits data to its table according to the Timestamp Barrier. The transaction is committed after all of its operations are committed.
- There is a sequential relationship between transactions: they commit data to the same table serially.
- A table has three states for a specific transaction: PreCommit, Commit and Snapshot.
  - PreCommit: the sink has committed data to the table according to the Timestamp Barrier, but the related transaction is still processing and not committed. The committed data in the table may be rolled back if the job fails.
  - Commit: the transaction related to a specific Timestamp Barrier is committed, but the data in the tables may still be rolled back if jobs fail.
  - Snapshot: the transaction related to a specific Timestamp Barrier is committed and all the tables have generated snapshots for it. The data in the tables won't be rolled back even when jobs fail.
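The state progression can be sketched as a toy Python model (class and method names are invented for illustration, not Table Store's implementation):

```python
PRE_COMMIT, COMMIT, SNAPSHOT = "PreCommit", "Commit", "Snapshot"

class BarrierTransaction:
    """Toy model of one Timestamp Barrier transaction over several tables."""

    def __init__(self, barrier, tables):
        self.barrier = barrier
        # Each sink starts by pre-committing its table's data.
        self.state = {t: PRE_COMMIT for t in tables}

    def commit(self):
        # The transaction commits once every sink operation has committed.
        for t in self.state:
            self.state[t] = COMMIT

    def snapshot(self):
        # Once every table has a snapshot, the data survives job failures.
        for t in self.state:
            self.state[t] = SNAPSHOT

    def survives_job_failure(self):
        # PreCommit and Commit data may be rolled back; Snapshot data may not.
        return {t: (s == SNAPSHOT) for t, s in self.state.items()}

txn = BarrierTransaction("T", ["shopping", "user_item_amount", "user_item_price"])
txn.commit()
print(txn.survives_job_failure())  # all False: still roll-back-able
txn.snapshot()
print(txn.survives_job_failure())  # all True: durable after snapshots
```

The key point the model captures is that even a committed transaction is not durable until snapshots exist in every table.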
As in the example above, suppose the data in the tables is as follows:

- user_item_amount: (user1, item1, 100)
- user_item_price: (user1, item1, 1000)
- shopping: (user1, item1, 200, 1500), which is being processed by the ETL jobs with Timestamp Barrier T.

The user performs the query SELECT userId, itemId, totalPrice, totalAmount, totalPrice / totalAmount as avgPrice FROM user_item_amount a JOIN user_item_price p ON a.userId=p.userId and a.itemId=p.itemId on tables user_item_amount and user_item_price multiple times.
According to the characteristics of transactions, the following data consistency levels can be supported:
- Read Uncommitted

Read Uncommitted refers to querying table data of uncommitted transactions: some tables in a transaction have committed data according to the Timestamp Barrier and can be read by a query, while the remaining tables have not, so the transaction as a whole is not committed. For example:

- The committed data in user_item_price is (user1, item1, 2500).
- user_item_amount has not committed T's data yet and still reads (user1, item1, 100).
- The result of the user's query will be (user1, item1, 2500, 100, 25), which is not a consistent result.
- Read Committed

Read Committed refers to querying table data of committed transactions only; it is the default consistency level in MetaService. When a transaction is committed, the data in all tables is committed, and a query can then read consistent data according to the specific transaction. For example:

- Before transaction T is committed, the query result is (user1, item1, 1000, 100, 10).
- After transaction T has been committed, the query result is (user1, item1, 2500, 300, 8.33333).

Read Committed doesn't support Repeatable Read: when jobs fail, the data in the tables will be rolled back, and the query result will be rolled back from (user1, item1, 2500, 300, 8.33333) to (user1, item1, 1000, 100, 10).
- Repeatable Read

Repeatable Read only reads the data of a specific transaction from snapshots. Snapshots in a table won't be rolled back even when a job fails, so a query can always read a committed transaction from the snapshots of the tables. For example:

- Transaction T has been committed, but the related snapshots in the tables are not created yet; the query result is (user1, item1, 1000, 100, 10).
- Once the related snapshots in the tables have been created, the query result is (user1, item1, 2500, 300, 8.33333).
- Snapshots in persistent storage won't be rolled back even when jobs fail, so the query result will always be (user1, item1, 2500, 300, 8.33333); this is Repeatable Read.
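The three levels can be checked against the example numbers with a short Python sketch (a toy model of the document's example; the dictionaries and function are invented for illustration):

```python
# Table contents before transaction T and after T commits.
before = {"user_item_price": 1000.0, "user_item_amount": 100}
after = {"user_item_price": 2500.0, "user_item_amount": 300}

def query(price, amount):
    """The example join: (totalPrice, totalAmount, avgPrice)."""
    return (price, amount, round(price / amount, 5))

# Read Uncommitted: user_item_price already holds T's data, but
# user_item_amount does not -> an inconsistent mix of versions.
read_uncommitted = query(after["user_item_price"], before["user_item_amount"])

# Read Committed: the query sees either entirely pre-T or entirely post-T data.
read_committed_before = query(before["user_item_price"], before["user_item_amount"])
read_committed_after = query(after["user_item_price"], after["user_item_amount"])

print(read_uncommitted)        # (2500.0, 100, 25.0)
print(read_committed_before)   # (1000.0, 100, 10.0)
print(read_committed_after)    # (2500.0, 300, 8.33333)
# Repeatable Read pins reads to snapshots, so once T's snapshots exist the
# result stays read_committed_after and never rolls back on job failure.
```

This reproduces the three query results quoted in the examples above.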
Design of Data Consistency
...