Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The table structure and data of the target end are processed before the synchronization task. Currently, this function is not available.

Process

draw.io Board Diagram
bordertrue
diagramNameSaveMode Function Process
simpleViewerfalse
width
linksauto
tbstyletop
lboxtrue
diagramWidth606
revision2

Public Interfaces

org.apache.seatunnel.api.sink.SupportSaveMode
SupportSaveMode is an interface that Sink needs to implement if the SaveMode function is to be implemented. The interface defines an interface to obtain the saveMode actuator

...

Code Block
languagejava
private SinkAction<?, ?, ?, ?> createSinkAction(
        CatalogTable catalogTable,
        Map<TablePath, CatalogTable> sinkTableMap,
        Set<Action> inputActions,
        ReadonlyConfig readonlyConfig,
        ClassLoader classLoader,
        Set<URL> factoryUrls,
        String factoryId,
        int parallelism,
        int configIndex) {
    Optional<CatalogTable> insteadTable;
    if (sinkTableMap.size() == 1) {
        insteadTable = sinkTableMap.values().stream().findFirst();
    } else {
        // TODO: another table full name map
        insteadTable =
                Optional.ofNullable(sinkTableMap.get(catalogTable.getTableId().toTablePath()));
    }
    if (insteadTable.isPresent()) {
        catalogTable = insteadTable.get();
    }
    SeaTunnelSink<?, ?, ?, ?> sink =
            FactoryUtil.createAndPrepareSink(
                    catalogTable, readonlyConfig, classLoader, factoryId);
    sink.setJobContext(jobConfig.getJobContext());
    SinkConfig actionConfig =
            new SinkConfig(catalogTable.getTableId().toTablePath().toString());
    long id = idGenerator.getNextId();
    String actionName =
            JobConfigParser.createSinkActionName(
                    configIndex, factoryId, actionConfig.getMultipleRowTableId());
    SinkAction<?, ?, ?, ?> sinkAction =
            new SinkAction<>(
                    id,
                    actionName,
                    new ArrayList<>(inputActions),
                    sink,
                    factoryUrls,
                    actionConfig);
    if (!isStartWithSavePoint) {
        handleSaveMode(sink);
    }
    sinkAction.setParallelism(parallelism);
    return sinkAction;
}

public static void handleSaveMode(SeaTunnelSink<?, ?, ?, ?> sink) {
    if (SupportSaveMode.class.isAssignableFrom(sink.getClass())) {
        SupportSaveMode saveModeSink = (SupportSaveMode) sink;
        try (SaveModeHandler saveModeHandler = saveModeSink.getSaveModeHandler()) {
            saveModeHandler.handleSaveMode();
        } catch (Exception e) {
            throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
        }
    }
}

Taking jdbcsink as an example


Code Block
languagejava
    @Override
    public DefaultSaveModeHandler getSaveModeHandler() {
        if (catalogTable == null) {
            return null;
        }
        Map<String, String> catalogOptions = config.get(CatalogOptions.CATALOG_OPTIONS);
        if (catalogOptions == null) {
            return null;
        }
        String factoryId = catalogOptions.get(CommonOptions.FACTORY_ID.key());
        if (StringUtils.isBlank(jdbcSinkConfig.getDatabase())) {
            return null;
        }
        CatalogFactory catalogFactory =
                discoverFactory(
                        Thread.currentThread().getContextClassLoader(),
                        CatalogFactory.class,
                        factoryId);
        if (catalogFactory == null) {
            return null;
        }
        FieldIdeEnum fieldIdeEnum = config.get(JdbcOptions.FIELD_IDE);
        String fieldIde =
                fieldIdeEnum == null ? FieldIdeEnum.ORIGINAL.getValue() : fieldIdeEnum.getValue();
        TablePath tablePath =
                TablePath.of(
                        catalogTable.getTableId().getDatabaseName(),
                        catalogTable.getTableId().getSchemaName(),
                        CatalogUtils.quoteTableIdentifier(
                                catalogTable.getTableId().getTableName(), fieldIde));
        Catalog catalog =
                catalogFactory.createCatalog(
                        catalogFactory.factoryIdentifier(),
                        ReadonlyConfig.fromMap(new HashMap<>(catalogOptions)));
        catalog.open();
        return new DefaultSaveModeHandler(
                schemaSaveMode,
                dataSaveMode,
                catalog,
                tablePath,
                catalogTable,
                config.get(JdbcOptions.CUSTOM_SQL));
    }

Automatic table building library name and table name building scheme

...