Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...


The processing of SaveMode should be done before the job starts, so in SeaTunnel Zeta engine you can get saveModeHandler after building Sink, and then call handleSaveMode() method

The code in the MultipleTableJobConfigParser classthe SinkExecuteProcessor and MultipleTableJobConfigParser class


Code Block
languagejava
    @Override
    public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams)
            throws TaskExecuteException {
        DataStream<Row> input = upstreamDataStreams.get(0);
        for (int i = 0; i < plugins.size(); i++) {
            Config sinkConfig = pluginConfigs.get(i);
            SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
                    plugins.get(i);
            DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
            seaTunnelSink.setTypeInfo(
                    (SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));
            if (SupportSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
                SupportSaveMode saveModeSink = (SupportSaveMode) seaTunnelSink;
                try (SaveModeHandler saveModeHandler = saveModeSink.getSaveModeHandler()) {
                    saveModeHandler.handleSaveMode();
                } catch (Exception e) {
                    throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
                }
            }
            DataStreamSink<Row> dataStreamSink =
                    stream.sinkTo(new FlinkSink<>(seaTunnelSink))
                            .name(seaTunnelSink.getPluginName());
            if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
                int parallelism = sinkConfig.getInt(CommonOptions.PARALLELISM.key());
                dataStreamSink.setParallelism(parallelism);
            }
        }


Code Block
languagejava
private SinkAction<?, ?, ?, ?> createSinkAction(
        CatalogTable catalogTable,
        Map<TablePath, CatalogTable> sinkTableMap,
        Set<Action> inputActions,
        ReadonlyConfig readonlyConfig,
        ClassLoader classLoader,
        Set<URL> factoryUrls,
        String factoryId,
        int parallelism,
        int configIndex) {
    Optional<CatalogTable> insteadTable;
    if (sinkTableMap.size() == 1) {
        insteadTable = sinkTableMap.values().stream().findFirst();
    } else {
        // TODO: another table full name map
        insteadTable =
                Optional.ofNullable(sinkTableMap.get(catalogTable.getTableId().toTablePath()));
    }
    if (insteadTable.isPresent()) {
        catalogTable = insteadTable.get();
    }
    SeaTunnelSink<?, ?, ?, ?> sink =
            FactoryUtil.createAndPrepareSink(
                    catalogTable, readonlyConfig, classLoader, factoryId);
    sink.setJobContext(jobConfig.getJobContext());
    SinkConfig actionConfig =
            new SinkConfig(catalogTable.getTableId().toTablePath().toString());
    long id = idGenerator.getNextId();
    String actionName =
            JobConfigParser.createSinkActionName(
                    configIndex, factoryId, actionConfig.getMultipleRowTableId());
    SinkAction<?, ?, ?, ?> sinkAction =
            new SinkAction<>(
                    id,
                    actionName,
                    new ArrayList<>(inputActions),
                    sink,
                    factoryUrls,
                    actionConfig);
    if (!isStartWithSavePoint) {
        handleSaveMode(sink);
    }
    sinkAction.setParallelism(parallelism);
    return sinkAction;
}

public static void handleSaveMode(SeaTunnelSink<?, ?, ?, ?> sink) {
    if (SupportSaveMode.class.isAssignableFrom(sink.getClass())) {
        SupportSaveMode saveModeSink = (SupportSaveMode) sink;
        try (SaveModeHandler saveModeHandler = saveModeSink.getSaveModeHandler()) {
            saveModeHandler.handleSaveMode();
        } catch (Exception e) {
            throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
        }
    }
}

...