...

Code Block
languagesql
-- create a catalog with MetaService
CREATE CATALOG my_catalog WITH (
 'type'='table-store',
 'warehouse'='file:/tmp/table_store',
 'metastore' = 'table-store',
 'uri'='http://<meta-service-host-name>:<port>',
 'consistency'='strongReadCommitted' );

USE CATALOG my_catalog;

-- create three user shopping tables in my_catalog which will be managed by MetaService
CREATE TABLE shopping (
 userId BIGINT,
 itemId BIGINT,
 amount BIGINT,
 price DOUBLE );

CREATE TABLE user_item_amount (
 userId BIGINT,
 itemId BIGINT,
 totalAmount BIGINT );

CREATE TABLE user_item_price (
 userId BIGINT,
 itemId BIGINT,
 totalPrice DOUBLE );

Users can create a source table and three streaming jobs. The jobs write data to the three tables.

Code Block
languagesql
-- create a shopping data generator table
CREATE TEMPORARY TABLE shopping_source (
 userId BIGINT,
 itemId BIGINT,
 amount BIGINT,
 price DOUBLE ) WITH (
 'connector' = 'datagen');

-- table store requires checkpoint interval in streaming mode 
SET 'execution.checkpointing.interval' = '10 s'; 

-- write streaming data to shopping, user_item_amount and user_item_price tables 
INSERT INTO shopping SELECT userId, itemId, amount, price FROM shopping_source;
INSERT INTO user_item_amount SELECT userId, itemId, sum(amount) FROM shopping GROUP BY userId, itemId;
INSERT INTO user_item_price SELECT userId, itemId, sum(price) FROM shopping GROUP BY userId, itemId;

The ETL Topology is as follows:

draw.io Diagram: figure4 (ETL Topology)

Users can query data from the three tables.

Code Block
languagesql
-- use tableau result mode 
SET 'sql-client.execution.result-mode' = 'tableau'; 

-- switch to batch mode 
RESET 'execution.checkpointing.interval'; 
SET 'execution.runtime-mode' = 'batch'; 

-- olap query the table 
SELECT
   T1.userId,
   T1.itemId,
   T1.totalAmount as amount,
   T2.totalPrice as price,
   T2.totalPrice / T1.totalAmount as avgPrice
 FROM user_item_amount T1
 JOIN user_item_price T2
 ON T1.userId=T2.userId and T1.itemId=T2.itemId;

Since the data flowing between jobs and tables is streaming, without a consistency guarantee the results amount, price and avgPrice may be incorrect; when MetaService guarantees data consistency, amount, price and avgPrice will be correct.
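The inconsistency above can be made concrete with a minimal Python sketch (hypothetical, not Table Store code): the two aggregate tables are maintained by independent streaming jobs, so a join may read one table after it has applied a shopping record and the other before it has.

```python
# Hypothetical sketch: why joining independently updated tables without a
# consistency guarantee gives wrong results. Both aggregate tables derive
# from the same shopping record (user1, item1, amount=200, price=1500),
# but each is maintained by a separate streaming job, so one may lag.
user_item_amount = {("user1", "item1"): 100}   # job 1 has NOT applied the record yet
user_item_price = {("user1", "item1"): 2500}   # job 2 HAS applied it (1000 + 1500)

def avg_price(key):
    # The join reads whatever version each table currently exposes.
    return user_item_price[key] / user_item_amount[key]

print(avg_price(("user1", "item1")))  # 25.0 -- inconsistent: amount is stale

# Once job 1 catches up (100 + 200), the same query is consistent again.
user_item_amount[("user1", "item1")] = 300
print(avg_price(("user1", "item1")))  # 8.333... -- consistent result
```

MetaService's job is to ensure a query never observes the mixed state in the middle of this sketch.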

Query consistency information

...

Code Block
languagesql
SELECT T.table_name
 FROM __META_JOB_SOURCE S
 JOIN __META_JOB_SINK T ON S.job_id=T.job_id
 WHERE S.table_name='Table1';
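The lineage lookup this SQL performs can be sketched in Python (a hypothetical model, with made-up job and table names, not MetaService's actual storage): join the job-to-source and job-to-sink relations on the job id.

```python
# Hypothetical sketch of the lineage lookup above: given (job -> source
# tables) and (job -> sink tables) relations, find the tables directly
# downstream of 'Table1'.
job_sources = {"job1": ["Table1"], "job2": ["Table1", "Table2"], "job3": ["Table3"]}
job_sinks = {"job1": ["Table2"], "job2": ["Table3"], "job3": ["Table4"]}

def downstream_tables(table):
    # Equivalent to joining __META_JOB_SOURCE with __META_JOB_SINK on job_id.
    return sorted(
        sink
        for job, sources in job_sources.items()
        if table in sources
        for sink in job_sinks[job]
    )

print(downstream_tables("Table1"))  # ['Table2', 'Table3']
```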

Data Consistency Type

...

  1. Query1: SELECT * FROM table1

  2. Query2: SELECT * FROM table1 JOIN table2

  3. Query3: SELECT * FROM table1 JOIN table2 JOIN table3

  • Strong Consistency

It guarantees strong data consistency among the queries above. A query gets the minimum version of all the related tables according to the source tables and the dependencies between them, which ensures data consistency between related tables. For the examples above, Query1, Query2 and Query3 will get Min(table1 version, table2 version) for table1 and table2, and Min(table3 version) for table3.
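The version resolution can be sketched in Python (a hypothetical model with example version numbers, not the actual MetaService implementation): tables related through the ETL topology are read at the minimum completed version of their group.

```python
# Hypothetical sketch of strong-consistency version resolution: every
# table in a related group is read at the group's minimum version, so
# joined tables always see a mutually consistent state.
table_versions = {"table1": 7, "table2": 5, "table3": 9}
# Tables related through the ETL topology must share one read version.
related_groups = [{"table1", "table2"}, {"table3"}]

def query_versions(tables):
    versions = {}
    for group in related_groups:
        group_version = min(table_versions[t] for t in group)
        for t in group & set(tables):
            versions[t] = group_version
    return versions

print(query_versions({"table1"}))  # table1 is capped by related table2 -> version 5
print(sorted(query_versions({"table1", "table2", "table3"}).items()))
```

Note that even a query touching only table1 reads at Min(table1 version, table2 version), because table2 is in the same related group.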

  • Weak Consistency

...

Timestamp Barrier divides the unbounded streaming data in the ETL Topology into multiple bounded data sets, and each bounded data set can be seen as a big transaction in streaming processing. A transaction in streaming processing has the following characteristics:

  1. Each transaction consists of multiple operations on tables; each sink operation commits data to a table according to the Timestamp Barrier. The transaction is committed after all of its operations are committed.
  2. There is a sequential relationship between multiple transactions when processing data: they commit data to the same table serially.
  3. There are three states in a table for a specific transaction: PreCommit, Commit and Snapshot.
    1. PreCommit: The sink has committed data to the table according to the Timestamp Barrier, but the related transaction is still processing and not committed. The committed data in the table may be rolled back if the job fails.
    2. Commit: The transaction related to a specific Timestamp Barrier is committed, but the data in the tables may still be rolled back if jobs fail.
    3. Snapshot: The transaction related to a specific Timestamp Barrier is committed and all the tables have generated snapshots for the transaction. The data in the tables won't be rolled back even when jobs fail.
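The three states and their rollback behavior can be sketched in Python (a hypothetical model of the state progression, not Table Store code):

```python
# Hypothetical sketch of the per-table states of a streaming transaction
# for one Timestamp Barrier, and what a job failure does in each state.
from enum import Enum

class TxnState(Enum):
    PRE_COMMIT = 1  # sink committed data for the barrier; txn still open
    COMMIT = 2      # txn committed in all tables; snapshots not yet created
    SNAPSHOT = 3    # snapshots created; data survives job failures

def on_job_failure(state):
    # Data is rolled back unless the transaction has reached SNAPSHOT.
    return state if state is TxnState.SNAPSHOT else None  # None = rolled back

print(on_job_failure(TxnState.COMMIT))    # None -> data rolled back
print(on_job_failure(TxnState.SNAPSHOT))  # TxnState.SNAPSHOT -> durable
```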

Continuing the example above, suppose the data in the tables is as follows:

  1. user_item_amount: (user1, item1, 100)
  2. user_item_price: (user1, item1, 1000)
  3. shopping: (user1, item1, 200, 1500) with Timestamp Barrier T is being processed by ETL jobs.

The user performs the query SELECT a.userId, a.itemId, totalPrice, totalAmount, totalPrice / totalAmount as avgPrice FROM user_item_amount a JOIN user_item_price p ON a.userId=p.userId and a.itemId=p.itemId on tables user_item_amount and user_item_price multiple times.

According to the characteristics of transactions, the following consistency levels can be supported:

  • Read Uncommitted

Read Uncommitted refers to querying table data of uncommitted transactions. Some tables in a transaction may have committed data according to the Timestamp Barrier that can be read by a query while the remaining tables have not, so the transaction as a whole is not yet committed. For example:

  1. The committed data in user_item_price are (user1, item1, 2500).
  2. The uncommitted data in user_item_amount are (user1, item1, 100).
  3. The result of the user's query will be (user1, item1, 2500, 100, 25), which is not a consistent result.
  • Read Committed

Read Committed refers to querying table data of committed transactions only; it is the default consistency level in MetaService. When a transaction is committed, the data in all tables is committed, and a query can then read consistent data according to the specific transaction. For example:

  1. While transaction T is not committed, the query result is (user1, item1, 1000, 100, 10).
  2. After transaction T has been committed, the query result is (user1, item1, 2500, 300, 8.33333).

Read Committed doesn't support Repeatable Read: when jobs fail, the data in the tables is rolled back, and the query result may be rolled back from (user1, item1, 2500, 300, 8.33333) to (user1, item1, 1000, 100, 10).

  • Repeatable Read

Repeatable Read only reads data of a specific transaction from snapshots. The snapshots in a table won't be rolled back even when jobs fail, so a query can read a committed transaction from the snapshots of tables. For example:

  1. Transaction T has been committed but the related snapshots in the tables have not yet been created; the query result is (user1, item1, 1000, 100, 10).
  2. When the related snapshots in the tables have been created, the query result is (user1, item1, 2500, 300, 8.33333).
  3. Snapshots in persistent storage won't be rolled back even when jobs fail, so the query result will always be (user1, item1, 2500, 300, 8.33333): this is Repeatable Read.
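The three consistency levels can be replayed with the example numbers above in a short Python sketch (hypothetical illustration only; the `query` helper and the before/after dictionaries are made up to mirror the tables):

```python
# Hypothetical replay of the example: how the same join query reads under
# Read Uncommitted, Read Committed and Repeatable Read while transaction T
# (shopping record user1/item1/amount=200/price=1500) is in flight.

def query(amount_row, price_row):
    # Mirrors: SELECT userId, itemId, totalPrice, totalAmount, avgPrice
    (u, i, total_amount), (_, _, total_price) = amount_row, price_row
    return (u, i, total_price, total_amount, round(total_price / total_amount, 5))

before = {"amount": ("user1", "item1", 100), "price": ("user1", "item1", 1000)}
after = {"amount": ("user1", "item1", 300), "price": ("user1", "item1", 2500)}

# Read Uncommitted: price already committed for barrier T, amount not yet.
print(query(before["amount"], after["price"]))  # (..., 2500, 100, 25.0) inconsistent
# Read Committed: both tables at T's committed version (may roll back on failure).
print(query(after["amount"], after["price"]))   # (..., 2500, 300, 8.33333)
# Repeatable Read: reads only from snapshots, so this result never rolls back.
print(query(after["amount"], after["price"]))   # same result, stable across failures
```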

Design of Data Consistency

...