...

The Watermark mechanism cannot guarantee data consistency, so we choose Checkpoint.

Data consistency coordinator


By coordinating checkpoints among multiple jobs, data consistency across the Sink Tables of multiple ETL jobs can be ensured at query time. Besides the global checkpoint, we also consider an adaptive checkpoint in each ETL job: each ETL job manages its own checkpoints, and MetaService manages the relationships between the checkpoints of different ETL jobs.

draw.io Diagram: figure11

As shown above, CP30 in Table1 and CP10 in Table2 generate CP3 in Table3, and so on. When users query these tables, MetaService calculates their snapshot ids according to the checkpoint relationships of the ETL jobs.
In this way we can define the data consistency of queries, but it is difficult to define the data processing delay between jobs and tables. For example, it is hard to define the data delay from Table1 and Table2 to Table3, and as the number of cascaded layers increases, this definition becomes very complex.
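
To make the checkpoint-relationship bookkeeping concrete, here is a minimal sketch in Java. The class and method names are hypothetical and not the actual MetaService API; it only illustrates how the checkpoint relationships of one ETL job could be recorded and resolved to consistent source checkpoints at query time, using the CP30/CP10 to CP3 example from the figure.

import java.util.HashMap;
import java.util.Map;

/**
 * Minimal sketch (hypothetical names, not the real MetaService API) of
 * recording and resolving checkpoint relationships for one ETL job.
 */
public class CheckpointRelationshipSketch {

    /** Maps a sink-table checkpoint to the source-table checkpoints it was built from. */
    static class EtlCheckpointMapping {
        final String sinkTable;
        // sink checkpoint id -> (source table -> source checkpoint id)
        final Map<Long, Map<String, Long>> sinkToSources = new HashMap<>();

        EtlCheckpointMapping(String sinkTable) {
            this.sinkTable = sinkTable;
        }

        void recordMapping(long sinkCheckpoint, Map<String, Long> sourceCheckpoints) {
            sinkToSources.put(sinkCheckpoint, sourceCheckpoints);
        }

        /** Resolve the source checkpoints that are consistent with the given sink checkpoint. */
        Map<String, Long> consistentSources(long sinkCheckpoint) {
            return sinkToSources.getOrDefault(sinkCheckpoint, Map.of());
        }
    }

    public static void main(String[] args) {
        // Example from the figure: CP30 in Table1 and CP10 in Table2 produce CP3 in Table3.
        EtlCheckpointMapping table3 = new EtlCheckpointMapping("Table3");
        table3.recordMapping(3L, Map.of("Table1", 30L, "Table2", 10L));

        // A query over Table3 at CP3 must read Table1 at CP30 and Table2 at CP10
        // (assuming here that checkpoint ids map 1:1 to Table Store snapshot ids).
        System.out.println(table3.consistentSources(3L));
    }
}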

On the other hand, this proposal increases the cost of data operation and management. When the data of a table needs to be rolled back to a specified snapshot for some reason, every downstream table has to be reset to a different snapshot, which is very hard to manage.

For the above reasons, we choose the global checkpoint mechanism in the first stage. But there are some problems with Checkpoint for data consistency:

  • Flink uses Checkpoint as a fault-tolerance mechanism; it supports aligned and unaligned checkpoints, and may even support task-local checkpoints in the future.
  • Even with aligned checkpoints, data consistency cannot be guaranteed for some operators, such as Temporal operators with timestamps or data buffers.

So after the implementation of the first stage, we need to upgrade the Watermark mechanism or implement a new Timestamp Barrier mechanism in Flink to support the full semantics of data consistency. In a single ETL job:

  1. The Source generates a Timestamp Barrier based on system time or event time.
  2. Timestamp Barriers are ordered. Aggregation and Temporal operators align data by Timestamp Barrier and perform computation within each Timestamp Barrier first, then across Timestamp Barriers (see the sketch after this list).
  3. Aggregation and Temporal operators output the results for each Timestamp Barrier.
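
The following is a minimal sketch of the alignment idea in step 2, with hypothetical names and no relation to Flink's real operator API: an aggregation operator buffers input per Timestamp Barrier, and when a barrier completes it emits results for that barrier and all earlier ones in barrier order.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Minimal sketch (hypothetical, not Flink's operator API) of aligning records
 * by Timestamp Barrier in an aggregation operator.
 */
public class TimestampBarrierAlignmentSketch {

    // barrier timestamp -> partial sum of values seen for that barrier
    private final TreeMap<Long, Long> pendingByBarrier = new TreeMap<>();
    private long runningTotal = 0L;

    /** A record carries the barrier timestamp assigned by the source. */
    void processRecord(long barrierTimestamp, long value) {
        pendingByBarrier.merge(barrierTimestamp, value, Long::sum);
    }

    /** When a barrier completes, emit results for it and all earlier barriers in order. */
    List<String> onBarrierComplete(long barrierTimestamp) {
        List<String> results = new ArrayList<>();
        for (Map.Entry<Long, Long> e :
                new TreeMap<>(pendingByBarrier.headMap(barrierTimestamp, true)).entrySet()) {
            runningTotal += e.getValue();   // compute within the barrier, then carry across barriers
            results.add("barrier=" + e.getKey() + " total=" + runningTotal);
            pendingByBarrier.remove(e.getKey());
        }
        return results;
    }

    public static void main(String[] args) {
        TimestampBarrierAlignmentSketch op = new TimestampBarrierAlignmentSketch();
        op.processRecord(100L, 1);
        op.processRecord(200L, 5);
        op.processRecord(100L, 2);
        System.out.println(op.onBarrierComplete(100L)); // [barrier=100 total=3]
        System.out.println(op.onBarrierComplete(200L)); // [barrier=200 total=8]
    }
}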

ETL jobs write each Timestamp Barrier to a snapshot in Table Store, and downstream ETL jobs can read the Timestamp Barrier from that snapshot, so the Timestamp Barrier is transferred between ETL jobs.
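
As a rough illustration (the snapshot fields and methods below are assumptions, not the real Table Store snapshot format), a committed snapshot could simply carry the Timestamp Barrier it corresponds to, so that the downstream ETL job can read and forward it.

import java.util.Optional;

/**
 * Minimal sketch (hypothetical snapshot metadata, not the real Table Store format)
 * of propagating a Timestamp Barrier between ETL jobs through a snapshot.
 */
public class SnapshotBarrierSketch {

    /** A committed snapshot that records the Timestamp Barrier it corresponds to. */
    record Snapshot(long snapshotId, long timestampBarrier) {}

    /** Upstream ETL: commit a snapshot covering everything up to the given barrier. */
    static Snapshot commit(long nextSnapshotId, long timestampBarrier) {
        return new Snapshot(nextSnapshotId, timestampBarrier);
    }

    /** Downstream ETL: re-emit the barrier read from the upstream snapshot. */
    static Optional<Long> barrierToForward(Snapshot upstream) {
        return Optional.of(upstream.timestampBarrier());
    }

    public static void main(String[] args) {
        Snapshot s = commit(42L, 1_700_000_000_000L);
        System.out.println("forward barrier " + barrierToForward(s).orElseThrow()
                + " from snapshot " + s.snapshotId());
    }
}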

Finally, we can guarantee data consistency with the Timestamp Barrier and exactly-once computation with Checkpoint, independently of each other.

Roadmap In Future


Data consistency of the ETL Topology is our first phase of work. After completing this part, we plan to keep building out and improving Flink + Table Store in the future, mainly in the following aspects.

...