Issues Address

https://github.com/apache/seatunnel/issues/5390

Motivation

Before a synchronization task starts, the table structure and existing data on the target side need to be processed (for example, creating the table or clearing old data). SeaTunnel does not currently provide this function.

Public Interfaces

org.apache.seatunnel.api.sink.SupportSaveMode
SupportSaveMode is the interface a Sink must implement to support the SaveMode function. It defines a single method for obtaining the SaveMode handler.

package org.apache.seatunnel.api.sink;

/** The Sink Connectors which support schema and data SaveMode should implement this interface */
public interface SupportSaveMode {

    String DATA_SAVE_MODE_KEY = "data_save_mode";

    String SCHEMA_SAVE_MODE_KEY = "schema_save_mode";

    // Returns the handler that carries out the configured save modes
    SaveModeHandler getSaveModeHandler();
}

SaveMode covers two aspects: the SaveMode for the table structure and the SaveMode for existing data. The available processing methods are represented by two enumerations, org.apache.seatunnel.api.sink.SchemaSaveMode and org.apache.seatunnel.api.sink.DataSaveMode.

package org.apache.seatunnel.api.sink;

public enum SchemaSaveMode {

    // Create the table when it does not exist; drop and recreate it when it already exists
    RECREATE_SCHEMA,

    // Create the table when it does not exist; skip when it already exists
    CREATE_SCHEMA_WHEN_NOT_EXIST,

    // Report an error when the table does not exist
    ERROR_WHEN_SCHEMA_NOT_EXIST,
}

package org.apache.seatunnel.api.sink;

/**
 * The SaveMode for the Sink connectors that use table or other table structures to organize data
 */
public enum DataSaveMode {

    // Preserve the table structure and delete the data
    KEEP_SCHEMA_DROP_DATA,

    // Preserve the table structure and preserve the data
    KEEP_SCHEMA_AND_DATA,

    // User-defined processing
    CUSTOM_PROCESSING,

    // Report an error when data already exists
    ERROR_WHEN_DATA_EXISTS
}
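
As a rough illustration of how the two keys defined in SupportSaveMode could map onto these enumerations, the sketch below parses a plain Map of options. The enums are re-declared locally so the snippet compiles on its own. The SaveModeOptions class, the Map-based configuration, the case-insensitive parsing, and the default values are all assumptions made for illustration, not SeaTunnel's actual option handling.

```java
import java.util.Locale;
import java.util.Map;

public class SaveModeOptions {
    // Local stand-ins mirroring the enums above so the sketch is self-contained.
    public enum SchemaSaveMode {
        RECREATE_SCHEMA, CREATE_SCHEMA_WHEN_NOT_EXIST, ERROR_WHEN_SCHEMA_NOT_EXIST
    }

    public enum DataSaveMode {
        KEEP_SCHEMA_DROP_DATA, KEEP_SCHEMA_AND_DATA, CUSTOM_PROCESSING, ERROR_WHEN_DATA_EXISTS
    }

    public static final String SCHEMA_SAVE_MODE_KEY = "schema_save_mode";
    public static final String DATA_SAVE_MODE_KEY = "data_save_mode";

    // The fallback values are assumptions for this sketch only.
    public static SchemaSaveMode schemaMode(Map<String, String> options) {
        String raw = options.getOrDefault(SCHEMA_SAVE_MODE_KEY, "CREATE_SCHEMA_WHEN_NOT_EXIST");
        return SchemaSaveMode.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }

    public static DataSaveMode dataMode(Map<String, String> options) {
        String raw = options.getOrDefault(DATA_SAVE_MODE_KEY, "KEEP_SCHEMA_AND_DATA");
        return DataSaveMode.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        Map<String, String> options = Map.of(
                SCHEMA_SAVE_MODE_KEY, "recreate_schema",
                DATA_SAVE_MODE_KEY, "ERROR_WHEN_DATA_EXISTS");
        // Prints the two resolved enum constants.
        System.out.println(schemaMode(options) + " / " + dataMode(options));
    }
}
```

An unknown value makes Enum.valueOf throw IllegalArgumentException, which surfaces a misconfigured save mode before any table is touched.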

org.apache.seatunnel.api.sink.SaveModeHandler
SaveModeHandler is the executor of the SaveMode function. Implementations of this interface contain the concrete logic for each save mode.

package org.apache.seatunnel.api.sink;

public interface SaveModeHandler extends AutoCloseable {

    void handleSchemaSaveMode();

    void handleDataSaveMode();

    default void handleSaveMode() {
        handleSchemaSaveMode();
        handleDataSaveMode();
    }
}


The SaveModeHandler interface has a default implementation, org.apache.seatunnel.api.sink.DefaultSaveModeHandler:

package org.apache.seatunnel.api.sink;

import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;

import lombok.AllArgsConstructor;

import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.SINK_TABLE_NOT_EXIST;
import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.SOURCE_ALREADY_HAS_DATA;

@AllArgsConstructor
public class DefaultSaveModeHandler implements SaveModeHandler {

    public SchemaSaveMode schemaSaveMode;
    public DataSaveMode dataSaveMode;
    public Catalog catalog;
    public TablePath tablePath;
    public CatalogTable catalogTable;
    public String customSql;

    // Convenience constructor: derives the table path from the catalog table
    public DefaultSaveModeHandler(
            SchemaSaveMode schemaSaveMode,
            DataSaveMode dataSaveMode,
            Catalog catalog,
            CatalogTable catalogTable,
            String customSql) {
        this(
                schemaSaveMode,
                dataSaveMode,
                catalog,
                catalogTable.getTableId().toTablePath(),
                catalogTable,
                customSql);
    }

    @Override
    public void handleSchemaSaveMode() {
        switch (schemaSaveMode) {
            case RECREATE_SCHEMA:
                recreateSchema();
                break;
            case CREATE_SCHEMA_WHEN_NOT_EXIST:
                createSchemaWhenNotExist();
                break;
            case ERROR_WHEN_SCHEMA_NOT_EXIST:
                errorWhenSchemaNotExist();
                break;
            default:
                throw new UnsupportedOperationException("Unsupported save mode: " + schemaSaveMode);
        }
    }

    @Override
    public void handleDataSaveMode() {
        switch (dataSaveMode) {
            case KEEP_SCHEMA_DROP_DATA:
                keepSchemaDropData();
                break;
            case KEEP_SCHEMA_AND_DATA:
                keepSchemaAndData();
                break;
            case CUSTOM_PROCESSING:
                customProcessing();
                break;
            case ERROR_WHEN_DATA_EXISTS:
                errorWhenDataExists();
                break;
            default:
                throw new UnsupportedOperationException("Unsupported save mode: " + dataSaveMode);
        }
    }

    protected void recreateSchema() {
        if (tableExists()) {
            dropTable();
        }
        createTable();
    }

    protected void createSchemaWhenNotExist() {
        if (!tableExists()) {
            createTable();
        }
    }

    protected void errorWhenSchemaNotExist() {
        if (!tableExists()) {
            throw new SeaTunnelRuntimeException(SINK_TABLE_NOT_EXIST, "The sink table not exist");
        }
    }

    protected void keepSchemaDropData() {
        if (tableExists()) {
            truncateTable();
        }
    }

    // Nothing to do: both the structure and the data are left untouched
    protected void keepSchemaAndData() {}

    protected void customProcessing() {
        executeCustomSql();
    }

    protected void errorWhenDataExists() {
        if (dataExists()) {
            throw new SeaTunnelRuntimeException(
                    SOURCE_ALREADY_HAS_DATA, "The target data source already has data");
        }
    }

    protected boolean tableExists() {
        return catalog.tableExists(tablePath);
    }

    protected void dropTable() {
        catalog.dropTable(tablePath, true);
    }

    protected void createTable() {
        catalog.createTable(tablePath, catalogTable, true);
    }

    protected void truncateTable() {
        catalog.truncateTable(tablePath, true);
    }

    protected boolean dataExists() {
        return catalog.isExistsData(tablePath);
    }

    protected void executeCustomSql() {
        catalog.executeSql(customSql);
    }

    @Override
    public void close() throws Exception {
        catalog.close();
    }
}
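
To make the effect of each mode concrete, the following sketch replays the same decision logic against a toy in-memory catalog. It is only an approximation under stated assumptions: InMemoryCatalog, the string table names, and the use of IllegalStateException are hypothetical stand-ins, the enums are re-declared locally, and CUSTOM_PROCESSING is left out because it depends on executing user-supplied SQL.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SaveModeSemanticsSketch {
    public enum SchemaSaveMode {
        RECREATE_SCHEMA, CREATE_SCHEMA_WHEN_NOT_EXIST, ERROR_WHEN_SCHEMA_NOT_EXIST
    }

    public enum DataSaveMode {
        KEEP_SCHEMA_DROP_DATA, KEEP_SCHEMA_AND_DATA, ERROR_WHEN_DATA_EXISTS
    }

    // Hypothetical in-memory stand-in for a catalog: table name -> rows.
    public static final class InMemoryCatalog {
        public final Map<String, List<String>> tables = new HashMap<>();
    }

    public static void handleSchema(InMemoryCatalog catalog, String table, SchemaSaveMode mode) {
        boolean exists = catalog.tables.containsKey(table);
        switch (mode) {
            case RECREATE_SCHEMA:
                // Drop (if present) and create again, so existing rows are lost.
                catalog.tables.put(table, new ArrayList<>());
                break;
            case CREATE_SCHEMA_WHEN_NOT_EXIST:
                if (!exists) {
                    catalog.tables.put(table, new ArrayList<>());
                }
                break;
            case ERROR_WHEN_SCHEMA_NOT_EXIST:
                if (!exists) {
                    throw new IllegalStateException("Sink table does not exist: " + table);
                }
                break;
            default:
                throw new UnsupportedOperationException("Unsupported save mode: " + mode);
        }
    }

    public static void handleData(InMemoryCatalog catalog, String table, DataSaveMode mode) {
        List<String> rows = catalog.tables.get(table);
        switch (mode) {
            case KEEP_SCHEMA_DROP_DATA:
                // Truncate only when the table is actually there.
                if (rows != null) {
                    rows.clear();
                }
                break;
            case KEEP_SCHEMA_AND_DATA:
                break;
            case ERROR_WHEN_DATA_EXISTS:
                if (rows != null && !rows.isEmpty()) {
                    throw new IllegalStateException("Target already has data: " + table);
                }
                break;
            default:
                throw new UnsupportedOperationException("Unsupported save mode: " + mode);
        }
    }

    public static void main(String[] args) {
        InMemoryCatalog catalog = new InMemoryCatalog();
        handleSchema(catalog, "db.t1", SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST);
        catalog.tables.get("db.t1").add("row-1");
        handleData(catalog, "db.t1", DataSaveMode.KEEP_SCHEMA_DROP_DATA);
        System.out.println(catalog.tables); // prints {db.t1=[]}
    }
}
```

Running the schema step before the data step matters here: with RECREATE_SCHEMA the table is already empty by the time the data mode is evaluated, so ERROR_WHEN_DATA_EXISTS cannot fire in that combination.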

When the code executes


SaveMode must be processed before the job starts. In the SeaTunnel Zeta engine, the saveModeHandler is therefore obtained right after the Sink is built, and its handleSaveMode() method is then called. This step is skipped when the job is restored from a savepoint.

The relevant code in the MultipleTableJobConfigParser class:

private SinkAction<?, ?, ?, ?> createSinkAction(
        CatalogTable catalogTable,
        Map<TablePath, CatalogTable> sinkTableMap,
        Set<Action> inputActions,
        ReadonlyConfig readonlyConfig,
        ClassLoader classLoader,
        Set<URL> factoryUrls,
        String factoryId,
        int parallelism,
        int configIndex) {
    Optional<CatalogTable> insteadTable;
    if (sinkTableMap.size() == 1) {
        insteadTable = sinkTableMap.values().stream().findFirst();
    } else {
        // TODO: another table full name map
        insteadTable =
                Optional.ofNullable(sinkTableMap.get(catalogTable.getTableId().toTablePath()));
    }
    if (insteadTable.isPresent()) {
        catalogTable = insteadTable.get();
    }
    SeaTunnelSink<?, ?, ?, ?> sink =
            FactoryUtil.createAndPrepareSink(
                    catalogTable, readonlyConfig, classLoader, factoryId);
    sink.setJobContext(jobConfig.getJobContext());
    SinkConfig actionConfig =
            new SinkConfig(catalogTable.getTableId().toTablePath().toString());
    long id = idGenerator.getNextId();
    String actionName =
            JobConfigParser.createSinkActionName(
                    configIndex, factoryId, actionConfig.getMultipleRowTableId());
    SinkAction<?, ?, ?, ?> sinkAction =
            new SinkAction<>(
                    id,
                    actionName,
                    new ArrayList<>(inputActions),
                    sink,
                    factoryUrls,
                    actionConfig);
    if (!isStartWithSavePoint) {
        handleSaveMode(sink);
    }
    sinkAction.setParallelism(parallelism);
    return sinkAction;
}

public static void handleSaveMode(SeaTunnelSink<?, ?, ?, ?> sink) {
    if (SupportSaveMode.class.isAssignableFrom(sink.getClass())) {
        SupportSaveMode saveModeSink = (SupportSaveMode) sink;
        try (SaveModeHandler saveModeHandler = saveModeSink.getSaveModeHandler()) {
            saveModeHandler.handleSaveMode();
        } catch (Exception e) {
            throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
        }
    }
}
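
The call sequence that this code produces can be sketched in isolation. The example below uses local stand-ins for SaveModeHandler and SupportSaveMode, a hypothetical RecordingSink that only logs which methods were called, and a plain boolean parameter in place of the parser's isStartWithSavePoint field. It is meant to show the ordering (schema, then data, then close via try-with-resources), not the engine's real wiring.

```java
import java.util.ArrayList;
import java.util.List;

public class SaveModeDispatchSketch {
    // Local stand-ins for the interfaces described in this document.
    public interface SaveModeHandler extends AutoCloseable {
        void handleSchemaSaveMode();

        void handleDataSaveMode();

        default void handleSaveMode() {
            handleSchemaSaveMode();
            handleDataSaveMode();
        }
    }

    public interface SupportSaveMode {
        SaveModeHandler getSaveModeHandler();
    }

    // Hypothetical sink that records the order in which its handler is invoked.
    public static final class RecordingSink implements SupportSaveMode {
        public final List<String> calls = new ArrayList<>();

        @Override
        public SaveModeHandler getSaveModeHandler() {
            return new SaveModeHandler() {
                @Override
                public void handleSchemaSaveMode() {
                    calls.add("schema");
                }

                @Override
                public void handleDataSaveMode() {
                    calls.add("data");
                }

                @Override
                public void close() {
                    calls.add("close");
                }
            };
        }
    }

    // Skips save-mode handling on savepoint restore and for sinks without support.
    public static void handleSaveMode(Object sink, boolean isStartWithSavePoint) {
        if (isStartWithSavePoint || !(sink instanceof SupportSaveMode)) {
            return;
        }
        try (SaveModeHandler handler = ((SupportSaveMode) sink).getSaveModeHandler()) {
            handler.handleSaveMode();
        } catch (Exception e) {
            throw new RuntimeException("Failed to handle save mode", e);
        }
    }

    public static void main(String[] args) {
        RecordingSink sink = new RecordingSink();
        handleSaveMode(sink, false);
        System.out.println(sink.calls); // prints [schema, data, close]
    }
}
```

Because the handler is opened in a try-with-resources statement, close() runs even if one of the two steps throws, which in DefaultSaveModeHandler releases the catalog connection.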

Naming scheme for automatically created databases and tables


The value of the table attribute in sink determines the name of the automatically created table. The rules are:

  • If the table attribute contains ${table_name}, the placeholder is replaced with the source table name. For example, if the table attribute is test_${table_name}_001 (the pattern implied by this example) and the source table is table001, the created table is named test_table001_001.
  • If the table attribute does not contain ${table_name}, no replacement takes place and the value of the table attribute is used as the table name as-is.

The database attribute in sink also complies with this rule.
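
The substitution rule can be sketched as a literal string replacement. The SinkTableNameSketch class and its resolve method are hypothetical names for illustration, and the template test_${table_name}_001 is assumed from the test_table001_001 result given above; the way SeaTunnel itself performs the replacement may differ.

```java
public class SinkTableNameSketch {
    public static final String PLACEHOLDER = "${table_name}";

    // String.replace treats the placeholder literally (no regex), so "$" and braces are safe.
    public static String resolve(String template, String sourceTableName) {
        return template.replace(PLACEHOLDER, sourceTableName);
    }

    public static void main(String[] args) {
        // Placeholder present: the source table name is substituted.
        System.out.println(resolve("test_${table_name}_001", "table001")); // prints test_table001_001
        // Placeholder absent: the configured value is used unchanged.
        System.out.println(resolve("fixed_table", "table001")); // prints fixed_table
    }
}
```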
