
Status

Discussion thread
Vote thread
JIRA


Release

Motivation

Overall


Table Store supports both streaming and batch data processing: Flink ETL jobs can read data from and write data to Table Store in either mode. The architecture is as follows.

Main processes

  1. Each Flink ETL job is independent; it manages and performs checkpoints in its own job manager.
  2. Each ETL job independently generates snapshot data in Table Store according to its checkpoints.
  3. Flink OLAP/Batch jobs read snapshots of tables from Table Store and perform complex computations such as joins and aggregations.

Proposal


Suppose the following case: there is a base table in a database, and a user creates cascaded views or materialized views Table1, Table2 and Table3 based on it. The user then executes complex queries on Table1, Table2 and Table3, and the query results are consistent with the base table. When the user instead creates the three tables in Table Store and updates them incrementally with Flink ETL jobs, these tables can be regarded as materialized views that are incrementally updated in real time. Unfortunately, Flink ETL jobs face four main HOWs in this case (general streaming and batch ETL):

  • HOW to manage the relationship between ETL jobs?

Multiple ETL jobs and tables in Table Store form a topology. For example, there are dependencies between Table1, Table2, and Table3. Currently, users can't get relationship information between these tables, and when data on a table is late, it is difficult to trace the root cause based on job dependencies.

  • HOW to define the data correctness in query?


As shown above, Flink ETL jobs will generate V11, V21 and V31 in Table1, Table2 and Table3 respectively for V1 in CDC. How should data correctness be defined when users perform a join query on these three tables? In particular, how can we ensure that V11, V21 and V31 are either all read or all not read in one query?

  • HOW to define E2E data processing delay of ETL jobs and tables in topology above?

Flink ETL jobs update the tables above in real time, and there are dependencies between them. While the data is flowing, how should the data delay in these tables be defined? For the above example, how should the E2E delay of streaming data from CDC to Table2 be defined? How much does the delay of each ETL job affect the E2E delay, and which ETL job needs to be optimized?

  • HOW to revise the data in tables updated by streaming job?

When one of the tables needs to be revised, how can it be revised in the streaming process while still ensuring the correctness of the data? For instance, if the data in Table1 needs to be revised, what should users do in the topology to ensure that data is neither lost nor duplicated?

In order to answer the above questions, we introduce MetaService in Table Store to coordinate Flink ETL jobs, manage the relationships and dependencies between ETL jobs and tables, and support data consistency in Table Store.

Proposed Design

Architecture


We can regard each Flink ETL job as a single node with complex computation, and each table in Table Store as a data stream. Flink ETL jobs and tables form a huge streaming job, which we call the ETL Topology. We set up a MetaService node to manage the ETL Topology. The main architecture is:

The capabilities of MetaService are as follows:

  • Coordinate the Global Checkpoint in ETL Topology
    1. As the coordinator of ETL Topology, MetaService interacts with source ETL job and generates a global checkpoint barrier.

    2. Each ETL job creates snapshots with checkpoint info on sink tables in Table Store.

    3. The checkpoint barrier is transmitted between ETL job nodes through tables, so these job nodes can create globally consistent snapshots in Table Store according to the checkpoint.

  • Manage dependencies between ETL jobs and tables
    1. MetaService manages the relationships between ETL jobs and tables in the ETL Topology. Users can query these dependencies from MetaService.

    2. MetaService manages checkpoints in each ETL job, including checkpoint progress, aborted checkpoint, etc.

    3. MetaService manages the checkpoints and snapshots of tables, including the latest completed snapshots, the relationship between checkpoints and snapshots.

  • Manage Data Consistency in Query On Table Store
    1. MetaService supports data consistency in query based on the management of dependencies between tables.

    2. MetaService determines the compaction and expiration of snapshots for each table according to snapshots being used by the OLAP and ETL jobs.

User Interfaces

User Interaction

Setup MetaService

In the first phase, we'd like to start a standalone MetaService with storage path and REST port in configuration.

User Cases

We add a new metastore type, table-store, which manages the Catalog and data consistency in Table Store. Users can create a Catalog with metastore table-store in the SQL Client, and specify the MetaService address and the consistency type through the uri and consistency options. Flink ETL jobs that read from and write to Table Store will be managed by MetaService to ensure data consistency. In the first phase, the table-store metastore only supports FileSystemCatalog; it will support HiveCatalog later. The use cases are shown as follows.

-- create a catalog with MetaService
CREATE CATALOG my_catalog WITH (
 'type'='table-store',
 'warehouse'='file:/tmp/table_store',
 'metastore' = 'table-store',
 'uri'='http://<meta-service-host-name>:<port>',
 'consistency'='strong' );

USE CATALOG my_catalog;

-- create three tables in my_catalog which will be managed by MetaService
CREATE TABLE word_value (
 word STRING PRIMARY KEY NOT ENFORCED,
 val BIGINT );

CREATE TABLE word_count (
 word STRING PRIMARY KEY NOT ENFORCED,
 cnt BIGINT );

CREATE TABLE word_sum (
 word STRING PRIMARY KEY NOT ENFORCED,
 val_sum BIGINT );


Users can create a source table and three streaming jobs. The jobs write data to the three tables.

-- create a word data generator table
CREATE TEMPORARY TABLE word_table (
 word STRING,
 val BIGINT ) WITH (
 'connector' = 'datagen',
 'fields.word.length' = '1');

-- table store requires checkpoint interval in streaming mode 
SET 'execution.checkpointing.interval' = '10 s'; 

-- write streaming data to word_value, word_count and word_sum tables 
INSERT INTO word_value SELECT word, val FROM word_table;
INSERT INTO word_count SELECT word, count(*) FROM word_value GROUP BY word;
INSERT INTO word_sum SELECT word, sum(val) FROM word_value GROUP BY word;


Users can query data from the three tables.

-- use tableau result mode 
SET 'sql-client.execution.result-mode' = 'tableau'; 

-- switch to batch mode 
RESET 'execution.checkpointing.interval'; 
SET 'execution.runtime-mode' = 'batch'; 

-- olap query the table 
SELECT
   T1.word,
   T1.cnt as t1cnt,
   T1.sum_val as t1sum_val,
   T2.cnt as t2cnt,
   T3.val_sum as t3sum_val
 FROM
 (SELECT word, count(*) as cnt, sum(val) as sum_val
   FROM word_value GROUP BY word) T1
 JOIN word_count T2 ON T1.word = T2.word
 JOIN word_sum T3 ON T2.word = T3.word;

Since data flows continuously between jobs and tables, without a consistency guarantee t1cnt may differ from t2cnt, and t1sum_val may differ from t3sum_val. When MetaService guarantees data consistency, t1cnt and t2cnt will be the same, as will t1sum_val and t3sum_val.

Query consistency information

MetaService stores consistency information in Table Store as tables.

  • ETL job with source tables
CREATE TABLE __META_ETL_SOURCE (
 job_id STRING,     -- The id of streaming etl job
 table_name STRING, -- The source table of the etl job
 PRIMARY KEY(job_id, table_name) NOT ENFORCED);

  • ETL job with sink table
CREATE TABLE __META_ETL_SINK (
 job_id STRING,     -- The id of streaming etl job
 table_name STRING, -- The sink table of the etl job
 PRIMARY KEY(table_name) NOT ENFORCED);

  • Table name with version
CREATE TABLE __META_TABLE_VERSION (
 table_name STRING, -- The table name
 version INT,       -- The version of the table
 table_type STRING, -- Root or Intermediate
 PRIMARY KEY(table_name) NOT ENFORCED);


Users can query dependencies and versions from these tables. For example, the following query returns the sink tables of the ETL jobs that consume Table1:

SELECT T.table_name
 FROM __META_ETL_SOURCE S
 JOIN __META_ETL_SINK T ON S.job_id=T.job_id
 WHERE S.table_name='Table1';

Data Consistency Type


When a query is submitted, it gets different versions of tables from MetaService according to different delay requirements.


For the tables shown above, suppose there are three queries:

  1. Query1: SELECT * FROM table1

  2. Query2: SELECT * FROM table1 JOIN table2

  3. Query3: SELECT * FROM table1 JOIN table2 JOIN table3

  • Strong Consistency

It guarantees strong data consistency among the queries above. A query gets the minimum version of all the related tables according to the source tables and the dependencies between them, which ensures data consistency between related tables. For the examples above, Query1, Query2 and Query3 will get Min(table1 version, table2 version) for table1 and table2, and Min(table3 version) for table3.

  • Weak Consistency

It doesn't guarantee data consistency among the queries above, only the data consistency within a single query. In this mode, each query gets the latest version of its tables.
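
The version selection under the two consistency types can be sketched as follows. This is a minimal illustration rather than MetaService's actual API: the class `ConsistencyVersions` and its methods are hypothetical names, and the sketch assumes that each table exposes its latest completed version as an integer and that the group of related tables has already been derived from the dependencies.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical helper, for illustration only; not part of Table Store. */
final class ConsistencyVersions {
    private ConsistencyVersions() {}

    /**
     * Strong consistency: every table in a group of related tables is read
     * at the minimum of the group's latest versions.
     */
    static int strongVersion(Map<String, Integer> latestVersions, List<String> relatedTables) {
        if (relatedTables.isEmpty()) {
            throw new IllegalArgumentException("No tables given");
        }
        int min = Integer.MAX_VALUE;
        for (String table : relatedTables) {
            Integer version = latestVersions.get(table);
            if (version == null) {
                throw new IllegalArgumentException("Unknown table: " + table);
            }
            min = Math.min(min, version);
        }
        return min;
    }

    /** Weak consistency: a table is read at its own latest version. */
    static int weakVersion(Map<String, Integer> latestVersions, String table) {
        Integer version = latestVersions.get(table);
        if (version == null) {
            throw new IllegalArgumentException("Unknown table: " + table);
        }
        return version;
    }
}
```

With table1 at version 5 and table2 at version 3, the strong variant reads both tables at version 3, while the weak variant reads table1 at version 5.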

Design of Data Consistency

Global Checkpoint


There are two types of tables in the ETL Topology:

  • Root Table: a sink table in Table Store written by an ETL job that consumes an external source (Kafka, CDC, etc.).

  • Intermediate Table: a sink table in Table Store written by an ETL job that consumes Root Tables and Intermediate Tables.


ETL jobs that consume the same external source cannot be managed by a global checkpoint; for example, two ETL jobs may consume the same Kafka topic and write results to Table1 and Table2 in Table Store. We therefore only guarantee the consistency of ETL jobs and tables within Table Store: users must first load external data into Table Store, and we then guarantee data consistency based on the ETL Topology.

Root Tables are the sources of the ETL Topology, and Intermediate Tables are its streaming edges and sinks. Each vertex in the topology is an independent Flink job, in which a SplitEnumerator schedules and reads snapshots from each table.

The SplitEnumerator of each job interacts with MetaService, then creates and sends global checkpoint barriers to its sources. The sources collect and broadcast the checkpoint barriers. Each ETL job generates snapshots in its sink tables with checkpoint information, so the downstream ETL job can read the checkpoint information directly, which ensures that the checkpoint barrier is transferred among jobs.

The overall process of the global checkpoint is as follows.

There are two layers in the Global Checkpoint: MetaService and JobManager. MetaService regards each ETL job as a single node and manages the global checkpoint in the ETL Topology; JobManager interacts with MetaService and manages the global checkpoint within each ETL job.
There are two parts in global checkpoint processing: the interaction between MetaService and JobManager, and the interaction between JobManager and Source Node.

  • Interaction between MetaService and JobManager

  1. An ETL job triggers checkpoints only manually, through its SplitEnumerator.

  2. When a SplitEnumerator in an ETL job finishes reading a snapshot of a Root Table, it requests a checkpoint from MetaService and broadcasts the barrier downstream.

  3. When a SplitEnumerator reads a checkpoint from a snapshot of an Intermediate Table, it reports the checkpoint to MetaService and broadcasts the barrier downstream.

  • Interaction between JobManager and Source Node

  1. The SplitEnumerator sends the checkpoint to the CheckpointCoordinator and the Source Node; the CheckpointCoordinator then sends the checkpoint to the Source Node after it has received checkpoints from all SplitEnumerators.

  2. The Source Node processes the splits of snapshots. Once it has received the checkpoint from both the SplitEnumerator and the CheckpointCoordinator, it broadcasts the checkpoint barrier after finishing the specified snapshot.

The interactions among SplitEnumerator, CheckpointCoordinator and Source Node are as follows.

Data Consistency Management


MetaService manages the checkpoints between jobs and the versions of each table. The main information includes:

  • The topology of ETL jobs and tables

MetaService manages the dependencies between tables and ETL jobs. Based on this relationship information, it supports consistent reading and computing in OLAP, calculates the E2E delay and the delay of each ETL job, and helps users find the bottleneck jobs. When revising data in tables, users can roll back the snapshots of tables and the state of ETL jobs based on the dependencies.

  • Relationship between checkpoints and snapshots of each table

MetaService ensures data consistency among ETL/OLAP jobs and tables by managing the relationship between checkpoint and snapshot.

Firstly, it is used to ensure the consistency of checkpoints and snapshots among ETL jobs that consume the same table. For example, a Root Table is consumed by an ETL job and MetaService creates checkpoints on snapshots for it. When a new ETL job that consumes this table is started, MetaService will create the same checkpoints on snapshots for it, following the previous job.

Secondly, it helps ensure checkpoint consistency between tables when an ETL job consumes several of them. For example, an ETL job consumes Table1 and Table2. When the job is started, it will get snapshot ids for Table1 and Table2 with the same checkpoint from MetaService, even when the progress of Table1 and Table2 differs. This ensures that the checkpoints of the job can be aligned.

Finally, OLAP/Batch jobs read snapshots from source tables with the same checkpoint too, and this ensures data consistency in job computation.

  • The aborted checkpoints of each table

MetaService helps solve the checkpoint alignment problem of ETL jobs that consume multiple tables by managing aborted checkpoints. When an ETL job consumes multiple tables, a checkpoint aborted in one of them will cause the job to fail to align at that checkpoint. Each job must get one of the following answers from MetaService before it triggers a checkpoint:

  1. The ETL job does not generate a checkpoint; it schedules and processes the splits of the next snapshot directly.
  2. The ETL job performs the checkpoint, then schedules the next snapshot.
  3. The ETL job aborts the specified checkpoint, then performs another checkpoint.
  • The completed checkpoints of each table

MetaService manages the completed checkpoints of each table and guarantees data consistency in OLAP queries. An OLAP query requests the versions of its source tables from MetaService, and MetaService calculates the snapshot ids of the tables based on the dependencies between tables, the completed checkpoints and snapshots of each table, and the required consistency type. The OLAP query then reads data from the tables according to the given snapshot ids, which ensures data consistency for it.

  • Information about tables and snapshots being used by the jobs

MetaService tracks which snapshots of each table are being used by ETL jobs or OLAP queries. From this it determines which snapshots can be safely deleted or compacted without affecting the jobs that are reading them, so that these jobs always read correct data.

  • Checkpoint progress of each ETL job

MetaService records the start time, finish time and total cost of the checkpoints of each job, which helps users analyze the E2E delay and optimize the ETL jobs.
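
The relationship between checkpoints and snapshots described above, where a job obtains snapshot ids for all of its source tables under the same checkpoint, can be sketched as follows. This is a simplified illustration under assumed data structures: `CheckpointSnapshotIndex` and its methods are hypothetical names, and the sketch assumes that MetaService keeps, per table, a mapping from completed checkpoint id to snapshot id.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** Hypothetical index of completed checkpoints per table, for illustration only. */
final class CheckpointSnapshotIndex {
    // table name -> (completed checkpoint id -> snapshot id)
    private final Map<String, TreeMap<Long, Long>> completed = new HashMap<>();

    /** Records that a checkpoint completed on a table and produced the given snapshot. */
    void recordCompleted(String table, long checkpointId, long snapshotId) {
        completed.computeIfAbsent(table, k -> new TreeMap<>()).put(checkpointId, snapshotId);
    }

    /** Returns the latest checkpoint that is completed on every given table, if any. */
    Optional<Long> latestCommonCheckpoint(List<String> tables) {
        if (tables.isEmpty()) {
            return Optional.empty();
        }
        TreeMap<Long, Long> first = completed.get(tables.get(0));
        if (first == null) {
            return Optional.empty();
        }
        // Walk the first table's checkpoints from newest to oldest.
        for (Long checkpointId : first.descendingKeySet()) {
            boolean completedEverywhere = true;
            for (String table : tables) {
                TreeMap<Long, Long> perTable = completed.get(table);
                if (perTable == null || !perTable.containsKey(checkpointId)) {
                    completedEverywhere = false;
                    break;
                }
            }
            if (completedEverywhere) {
                return Optional.of(checkpointId);
            }
        }
        return Optional.empty();
    }

    /** Returns the snapshot id of each table for one checkpoint. */
    Map<String, Long> snapshotsAt(List<String> tables, long checkpointId) {
        Map<String, Long> result = new HashMap<>();
        for (String table : tables) {
            TreeMap<Long, Long> perTable = completed.get(table);
            Long snapshotId = perTable == null ? null : perTable.get(checkpointId);
            if (snapshotId == null) {
                throw new IllegalStateException(
                        "Checkpoint " + checkpointId + " is not completed on " + table);
            }
            result.put(table, snapshotId);
        }
        return result;
    }
}
```

A job that consumes Table1 and Table2 would first ask for the latest common checkpoint and then read the snapshot of each table recorded for it, even if Table1 has already completed later checkpoints than Table2.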

ETL Jobs Failover


Any ETL job in the ETL Topology may fail, but unlike in a general Flink streaming job, this does not cause a failover of the whole ETL Topology. The ETL job only needs to recover from the failover itself, for the following reasons:

  • Determinism of reading data

Flink jobs read snapshots from Table Store. When a job fails, it will read snapshots according to its latest checkpoint. The relationship between checkpoint and snapshot is fixed, so the failed job reads the same data from the same snapshot according to the same checkpoint. This means the job reads deterministic data from Table Store before and after failover.

  • Determinism of writing data

Flink jobs write snapshots with checkpoint information to Table Store according to their checkpoints. Each job creates snapshots only when the specified checkpoints are completed, which means the job writes deterministic data to Table Store before and after failover.

  • Orderliness of data and computation

Flink jobs read and write snapshots according to their checkpoints. Checkpoint barriers are aligned within each job and among multiple jobs. This means that although the data within one checkpoint may be out of order, the data and computation between checkpoints are in order across multiple jobs.

Because of this determinism and orderliness, the failover of a single ETL job will not cause the failover of the entire ETL Topology. The CheckpointCoordinator of each ETL job only needs to handle the failover within the job.

In addition to the failover handling above, the determinism of snapshots and checkpoints in Table Store makes it easy to revise table data. For example, when we need to revise data in Table3, we can roll back to a specified checkpoint in all downstream cascaded ETL jobs and tables.

  • Incremental processing

All table snapshots are aligned according to a unified checkpoint. When the data of a table needs to be revised, we just need to roll back all its downstream tables to a unified snapshot, reset the state of the streaming jobs to the specified checkpoint, and restart the jobs to consume incremental data.

  • Full processing

Sometimes incremental processing is not possible, for example because of the state TTL of the ETL jobs. In that case we can perform full processing: clear the data and ETL state of all downstream tables and jobs, and then restart the jobs to consume the data in full.
Incremental processing is as follows.
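
Both kinds of processing start by finding every table downstream of the revised one. A minimal sketch of that lookup as a breadth-first traversal is shown here; `EtlTopology`, `addJob` and `downstreamOf` are hypothetical names, and the sketch assumes the job-to-table relationships are already known (for example from the source and sink registrations kept by MetaService).

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical in-memory view of the ETL Topology, for illustration only. */
final class EtlTopology {
    // source table -> sink tables written by the jobs that read the source table
    private final Map<String, Set<String>> downstream = new HashMap<>();

    /** Registers one ETL job by its source tables and its sink table. */
    void addJob(List<String> sourceTables, String sinkTable) {
        for (String source : sourceTables) {
            downstream.computeIfAbsent(source, k -> new LinkedHashSet<>()).add(sinkTable);
        }
    }

    /** Returns all tables that directly or transitively depend on the given table. */
    Set<String> downstreamOf(String table) {
        Set<String> visited = new LinkedHashSet<>();
        Deque<String> queue = new ArrayDeque<>();
        queue.add(table);
        while (!queue.isEmpty()) {
            String current = queue.poll();
            for (String next : downstream.getOrDefault(current, Collections.<String>emptySet())) {
                // Enqueue each table only once, so the traversal also ends on cyclic input.
                if (visited.add(next)) {
                    queue.add(next);
                }
            }
        }
        return visited;
    }
}
```

The returned set is the list of tables to roll back to the unified snapshot; the jobs writing those tables are the ones whose state would be reset.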

Start And Stop ETL Jobs

  • Register Tables

A Flink ETL job needs to register its source and sink tables with MetaService when it is submitted. At present, the client creates the specified TableStoreSource and TableStoreSink from Table Store while generating the Flink execution plan. In this process, we can register the job id and table information with MetaService. The REST APIs in MetaService are

/register/source/:jobID/:catalog/:database/:table
/register/sink/:jobID/:catalog/:database/:table

MetaService creates the relationship between the source and sink tables through the job id. After an ETL job generates its plan, it may still fail to be submitted to the cluster because of exceptions such as network or resource problems. The registered table information therefore cannot be accessed until the job has been submitted to the cluster and the SplitEnumerator has registered itself with MetaService too.
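
As a small illustration of the registration endpoints, the request paths could be built on the client side as follows. `MetaServicePaths` is a hypothetical helper, not an existing class, and the sketch assumes that the job id and the catalog, database and table names are already URL-safe.

```java
/** Hypothetical builder for the MetaService registration paths, for illustration only. */
final class MetaServicePaths {
    private MetaServicePaths() {}

    /** Path for registering a source table of a job. */
    static String registerSource(String jobId, String catalog, String database, String table) {
        return register("source", jobId, catalog, database, table);
    }

    /** Path for registering a sink table of a job. */
    static String registerSink(String jobId, String catalog, String database, String table) {
        return register("sink", jobId, catalog, database, table);
    }

    private static String register(
            String role, String jobId, String catalog, String database, String table) {
        // The leading empty element produces the leading slash of the path.
        return String.join("/", "", "register", role, jobId, catalog, database, table);
    }
}
```

For a job with the made-up id `job-1` that reads `my_catalog.default.word_value`, `registerSource` yields `/register/source/job-1/my_catalog/default/word_value`.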

  • Query Data Versions

According to their consistency requirement, ETL and OLAP jobs must get the snapshot ids of tables from MetaService when they are submitted to the cluster. Flink jobs can get the versions of tables when they create them in FlinkCatalog. The main processes are as follows.

REST APIs should be added to MetaService to return the versions of tables according to the consistency requirement.

/version/consistencyType/:catalog/:database/:table
  • Stop ETL Job

The relationship between the source and sink tables of an ETL job should be deleted when the job terminates. We can add a listener JobTerminationListener in Flink, and notify the SplitEnumerator to send a delete event to MetaService when the job is terminated. The interface and API are as follows

import org.apache.flink.api.common.JobID;

/**
 * Listener that is notified when a job is stopped or canceled.
 */
public interface JobTerminationListener {
 void notifyJobTerminated(JobID jobID);
}

// Unregister in MetaService REST 
/unregister/:jobID

Rejected Alternatives

Data consistency management


Data consistency is essentially the management of multiple versions of data. By unifying the data version, the system can control the visibility of data reading. In addition to checkpoint, we also considered the watermark mechanism.

  1. Parse timestamps from data as watermarks.

  2. ETL jobs generate versions based on watermarks and write them to Table Store, so that the versions flow among multiple ETL jobs.

  3. Flink OLAP queries the table from the snapshot of Table Store according to the versions.

Main Problems
Currently watermark in Flink cannot align data.

As shown above, there are two watermarks T1 and T2, with T1 < T2. The StreamTask reads data in this order: V11, V12, V21, T1 (channel1), V13, T1 (channel2). At this point the StreamTask confirms that watermark T1 is completed, but data beyond T1 (V13) has already been processed and its results written to the sink table.

Watermark mechanism cannot guarantee the consistency of data, so we choose the Checkpoint.

Data consistency coordinator


By coordinating checkpoints among multiple jobs, the consistency of data among the sink tables of multiple ETL jobs can be ensured during queries. Besides the global checkpoint, we also considered an adaptive checkpoint in each ETL job.
Each ETL job manages its own checkpoints, and MetaService manages the relationships of checkpoints between ETL jobs.


As shown above, CP30 in Table1 and CP10 in Table2 generate CP3 in Table3, and so on. When users query these tables, MetaService calculates their snapshot ids according to the checkpoint relationships in the ETL jobs.
In this way, we can define the data consistency of queries, but it is difficult to define the data processing delay between jobs and tables. For example, it is difficult to define the data delay from Table1 and Table2 to Table3. As the number of cascaded layers increases, this definition becomes very complex.

On the other hand, this proposal increases the cost of data operation and management. When the data of a table needs to be rolled back to a specified snapshot for some reason, each downstream table needs to be reset to a different snapshot, which is hard to manage.
For the above reasons, we choose the global checkpoint mechanism in the first stage.

RoadMap In Future


Data consistency of the ETL Topology is the first phase of our work. After completing this part, we plan to keep building and improving the capabilities of Flink + Table Store, mainly in the following aspects.

  1. Materialized View in SQL. Next, we hope to introduce materialized view syntax into Flink to improve the user experience. Queries can also be optimized based on materialized views to improve performance.

  2. Improve MetaService capabilities. MetaService currently supports managing Flink ETL jobs and tables in Table Store; later it should be accessible to other computing engines such as Spark and act as an agent of Hive Metastore.

  3. Improve OLAP performance. We have created [FLINK-25318] Improvement of scheduler and execution for Flink OLAP to track the improvement of OLAP in Flink. At the same time, we hope to continue to enhance the online query capability of Table Store and improve the OLAP performance of Flink + Table Store.

  4. Improve data freshness. At present, our consistency design is based on the Flink checkpoint mechanism and supports minute-level delay. In the future, we hope to support second-level or even millisecond-level delay while still ensuring data consistency.

By promoting the above optimization and implementation, we hope that Flink + Table Store can support the full StreamingWarehouse capability. Users can create materialized views and execute OLAP queries in the system, just like using databases and data warehouses, and output data to the application layer (such as KV) as required.
