THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
The processing of SaveMode should be done before the job starts, so in the SeaTunnel Zeta engine you can get the saveModeHandler after building the Sink and then call its handleSaveMode() method.
The code in the SinkExecuteProcessor and MultipleTableJobConfigParser classes:
Code Block | ||
---|---|---|
| ||
/**
 * Attaches each configured sink plugin to its input stream. Before a sink is attached,
 * its save mode is handled if the sink implements SupportSaveMode.
 *
 * <p>NOTE(review): this is an excerpt; the end of the method (including its return
 * statement) is not shown here.
 *
 * @param upstreamDataStreams upstream streams; the first one is used as the input for any
 *     sink whose fromSourceTable(sinkConfig) lookup yields nothing
 * @throws TaskExecuteException declared by the signature; not thrown directly by the
 *     visible portion
 * @throws SeaTunnelRuntimeException with HANDLE_SAVE_MODE_FAILED when handling the save
 *     mode fails
 */
@Override
public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams)
throws TaskExecuteException {
// Fallback input for sinks that do not resolve a stream of their own.
DataStream<Row> input = upstreamDataStreams.get(0);
// plugins and pluginConfigs are read in parallel by index.
for (int i = 0; i < plugins.size(); i++) {
Config sinkConfig = pluginConfigs.get(i);
SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
plugins.get(i);
DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
// The sink's row type is derived from the type of the stream it will consume.
seaTunnelSink.setTypeInfo(
(SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));
// Save mode is handled here, before the sink is attached to the stream.
if (SupportSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
SupportSaveMode saveModeSink = (SupportSaveMode) seaTunnelSink;
// try-with-resources closes the handler once handleSaveMode() has run.
try (SaveModeHandler saveModeHandler = saveModeSink.getSaveModeHandler()) {
saveModeHandler.handleSaveMode();
} catch (Exception e) {
// The original exception is kept as the cause.
throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
}
}
DataStreamSink<Row> dataStreamSink =
stream.sinkTo(new FlinkSink<>(seaTunnelSink))
.name(seaTunnelSink.getPluginName());
// A parallelism is applied only when the sink config sets one.
if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
int parallelism = sinkConfig.getInt(CommonOptions.PARALLELISM.key());
dataStreamSink.setParallelism(parallelism);
}
} |
Code Block | ||
---|---|---|
| ||
private SinkAction<?, ?, ?, ?> createSinkAction( CatalogTable catalogTable, Map<TablePath, CatalogTable> sinkTableMap, Set<Action> inputActions, ReadonlyConfig readonlyConfig, ClassLoader classLoader, Set<URL> factoryUrls, String factoryId, int parallelism, int configIndex) { Optional<CatalogTable> insteadTable; if (sinkTableMap.size() == 1) { insteadTable = sinkTableMap.values().stream().findFirst(); } else { // TODO: another table full name map insteadTable = Optional.ofNullable(sinkTableMap.get(catalogTable.getTableId().toTablePath())); } if (insteadTable.isPresent()) { catalogTable = insteadTable.get(); } SeaTunnelSink<?, ?, ?, ?> sink = FactoryUtil.createAndPrepareSink( catalogTable, readonlyConfig, classLoader, factoryId); sink.setJobContext(jobConfig.getJobContext()); SinkConfig actionConfig = new SinkConfig(catalogTable.getTableId().toTablePath().toString()); long id = idGenerator.getNextId(); String actionName = JobConfigParser.createSinkActionName( configIndex, factoryId, actionConfig.getMultipleRowTableId()); SinkAction<?, ?, ?, ?> sinkAction = new SinkAction<>( id, actionName, new ArrayList<>(inputActions), sink, factoryUrls, actionConfig); if (!isStartWithSavePoint) { handleSaveMode(sink); } sinkAction.setParallelism(parallelism); return sinkAction; } public static void handleSaveMode(SeaTunnelSink<?, ?, ?, ?> sink) { if (SupportSaveMode.class.isAssignableFrom(sink.getClass())) { SupportSaveMode saveModeSink = (SupportSaveMode) sink; try (SaveModeHandler saveModeHandler = saveModeSink.getSaveModeHandler()) { saveModeHandler.handleSaveMode(); } catch (Exception e) { throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e); } } } |
...