Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

org.apache.seatunnel.api.sink.SaveModeHandler
SaveModeHandler is an executor that executes saveMode functions. This interface is used to implement the specific logic of saveMode functions.

Code Block
languagejava
package org.apache.seatunnel.api.sink;

...



public interface SaveModeHandler extends AutoCloseable {

...



    void handleSchemaSaveMode();

...



    void handleDataSaveMode();

...



    default void handleSaveMode() {

...


        handleSchemaSaveMode();

...


        handleDataSaveMode();

...


    }

...


}

SaveModeHandler interface has a default implementation org. Apacheapache. Seatunnelseatunnel. APIapi. Sinksink.DefaultSaveModeHandler

Code Block
languagejava
package org.apache.seatunnel.api.sink;

...



@AllArgsConstructor

...


public class DefaultSaveModeHandler implements SaveModeHandler {

...



    public SchemaSaveMode schemaSaveMode;

...


    public DataSaveMode dataSaveMode;

...


    public Catalog catalog;

...


    public TablePath tablePath;

...


    public CatalogTable catalogTable;

...


    public String customSql;

...



    public DefaultSaveModeHandler(

...


            SchemaSaveMode schemaSaveMode,

...


            DataSaveMode dataSaveMode,

...


            Catalog catalog,

...


            CatalogTable catalogTable,

...


            String customSql) {

...


        this(

...


                schemaSaveMode,

...


                dataSaveMode,

...


                catalog,

...


                catalogTable.getTableId().toTablePath(),

...


                catalogTable,

...


                customSql);

...


    }

...



    @Override

...


    public void handleSchemaSaveMode() {

...


        switch (schemaSaveMode) {

...


            case RECREATE_SCHEMA:

...


                recreateSchema();

...


                break;

...


            case CREATE_SCHEMA_WHEN_NOT_EXIST:

...


                createSchemaWhenNotExist();

...


                break;

...


            case ERROR_WHEN_SCHEMA_NOT_EXIST:

...


                errorWhenSchemaNotExist();

...


                break;

...


            default:

...


                throw new UnsupportedOperationException("Unsupported save mode: " + schemaSaveMode);

...


        }

...


    }

...



    @Override

...


    public void handleDataSaveMode() {

...


        switch (dataSaveMode) {

...


            case KEEP_SCHEMA_DROP_DATA:

...


                keepSchemaDropData();

...


                break;

...


            case KEEP_SCHEMA_AND_DATA:

...


                keepSchemaAndData();

...


                break;

...


            case CUSTOM_PROCESSING:

...


                customProcessing();

...


                break;

...


            case ERROR_WHEN_DATA_EXISTS:

...


                errorWhenDataExists();

...


                break;

...


            default:

...


                throw new UnsupportedOperationException("Unsupported save mode: " + dataSaveMode);

...


        }

...


    }

...



    protected void recreateSchema() {

...


        if (tableExists()) {

...


            dropTable();

...


        }

...


        createTable();

...


    }

...



    protected void createSchemaWhenNotExist() {

...


        if (!tableExists()) {

...


            createTable();

...


        }

...


    }

...



    protected void errorWhenSchemaNotExist() {

...


        if (!tableExists()) {

...


            throw new SeaTunnelRuntimeException(SINK_TABLE_NOT_EXIST, "The sink table not exist");

...


        }

...


    }

...



    protected void keepSchemaDropData() {

...


        if (tableExists()) {

...


            truncateTable();

...


        }

...


    }

...



    protected void keepSchemaAndData() {}

...



    protected void customProcessing() {

...


        executeCustomSql();

...


    }

...



    protected void errorWhenDataExists() {

...


        if (dataExists()) {

...


            throw new SeaTunnelRuntimeException(

...


                    SOURCE_ALREADY_HAS_DATA, "The target data source already has data");

...


        }

...


    }

...



    protected boolean tableExists() {

...


        return catalog.tableExists(tablePath);

...


    }

...



    protected void dropTable() {

...


        catalog.dropTable(tablePath, true);

...


    }

...



    protected void createTable() {

...


        catalog.createTable(tablePath, catalogTable, true);

...


    }

...



    protected void truncateTable() {

...


        catalog.truncateTable(tablePath, true);

...


    }

...



    protected boolean dataExists() {

...


        return catalog.isExistsData(tablePath);

...


    }

...



    protected void executeCustomSql() {

...


        catalog.executeSql(customSql);

...


    }

...



    @Override

...


    public void close() throws Exception {

...


        catalog.close();

...


    }

...


}

Code execution time


The processing of SaveMode should be done before the job starts, so in SeaTunnel Zeta engine you can get saveModeHandler after building Sink, and then call handleSaveMode() method

The code in the MultipleTableJobConfigParser class


Code Block
languagejava
private SinkAction<?, ?, ?, ?> createSinkAction(

...


        CatalogTable catalogTable,

...


        Map<TablePath, CatalogTable> sinkTableMap,

...


        Set<Action> inputActions,

...


        ReadonlyConfig readonlyConfig,

...


        ClassLoader classLoader,

...


        Set<URL> factoryUrls,

...


        String factoryId,

...


        int parallelism,

...


        int configIndex) {

...


    Optional<CatalogTable> insteadTable;

...


    if (sinkTableMap.size() == 1) {

...


        insteadTable = sinkTableMap.values().stream().findFirst();

...


    } else {

...


        // TODO: another table full name map

...


        insteadTable =

...


                Optional.ofNullable(sinkTableMap.get(catalogTable.getTableId().toTablePath()));

...


    }

...


    if (insteadTable.isPresent()) {

...


        catalogTable = insteadTable.get();

...


    }

...


    SeaTunnelSink<?, ?, ?, ?> sink =

...


            FactoryUtil.createAndPrepareSink(

...


                    catalogTable, readonlyConfig, classLoader, factoryId);

...


    sink.setJobContext(jobConfig.getJobContext());

...


    SinkConfig actionConfig =

...


            new SinkConfig(catalogTable.getTableId().toTablePath().toString());

...


    long id = idGenerator.getNextId();

...


    String actionName =

...


            JobConfigParser.createSinkActionName(

...


                    configIndex, factoryId, actionConfig.getMultipleRowTableId());

...


    SinkAction<?, ?, ?, ?> sinkAction =

...


            new SinkAction<>(

...


                    id,

...


                    actionName,

...


                    new ArrayList<>(inputActions),

...


                    sink,

...


                    factoryUrls,

...


                    actionConfig);

...


    if (!isStartWithSavePoint) {

...


        handleSaveMode(sink);

...


    }

...


    sinkAction.setParallelism(parallelism);

...


    return sinkAction;

...


}

...



public static void handleSaveMode(SeaTunnelSink<?, ?, ?, ?> sink) {

...


    if (SupportSaveMode.class.isAssignableFrom(sink.getClass())) {

...


        SupportSaveMode saveModeSink = (SupportSaveMode) sink;

...


        try (SaveModeHandler saveModeHandler = saveModeSink.getSaveModeHandler()) {

...


            saveModeHandler.handleSaveMode();

...


        } catch (Exception e) {

...


            throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);

...


        }

...


    }

...


}

Automatic table building library name and table name building scheme

...