...
The Watermark mechanism cannot guarantee data consistency, so we choose Checkpoint.
Data consistency coordinator
By coordinating checkpoints among multiple jobs, we can ensure that the Sink Tables of multiple ETL jobs are consistent with each other during queries. Besides the global checkpoint, we also considered an adaptive checkpoint in each ETL job: each ETL job manages its own checkpoints, and MetaService manages the relationships between the checkpoints of different ETL jobs.
(draw.io diagram: checkpoint relationships between the tables of cascaded ETL jobs)
As shown above, CP30 in Table1 and CP10 in Table2 generate CP3 in Table3, and so on. When users query these tables, MetaService calculates their snapshot ids according to the checkpoint relationships between the ETL jobs.
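The lookup MetaService performs can be sketched as follows. This is a minimal illustration, not an existing Table Store API: the relation and snapshot tables, the function name, and the concrete ids are all assumptions, using the CP30/CP10/CP3 example above.

```python
# Hypothetical sketch of how a MetaService could resolve a consistent set of
# snapshot ids from recorded checkpoint relationships. All names and numbers
# are illustrative assumptions; this is not an existing Table Store API.

# Each entry records which upstream-table checkpoints produced a sink-table
# checkpoint, e.g. CP30 in Table1 and CP10 in Table2 produced CP3 in Table3.
CHECKPOINT_RELATIONS = {
    ("Table3", 3): {"Table1": 30, "Table2": 10},
}

# Mapping from (table, checkpoint) to the snapshot id committed for it.
SNAPSHOTS = {
    ("Table1", 30): 130,
    ("Table2", 10): 45,
    ("Table3", 3): 3,
}

def consistent_snapshots(sink_table: str, sink_checkpoint: int) -> dict:
    """Return one snapshot id per table such that the set is consistent."""
    upstream = CHECKPOINT_RELATIONS[(sink_table, sink_checkpoint)]
    result = {sink_table: SNAPSHOTS[(sink_table, sink_checkpoint)]}
    for table, checkpoint in upstream.items():
        result[table] = SNAPSHOTS[(table, checkpoint)]
    return result

# A query joining Table1, Table2 and Table3 would pin these snapshots:
print(consistent_snapshots("Table3", 3))
# {'Table3': 3, 'Table1': 130, 'Table2': 45}
```

A query spanning the three tables then reads exactly these pinned snapshots, so it never observes Table3 results computed from Table1/Table2 data it cannot see.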
In this way we can define the data consistency of queries, but it is difficult to define the data processing delay between jobs and tables. For example, it is hard to define the data delay from Table1 and Table2 to Table3, and as the number of cascaded layers increases, this definition becomes very complex.
On the other hand, this proposal increases the cost of data operation and management: when the data of a table needs to be rolled back to a specified snapshot for some reason, every downstream table must be reset to a different snapshot, which is painful to manage.
For the above reasons, we choose the global checkpoint mechanism in the first stage. But Checkpoint still has some problems for data consistency:
- Flink uses Checkpoint as a fault-tolerance mechanism; it supports aligned and unaligned checkpoints, and may even support task-local checkpoints in the future.
- Even with aligned checkpoints, data consistency cannot be guaranteed for some operators, such as Temporal operators with timestamps or data buffers.
So after the first-stage implementation, we need to upgrade the Watermark mechanism or implement a new Timestamp Barrier mechanism in Flink to support the full semantics of data consistency. In a single ETL job:
- The Source generates Timestamp Barriers based on System Time or Event Time.
- Timestamp Barriers are ordered. Aggregation and Temporal operators first perform computation within each Timestamp Barrier, then across Timestamp Barriers.
- Aggregation and Temporal operators output results for each Timestamp Barrier.
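The alignment behavior described above can be sketched as follows. The Timestamp Barrier mechanism does not exist in Flink yet, so the class, method names, and the sum aggregation are purely illustrative assumptions.

```python
# Hypothetical sketch of Timestamp Barrier alignment in an aggregation
# operator: records are buffered per barrier, each barrier's batch is
# computed once the barrier has arrived on all inputs, and state is then
# carried across barriers. All names are assumptions; the Timestamp
# Barrier mechanism is future work, not an existing Flink API.
from collections import defaultdict

class BarrierAlignedSum:
    def __init__(self, num_inputs: int):
        self.num_inputs = num_inputs
        self.buffers = defaultdict(list)   # barrier -> buffered values
        self.arrived = defaultdict(set)    # barrier -> inputs that sent it
        self.total = 0                     # state carried across barriers

    def on_record(self, barrier: int, value: int) -> None:
        # Buffer the record under the barrier it belongs to.
        self.buffers[barrier].append(value)

    def on_barrier(self, barrier: int, input_id: int):
        # Align: compute only when the barrier arrived on every input.
        self.arrived[barrier].add(input_id)
        if len(self.arrived[barrier]) == self.num_inputs:
            # Compute within this barrier, then fold into cross-barrier state.
            self.total += sum(self.buffers.pop(barrier))
            return (barrier, self.total)   # one result per Timestamp Barrier
        return None

op = BarrierAlignedSum(num_inputs=2)
op.on_record(1, 5)
op.on_record(1, 7)
print(op.on_barrier(1, 0))   # None: still waiting for input 1
print(op.on_barrier(1, 1))   # (1, 12): barrier 1 complete, result emitted
```

The key point is that a result is emitted exactly once per barrier, which is what gives downstream consumers a well-defined version of the data to read.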
ETL jobs write each Timestamp Barrier into a snapshot in Table Store, and downstream ETL jobs can read the Timestamp Barrier back, so Timestamp Barriers are transferred between ETL jobs.
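That hand-off could look roughly like this. The `Snapshot` shape and the field carrying the barrier are assumptions for illustration; Table Store's actual snapshot metadata is not shown here.

```python
# Hypothetical sketch of propagating a Timestamp Barrier through Table Store
# snapshots: the upstream job tags each committed snapshot with the barrier
# that produced it, and the downstream job re-emits that barrier when it
# reads the snapshot. The Snapshot class and field names are assumptions,
# not the real Table Store snapshot format.
from dataclasses import dataclass

@dataclass
class Snapshot:
    snapshot_id: int
    timestamp_barrier: int   # barrier that produced this snapshot
    records: list

def commit(snapshots: list, barrier: int, records: list) -> None:
    """Upstream ETL: commit a snapshot carrying the current barrier."""
    snapshots.append(Snapshot(len(snapshots) + 1, barrier, records))

def read(snapshots: list, snapshot_id: int):
    """Downstream ETL: read records and inherit the upstream barrier."""
    s = snapshots[snapshot_id - 1]
    return s.timestamp_barrier, s.records
```

Because the barrier travels with the snapshot rather than with a job-internal channel, every job in the ETL Topology aligns on the same barrier values.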
Finally, we can guarantee data consistency with Timestamp Barrier and exactly-once computation with Checkpoint, independently of each other.
Roadmap In Future
Data consistency of the ETL Topology is our first phase of work. After completing it, we plan to continue building out and improving Flink + Table Store, mainly in the following aspects.
...