
Discussion thread
Vote thread
Issue
Release-

This PIP is co-authored by Ming Li, Guojun Li and Fang Yong.

Motivation

Paimon is a streaming lakehouse: Flink streaming and batch jobs can use Paimon tables as data sources and sinks, so Flink combined with Paimon can cover the entire ETL process.

We would like to manage Flink streaming and batch ETL processing in Paimon based on the current abilities of Flink, including:

1. Table lineage in Paimon. We want to record lineage between Paimon source and sink tables keyed by the Flink job name; from this we can build the entire ETL topology of Paimon tables and Flink jobs.

2. Data lineage for tables. A Flink streaming job creates a snapshot of a Paimon table each time it completes a checkpoint, so we can record snapshot-level lineage between source and sink tables.
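For illustration, the lineage information could be exposed as queryable system tables in the catalog. The sketch below assumes two hypothetical tables, paimon.table_lineage and paimon.data_lineage, with the columns named in the comments; these names are assumptions for the sketch, not the final interface of this proposal.

-- A minimal sketch, assuming two hypothetical system tables:
--   paimon.table_lineage(database_name, table_name, job_name, lineage_type)
--   paimon.data_lineage(job_name, barrier_id, database_name, table_name,
--                       snapshot_id, lineage_type)
-- where lineage_type is 'source' or 'sink' and barrier_id is the checkpoint id.

-- Table lineage: which jobs read from or write to word_table?
SELECT job_name, lineage_type
FROM paimon.table_lineage
WHERE database_name = 'default' AND table_name = 'word_table';

-- Data lineage: which snapshots did a job consume and produce per checkpoint?
SELECT barrier_id, table_name, snapshot_id, lineage_type
FROM paimon.data_lineage
WHERE job_name = 'my_job';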

Based on the above lineage, we could support the following capabilities in the integrated Flink ETL processing with Paimon tables:

1. Query Paimon table dependencies, which allows users to manage their tables and Flink jobs better (a sketch of such a query follows this list).

2. Locate and analyze issues. When the data of a Paimon table is delayed, users can find its upstream tables and jobs and check whether any of them have problems.

3. Data management for streaming and batch ETL. For example, based on the data lineage, users can easily submit debug jobs and compare the results of a debug job with those of the streaming job.

4. Data correction for streaming ETL. When users need to correct data, they can roll back all tables and jobs to a unified "version" based on the data lineage, and resume the ETL after the correction.
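As an example of the first capability, the direct upstream tables of a sink table could be found with a self-join on the hypothetical paimon.table_lineage table sketched above; this is only an illustration of the kind of query the lineage makes possible.

-- A sketch of a dependency query over the hypothetical paimon.table_lineage:
-- find the jobs that write a given table and the tables those jobs read from.
SELECT src.table_name AS upstream_table, src.job_name
FROM paimon.table_lineage AS sink
JOIN paimon.table_lineage AS src
    ON sink.job_name = src.job_name
WHERE sink.table_name = 'my_sink_table'
    AND sink.lineage_type = 'sink'
    AND src.lineage_type = 'source';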

Architecture

We would like to introduce a system database named "paimon" for each catalog in Paimon; it stores the catalog's properties and the lineage information. Users can create a Paimon catalog with table and data lineage options, and if the source or sink tables of an ETL job are in that catalog, the lineage information will be saved. The overall process is as follows.

1. Create a catalog with the options table-lineage and data-lineage. NOTICE: these two options are immutable.

-- Create a catalog with lineage options
CREATE CATALOG my_catalog WITH (
    'type' = 'paimon',
    'warehouse' = 'warehouse path',
    'table-lineage' = 'true',
    'data-lineage' = 'true'
);

USE CATALOG my_catalog;

-- Select the options for the current catalog
SELECT * FROM paimon.catalog_options;
+---------------+-------+
|     name      | value |
+---------------+-------+
| table-lineage | true  |
| data-lineage  | true  |
+---------------+-------+
2 rows in set

2. Create two tables and submit Flink ETL job.

-- Create a word table
CREATE TABLE word_table (word STRING);

-- Create a word count table
CREATE TABLE word_count_table (
    word STRING PRIMARY KEY NOT ENFORCED,
    cnt BIGINT
);

-- Submit a Flink job to ingest data from Kafka into word_table
INSERT INTO word_table SELECT word FROM ...;

SET 'pipeline.name'='job1';
INSERT INTO word_count_table
    SELECT word, count(*) FROM word_table 
    GROUP BY word; 

Paimon will create a relationship between word_table and word_count_table through job1; as mentioned above, this includes both table lineage and data lineage. For example, Checkpoint 1 in job1 consumes Snapshot 3 of word_table and produces Snapshot 5 in word_count_table, and so on. An illustration of the table and data lineage information is shown below.
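Under the hypothetical paimon.data_lineage schema sketched in the Motivation section, the example above would correspond to rows like the following; the layout is illustrative only.

-- Data lineage rows for job1 (illustrative output)
SELECT barrier_id, table_name, snapshot_id, lineage_type
FROM paimon.data_lineage WHERE job_name = 'job1';
+------------+------------------+-------------+--------------+
| barrier_id | table_name       | snapshot_id | lineage_type |
+------------+------------------+-------------+--------------+
| 1          | word_table       | 3           | source       |
| 1          | word_count_table | 5           | sink         |
+------------+------------------+-------------+--------------+
2 rows in set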

3. Users can debug and validate the data of their tables based on the table and data lineage. The steps are:

a) From the lineage tables, users can learn that "Snapshot 5 of word_table generates Snapshot 7 in word_count_table" (a sketch of this lookup follows the list).

b) Using the time travel ability of Paimon, users can submit a Flink or Spark batch job that reads Snapshot 5 of word_table and writes the result to a new table word_count_table_snapshot_result whose schema is the same as word_count_table.

c) Compare the data in word_count_table_snapshot_result with Snapshot 7 of word_count_table to validate the result of the streaming ETL.
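Step a) could be answered with a lookup such as the following, again assuming the hypothetical paimon.data_lineage table from the Motivation section; steps b) and c) are implemented by the SQL right after.

-- A sketch of step a): find the sink snapshot that was produced from
-- Snapshot 5 of word_table, assuming the hypothetical paimon.data_lineage.
SELECT sink.table_name, sink.snapshot_id
FROM paimon.data_lineage AS src
JOIN paimon.data_lineage AS sink
    ON src.job_name = sink.job_name
    AND src.barrier_id = sink.barrier_id
WHERE src.table_name = 'word_table'
    AND src.snapshot_id = 5
    AND src.lineage_type = 'source'
    AND sink.lineage_type = 'sink';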

-- Create table word_count_table_snapshot_result from word_count_table
CREATE TABLE word_count_table_snapshot_result LIKE word_count_table;

-- Read snapshot 5 from word_table and write data to word_count_table_snapshot_result
-- in batch mode
INSERT INTO word_count_table_snapshot_result
    SELECT word, count(*)
    FROM word_table /*+ OPTIONS('scan.snapshot-id'='5') */
    GROUP BY word;

-- Compare the data in word_count_table_snapshot_result with
-- snapshot 7 in word_count_table and get the diffs
SELECT * FROM word_count_table_snapshot_result AS L
    FULL OUTER JOIN word_count_table /*+ OPTIONS('scan.snapshot-id'='7') */ AS R
    ON L.word = R.word
    WHERE L.word IS NULL OR R.word IS NULL OR L.cnt != R.cnt;


Public Interfaces

Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.

Proposed Changes

Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

Describe in few sentences how the PIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
