...
Across the whole streaming data processing pipeline, there are mainly the following five HOW problems (for general streaming and batch ETL):
...
Multiple ETL jobs and tables in Table Store form a topology. For example, there are dependencies between Table1, Table2, and Table3. Currently, users can't get relationship information between these tables, and when data on a table is late, it is difficult to trace the root cause based on job dependencies.
- HOW to
...
- share state between multiple jobs to save resources?
Multiple ETL jobs may hold the same state data for different computations. For example, there are two JOIN ETL jobs over three tables, as follows:
[draw.io diagram]
...
The data in State1 and Table1 are the same; State2, State3 and Table2 are the same; State3 and Table3 are the same. The more times the same table is joined, the more resources are wasted. We should reuse the data in the state to save resources.
HOW to define the data correctness in query?
[draw.io diagram]
As shown above, Flink ETL jobs will generate V11, V21 and V31 in Table1, Table2 and Table3 respectively for V1 in CDC. Suppose the following case: there is a base table in a database, and a user creates cascaded views or materialized views Table1, Table2 and Table3 based on it. The user then executes complex queries on Table1, Table2 and Table3, and the query results are consistent with the base table. When the user creates the three tables in Table Store and incrementally updates their data with Flink ETL jobs in real time, these tables can be regarded as materialized views that are incrementally updated in real time. During streaming data processing, how do we define data correctness when users perform join queries on these three tables? Concretely, how do we ensure that V11, V21 and V31 are either all read or all not read in one query?
HOW to define E2E data processing delay of ETL jobs and tables in topology above?
Flink ETL jobs update the tables above in real time, and there are dependencies between them. While the data is flowing, how do we define the data delay in these tables? For the above example, how do we define the E2E delay of streaming data from CDC to Table2? How much does the delay of each ETL job contribute to the E2E delay, and which ETL job needs to be optimized?
HOW to revise the data in tables updated by streaming job?
When one of the tables needs to be revised, how can it be revised during streaming processing while keeping the data correct? For instance, if the data in Table1 needs to be revised, what should users do in the topology to ensure that data is neither lost nor duplicated?
In order to answer the above questions, we introduce Timestamp Barrier in Flink to align data, and introduce MetaService in Table Store to coordinate Flink ETL jobs, manage the relationships and dependencies between ETL jobs and tables, and support data consistency in Table Store.
Proposed Design
Architecture
We can regard each Flink ETL job as a single node with complex computation, and each table in Table Store as a data stream. Flink ETL jobs and tables form a huge streaming job, which we call the ETL Topology. We set up a MetaService node to manage the ETL Topology. The main architecture is:
[draw.io diagram]
There are two core points in the architecture: Timestamp Barrier Mechanism and MetaService
Timestamp Barrier Mechanism
We need a barrier mechanism in Flink to guarantee data consistency.
- Each ETL source needs to be assigned a unified Timestamp Barrier.
- Stateful and temporal operators in a Flink ETL job align and compute data according to the barrier.
- The sink operator in an ETL job confirms data with the barrier to the sink tables in Table Store.
MetaService component
MetaService is the coordinator of the ETL Topology. Its capabilities are as follows:
1> Coordinate the Global Timestamp Barrier in ETL Topology
- As the coordinator of the ETL Topology, MetaService interacts with the source ETL job and generates a global Timestamp Barrier. The Timestamp Barrier is transmitted between ETL job nodes through tables, so that these job nodes can create globally consistent snapshots in Table Store according to the barrier.
2> Manage dependencies between ETL jobs and tables
- MetaService manages the relationship between ETL jobs and tables in the ETL Topology. Users can query these dependencies from MetaService.
- MetaService manages the Timestamp Barrier in each ETL job, including barrier progress, completed barriers, etc.
- MetaService manages the Timestamp Barrier and snapshots of tables, including the latest completed snapshots and the relationship between barriers and snapshots.
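The dependency management in 2> can be pictured as a small directed graph over jobs and tables. The following is a minimal sketch, not MetaService's actual API: the Topology class, its method names, the job names etl1 and etl2, and the edges registered at the bottom are all hypothetical and only illustrate how a lineage query could trace a late table back to its upstream jobs.

```python
from collections import defaultdict, deque


class Topology:
    """Hypothetical in-memory model of an ETL Topology (jobs read and write tables)."""

    def __init__(self):
        # node -> set of nodes it directly depends on
        self.upstream = defaultdict(set)

    def register_job(self, job, sources, sink):
        # A job depends on its source tables; its sink table depends on the job.
        for table in sources:
            self.upstream[job].add(table)
        self.upstream[sink].add(job)

    def lineage(self, node):
        """Return every job and table that `node` transitively depends on."""
        seen, queue = set(), deque([node])
        while queue:
            for dep in self.upstream[queue.popleft()]:
                if dep not in seen:
                    seen.add(dep)
                    queue.append(dep)
        return seen


# Assumed example edges: CDC -> etl1 -> Table1 -> etl2 -> Table2
topology = Topology()
topology.register_job("etl1", sources=["CDC"], sink="Table1")
topology.register_job("etl2", sources=["Table1"], sink="Table2")
print(sorted(topology.lineage("Table2")))
```

With such a graph, a delay observed on Table2 can be attributed by walking its lineage and inspecting the barrier progress of each upstream job.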
3> Manage Data Consistency in Query On Table Store
- MetaService supports data consistency in query based on the management of dependencies between tables.
- MetaService determines the compaction and expiration of snapshots for each table according to the snapshots being used by the OLAP and ETL jobs.
User Interfaces
User Interaction
Setup MetaService
In the first phase, we'd like to start a standalone MetaService with storage path and REST port in configuration.
...
Timestamp Barrier divides the unbounded streaming data in the ETL Topology into multiple bounded data sets; each bounded data set can be seen as a big transaction in streaming processing. A transaction in streaming processing has the following characteristics
- Each transaction consists of multiple operations on tables; each sink operation commits data to a table according to the Timestamp Barrier. The transaction will be committed after all the operations are committed.
- There is a sequential relationship between multiple transactions in processing data. They commit data to the same table serially.
- There are three states in a table for a specific transaction: PreCommit, Commit and Snapshot
  - PreCommit: Sink has committed data to the table according to the Timestamp Barrier, but the related transaction is still processing and not committed. The committed data in the table may be rolled back if the job fails.
  - Commit: The transaction related to a specific Timestamp Barrier is committed, and the data in tables may be rolled back if jobs fail.
  - Snapshot: The transaction related to a specific Timestamp Barrier is committed and all the tables generate snapshots for the transaction. The data in the tables won't be rolled back even when jobs fail.
The key points of a transaction are as follows
- If the records in an "epoch" (Timestamp Barrier) have finished being written to a table, we say the transaction is PROCESSED in the table.
- If the table creates a snapshot for the records in an "epoch", we say the transaction is WRITTEN in the table.
- If a transaction is PROCESSED in all tables, we say the transaction is PRECOMMIT.
- If a transaction is WRITTEN in all tables, we say the transaction is COMMIT.
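The four rules above can be read as a function from per-table states to a topology-wide state. The sketch below is illustrative only: the function name, the "IN_PROGRESS" label for transactions that are not yet PRECOMMIT, and the assumption that a table which is WRITTEN also counts as PROCESSED are not taken from the design.

```python
def transaction_state(table_states):
    """Derive the topology-wide state of one transaction.

    `table_states` maps a table name to "PROCESSED", "WRITTEN", or None
    (the epoch has not finished writing to that table yet).
    """
    states = list(table_states.values())
    if states and all(s == "WRITTEN" for s in states):
        return "COMMIT"
    # Assumption: a table with a snapshot for the epoch has also finished writing it.
    if states and all(s in ("PROCESSED", "WRITTEN") for s in states):
        return "PRECOMMIT"
    return "IN_PROGRESS"  # hypothetical label, not defined in the design


print(transaction_state({"Table1": "WRITTEN", "Table2": "PROCESSED"}))
```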
When a job fails, the records that are not WRITTEN in a table will be "rolled back". Using the same example as above, suppose the data in the tables is as follows
...
The user performs the query SELECT userId, itemId, totalPrice, totalAmount, totalPrice / totalAmount as avgPrice FROM UserItemAmount a JOIN UserItemPrice p ON a.userId=p.userId and a.itemId=p.itemId on tables user_item_amount and user_item_price multiple times.
...
Read Uncommitted refers to querying table data of uncommitted transactions. Some tables in a transaction have PROCESSED data according to the Timestamp Barrier, and this data can be read by a query, while the remaining tables are not PROCESSED yet and the transaction is not PRECOMMIT yet. For example
- The data PROCESSED in user_item_price is: (user1, item1, 2500).
- The new data in user_item_amount is not PROCESSED yet, so the query reads: (user1, item1, 100).
- The result of the user's query will be (user1, item1, 2500, 100, 25), which is not a consistent result.
...
Read Committed refers to querying table data of PRECOMMIT transactions only; it is the default consistency level in MetaService. When a transaction is PRECOMMIT, the data in all tables is PROCESSED. The query can then read consistent data according to a specific transaction. For example
- Transaction T is not PRECOMMIT: the query result is (user1, item1, 1000, 100, 10)
- Transaction T is PRECOMMIT: the query result is (user1, item1, 2500, 300, 8.33333)
Read Committed doesn't support Repeatable Read, which means that when jobs fail after transaction T is PRECOMMIT, the data in tables will be rolled back and the query result will fall back from (user1, item1, 2500, 300, 8.33333) to (user1, item1, 1000, 100, 10)
...
Repeatable Read only reads data of a specific transaction that is WRITTEN in tables. The snapshots in a table won't be rolled back even when jobs fail. For example
- Transaction T has been PROCESSED, but the related snapshots in tables are not created: the query result is (user1, item1, 1000, 100, 10)
- When the related snapshots in tables have been created, the query result is (user1, item1, 2500, 300, 8.33333)
- Snapshots in persistent storage won't be rolled back even when jobs fail, so the query result will always be (user1, item1, 2500, 300, 8.33333); this is Repeatable Read
If Repeatable Read only reads data of a COMMIT transaction, the data will be consistent; otherwise, the data in a query may come from different transactions.
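To make the three levels concrete, the sketch below replays the user1/item1 example. It is a toy model under stated assumptions, not the MetaService implementation: the level names as strings, the helper functions, the transaction id "T0" for the older data, and the convention that every transaction has one entry per table are all hypothetical. Repeatable Read is modelled in its COMMIT-only variant.

```python
# Versions of each table, oldest first: (transaction, value, per-table state).
# T0 is WRITTEN everywhere; T is PROCESSED in user_item_price only.
user_item_price = [("T0", 1000, "WRITTEN"), ("T", 2500, "PROCESSED")]
user_item_amount = [("T0", 100, "WRITTEN"), ("T", 300, None)]
TABLES = [user_item_price, user_item_amount]


def state_in_all(txn, accepted):
    """True if `txn` has one of the `accepted` states in every table."""
    return all(state in accepted
               for table in TABLES
               for t, _, state in table if t == txn)


def latest(table, ok):
    """Newest value in `table` whose transaction passes the visibility check."""
    return [value for txn, value, state in table if ok(txn, state)][-1]


def query(level):
    def ok(txn, state):
        if level == "READ_UNCOMMITTED":
            return state is not None  # each table is checked on its own
        if level == "READ_COMMITTED":
            return state_in_all(txn, ("PROCESSED", "WRITTEN"))  # PRECOMMIT
        return state_in_all(txn, ("WRITTEN",))  # REPEATABLE_READ on COMMIT

    price = latest(user_item_price, ok)
    amount = latest(user_item_amount, ok)
    return ("user1", "item1", price, amount, price / amount)


for level in ("READ_UNCOMMITTED", "READ_COMMITTED", "REPEATABLE_READ"):
    print(level, query(level))
```

In this state only Read Uncommitted mixes data from two transactions; once T becomes PROCESSED in user_item_amount as well, Read Committed starts returning the new values while Repeatable Read keeps returning the old ones until T is WRITTEN in both tables.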
Reuse Data In State
After aligning data with Timestamp Barrier, join operators in jobs can keep Delta State in memory and join it with the data in shared state as follows
[draw.io diagram]
Each join operator only needs to store the delta state with timestamps in memory; when it needs to join data, it can get the full data from the shared state together with the delta state.
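One way to picture this lookup is a two-level read that prefers the newest entry by timestamp. The sketch below is an assumption-laden illustration rather than the proposed Flink operator: the DeltaJoinState class, its methods, and the (timestamp, row) layout are hypothetical, and the shared state is represented by a plain dictionary.

```python
class DeltaJoinState:
    """Hypothetical join-side state: a shared view plus an in-memory delta."""

    def __init__(self, shared):
        self.shared = shared  # key -> (timestamp, row), shared by several jobs
        self.delta = {}       # key -> (timestamp, row), local to this operator

    def update(self, key, timestamp, row):
        # Only the change is kept locally; the shared state is never copied.
        self.delta[key] = (timestamp, row)

    def lookup(self, key):
        """Return the newest row for `key` across delta and shared state, or None."""
        candidates = [s[key] for s in (self.delta, self.shared) if key in s]
        if not candidates:
            return None
        return max(candidates, key=lambda entry: entry[0])[1]


shared = {"k1": (1, "old"), "k2": (1, "b")}
state = DeltaJoinState(shared)
state.update("k1", 2, "new")
print(state.lookup("k1"), state.lookup("k2"), state.lookup("k3"))
```

Because the operator holds only the delta, joining the same table from several jobs no longer multiplies the full state.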
Design of Data Consistency
...
- Stateless operator. The operator ignores the timestamp barrier when processing records: it processes every input record and outputs the result just as before, without aligning data with the Timestamp Barrier. However, it should collect all the Timestamp Barriers, and once it has received them it should broadcast the barrier to downstream tasks.
- Stateful operator and temporal operator. Records within the same Timestamp Barrier are out of order, so stateful and temporal operators should align them according to their timestamp field. The operators execute computation when they have collected all the timestamp barriers, and then broadcast the barrier to downstream tasks. There is a sequential relationship between timestamp barriers, and records between timestamp barriers are ordered. A stateful/temporal operator should do one of the following:
  - If the business doesn't require ordering, it can process the records immediately as before.
  - If the business requires ordering, it buffers the records internally, as current windowed/temporal operators do. Records in each "epoch" (as demarcated by timestamp barriers) will be processed after the previous "epoch" is finished, just like pre-aggregation. This means that the operators compute and output results for a timestamp barrier based on the result of the previous timestamp barrier.
- Sink operator. The sink streams output results to Table Store and commits the results when it has collected all the timestamp barriers. The source of a downstream ETL job can prefetch data from Table Store, but should only produce data after the upstream sink has committed.
  - If the external system requires ordered writes (something like a Kafka topic or an append-only store), the sinks would have to buffer the writes until a "timestamp barrier" arrives.
  - Some sinks might support writing data to different "epochs" simultaneously, for example by writing files bucketed by epoch. Each bucket/epoch could then be committed independently.
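The ordered case for a stateful operator can be sketched as a small buffer that is flushed when the barrier has arrived on every input. This is a simplified illustration, not Flink's operator API: the class and method names are hypothetical, a running sum stands in for the real computation, and it assumes an input delivers no records of the next epoch until the current barrier has arrived on all inputs.

```python
class OrderedEpochOperator:
    """Hypothetical stateful operator that requires ordering within an epoch."""

    def __init__(self, num_inputs):
        self.num_inputs = num_inputs
        self.buffer = []            # (timestamp, value) records of the current epoch
        self.barriers_seen = set()  # inputs that already delivered the barrier
        self.total = 0              # result carried over from previous epochs
        self.output = []            # (barrier, result) pairs sent downstream

    def on_record(self, timestamp, value):
        self.buffer.append((timestamp, value))

    def on_barrier(self, input_id, barrier):
        self.barriers_seen.add(input_id)
        if len(self.barriers_seen) < self.num_inputs:
            return  # still waiting for the barrier from the other inputs
        # Align the epoch by timestamp, then build on the previous epoch's result.
        for _, value in sorted(self.buffer):
            self.total += value
        self.output.append((barrier, self.total))  # emit result, then forward barrier
        self.buffer.clear()
        self.barriers_seen.clear()


op = OrderedEpochOperator(num_inputs=2)
op.on_record(2, 10)
op.on_record(1, 5)
op.on_barrier(0, 100)
op.on_barrier(1, 100)
print(op.output)
```

A sink that needs ordered writes could follow the same pattern, with the flush step replaced by a commit to the table.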
2. Timestamp Barrier across ETL jobs
...
Component | | Main Work
---|---|---
MetaService | |
Table Store | Catalog |
| Source and SplitEnumerator |
| Sink |
Flink | Timestamp Barrier Mechanism | The detailed and main work is in the above table
| Planner |
| JobManager |
| Improvement |
Constraint
The current FLIP design has two constraints, which may be improved in the future
...
- Timestamp Barrier Coordination and Generation
- Timestamp Barrier Checkpoint and Recovery
- Timestamp Barrier Replay Data Implementation
- Timestamp Barrier Alignment and Computation In Operator
- Introduce Delta Join in Flink To Improve State Resource
- Introduce MetaService module and implement source/sink in Table Store, etc.
- Job and Table management in MetaService, such as exception handling, data revision, etc.
...