...
org.apache.seatunnel.api.sink.SaveModeHandler
SaveModeHandler is an executor that executes saveMode functions. This interface is used to implement the specific logic of saveMode functions.
Code Block |
---|
|
package org.apache.seatunnel.api.sink; |
...
public interface SaveModeHandler extends AutoCloseable { |
...
void handleSchemaSaveMode(); |
...
void handleDataSaveMode(); |
...
default void handleSaveMode() { |
...
...
...
...
The SaveModeHandler interface has a default implementation: org.apache.seatunnel.api.sink.DefaultSaveModeHandler
Code Block |
---|
|
package org.apache.seatunnel.api.sink; |
...
...
public class DefaultSaveModeHandler implements SaveModeHandler { |
...
public SchemaSaveMode schemaSaveMode; |
...
public DataSaveMode dataSaveMode; |
...
...
public TablePath tablePath; |
...
public CatalogTable catalogTable; |
...
...
public DefaultSaveModeHandler( |
...
SchemaSaveMode schemaSaveMode, |
...
DataSaveMode dataSaveMode, |
...
...
CatalogTable catalogTable, |
...
...
...
...
...
...
catalogTable.getTableId().toTablePath(), |
...
...
...
...
...
public void handleSchemaSaveMode() { |
...
switch (schemaSaveMode) { |
...
...
...
...
case CREATE_SCHEMA_WHEN_NOT_EXIST: |
...
createSchemaWhenNotExist(); |
...
...
case ERROR_WHEN_SCHEMA_NOT_EXIST: |
...
errorWhenSchemaNotExist(); |
...
...
...
throw new UnsupportedOperationException("Unsupported save mode: " + schemaSaveMode); |
...
...
...
...
public void handleDataSaveMode() { |
...
...
case KEEP_SCHEMA_DROP_DATA: |
...
...
...
case KEEP_SCHEMA_AND_DATA: |
...
...
...
...
...
...
case ERROR_WHEN_DATA_EXISTS: |
...
...
...
...
throw new UnsupportedOperationException("Unsupported save mode: " + dataSaveMode); |
...
...
...
protected void recreateSchema() { |
...
...
...
...
...
...
protected void createSchemaWhenNotExist() { |
...
...
...
...
...
protected void errorWhenSchemaNotExist() { |
...
...
throw new SeaTunnelRuntimeException(SINK_TABLE_NOT_EXIST, "The sink table not exist"); |
...
...
...
protected void keepSchemaDropData() { |
...
...
...
...
...
protected void keepSchemaAndData() {} |
...
protected void customProcessing() { |
...
...
...
protected void errorWhenDataExists() { |
...
...
throw new SeaTunnelRuntimeException( |
...
SOURCE_ALREADY_HAS_DATA, "The target data source already has data"); |
...
...
...
protected boolean tableExists() { |
...
return catalog.tableExists(tablePath); |
...
...
protected void dropTable() { |
...
catalog.dropTable(tablePath, true); |
...
...
protected void createTable() { |
...
catalog.createTable(tablePath, catalogTable, true); |
...
...
protected void truncateTable() { |
...
catalog.truncateTable(tablePath, true); |
...
...
protected boolean dataExists() { |
...
return catalog.isExistsData(tablePath); |
...
...
protected void executeCustomSql() { |
...
catalog.executeSql(customSql); |
...
...
...
public void close() throws Exception { |
...
...
...
Code execution time
SaveMode processing must be completed before the job starts. In the SeaTunnel Zeta engine, you can therefore obtain the SaveModeHandler from the sink once the sink has been built, and then call its handleSaveMode() method.
The relevant code (createSinkAction and handleSaveMode) is in the MultipleTableJobConfigParser class:
Code Block |
---|
|
private SinkAction<?, ?, ?, ?> createSinkAction( |
...
CatalogTable catalogTable, |
...
Map<TablePath, CatalogTable> sinkTableMap, |
...
Set<Action> inputActions, |
...
ReadonlyConfig readonlyConfig, |
...
...
...
...
...
...
Optional<CatalogTable> insteadTable; |
...
if (sinkTableMap.size() == 1) { |
...
insteadTable = sinkTableMap.values().stream().findFirst(); |
...
...
// TODO: another table full name map |
...
...
Optional.ofNullable(sinkTableMap.get(catalogTable.getTableId().toTablePath())); |
...
...
if (insteadTable.isPresent()) { |
...
catalogTable = insteadTable.get(); |
...
...
SeaTunnelSink<?, ?, ?, ?> sink = |
...
FactoryUtil.createAndPrepareSink( |
...
catalogTable, readonlyConfig, classLoader, factoryId); |
...
sink.setJobContext(jobConfig.getJobContext()); |
...
SinkConfig actionConfig = |
...
new SinkConfig(catalogTable.getTableId().toTablePath().toString()); |
...
long id = idGenerator.getNextId(); |
...
...
JobConfigParser.createSinkActionName( |
...
configIndex, factoryId, actionConfig.getMultipleRowTableId()); |
...
SinkAction<?, ?, ?, ?> sinkAction = |
...
...
...
...
new ArrayList<>(inputActions), |
...
...
...
...
if (!isStartWithSavePoint) { |
...
...
...
sinkAction.setParallelism(parallelism); |
...
...
...
public static void handleSaveMode(SeaTunnelSink<?, ?, ?, ?> sink) { |
...
if (SupportSaveMode.class.isAssignableFrom(sink.getClass())) { |
...
SupportSaveMode saveModeSink = (SupportSaveMode) sink; |
...
try (SaveModeHandler saveModeHandler = saveModeSink.getSaveModeHandler()) { |
...
saveModeHandler.handleSaveMode(); |
...
...
throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e); |
...
...
...
Scheme for constructing the database name and table name during automatic table creation
...