THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||
---|---|---|
| ||
private SinkAction<?, ?, ?, ?> createSinkAction( CatalogTable catalogTable, Map<TablePath, CatalogTable> sinkTableMap, Set<Action> inputActions, ReadonlyConfig readonlyConfig, ClassLoader classLoader, Set<URL> factoryUrls, String factoryId, int parallelism, int configIndex) { Optional<CatalogTable> insteadTable; if (sinkTableMap.size() == 1) { insteadTable = sinkTableMap.values().stream().findFirst(); } else { // TODO: another table full name map insteadTable = Optional.ofNullable(sinkTableMap.get(catalogTable.getTableId().toTablePath())); } if (insteadTable.isPresent()) { catalogTable = insteadTable.get(); } SeaTunnelSink<?, ?, ?, ?> sink = FactoryUtil.createAndPrepareSink( catalogTable, readonlyConfig, classLoader, factoryId); sink.setJobContext(jobConfig.getJobContext()); SinkConfig actionConfig = new SinkConfig(catalogTable.getTableId().toTablePath().toString()); long id = idGenerator.getNextId(); String actionName = JobConfigParser.createSinkActionName( configIndex, factoryId, actionConfig.getMultipleRowTableId()); SinkAction<?, ?, ?, ?> sinkAction = new SinkAction<>( id, actionName, new ArrayList<>(inputActions), sink, factoryUrls, actionConfig); if (!isStartWithSavePoint) { handleSaveMode(sink); } sinkAction.setParallelism(parallelism); return sinkAction; } public static void handleSaveMode(SeaTunnelSink<?, ?, ?, ?> sink) { if (SupportSaveMode.class.isAssignableFrom(sink.getClass())) { SupportSaveMode saveModeSink = (SupportSaveMode) sink; try (SaveModeHandler saveModeHandler = saveModeSink.getSaveModeHandler()) { saveModeHandler.handleSaveMode(); } catch (Exception e) { throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e); } } } |
Taking the JDBC sink (JdbcSink) as an example, getSaveModeHandler() is implemented as follows:
Code Block | ||
---|---|---|
| ||
/**
 * Creates the {@link DefaultSaveModeHandler} for this sink's target table.
 *
 * <p>Returns {@code null} when no handler can be built: the catalog table is missing, no catalog
 * options are configured, the configured database is blank, or no {@link CatalogFactory} is
 * discovered for the configured factory id. Otherwise a catalog is created from the catalog
 * options and opened before it is passed to the handler.
 *
 * @return the save-mode handler, or {@code null} if one cannot be created
 */
@Override
public DefaultSaveModeHandler getSaveModeHandler() {
    // Without a target table definition there is nothing to apply a save mode to.
    if (catalogTable == null) {
        return null;
    }

    Map<String, String> options = config.get(CatalogOptions.CATALOG_OPTIONS);
    if (options == null || StringUtils.isBlank(jdbcSinkConfig.getDatabase())) {
        return null;
    }

    CatalogFactory factory =
            discoverFactory(
                    Thread.currentThread().getContextClassLoader(),
                    CatalogFactory.class,
                    options.get(CommonOptions.FACTORY_ID.key()));
    if (factory == null) {
        return null;
    }

    // Fall back to the ORIGINAL identifier style when none is configured.
    FieldIdeEnum configuredIde = config.get(JdbcOptions.FIELD_IDE);
    String fieldIde;
    if (configuredIde != null) {
        fieldIde = configuredIde.getValue();
    } else {
        fieldIde = FieldIdeEnum.ORIGINAL.getValue();
    }

    String databaseName = catalogTable.getTableId().getDatabaseName();
    String schemaName = catalogTable.getTableId().getSchemaName();
    String quotedTableName =
            CatalogUtils.quoteTableIdentifier(
                    catalogTable.getTableId().getTableName(), fieldIde);
    TablePath targetPath = TablePath.of(databaseName, schemaName, quotedTableName);

    Catalog jdbcCatalog =
            factory.createCatalog(
                    factory.factoryIdentifier(),
                    ReadonlyConfig.fromMap(new HashMap<>(options)));
    jdbcCatalog.open();

    return new DefaultSaveModeHandler(
            schemaSaveMode,
            dataSaveMode,
            jdbcCatalog,
            targetPath,
            catalogTable,
            config.get(JdbcOptions.CUSTOM_SQL));
} |
Scheme for building the database name and table name during automatic table creation
...