Discussion thread: TBD
Vote thread: TBD
ISSUE: https://github.com/apache/incubator-paimon/issues/1811
Release: TBD

Motivation

Currently, Paimon does not have an event notification mechanism. In many production environments, users need to observe state changes of Paimon tables. Examples include whether a new file has been committed to a table, which partitions the committed files belong to, the size and number of the committed files, the status and type of compactions, and operations such as table creation, deletion, and schema changes. Therefore, we propose introducing a listener system for Paimon.

Public Interfaces

Event

Event is the base interface for all events. Any class that implements it is an event.

Event
public interface Event extends Serializable {
}

Listener

Listener is the interface that receives event notifications.

Listener
public interface Listener extends Serializable {

    /** Notify an event. */
    void notify(Event event);
}

ListenerFactory

ListenerFactory is the factory that creates Listener instances.

ListenerFactory
public interface ListenerFactory extends Serializable {

    /** Initialize a listener given a map of catalog properties. */
    Listener create(Options options);

    /** The identifying name of a Listener. */
    String name();
}

Proposed Changes

Listener

ListenerFactory implementations are loaded via SPI, and each factory creates its Listener from the options of the catalog. Multiple listeners are notified in no particular order, and an exception thrown by one listener does not affect the others. Users implementing their own listeners should avoid long-running calls in notify.
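The following is a minimal sketch of these rules, not the proposal's implementation. ListenerDispatcher, discover and dispatch are hypothetical names. The interfaces are redeclared as stand-ins, and Options is replaced by a plain Map<String, String> so the example has no Paimon dependency. discover uses java.util.ServiceLoader, the standard JDK SPI mechanism. dispatch catches runtime exceptions per listener, so one failing listener cannot stop the others from being notified.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;

// Stand-ins for the proposed interfaces; Options is modelled as Map<String, String>.
interface Event extends Serializable {}

interface Listener extends Serializable {
    void notify(Event event);
}

interface ListenerFactory extends Serializable {
    Listener create(Map<String, String> options);

    String name();
}

// Hypothetical helper: illustrates SPI discovery and isolated notification only.
final class ListenerDispatcher {

    private final List<Listener> listeners;

    ListenerDispatcher(List<Listener> listeners) {
        this.listeners = new ArrayList<>(listeners);
    }

    /** Discovers factories via SPI and keeps those whose name is in the given list. */
    static List<ListenerFactory> discover(List<String> names, ClassLoader classLoader) {
        List<ListenerFactory> matched = new ArrayList<>();
        for (ListenerFactory factory : ServiceLoader.load(ListenerFactory.class, classLoader)) {
            if (names.contains(factory.name())) {
                matched.add(factory);
            }
        }
        return matched;
    }

    /** Notifies every listener and returns how many of them threw. */
    int dispatch(Event event) {
        int failures = 0;
        for (Listener listener : listeners) {
            try {
                listener.notify(event);
            } catch (RuntimeException e) {
                // A real implementation would log the exception; here we only count it
                // and move on, so the remaining listeners are still notified.
                failures++;
            }
        }
        return failures;
    }
}
```

Iterating a List gives a fixed order in this sketch, but listener code should not depend on it, since the proposal guarantees no order between listeners.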

Usage with Listener

Suppose the user has implemented a Listener and a ListenerFactory as shown below and placed them on the classpath.

MyListener
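public class MyListener implements Listener {

    private final String value1;
    private final String value2;

    public MyListener(String value1, String value2) {
        this.value1 = value1;
        this.value2 = value2;
    }

    @Override
    public void notify(Event event) {
        // React to the event here, e.g. check its concrete type
        // (CommitEvent, CompactEvent, ...), and keep the work short.
    }
}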


MyListenerFactory
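public class MyListenerFactory implements ListenerFactory {

    @Override
    public Listener create(Options options) {
        // Listener options are read by their short names, e.g. "key1"
        // for 'listener.option.my_listener.key1'.
        String value1 = options.get("key1");
        String value2 = options.get("key2");
        return new MyListener(value1, value2);
    }

    @Override
    public String name() {
        // Must match the name listed in 'listener.names'.
        return "my_listener";
    }
}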


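Adding the corresponding configuration when creating the catalog activates the listener named "my_listener". The options key1 and key2 are passed to MyListenerFactory#create, which reads them by their short names, without the "listener.option.my_listener." prefix.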

CREATE CATALOG my_catalog WITH (    
'type' = 'paimon',
'warehouse' = 'warehouse path',
'listener.names' = 'my_listener,xxx,xxx',
'listener.option.my_listener.key1' = 'value1',
'listener.option.my_listener.key2' = 'value2'
);
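One way to turn these catalog options into per-listener options is sketched below. ListenerOptionsParser is a hypothetical helper, not part of the proposal. The catalog options are modelled as a Map<String, String>, and listener names are assumed not to contain dots. Options for a listener that is not listed in 'listener.names' are ignored.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: splits catalog options into one option map per listener.
final class ListenerOptionsParser {

    static final String NAMES_KEY = "listener.names";
    static final String OPTION_PREFIX = "listener.option.";

    /** Returns listener name -> options of that listener, with the prefix stripped. */
    static Map<String, Map<String, String>> parse(Map<String, String> catalogOptions) {
        Map<String, Map<String, String>> result = new LinkedHashMap<>();
        String names = catalogOptions.get(NAMES_KEY);
        if (names == null || names.trim().isEmpty()) {
            return result; // no listeners configured
        }
        for (String raw : names.split(",")) {
            String name = raw.trim();
            if (!name.isEmpty()) {
                result.put(name, new HashMap<>());
            }
        }
        for (Map.Entry<String, String> entry : catalogOptions.entrySet()) {
            String key = entry.getKey();
            if (!key.startsWith(OPTION_PREFIX)) {
                continue;
            }
            // "my_listener.key1" -> listener "my_listener", option "key1"
            // (assumes listener names contain no dots).
            String rest = key.substring(OPTION_PREFIX.length());
            int dot = rest.indexOf('.');
            if (dot <= 0) {
                continue;
            }
            Map<String, String> target = result.get(rest.substring(0, dot));
            if (target != null) {
                target.put(rest.substring(dot + 1), entry.getValue());
            }
        }
        return result;
    }
}
```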

Event Lists

CommitEvent

Each commit operation on a table generates a CommitEvent, whether the commit succeeds or fails. There are three commit kinds: APPEND, COMPACT, and OVERWRITE. Features such as an independent compaction service, or advancing downstream tasks based on the watermark, rely on CommitEvent.

CommitEvent
public class CommitEvent implements Event {

    private List<ManifestEntry> tableFiles;
    private List<ManifestEntry> changelogFiles;
    private List<IndexManifestEntry> indexFiles;
    private long identifier;
    private Long watermark;
    private Map<Integer, Long> logOffsets;
    private Snapshot.CommitKind commitKind;

    /** The cause of the failure if the commit failed. */
    private Throwable throwable;

    /** The file path of the table in the filesystem. */
    private String path;
}
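As an example of consuming CommitEvent, the sketch below tracks the highest watermark among successful commits, as a service that advances downstream tasks by watermark might do. WatermarkListener is hypothetical, and CommitEvent is reduced to a stand-in with only watermark and throwable. Its constructor and accessors are assumptions, since the proposal lists only private fields, and a non-null throwable is assumed to mean the commit failed.

```java
import java.io.Serializable;

interface Event extends Serializable {}

interface Listener extends Serializable {
    void notify(Event event);
}

// Stand-in: only the fields used below; accessor names are assumptions.
final class CommitEvent implements Event {
    private final Long watermark;      // may be null when no watermark is set
    private final Throwable throwable; // assumed null when the commit succeeded

    CommitEvent(Long watermark, Throwable throwable) {
        this.watermark = watermark;
        this.throwable = throwable;
    }

    Long watermark() { return watermark; }

    Throwable throwable() { return throwable; }
}

/** Hypothetical listener tracking the highest watermark of successful commits. */
final class WatermarkListener implements Listener {
    // Long.MIN_VALUE means no successful commit has carried a watermark yet.
    private long maxWatermark = Long.MIN_VALUE;
    private int failedCommits;

    @Override
    public void notify(Event event) {
        if (!(event instanceof CommitEvent)) {
            return; // ignore other event types
        }
        CommitEvent commit = (CommitEvent) event;
        if (commit.throwable() != null) {
            failedCommits++; // a failed commit must not advance the watermark
            return;
        }
        if (commit.watermark() != null) {
            maxWatermark = Math.max(maxWatermark, commit.watermark());
        }
    }

    long maxWatermark() { return maxWatermark; }

    int failedCommits() { return failedCommits; }
}
```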

TriggerCompactEvent & CompactEvent

Because a compaction task may take a long time to execute, a TriggerCompactEvent is generated before the task begins so that its execution can be tracked. A CompactEvent is generated every time a compaction finishes, whether it succeeds or fails. Together, these two events tell users the start time and the file status of each task, which helps them judge whether compaction is healthy. They also carry most of the compaction-related information that users care about.

TriggerCompactEvent
public class TriggerCompactEvent implements Event {

    private final boolean fullCompaction;

    private final BinaryRow partition;

    private final int bucket;

    private final List<DataFileMeta> inputs;

    /** The file path of the table in the filesystem. */
    private final String path;
}


CompactEvent
public class CompactEvent implements Event {

    private final boolean fullCompaction;

    private final BinaryRow partition;

    private final int bucket;

    private final List<DataFileMeta> beforeFiles;

    private final List<DataFileMeta> afterFiles;

    private final Throwable throwable;

    /** The file path of the table in the filesystem. */
    private final String path;
}
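The sketch below shows how the two compaction events could be paired to measure compaction duration and count failures. CompactionMonitor is hypothetical. The events are simplified stand-ins (partition is a String instead of BinaryRow, and the file lists are omitted). It also assumes at most one compaction runs at a time for a given table path, partition, and bucket, so that triple identifies a task. The clock is injected so the example is deterministic.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

interface Event extends Serializable {}

interface Listener extends Serializable {
    void notify(Event event);
}

// Simplified stand-ins for the proposed events.
final class TriggerCompactEvent implements Event {
    final String path;
    final String partition;
    final int bucket;

    TriggerCompactEvent(String path, String partition, int bucket) {
        this.path = path;
        this.partition = partition;
        this.bucket = bucket;
    }
}

final class CompactEvent implements Event {
    final String path;
    final String partition;
    final int bucket;
    final Throwable throwable; // assumed null on success

    CompactEvent(String path, String partition, int bucket, Throwable throwable) {
        this.path = path;
        this.partition = partition;
        this.bucket = bucket;
        this.throwable = throwable;
    }
}

/** Hypothetical listener measuring compaction duration per (path, partition, bucket). */
final class CompactionMonitor implements Listener {
    private final LongSupplier clockMillis;
    private final Map<String, Long> startTimes = new HashMap<>();
    private long lastDurationMillis = -1;
    private int failures;

    CompactionMonitor(LongSupplier clockMillis) {
        this.clockMillis = clockMillis;
    }

    private static String key(String path, String partition, int bucket) {
        return path + "|" + partition + "|" + bucket;
    }

    @Override
    public void notify(Event event) {
        if (event instanceof TriggerCompactEvent) {
            TriggerCompactEvent e = (TriggerCompactEvent) event;
            startTimes.put(key(e.path, e.partition, e.bucket), clockMillis.getAsLong());
        } else if (event instanceof CompactEvent) {
            CompactEvent e = (CompactEvent) event;
            Long start = startTimes.remove(key(e.path, e.partition, e.bucket));
            if (start != null) {
                lastDurationMillis = clockMillis.getAsLong() - start;
            }
            if (e.throwable != null) {
                failures++;
            }
        }
    }

    /** Compactions that were triggered but have not finished yet. */
    int running() { return startTimes.size(); }

    long lastDurationMillis() { return lastDurationMillis; }

    int failures() { return failures; }
}
```

A monitor like this could flag tasks whose entry stays in startTimes for too long as possibly stuck.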

CreateTableEvent & DropTableEvent & RenameTableEvent & AlterTableEvent

These four events are generated when a table is successfully created, dropped, renamed, or altered in the catalog. Features such as an independent compaction service need to learn about these changes promptly. For example, when a table option is modified, the service should check the new compaction parameters right away to decide whether to compact the table immediately. Create and drop events notify the compaction service of new and deleted tables, which affects its scheduling strategy. A rename event lets the compaction service associate the execution records kept under the old table name with the new name. Without a listener mechanism, the service would have to poll tables continuously, which consumes more resources and detects changes later.

CreateTableEvent
public class CreateTableEvent implements Event {

    /** The file path of the table in the filesystem. */
    private String path;

    private Schema schema;
}


DropTableEvent
public class DropTableEvent implements Event {

    /** The file path of the table in the filesystem. */
    private String path;
}


RenameTableEvent
public class RenameTableEvent implements Event {

    /** The old file path of the table in the filesystem. */
    private String oldPath;

    /** The new file path of the table in the filesystem. */
    private String newPath;
}


AlterTableEvent
public class AlterTableEvent implements Event {

    /** The file path of the table in the filesystem. */
    private String path;

    private List<SchemaChange> schemaChanges;
}
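For the compaction-service scenario described above, a listener could keep its view of the catalog in sync as sketched below. TableRegistry is hypothetical, and the events are stand-ins: Schema and the list of SchemaChange are replaced by plain option maps, so an alter only models option changes. A rename moves the stored entry to the new path so that history recorded for the table is kept.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

interface Event extends Serializable {}

interface Listener extends Serializable {
    void notify(Event event);
}

// Stand-ins: Schema / SchemaChange are simplified to option maps.
final class CreateTableEvent implements Event {
    final String path;
    final Map<String, String> options;

    CreateTableEvent(String path, Map<String, String> options) {
        this.path = path;
        this.options = options;
    }
}

final class DropTableEvent implements Event {
    final String path;

    DropTableEvent(String path) { this.path = path; }
}

final class RenameTableEvent implements Event {
    final String oldPath;
    final String newPath;

    RenameTableEvent(String oldPath, String newPath) {
        this.oldPath = oldPath;
        this.newPath = newPath;
    }
}

final class AlterTableEvent implements Event {
    final String path;
    final Map<String, String> changedOptions;

    AlterTableEvent(String path, Map<String, String> changedOptions) {
        this.path = path;
        this.changedOptions = changedOptions;
    }
}

/** Hypothetical compaction-service registry kept in sync by table events. */
final class TableRegistry implements Listener {
    // table path -> table options currently known to the service
    private final Map<String, Map<String, String>> tables = new HashMap<>();

    @Override
    public void notify(Event event) {
        if (event instanceof CreateTableEvent) {
            CreateTableEvent e = (CreateTableEvent) event;
            tables.put(e.path, new HashMap<>(e.options));
        } else if (event instanceof DropTableEvent) {
            tables.remove(((DropTableEvent) event).path);
        } else if (event instanceof RenameTableEvent) {
            RenameTableEvent e = (RenameTableEvent) event;
            Map<String, String> options = tables.remove(e.oldPath);
            if (options != null) {
                tables.put(e.newPath, options); // keep the entry under the new path
            }
        } else if (event instanceof AlterTableEvent) {
            AlterTableEvent e = (AlterTableEvent) event;
            Map<String, String> options = tables.get(e.path);
            if (options != null) {
                // A real service would re-check its compaction settings here.
                options.putAll(e.changedOptions);
            }
        }
    }

    boolean contains(String path) { return tables.containsKey(path); }

    String option(String path, String key) {
        Map<String, String> options = tables.get(path);
        return options == null ? null : options.get(key);
    }
}
```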

Compatibility, Deprecation, and Migration Plan

None

Test Plan

IT tests: verify that the listener configuration takes effect and that each type of event is notified correctly.

Rejected Alternatives

