Related Issue
https://github.com/apache/seatunnel/issues/5390
Motivation
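SeaTunnel currently has no way to prepare the sink (target) side before a synchronization job runs. Examples include creating, recreating or validating the target table's structure, or deciding what to do with data already present in the target. This proposal adds that capability, called SaveMode.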
Process
*(Diagram: "SaveMode Function Process" — a draw.io diagram that is not included in this export.)*
Public Interfaces
org.apache.seatunnel.api.sink.SupportSaveMode
`SupportSaveMode` is the interface a sink must implement to support the SaveMode feature. It declares the method `getSaveModeHandler()`, which returns the handler (executor) that applies the configured save modes.
Code Block |
---|
|
package org.apache.seatunnel.api.sink; |
...
/** The Sink Connectors which support schema and data SaveMode should implement this interface */ |
...
public interface SupportSaveMode { |
...
String DATA_SAVE_MODE_KEY = "data_save_mode"; |
...
String SCHEMA_SAVE_MODE_KEY = "schema_save_mode"; |
...
// This method defines the return of a specific save_mode handler |
...
SaveModeHandler getSaveModeHandler(); |
...
SaveMode has two parts:
- a schema save mode, for the table structure;
- a data save mode, for data that already exists in the target.

Their possible behaviors are represented by two enums, `org.apache.seatunnel.api.sink.SchemaSaveMode` and `org.apache.seatunnel.api.sink.DataSaveMode`.
Code Block |
---|
|
package org.apache.seatunnel.api.sink; |
...
public enum SchemaSaveMode { |
...
// Create the table when it does not exist; drop and recreate it when it already exists |
...
...
// Create the table when it does not exist; skip this step when it already exists |
...
CREATE_SCHEMA_WHEN_NOT_EXIST, |
...
// Error will be reported when the table does not exist |
...
ERROR_WHEN_SCHEMA_NOT_EXIST, |
...
Code Block |
---|
|
package org.apache.seatunnel.api.sink; |
...
...
* The SaveMode for the Sink connectors that use table or other table structures to organize data |
...
...
public enum DataSaveMode { |
...
// Preserve database structure and delete data |
...
...
// Preserve database structure, preserve data |
...
...
// User defined processing |
...
...
// When there is data, an error is reported |
...
...
org.apache.seatunnel.api.sink.SaveModeHandler
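`SaveModeHandler` is the executor that carries out the SaveMode logic. Implementations contain the concrete steps for handling the schema save mode (`handleSchemaSaveMode()`) and the data save mode (`handleDataSaveMode()`). The interface extends `AutoCloseable` so that any resources held by a handler, such as a catalog connection, can be released after use.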
Code Block |
---|
|
package org.apache.seatunnel.api.sink; |
...
public interface SaveModeHandler extends AutoCloseable { |
...
void handleSchemaSaveMode(); |
...
void handleDataSaveMode(); |
...
default void handleSaveMode() { |
...
...
...
...
The `SaveModeHandler` interface has a default implementation, `org.apache.seatunnel.api.sink.DefaultSaveModeHandler`, which performs the save-mode operations through a `Catalog`.
Code Block |
---|
|
package org.apache.seatunnel.api.sink; |
...
...
public class DefaultSaveModeHandler implements SaveModeHandler { |
...
public SchemaSaveMode schemaSaveMode; |
...
public DataSaveMode dataSaveMode; |
...
...
public TablePath tablePath; |
...
public CatalogTable catalogTable; |
...
...
public DefaultSaveModeHandler( |
...
SchemaSaveMode schemaSaveMode, |
...
DataSaveMode dataSaveMode, |
...
...
CatalogTable catalogTable, |
...
...
...
...
...
...
catalogTable.getTableId().toTablePath(), |
...
...
...
...
...
public void handleSchemaSaveMode() { |
...
switch (schemaSaveMode) { |
...
...
...
...
case CREATE_SCHEMA_WHEN_NOT_EXIST: |
...
createSchemaWhenNotExist(); |
...
...
case ERROR_WHEN_SCHEMA_NOT_EXIST: |
...
errorWhenSchemaNotExist(); |
...
...
...
throw new UnsupportedOperationException("Unsupported save mode: " + schemaSaveMode); |
...
...
...
...
public void handleDataSaveMode() { |
...
...
case KEEP_SCHEMA_DROP_DATA: |
...
...
...
case KEEP_SCHEMA_AND_DATA: |
...
...
...
...
...
...
case ERROR_WHEN_DATA_EXISTS: |
...
...
...
...
throw new UnsupportedOperationException("Unsupported save mode: " + dataSaveMode); |
...
...
...
protected void recreateSchema() { |
...
...
...
...
...
...
protected void createSchemaWhenNotExist() { |
...
...
...
...
...
protected void errorWhenSchemaNotExist() { |
...
...
throw new SeaTunnelRuntimeException(SINK_TABLE_NOT_EXIST, "The sink table not exist"); |
...
...
...
protected void keepSchemaDropData() { |
...
...
...
...
...
protected void keepSchemaAndData() {} |
...
protected void customProcessing() { |
...
...
...
protected void errorWhenDataExists() { |
...
...
throw new SeaTunnelRuntimeException( |
...
SOURCE_ALREADY_HAS_DATA, "The target data source already has data"); |
...
...
...
protected boolean tableExists() { |
...
return catalog.tableExists(tablePath); |
...
...
protected void dropTable() { |
...
catalog.dropTable(tablePath, true); |
...
...
protected void createTable() { |
...
catalog.createTable(tablePath, catalogTable, true); |
...
...
protected void truncateTable() { |
...
catalog.truncateTable(tablePath, true); |
...
...
protected boolean dataExists() { |
...
return catalog.isExistsData(tablePath); |
...
...
protected void executeCustomSql() { |
...
catalog.executeSql(customSql); |
...
...
...
public void close() throws Exception { |
...
...
...
Code execution time
Save modes must be handled before the job starts. The engine therefore obtains the sink's `SaveModeHandler` right after the sink is created and calls its `handleSaveMode()` method. In the SeaTunnel Zeta engine, this step is skipped when the job is started from a savepoint.
The relevant code is in the `SinkExecuteProcessor` class (Flink translation layer) and the `MultipleTableJobConfigParser` class (Zeta engine):
...
...
...
...
...
@Override
public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams)
|
...
...
...
...
...
...
...
DataStream<Row> input = upstreamDataStreams.get(0);
|
...
...
...
...
...
...
...
SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
|
...
...
...
...
...
...
...
...
...
...
...
...
...
fromSourceTable(sinkConfig). |
...
...
...
...
...
seaTunnelSink.setTypeInfo(
|
...
...
...
...
...
...
...
...
...
(SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));
|
...
if (SupportSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) |
...
...
...
...
...
...
...
...
...
...
...
...
try (SaveModeHandler saveModeHandler = saveModeSink.getSaveModeHandler()) {
|
...
saveModeHandler.handleSaveMode();
|
...
...
...
...
...
...
...
...
...
SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
|
...
...
...
...
...
...
...
...
}
DataStreamSink<Row> dataStreamSink =
|
...
...
...
...
...
...
...
...
...
...
...
FlinkSink<>(seaTunnelSink))
|
...
.name(seaTunnelSink.getPluginName());
|
...
if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
int parallelism |
...
= sinkConfig.getInt(CommonOptions.PARALLELISM.key());
dataStreamSink.setParallelism(parallelism);
|
...
Code Block |
---|
|
private SinkAction<?, ?, |
...
...
?> createSinkAction(
CatalogTable catalogTable,
|
...
Map<TablePath, CatalogTable> sinkTableMap,
|
...
...
...
Set<Action> inputActions,
|
...
...
ReadonlyConfig readonlyConfig,
|
...
...
ClassLoader classLoader,
Set<URL> factoryUrls,
|
...
...
...
...
...
...
...
...
...
...
Optional<CatalogTable> insteadTable;
if (sinkTableMap.size() == 1) {
insteadTable = sinkTableMap.values().stream().findFirst();
} else {
// TODO: another table full name map
insteadTable =
Optional.ofNullable(sinkTableMap.get(catalogTable.getTableId().toTablePath()));
}
if (insteadTable.isPresent()) {
catalogTable = insteadTable.get();
}
SeaTunnelSink<?, ?, ?, ?> sink =
FactoryUtil.createAndPrepareSink(
catalogTable, readonlyConfig, classLoader, factoryId);
sink.setJobContext(jobConfig.getJobContext());
SinkConfig actionConfig =
new SinkConfig(catalogTable.getTableId().toTablePath().toString());
long id = idGenerator.getNextId();
String actionName =
JobConfigParser.createSinkActionName(
configIndex, factoryId, actionConfig.getMultipleRowTableId());
SinkAction<?, ?, ?, ?> sinkAction =
new SinkAction<>(
id,
actionName,
new ArrayList<>(inputActions),
sink,
factoryUrls,
actionConfig);
if (!isStartWithSavePoint) {
handleSaveMode(sink);
}
sinkAction.setParallelism(parallelism);
return sinkAction;
}
public static void handleSaveMode(SeaTunnelSink<?, ?, ?, ?> sink) {
if (SupportSaveMode.class.isAssignableFrom(sink.getClass())) {
SupportSaveMode saveModeSink = (SupportSaveMode) sink;
try (SaveModeHandler saveModeHandler = saveModeSink.getSaveModeHandler()) {
saveModeHandler.handleSaveMode();
} catch (Exception e) {
throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
}
}
} |
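Example: the JDBC sink's implementation of `getSaveModeHandler()`. It returns `null` when save-mode handling is not possible, in any of these cases:
- no catalog table is available;
- no catalog options are configured;
- no database is configured;
- no matching catalog factory is found.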
Code Block |
---|
|
@Override
public DefaultSaveModeHandler getSaveModeHandler() {
if (catalogTable == null) {
return null;
}
Map<String, String> catalogOptions = config.get(CatalogOptions.CATALOG_OPTIONS);
if (catalogOptions == null) {
return null;
}
String factoryId = catalogOptions.get(CommonOptions.FACTORY_ID.key());
if (StringUtils.isBlank(jdbcSinkConfig.getDatabase())) {
return null;
}
CatalogFactory catalogFactory =
discoverFactory(
Thread.currentThread().getContextClassLoader(),
CatalogFactory.class,
factoryId);
if (catalogFactory == null) {
return null;
}
FieldIdeEnum fieldIdeEnum = config.get(JdbcOptions.FIELD_IDE);
String fieldIde =
fieldIdeEnum == null ? FieldIdeEnum.ORIGINAL.getValue() : fieldIdeEnum.getValue();
TablePath tablePath =
TablePath.of(
catalogTable.getTableId().getDatabaseName(),
catalogTable.getTableId().getSchemaName(),
CatalogUtils.quoteTableIdentifier(
catalogTable.getTableId().getTableName(), fieldIde));
Catalog catalog =
catalogFactory.createCatalog(
catalogFactory.factoryIdentifier(),
ReadonlyConfig.fromMap(new HashMap<>(catalogOptions)));
catalog.open();
return new DefaultSaveModeHandler(
schemaSaveMode,
dataSaveMode,
catalog,
|
...
...
...
...
...
...
...
...
...
...
...
config.get(JdbcOptions.CUSTOM_SQL));
|
...
Naming scheme for automatically created databases and tables
...
The value of the sink's `table` option is used as the name of the automatically created table, according to the following rules:
- If the value contains the placeholder `${table_name}`, the placeholder is replaced with the source table name. For example, if the source table is `table001` and `table` is set to `test_${table_name}_001002`, the created table is named `test_table001_001002`.
- If the value does not contain `${table_name}`, no substitution is performed and the value of `table` is used as the table name as-is.
The sink's `database` option (the database name) follows the same rules.