Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The table structure and data of the target end are processed before the synchronization task. Currently, this function is not available.

Process

draw.io Board Diagram
bordertrue
diagramNameSaveMode Function Process
simpleViewerfalse
width
linksauto
tbstyletop
lboxtrue
diagramWidth606
revision2

Public Interfaces

org.apache.seatunnel.api.sink.SupportSaveMode
SupportSaveMode is an interface that Sink needs to implement if the SaveMode function is to be implemented. The interface defines an interface to obtain the saveMode actuator

...


The processing of SaveMode should be done before the job starts, so in SeaTunnel Zeta engine you can get saveModeHandler after building Sink, and then call handleSaveMode() method

The code in the the SinkExecuteProcessor and MultipleTableJobConfigParser class


Code Block
languagejava
private SinkAction<?, ?, ?, ?> createSinkAction( @Override
    public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams)
 CatalogTable catalogTable,
        Map<TablePath, CatalogTable> sinkTableMap,
throws TaskExecuteException {
      Set<Action> inputActions,
 DataStream<Row> input = upstreamDataStreams.get(0);
    ReadonlyConfig readonlyConfig,
   for (int i = 0; ClassLoaderi classLoader,
   < plugins.size(); i++) {
     Set<URL> factoryUrls,
      Config sinkConfig String factoryId,= pluginConfigs.get(i);
        int parallelism,    SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
        int configIndex) {
    Optional<CatalogTable> insteadTable;
    if (sinkTableMapplugins.sizeget(i);
   == 1) {
       DataStream<Row> insteadTablestream = sinkTableMap.valuesfromSourceTable(sinkConfig).stream().findFirst(orElse(input);
    } else {
      seaTunnelSink.setTypeInfo(
  // TODO: another table full name map
        insteadTable =
   (SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));
            Optional.ofNullable(sinkTableMap.get(catalogTable.getTableId().toTablePathif (SupportSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())); {
       }
    if (insteadTable.isPresent()) {
   SupportSaveMode saveModeSink    catalogTable = insteadTable.get(SupportSaveMode) seaTunnelSink;
    }
    SeaTunnelSink<?, ?, ?, ?> sink =
   try (SaveModeHandler saveModeHandler       FactoryUtil.createAndPrepareSink(= saveModeSink.getSaveModeHandler()) {
                    catalogTable, readonlyConfig, classLoader, factoryIdsaveModeHandler.handleSaveMode();
     sink.setJobContext(jobConfig.getJobContext());
     SinkConfig actionConfig =
    } catch (Exception      new SinkConfig(catalogTable.getTableId().toTablePath().toString());e) {
    long id = idGenerator.getNextId();
    String actionName =
         throw new  JobConfigParser.createSinkActionName(SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
                }
    configIndex, factoryId, actionConfig.getMultipleRowTableId());
    SinkAction<?, ?, ?,}
 ?> sinkAction =
         DataStreamSink<Row> dataStreamSink =
 new SinkAction<>(
                    id,stream.sinkTo(new FlinkSink<>(seaTunnelSink))
                    actionName,
        .name(seaTunnelSink.getPluginName());
            new ArrayList<>(inputActions),if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
                int parallelism   sink,= sinkConfig.getInt(CommonOptions.PARALLELISM.key());
                    factoryUrls,dataStreamSink.setParallelism(parallelism);
            }
        actionConfig);
    if (!isStartWithSavePoint) {}


Code Block
languagejava
private SinkAction<?, ?, ?, ?> createSinkAction(
        CatalogTable catalogTable,
        Map<TablePath, CatalogTable> sinkTableMap,
        Set<Action> inputActions,
        ReadonlyConfig readonlyConfig,
        ClassLoader classLoader,
        Set<URL> factoryUrls,
        String factoryId,
        int parallelism,
        int configIndex) {
    Optional<CatalogTable> insteadTable;
    if (sinkTableMap.size() == 1) {
        insteadTable = sinkTableMap.values().stream().findFirst();
    } else {
        // TODO: another table full name map
        insteadTable =
                Optional.ofNullable(sinkTableMap.get(catalogTable.getTableId().toTablePath()));
    }
    if (insteadTable.isPresent()) {
        catalogTable = insteadTable.get();
    }
    SeaTunnelSink<?, ?, ?, ?> sink =
            FactoryUtil.createAndPrepareSink(
                    catalogTable, readonlyConfig, classLoader, factoryId);
    sink.setJobContext(jobConfig.getJobContext());
    SinkConfig actionConfig =
            new SinkConfig(catalogTable.getTableId().toTablePath().toString());
    long id = idGenerator.getNextId();
    String actionName =
            JobConfigParser.createSinkActionName(
                    configIndex, factoryId, actionConfig.getMultipleRowTableId());
    SinkAction<?, ?, ?, ?> sinkAction =
            new SinkAction<>(
                    id,
                    actionName,
                    new ArrayList<>(inputActions),
                    sink,
                    factoryUrls,
                    actionConfig);
    if (!isStartWithSavePoint) {
        handleSaveMode(sink);
    }
    sinkAction.setParallelism(parallelism);
    return sinkAction;
}

public static void handleSaveMode(SeaTunnelSink<?, ?, ?, ?> sink) {
    if (SupportSaveMode.class.isAssignableFrom(sink.getClass())) {
        SupportSaveMode saveModeSink = (SupportSaveMode) sink;
        try (SaveModeHandler saveModeHandler = saveModeSink.getSaveModeHandler()) {
            saveModeHandler.handleSaveMode();
        } catch (Exception e) {
            throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
        }
    }
}

Taking jdbcsink as an example


Code Block
languagejava
    @Override
    public DefaultSaveModeHandler getSaveModeHandler() {
        if (catalogTable == null) {
            return null;
        }
        Map<String, String> catalogOptions = config.get(CatalogOptions.CATALOG_OPTIONS);
        if (catalogOptions == null) {
            return null;
        }
        String factoryId = catalogOptions.get(CommonOptions.FACTORY_ID.key());
        if (StringUtils.isBlank(jdbcSinkConfig.getDatabase())) {
            return null;
        }
        CatalogFactory catalogFactory =
                discoverFactory(
                        Thread.currentThread().getContextClassLoader(),
                        CatalogFactory.class,
                        factoryId);
        if (catalogFactory == null) {
            return null;
        }
        FieldIdeEnum fieldIdeEnum = config.get(JdbcOptions.FIELD_IDE);
        String fieldIde =
                fieldIdeEnum == null ? FieldIdeEnum.ORIGINAL.getValue() : fieldIdeEnum.getValue();
        TablePath tablePath =
                TablePath.of(
                        catalogTable.getTableId().getDatabaseName(),
                        catalogTable.getTableId().getSchemaName(),
                        CatalogUtils.quoteTableIdentifier(
                                catalogTable.getTableId().getTableName(), fieldIde));
        Catalog catalog =
                catalogFactory.createCatalog(
                        catalogFactory.factoryIdentifier(),
                        ReadonlyConfig.fromMap(new HashMap<>(catalogOptions)));
        handleSaveModecatalog.open(sink);
        return new }DefaultSaveModeHandler(
    sinkAction.setParallelism(parallelism);
    return sinkAction;
}

public static void handleSaveMode(SeaTunnelSink<?, ?, ?, ?> sink) {
 schemaSaveMode,
     if (SupportSaveMode.class.isAssignableFrom(sink.getClass())) {
        SupportSaveMode saveModeSinkdataSaveMode,
 = (SupportSaveMode) sink;
        try (SaveModeHandler saveModeHandler = saveModeSink.getSaveModeHandler()) { catalog,
            saveModeHandler.handleSaveMode();
    tablePath,
    } catch (Exception e) {
        catalogTable,
    throw  new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
        }config.get(JdbcOptions.CUSTOM_SQL));
    }
}

Automatic table building library name and table name building scheme

...