...
Discussion thread | https://lists.apache.org/thread/46pc7t6v8nd5zy8shhdzy6k774lnsxbg |
---|---|
Vote thread | |
JIRA | |
Release |
Table of Contents |
---|
Motivation
Table Store supports streaming and batch data processing, Flink ETL jobs can read data from and write data to Table Store in streaming and batch. Following is the architecture
...
In the whole process of streaming data processing, there are mainly the following 4 How 5 Hows problems(general streaming and batch ETL):
...
Multiple ETL jobs and tables in Table Store form a topology. For example, there are dependencies between Table1, Table2, and Table3. Currently, users can't get relationship information between these tables, and when data on a table is late, it is difficult to trace the root cause based on job dependencies.
- HOW to
...
- share state between multiple jobs to save resources?
Multiple ETL jobs may have the same state data in different computation. For example, there're two JOIN ETL between three tables as follows
draw.io Diagram | |||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
HOW to define E2E data processing delay of ETL jobs and tables in topology above?
Flink ETL jobs update tables above in real-time, there are dependencies between them. While the data is flowing, how to define the data delay in these tables? For the above example, how to define the E2E delay of streaming data from CDC to Table2? How much does the delay of each ETL job affect the E2E delay, and which ETL job needs to be optimized?
HOW to revise the data in tables updated by streaming job?
When one of the tables needs to be revised, how to revise it in the streaming process on the premise of ensuring the correctness of the data? For instance, the data in Table1 needs to be revised, what should the users do in the topology to ensure that the data is not lost or repeated?
In order to answer the above questions, we introduce MetaService in Table Store to coordinate Flink ETL jobs, manage the relationships and dependencies between ETL jobs and tables, and support data consistency in Table Store.
Proposed Design
Architecture
...
|
The data in State1
and Table1
are same, State2
, State3
and Table2
are same, State3
and Table3
are same.The more times the same table is joined, the more serious the resource waste is. We should reuse the data in the state and save resources.
HOW to define the data correctness in query?
draw.io Diagram | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
As shown above, Flink ETL jobs will generate V11, V21, V31 in Table1,2,3 respectively for V1 in CDC. Suppose the following case: there's a base table in database, a user creates cascaded views or materialized views Table1, Table2 and Table3 based on it. Then the user executes complex queries on Table1, Table2, and Table3 and the query results are consistent with the base table. When the user creates the three tables in Table Store and incrementally updates data of them by Flink ETL jobs in real-time, these tables can be regarded as materialized views that are incrementally updated in real time. In the process of streaming data processing, how to define data correctness in query when users perform join query on these three tables? The case in point is how to ensure V11, V21 and V31 are read or not read in one query?
HOW to define E2E data processing delay of ETL jobs and tables in topology above?
Flink ETL jobs update tables above in real-time, there are dependencies between them. While the data is flowing, how to define the data delay in these tables? For the above example, how to define the E2E delay of streaming data from CDC to Table2? How much does the delay of each ETL job affect the E2E delay, and which ETL job needs to be optimized?
HOW to revise the data in tables updated by streaming job?
When one of the tables needs to be revised, how to revise it in the streaming process on the premise of ensuring the correctness of the data? For instance, the data in Table1 needs to be revised, what should the users do in the topology to ensure that the data is not lost or repeated?
In order to answer the above questions, we introduce Timestamp Barrier
in Flink to align data, introduce MetaService in Table Store to coordinate Flink ETL jobs, manage the relationships and dependencies between ETL jobs and tables, and support data consistency in Table Store.
Proposed Design
Architecture
We can regard each Flink ETL job as a single node with complex computation, and the table in Table Store as a data stream. Flink ETL and tables form a huge streaming job, which we call ETL Topology
. We setup a MetaService node to manage the ETL Topology
. The main architecture is:
draw.io Diagram | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
There are two core points in the architecture: Timestamp Barrier Mechanism
and MetaService
Timestamp Barrier Mechanism
We need a barrier mechanism in Flink to guarantee the data consistency.
- Each ETL source needs to be assigned a unified
Timestamp Barrier
- Stateful and temporal operators in Flink ETL job align and compute data according to the barrier.
- Sink operator in ETL job confirms data with barrier to sink tables in Table Store.
MetaService
component
MetaService
is the coordinator in ETL Topology
, its capabilities are as followed:
1> Coordinate the Global Timestamp Barrier in ETL Topology
- As the coordinator of ETL Topology, MetaService interacts with source ETL job and generates a global
Timestamp Barrier
. Timestamp Barrier
is transmitted between ETL job nodes by tables, then these job nodes can create globally consistency snapshots in Table Store according to the barrier.
2> Manage dependencies between ETL jobs and tables
- MetaService manages the relationship between ETL jobs and tables in
ETL Topology
. Users can query these dependencies from MetaServices. - MetaService manages
Timestamp Barrier
in each ETL job, including barrier progress, completed barriers, etc. - MetaService manages the
Timestamp Barrier
and snapshot of tables, including the latest completed snapshots, the relationship
...
There are two core points in the architecture: Timestamp Barrier Mechanism
and MetaService
Timestamp Barrier Mechanism
We need a barrier mechanism in Flink to guarantee the data consistency.
- Each ETL source needs to be assigned a unified
Timestamp Barrier
- Aggregate and temporal operators in Flink ETL job align input data according to the barrier.
- Sink operator in ETL job confirms data with barrier to sink tables in Table Store.
MetaService
component
MetaService
is the coordinator in ETL Topology
, its capabilities are as followed:
1> Coordinate the Global Timestamp Barrier in ETL Topology
- As the coordinator of ETL Topology, MetaService interacts with source ETL job and generates a global
Timestamp Barrier
. Timestamp Barrier
is transmitted between ETL job nodes by tables, then these job nodes can create globally consistency snapshots in Table Store according to the barrier.
2> Manage dependencies between ETL jobs and tables
- MetaService manages the relationship between ETL jobs and tables in
ETL Topology
. Users can query these dependencies from MetaServices. - MetaService manages
Timestamp Barrier
in each ETL job, including barrier progress, finished barriers, etc. - MetaService manages the
Timestamp Barrier
and snapshot of tables, including the latest completed snapshots, the relationship between barriers and snapshots.
...
We add a new metastore type: table-store
, which manages the Catalog
and data consistency in Table Store. Users can create a Catalog with metastore table-store
in Sql-Client, and specify the address and consistency type by uri
and consistency-type
. Flink ETL job, which reads from and writes to Table Store will be managed by MetaService to ensure data consistency. In the first phasestage, table-store
metastore only supports FileSystemCatalog
and will support HiveCatalog
later. The user cases are shown as followed.
Code Block | ||
---|---|---|
| ||
-- create a catalog with MetaService CREATE CATALOG my_catalog WITH ( 'type'='table-store', 'warehouse'='file:/tmp/table_store', 'metastore' = 'table-store', 'uri'='http://<meta-service-host-name>:<port>', 'consistency'='strongReadCommitted' ); USE CATALOG my_catalog; -- create three user shopping tables in my_catalog which will be managed by MetaService CREATE TABLE word_value shopping ( worduserId STRINGBIGINT, PRIMARYitemId KEYBIGINT, NOTamount ENFORCEDBIGINT, valprice BIGINTDOUBLE ); CREATE TABLE worduser_item_countamount ( worduserId STRINGBIGINT, PRIMARY KEY NOT ENFORCEDitemId BIGINT, cnttotalAmount BIGINT ); CREATE TABLE worduser_item_sumprice ( worduserId STRINGBIGINT, PRIMARY KEY NOT ENFORCEDitemId BIGINT, val_sumtotalPrice BIGINTDOUBLE ); |
Users can create a source table and three streaming jobs. The jobs write data to the three tables.
Code Block | ||
---|---|---|
| ||
-- create a wordshopping data generator table CREATE TEMPORARY TABLE wordshopping_tablesource ( worduserId STRINGBIGINT, valitemId BIGINT, amount BIGINT, price DOUBLE ) WITH ( 'connector' = 'datagen', 'fields.word.length' = '14'); -- table store requires checkpoint interval in streaming mode SET 'execution.checkpointing.interval' = '10 s'; -- write streaming data to word_valueshopping, worduser_item_countamount and worduser_item_sumprice tables INSERT INTO word_valueshopping SELECT word userId, itemId, amount, valprice FROM wordshopping_tablesource; INSERT INTO worduser_item_countamount SELECT word user_id, item_id, countsum(*amount) FROM word_valueshopping GROUP BY word user_id, item_id; INSERT INTO worduser_item_sumprice SELECT worduser_id, item_id, sum(valprice) FROM word_valueshopping GROUP BY word;user_id, item_id; |
The ETL Topology
is as followed
draw.io Diagram | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
Users can query data from the three tables.
Code Block | ||
---|---|---|
| ||
-- use tableau result mode SET 'sql-client.execution.result-mode' = 'tableau'; -- switch to batch mode RESET 'execution.checkpointing.interval'; SET 'execution.runtime-mode' = 'batch'; -- olap query the table SELECT T1.worduserId, T1.cnt as t1cnt,itemId T1.sum_valtotalAmount as t1sum_valamount, T2.cnttotalPrice as t2cntprice, T3T2.sum_valtotalPrice as t3sum_val FROM (SELECT word, count(*) as cnt, sum(val) as sum_val FROM word_value GROUP BY word) / T1.totalAmount as avgPrice FROM user_item_amount T1 JOIN worduser_item_countprice T2 JOIN word_sum T3 ON T1.wordON T1.userId=T2.worduserId and T2T1.worditemId=T3.worditemId; |
Since the data between jobs and tables is streaming, the results t1cnt and t2cnt, t1sum_val and t3sum_val are different without consistency guarantee amount, price and avgPrice are not correct; while MetaService
guarantees data consistency, the results t1cnt and t2cnt, t1sum_val and t3sum_val will be the same amount, price and avgPrice will be correct.
Query consistency information
...
Code Block | ||
---|---|---|
| ||
SELECT T.table_name FROM __META_JOB_SOURCE S JOIN __META_JOB_Sink T ON S.job_id=T.job_id WHERE S.table_name='Table1' |
Data Consistency Type
...
Timestamp Barrier
divides unbounded streaming data in ETL Topology
into multiple bounded data set, each bounded data set can be seen as a big transaction
in streaming processing. The key points of transaction are as follows
- If the records in a "epoch" (Timestamp Barrier) are finished writing to a table, we call the transaction is PROCESSED in the table.
- If the table creates a snapshot for the records in a "epoch", we call the transaction is WRITTEN in the table.
- If a transaction is PROCESSED in all tables, we call the transaction is PRECOMMIT
- If a transaction is WRITTEN in all tables, we call the transaction is COMMIT
When job fails, the records is not WRITTEN in a table will be "rolled back". Same as the above example, suppose the data in the tables are as follows
- user_item_amount: (user1, item1, 100)
- user_item_price: (user1, item1, 1000)
- shopping: (user1, item1, 200, 1500) with
Timestamp Barrier
T is processing by ETL jobs.
User performs query SELECT userId, itemId, totalPrice, totalAmount, totalPrice / totalAmount as avgPrice FROM UserItemAmount a JOIN UserItemPrice p ON a.userId=p.userId and a.itemId=p.itemId
on user_item_amount and user_item_price multiple times.
According to the characteristics of transaction, the following data consistency can be supported
- Read Uncommitted
Read Uncommitted refers to querying table data of uncommitted transactions. When some tables in a transaction have PROCESSED data, the remaining tables are not PROCESSED, and the transaction will not been PRECOMMIT yet. For example
- The data is PROCESSED in user_item_price: (user1, item1, 2500).
- The data is not PROCESSED in user_item_amount: (user1, item1, 100).
- The result of user's query will be (user1, item1, 2500, 100, 25) which is not a consistency result.
- Read Committed
Read Committed
refers to querying table data of PRECOMMIT transactions only. When a transaction
is PRECOMMIT, data in all tables are PROCESSED. Then the query can read the consistency data according to specific transaction
. For example
- The transaction T is not PRECOMMIT, the query result is (user1, item1, 1000, 100, 10)
- The transaction T has been PRECOMMIT, the query result is (user1, item1, 2500, 300, 8.33333)
Read Committed
doesn't support Repeatable Read
, which means when jobs fail after transaction T is PRECOMMIT, the data in tables will be rolled back and the query result will fallback from (user1, item1, 2500, 300, 8.33333) to (user1, item1, 1000, 100, 10)
- Repeatable Read
Repeatable Read
only reads data is WRITTEN in tables. The snapshots in a table won't be rolled back even when jobs fail. For example
- Transaction T has been PROCESSED, but the related snapshots in tables are not created, the query result is (user1, item1, 1000, 100, 10)
- When the related snapshots in tables have been created, the query result is (user1, item1, 2500, 300, 8.33333)
- Snapshots in a persistent storage won't be rolled back even when jobs fail, and the query result will always be (user1, item1, 2500, 300, 8.33333), it's
Repeatable Read
If Repeatable Read
only reads data of a COMMIT transaction, the data will be consistency; otherwise, the data in a query will be in different transaction.
Reuse Data In State
After align data with Timestamp Barrier
, join operators in jobs can keep Delta State
in memory and join data in shared state as follows
draw.io Diagram | ||||||
---|---|---|---|---|---|---|
|
Query1:SELECT * FROM table1
Query2:SELECT * FROM table1 JOIN table2
Query3:SELECT * FROM table1 JOIN table2 JOIN table3
Strong Consistency
It will guarantee strong data consistency among queries above. Query gets the minimum version of all the related tables according to the source tables and the dependencies between them, which ensure data consistency between related tables. For the examples above, Query1, Query2 and Query3 will get Min(table1 version, table2 version) for table1 and table2, Min(table3 version) for table3.
Weak Consistency
It doesn't guarantee the data consistency among queries above, but only the data consistency of a single query. At this time, each query can get its latest version of tables, this ensures better data freshness.
Design of Data Consistency
Global Timestamp Barrier Mechanism
The coordination of Timestamp Barrier
is divided into two parts: the barrier within each ETL job and across ETL jobs.
1. Timestamp Barrier
within ETL job
There are two tables in the ETL Topology
Root Table
: The sink tables in Table Store that ETL jobs consume external source(Kafka/cdc, ect) and write results to them.
Intermediate Table
: The sink tables in Table Store that ETL jobs consumeRoot Table
s andIntermediate Table
s, then write results to them.
ETL jobs which consume the same external source can't be managed by a global timestamp barrier. For example, two ETL jobs consume a Kafka Topic with system timestamp barrier, and write results to Table1 and Table2 in Table Store. So we only guarantee the consistency of ETL jobs and tables in Table Store: that means users must load external data into Table Store by Flink ETL job which generates timestamp barrier for it, then we guarantee data consistency based on ETL Topology
.
Correspondingly, there're two ETL types: Root ETL
reads data from external sources and write data to Root Table
, where Intermediate ETL
reads data from Root Table
and Intermediate Table
. The main difference between them is the way to generate Timestamp Barrier
.
JobManager
in Root ETL
will generate a new unified Timestamp Barrier
itself according to the sources with different strategy, such as system timestamp or event timestamp, and write it into the table in Table Store
. The overall process is as followed.
...
Intermediate ETL
cannot generate a new Timestamp Barrier
itself. It must read Timestamp Barrier
from data in Table Store
, report it to JobManager
and then broadcast it to the downstream tasks.
draw.io Diagram | ||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
Timestamp Barrier
will be transmitted in data stream between subtasks after all the records belong to it, and each record which is processed by operator has a timestamp field of the Timestamp Barrier
. Besides the source, there are three types of operators as followed.
- Stateless operator. The operator processes every input record and output the result which it just does before. It does not need to align data with
Timestamp Barrier
, and when it receivesTimestamp Barrier
, it should broadcast the barrier to downstream tasks. - Stateful operator and temporal operator. Records in a same
Timestamp Barrier
are out of order, stateful and temporal operators should align them according to their timestamp field. The operators will execute computation when they collect all the timestamp barrier, and broadcast it downstream tasks. There's a sequence relationship between timestamp barriers, and records between timestamp barriers are ordered. It means that the operators compute and output results for a timestamp barrier based on the result of a previous timestamp barrier. - Sink operator. Sink streaming output results to
Table Store
, and commit the results when it collect all the timestamp barrier. The source of downstream ETL job can prefetch data fromTable Store
, but should produce data after the upstream sink committed.
2. Timestamp Barrier
across ETL jobs
Root Table
is sources of ETL Topology
and the Intermediate Table
is streaming edge and sink. Each vertex in it is an independent Flink job, in which SplitEnumerator
schedules and reads snapshots from each table.
Each job SplitEnumerator
interacts with MetaService
, creates and sends global timestamp barriers to its sources. The sources collect and broadcast the timestamp barriers. ETL job generates snapshots in sink tables with timestamp barrier information, then the downstream ETL job can read the timestamp barrier information directly, which ensures the timestamp barrier can be transferred among jobs.
|
Each join operator only need to store delta state with timestamp in memory, and when it needs to join data, it can get the total data from shared state and delta state.
Design of Data Consistency
Global Timestamp Barrier Mechanism
The coordination of Timestamp Barrier
is divided into two parts: the barrier within one ETL job and across ETL jobs.
1. Timestamp Barrier
within one ETL job
There are two tables in the ETL Topology
Root Table
: The sink tables in Table Store that ETL jobs consume external source(Kafka/cdc, etc) and write results to them.
Intermediate Table
: The sink tables in Table Store that ETL jobs consumeRoot Table
s andIntermediate Table
s, then write results to them.
ETL jobs which consume the same external source can't be managed by a global timestamp barrier. For example, two ETL jobs consume a Kafka Topic with system timestamp barrier, and write results to Table1 and Table2 in Table Store. So we only guarantee the consistency of ETL jobs and tables in Table Store: that means users must load external data into Table Store by Flink ETL job which generates timestamp barrier for it, then we guarantee data consistency based on ETL Topology
.
Correspondingly, there're two ETL types: Root ETL
reads data from external sources and write data to Root Table
, where Intermediate ETL
reads data from Root Table
and Intermediate Table
. The main difference between them is the way to generate Timestamp Barrier
.
JobManager
in Root ETL
will generate a new unified Timestamp Barrier
itself according to the sources with different strategy, such as system timestamp or event timestamp, and write it into the table in Table Store
. The overall process is as followed.The overall process of global timestamp barrier is as follow
draw.io Diagram | |||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
There are two layers in Global Checkpoint
: MetaService
and JobManager
. MetaService
regards each ETL job as a single node, manages the global checkpoint in the ETL Topology
; JobManager
interacts with MetaService
and manages the global checkpoint in each ETL job.
There are two parts in the global checkpoint processing: interaction between MetaService
and JobManager
, and interaction between JobManager
and Source Node
.
Interaction between
MetaService
andJobManager
ETL job only triggers checkpoint by
SplitEnumerator
manually.When each
SplitEnumerator
in ETL job finishes reading a snapshot inRootTable
, it requests checkpoint fromMetaService
and broadcasts barrier downstream.When each
SplitEnumerator
reads checkpoint from snapshot inIntermediate Table
, it reports checkpoint toMetaService
and broadcasts barrier downstream.
Interaction between
JobManager
andSource Node
SplitEnumerator
sends checkpoint toCheckpointCoordinator
andSource Node
, then theCheckpointCoordinator
will send checkpoint toSource Node
after it receives checkpoints from allSplitEnumerator
s.Source Node processes splits of snapshots. When it receives checkpoint from
SplitEnumerator
andCheckpointCoordinator
, it broadcasts checkpoint barrier after finishing specified snapshot.
...
Data Consistency Management
...
Intermediate ETL
cannot generate a new Timestamp Barrier
itself. It must read Timestamp Barrier
from data in Table Store
, report it to JobManager
and then broadcast it to the downstream tasks.
draw.io Diagram | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
Timestamp Barrier
will be transmitted in data stream between subtasks after all the records belong to it, and each record which is processed by operator has a timestamp field equals to the Timestamp Barrier
. Besides the source, there are three types of operators as followed.
- Stateless operator. The operator completely ignore the timestamp barrier, processes every input record and output the result which it just does before. But it should collect all Timestamp Barrier and broadcast the barrier to downstream tasks.
- Stateful/Temporal operator, should either
- If the business doesn't require ordering, it could process the records immediately as before
- If the business requires ordering, it buffers the records internally like current windowed/temporal operator are doing. Records in each "epoch" (as demarcated by timestamp barriers) will be processed after the previous "epoch" is finished, just like pre-aggregate. It means that the operators compute and output results for a timestamp barrier based on the result of a previous timestamp barrier.
- Sink operator.
- If the external system requires ordered writes (something like Kafka topic or append only store), the sinks would have to buffer the writes until a "timestamp barrier" arrives
- For sinks which might support writing the data simultaneously to different "epochs". For example writing files bucketed by each epoch. Each bucket/epoch could be committed independently
2. Timestamp Barrier
across ETL jobs
Root Table
is sources of ETL Topology
and the Intermediate Table
is streaming edge and sink. Each vertex in it is an independent Flink job, in which JobManager schedules and reads snapshots from each table.
Each JobManager
interacts with MetaService
, creates and sends global timestamp barriers to its sources. The sources collect and broadcast the timestamp barriers. ETL job generates snapshots in sink tables with timestamp barrier, then the downstream ETL job can read the timestamp barrier directly, which ensures the timestamp barrier can be transmitted among jobs.
The overall process of global timestamp barrier is as follow
draw.io Diagram | ||||||
---|---|---|---|---|---|---|
|
- The topology of ETL jobs and tables
MetaService
manages dependencies between tables and ETL jobs. Based on the relationship information, it supports consistent reading and computing in OLAP, calculates the delay for E2E and each ETL job, helps users to find the bottleneck jobs. When revising data on tables, users can rollback snapshots on tables and state in ETL jobs based on the dependencies.
- Relationship between checkpoints and snapshots of each table
MetaService
ensures data consistency among ETL/OLAP jobs and tables by managing the relationship between checkpoint and snapshot.
Firstly, it's used to ensure the consistency of checkpoint and snapshot among ETL jobs that consume the same table. For example, a Root Table
is consumed by an ETL job and MetaService creates checkpoints on snapshots for it. When a new ETL job consumed this table is started, MetaService will create the same checkpoint on snapshots for it according to the previous job.
draw.io Diagram | ||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
Secondly, it helps to ensure that the checkpoint consistency between tables when an ETL job consumes them. For example, an ETL job consumes Table1 and Table2. When the job is started, it will get snapshot ids for Table1 and Table2 with the same checkpoint from MetaService, even when the progresses of Table1 and Table2 are different. This ensures that the checkpoint of the job can be aligned.
Finally, OLAP/Batch jobs read snapshots from source tables with the same checkpoint too, and this ensures data consistency in job computation.
- The aborted checkpoints of each table
MetaService supports solving the checkpoint alignment problem of ETL jobs consuming multiple tables by managing aborted checkpoints. When an ETL job consumes multiple tables, checkpoint aborted in one of them will cause the job to fail align at the checkpoint. Each job must get the following answers from MetaService
before it triggers checkpoint
- The ETL job doesn't generate any checkpoint, it schedules and processes splits in the next snapshot directly.
- The ETL job performs checkpoint, then schedules the next snapshot.
- The ETL job aborts specified checkpoint, then performs another checkpoint.
- The completed checkpoints of each table
MetaService
manages completed checkpoints of each table and guarantees data consistency in OLAP query. OLAP query should request versions of source tables from MetaService
, and MetaService
calculates snapshot ids of tables based on the dependencies between tables, completed checkpoints and snapshots in each table and consistency type requirement. OLAP reads data from tables according to the given snapshot ids, which ensure the data consistency for it.
- Information about tables and snapshots being used by the jobs
MetaService manages information about snapshots being used by ETL jobs or OLAP on tables, then determines which snapshots of tables can be safely deleted, compacted without affecting the jobs who are reading the data, ensures these jobs can read correct data.
- Checkpoint progress of each ETL job
MetaService manages start time, finish time, total cost of checkpoints for each job, it helps users to analyze the E2E delay and optimize the ETL jobs.
ETL Jobs Failover
...
|
There are two layers in Global Timestamp Barrier
: MetaService
and JobManager
. MetaService
regards each ETL job as a single node, manages the global timestamp barrier in the ETL Topology
; JobManager
interacts with MetaService
and manages the global timestamp barrier in each ETL job.
There are two parts in the global timestamp barrier processing: interaction between MetaService
and JobManager
, and interaction between JobManager
and Source Node
.
Interaction between
MetaService
andJobManager
JobManager
of each ETL job requests a start timestamp barrier fromMetaService
for its sources when it is started.- When a ETL job completes a timestamp barrier and commit the data to
Table Store
, it reports the timestamp barrier toMetaService
.
Interaction between
JobManager
andSource Node
JobManager
reads and manages snapshot and timestamp barrier fromTable Store
, when it collects all the timestamp barrier of table, it sends the barrier to source subtasks.Source Node processes splits of snapshots. When it receives timestamp barrier from
JobManager
, it broadcasts timestamp barrier after finishing specified splits.
The interactions among JobManager and Source Node
are as followed.
draw.io Diagram | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
Data Consistency Management
MetaService
manages checkpoints between jobs and versions of each table, the main information includes
- The topology of ETL jobs and tables
MetaService
manages dependencies between tables and ETL jobs. Based on the relationship information, it supports consistent reading and computing in OLAP, calculates the delay for E2E and each ETL job, helps users to find the bottleneck jobs. When revising data on tables, users can rollback snapshots on tables and state in ETL jobs based on the dependencies.
- Relationship between timestamps and snapshots of each table
MetaService
ensures data consistency among ETL/OLAP jobs and tables by managing the relationship between timestamp and snapshot.
Firstly, it's used to ensure the consistency of timestamp and snapshot among ETL jobs that consume the same table. For example, a Root Table
is consumed by an ETL job and MetaService creates timestamps on snapshots for it. When a new ETL job consumed this table is started, MetaService will create the same timestamp on snapshots for it according to the previous job.
draw.io Diagram | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
Secondly, it helps to ensure that the timestamp barrier consistency between tables when an ETL job consumes them. For example, an ETL job consumes Table1 and Table2. When the job is started, it will get snapshot ids for Table1 and Table2 with the same timestamp from MetaService, even when the progresses of Table1 and Table2 are different. This ensures that the timestamp of the job can be aligned.
Finally, OLAP/Batch jobs read snapshots from source tables with the same checkpoint too, and this ensures data consistency in job computation.
- The completed timestamp of each table
MetaService
manages completed timestamps of each table and guarantees data consistency in OLAP query. OLAP query should request versions of source tables from MetaService
, and MetaService
calculates snapshot ids of tables based on the dependencies between tables, completed timestamps and snapshots in each table and consistency type requirement. OLAP reads data from tables according to the given snapshot ids, which ensure the data consistency for it.
- Information about tables and snapshots being used by the jobs
MetaService manages information about snapshots being used by ETL jobs or OLAP on tables, then determines which snapshots of tables can be safely deleted, compacted without affecting the jobs who are reading the data, ensures these jobs can read correct data.
- Timestamp progress of each ETL job
MetaService manages start time, finish time, total cost of timestamp barriers for each job, it helps users to analyze the E2E delay and optimize the ETL jobs.
ETL Jobs Failover
Each ETL job may fail in the ETL Topology
, but unlike the general Flink Streaming Job, it should not cause the failover of ETL Topology
. The ETL job in ETL Topology
must meet the following conditions
- The determination of reading data
Flink jobs read snapshots from Table Store. When a job fails, it must be able to reread snapshots according to the previous timestamp from checkpoint. If the relationship between timestamp and snapshot is determined, and the timestamp can be recovered from checkpoint, the failed job can read the same data from the same snapshot according to the same timestamp, which means the job will read determined data from Table Store before and after failover.
- The determination of writing data
Flink jobs commit data with timestamp information to Table Store according to their timestamp barrier. Each job commits data only when the specified timestamp are completed, which means the job writes the determined data in Table Store before and after failover.
- Orderliness of data and computation
Flink jobs read and write snapshots which are according to their timestamps. Timestamp barriers will be aligned in each job and among multiple jobs. This means that although the data in one timestamp barrier is out of order, the data and computation between timestamp barriers across multiple jobs are in order.
Because of determination and orderliness, the failover of a single ETL job will not cause the failover of the entire ETL Topology
. The JobManager of each ETL job only needs to process the failover within the job. To do that, we need to support failover of Timestamp Barrier
, which includes:
- Recover timestamp barriers from
Checkpoint
. The boundaries of checkpoint and timestamp barrier are aligned, and the job can recover the same timestamp barrier for failed checkpoint. For example, there are timestamp barrier 1, 2, 3 in checkpoint 1, and the ETL job is processing data for checkpoint 2 with timestamp 3, 4. When the job failed, it will recover from checkpoint 1 and assign the same timestamp 3 and 4 for checkpoint 2. - Replay data for the same timestamp barriers. For the above example, when job recover from checkpoint 1 and replay data for timestamp 3 and 4, it must produce the same data as before failover.
To achieve that, Flink should store <Timestamp Barrier, Offset> and <Checkpoint, Timestamp Barrier> information when a timestamp barrier is generated.
After implementing this function, in addition to the above failover processing, we can do something easily when some table data needs to be revised due to the certainty of the snapshot, timestamp and checkpoint of Table Store. For example, when we need to revise data in Table3, we can roll back to a specified checkpoint in all downstream cascaded ETL jobs and tables.
- Incremental processing
All table snapshots are aligned according to a unified checkpoint. When a specified table data needs to be revised, we just need to rollback all its downstream tables to a unified snapshot, reset the streaming jobs' state to the specified checkpoint, and then restart the jobs to consume incremental data.
- Full processing
Due to reasons such as the ETL jobs' state TTL, we cannot perform incremental processing. At this time, we can perform full processing, clear the data and ETL state of all downstream tables and jobs, and then restart the jobs to consume the data in full.
Incremental processing is as followed.
draw.io Diagram | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
Start And Stop ETL Jobs
Register Tables
Flink ETL job needs to register its source and sink tables with MetaService when it is submitted. At present, the client will create the specified TableStoreSource and TableStoreSink from Table Store in the process of generating the Flink execution plan. In this process, we can register the jobid and table information with MetaService.
MetaService creates relationship between the source and sink tables by the jobid. After an ETL job generates the plan, it may not be submitted to the cluster successfully due to some exceptions such as network or resources. The register information of tables can't be accessed and can only be accessed after the job is submitted to cluster and the SplitEnumerator registers itself to MetaService too.
Query Data Versions
ETL and OLAP jobs must get snapshot ids of tables from MetaService when they are submitted to the cluster according to consistency requirement. Flink jobs can get versions of tables when they create them in FlinkCatalog. The main processes are as followed.
draw.io Diagram | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
- Stop ETL Job
The relationship between source and sink tables of an ETL job should be deleted when the job terminates. We can add a listener JobTerminatedListener in Flink, and notify JobManager to send delete event to MetaService when job is terminated.
Summary
The main work in Timestamp Barrier
and the differences between Timestamp Barrier
and existing Watermark
in Flink are in the following table.
Modules | Timestamp Barrier | Watermark |
Generation |
| Each source subtask generate timestamp barrier(watermark event) from System Time or Event Time |
Checkpoint | Store <checkpoint, timestamp barrier> when the timestamp barrier is generated, so that the job can recover the same timestamp barrier for the uncompleted checkpoint. | None |
Replay data | Store <timestamp barrier, offset> for source when it broadcasts timestamp barrier, so that the source can replay the same data according to the same timestamp barrier. | None |
Align data | Align data for stateful operator(aggregation, join and etc.) and temporal operator(window) | Align data for temporal operator(window) |
Computation | Operator computation for a specific timestamp barrier based on the results of a previous timestamp barrier. | Window operator only computes results in the window range. |
Output | Operator outputs or commits results when it collects all the timestamp barriers, including operators with data buffer or async operations. | Window operator support "emit" output |
The main work in Flink
and Table Store
are as followed
Component | Main Work | |
---|---|---|
MetaService |
| |
Table Store | Catalog |
|
Source and SplitEnumerator |
| |
Sink |
| |
Flink | Timestamp Barrier Mechanism | The detailed and main work is in the above table |
Planner |
| |
JobManager |
| |
Improvement |
|
Constraint
The current FLIP design has two constraints and it may continue to improve in the future
- Multiple jobs are not supported to write to the same table concurrently
- ETL topology does not support cycles
MetaService needs to detect these situations and report errors when ETL jobs are registered.
The Next Step
This is an overall FLIP for data consistency in streaming and batch ETL. Next, we would like to create FLIP for each functional module with detailed design. For example:
- Timestamp Barrier Coordination and Generation
- Timestamp Barrier Checkpoint and Recovery
- Timestamp Barrier Replay Data Implementation
- Timestamp Barrier Alignment and Computation In Operator
- Introduce Delta Join in Flink To Improve State Resource
- Introduce MetaService module and implement source/sink in Table Store and etc
- Job and Table management in MetaService such as exception handling, data revision and etc
Rejected Alternatives
Data consistency management
What we need in Flink is a Timestamp Barrier Mechanism
to align data in stateful and temporal operator. As shown above, the existing Watermark
cannot align data. At present, Aligned Checkpoint
is the only one which can align data in stateful operator such as aggregation and join operators in Flink. But there are some problems of Checkpoint
for data consistency
- Flink uses Checkpoint as a fault-tolerant mechanism, it supports aligned checkpoint, non-aligned checkpoint, and may even task local checkpoint in the future.
- Even for Aligned Checkpoint, data consistency cannot be guaranteed for some operators, such as Temporal operators with timestamp or data buffer.
Data consistency coordinator
By coordinating timestamp barriers between
- The determination of reading data
Flink jobs read snapshots from Table Store. When a job fails, it will reread snapshots according to the latest checkpoint. The relationship between checkpoint and snapshot is determined. The failed job can read the same data from the same snapshot according to the same checkpoint, which means the job will read determined data from Table Store before and after failover.
- The determination of writing data
Flink jobs write snapshots with checkpoint information to Table Store according to their checkpoints. Each job creates snapshots only when the specified checkpoints are completed, which means the job writes the determined data in Table Store before and after failover.
- Orderliness of data and computation
Flink jobs read and write snapshots which are according to their checkpoints. Checkpoint barriers will be aligned in each job and among multiple jobs. This means that although the data in one checkpoint is out of order, the data and computation between checkpoints across multiple jobs are in order.
Because of determination and orderliness, the failover of a single ETL job will not cause the failover of the entire ETL Topology
. The CheckpointCoordinator of each ETL job only needs to process the failover within the job.
At the same time, in addition to the above failover processing, we can do something easily when some table data needs to be revised due to the certainty of the snapshot and checkpoint of Table Store. For example, when we need to revise data in Table3, we can roll back to a specified checkpoint in all downstream cascaded ETL jobs and tables.
- Incremental processing
All table snapshots are aligned according to a unified checkpoint. When a specified table data needs to be revised, we just need to rollback all its downstream tables to a unified snapshot, reset the streaming jobs' state to the specified checkpoint, and then restart the jobs to consume incremental data.
- Full processing
Due to reasons such as the ETL jobs' state TTL, we cannot perform incremental processing. At this time, we can perform full processing, clear the data and ETL state of all downstream tables and jobs, and then restart the jobs to consume the data in full.
Incremental processing is as followed.
...
Start And Stop ETL Jobs
Register Tables
Flink ETL job needs to register its source and sink tables with MetaService when it is submitted. At present, the client will create the specified TableStoreSource and TableStoreSink from Table Store in the process of generating the Flink execution plan. In this process, we can register the jobid and table information with MetaService. The REST api in MetaService is
Code Block | ||
---|---|---|
| ||
/register/source/:jobID/:catalog/:database/:table
/register/sink/:jobID/:catalog/:database/:table |
MetaService creates relationship between the source and sink tables by the jobid. After an ETL job generates the plan, it may not be submitted to the cluster successfully due to some exceptions such as network or resources. The register information of tables can't be accessed and can only be accessed after the job is submitted to cluster and the SplitEnumerator registers itself to MetaService too.
Query Data Versions
...
REST apis should be added in MetaService to response versions of tables according to the consistency requirement.
Code Block | ||
---|---|---|
| ||
/version/:consistencyType/:catalog/:database/:table |
- Stop ETL Job
The relationship between source and sink tables of an ETL job should be deleted when the job terminates. We can add a listener JobTerminatedListener in Flink, and notify SplitEnumerator to send delete event to MetaService when job is terminated. The interface and api is as followed
Code Block | ||
---|---|---|
| ||
/** * Listener for job and notify it when job is stopped or canceled. **/
public interface JobTerminationListener {
void notifyJobTerminated(Job jobID);
}
// Unregister in MetaService REST
/unregister/:jobID |
Rejected Alternatives
Data consistency management
...
Parse timestamps from data as watermarks.
ETL generates versions based on watermarks, writes them to Table Store, and flows among multiple ETL jobs.
Flink OLAP queries the table from the snapshot of Table Store according to the versions.
Main Problems
Currently watermark in Flink cannot align data.
...
As shown above, there are 2 watermarks T1 and T2, T1 < T2. The StreamTask reads data in order: V11,V12,V21,T1(channel1),V13,T1(channel2). At this time, StreamTask will confirm that watermark T1 is completed, but the data beyond T1 has been processed(V13) and the results are written to sink table.
Watermark mechanism cannot guarantee the consistency of data, so we choose the Checkpoint.
But there are some problems of Checkpoint for data consistency
- Flink uses Checkpoint as a fault-tolerant mechanism, it supports aligned checkpoint, non-aligned checkpoint, and may even task local checkpoint in the future.
- Even for Aligned Checkpoint, data consistency cannot be guaranteed for some operators, such as Temporal operators with timestamp or data buffer.
So after the implementation in the first stage, we need to upgrade the Watermark or implement a new Timestamp Barrier mechanism in Flink in future to support full semantics of Data Consistency. In a single ETL job
- Source generates Timestamp Barrier based on System Time or Event Time
- Aggregation and Temporal operators align data by Timestamp Barrier, perform computation and output results for each Timestamp Barrier
- Timestamp Barriers have order relations, Aggregation and Temporal operators should perform computation between Timestamp Barrier after do it for each Timestamp Barrier
ETL jobs write each Timestamp Barrier to snapshot in Table Store, and the downstream ETL can read the Timestamp Barrier, which makes the Timestamp Barrier be transferred between ETL jobs.
Finally, we can guarantee the consistency of data by Timestamp Barrier and exactly-once computation by Checkpoint independently.
Data consistency coordinator
By coordinating checkpoints among multiple jobs, the consistency of data among multiple ETL jobs Sink Tables can be ensured during query. Besides global checkpointtimestamp barrier between jobs, we also consider adaptive checkpoint in each ETLtimestamp barrier.
Each ETL job manages its checkpoint timestamp barrier and MetaServices manages the relationships of checkpoints timestamp barriers between ETL jobs.
draw.io Diagram | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
As shown above, CP30 Timestamp30 in Table1 and CP10 Timestamp10 in Table2 generates CP3 Timestamp3 in Table3, and so on. When users query on these tables, MetaService calculates the snapshot ids of them according to the checkpoints timestamp barriers relationships in the ETL jobs.
In this way, we can define the data consistency of queries, but it's difficult to define the data processing delay between jobs and tables. For example, it is difficult to define the data delay from Table1, Table2 to Table3. As the number of cascaded layers increases, this definition will become very complex.
...
Data consistency of ETL Topology is our first phase of work. After completing this part, we plan to promote the capacity building and improvement of Flink + Table Store in future, mainly including the following aspects.in future, mainly including the following aspects.
- Support data consistency semantics. As mentioned above, we need to implement "Timestamp Barrier" to support full semantics data consistency.
Materialized View in SQL. Next, we hope to introduce materialized view syntax into Flink to improve user interaction experience. Queries can also be optimized based on materialized views to improve performance.
Improve MetaService capabilities. ManagerService is a single point in the system, and it should supports failover. In the other way, MetaService supports managing Flink ETL jobs and tables in Table Store, accessed by other computing engines such as Spark and being an agent of Hive Metastore later.
Improve data consistency semantics. As mentioned above, we need to implement "Timestamp Barrier" to support full semantics data consistency instead of "Aligned Checkpoint" in the first stage.
Improve OLAP performance. We have created issues in FLINK-25318] Improvement of scheduler and execution for Flink OLAP to manage improvement of OLAP in Flink. At the same time, we hope to continue to enhance the online query capability of Table Store and improve the OLAP performance of Flink + Table Store.
Improvement of data real-time. At present, our consistency design is based on Flink checkpoint mechanism and supports minute level delay. In the future, we hope we hope to support second level or even millisecond level data real-time on the premise of ensuring data consistency, which requires continuous optimization in computing and storage.
By promoting the above optimization and implementation, we hope that Flink + Table Store can support the full StreamingWarehouse capability. Users can create materialized views and execute OLAP queries in the system, just like using databases and data warehouses, and output data to the application layer (such as KV) as required.