...

In the whole process of streaming data processing, there are mainly the following 5 HOW problems (for general streaming and batch ETL):

...

Multiple ETL jobs and tables in Table Store form a topology. For example, there are dependencies between Table1, Table2, and Table3. Currently, users cannot get the relationship information between these tables, and when the data in a table is late, it is difficult to trace the root cause based on job dependencies.

  • HOW to share state between multiple jobs to save resources?

Multiple ETL jobs may store the same state data in different computations. For example, there are two JOIN ETL jobs between three tables, as follows:

[draw.io diagram: figure2join]

The data in State1 and Table1 are the same, the data in State2, State3 and Table2 are the same, and the data in State3 and Table3 are the same. The more times the same table is joined, the more serious the resource waste is. We should reuse the data in the state to save resources.

  • HOW to define the data correctness in query?

[draw.io diagram: figure2]

As shown above, the Flink ETL jobs generate V11, V21, and V31 in Table1, Table2, and Table3 respectively for V1 in the CDC. Suppose the following case: there is a base table in a database, and a user creates cascaded views or materialized views Table1, Table2, and Table3 based on it. When the user executes complex queries on Table1, Table2, and Table3, the query results are consistent with the base table. When the user instead creates the three tables in Table Store and incrementally updates their data with Flink ETL jobs in real time, these tables can be regarded as materialized views that are incrementally updated in real time. In this streaming process, how do we define data correctness when the user performs a join query on these three tables? The case in point: how do we ensure that V11, V21, and V31 are either all read or all not read in one query?
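
To make the all-or-nothing requirement concrete, here is a minimal sketch of a consistent multi-table read. The MetaServiceClient interface and its methods below are hypothetical names used only for illustration, not an API defined by this design: a query first resolves one completed timestamp and then reads every table at the snapshot mapped to that timestamp, so V11, V21 and V31 are either all visible or all invisible.

import java.util.List;
import java.util.Map;

// Hypothetical sketch: resolve one consistent timestamp, then read every
// table at the snapshot created for that timestamp.
public class ConsistentQuerySketch {

    /** Hypothetical client; the real MetaService API is not defined here. */
    interface MetaServiceClient {
        long latestCompletedTimestamp(List<String> tables);                      // barrier completed by all tables
        Map<String, Long> snapshotsForTimestamp(List<String> tables, long timestamp); // table -> snapshot id
    }

    static Map<String, Long> planConsistentRead(MetaServiceClient metaService, List<String> tables) {
        // 1. Pick a timestamp barrier that every queried table has already committed.
        long ts = metaService.latestCompletedTimestamp(tables);
        // 2. Read each table at the snapshot created for that barrier, so V11, V21
        //    and V31 are either all visible in the query or all invisible.
        return metaService.snapshotsForTimestamp(tables, ts);
    }
}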

  • HOW to define the E2E data processing delay of ETL jobs and tables in the topology above?

Flink ETL jobs update the tables above in real time, and there are dependencies between them. While the data is flowing, how do we define the data delay in these tables? For the above example, how do we define the E2E delay of streaming data from the CDC source to Table2? How much does the delay of each ETL job contribute to the E2E delay, and which ETL job needs to be optimized?
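
One natural way to formalize this question (an illustrative assumption, not a definition fixed by the design) is to measure delay per timestamp barrier. If T is the timestamp carried by a Timestamp Barrier and visible(Table, T) is the wall-clock time at which the snapshot for T becomes readable in a table, then

  delay_E2E(Table2, T) = visible(Table2, T) - T
  delay_job(ETL_i, T) = visible(output_i, T) - visible(input_i, T)

so the E2E delay of Table2 decomposes into the per-job delays along the path from the CDC source to Table2, and the job contributing the largest share is the one to optimize first. For example, if the barrier T is generated at 10:00:00 and Table2's snapshot for T is committed at 10:00:30, the E2E delay of Table2 for T is 30 seconds.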

  • HOW to revise the data in tables updated by streaming jobs?

When one of the tables needs to be revised, how can it be revised during the streaming process while ensuring the correctness of the data? For instance, if the data in Table1 needs to be revised, what should users do in the topology to ensure that the data is neither lost nor duplicated?

In order to answer the above questions, we introduce a Timestamp Barrier in Flink to align data, and a MetaService in Table Store to coordinate Flink ETL jobs, manage the relationships and dependencies between ETL jobs and tables, and support data consistency in Table Store.

Proposed Design

Architecture


We can regard each Flink ETL job as a single node with complex computation, and each table in Table Store as a data stream. Flink ETL jobs and tables together form a huge streaming job, which we call the ETL Topology. We set up a MetaService node to manage the ETL Topology. The main architecture is:

[draw.io diagram: figure3]

There are two core points in the architecture: the Timestamp Barrier mechanism and the MetaService.

  • Timestamp Barrier Mechanism

We need a barrier mechanism in Flink to guarantee data consistency; a minimal sketch of the alignment idea follows the list below.

  1. Each ETL source needs to be assigned a unified Timestamp Barrier.
  2. Stateful and temporal operators in a Flink ETL job align and compute data according to the barrier.
  3. The sink operator in an ETL job confirms data with the barrier to the sink tables in Table Store.
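
The following is a minimal, hypothetical sketch of the alignment step in item 2. The Record type and the operator shape are assumptions for illustration and do not describe Flink's actual runtime interfaces: an operator buffers records per barrier timestamp and only releases the records for timestamp T once the barrier for T has arrived on all of its input channels.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of timestamp-barrier alignment in a stateful operator.
// Records are buffered per barrier timestamp and only released downstream once
// the barrier for that timestamp has been received from every input channel.
public class BarrierAlignmentSketch {

    record Record(long barrierTimestamp, String payload) {}   // assumed record shape

    private final int numInputChannels;
    private final Map<Long, Integer> barriersSeen = new HashMap<>();   // timestamp -> #channels that sent the barrier
    private final Map<Long, List<Record>> buffered = new HashMap<>();  // timestamp -> buffered records

    public BarrierAlignmentSketch(int numInputChannels) {
        this.numInputChannels = numInputChannels;
    }

    /** Buffer a record under its timestamp barrier until that barrier is aligned. */
    public void onRecord(Record record) {
        buffered.computeIfAbsent(record.barrierTimestamp(), t -> new ArrayList<>()).add(record);
    }

    /** Called when the barrier for `timestamp` arrives on one input channel. */
    public List<Record> onBarrier(long timestamp) {
        int seen = barriersSeen.merge(timestamp, 1, Integer::sum);
        if (seen < numInputChannels) {
            return List.of();   // not aligned yet, keep buffering
        }
        barriersSeen.remove(timestamp);
        // Barrier aligned on all inputs: release the records for this timestamp so the
        // operator can compute and the sink can confirm the barrier downstream.
        List<Record> ready = buffered.remove(timestamp);
        return ready == null ? List.of() : ready;
    }
}
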
  • MetaService component

MetaService is the coordinator of the ETL Topology; its capabilities are as follows:

1> Coordinate the Global Timestamp Barrier in the ETL Topology

  1. As the coordinator of the ETL Topology, MetaService interacts with the source ETL jobs and generates a global Timestamp Barrier.
  2. The Timestamp Barrier is transmitted between ETL job nodes through tables, so these job nodes can create globally consistent snapshots in Table Store according to the barrier.

2> Manage dependencies between ETL jobs and tables

  1. MetaService manages the relationships between ETL jobs and tables in the ETL Topology. Users can query these dependencies from MetaService.
  2. MetaService manages the Timestamp Barrier of each ETL job, including barrier progress, completed barriers, etc.
  3. MetaService manages the Timestamp Barriers and snapshots of tables, including the latest completed snapshots and the relationship between barriers and snapshots. A toy sketch of this bookkeeping follows the list.
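
As a toy sketch of what this bookkeeping might look like (all class, method, and field names are hypothetical; the design does not prescribe a storage model), MetaService essentially needs three mappings: table-to-downstream-jobs, job-to-barrier-progress, and per-table barrier-to-snapshot.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical bookkeeping inside MetaService: dependencies between jobs and tables,
// barrier progress per job, and the barrier-to-snapshot mapping per table.
public class MetaServiceCatalogSketch {

    private final Map<String, Set<String>> sourceTableToJobs = new HashMap<>(); // table -> downstream jobs
    private final Map<String, Set<String>> jobToSinkTables = new HashMap<>();   // job -> sink tables
    private final Map<String, Long> jobCompletedBarrier = new HashMap<>();      // job -> latest completed barrier
    private final Map<String, Map<Long, Long>> tableBarrierToSnapshot = new HashMap<>(); // table -> (barrier -> snapshot)

    public void registerJob(String job, Set<String> sourceTables, Set<String> sinkTables) {
        sourceTables.forEach(t -> sourceTableToJobs.computeIfAbsent(t, k -> new HashSet<>()).add(job));
        jobToSinkTables.put(job, new HashSet<>(sinkTables));
    }

    /** Record that `job` completed timestamp barrier `barrier` and committed `snapshot` in `table`. */
    public void completeBarrier(String job, String table, long barrier, long snapshot) {
        jobCompletedBarrier.merge(job, barrier, Math::max);
        tableBarrierToSnapshot.computeIfAbsent(table, k -> new HashMap<>()).put(barrier, snapshot);
    }

    /** Downstream jobs reading from `table`, used to trace the root cause of late data. */
    public Set<String> downstreamJobs(String table) {
        return sourceTableToJobs.getOrDefault(table, Set.of());
    }
}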

3> Manage Data Consistency in Queries on Table Store

  1. MetaService supports data consistency in queries based on its management of the dependencies between tables.
  2. MetaService determines the compaction and expiration of snapshots for each table according to the snapshots currently being used by OLAP queries and ETL jobs; a small sketch of such snapshot pinning follows the list.
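
A small, hypothetical sketch of the snapshot-pinning idea behind item 2 (the class and method names are illustrative assumptions): snapshots in use by a query or job hold a reference count, and only unreferenced snapshots beyond the retention bound become candidates for expiration or compaction.

import java.util.HashMap;
import java.util.Map;

// Hypothetical snapshot pinning: OLAP queries and ETL jobs register the snapshot they
// read, and MetaService only lets unreferenced snapshots expire or be compacted away.
public class SnapshotPinningSketch {

    private final Map<Long, Integer> pins = new HashMap<>(); // snapshotId -> reference count

    public synchronized void pin(long snapshotId) {
        pins.merge(snapshotId, 1, Integer::sum);
    }

    public synchronized void unpin(long snapshotId) {
        pins.computeIfPresent(snapshotId, (id, count) -> count > 1 ? count - 1 : null);
    }

    /** A snapshot may expire only if it is older than the retention bound and not pinned. */
    public synchronized boolean canExpire(long snapshotId, long oldestRetainedSnapshotId) {
        return snapshotId < oldestRetainedSnapshotId && !pins.containsKey(snapshotId);
    }
}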

User Interfaces

User Interaction

Setup MetaService

In the first phase, we'd like to start a standalone MetaService with a storage path and a REST port in its configuration.

...

  1. Transaction T has been committed, but the related snapshots in the tables have not been created yet; the query result is (user1, item1, 1000, 100, 10).
  2. When the related snapshots in the tables have been created, the query result is (user1, item1, 2500, 300, 8.33333).
  3. Snapshots in persistent storage won't be rolled back even when jobs fail, so the query result will always be (user1, item1, 2500, 300, 8.33333); this is Repeatable Read.

Reuse Data In State

After aligning data with the Timestamp Barrier, join operators in jobs can keep a Delta State in memory and join it with the data in the shared state, as follows:

[draw.io diagram: JOIN_DELTA]

Each join operator only needs to store the delta state with timestamps in memory; when it needs to join data, it can combine the total data from the shared state and the delta state. A minimal sketch of this lookup follows.
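
The sketch below assumes that the shared state exposes a key lookup (for example, backed by the table itself) and that the delta state is a small in-memory map keyed by barrier timestamp; all types and method names here are hypothetical.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a join operator keeps only per-timestamp delta rows in memory
// and merges them with the shared state when it needs to join.
public class DeltaStateJoinSketch {

    /** Assumed read-only view of the shared state, e.g. backed by the Table Store table. */
    interface SharedStateView {
        List<String> lookup(String key);
    }

    private final SharedStateView sharedState;
    // barrier timestamp -> (join key -> rows received since the last completed barrier)
    private final Map<Long, Map<String, List<String>>> deltaState = new HashMap<>();

    public DeltaStateJoinSketch(SharedStateView sharedState) {
        this.sharedState = sharedState;
    }

    public void addDelta(long barrierTimestamp, String key, String row) {
        deltaState.computeIfAbsent(barrierTimestamp, t -> new HashMap<>())
                  .computeIfAbsent(key, k -> new ArrayList<>())
                  .add(row);
    }

    /** Total matching rows = rows already in the shared state + buffered delta rows up to the barrier. */
    public List<String> matchingRows(String key, long barrierTimestamp) {
        List<String> result = new ArrayList<>(sharedState.lookup(key));
        deltaState.forEach((ts, byKey) -> {
            if (ts <= barrierTimestamp) {
                result.addAll(byKey.getOrDefault(key, List.of()));
            }
        });
        return result;
    }

    /** Once the barrier completes and the shared state contains these rows, drop the delta. */
    public void dropDelta(long barrierTimestamp) {
        deltaState.remove(barrierTimestamp);
    }
}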

Design of Data Consistency

...