You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 2 Next »

PREFACE:

Currently, paimon has very high support for stream write and stream read, but not enough for traditional batch processing. After the table is created, you need to display the specified bucket key and bucket number; otherwise, the AppendOnly table or changelog table for a single bucket is created. When there is only one bucket, concurrent read/write and compact cannot be performed, resulting in poor batch performance.

In a traditional offline bin, users don't care about buckets and don't want to care about buckets. In the case of batch off-line computing, the bucket concept created by paimon for real-time stream reading and writing does not apply. In the case of non-stream read and write, the guaranteed record order based on sequence.field does not make much sense. Because without streaming retract, only concurrent insert data flows into the table. The rest of the processing is done in batch mode.

Therefore, it is necessary to design a table, which needs to meet the following characteristics:
1. There is no concept of primary key and bucket key, which is equivalent to the offline table in the offline bucket.
2. Only inserting data in stream mode is supported.(Only “I” type data) As an offline table, data can be inserted synchronously in real time, but there is no need for real-time delete, update, etc.
3. streaming write, support concurrent write, concurrent compact.
4. update data and delete data through batch tasks.
5. Write first read order is not guaranteed.

SCENARIO:

The customer has a large amount of order transaction data and expects it to automatically flow into the offline table every time the transaction data is available, and batch task statistics will be performed at 12 o'clock every day
Combined with the flink computing engine, we can create tables when only offline batch tables are needed:

CREATE TABLE Orders ( order_id INT, order_type STRING, `date` TIMESTAMP, price INT, number INT ) PARTITIONED BY (order_type) WITH ( 'write-mode' = 'table' );


There isn’t any property about bucket.

We want this table to be used for olap offline analysis, such as once a day statistics. But its data volume and traffic volume are large, so we hope it can update by itself:

INSERT INTO Orders SELECT * FROM OrderSource;


Conduct off-line analysis and query to calculate the total sales in one day:

SELECT sum(price * number) FROM Orders GROUP BY DATE_FORMAT(`date`, 'yyyyMMdd’);


Statistical order type transaction quantity:

SELECT order_type, sum(*) FROM Orders GROUP BY order_type;


The order type has changed, and all historical data needs to be changed (flink-version >= 1.17) :

UPDATE Orders SET order_type = 'banana' WHERE order_type = 'apple' AND `date` > TO_TIMESTAMP('2020-02-02', 'yyyy-HH-dd’);


DESIGN:

At the paimon project level, we need a new table, a new write mode. In this mode: 

1. +I data only, no +U -U -D data types. 

2. All data goes to the default bucket=0 file directory (for consistency with the previous design). 

3. No sequence.number, data sequence reading and writing is not required 

4. Separate compaction from writer, and writer is no longer responsible for compact at the same time. This is for Solving the problem of compact when single bucket is written concurrently (default write-only=true when building table) 

5. Create new members: CompactionCoordinator and CompactionWorker. CompactionCoordinator is a single concurrent coordinator that receives data written by upstream writers. CompactionWorker is a multi-concurrent compaction executor that runs only the compaction task specified by the coordinator.


At the computing engine level, we build the following topology when writing in real time:

1. In the prepareSnapshot phase, the writer flushes the new file, the compaction coordinator receives the new file, and the compaction coordinator reads the last delta file from the latest snapshot and adds it to the restored files. 

Also, depending on a strategy, decide whether to create a compaction.

2. A compaction coordinator delivers a compaction task. Every compaction worker executes tasks based on any compaction that occurs and any new file submitted by the writer. Build the commit message again. Pass commit message to downstream committer after execution.

3. The snapshot stage saves the status information of each operator.

4. During the notify cp complete phase, the committer submits the file information that compacts. Generate a new snapshot. When a compaction coordinator next prepares snapshot, it reads the snapshot delta (and updates its restored files based on the saved snapshot id).


Use batch mode for scenarios such as delete and update. RowType such as +U -U -D is not used while performing deleting and updating operations. Real file replacement is used for deleting and updating. 

After each delete update operation, a new snapshot is generated.




  • No labels