Discussion thread | TBD |
---|---|
Vote thread | TBD |
ISSUE | https://github.com/apache/incubator-paimon/issues/1811 |
Release | TBD |
Motivation
Currently, Paimon doesn’t have event notification mechanism. In many users' production environments, users need to perceive the state changes of Paimon table, such as whether a new file has been committed to the table, in which partitions the committed files are, the size and number of the committed files, the status and type of compaction, operations like table creation, deletion, and schema changes, etc. So, we need to introduce a Listener system for Paimon.
Public Interfaces
Event
Event is the base event interface, which indicate a class is an event.
public interface Event extends Serializable { }
Listener
listener interface that can receive event notifications.
public interface Listener extends Serializable { /** Initialize a listener given a map of catalog properties. */ void initialize(Map<String, String> options); /** Notify an event. */ void notify(Event event); /** The identifying name of a Listener. */ String name(); }
Listeners
All listeners will be registered with Listeners, which is responsible for initializing the listeners and notifying them of events.
/** registration and notification for listeners. */ public class Listeners { private List<Listener> listeners = new ArrayList<>(); /** Register a listener to Listeners. */ public register(Listener listener) { } /** Notifies all Listener that a specific event has occurred. */ public void notify(Event event) { } /** Create a Listeners without any Listener. */ public static Listeners emptyListeners() { } }
Proposed Changes
Listener load
Listeners are loaded via SPI and initialized with the options of the catalog when CatalogFactory creates a catalog. Multiple initialized Listeners are registered in Listeners. There is no strict order between multiple listeners, and an exception thrown by one listener will not affect others. When implementing their own listeners, users should try to avoid long-running calls.
Usage with Listener
Suppose the user has a Listener implementation as shown below, and places it into the classpath.
public class MyListener implements Listener{ @Override public void initialize(Map<String, String> options) { String value1 = options.get("key1"); String value2 = options.get("key2"); .... } @Override public void notify(Event event) { ... } @Override public String name() { return "my_listener"; } }
Adding the corresponding configuration during the creation of the catalog will activate the listener named "my_listener". The params key1 and key2 will be
CREATE CATALOG my_catalog WITH ( 'type' = 'paimon', 'warehouse' = 'warehouse path', 'listener.names' = 'my_listener,xxx,xxx', 'listener.option.my_listener.key1' = 'value1', 'listener.option.my_listener.key2' = 'value2' );
Event Lists
CommitEvent
Each commit operation on the table generates a CommitEvent regardless of whether it succeeds or fails. including three commit types: APPEND, COMPACT, and OVERWRITE.
public class CommitEvent implements Event{ private List<ManifestEntry> tableFiles; private List<ManifestEntry> changelogFiles; private List<IndexManifestEntry> indexFiles; private long identifier; private Long watermark; private Map<Integer, Long> logOffsets; private Snapshot.CommitKind commitKind; private Throwable throwable; /** The file path of the table in the filesystem. */ private String path; }
TriggerCompactEvent
Since the compaction task is likely to take a long time to execute, a TriggerCompactEvent will be generated before the compaction task begins in order to perceive its execution status.
public class TriggerCompactEvent implements Event{ private final boolean fullCompaction; private final BinaryRow partition; private final int bucket; private final List<DataFileMeta> inputs; /** The file path of the table in the filesystem. */ private final String path; }
CompactEvent
A CompactEvent will be generated for every compaction, regardless of whether it succeeds or fails.
public class CompactEvent implements Event{ private final boolean fullCompaction; private final BinaryRow partition; private final int bucket; private final List<DataFileMeta> beforeFiles; private final List<DataFileMeta> afterFiles; private final Throwable throwable; /** The file path of the table in the filesystem. */ private final String path; }
CreateTableEvent
After successfully creating a table, a CreateTableEvent will be generated.
public class CreateTableEvent implements Event { /** The file path of the table in the filesystem. */ private String path; private Schema schema; }
DropTableEvent
After successfully dropping a table, a DropTableEvent will be generated.
public class Drop TableEvent implements Event { /** The file path of the table in the filesystem. */ private String path; }
RenameTableEvent
After successfully renaming a table, a RenameTableEvent will be generated.
public class RenameTab leEvent implements Event{ /** The old file path of the table in the filesystem. */ private String oldPath; /** The new path of the table in the filesystem. */ private String newPath; }
AlterTableEvent
After successfully altering a table, a RenameTableEvent will be generated.
public class AlterTableEvent implements Event{ /** The file path of the table in the filesystem. */ private String path; private List<SchemaChange> schemaChanges; }
Compatibility, Deprecation, and Migration Plan
None
Test Plan
IT tests: Verify if the listener configuration is effective and if various events are notified correctly