You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 6 Next »

Discussion thread
Vote threadTBD
ISSUEhttps://github.com/apache/incubator-paimon/issues/1224
ReleaseTBD

Motivation

Currently, paimon has very high support for stream write and stream read, but not enough for traditional batch processing. After the table is created, you need to display the specified bucket key and bucket number; otherwise, the AppendOnly table or changelog table for a single bucket is created. When there is only one bucket, concurrent read/write and compact cannot be performed, resulting in poor batch performance.

In a traditional offline bin, users don't care about buckets and don't want to care about buckets. In the case of batch off-line computing, the bucket concept created by paimon for real-time stream reading and writing does not apply. In the case of non-stream read and write, the guaranteed record order based on sequence.field does not make much sense. Because without streaming retract, only concurrent insert data flows into the table. The rest of the processing is done in batch process mode.

Therefore, it is necessary to design a way, which needs to perform good while regardless of bucket. Here is a proposal with a negative bucket.

SCENARIO

The customer has a large amount of order transaction data and expects it to automatically flow into the offline table every time the transaction data is available, and batch task statistics will be performed at 12 o'clock every day
Combined with the flink computing engine, we can create AppendOnly Table with negative bucket(regardless of bucket):

CREATE TABLE Orders ( order_id INT, order_type STRING, `date` TIMESTAMP, price INT, number INT ) PARTITIONED BY (order_type) WITH ( 'write-mode' = 'append-only', 'bucket' = '-1' );


There isn’t any property about bucket.

We want this table to be used for OLAP offline analysis, such as once a day statistics. But its data volume and traffic volume are large, so we hope it can update by itself:

INSERT INTO Orders SELECT * FROM OrderSource;


Conduct off-line analysis and query to calculate the total sales in one day:

SELECT sum(price * number) FROM Orders GROUP BY DATE_FORMAT(`date`, 'yyyyMMdd’);


Statistical order type transaction quantity:

SELECT order_type, sum(*) FROM Orders GROUP BY order_type;


The order type has changed, and all historical data needs to be changed (flink-version >= 1.17) :

UPDATE Orders SET order_type = 'banana' WHERE order_type = 'apple' AND `date` > TO_TIMESTAMP('2020-02-02', 'yyyy-HH-dd’);


Public Interfaces

Only the table property 'bucket' = '-1' is exported to public.


Proposed Changes

Negative Bucket AppendOnly Table

At the paimon project level, we can assign the bucket negative number to tag that is regardless. In this mode: 

1. Table based on AppendOnly Mode, Only "I" data appears in stream read/write.

2. All data goes to the default bucket=0 file directory (for consistency with the previous design). 

3. For parallelly one bucket write/read, write first read order is not guaranteed.

4. Support parallelly insert data into one bucket.


The sql INSERT INTO Orders SELECT * FROM OrderSource create a dag like below:

The writes all belong to one bucket, they could insert into one bucket parallelly. So we don't worry about the performance while inserting.

Compaction

Another problem comes up while inserting more and more data. As traditional bin, we don't have compaction while inserting. In paimon, it's hard to trigger a compaction for one bucket parallelly. Below, come up one solution.

1. Separate compaction from writer, and writer is no longer responsible for compact at the same time. 

2. Create new members: CompactionCoordinator and CompactionWorker. CompactionCoordinator is a single concurrent coordinator that receives data written by upstream writers. CompactionWorker is a multi-concurrent compaction executor that runs only the compaction task specified by the coordinator.


At the computing engine level, we build the following topology when writing in real time:

1. In the prepareSnapshot phase, the writer flushes the new file, the compaction coordinator receives the new file, and the compaction coordinator reads the last delta file from the latest snapshot and adds it to the restored files. 

Also, depending on a strategy, decide whether to create a compaction.

2. A compaction coordinator delivers a compaction task. Every compaction worker executes tasks based on any compaction that occurs and any new file submitted by the writer. Build the commit message again. Pass commit message to downstream committer after execution.

3. The snapshot stage saves the status information of each operator.

4. During the notify cp complete phase, the committer submits the file information that compacts. Generate a new snapshot. When a compaction coordinator next prepares snapshot, it reads the snapshot delta (and updates its restored files based on the saved snapshot id).


Use negative bucket append only table for scenarios such as delete and update. RowType such as +U -U -D is not used while performing deleting and updating operations. Real file replacement is used for deleting and updating. 

After each delete update operation, a new snapshot is generated.

Compatibility, Deprecation, and Migration Plan

None

Test Plan

UT tests: verify all the component works, including: compaction coordinator, compaction worker, etc

IT tests: verify logic. Such as, whether the dag is correct, does the compaction works correctly in a flink job

Rejected Alternatives

  • Still put compaction in writers, but only one writer could trigger compaction at a time. (rejected: it will slow down the inserting. compaction writer will run in a poor performance.)
  • Start another compaction process to trigger compaction. (rejected: it's a waste of resource.)


  • No labels