
...

In the whole process of streaming data processing, there are mainly the following 5 HOW problems (for general streaming and batch ETL):

...

Multiple ETL jobs and tables in Table Store form a topology. For example, there are dependencies between Table1, Table2, and Table3. Currently, users can't get relationship information between these tables, and when data on a table is late, it is difficult to trace the root cause based on job dependencies.

  • HOW to share state between multiple jobs to save resources?

Multiple ETL jobs may hold the same state data in different computations. For example, there are two JOIN ETL jobs between three tables, as follows:

draw.io Diagram: figure2join


The data in State1 and Table1 are the same; State2, State3, and Table2 are the same; State3 and Table3 are the same. The more times the same table is joined, the more serious the resource waste becomes. We should reuse the data in the state and save resources.

  • HOW to define data correctness in a query?

draw.io Diagram: figure2

As shown above, Flink ETL jobs will generate V11, V21, and V31 in Table1, Table2, and Table3 respectively for V1 in the CDC. Suppose the following case: there is a base table in a database, and a user creates cascaded views or materialized views Table1, Table2, and Table3 based on it. The user then executes complex queries on Table1, Table2, and Table3, and the query results are consistent with the base table. When the user instead creates the three tables in Table Store and incrementally updates their data with Flink ETL jobs in real time, these tables can be regarded as materialized views that are incrementally updated in real time. In this streaming process, how should data correctness be defined when users perform a join query on these three tables? The case in point: how do we ensure that V11, V21, and V31 are either all read or all not read in one query?

  • HOW to define the E2E data processing delay of ETL jobs and tables in the topology above?

Flink ETL jobs update the tables above in real time, and there are dependencies between them. While the data is flowing, how do we define the data delay in these tables? For the above example, how do we define the E2E delay of streaming data from CDC to Table2? How much does the delay of each ETL job contribute to the E2E delay, and which ETL job needs to be optimized?

  • HOW to revise the data in tables updated by a streaming job?

When one of the tables needs to be revised, how can it be revised during streaming processing while ensuring data correctness? For instance, if the data in Table1 needs to be revised, what should users do in the topology to ensure that the data is neither lost nor duplicated?

To answer the above questions, we introduce a Timestamp Barrier in Flink to align data, and we introduce a MetaService in Table Store to coordinate Flink ETL jobs, manage the relationships and dependencies between ETL jobs and tables, and support data consistency in Table Store.

Proposed Design

Architecture


We can regard each Flink ETL job as a single node performing complex computation, and each table in Table Store as a data stream. Flink ETL jobs and tables form one huge streaming job, which we call the ETL Topology. We set up a MetaService node to manage the ETL Topology. The main architecture is:

draw.io Diagram: figure3


There are two core points in the architecture: the Timestamp Barrier Mechanism and MetaService.

  • Timestamp Barrier Mechanism

We need a barrier mechanism in Flink to guarantee data consistency:

  1. Each ETL source needs to be assigned a unified Timestamp Barrier.
  2. Stateful and temporal operators in a Flink ETL job align and compute data according to the barrier.
  3. The sink operator in an ETL job confirms data with the barrier to the sink tables in Table Store.
  • MetaService component

MetaService is the coordinator of the ETL Topology; its capabilities are as follows:

1> Coordinate the Global Timestamp Barrier in ETL Topology

  1. As the coordinator of the ETL Topology, MetaService interacts with the source ETL jobs and generates a global Timestamp Barrier.
  2. The Timestamp Barrier is transmitted between ETL job nodes through tables, so these job nodes can create globally consistent snapshots in Table Store according to the barrier.

2> Manage dependencies between ETL jobs and tables

  1. MetaService manages the relationships between ETL jobs and tables in the ETL Topology. Users can query these dependencies from MetaService.
  2. MetaService manages the Timestamp Barrier in each ETL job, including barrier progress, completed barriers, etc.
  3. MetaService manages the Timestamp Barrier and snapshots of tables, including the latest completed snapshots and the relationship between barriers and snapshots.

3> Manage Data Consistency in Query On Table Store

...

Code Block
languagesql
-- create a catalog with MetaService
CREATE CATALOG my_catalog WITH (
 'type'='table-store',
 'warehouse'='file:/tmp/table_store',
 'metastore' = 'table-store',
 'uri'='http://<meta-service-host-name>:<port>',
 'consistency'='strongReadCommitted' );

USE CATALOG my_catalog;

-- create three user shopping tables in my_catalog which will be managed by MetaService
CREATE TABLE shopping (
 userId BIGINT,
 itemId BIGINT,
 amount BIGINT,
 price DOUBLE );

CREATE TABLE user_item_amount (
 userId BIGINT,
 itemId BIGINT,
 totalAmount BIGINT );

CREATE TABLE user_item_price (
 userId BIGINT,
 itemId BIGINT,
 totalPrice DOUBLE );

Users can create a source table and three streaming jobs. The jobs write data to the three tables.

Code Block
languagesql
-- create a shopping data generator table
CREATE TEMPORARY TABLE shopping_source (
 userId BIGINT,
 itemId BIGINT,
 amount BIGINT,
 price DOUBLE ) WITH (
 'connector' = 'datagen');

-- table store requires checkpoint interval in streaming mode 
SET 'execution.checkpointing.interval' = '10 s'; 

-- write streaming data to shopping, user_item_amount and user_item_price tables 
INSERT INTO shopping SELECT userId, itemId, amount, price FROM shopping_source;
INSERT INTO user_item_amount SELECT userId, itemId, sum(amount) FROM shopping GROUP BY userId, itemId;
INSERT INTO user_item_price SELECT userId, itemId, sum(price) FROM shopping GROUP BY userId, itemId;

The ETL Topology is as follows:

draw.io Diagram: figure4

Users can query data from the three tables.

Code Block
languagesql
-- use tableau result mode 
SET 'sql-client.execution.result-mode' = 'tableau'; 

-- switch to batch mode 
RESET 'execution.checkpointing.interval'; 
SET 'execution.runtime-mode' = 'batch'; 

-- olap query the table 
SELECT
   T1.userId,
   T1.itemId,
   T1.totalAmount as amount,
   T2.totalPrice as price,
   T2.totalPrice / T1.totalAmount as avgPrice
 FROM user_item_amount T1
 JOIN user_item_price T2
 ON T1.userId=T2.userId and T1.itemId=T2.itemId;

Since the data between jobs and tables is streaming, the results amount, price, and avgPrice are not guaranteed to be correct without a consistency guarantee; when MetaService guarantees data consistency, amount, price, and avgPrice will be correct.

Query consistency information

...

Code Block
languagesql
SELECT T.table_name
 FROM __META_JOB_SOURCE S
 JOIN __META_JOB_SINK T ON S.job_id=T.job_id
 WHERE S.table_name='Table1';
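
MetaService also tracks the completed timestamp barriers and snapshots of each table (see 2> above), so similar queries over that metadata are conceivable. The sketch below is illustrative only: the __META_TABLE_BARRIER table and its columns are hypothetical names, not defined by this FLIP.

Code Block
languagesql
-- illustrative sketch: __META_TABLE_BARRIER and its columns are hypothetical
SELECT T.table_name, T.completed_timestamp_barrier, T.snapshot_id
 FROM __META_TABLE_BARRIER T
 WHERE T.table_name='Table1';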

Data Consistency Type

...

The Timestamp Barrier divides the unbounded streaming data in the ETL Topology into multiple bounded data sets, and each bounded data set can be seen as a big transaction in streaming processing. The key points of a transaction are as follows:

  1. If the records in an "epoch" (Timestamp Barrier) are finished writing to a table, we say the transaction is PROCESSED in the table.
  2. If the table creates a snapshot for the records in an "epoch", we say the transaction is WRITTEN in the table.
  3. If a transaction is PROCESSED in all tables, we say the transaction is PRECOMMIT.
  4. If a transaction is WRITTEN in all tables, we say the transaction is COMMIT.

When a job fails, the records that are not WRITTEN in a table will be "rolled back". Continuing the above example, suppose the data in the tables is as follows:

  1. user_item_amount: (user1, item1, 100)
  2. user_item_price: (user1, item1, 1000)
  3. shopping: (user1, item1, 200, 1500) with Timestamp Barrier T is being processed by the ETL jobs.

The user performs the query SELECT userId, itemId, totalPrice, totalAmount, totalPrice / totalAmount as avgPrice FROM user_item_amount a JOIN user_item_price p ON a.userId=p.userId and a.itemId=p.itemId multiple times (formatted below).
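
The same query, written as a standalone statement with the column references qualified to avoid ambiguity between the two tables:

Code Block
languagesql
-- the consistency-check query used in the examples below
SELECT a.userId, a.itemId, p.totalPrice, a.totalAmount,
       p.totalPrice / a.totalAmount as avgPrice
 FROM user_item_amount a
 JOIN user_item_price p
 ON a.userId=p.userId and a.itemId=p.itemId;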

According to the characteristics of these transactions, the following data consistency levels can be supported:

  • Read Uncommitted

Read Uncommitted refers to querying table data of uncommitted transactions: some tables in a transaction already have PROCESSED data while the remaining tables do not, so the transaction has not been PRECOMMIT yet. For example:

  1. The data is PROCESSED in user_item_price: (user1, item1, 2500).
  2. The data is not PROCESSED in user_item_amount: (user1, item1, 100).
  3. The result of the user's query will be (user1, item1, 2500, 100, 25), which is not a consistent result.
  • Read Committed

Read Committed refers to querying table data of PRECOMMIT transactions only. When a transaction is PRECOMMIT, the data in all tables is PROCESSED, and a query can read consistent data according to a specific transaction. For example:

  1. If transaction T is not yet PRECOMMIT, the query result is (user1, item1, 1000, 100, 10).
  2. After transaction T has been PRECOMMIT, the query result is (user1, item1, 2500, 300, 8.33333).

Read Committed doesn't support Repeatable Read, which means that when jobs fail after transaction T is PRECOMMIT, the data in the tables will be rolled back and the query result will fall back from (user1, item1, 2500, 300, 8.33333) to (user1, item1, 1000, 100, 10).

  • Repeatable Read

Repeatable Read only reads data that is WRITTEN in tables. Snapshots in a table won't be rolled back even when jobs fail. For example:

  1. Transaction T has been PROCESSED, but the related snapshots in the tables have not been created yet; the query result is (user1, item1, 1000, 100, 10).
  2. When the related snapshots in the tables have been created, the query result is (user1, item1, 2500, 300, 8.33333).
  3. Snapshots in persistent storage won't be rolled back even when jobs fail, so the query result will always be (user1, item1, 2500, 300, 8.33333); this is Repeatable Read.

If Repeatable Read only reads data of COMMIT transactions, the data will be consistent; otherwise, the data in a query may come from different transactions.
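
The consistency level a query gets is expected to be selected when the catalog is created, via the 'consistency' option already shown in the catalog example above. Only the value 'strongReadCommitted' appears in that example; the value below is therefore a hypothetical placeholder for a level name, used purely for illustration.

Code Block
languagesql
-- sketch only: 'repeatableRead' is a hypothetical option value; the catalog options
-- otherwise mirror the earlier catalog example in this document
CREATE CATALOG my_repeatable_catalog WITH (
 'type'='table-store',
 'warehouse'='file:/tmp/table_store',
 'metastore' = 'table-store',
 'uri'='http://<meta-service-host-name>:<port>',
 'consistency'='repeatableRead' );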

Reuse Data In State

After aligning data with the Timestamp Barrier, join operators in jobs can keep a Delta State in memory and join data against a shared state as follows:

draw.io Diagram: JOIN_DELTA

Each join operator only needs to store its delta state with a timestamp in memory; when it needs to join data, it can combine the total data from the shared state and the delta state.

  1. Query1: SELECT * FROM table1

  2. Query2: SELECT * FROM table1 JOIN table2

  3. Query3: SELECT * FROM table1 JOIN table2 JOIN table3

  • Strong Consistency

It guarantees strong data consistency among the queries above. A query gets the minimum version of all the related tables according to the source tables and the dependencies between them, which ensures data consistency between related tables. For the examples above, Query1, Query2, and Query3 will get Min(table1 version, table2 version) for table1 and table2, and Min(table3 version) for table3 (see the sketch below).

  • Weak Consistency

It doesn't guarantee data consistency among the queries above, only data consistency within a single query. In this case, each query can get the latest version of its tables, which ensures better data freshness.
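
To make the Strong Consistency behaviour concrete with the shopping tables from the earlier example: conceptually, MetaService resolves one consistent version for all related tables and every scan in the query is pinned to it. A hand-written equivalent could look like the sketch below; the snapshot id 5 is a made-up value, and 'scan.snapshot-id' stands in for whatever version-pinning scan option Table Store exposes.

Code Block
languagesql
-- conceptual sketch: '5' is a placeholder for the consistent version resolved by
-- MetaService, and 'scan.snapshot-id' is an assumed version-pinning scan option
SELECT a.userId, a.itemId, p.totalPrice / a.totalAmount as avgPrice
 FROM user_item_amount /*+ OPTIONS('scan.snapshot-id'='5') */ AS a
 JOIN user_item_price /*+ OPTIONS('scan.snapshot-id'='5') */ AS p
 ON a.userId=p.userId and a.itemId=p.itemId;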

Design of Data Consistency

...

  • Stateless operator. The operator completely ignores the timestamp barrier: it processes every input record and outputs results just as it does today. It does not need to align data with the timestamp barrier, but it should collect the timestamp barriers from all of its inputs, and when it has received them, broadcast the barrier to the downstream tasks.
  • Stateful operator and temporal operator. Records within the same Timestamp Barrier are out of order, so stateful and temporal operators should align them according to their timestamp field. The operators execute their computation when they have collected all the timestamp barriers, and then broadcast the barrier to the downstream tasks. There is a sequence relationship between timestamp barriers, and records between timestamp barriers are ordered. A temporal operator should do one of the following:
    • If the business doesn't require ordering, it can process the records immediately, as before.
    • If the business requires ordering, it buffers the records internally, like the current windowed/temporal operators do. Records in each "epoch" (as demarcated by timestamp barriers) are processed after the previous "epoch" is finished, just like pre-aggregation. This means the operators compute and output results for a timestamp barrier based on the results of the previous timestamp barrier.
  • Sink operator. The sink streams output results to Table Store and commits the results when it has collected all the timestamp barriers. The source of a downstream ETL job can prefetch data from Table Store, but should only produce data after the upstream sink has committed.
    • If the external system requires ordered writes (something like a Kafka topic or an append-only store), the sink has to buffer the writes until a timestamp barrier arrives.
    • Some sinks might support writing data simultaneously to different "epochs", for example writing files bucketed by epoch. Each bucket/epoch could then be committed independently.

2. Timestamp Barrier across ETL jobs

...

  1. The JobManager reads and manages snapshots and timestamp barriers from Table Store; when it collects all the timestamp barriers of a table, it sends the barrier to the source subtasks.

  2. The Source node processes the splits of snapshots. When it receives a timestamp barrier from the JobManager, it broadcasts the timestamp barrier after finishing the specified splits.

...

The main work for the Timestamp Barrier, and the differences between the Timestamp Barrier and the existing Watermark in Flink, are summarized below.

  • Generation
    • Timestamp Barrier: The JobManager must coordinate all source subtasks and generate a unified timestamp barrier from System Time or Event Time for them.
    • Watermark: Each source subtask generates its own watermark events from System Time or Event Time.
  • Checkpoint
    • Timestamp Barrier: Store <checkpoint, timestamp barrier> when the timestamp barrier is generated, so that the job can recover the same timestamp barrier for an uncompleted checkpoint.
    • Watermark: None.
  • Replay data
    • Timestamp Barrier: Store <timestamp barrier, offset> for the source when it broadcasts the timestamp barrier, so that the source can replay the same data according to the same timestamp barrier.
    • Watermark: None.
  • Align data
    • Timestamp Barrier: Align data for stateful operators (aggregation, join, etc.) and temporal operators (window).
    • Watermark: Align data for temporal operators (window) only.
  • Computation
    • Timestamp Barrier: Operators perform computation for a specific timestamp barrier based on the results of the previous timestamp barrier.
    • Watermark: Window operators only compute results within the window range.
  • Output
    • Timestamp Barrier: Operators output or commit results when they have collected all the timestamp barriers, including operators with data buffers or async operations.
    • Watermark: Window operators support "emit" output.

...

The main work in Flink and Table Store is as follows.


  • MetaService
    1. Manage the relationships between ETL jobs and tables in Table Store, including source tables and sink tables.
    2. Manage the finished timestamp barrier of each table in Table Store.
    3. Handle the interaction between Flink and MetaService, such as registering ETL jobs, getting the consistency version of a table, etc.
  • Table Store
    • Catalog
      1. Register source and sink tables with the ETL job id.
      2. Create tables based on a consistency version from MetaService.
    • Source and SplitEnumerator
      1. The SplitEnumerator manages the snapshots, splits, and timestamp barriers for a specific table.
      2. The Source reads data and timestamp barriers from splits and broadcasts the timestamp barrier.
      3. Notify MetaService to update the completed timestamp barrier of tables.
      4. Notify MetaService to clean up the information of terminated ETL jobs.
    • Sink
      1. Write data to Table Store and commit data with the timestamp barrier.
  • Flink
    • Timestamp Barrier Mechanism: the detailed main work is described in the comparison above.
    • Planner
      1. Register the job with MetaService to create the relationship between source and sink tables.
      2. Create tables based on a consistency version from MetaService.
    • JobManager
      1. Add a listener and call it back when the job ends.
  • Improvement
    1. Delta Join in Flink to improve state resource usage.
    2. Storage & computation decoupling in Table Store.

Constraint

The current FLIP design has two constraints, which may be lifted as the design continues to improve:

  1. Multiple jobs writing to the same table concurrently are not supported.
  2. The ETL topology does not support cycles.

MetaService needs to detect these situations and report errors when ETL jobs are registered.

The Next Step

This is an overall FLIP for data consistency in streaming and batch ETL. Next, we would like to create a FLIP for each functional module with a detailed design. For example:

  1. Timestamp Barrier Coordination and Generation
  2. Timestamp Barrier Checkpoint and Recovery
  3. Timestamp Barrier Replay Data Implementation
  4. Timestamp Barrier Alignment and Computation In Operator
  5. Introduce Delta Join in Flink To Improve State Resource
  6. Introduce the MetaService module and implement source/sink in Table Store, etc.
  7. Job and table management in MetaService, such as exception handling, data revision, etc.

Rejected Alternatives

Data consistency management

...


Data consistency of the ETL Topology is our first phase of work. After completing this part, we plan to continue building and improving the capabilities of Flink + Table Store, mainly in the following aspects.

  1. Support data consistency semantics. As mentioned above, we need to implement the "Timestamp Barrier" to support full-semantics data consistency, instead of the "Aligned Checkpoint" used in the first stage.
  2. Materialized View in SQL. Next, we hope to introduce materialized view syntax into Flink to improve user interaction experience. Queries can also be optimized based on materialized views to improve performance.

  3. Improve MetaService capabilities. MetaService is a single point in the system, so it should support failover. Additionally, MetaService should support managing Flink ETL jobs and tables in Table Store, being accessed by other computing engines such as Spark, and acting as an agent of the Hive Metastore later.


  4. Improve OLAP performance. We have created issues under [FLINK-25318] Improvement of scheduler and execution for Flink OLAP to track OLAP improvements in Flink. At the same time, we hope to continue to enhance the online query capability of Table Store and improve the OLAP performance of Flink + Table Store.

  5. Improve data freshness. At present, our consistency design is based on the Flink checkpoint mechanism and supports minute-level delay. In the future, we hope to support second-level or even millisecond-level data freshness while ensuring data consistency, which requires continuous optimization in computing and storage.

By driving the above optimizations and implementations, we hope that Flink + Table Store can support the full StreamingWarehouse capability: users can create materialized views and execute OLAP queries in the system, just like using databases and data warehouses, and output data to the application layer (such as a KV store) as required.