...
We can regard each Flink ETL job as a single node with complex computation, and each table in Table Store as a data stream. Flink ETL jobs and tables together form a huge streaming job, which we call the ETL Topology. We set up a MetaService node to manage the ETL Topology. The main architecture is:
(draw.io diagram: overall architecture of the ETL Topology and MetaService)
There are two core points in the architecture: the Timestamp Barrier Mechanism and MetaService.

Timestamp Barrier Mechanism
We need a barrier mechanism in Flink to guarantee data consistency:
- Each ETL source needs to be assigned a unified Timestamp Barrier.
- Aggregate and temporal operators in Flink ETL jobs align input data according to the barrier.
- The sink operator in each ETL job confirms data with the barrier to sink tables in Table Store.
MetaService component

MetaService is the coordinator in the ETL Topology; its capabilities are as follows:
1> Coordinate the Global Timestamp Barrier in ETL Topology
- As the coordinator of the ETL Topology, MetaService interacts with the source ETL jobs and generates a global Timestamp Barrier. The Timestamp Barrier is transmitted between ETL job nodes by tables, so these job nodes can create globally consistent snapshots in Table Store according to the barrier.
2> Manage dependencies between ETL jobs and tables
- MetaService manages the relationship between ETL jobs and tables in the ETL Topology. Users can query these dependencies from MetaService.
- MetaService manages the Timestamp Barrier in each ETL job, including barrier progress, finished barriers, etc.
- MetaService manages the Timestamp Barrier and snapshot of each table, including the latest completed snapshots and the relationship between barriers and snapshots.
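As a rough illustration of this bookkeeping, the sketch below keeps the table/job dependencies and the latest finished barrier per job in memory. All names here (`MetaServiceState` and its methods) are illustrative, not part of the actual design.

```java
import java.util.*;

// Illustrative sketch of MetaService bookkeeping: which ETL jobs consume a
// table, and the latest finished Timestamp Barrier reported by each job.
class MetaServiceState {
    // table name -> ETL jobs that consume it
    private final Map<String, Set<String>> downstreamJobs = new HashMap<>();
    // ETL job -> latest finished timestamp barrier
    private final Map<String, Long> finishedBarrier = new HashMap<>();

    void registerDependency(String sourceTable, String etlJob) {
        downstreamJobs.computeIfAbsent(sourceTable, t -> new HashSet<>()).add(etlJob);
    }

    Set<String> jobsConsuming(String table) {
        return downstreamJobs.getOrDefault(table, Collections.emptySet());
    }

    void reportFinishedBarrier(String etlJob, long timestamp) {
        // keep the maximum: barriers only move forward
        finishedBarrier.merge(etlJob, timestamp, Math::max);
    }

    long finishedBarrierOf(String etlJob) {
        return finishedBarrier.getOrDefault(etlJob, -1L);
    }
}
```

A user query such as "which jobs read Table1" then becomes a lookup in `downstreamJobs`.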
3> Manage Data Consistency in Query On Table Store
- MetaService supports data consistency in queries based on its management of the dependencies between tables.
- MetaService determines the compaction and expiration of snapshots for each table according to the snapshots being used by OLAP queries and ETL jobs.
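The expiration rule above can be sketched in one line: a snapshot may only expire once it is older than every snapshot still pinned by a query or job. The class and method names below are hypothetical.

```java
import java.util.*;

// Illustrative sketch: snapshots of a table with id below the minimum
// pinned snapshot are safe to expire; everything at or above it must stay.
class SnapshotExpiration {
    static long earliestExpirableBefore(Collection<Long> pinnedSnapshots) {
        // No pinned snapshot means every snapshot is expirable.
        return pinnedSnapshots.stream().min(Long::compare).orElse(Long.MAX_VALUE);
    }
}
```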
User Interfaces
User Interaction
...
Design of Data Consistency
Global
...
Timestamp Barrier Mechanism
The coordination of the Timestamp Barrier is divided into two parts: the barrier within each ETL job and the barrier across ETL jobs.
1. Timestamp Barrier within ETL job
There are two tables in the ETL Topology
...
ETL jobs which consume the same external source can't be managed by a global timestamp barrier. For example, when two ETL jobs consume a Kafka Topic with a system timestamp barrier and write results to Table1 and Table2 in Table Store, the two jobs generate barriers independently, so the snapshots of Table1 and Table2 cannot be aligned. Therefore we only guarantee the consistency of ETL jobs and tables in Table Store: users must load external data into Table Store with a Flink ETL job which generates the timestamp barrier for it, and then we guarantee data consistency based on the ETL Topology.
Correspondingly, there are two ETL types: a Root ETL reads data from external sources and writes data to a Root Table, while an Intermediate ETL reads data from Root Tables and Intermediate Tables. The main difference between them is the way they generate the Timestamp Barrier.
The JobManager in a Root ETL will generate a new unified Timestamp Barrier itself according to the sources, with different strategies such as system timestamp or event timestamp, and write it into the table in Table Store. The overall process is as follows.
(draw.io diagram: Timestamp Barrier generation in a Root ETL)
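The two generation strategies mentioned above can be sketched as follows; the `BarrierStrategy` interface and class names are illustrative, not part of the design.

```java
// Illustrative sketch of the two strategies a Root ETL JobManager may use
// to generate the next unified Timestamp Barrier.
interface BarrierStrategy {
    long nextBarrier();
}

// System timestamp strategy: take the current wall-clock time.
class SystemTimestampStrategy implements BarrierStrategy {
    public long nextBarrier() {
        return System.currentTimeMillis();
    }
}

// Event timestamp strategy: track the maximum event time seen so far
// and use it as the barrier value.
class EventTimestampStrategy implements BarrierStrategy {
    private long maxEventTime = Long.MIN_VALUE;

    void observe(long eventTime) {
        maxEventTime = Math.max(maxEventTime, eventTime);
    }

    public long nextBarrier() {
        return maxEventTime;
    }
}
```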
An Intermediate ETL cannot generate a new Timestamp Barrier itself. It must read the Timestamp Barrier from data in Table Store and report it to the JobManager, which then broadcasts it to the downstream tasks.
(draw.io diagram: Timestamp Barrier propagation in an Intermediate ETL)
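A minimal sketch of the intermediate source's role, under the assumption that each committed snapshot carries the barrier of its upstream job (the class `IntermediateSource` and its method are hypothetical):

```java
import java.util.OptionalLong;

// Illustrative sketch: an Intermediate ETL source never invents barriers;
// it extracts the barrier carried by a committed upstream snapshot and
// reports it to the JobManager, which broadcasts it downstream.
class IntermediateSource {
    private long lastBarrier = Long.MIN_VALUE;

    // Returns the barrier to report, or empty if the snapshot does not
    // advance the barrier (barriers only move forward).
    OptionalLong onSnapshot(long snapshotBarrier) {
        if (snapshotBarrier > lastBarrier) {
            lastBarrier = snapshotBarrier;
            return OptionalLong.of(snapshotBarrier);
        }
        return OptionalLong.empty();
    }
}
```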
The Timestamp Barrier is transmitted in the data stream between subtasks after all the records belonging to it, and each record processed by an operator has a timestamp field of its Timestamp Barrier. Besides the source, there are three types of operators:
- Stateless operator. The operator processes every input record and outputs the result, just as it does today. It does not need to align data with the Timestamp Barrier; when it receives a Timestamp Barrier, it simply broadcasts the barrier to downstream tasks.
- Stateful operator and temporal operator. Records within the same Timestamp Barrier are out of order, so stateful and temporal operators must align them according to their timestamp field. An operator executes its computation when it has collected the timestamp barrier from all its inputs, and then broadcasts the barrier to downstream tasks. There is a sequence relationship between timestamp barriers, and records between timestamp barriers are ordered: the operator computes and outputs results for a timestamp barrier based on the result of the previous timestamp barrier.
- Sink operator. The sink streams output results to Table Store and commits the results when it has collected the timestamp barrier from all its inputs. The source of a downstream ETL job can prefetch data from Table Store, but should only produce data after the upstream sink has committed.
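The alignment behavior of stateful and temporal operators can be sketched as a per-barrier buffer that fires only when the barrier has arrived on every input channel. The class `AligningOperator` is a hypothetical illustration, not Flink's actual operator API.

```java
import java.util.*;

// Illustrative sketch of barrier alignment in a stateful operator: records
// are buffered per barrier timestamp, and computation for a barrier fires
// only after the barrier has been received on all input channels.
class AligningOperator {
    private final int inputChannels;
    private final Map<Long, List<String>> buffered = new TreeMap<>();
    private final Map<Long, Integer> barrierArrivals = new HashMap<>();

    AligningOperator(int inputChannels) {
        this.inputChannels = inputChannels;
    }

    void onRecord(long barrierTimestamp, String record) {
        buffered.computeIfAbsent(barrierTimestamp, t -> new ArrayList<>()).add(record);
    }

    // Returns the aligned records for the barrier once it has been seen on
    // all channels, otherwise null (still waiting for other channels).
    List<String> onBarrier(long barrierTimestamp) {
        int seen = barrierArrivals.merge(barrierTimestamp, 1, Integer::sum);
        if (seen < inputChannels) {
            return null;
        }
        barrierArrivals.remove(barrierTimestamp);
        List<String> records = buffered.remove(barrierTimestamp);
        return records == null ? Collections.emptyList() : records;
    }
}
```

Once `onBarrier` fires, the operator would compute on the aligned records and broadcast the barrier downstream, as described above.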
2. Timestamp Barrier across ETL jobs
The Root Table is the source of the ETL Topology, and the Intermediate Table is a streaming edge and sink. Each vertex in it is an independent Flink job, in which the SplitEnumerator schedules and reads snapshots from each table.
Each job's SplitEnumerator interacts with MetaService, and creates and sends global timestamp barriers to its sources. The sources collect and broadcast the timestamp barriers. An ETL job generates snapshots in its sink tables with the timestamp barrier information, so the downstream ETL job can read the timestamp barrier information directly, which ensures the timestamp barrier can be transferred among jobs.

The overall process of the global timestamp barrier is as follows.
(draw.io diagram: overall process of the global timestamp barrier)
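The cross-job flow above can be sketched end to end: MetaService hands a global barrier to the root job's SplitEnumerator, the root job commits a snapshot tagged with that barrier, and the downstream job's enumerator reads the barrier from the snapshot instead of asking for a new one. All class names here (`MetaServiceStub`, `RootEnumerator`, `DownstreamEnumerator`) are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch of barrier transfer across ETL jobs.
class MetaServiceStub {
    private final AtomicLong barrier = new AtomicLong(0);
    long nextGlobalBarrier() { return barrier.incrementAndGet(); }
}

// A sink snapshot tagged with the timestamp barrier it belongs to.
class Snapshot {
    final long barrier;
    Snapshot(long barrier) { this.barrier = barrier; }
}

// The root job asks MetaService for the global barrier and commits a
// snapshot carrying it.
class RootEnumerator {
    private final MetaServiceStub metaService;
    RootEnumerator(MetaServiceStub metaService) { this.metaService = metaService; }
    Snapshot commit() { return new Snapshot(metaService.nextGlobalBarrier()); }
}

// The downstream job never generates a barrier: it reads the barrier
// directly from the upstream snapshot, so the value is transferred unchanged.
class DownstreamEnumerator {
    long barrierOf(Snapshot snapshot) { return snapshot.barrier; }
}
```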
...