Discussion thread
Vote thread: TBD
Issue: https://github.com/apache/incubator-paimon/issues/1224
Release: 0.5

Motivation

Currently, Paimon has strong support for streaming writes and streaming reads, but not enough for traditional batch processing. When creating a table, you must explicitly specify the bucket key and bucket number; otherwise, an AppendOnly table or changelog table with a single bucket is created. With only one bucket, concurrent read/write and compaction cannot be performed, resulting in poor batch performance.

In a traditional offline data warehouse, users neither care about buckets nor want to. For offline batch computing, the bucket concept that Paimon introduced for real-time streaming reads and writes does not apply. Without streaming reads and writes, the record order guaranteed by sequence.field matters little: with no streaming retraction, only concurrent inserts flow into the table, and the rest of the processing is done in batch mode.

Therefore, we need a table mode that performs well without requiring users to be aware of buckets. This proposal introduces an unaware-bucket table mode.

SCENARIO

The customer has a large volume of order transaction data and expects it to flow into the offline table automatically whenever new transaction data is available. Batch statistics jobs run at 12 o'clock every day.
Combined with the Flink computing engine, when only offline batch tables are needed we can create an AppendOnly table with 'bucket' = '-1' (unaware of bucket):

Code Block
languagesql
titleCreateTable
CREATE TABLE Orders ( order_id INT, order_type STRING, `date` TIMESTAMP, price INT, number INT ) 
PARTITIONED BY (order_type) 
WITH ( 'write-mode' = 'append-only', 'bucket' = '-1' );


No bucket key or bucket number has to be specified; 'bucket' = '-1' only marks the table as unaware of buckets.

We want to use this table for offline OLAP analysis, such as statistics computed once a day. Because its data volume and traffic are large, we want it to update automatically:

Code Block
languagesql
titleStreamInsert
INSERT INTO Orders SELECT * FROM OrderSource;


Run an offline analysis query to calculate the total sales per day:

Code Block
languagesql
titleBatchQuery
SELECT sum(price * number) FROM Orders GROUP BY DATE_FORMAT(`date`, 'yyyyMMdd');


Count the number of transactions per order type:

Code Block
languagesql
titleBatchQuery
SELECT order_type, count(*) FROM Orders GROUP BY order_type;


The order type has changed, and all historical data needs to be updated (requires Flink 1.17 or later):

Code Block
languagesql
titleBatchUpdate
UPDATE Orders SET order_type = 'banana' WHERE order_type = 'apple' AND `date` > TO_TIMESTAMP('2020-02-02', 'yyyy-MM-dd');


Public Interfaces

Only the table property 'bucket' = '-1' is exposed publicly. It marks the table as unaware of buckets.


Proposed Changes

...

Unaware-Bucket AppendOnly Table

At the Paimon project level, we can set the bucket to -1 to mark a table as unaware of buckets. In this mode:

1. Only +I data is allowed; there are no +U, -U, or -D row kinds. The table is based on AppendOnly mode, so only insert records appear in streaming reads and writes.

2. All data goes to the default bucket=0 file directory (for consistency with the previous design).

3. There is no sequence.number, and ordered reading and writing is not required. When one bucket is written and read in parallel, records are not guaranteed to be read in the order they were written.

4. Data can be inserted into one bucket in parallel.


The SQL "INSERT INTO Orders SELECT * FROM OrderSource" creates a DAG like the one below:


[Image: DAG of the insert job]

All writers belong to one bucket and can insert into it in parallel, so insert performance is not a concern.
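The idea that parallel writers do not conflict within one bucket can be illustrated with a small sketch. It is not Paimon code: the class `ParallelInsertSketch`, the `bucket-0/data-<uuid>.orc` naming scheme, and the thread pool standing in for Flink writer subtasks are all assumptions made for illustration. The only point it shows is that each writer produces its own new file, so the committer merely has to collect file names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical sketch: several writers append to the same bucket without coordination. */
final class ParallelInsertSketch {

    /** A writer flushes its buffered rows into a brand-new file and returns the file name. */
    static String flush(int writerId, List<String> rows) {
        // A real writer would serialize `rows` here; the sketch only produces a name.
        // Assumed naming scheme: a random UUID keeps names unique across writers,
        // so no writer ever touches a file created by another one.
        return "bucket-0/data-" + UUID.randomUUID() + ".orc";
    }

    /** Runs the writers in parallel and returns the new files, as a committer would collect them. */
    static List<String> runWriters(int parallelism) {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < parallelism; i++) {
                final int writerId = i;
                Callable<String> writer =
                        () -> flush(writerId, Collections.singletonList("row-" + writerId));
                futures.add(pool.submit(writer));
            }
            List<String> newFiles = new ArrayList<>();
            for (Future<String> f : futures) {
                newFiles.add(f.get()); // the committer only appends file names to the snapshot
            }
            return newFiles;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        runWriters(4).forEach(System.out::println);
    }
}
```

Because no ordering is promised between writers, the order of the collected files carries no meaning in this sketch either.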

Compaction

Another problem arises as more and more data is inserted. A traditional data warehouse does not compact while inserting. In Paimon, it is hard to trigger compaction for one bucket from parallel writers. One solution is proposed below.

...

At the computing engine level, we build the following topology when writing in real time:


[Image: write topology with writers, compaction coordinator, compaction workers, and committer]

1. The writer flushes the new file to the committer as usual. Meanwhile, the compaction coordinator scans the latest snapshot for small files and keeps them in memory. Depending on a strategy, it also decides whether to create a compaction task.

2. The compaction coordinator delivers compaction tasks. Each compaction worker executes the tasks it receives and performs the actual compaction. After a compaction task finishes, the worker generates a commit message and passes it to the downstream committer.

3. The snapshot stage saves the state of each operator. Note that the compaction coordinator and the compaction workers do not have to keep state: on every restart, the compaction coordinator reads the latest snapshot in full.

4. During the notify-checkpoint-complete phase, the committer commits the file information submitted by the compaction workers and the writers, and generates a new snapshot. The next time the compaction coordinator prepares a snapshot, it reads the snapshot delta (and updates its restored files based on the saved snapshot id).

5. If a file in a compaction task is deleted by another job, the compaction job fails at commit time and restarts to catch up with the latest snapshot.
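The coordinator side of the steps above can be sketched as follows. This is a minimal illustration under stated assumptions, not Paimon's implementation: the class `CompactionCoordinatorSketch`, the `DataFile` type, the two size parameters, and the strategy itself (group small files in arrival order until a target size is reached) are hypothetical. The proposal only says that the decision depends on "a strategy".

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of a compaction coordinator for a single-bucket table. */
final class CompactionCoordinatorSketch {

    /** Hypothetical stand-in for a data file entry listed in a snapshot. */
    static final class DataFile {
        final String name;
        final long sizeBytes;

        DataFile(String name, long sizeBytes) {
            this.name = name;
            this.sizeBytes = sizeBytes;
        }
    }

    private final long smallFileThreshold; // files below this size are compaction candidates
    private final long targetFileSize;     // a group becomes a task once it reaches this size
    private final List<DataFile> pending = new ArrayList<>(); // small files kept in memory
    private final Set<String> seen = new HashSet<>();          // avoids queueing a file twice

    CompactionCoordinatorSketch(long smallFileThreshold, long targetFileSize) {
        this.smallFileThreshold = smallFileThreshold;
        this.targetFileSize = targetFileSize;
    }

    /** Scans the files of the latest snapshot and remembers the small ones not seen before. */
    void scan(List<DataFile> snapshotFiles) {
        for (DataFile f : snapshotFiles) {
            if (f.sizeBytes < smallFileThreshold && seen.add(f.name)) {
                pending.add(f);
            }
        }
    }

    /** Packs pending files into tasks; files that do not fill a group wait for the next round. */
    List<List<DataFile>> planTasks() {
        List<List<DataFile>> tasks = new ArrayList<>();
        List<DataFile> group = new ArrayList<>();
        long groupSize = 0;
        for (DataFile f : pending) {
            group.add(f);
            groupSize += f.sizeBytes;
            if (groupSize >= targetFileSize) {
                tasks.add(new ArrayList<>(group)); // one task = one group handed to a worker
                group.clear();
                groupSize = 0;
            }
        }
        pending.clear();
        pending.addAll(group); // leftovers stay in memory
        return tasks;
    }

    public static void main(String[] args) {
        CompactionCoordinatorSketch coordinator = new CompactionCoordinatorSketch(64, 128);
        List<DataFile> snapshot = new ArrayList<>();
        snapshot.add(new DataFile("f1", 50));
        snapshot.add(new DataFile("f2", 200));
        snapshot.add(new DataFile("f3", 60));
        snapshot.add(new DataFile("f4", 40));
        coordinator.scan(snapshot);
        for (List<DataFile> task : coordinator.planTasks()) {
            StringBuilder names = new StringBuilder();
            for (DataFile f : task) {
                names.append(f.name).append(' ');
            }
            System.out.println("compact: " + names.toString().trim());
        }
    }
}
```

The sketch keeps everything in memory and persists nothing, which matches the note that the coordinator needs no checkpointed state: after a restart it would simply scan the latest snapshot again.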


For an unaware-bucket append-only table, use batch mode for scenarios such as delete and update. Row kinds such as +U, -U, and -D are not used when performing delete and update operations. Instead, the affected files are actually rewritten and replaced.

After each delete or update operation, a new snapshot is generated.
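File replacement can be pictured as a copy-on-write rewrite. The sketch below is hypothetical: `FileReplacementSketch`, the in-memory `DataFile` holding only the `order_type` column, and the `-rewritten` file suffix are assumptions made for illustration, not Paimon's API or naming. It shows that files without matching rows are carried over unchanged, that files with matching rows are replaced by newly written ones, and that the result is the file list of a new snapshot.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of a batch UPDATE implemented by replacing whole files. */
final class FileReplacementSketch {

    /** Hypothetical immutable data file; only the order_type column is modeled. */
    static final class DataFile {
        final String name;
        final List<String> orderTypes; // one entry per row

        DataFile(String name, List<String> orderTypes) {
            this.name = name;
            this.orderTypes = Collections.unmodifiableList(new ArrayList<>(orderTypes));
        }
    }

    /** Returns the file list of the new snapshot; the old snapshot's list is left untouched. */
    static List<DataFile> update(List<DataFile> snapshot, String oldType, String newType) {
        List<DataFile> next = new ArrayList<>();
        for (DataFile file : snapshot) {
            if (!file.orderTypes.contains(oldType)) {
                next.add(file); // unaffected file is carried over as-is
                continue;
            }
            List<String> rewritten = new ArrayList<>();
            for (String type : file.orderTypes) {
                rewritten.add(type.equals(oldType) ? newType : type);
            }
            // The affected file is replaced by a newly written one (assumed naming).
            next.add(new DataFile(file.name + "-rewritten", rewritten));
        }
        return next;
    }

    public static void main(String[] args) {
        List<DataFile> snapshot1 = Arrays.asList(
                new DataFile("a", Arrays.asList("apple", "pear")),
                new DataFile("b", Arrays.asList("pear")));
        List<DataFile> snapshot2 = update(snapshot1, "apple", "banana");
        for (DataFile f : snapshot2) {
            System.out.println(f.name + " " + f.orderTypes);
        }
    }
}
```

A delete would follow the same pattern, dropping matching rows instead of rewriting them. No +U, -U, or -D records are produced in either case.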

Compatibility, Deprecation, and Migration Plan

None

Test Plan

Unit tests: verify that each component works, including the compaction coordinator, the compaction worker, etc.

Integration tests: verify the overall logic, such as whether the DAG is correct and whether compaction works correctly in a Flink job.

Rejected Alternatives

  • Keep compaction in the writers, but allow only one writer to trigger compaction at a time. (Rejected: it slows down inserts, and the compacting writer performs poorly.)
  • Start a separate compaction process to trigger compaction. (Rejected: it wastes resources.)
  • Create a new table mode (batch) to implement all of this. (Rejected: it makes things more complex.)