...

Table of Contents

Motivation


Table Store supports streaming and batch data processing, and Flink ETL jobs can read data from and write data to Table Store in both streaming and batch mode. The architecture is as follows:

...

Root Tables are the sources of the ETL Topology, and Intermediate Tables are its streaming edges and sinks. Each vertex in the topology is an independent Flink job, in which the JobManager schedules and reads snapshots from each table.

The JobManager of each job interacts with MetaService, and creates and sends global timestamp barriers to its sources. The sources collect and broadcast the timestamp barriers. Each ETL job generates snapshots in its sink tables with the timestamp barrier information, so the downstream ETL job can read the timestamp barrier information directly; this ensures that the timestamp barrier can be transferred among jobs.
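
For illustration only, the barrier information a committed snapshot carries could be modeled as below; the class and field names are assumptions, not the actual Table Store snapshot metadata.

Code Block
languagejava
// Hypothetical model of a committed sink snapshot that carries its timestamp barrier,
// so a downstream ETL job can read the barrier directly from the snapshot it consumes.
public final class CommittedSnapshot {

    private final long snapshotId;        // snapshot committed in the sink table
    private final long timestampBarrier;  // global timestamp barrier this snapshot belongs to

    public CommittedSnapshot(long snapshotId, long timestampBarrier) {
        this.snapshotId = snapshotId;
        this.timestampBarrier = timestampBarrier;
    }

    public long snapshotId() {
        return snapshotId;
    }

    // A downstream source reads this value and re-broadcasts it as its own timestamp barrier.
    public long timestampBarrier() {
        return timestampBarrier;
    }
}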

The overall process of the global timestamp barrier is as follows:

draw.io Diagram: figure5

There are two layers in the global timestamp barrier mechanism: MetaService and JobManager. MetaService regards each ETL job as a single node and manages the global timestamp barrier in the ETL Topology; the JobManager interacts with MetaService and manages the global timestamp barrier within each ETL job.
There are two parts in global timestamp barrier processing: the interaction between MetaService and JobManager, and the interaction between JobManager and Source Node.

  • Interaction between MetaService and JobManager

  1. The JobManager of each ETL job requests a start timestamp barrier from MetaService for its sources when the job is started.
  2. When the sink of an ETL job finishes a timestamp barrier and commits the data to Table Store, the job reports the timestamp barrier to MetaService.

  • Interaction between JobManager and Source Node

  1. The JobManager manages snapshots and timestamp barriers from Table Store. When it collects all the timestamp barriers of a table, it sends the barrier to the source subtasks.

  2. A Source Node processes the splits of snapshots. When it receives a timestamp barrier from the JobManager, it broadcasts the timestamp barrier downstream after finishing the specified splits.
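
These two interactions could be captured by interfaces along the following lines. This is a minimal sketch with hypothetical names; it is not the actual Flink or Table Store API.

Code Block
languagejava
// Sketch of the MetaService <-> JobManager interaction described above.
interface MetaServiceGateway {

    // Called by the JobManager when an ETL job starts, to obtain the start
    // timestamp barrier for its sources.
    long requestStartTimestampBarrier(String jobId);

    // Called after the sink finishes a timestamp barrier and the data is committed
    // to Table Store, to report the completed barrier for the sink table.
    void reportCompletedTimestampBarrier(String jobId, String table, long timestampBarrier);
}

// Sketch of the JobManager <-> Source Node interaction described above.
interface SourceBarrierHandler {

    // The JobManager sends the barrier to the source subtasks once it has collected
    // all timestamp barriers of the table; the source broadcasts the barrier
    // downstream after finishing the splits that belong to it.
    void onTimestampBarrier(long timestampBarrier);
}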

The interactions between the JobManager and the Source Nodes are as follows.

draw.io Diagram: figure6

Data Consistency Management

...

MetaService manages the dependencies between tables and ETL jobs. Based on this relationship information, it supports consistent reading and computing in OLAP, calculates the E2E delay and the delay of each ETL job, and helps users find the bottleneck jobs. When revising data in tables, users can roll back snapshots of tables and the state of ETL jobs based on these dependencies.

  • Relationship between timestamps and snapshots of each table

MetaService ensures data consistency among ETL/OLAP jobs and tables by managing the relationship between timestamps and snapshots.

Firstly, it is used to ensure the consistency of timestamps and snapshots among ETL jobs that consume the same table. For example, suppose a Root Table is consumed by an ETL job and MetaService creates timestamps on its snapshots. When a new ETL job consuming this table is started, MetaService will create the same timestamps on the snapshots for it, according to the previous job.

draw.io Diagram: figure7

Secondly, it helps to ensure timestamp barrier consistency between tables when an ETL job consumes several of them. For example, suppose an ETL job consumes Table1 and Table2. When the job is started, it gets snapshot ids for Table1 and Table2 with the same timestamp from MetaService, even when the progress of Table1 and Table2 differs. This ensures that the timestamps of the job can be aligned.

Finally, OLAP/Batch jobs also read snapshots of their source tables at the same timestamp, which ensures data consistency in their computation.
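
As a sketch of these guarantees, MetaService can be thought of as maintaining a per-table mapping from timestamp to snapshot id and handing out aligned versions. The code below is illustrative only; the class name and the in-memory maps are assumptions, not the real MetaService implementation.

Code Block
languagejava
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Illustrative view of the timestamp -> snapshot bookkeeping in MetaService.
public class TimestampSnapshotRegistry {

    // table name -> (timestamp -> snapshot id), ordered by timestamp
    private final Map<String, TreeMap<Long, Long>> snapshotsByTimestamp = new HashMap<>();

    // Record that a snapshot of a table belongs to a timestamp. A second ETL job that
    // later consumes the same table is given exactly the same mapping, so both jobs
    // see identical (timestamp, snapshot) pairs.
    public void register(String table, long timestamp, long snapshotId) {
        snapshotsByTimestamp
                .computeIfAbsent(table, t -> new TreeMap<>())
                .put(timestamp, snapshotId);
    }

    // For a job or query that reads several tables: pick the latest timestamp available
    // in all of them and return each table's snapshot for that timestamp, so the read
    // is aligned even if the tables have made different progress.
    public Map<String, Long> alignedSnapshots(Iterable<String> tables) {
        Long aligned = null;
        for (String table : tables) {
            TreeMap<Long, Long> m = snapshotsByTimestamp.get(table);
            if (m == null || m.isEmpty()) {
                throw new IllegalStateException("No snapshot registered for table " + table);
            }
            aligned = aligned == null ? m.lastKey() : Math.min(aligned, m.lastKey());
        }
        Map<String, Long> result = new HashMap<>();
        for (String table : tables) {
            Map.Entry<Long, Long> entry = snapshotsByTimestamp.get(table).floorEntry(aligned);
            if (entry == null) {
                throw new IllegalStateException(
                        "Table " + table + " has no snapshot at or before timestamp " + aligned);
            }
            result.put(table, entry.getValue());
        }
        return result;
    }
}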

  • The completed timestamp of each table

MetaService manages the completed timestamps of each table and guarantees data consistency for OLAP queries. An OLAP query requests the versions of its source tables from MetaService, and MetaService calculates the snapshot ids of those tables based on the dependencies between tables, the completed timestamps and snapshots of each table, and the required consistency type. The OLAP query then reads data from the tables according to the given snapshot ids, which ensures data consistency for it.

  • Information about tables and snapshots being used by the jobs

MetaService manages information about the snapshots of tables being used by ETL jobs or OLAP queries, and then determines which snapshots can be safely deleted or compacted without affecting the jobs that are reading the data, ensuring these jobs can read correct data.

  • Timestamp progress of each ETL job

MetaService manages the start time, finish time and total cost of the timestamp barriers of each job, which helps users analyze the E2E delay and optimize the ETL jobs.

ETL Jobs Failover


Each ETL job may fail in the ETL Topology, but unlike a general Flink streaming job, its failure won't cause a failover of the entire ETL Topology. The ETL job only needs to recover from the failover itself, for the following reasons:

  • The determinism of reading data

Flink jobs read snapshots from Table Store. When a job fails, it must be able to reread the snapshots according to the previous timestamp recovered from its checkpoint. If the relationship between timestamp and snapshot is determined, and the timestamp can be recovered from the checkpoint, the failed job can read the same data from the same snapshot according to the same timestamp, which means the job reads deterministic data from Table Store before and after the failover.

...

Flink jobs write snapshots with timestamp information to Table Store according to their timestamp barriers. Each job commits data only when the specified timestamp is completed, which means the job writes deterministic data to Table Store before and after the failover.
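
A minimal sketch of this write-side behavior is shown below, assuming a hypothetical committer abstraction rather than the actual Table Store sink API.

Code Block
languagejava
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Buffers written files per timestamp barrier and commits them to the sink table
// only once that barrier is complete, so the committed snapshots are deterministic
// across failover.
public class BarrierGatedCommitter {

    private final TreeMap<Long, List<String>> pendingByBarrier = new TreeMap<>();

    // Data files produced while processing records of a given timestamp barrier.
    public void addPending(long timestampBarrier, String dataFile) {
        pendingByBarrier.computeIfAbsent(timestampBarrier, b -> new ArrayList<>()).add(dataFile);
    }

    // Called when the timestamp barrier is complete: commit everything up to and
    // including that barrier as snapshots tagged with their barriers.
    public void onBarrierComplete(long timestampBarrier, SnapshotCommitter committer) {
        Map<Long, List<String>> ready = pendingByBarrier.headMap(timestampBarrier, true);
        for (Map.Entry<Long, List<String>> entry : ready.entrySet()) {
            committer.commit(entry.getKey(), entry.getValue());
        }
        ready.clear();
    }

    // Hypothetical hook standing in for the real Table Store commit call.
    public interface SnapshotCommitter {
        void commit(long timestampBarrier, List<String> dataFiles);
    }
}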

...

Flink jobs read and write snapshots according to their timestamps. Timestamp barriers are aligned within each job and among multiple jobs. This means that although the data within one timestamp barrier is out of order, the data and computation between timestamp barriers across multiple jobs are in order.

Because of this determinism and orderliness, the failover of a single ETL job will not cause a failover of the entire ETL Topology. The JobManager of each ETL job only needs to handle the failover within the job.


To do that, we need to support failover of Timestamp Barrier, which means:

  1. Recover timestamp barriers from Checkpoint. This means that the boundaries of checkpoints and timestamp barriers are aligned, and the job can recover the same timestamp barriers for a failed checkpoint. For example, suppose there are timestamp barriers 1, 2 and 3 in checkpoint 1, and the ETL job is processing data for checkpoint 2 with timestamps 3 and 4. When the job fails, it will recover from checkpoint 1 and assign the same timestamps 3 and 4 for checkpoint 2.
  2. Replay data for the same timestamp barriers. For the above example, when the job recovers from checkpoint 1 and replays data for timestamps 3 and 4, it must produce the same data as before the failover.

To achieve that, Flink should store (Timestamp Barrier, Offset) and (Checkpoint, Timestamp Barrier) information when a timestamp barrier is generated.
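
For example, the two pieces of recovery state could look like the sketch below; the state layout and names are assumptions for illustration, not the actual Flink implementation.

Code Block
languagejava
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// State a source needs so that, after failover, it can re-assign the same timestamp
// barriers to an uncompleted checkpoint and replay exactly the same data for them.
public class TimestampBarrierRecoveryState {

    // (timestamp barrier -> source offset at which the barrier was broadcast)
    private final TreeMap<Long, Long> offsetByBarrier = new TreeMap<>();
    // (checkpoint id -> timestamp barriers generated within that checkpoint)
    private final TreeMap<Long, List<Long>> barriersByCheckpoint = new TreeMap<>();

    public void onBarrierGenerated(long checkpointId, long timestampBarrier, long sourceOffset) {
        offsetByBarrier.put(timestampBarrier, sourceOffset);
        barriersByCheckpoint.computeIfAbsent(checkpointId, c -> new ArrayList<>()).add(timestampBarrier);
    }

    // After recovering from `completedCheckpoint`, the job re-uses the barriers recorded
    // for the following (failed) checkpoint instead of generating new ones, e.g. the
    // timestamps 3 and 4 of checkpoint 2 in the example above.
    public List<Long> barriersToReassign(long completedCheckpoint) {
        Map.Entry<Long, List<Long>> next = barriersByCheckpoint.higherEntry(completedCheckpoint);
        return next == null ? List.of() : next.getValue();
    }

    // The offset from which to replay data so the same records are produced for a barrier.
    public long replayOffset(long timestampBarrier) {
        Long offset = offsetByBarrier.get(timestampBarrier);
        if (offset == null) {
            throw new IllegalArgumentException("Unknown timestamp barrier: " + timestampBarrier);
        }
        return offset;
    }
}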

After implementing this function, and in addition to the above failover processing, we can also easily handle cases where some table data needs to be revised, thanks to the determinism of the snapshots and checkpoints of Table Store. For example, when we need to revise data in Table3, we can roll back all downstream cascaded ETL jobs and tables to a specified checkpoint.

  • Incremental processing

All table snapshots are aligned according to a unified checkpoint. When the data of a specified table needs to be revised, we just need to roll back all its downstream tables to a unified snapshot, reset the streaming jobs' state to the specified checkpoint, and then restart the jobs to consume the incremental data.

  • Full processing

Due to reasons such as the ETL jobs' state TTL, we sometimes cannot perform incremental processing. In that case we can perform full processing: clear the data and the ETL state of all downstream tables and jobs, and then restart the jobs to consume the data in full.

Incremental processing is as follows.

...

A Flink ETL job needs to register its source and sink tables with MetaService when it is submitted. At present, the client creates the specified TableStoreSource and TableStoreSink from Table Store while generating the Flink execution plan. In this process, we can register the job id and table information with MetaService. The REST API in MetaService is:

...

...

/register/source/:jobID/:catalog/:database/:table
/register/sink/:jobID/:catalog/:database/:table

MetaService creates the relationship between the source and sink tables by the job id. After an ETL job generates its plan, it may still fail to be submitted to the cluster due to exceptions such as network or resource issues. The registered table information therefore cannot be accessed until the job is submitted to the cluster and the SplitEnumerator registers itself with MetaService as well.
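
As a usage sketch of the register endpoints above: the MetaService address, the HTTP method and the use of Java's HttpClient are assumptions for illustration; the FLIP only defines the REST paths.

Code Block
languagejava
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Registers the source and sink tables of an ETL job with MetaService while the
// Flink execution plan is generated.
public class RegisterTablesExample {

    public static void main(String[] args) throws Exception {
        String metaService = "http://meta-service:8080"; // assumed address
        String jobId = "a1b2c3d4e5f6";                   // Flink job id of the ETL job
        HttpClient client = HttpClient.newHttpClient();

        // /register/source/:jobID/:catalog/:database/:table
        post(client, metaService + "/register/source/" + jobId + "/my_catalog/my_db/word_count_source");
        // /register/sink/:jobID/:catalog/:database/:table
        post(client, metaService + "/register/sink/" + jobId + "/my_catalog/my_db/word_count_sink");
    }

    private static void post(HttpClient client, String url) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(url + " -> " + response.statusCode());
    }
}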

...

ETL and OLAP jobs must get the snapshot ids of their tables from MetaService, according to the consistency requirement, when they are submitted to the cluster. Flink jobs can get the versions of the tables when they create them in FlinkCatalog. The main process is as follows.

draw.io Diagram: figure9

REST APIs should be added to MetaService to respond with the versions of tables according to the consistency requirement.

Code Block
languagebash
/version/:consistencyType/:catalog/:database/:table
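
A query client could resolve a consistent version before reading, for example as below; the consistency type value, the MetaService address and the response format are assumptions for illustration.

Code Block
languagejava
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Asks MetaService for the table version (snapshot id) to read under a given
// consistency type before an OLAP/Batch query is planned.
public class ResolveVersionExample {

    public static void main(String[] args) throws Exception {
        String metaService = "http://meta-service:8080"; // assumed address
        // /version/:consistencyType/:catalog/:database/:table
        String url = metaService + "/version/strong/my_catalog/my_db/word_count_sink";

        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());

        // The body is assumed to contain the snapshot id the query should read.
        System.out.println("snapshot id for query: " + response.body());
    }
}
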
  • Stop ETL Job

The relationship between the source and sink tables of an ETL job should be deleted when the job terminates. We can add a listener, JobTerminationListener, in Flink, and notify the JobManager to send a delete event to MetaService when the job is terminated. The interface and API are as follows:

Code Block
languagejava
/**
 * Listener for the job, notified when the job is stopped or canceled.
 */
public interface JobTerminationListener {
    void notifyJobTerminated(JobID jobId);
}

// Unregister in MetaService REST 
/unregister/:jobID


Summary

The main work on Timestamp Barrier, and the differences between Timestamp Barrier and the existing Watermark in Flink, are summarized in the following table.


  • Generation
    Timestamp Barrier: the JobManager must coordinate all source subtasks and generate a unified timestamp barrier from System Time or Event Time for them.
    Watermark: each source subtask generates the timestamp (watermark event) from System Time or Event Time on its own.

  • Checkpoint
    Timestamp Barrier: store (checkpoint, timestamp barrier) when the timestamp barrier is generated, so that the job can recover the same timestamp barrier for an uncompleted checkpoint.
    Watermark: none.

  • Replay data
    Timestamp Barrier: store (timestamp barrier, offset) for the source when it broadcasts a timestamp barrier, so that the source can replay the same data for the same timestamp barrier.
    Watermark: none.

  • Align data
    Timestamp Barrier: align data for stateful operators (aggregation, join, etc.) and temporal operators (window).
    Watermark: align data for temporal operators (window) only.

  • Computation
    Timestamp Barrier: an operator computes a specific timestamp barrier based on the results of the previous timestamp barrier.
    Watermark: the window operator only computes results within the window range.

  • Output
    Timestamp Barrier: an operator outputs or commits results when it has collected all the timestamp barriers, including operators with data buffers or async operations.
    Watermark: the window operator supports "emit" output.


The main work in Flink and Table Store is as follows.


  • Table Store

    MetaService
      1. Manage the relationship between ETL jobs and tables in Table Store, including source tables and sink tables.
      2. Manage the finished timestamp barrier of each table in Table Store.
      3. Interact with Flink, such as registering an ETL job, getting the consistency version of a table, etc.

    Catalog
      1. Register source and sink tables with the ETL job id.
      2. Create tables based on a consistency version from MetaService.

    Source and SplitEnumerator
      1. The SplitEnumerator manages the snapshots, splits and timestamp barriers of a specific table.
      2. The Source reads data and timestamp barriers from splits and broadcasts the timestamp barriers.
      3. Notify MetaService to update the completed timestamp barrier of tables.
      4. Notify MetaService to clean up the information of a terminated ETL job.

    Sink
      1. Write data to Table Store and commit the data with the timestamp barrier.

  • Flink

    Timestamp Barrier Mechanism
      The detailed main work is described in the table above.

    Planner
      1. Register the job with MetaService to create the relationship between its source and sink tables.
      2. Create tables based on a consistency version from MetaService.

    JobManager
      1. Add a listener and call it back when the job terminates.

The Next Step

This is an overall FLIP for data consistency in streaming and batch ETL. Next, we would like to create a FLIP for each functional module with its detailed design, for example: Design of Timestamp Barrier Generation, Design of Timestamp Barrier Failover, Design of Timestamp Barrier Alignment and Computation, Design of MetaService in Table Store, etc.

Rejected Alternatives

Data consistency management


What we need in Flink is a Timestamp Barrier mechanism to align data in stateful and temporal operators. As shown above, the existing Watermark cannot align data. At present, Aligned Checkpoint is the only mechanism that can align data in stateful operators such as aggregation and join in Flink. But there are also some problems with Checkpoint for data consistency:

  • Flink uses Checkpoint as a fault-tolerance mechanism; it supports aligned checkpoints and unaligned checkpoints, and may even support task-local checkpoints in the future.
  • Even with Aligned Checkpoint, data consistency cannot be guaranteed for some operators, such as temporal operators with timestamps or data buffers.



Data consistency coordinator

By coordinating timestamp barriers among multiple jobs, the consistency of data among the sink tables of multiple ETL jobs can be ensured during queries. Besides a global timestamp barrier between jobs, we also considered an adaptive timestamp barrier within each ETL job: each ETL job manages its own timestamp barriers, and MetaService manages the relationships between the timestamp barriers of different ETL jobs.

draw.io Diagram: figure11

As shown above, Timestamp30 in Table1 and Timestamp10 in Table2 generate Timestamp3 in Table3, and so on. When users query these tables, MetaService calculates their snapshot ids according to the timestamp barrier relationships in the ETL jobs.
In this way we can define the data consistency of queries, but it is difficult to define the data processing delay between jobs and tables. For example, it is difficult to define the data delay from Table1 and Table2 to Table3. As the number of cascaded layers increases, this definition becomes very complex.

...