Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The table structure and data of the target end are processed before the synchronization task. Currently, this function is not available.

Process

draw.io Board Diagram
bordertrue
diagramNameSaveMode Function Process
simpleViewerfalse
width
linksauto
tbstyletop
lboxtrue
diagramWidth606
revision2

Public Interfaces

org.apache.seatunnel.api.sink.SupportSaveMode
SupportSaveMode is the interface that a Sink must implement in order to support the SaveMode function. It defines a method, getSaveModeHandler(), for obtaining the SaveMode handler.

...


SaveMode must be processed before the job starts, so in the SeaTunnel Zeta engine you can obtain the saveModeHandler after building the Sink and then call its handleSaveMode() method.

The relevant code is in the SinkExecuteProcessor and MultipleTableJobConfigParser classes:


Code Block
languagejava
    @Override
    public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams)
            throws TaskExecuteException {
        DataStream<Row> input = upstreamDataStreams.get(0);
        for (int i = 0; i < plugins.size(); i++) {
            Config sinkConfig = pluginConfigs.get(i);
            SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
                    plugins.get(i);
            DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
            seaTunnelSink.setTypeInfo(
                    (SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));
            if (SupportSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
                SupportSaveMode saveModeSink = (SupportSaveMode) seaTunnelSink;
                try (SaveModeHandler saveModeHandler = saveModeSink.getSaveModeHandler()) {
                    saveModeHandler.handleSaveMode();
                } catch (Exception e) {
                    throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
                }
            }
            DataStreamSink<Row> dataStreamSink =
                    stream.sinkTo(new FlinkSink<>(seaTunnelSink))
                            .name(seaTunnelSink.getPluginName());
            if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
                int parallelism = sinkConfig.getInt(CommonOptions.PARALLELISM.key());
                dataStreamSink.setParallelism(parallelism);
            }
        }


Code Block
languagejava
/**
 * Creates the sink, wraps it in a {@link SinkAction}, and returns that action.
 *
 * <p>If {@code sinkTableMap} supplies a replacement table, that table is used in place of
 * {@code catalogTable} for building the sink and for naming the action. Save-mode handling
 * is invoked on the new sink unless {@code isStartWithSavePoint} is set.
 *
 * @param catalogTable table to fall back on when {@code sinkTableMap} has no replacement
 * @param sinkTableMap candidate replacement tables keyed by table path
 * @param inputActions upstream actions; copied into a new list for the sink action
 * @param readonlyConfig configuration passed to the sink factory
 * @param classLoader class loader passed to the sink factory
 * @param factoryUrls URLs recorded on the sink action
 * @param factoryId identifier of the sink factory, also used in the action name
 * @param parallelism parallelism assigned to the returned action
 * @param configIndex index used when composing the action name
 * @return the configured sink action
 */
private SinkAction<?, ?, ?, ?> createSinkAction(
        CatalogTable catalogTable,
        Map<TablePath, CatalogTable> sinkTableMap,
        Set<Action> inputActions,
        ReadonlyConfig readonlyConfig,
        ClassLoader classLoader,
        Set<URL> factoryUrls,
        String factoryId,
        int parallelism,
        int configIndex) {
    // A single-entry map always wins; otherwise look the table up by its own path.
    // TODO: another table full name map
    final Optional<CatalogTable> replacement =
            sinkTableMap.size() == 1
                    ? sinkTableMap.values().stream().findFirst()
                    : Optional.ofNullable(
                            sinkTableMap.get(catalogTable.getTableId().toTablePath()));
    final CatalogTable effectiveTable = replacement.orElse(catalogTable);

    final SeaTunnelSink<?, ?, ?, ?> sink =
            FactoryUtil.createAndPrepareSink(
                    effectiveTable, readonlyConfig, classLoader, factoryId);
    sink.setJobContext(jobConfig.getJobContext());

    final String tablePathName = effectiveTable.getTableId().toTablePath().toString();
    final SinkConfig actionConfig = new SinkConfig(tablePathName);
    final long actionId = idGenerator.getNextId();
    final String actionName =
            JobConfigParser.createSinkActionName(
                    configIndex, factoryId, actionConfig.getMultipleRowTableId());

    final SinkAction<?, ?, ?, ?> sinkAction =
            new SinkAction<>(
                    actionId,
                    actionName,
                    new ArrayList<>(inputActions),
                    sink,
                    factoryUrls,
                    actionConfig);

    // Save-mode handling is skipped when the job starts with a savepoint.
    if (!isStartWithSavePoint) {
        handleSaveMode(sink);
    }
    sinkAction.setParallelism(parallelism);
    return sinkAction;
}

/**
 * Runs save-mode handling for the given sink, if the sink supports it.
 *
 * <p>Sinks that do not implement {@link SupportSaveMode} are ignored. An implementation may
 * return {@code null} from {@code getSaveModeHandler()} when it has nothing to prepare; that
 * case is treated as a no-op instead of failing the job with a wrapped
 * {@link NullPointerException}.
 *
 * @param sink the sink whose save mode should be handled
 * @throws SeaTunnelRuntimeException with {@code HANDLE_SAVE_MODE_FAILED} if obtaining,
 *     running, or closing the handler throws
 */
public static void handleSaveMode(SeaTunnelSink<?, ?, ?, ?> sink) {
    if (SupportSaveMode.class.isAssignableFrom(sink.getClass())) {
        SupportSaveMode saveModeSink = (SupportSaveMode) sink;
        // try-with-resources tolerates a null resource (close() is simply not called),
        // so only the explicit call below needs guarding.
        try (SaveModeHandler saveModeHandler = saveModeSink.getSaveModeHandler()) {
            if (saveModeHandler != null) {
                saveModeHandler.handleSaveMode();
            }
        } catch (Exception e) {
            throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
        }
    }
}

Taking jdbcsink as an example


Code Block
languagejava
    @Override
    public DefaultSaveModeHandler getSaveModeHandler() {
        if (catalogTable == null) {
            return null;
        }
        Map<String, String> catalogOptions = config.get(CatalogOptions.CATALOG_OPTIONS);
        if (catalogOptions == null) {
            return null;
        }
        String factoryId = catalogOptions.get(CommonOptions.FACTORY_ID.key());
        if (StringUtils.isBlank(jdbcSinkConfig.getDatabase())) {
            return null;
        }
        CatalogFactory catalogFactory =
                discoverFactory(
                        Thread.currentThread().getContextClassLoader(),
                        CatalogFactory.class,
                        factoryId);
        if (catalogFactory == null) {
            return null;
        }
        FieldIdeEnum fieldIdeEnum = config.get(JdbcOptions.FIELD_IDE);
        String fieldIde =
                fieldIdeEnum == null ? FieldIdeEnum.ORIGINAL.getValue() : fieldIdeEnum.getValue();
        TablePath tablePath =
                TablePath.of(
                        catalogTable.getTableId().getDatabaseName(),
                        catalogTable.getTableId().getSchemaName(),
                        CatalogUtils.quoteTableIdentifier(
                                catalogTable.getTableId().getTableName(), fieldIde));
        Catalog catalog =
                catalogFactory.createCatalog(
                        catalogFactory.factoryIdentifier(),
                        ReadonlyConfig.fromMap(new HashMap<>(catalogOptions)));
        catalog.open();
        return new DefaultSaveModeHandler(
                schemaSaveMode,
                dataSaveMode,
                catalog,
                tablePath,
                catalogTable,
                config.get(JdbcOptions.CUSTOM_SQL));
    }

Naming scheme for automatically created databases and tables

...


The table attribute in sink is explained here: The value of the table attribute in sink will be used as the name of the final automatic table creation. The rules are:

  • If the table attribute contains ${table_name} and you want to replace it, for example, the table name in source is table001, then the final table name created based on the above example is test_table001_001002
  • If ${table_name} is not included, no substitution is performed and the value of the table attribute is used unchanged as the final table name.

The database attribute databaseName attribute in sink also complies with this rule.