Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
language: java
/**
 * Builds the {@link SinkAction} that writes one table to the sink identified by {@code factoryId}.
 *
 * <p>The table actually handed to the sink is resolved from {@code sinkTableMap}: when the map
 * holds exactly one entry, that entry is used regardless of its key; otherwise the entry keyed
 * by the table path of {@code catalogTable} is used. If no entry is found, {@code catalogTable}
 * itself is used.
 *
 * <p>Unless the job is starting from a savepoint ({@code isStartWithSavePoint}), the sink's save
 * mode is handled via {@link #handleSaveMode(SeaTunnelSink)} before the action is returned.
 *
 * @param catalogTable the upstream table; used as the fallback when {@code sinkTableMap} has no
 *     matching entry
 * @param sinkTableMap candidate replacement tables keyed by table path
 * @param inputActions upstream actions; copied into a new list for the created action
 * @param readonlyConfig the sink configuration passed to the sink factory
 * @param classLoader class loader used to create the sink
 * @param factoryUrls URLs attached to the created action
 * @param factoryId identifier of the sink factory
 * @param parallelism parallelism to set on the created action
 * @param configIndex index used when composing the action name
 * @return the configured sink action
 */
private SinkAction<?, ?, ?, ?> createSinkAction(
        CatalogTable catalogTable,
        Map<TablePath, CatalogTable> sinkTableMap,
        Set<Action> inputActions,
        ReadonlyConfig readonlyConfig,
        ClassLoader classLoader,
        Set<URL> factoryUrls,
        String factoryId,
        int parallelism,
        int configIndex) {
    // TODO: another table full name map
    Optional<CatalogTable> replacement =
            sinkTableMap.size() == 1
                    ? sinkTableMap.values().stream().findFirst()
                    : Optional.ofNullable(
                            sinkTableMap.get(catalogTable.getTableId().toTablePath()));
    CatalogTable targetTable = replacement.orElse(catalogTable);

    SeaTunnelSink<?, ?, ?, ?> seaTunnelSink =
            FactoryUtil.createAndPrepareSink(
                    targetTable, readonlyConfig, classLoader, factoryId);
    seaTunnelSink.setJobContext(jobConfig.getJobContext());

    SinkConfig sinkConfig = new SinkConfig(targetTable.getTableId().toTablePath().toString());
    long actionId = idGenerator.getNextId();
    String sinkActionName =
            JobConfigParser.createSinkActionName(
                    configIndex, factoryId, sinkConfig.getMultipleRowTableId());
    List<Action> upstreamActions = new ArrayList<>(inputActions);
    SinkAction<?, ?, ?, ?> action =
            new SinkAction<>(
                    actionId,
                    sinkActionName,
                    upstreamActions,
                    seaTunnelSink,
                    factoryUrls,
                    sinkConfig);

    // Save-mode handling is skipped when the job starts from a savepoint.
    if (!isStartWithSavePoint) {
        handleSaveMode(seaTunnelSink);
    }
    action.setParallelism(parallelism);
    return action;
}

/**
 * Runs the save-mode handling of the given sink, if it provides any.
 *
 * <p>Nothing happens when the sink does not implement {@link SupportSaveMode} (a {@code null}
 * sink is treated the same way), or when the sink returns a {@code null} handler from
 * {@link SupportSaveMode#getSaveModeHandler()}, which implementations use to signal that there
 * is nothing to handle. Previously a {@code null} handler caused a {@link NullPointerException}
 * that was reported as a save-mode failure.
 *
 * <p>The handler is closed after use.
 *
 * @param sink the sink whose save mode should be handled
 * @throws SeaTunnelRuntimeException with {@code HANDLE_SAVE_MODE_FAILED} if obtaining, running
 *     or closing the handler throws
 */
public static void handleSaveMode(SeaTunnelSink<?, ?, ?, ?> sink) {
    if (!(sink instanceof SupportSaveMode)) {
        return;
    }
    SupportSaveMode saveModeSink = (SupportSaveMode) sink;
    // try-with-resources tolerates a null resource (close() is simply skipped), so only the
    // explicit call below needs guarding.
    try (SaveModeHandler saveModeHandler = saveModeSink.getSaveModeHandler()) {
        if (saveModeHandler != null) {
            saveModeHandler.handleSaveMode();
        }
    } catch (Exception e) {
        throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
    }
}

Taking JdbcSink as an example:


Code Block
language: java
    /**
     * Creates the save-mode handler for this JDBC sink.
     *
     * <p>A catalog is looked up through the factory id found in the configured catalog options,
     * opened, and handed to the returned handler together with the target table path (table
     * name quoted according to the configured field identifier style) and the custom SQL option.
     *
     * @return a handler bound to an opened catalog, or {@code null} when there is no catalog
     *     table, no catalog options, no database configured, or no matching catalog factory
     */
    @Override
    public DefaultSaveModeHandler getSaveModeHandler() {
        if (catalogTable == null) {
            return null;
        }

        final Map<String, String> options = config.get(CatalogOptions.CATALOG_OPTIONS);
        if (options == null) {
            return null;
        }

        final String catalogFactoryId = options.get(CommonOptions.FACTORY_ID.key());
        if (StringUtils.isBlank(jdbcSinkConfig.getDatabase())) {
            return null;
        }

        final CatalogFactory factory =
                discoverFactory(
                        Thread.currentThread().getContextClassLoader(),
                        CatalogFactory.class,
                        catalogFactoryId);
        if (factory == null) {
            return null;
        }

        // Fall back to the ORIGINAL identifier style when none is configured.
        final FieldIdeEnum configuredIde = config.get(JdbcOptions.FIELD_IDE);
        final String identifierStyle;
        if (configuredIde != null) {
            identifierStyle = configuredIde.getValue();
        } else {
            identifierStyle = FieldIdeEnum.ORIGINAL.getValue();
        }

        final TablePath targetPath =
                TablePath.of(
                        catalogTable.getTableId().getDatabaseName(),
                        catalogTable.getTableId().getSchemaName(),
                        CatalogUtils.quoteTableIdentifier(
                                catalogTable.getTableId().getTableName(), identifierStyle));

        // The catalog receives its own copy of the options map.
        final Catalog openedCatalog =
                factory.createCatalog(
                        factory.factoryIdentifier(),
                        ReadonlyConfig.fromMap(new HashMap<>(options)));
        openedCatalog.open();

        return new DefaultSaveModeHandler(
                schemaSaveMode,
                dataSaveMode,
                openedCatalog,
                targetPath,
                catalogTable,
                config.get(JdbcOptions.CUSTOM_SQL));
    }

Naming scheme for the database name and table name used in automatic table creation

...