Discussion thread | TBD |
---|---|
Vote thread | TBD |
ISSUE | https://github.com/apache/incubator-paimon/issues/1811 |
Release | TBD |
Motivation
Currently, Paimon doesn't have an event notification mechanism. In many production environments, users need to observe state changes of a Paimon table, such as whether a new file has been committed to the table, which partitions the committed files belong to, the size and number of the committed files, the status and type of compaction, and operations like table creation, deletion, and schema changes. Therefore, we need to introduce a Listener system for Paimon.
Public Interfaces
Event
Event is the base event interface, which indicates that a class is an event.
public interface Event extends Serializable { }
Listener
Listener is the interface that receives event notifications.
public interface Listener extends Serializable {

    /** Initialize a listener given a map of catalog properties. */
    void initialize(Map<String, String> options);

    /** Notify an event. */
    void notify(Event event);

    /** The identifying name of a Listener. */
    String name();
}
Listeners
All listeners will be registered with Listeners, which is responsible for initializing the listeners and notifying them of events.
/** Registration and notification for listeners. */
public class Listeners {

    private List<Listener> listeners = new ArrayList<>();

    /** Register a listener to Listeners. */
    public void register(Listener listener) { }

    /** Notifies all listeners that a specific event has occurred. */
    public void notify(Event event) { }

    /** Create a Listeners without any Listener. */
    public static Listeners emptyListeners() { }
}
Proposed Changes
Listener load
Listeners are loaded via SPI and initialized with the options of the catalog when CatalogFactory creates a catalog. Multiple initialized Listeners are registered in Listeners. There is no strict order between multiple listeners, and an exception thrown by one listener will not affect others. When implementing their own listeners, users should try to avoid long-running calls.
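The loading and isolation behaviour described above could look roughly like the following. This is a minimal sketch, not the actual Paimon implementation: the class name `SketchListeners`, the `from` helper, and the choice to skip a listener whose initialization fails are all assumptions made for illustration. The `Event` and `Listener` interfaces are repeated only so that the snippet compiles on its own.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;

// Simplified copies of the proposed interfaces, repeated so the sketch is self-contained.
interface Event extends Serializable {}

interface Listener extends Serializable {
    void initialize(Map<String, String> options);
    void notify(Event event);
    String name();
}

/** Hypothetical registry sketch; names and error handling are assumptions. */
class SketchListeners {
    private final List<Listener> listeners = new ArrayList<>();

    /** Discovers listeners through SPI (META-INF/services entries) and initializes them. */
    static SketchListeners load(Map<String, String> options) {
        return from(ServiceLoader.load(Listener.class), options);
    }

    /** Same as load(), but takes the candidates explicitly, which is easier to test. */
    static SketchListeners from(Iterable<Listener> candidates, Map<String, String> options) {
        SketchListeners result = new SketchListeners();
        for (Listener listener : candidates) {
            try {
                listener.initialize(options);
                result.listeners.add(listener);
            } catch (RuntimeException e) {
                // Assumption: a listener that fails to initialize is skipped, not fatal.
                System.err.println("Skipping listener " + listener.name() + ": " + e);
            }
        }
        return result;
    }

    /** Notifies every listener; a failure in one does not stop the others. */
    void notify(Event event) {
        for (Listener listener : listeners) {
            try {
                listener.notify(event);
            } catch (RuntimeException e) {
                System.err.println("Listener " + listener.name() + " failed: " + e);
            }
        }
    }

    int size() {
        return listeners.size();
    }
}
```

Because `notify` runs the listeners one after another on the calling thread in this sketch, a slow listener would delay the operation that produced the event, which is why long-running calls are best avoided or handed off to another thread.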
Usage with Listener
Suppose the user has a Listener implementation as shown below, and places it into the classpath.
public class MyListener implements Listener {

    @Override
    public void initialize(Map<String, String> options) {
        String value1 = options.get("key1");
        String value2 = options.get("key2");
        // ...
    }

    @Override
    public void notify(Event event) {
        // ...
    }

    @Override
    public String name() {
        return "my_listener";
    }
}
Adding the corresponding configuration when creating the catalog activates the listener named "my_listener". The parameters key1 and key2 are passed to its initialize method through the options map.
CREATE CATALOG my_catalog WITH (
    'type' = 'paimon',
    'warehouse' = 'warehouse path',
    'listener.names' = 'my_listener,xxx,xxx',
    'listener.option.my_listener.key1' = 'value1',
    'listener.option.my_listener.key2' = 'value2'
);
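One plausible way to turn these catalog options into the map a listener receives is sketched below. The helper class `ListenerOptions` and its methods are hypothetical, and the prefix-stripping rule is an assumption inferred from the example, where the key `listener.option.my_listener.key1` is read as `key1` inside the listener.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper; the key names follow the CREATE CATALOG example. */
class ListenerOptions {
    static final String NAMES_KEY = "listener.names";
    static final String OPTION_PREFIX = "listener.option.";

    /** Splits the comma-separated 'listener.names' value into trimmed, non-empty names. */
    static List<String> enabledNames(Map<String, String> catalogOptions) {
        List<String> names = new ArrayList<>();
        String raw = catalogOptions.getOrDefault(NAMES_KEY, "");
        for (String name : raw.split(",")) {
            String trimmed = name.trim();
            if (!trimmed.isEmpty()) {
                names.add(trimmed);
            }
        }
        return names;
    }

    /** Returns one listener's options with the 'listener.option.<name>.' prefix removed. */
    static Map<String, String> optionsFor(String listenerName, Map<String, String> catalogOptions) {
        String prefix = OPTION_PREFIX + listenerName + ".";
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> entry : catalogOptions.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                result.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }
}
```

Scoping options by listener name in this way would keep two listeners that both use a key such as `key1` from reading each other's values.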
Event Lists
CommitEvent
Each commit operation on the table generates a CommitEvent, regardless of whether it succeeds or fails. This covers three commit types: APPEND, COMPACT, and OVERWRITE. Many features, such as an independent compaction service and advancing downstream tasks based on the watermark, rely on CommitEvent.
public class CommitEvent implements Event {

    private List<ManifestEntry> tableFiles;
    private List<ManifestEntry> changelogFiles;
    private List<IndexManifestEntry> indexFiles;
    private long identifier;
    private Long watermark;
    private Map<Integer, Long> logOffsets;
    private Snapshot.CommitKind commitKind;
    private Throwable throwable;

    /** The file path of the table in the filesystem. */
    private String path;
}
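As an illustration of the watermark use case, a listener could keep the latest watermark of successful commits per table path. The sketch below uses a reduced stand-in for CommitEvent: the accessor methods, the constructor, and the conventions that a null throwable means success and a null watermark means "not available" are assumptions, since the proposal only lists the fields.

```java
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

interface Event extends Serializable {}

interface Listener extends Serializable {
    void initialize(Map<String, String> options);
    void notify(Event event);
    String name();
}

/** Reduced stand-in for the proposed CommitEvent; accessor names are hypothetical. */
class CommitEvent implements Event {
    private final String path;
    private final Long watermark;      // assumption: null when no watermark is available
    private final Throwable throwable; // assumption: null means the commit succeeded

    CommitEvent(String path, Long watermark, Throwable throwable) {
        this.path = path;
        this.watermark = watermark;
        this.throwable = throwable;
    }

    String path() { return path; }
    Long watermark() { return watermark; }
    Throwable throwable() { return throwable; }
}

/** Tracks the highest watermark of successful commits per table path. */
class WatermarkTrackingListener implements Listener {
    private final Map<String, Long> latestWatermark = new ConcurrentHashMap<>();
    private final AtomicInteger failedCommits = new AtomicInteger();

    @Override
    public void initialize(Map<String, String> options) {}

    @Override
    public void notify(Event event) {
        if (!(event instanceof CommitEvent)) {
            return; // other event types are ignored by this listener
        }
        CommitEvent commit = (CommitEvent) event;
        if (commit.throwable() != null) {
            failedCommits.incrementAndGet();
            return;
        }
        if (commit.watermark() != null) {
            // Keep the maximum so a late, smaller watermark never moves progress backwards.
            latestWatermark.merge(commit.path(), commit.watermark(), Long::max);
        }
    }

    @Override
    public String name() { return "watermark_tracker"; }

    Long latestWatermark(String path) { return latestWatermark.get(path); }

    int failedCommits() { return failedCommits.get(); }
}
```

A downstream scheduler could poll `latestWatermark(path)` or be triggered directly from `notify` once the watermark passes a threshold.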
TriggerCompactEvent & CompactEvent
Since a compaction task is likely to take a long time to execute, a TriggerCompactEvent is generated before the compaction task begins so that its execution status can be observed. A CompactEvent is generated whenever a compaction finishes, regardless of whether it succeeds or fails. Through these two events, users can accurately know the start time and file status of each task, which gives them a reference for judging whether the compaction task is healthy. In addition, most of the compaction-related information that users are concerned about can be obtained through these two events.
public class TriggerCompactEvent implements Event {

    private final boolean fullCompaction;
    private final BinaryRow partition;
    private final int bucket;
    private final List<DataFileMeta> inputs;

    /** The file path of the table in the filesystem. */
    private final String path;
}

public class CompactEvent implements Event {

    private final boolean fullCompaction;
    private final BinaryRow partition;
    private final int bucket;
    private final List<DataFileMeta> beforeFiles;
    private final List<DataFileMeta> afterFiles;
    private final Throwable throwable;

    /** The file path of the table in the filesystem. */
    private final String path;
}
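To show how the two events can be paired, the sketch below records a start time on TriggerCompactEvent and reports the elapsed time on the matching CompactEvent. It is only an illustration under several assumptions: the events are reduced stand-ins with package-visible fields, the partition is a plain String instead of Paimon's BinaryRow, and at most one compaction is assumed to run per (path, partition, bucket) at a time.

```java
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

interface Event extends Serializable {}

interface Listener extends Serializable {
    void initialize(Map<String, String> options);
    void notify(Event event);
    String name();
}

/** Reduced stand-in; partition is a String here instead of BinaryRow. */
class TriggerCompactEvent implements Event {
    final String path;
    final String partition;
    final int bucket;

    TriggerCompactEvent(String path, String partition, int bucket) {
        this.path = path;
        this.partition = partition;
        this.bucket = bucket;
    }
}

/** Reduced stand-in; a null throwable is assumed to mean success. */
class CompactEvent implements Event {
    final String path;
    final String partition;
    final int bucket;
    final Throwable throwable;

    CompactEvent(String path, String partition, int bucket, Throwable throwable) {
        this.path = path;
        this.partition = partition;
        this.bucket = bucket;
        this.throwable = throwable;
    }
}

/** Pairs trigger and finish events to track in-flight compactions and their duration. */
class CompactionTracker implements Listener {
    private final Map<String, Long> startedAtMillis = new ConcurrentHashMap<>();
    private final AtomicInteger failed = new AtomicInteger();

    private static String key(String path, String partition, int bucket) {
        return path + "|" + partition + "|" + bucket;
    }

    @Override
    public void initialize(Map<String, String> options) {}

    @Override
    public void notify(Event event) {
        if (event instanceof TriggerCompactEvent) {
            TriggerCompactEvent t = (TriggerCompactEvent) event;
            startedAtMillis.put(key(t.path, t.partition, t.bucket), System.currentTimeMillis());
        } else if (event instanceof CompactEvent) {
            CompactEvent c = (CompactEvent) event;
            Long start = startedAtMillis.remove(key(c.path, c.partition, c.bucket));
            if (c.throwable != null) {
                failed.incrementAndGet();
            }
            if (start != null) {
                long elapsed = System.currentTimeMillis() - start;
                System.out.println("Compaction of " + c.path + " took " + elapsed + " ms");
            }
        }
    }

    @Override
    public String name() { return "compaction_tracker"; }

    int inFlight() { return startedAtMillis.size(); }

    int failed() { return failed.get(); }
}
```

An entry that stays in the in-flight map for a long time would indicate a compaction that was triggered but has not yet reported completion.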
CreateTableEvent & DropTableEvent & RenameTableEvent & AlterTableEvent
These four events are generated when a table is successfully created, dropped, renamed, or altered in the catalog. Features like an independent compaction service need to observe these table state changes promptly. For example, when an option of a table is modified, the new compaction parameters need to be checked in time to determine whether to immediately perform compaction on the table. Create and drop events can promptly notify the compaction service of table creation and deletion, which affects its scheduling strategy. The rename event lets the compaction service associate the historical execution records of the old table with the new table name. Without a listener mechanism, it would be necessary to continuously scan tables through polling, which would result in high resource consumption and poor timeliness.
public class CreateTableEvent implements Event {

    /** The file path of the table in the filesystem. */
    private String path;
    private Schema schema;
}

public class DropTableEvent implements Event {

    /** The file path of the table in the filesystem. */
    private String path;
}

public class RenameTableEvent implements Event {

    /** The old file path of the table in the filesystem. */
    private String oldPath;

    /** The new path of the table in the filesystem. */
    private String newPath;
}

public class AlterTableEvent implements Event {

    /** The file path of the table in the filesystem. */
    private String path;
    private List<SchemaChange> schemaChanges;
}
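The compaction-service scenario described above could be served by a listener that maintains a registry of known tables instead of polling. The following is a rough sketch under stated assumptions: the event classes are reduced stand-ins carrying only paths, and `TableRegistryListener` with its "needs recheck" set is a hypothetical design, not part of the proposal.

```java
import java.io.Serializable;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

interface Event extends Serializable {}

interface Listener extends Serializable {
    void initialize(Map<String, String> options);
    void notify(Event event);
    String name();
}

// Reduced stand-ins for the proposed events; schema details are left out.
class CreateTableEvent implements Event {
    final String path;
    CreateTableEvent(String path) { this.path = path; }
}

class DropTableEvent implements Event {
    final String path;
    DropTableEvent(String path) { this.path = path; }
}

class RenameTableEvent implements Event {
    final String oldPath;
    final String newPath;
    RenameTableEvent(String oldPath, String newPath) {
        this.oldPath = oldPath;
        this.newPath = newPath;
    }
}

class AlterTableEvent implements Event {
    final String path;
    AlterTableEvent(String path) { this.path = path; }
}

/** Keeps a registry of table paths and flags altered tables for re-evaluation. */
class TableRegistryListener implements Listener {
    private final Set<String> tables = new TreeSet<>();
    private final Set<String> needsRecheck = new TreeSet<>();

    @Override
    public void initialize(Map<String, String> options) {}

    @Override
    public synchronized void notify(Event event) {
        if (event instanceof CreateTableEvent) {
            tables.add(((CreateTableEvent) event).path);
        } else if (event instanceof DropTableEvent) {
            String path = ((DropTableEvent) event).path;
            tables.remove(path);
            needsRecheck.remove(path);
        } else if (event instanceof RenameTableEvent) {
            RenameTableEvent rename = (RenameTableEvent) event;
            tables.remove(rename.oldPath);
            tables.add(rename.newPath);
            // Carry a pending recheck over to the new path.
            if (needsRecheck.remove(rename.oldPath)) {
                needsRecheck.add(rename.newPath);
            }
        } else if (event instanceof AlterTableEvent) {
            needsRecheck.add(((AlterTableEvent) event).path);
        }
    }

    @Override
    public String name() { return "table_registry"; }

    // Copies are returned so callers cannot modify the internal state.
    synchronized Set<String> tables() { return new TreeSet<>(tables); }

    synchronized Set<String> needsRecheck() { return new TreeSet<>(needsRecheck); }
}
```

A scheduler built on such a registry would only need to re-read table options for paths in the recheck set, rather than scanning every table on each cycle.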
Compatibility, Deprecation, and Migration Plan
None
Test Plan
IT tests: verify that the listener configuration takes effect and that the various events are notified correctly.