...
The table structure and data of the target end are processed before the synchronization task. Currently, this function is not available.
Process
draw.io Board Diagram border true diagramName SaveMode Function Process simpleViewer false width links auto tbstyle top lbox true diagramWidth 606 revision 2
Public Interfaces
org.apache.seatunnel.api.sink.SupportSaveMode
SupportSaveMode is the interface that a Sink must implement in order to support the SaveMode function. It defines a method for obtaining the SaveMode handler (SaveModeHandler).
...
SaveMode must be handled before the job starts, so in the SeaTunnel Zeta engine the SaveModeHandler is obtained after the Sink has been built, and its handleSaveMode() method is then called.
The code in the SinkExecuteProcessor and MultipleTableJobConfigParser classes:
Code Block | ||
---|---|---|
| ||
@Override
public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams)
        throws TaskExecuteException {
    DataStream<Row> input = upstreamDataStreams.get(0);
    for (int i = 0; i < plugins.size(); i++) {
        Config sinkConfig = pluginConfigs.get(i);
        SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
                plugins.get(i);
        DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
        seaTunnelSink.setTypeInfo(
                (SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));
        if (SupportSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
            SupportSaveMode saveModeSink = (SupportSaveMode) seaTunnelSink;
            try (SaveModeHandler saveModeHandler = saveModeSink.getSaveModeHandler()) {
                saveModeHandler.handleSaveMode();
            } catch (Exception e) {
                throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
            }
        }
        DataStreamSink<Row> dataStreamSink =
                stream.sinkTo(new FlinkSink<>(seaTunnelSink))
                        .name(seaTunnelSink.getPluginName());
        if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
            int parallelism = sinkConfig.getInt(CommonOptions.PARALLELISM.key());
            dataStreamSink.setParallelism(parallelism);
        }
    }
    // ... (the remainder of the method is omitted here)
} |
Code Block | ||
---|---|---|
| ||
/**
 * Builds the {@link SinkAction} for one sink table and, unless the job is started from a
 * savepoint, runs the sink's save-mode handling.
 *
 * <p>The catalog table handed to the sink factory is chosen as follows: if {@code sinkTableMap}
 * holds exactly one entry, that entry's table is used; otherwise the table registered under the
 * path of {@code catalogTable} is used. When neither lookup yields a table, the passed-in
 * {@code catalogTable} is used unchanged.
 *
 * @param catalogTable the table used for the map lookup and as the fallback
 * @param sinkTableMap candidate replacement tables keyed by table path
 * @param inputActions the input actions, copied into the new action
 * @param readonlyConfig the configuration passed to the sink factory
 * @param classLoader the class loader passed to the sink factory
 * @param factoryUrls the URLs passed to the new action
 * @param factoryId the sink factory identifier, also used in the action name
 * @param parallelism the parallelism set on the returned action
 * @param configIndex the index used when building the action name
 * @return the configured sink action
 */
private SinkAction<?, ?, ?, ?> createSinkAction(
        CatalogTable catalogTable,
        Map<TablePath, CatalogTable> sinkTableMap,
        Set<Action> inputActions,
        ReadonlyConfig readonlyConfig,
        ClassLoader classLoader,
        Set<URL> factoryUrls,
        String factoryId,
        int parallelism,
        int configIndex) {
    // TODO: another table full name map
    Optional<CatalogTable> replacement =
            sinkTableMap.size() == 1
                    ? sinkTableMap.values().stream().findFirst()
                    : Optional.ofNullable(
                            sinkTableMap.get(catalogTable.getTableId().toTablePath()));
    CatalogTable targetTable = replacement.orElse(catalogTable);

    SeaTunnelSink<?, ?, ?, ?> sink =
            FactoryUtil.createAndPrepareSink(
                    targetTable, readonlyConfig, classLoader, factoryId);
    sink.setJobContext(jobConfig.getJobContext());

    String tablePathName = targetTable.getTableId().toTablePath().toString();
    SinkConfig actionConfig = new SinkConfig(tablePathName);
    long actionId = idGenerator.getNextId();
    String actionName =
            JobConfigParser.createSinkActionName(
                    configIndex, factoryId, actionConfig.getMultipleRowTableId());

    List<Action> upstreams = new ArrayList<>(inputActions);
    SinkAction<?, ?, ?, ?> sinkAction =
            new SinkAction<>(actionId, actionName, upstreams, sink, factoryUrls, actionConfig);

    // A job restored from a savepoint must not touch the target table again.
    boolean freshStart = !isStartWithSavePoint;
    if (freshStart) {
        handleSaveMode(sink);
    }
    sinkAction.setParallelism(parallelism);
    return sinkAction;
}
/**
 * Runs the save-mode handling of the given sink, if the sink provides one.
 *
 * <p>Sinks that do not implement {@link SupportSaveMode} are ignored. A sink may return
 * {@code null} from {@code getSaveModeHandler()} when it has nothing to handle; that case is
 * skipped as well, rather than surfacing as a {@link NullPointerException} wrapped in
 * {@code HANDLE_SAVE_MODE_FAILED}.
 *
 * @param sink the sink to run save-mode handling for
 * @throws SeaTunnelRuntimeException with {@code HANDLE_SAVE_MODE_FAILED} when handling the save
 *     mode or closing the handler fails
 */
public static void handleSaveMode(SeaTunnelSink<?, ?, ?, ?> sink) {
    if (!(sink instanceof SupportSaveMode)) {
        return;
    }
    SupportSaveMode saveModeSink = (SupportSaveMode) sink;
    // try-with-resources tolerates a null resource: close() is simply not invoked for it.
    try (SaveModeHandler saveModeHandler = saveModeSink.getSaveModeHandler()) {
        if (saveModeHandler != null) {
            saveModeHandler.handleSaveMode();
        }
    } catch (Exception e) {
        throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
    }
} |
Taking jdbcsink as an example
Code Block | ||
---|---|---|
| ||
@Override
public DefaultSaveModeHandler getSaveModeHandler() {
    if (catalogTable == null) {
        return null;
    }
    Map<String, String> catalogOptions = config.get(CatalogOptions.CATALOG_OPTIONS);
    if (catalogOptions == null) {
        return null;
    }
    String factoryId = catalogOptions.get(CommonOptions.FACTORY_ID.key());
    if (StringUtils.isBlank(jdbcSinkConfig.getDatabase())) {
        return null;
    }
    CatalogFactory catalogFactory =
            discoverFactory(
                    Thread.currentThread().getContextClassLoader(),
                    CatalogFactory.class,
                    factoryId);
    if (catalogFactory == null) {
        return null;
    }
    FieldIdeEnum fieldIdeEnum = config.get(JdbcOptions.FIELD_IDE);
    String fieldIde =
            fieldIdeEnum == null
                    ? FieldIdeEnum.ORIGINAL.getValue()
                    : fieldIdeEnum.getValue();
    TablePath tablePath =
            TablePath.of(
                    catalogTable.getTableId().getDatabaseName(),
                    catalogTable.getTableId().getSchemaName(),
                    CatalogUtils.quoteTableIdentifier(
                            catalogTable.getTableId().getTableName(), fieldIde));
    Catalog catalog =
            catalogFactory.createCatalog(
                    catalogFactory.factoryIdentifier(),
                    ReadonlyConfig.fromMap(new HashMap<>(catalogOptions)));
    catalog.open();
    return new DefaultSaveModeHandler(
            schemaSaveMode,
            dataSaveMode,
            catalog,
            tablePath,
            catalogTable,
            config.get(JdbcOptions.CUSTOM_SQL));
} |
Naming scheme for automatically created databases and tables
...