...


We can regard each Flink ETL job as a single node with complex computation, and the table in Table Store as a data stream. Flink ETL and tables form a huge streaming job, which we call ETL Topology. We setup a MetaService node to manage the ETL Topology. The main architecture is:

[draw.io diagram "figure3": overall architecture of the ETL Topology with MetaService]
There are two core points in the architecture: the Timestamp Barrier Mechanism and the MetaService component.

  • Timestamp Barrier Mechanism

We need a barrier mechanism in Flink to guarantee data consistency.

  1. Each ETL source needs to be assigned a unified Timestamp Barrier.
  2. Aggregate and temporal operators in Flink ETL jobs align input data according to the barrier.
  3. Sink operators in ETL jobs confirm data with the barrier to sink tables in Table Store.
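To make the mechanism concrete, here is a minimal Java sketch of the Timestamp Barrier concept. The class names TimestampBarrier and TimestampedRecord are hypothetical illustrations for this document, not the actual Flink or Table Store API:

    // Hypothetical illustration classes; not the actual Flink/Table Store API.
    final class TimestampBarrier {
        private final long timestamp; // unified timestamp assigned to all ETL sources

        TimestampBarrier(long timestamp) {
            this.timestamp = timestamp;
        }

        long timestamp() {
            return timestamp;
        }
    }

    // Each record processed by an operator carries the timestamp of the
    // barrier it belongs to, which is what operators align on.
    final class TimestampedRecord<T> {
        private final T value;
        private final long barrierTimestamp;

        TimestampedRecord(T value, long barrierTimestamp) {
            this.value = value;
            this.barrierTimestamp = barrierTimestamp;
        }

        T value() { return value; }
        long barrierTimestamp() { return barrierTimestamp; }
    }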
  • MetaService component

MetaService is the coordinator of the ETL Topology; its capabilities are as follows:

1> Coordinate the Global Timestamp Barrier in ETL Topology

  1. As the coordinator of the ETL Topology, MetaService interacts with the source ETL jobs and generates a global Timestamp Barrier.
  2. The Timestamp Barrier is transmitted between ETL job nodes by tables, so these job nodes can create globally consistent snapshots in Table Store according to the barrier.

2> Manage dependencies between ETL jobs and tables

  1. MetaService manages the relationship between ETL jobs and tables in the ETL Topology. Users can query these dependencies from MetaService.
  2. MetaService manages the Timestamp Barrier in each ETL job, including barrier progress, finished barriers, etc.
  3. MetaService manages the Timestamp Barrier and snapshots of tables, including the latest completed snapshots and the relationship between barriers and snapshots.

3> Manage Data Consistency in Query On Table Store

  1. MetaService supports data consistency in queries based on its management of dependencies between tables.
  2. MetaService determines the compaction and expiration of snapshots for each table according to snapshots being used by the OLAP and ETL jobs.
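The capabilities above can be summarized as a service interface. The following is a hedged sketch whose method names are assumptions for illustration, not a committed MetaService API:

    import java.util.List;

    // A sketch of the MetaService responsibilities; all names are illustrative.
    interface MetaService {
        // 1> Coordinate the global Timestamp Barrier in the ETL Topology.
        long nextTimestampBarrier(String rootEtlJobId);
        void reportFinishedBarrier(String etlJobId, long barrierTimestamp);

        // 2> Manage dependencies between ETL jobs and tables,
        // and the relationship between barriers and snapshots.
        void registerEtlJob(String etlJobId, List<String> sourceTables, List<String> sinkTables);
        List<String> upstreamTables(String table);
        void registerSnapshot(String table, long snapshotId, long barrierTimestamp);

        // 3> Serve consistent snapshots for queries: all returned snapshots
        // belong to the same completed Timestamp Barrier, and snapshots in
        // use by OLAP or ETL jobs are pinned against compaction/expiration.
        List<Long> consistentSnapshots(List<String> tables);
        void pinSnapshot(String table, long snapshotId);
        void unpinSnapshot(String table, long snapshotId);
    }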

User Interfaces

User Interaction

...

Design of Data Consistency

Global

...

Timestamp Barrier Mechanism

The coordination of the Timestamp Barrier is divided into two parts: the barrier within each ETL job and the barrier across ETL jobs.

1. Timestamp Barrier within ETL job

There are two tables in the ETL Topology

...

ETL jobs which consume the same external source can't be managed by a global timestamp barrier. For example, when two ETL jobs consume the same Kafka topic, generate system timestamp barriers independently, and write their results to Table1 and Table2 in Table Store, the barriers of the two jobs cannot be aligned. So we only guarantee the consistency of ETL jobs and tables inside Table Store: users must load external data into Table Store with a Flink ETL job which generates the timestamp barrier for it, and we then guarantee data consistency based on the ETL Topology.

Correspondingly, there are two ETL types: Root ETL reads data from external sources and writes it to a Root Table, while Intermediate ETL reads data from Root Tables and Intermediate Tables. The main difference between them is the way they generate the Timestamp Barrier.

The JobManager in a Root ETL job generates a new unified Timestamp Barrier itself according to the configured strategy, such as system timestamp or event timestamp, and writes it into the table in Table Store. The overall process is as follows.

[draw.io diagram "root etl align barrier": how a Root ETL job generates and aligns the Timestamp Barrier]
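As a rough illustration of the two generation strategies mentioned above, the following sketch assumes a hypothetical BarrierGenerationStrategy interface on the JobManager side; it is not an existing Flink interface:

    // Illustrative only; not an existing Flink interface.
    interface BarrierGenerationStrategy {
        long nextBarrierTimestamp();
    }

    // Strategy 1: use the coordinator's wall clock as the unified barrier.
    final class SystemTimestampStrategy implements BarrierGenerationStrategy {
        @Override
        public long nextBarrierTimestamp() {
            return System.currentTimeMillis();
        }
    }

    // Strategy 2: derive the barrier from the max event time reported by sources.
    final class EventTimestampStrategy implements BarrierGenerationStrategy {
        private long maxSeenEventTime = Long.MIN_VALUE;

        public synchronized void reportEventTime(long eventTime) {
            maxSeenEventTime = Math.max(maxSeenEventTime, eventTime);
        }

        @Override
        public synchronized long nextBarrierTimestamp() {
            return maxSeenEventTime;
        }
    }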

An Intermediate ETL job cannot generate a new Timestamp Barrier itself. It must read the Timestamp Barrier from data in Table Store, report it to the JobManager, and then broadcast it to the downstream tasks.

[draw.io diagram "intermediate etl": how an Intermediate ETL job reads and propagates the Timestamp Barrier]
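The following sketch shows this read-and-report behavior, reusing the TimestampedRecord sketch above; the task and gateway types are hypothetical:

    // Hypothetical types; the point is that an Intermediate ETL source never
    // invents barriers, it extracts them from the data written upstream.
    final class IntermediateSourceTask {
        interface JobManagerGateway {
            // The JobManager broadcasts the reported barrier to downstream tasks.
            void reportBarrier(long barrierTimestamp);
        }

        private final JobManagerGateway jobManager;
        private long lastReportedBarrier = Long.MIN_VALUE;

        IntermediateSourceTask(JobManagerGateway jobManager) {
            this.jobManager = jobManager;
        }

        void onRecord(TimestampedRecord<?> record) {
            // Barrier timestamps are embedded in Table Store data by the upstream job.
            long barrier = record.barrierTimestamp();
            if (barrier > lastReportedBarrier) {
                lastReportedBarrier = barrier;
                jobManager.reportBarrier(barrier);
            }
        }
    }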

The Timestamp Barrier is transmitted in the data stream between subtasks after all the records that belong to it, and each record processed by an operator carries a timestamp field of its Timestamp Barrier. Besides the source, there are three types of operators, as follows.

  • Stateless operator. The operator processes every input record and outputs the result directly, just as it does today. It does not need to align data with the Timestamp Barrier; when it receives a Timestamp Barrier, it simply broadcasts the barrier to downstream tasks.
  • Stateful operator and temporal operator. Records within the same Timestamp Barrier are out of order, so stateful and temporal operators must align them according to their timestamp field. The operators execute their computation once they have collected the timestamp barrier from all input channels, then broadcast it to downstream tasks (see the alignment sketch after this list). There is a sequence relationship between timestamp barriers, and records across timestamp barriers are ordered, which means the operators compute and output results for a timestamp barrier based on the result of the previous timestamp barrier.
  • Sink operator. The sink streams output results to Table Store and commits them when it has collected the timestamp barrier from all inputs. The source of a downstream ETL job can prefetch data from Table Store, but should only produce data after the upstream sink has committed it.
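The alignment behavior of stateful and temporal operators can be sketched as follows. This is a minimal illustration assuming hypothetical operator types, not Flink's actual operator API: records are buffered per barrier timestamp, and the computation for a barrier fires only after that barrier has arrived on every input channel.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Minimal illustration of barrier alignment; not Flink's operator API.
    final class AligningStatefulOperator<T> {
        private final int inputChannels;
        private final Map<Long, List<T>> bufferedByBarrier = new HashMap<>();
        private final Map<Long, Integer> barrierArrivals = new HashMap<>();

        AligningStatefulOperator(int inputChannels) {
            this.inputChannels = inputChannels;
        }

        void onRecord(T record, long barrierTimestamp) {
            // Records within one barrier are unordered; buffer until aligned.
            bufferedByBarrier
                .computeIfAbsent(barrierTimestamp, k -> new ArrayList<>())
                .add(record);
        }

        void onBarrier(long barrierTimestamp) {
            int arrived = barrierArrivals.merge(barrierTimestamp, 1, Integer::sum);
            if (arrived == inputChannels) {
                // Every channel delivered this barrier: compute on the aligned
                // batch, building on the state of the previous barrier, then
                // forward the barrier downstream.
                List<T> batch = bufferedByBarrier.remove(barrierTimestamp);
                process(batch == null ? new ArrayList<>() : batch);
                barrierArrivals.remove(barrierTimestamp);
                broadcastDownstream(barrierTimestamp);
            }
        }

        private void process(List<T> alignedRecords) { /* stateful computation */ }
        private void broadcastDownstream(long barrierTimestamp) { /* emit barrier */ }
    }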


2. Timestamp Barrier across ETL jobs

Root Tables are the sources of the ETL Topology, and Intermediate Tables are its streaming edges and sinks. Each vertex in the topology is an independent Flink job, in which the SplitEnumerator schedules and reads snapshots from each table.

Each job's SplitEnumerator interacts with MetaService to create and send global timestamp barriers to its sources. The sources collect and broadcast the timestamp barriers. An ETL job generates snapshots in its sink tables with the timestamp barrier information, so the downstream ETL job can read the timestamp barrier information directly, which ensures the timestamp barrier can be transferred among jobs.
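A rough sketch of that interaction, with illustrative gateway interfaces standing in for the real RPC between the SplitEnumerator, MetaService, and the source readers:

    import java.util.List;

    // Illustrative only; the gateways stand in for the real RPC layer.
    final class BarrierAwareSplitEnumerator {
        interface MetaServiceClient {
            long nextTimestampBarrier();
        }

        interface SourceReaderGateway {
            void broadcastBarrier(long barrierTimestamp);
        }

        private final MetaServiceClient metaService;
        private final List<SourceReaderGateway> readers;

        BarrierAwareSplitEnumerator(MetaServiceClient metaService,
                                    List<SourceReaderGateway> readers) {
            this.metaService = metaService;
            this.readers = readers;
        }

        void triggerBarrier() {
            // Ask MetaService for the next global timestamp barrier and send it
            // to every source reader, which broadcasts it into the job's stream.
            long barrier = metaService.nextTimestampBarrier();
            for (SourceReaderGateway reader : readers) {
                reader.broadcastBarrier(barrier);
            }
        }
    }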

The overall process of the global timestamp barrier is as follows.

[draw.io diagram "figure5": the overall process of the global timestamp barrier across ETL jobs]

...