...

  1. MetaService manages the relationship between ETL jobs and tables in the ETL Topology. Users can query these dependencies from MetaService.
  2. MetaService manages the Timestamp Barrier in each ETL job, including barrier progress, completed barriers, etc.
  3. MetaService manages the Timestamp Barrier and snapshots of tables, including the latest completed snapshots and the relationship between barriers and snapshots (see the interface sketch below).
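
A minimal sketch of what such a MetaService could expose, assuming string job ids and long barrier/snapshot ids; the interface and method names below are illustrative and not the actual Table Store API.

```java
import java.util.List;
import java.util.Optional;

/**
 * Illustrative sketch of the metadata MetaService is described as managing:
 * ETL-job/table dependencies, per-job timestamp barrier progress, and the
 * mapping between barriers and table snapshots. All names are assumptions.
 */
public interface MetaService {

    /** Tables in the ETL Topology that the given ETL job reads from. */
    List<String> sourceTablesOf(String etlJobId);

    /** Tables in the ETL Topology that the given ETL job writes to. */
    List<String> sinkTablesOf(String etlJobId);

    /** Latest timestamp barrier the given ETL job has completed and committed. */
    Optional<Long> completedBarrierOf(String etlJobId);

    /** Snapshot of the table that corresponds to the given timestamp barrier, if committed. */
    Optional<Long> snapshotForBarrier(String table, long timestampBarrier);
}
```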

...

We add a new metastore type: table-store, which manages the Catalog and data consistency in Table Store. Users can create a Catalog with metastore table-store in Sql-Client, and specify the MetaService address and the consistency type by uri and consistency-type. A Flink ETL job which reads from and writes to Table Store will be managed by MetaService to ensure data consistency. In the first stage, the table-store metastore only supports FileSystemCatalog and will support HiveCatalog later. The use cases are shown as follows.
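
For example, such a catalog might be created from a Flink program as sketched below. Only the option keys ('metastore' = 'table-store', 'uri', 'consistency-type') come from the description above; the warehouse path, uri and consistency value are placeholders, not a tested configuration.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CreateTableStoreCatalog {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Sketch of the catalog DDL described above: the 'table-store' metastore is
        // reached through 'uri', and 'consistency-type' selects the consistency
        // guarantee. The concrete values are placeholders.
        tEnv.executeSql(
                "CREATE CATALOG my_catalog WITH (\n"
                        + "  'type' = 'table-store',\n"
                        + "  'warehouse' = 'file:///tmp/table_store',\n"
                        + "  'metastore' = 'table-store',\n"
                        + "  'uri' = 'http://metaservice-host:9090',\n"
                        + "  'consistency-type' = 'strong'\n"
                        + ")");

        tEnv.executeSql("USE CATALOG my_catalog");
    }
}
```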

...

The coordination of the Timestamp Barrier is divided into two parts: the barrier within each ETL job and the barrier across ETL jobs.

1. Timestamp Barrier within one ETL job

There are two types of tables in the ETL Topology:

  • Root Table: The sink tables in Table Store to which ETL jobs that consume external sources (Kafka, CDC, etc.) write their results.

...

The Timestamp Barrier will be transmitted in the data stream between subtasks after all the records belonging to it, and each record processed by an operator has a timestamp field equal to the Timestamp Barrier. Besides the source, there are three types of operators, as follows.

...

Root Tables are the sources of the ETL Topology, and Intermediate Tables are its streaming edges and sinks. Each vertex in the topology is an independent Flink job, whose JobManager schedules and reads snapshots from each table.

The JobManager of each job interacts with MetaService, and creates and sends global timestamp barriers to its sources. The sources collect and broadcast the timestamp barriers. An ETL job generates snapshots in its sink tables with timestamp barrier information, so the downstream ETL job can read the timestamp barrier information directly, which ensures the timestamp barrier can be transmitted among jobs.

The overall process of the global timestamp barrier is as follows.

[draw.io diagram: figure5]

There are two layers in the Global Timestamp Barrier mechanism: MetaService and JobManager. MetaService regards each ETL job as a single node and manages the global timestamp barrier in the ETL Topology; the JobManager interacts with MetaService and manages the global timestamp barrier within each ETL job.
There are two parts in the global timestamp barrier processing: the interaction between MetaService and JobManager, and the interaction between JobManager and Source Node.

...

  1. The JobManager of each ETL job requests a start timestamp barrier from MetaService for its sources when it starts.

  2. When an ETL job completes a timestamp barrier and commits the data to Table Store, it reports the timestamp barrier to MetaService.
  • Interaction between JobManager and Source Node

  1. The JobManager reads and manages snapshots and timestamp barriers from Table Store; when it collects all the timestamp barriers of a table, it sends the barrier to the source subtasks.

  2. The Source Node processes splits of snapshots. When it receives a timestamp barrier from the JobManager, it broadcasts the timestamp barrier after finishing the specified splits. A hypothetical sketch of both interactions follows this list.
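
The following coordination interface is only an illustration of the two interactions above; the names and signatures are assumptions and are not part of Flink or Table Store.

```java
import java.util.List;

/** Hypothetical coordination interface for the interactions described above. */
public interface TimestampBarrierCoordination {

    /** JobManager -> MetaService: request the start timestamp barrier for the job's sources at startup. */
    long requestStartBarrier(String etlJobId);

    /** JobManager -> MetaService: report a timestamp barrier whose data has been committed to Table Store. */
    void reportCompletedBarrier(String etlJobId, long timestampBarrier);

    /**
     * JobManager -> Source subtask: send a collected timestamp barrier; the source
     * broadcasts it downstream after finishing the given splits.
     */
    void sendBarrierToSource(int sourceSubtaskIndex, long timestampBarrier, List<Long> splitsToFinish);
}
```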

...

MetaService manages the start time, finish time, and total cost of timestamp barriers for each job, which helps users analyze the E2E delay and optimize the ETL jobs.
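
For example, the per-barrier statistics could be as simple as the following illustrative class (not an existing API):

```java
/** Illustrative shape of the per-job barrier statistics MetaService could expose. */
public final class BarrierStats {
    public final String etlJobId;
    public final long timestampBarrier;
    public final long startTimeMillis;   // when the barrier was generated
    public final long finishTimeMillis;  // when the barrier's data was committed

    public BarrierStats(String etlJobId, long timestampBarrier, long startTimeMillis, long finishTimeMillis) {
        this.etlJobId = etlJobId;
        this.timestampBarrier = timestampBarrier;
        this.startTimeMillis = startTimeMillis;
        this.finishTimeMillis = finishTimeMillis;
    }

    /** Cost of the barrier in this job; summing it along the ETL Topology gives the E2E delay. */
    public long costMillis() {
        return finishTimeMillis - startTimeMillis;
    }
}
```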

...

  • The determinism of written data

Flink jobs commit data with timestamp information to Table Store according to their timestamp barriers. Each job commits data only when the specified timestamp is completed, which means the job writes the same, determined data to Table Store before and after a failover.
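
A minimal sketch of this determinism, assuming a committer that buffers writes per timestamp barrier and commits a barrier only once it is complete; the class and method names are illustrative, not an actual Table Store API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Illustrative committer: data becomes visible in Table Store only per completed timestamp barrier. */
public final class BarrierAlignedCommitter<T> {

    private final TreeMap<Long, List<T>> pendingByBarrier = new TreeMap<>();

    /** Buffer a record under the timestamp barrier it belongs to. */
    public void write(long timestampBarrier, T record) {
        pendingByBarrier.computeIfAbsent(timestampBarrier, b -> new ArrayList<>()).add(record);
    }

    /**
     * Commit everything up to (and including) the completed barrier as one snapshot.
     * Because records are grouped by barrier, replaying the same barrier after a
     * failover commits exactly the same data.
     */
    public List<T> commitUpTo(long completedBarrier) {
        List<T> committed = new ArrayList<>();
        pendingByBarrier.headMap(completedBarrier, true).values().forEach(committed::addAll);
        pendingByBarrier.headMap(completedBarrier, true).clear();
        return committed;
    }
}
```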

...

Because of this determinism and orderliness, the failover of a single ETL job will not cause a failover of the entire ETL Topology. The JobManager of each ETL job only needs to process the failover within the job. To do that, we need to support failover of the Timestamp Barrier, which includes:

  1. Recover timestamp barriers from Checkpoint. The boundaries of checkpoints and timestamp barriers are aligned, and the job can recover the same timestamp barriers for a failed checkpoint. For example, suppose checkpoint 1 contains timestamp barriers 1, 2 and 3, and the ETL job is processing data for checkpoint 2 with timestamps 3 and 4. When the job fails, it will recover from checkpoint 1 and assign the same timestamps 3 and 4 for checkpoint 2.
  2. Replay data for the same timestamp barriers. For the above example, when the job recovers from checkpoint 1 and replays data for timestamps 3 and 4, it must produce the same data as before the failover.

To achieve that, Flink should store <Timestamp Barrier, Offset> and <Checkpoint, Timestamp Barrier> information when a timestamp barrier is generated, as in the sketch below.
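
A sketch of the two mappings, assuming barriers, source offsets and checkpoint ids are longs; the class is illustrative only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Illustrative store for the two mappings described above:
 *   <Timestamp Barrier, Offset>     -- lets a source replay exactly the same data per barrier
 *   <Checkpoint, Timestamp Barrier> -- lets a job re-assign the same barriers to a failed checkpoint
 */
public final class TimestampBarrierState {

    private final TreeMap<Long, Long> offsetByBarrier = new TreeMap<>();
    private final Map<Long, List<Long>> barriersByCheckpoint = new HashMap<>();

    /** Record the source offset and owning checkpoint at the moment a timestamp barrier is generated. */
    public void onBarrierGenerated(long checkpointId, long timestampBarrier, long sourceOffset) {
        offsetByBarrier.put(timestampBarrier, sourceOffset);
        barriersByCheckpoint.computeIfAbsent(checkpointId, c -> new ArrayList<>()).add(timestampBarrier);
    }

    /** Barriers to re-assign when recovering a checkpoint that had not completed. */
    public List<Long> barriersToRecover(long failedCheckpointId) {
        return barriersByCheckpoint.getOrDefault(failedCheckpointId, List.of());
    }

    /** Offset from which the source must replay to reproduce the data of a barrier. */
    public long replayOffsetFor(long timestampBarrier) {
        return offsetByBarrier.get(timestampBarrier);
    }
}
```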

After implementing this function, in addition to the above failover processing, we can easily handle cases where some table data needs to be revised, thanks to the determinism of snapshots, timestamps and checkpoints in Table Store. For example, when we need to revise data in Table3, we can roll back to a specified checkpoint in all downstream cascaded ETL jobs and tables.

...

The main work on the Timestamp Barrier, and the differences between the Timestamp Barrier and the existing Watermark in Flink, are compared below.


  • Generation
      • Timestamp Barrier: The JobManager must coordinate all source subtasks and generate a unified timestamp barrier from System Time or Event Time for them.
      • Watermark: Each source subtask generates its own timestamp (watermark event) from System Time or Event Time.

  • Checkpoint
      • Timestamp Barrier: Store <checkpoint, timestamp barrier> when the timestamp barrier is generated, so that the job can recover the same timestamp barrier for an uncompleted checkpoint.
      • Watermark: None.

  • Replay data
      • Timestamp Barrier: Store <timestamp barrier, offset> for the source when it broadcasts the timestamp barrier, so that the source can replay the same data for the same timestamp barrier.
      • Watermark: None.

  • Align data
      • Timestamp Barrier: Aligns data for stateful operators (aggregation, join, etc.) and temporal operators (window).
      • Watermark: Aligns data for temporal operators (window) only.

  • Computation
      • Timestamp Barrier: An operator computes the results for a specific timestamp barrier based on the results of the previous timestamp barrier.
      • Watermark: The window operator only computes results within the window range.

  • Output
      • Timestamp Barrier: An operator outputs or commits results when it collects all the timestamp barriers, including operators with data buffers or async operations.
      • Watermark: The window operator supports "emit" output.
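
To illustrate the "Align data" and "Computation" rows, the following is a hedged sketch of how a two-input stateful operator could align on timestamp barriers: a barrier's result is computed and emitted only once both inputs have delivered that barrier, on top of the previous barrier's result. Buffering of records from the faster input is omitted, and all names are illustrative.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.LongConsumer;

/** Illustrative alignment of a two-input stateful operator on timestamp barriers. */
public final class TwoInputBarrierAligner {

    private final Deque<Long> pendingLeftBarriers = new ArrayDeque<>();
    private final Deque<Long> pendingRightBarriers = new ArrayDeque<>();
    private final LongConsumer computeAndEmit; // computes barrier T's result from barrier T-1's result

    public TwoInputBarrierAligner(LongConsumer computeAndEmit) {
        this.computeAndEmit = computeAndEmit;
    }

    public void onBarrierFromLeft(long barrier) {
        pendingLeftBarriers.add(barrier);
        tryAlign();
    }

    public void onBarrierFromRight(long barrier) {
        pendingRightBarriers.add(barrier);
        tryAlign();
    }

    /** Only when both inputs have delivered the same barrier is its result computed and emitted. */
    private void tryAlign() {
        while (!pendingLeftBarriers.isEmpty()
                && !pendingRightBarriers.isEmpty()
                && pendingLeftBarriers.peek().equals(pendingRightBarriers.peek())) {
            long aligned = pendingLeftBarriers.poll();
            pendingRightBarriers.poll();
            computeAndEmit.accept(aligned);
        }
    }
}
```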

...

This is an overall FLIP for data consistency in streaming and batch ETL. Next, we would like to create a FLIP with a detailed design for each functional module, for example, the design of:

  1. Timestamp Barrier

...

  1. Coordination and Generation
  2. Timestamp Barrier Checkpoint and Recovery
  3. Timestamp Barrier Replay Data Implementation
  4. Timestamp Barrier Alignment and Computation

...

  1. In Operator
  2. Introduce MetaService in Table Store and

...

  1. etc

Rejected Alternatives

Data consistency management


What we need in Flink is a Timestamp Barrier mechanism to align data in stateful and temporal operators. As shown above, the existing Watermark cannot align data. At present, Aligned Checkpoint is the only mechanism in Flink that can align data in stateful operators such as aggregation and join. But Checkpoint also has some problems for data consistency:

...