...
The table structure and data of the target end are processed before the synchronization task. Currently, this function is not available.
Process
draw.io Board Diagram border true diagramName SaveMode Function Process simpleViewer false width links auto tbstyle top lbox true diagramWidth 606 revision 2
Public Interfaces
org.apache.seatunnel.api.sink.SupportSaveMode
SupportSaveMode is the interface that a Sink needs to implement in order to support the SaveMode function. It defines a method, getSaveModeHandler(), for obtaining the SaveMode handler (SaveModeHandler).
...
The processing of SaveMode should be done before the job starts, so in the SeaTunnel Zeta engine you can obtain the SaveModeHandler after building the Sink and then call its handleSaveMode() method.
The code is in the SinkExecuteProcessor and MultipleTableJobConfigParser classes:
Code Block | ||
---|---|---|
| ||
private SinkAction<?, ?, ?, ?> createSinkAction( @Override public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams) CatalogTable catalogTable, Map<TablePath, CatalogTable> sinkTableMap, throws TaskExecuteException { Set<Action> inputActions, DataStream<Row> input = upstreamDataStreams.get(0); ReadonlyConfig readonlyConfig, for (int i = 0; ClassLoaderi classLoader, < plugins.size(); i++) { Set<URL> factoryUrls, Config sinkConfig String factoryId,= pluginConfigs.get(i); int parallelism, SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink = int configIndex) { Optional<CatalogTable> insteadTable; if (sinkTableMapplugins.sizeget(i); == 1) { DataStream<Row> insteadTablestream = sinkTableMap.valuesfromSourceTable(sinkConfig).stream().findFirst(orElse(input); } else { seaTunnelSink.setTypeInfo( // TODO: another table full name map insteadTable = (SeaTunnelRowType) TypeConverterUtils.convert(stream.getType())); Optional.ofNullable(sinkTableMap.get(catalogTable.getTableId().toTablePathif (SupportSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())); { } if (insteadTable.isPresent()) { SupportSaveMode saveModeSink catalogTable = insteadTable.get(SupportSaveMode) seaTunnelSink; } SeaTunnelSink<?, ?, ?, ?> sink = try (SaveModeHandler saveModeHandler FactoryUtil.createAndPrepareSink(= saveModeSink.getSaveModeHandler()) { catalogTable, readonlyConfig, classLoader, factoryIdsaveModeHandler.handleSaveMode(); sink.setJobContext(jobConfig.getJobContext() } catch (Exception e) { throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e); } } DataStreamSink<Row> dataStreamSink = stream.sinkTo(new FlinkSink<>(seaTunnelSink)) .name(seaTunnelSink.getPluginName()); if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) { int parallelism = sinkConfig.getInt(CommonOptions.PARALLELISM.key()); dataStreamSink.setParallelism(parallelism); } } |
Code Block | ||
---|---|---|
| ||
/**
 * Creates the {@link SinkAction} for a single sink table.
 *
 * <p>The catalog table handed to the sink factory is taken from {@code sinkTableMap} when a
 * matching entry exists (the only entry if the map holds exactly one, otherwise the entry keyed
 * by the table path of {@code catalogTable}); if there is none, {@code catalogTable} itself is
 * used. Unless the job is started from a savepoint, the sink's save mode is handled before the
 * action is returned.
 *
 * @param catalogTable table description coming from upstream
 * @param sinkTableMap candidate replacement tables keyed by table path
 * @param inputActions upstream actions feeding this sink
 * @param readonlyConfig sink configuration
 * @param classLoader class loader used to create the sink
 * @param factoryUrls plugin jar URLs attached to the action
 * @param factoryId identifier of the sink factory
 * @param parallelism parallelism set on the resulting action
 * @param configIndex index of the sink config, used in the action name
 * @return the configured sink action
 */
private SinkAction<?, ?, ?, ?> createSinkAction(
        CatalogTable catalogTable,
        Map<TablePath, CatalogTable> sinkTableMap,
        Set<Action> inputActions,
        ReadonlyConfig readonlyConfig,
        ClassLoader classLoader,
        Set<URL> factoryUrls,
        String factoryId,
        int parallelism,
        int configIndex) {
    // TODO: another table full name map
    Optional<CatalogTable> replacement =
            sinkTableMap.size() == 1
                    ? sinkTableMap.values().stream().findFirst()
                    : Optional.ofNullable(
                            sinkTableMap.get(catalogTable.getTableId().toTablePath()));
    CatalogTable effectiveTable = replacement.orElse(catalogTable);

    SeaTunnelSink<?, ?, ?, ?> sink =
            FactoryUtil.createAndPrepareSink(
                    effectiveTable, readonlyConfig, classLoader, factoryId);
    sink.setJobContext(jobConfig.getJobContext());

    String tablePathText = effectiveTable.getTableId().toTablePath().toString();
    SinkConfig actionConfig = new SinkConfig(tablePathText);
    long actionId = idGenerator.getNextId();
    String actionName =
            JobConfigParser.createSinkActionName(
                    configIndex, factoryId, actionConfig.getMultipleRowTableId());
    SinkAction<?, ?, ?, ?> sinkAction =
            new SinkAction<>(
                    actionId,
                    actionName,
                    new ArrayList<>(inputActions),
                    sink,
                    factoryUrls,
                    actionConfig);

    // When restoring from a savepoint the save mode is not handled again.
    if (!isStartWithSavePoint) {
        handleSaveMode(sink);
    }
    sinkAction.setParallelism(parallelism);
    return sinkAction;
}
/**
 * Runs save-mode handling for {@code sink} if the sink supports it.
 *
 * <p>Sinks that do not implement {@link SupportSaveMode} are ignored. An implementation may
 * return {@code null} from {@code getSaveModeHandler()} when it has nothing to prepare; that
 * case is treated as a no-op instead of failing the job with a wrapped
 * {@link NullPointerException}.
 *
 * @param sink the sink that has just been built
 * @throws SeaTunnelRuntimeException with {@code HANDLE_SAVE_MODE_FAILED} when obtaining,
 *     running or closing the handler throws
 */
public static void handleSaveMode(SeaTunnelSink<?, ?, ?, ?> sink) {
    if (!(sink instanceof SupportSaveMode)) {
        return;
    }
    SupportSaveMode saveModeSink = (SupportSaveMode) sink;
    // A null resource is legal in try-with-resources: close() is simply skipped for it.
    try (SaveModeHandler saveModeHandler = saveModeSink.getSaveModeHandler()) {
        if (saveModeHandler != null) {
            saveModeHandler.handleSaveMode();
        }
    } catch (Exception e) {
        throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
    }
} |
Taking jdbcsink as an example
Code Block | ||
---|---|---|
| ||
@Override public DefaultSaveModeHandler getSaveModeHandler() { if (catalogTable == null) { return null; } Map<String, String> catalogOptions = config.get(CatalogOptions.CATALOG_OPTIONS); if (catalogOptions == null) { return null; } String factoryId = catalogOptions.get(CommonOptions.FACTORY_ID.key()); if (StringUtils.isBlank(jdbcSinkConfig.getDatabase())) { return null; } CatalogFactory catalogFactory = discoverFactory( Thread.currentThread().getContextClassLoader(), CatalogFactory.class, factoryId); if (catalogFactory == null) { return null; } FieldIdeEnum fieldIdeEnum = config.get(JdbcOptions.FIELD_IDE); SinkConfig actionConfig String fieldIde = new SinkConfig(catalogTable.getTableId().toTablePath().toString()); fieldIdeEnum == null ? FieldIdeEnum.ORIGINAL.getValue() : fieldIdeEnum.getValue(); long id = idGenerator.getNextId(); String actionName = TablePath tablePath = JobConfigParserTablePath.createSinkActionNameof( configIndex, factoryId, actionConfigcatalogTable.getMultipleRowTableIdgetTableId().getDatabaseName();, SinkAction<?, ?, ?, ?> sinkAction = new SinkAction<>( catalogTable.getTableId().getSchemaName(), id, CatalogUtils.quoteTableIdentifier( actionName, catalogTable.getTableId().getTableName(), fieldIde)); Catalog new ArrayList<>(inputActions),catalog = catalogFactory.createCatalog( sink, factoryUrlscatalogFactory.factoryIdentifier(), actionConfig); ifReadonlyConfig.fromMap(new HashMap<>(!isStartWithSavePoint) {catalogOptions))); handleSaveModecatalog.open(sink); } return new sinkAction.setParallelism(parallelism); DefaultSaveModeHandler( return sinkAction; } public static void handleSaveMode(SeaTunnelSink<?, ?, ?, ?> sink) { schemaSaveMode, if (SupportSaveMode.class.isAssignableFrom(sink.getClass())) { SupportSaveMode saveModeSink = (SupportSaveMode) sink; dataSaveMode, try (SaveModeHandler saveModeHandler = saveModeSink.getSaveModeHandler()) { catalog, saveModeHandler.handleSaveMode(); tablePath, } catch (Exception e) { catalogTable, throw new 
SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e); }config.get(JdbcOptions.CUSTOM_SQL)); } } |
Automatic table building library name and table name building scheme
...
The table attribute in sink is explained here: The value of the table attribute in sink will be used as the name of the final automatic table creation. The rules are:
- If the table attribute contains ${table_name}, the placeholder is replaced with the source table name. For example, if the table name in source is table001, then the final table name created based on the above example is test_table001_001002.
- If ${table_name} is not included, no replacement takes place and the value of the table attribute is used as the final table name as-is.
The database attribute (databaseName) in sink also follows this rule.