
Discussion thread: https://lists.apache.org/thread/5lp0p5jlvryo6y7r56y2qs2vfwtpxmlrnbf3skopy3trtj37jcovmt6ktcgst4w8
Vote thread:
JIRA:
Release:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

As discussed above, the Flink HybridSource has been released, but it can currently only be used with the DataStream API. We need to add SQL and Table API support for the many table & sql end users.

...

Add a new built-in hybrid connector. First, in the HybridTableSourceFactory, use the 'source-identifiers' option (the final name could still change) to concatenate the ordered child sources.

...

When the child table source instances are ready, we use each child table source's ScanRuntimeProvider to get the actual child Source (FLIP-27 new Source API).

Finally, we bind child sources to HybridSource.
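At the DataStream level this binding is what FLIP-150 already provides. A minimal sketch of what the table layer ultimately builds (the FileSource/KafkaSource construction here is only a placeholder for whatever the child table sources actually resolve to):

Code Block
languagejava
firstline1
titleDataStream-level binding (sketch)
linenumberstrue
// bounded history source first (placeholder construction)
FileSource<String> fileSource =
        FileSource.forRecordStreamFormat(
                new TextLineInputFormat(), new Path("/tmp/a.csv")).build();

// unbounded tail source last (placeholder construction)
KafkaSource<String> kafkaSource =
        KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("test")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

// concatenate them in order; the hybrid source switches when the first one ends
HybridSource<String> hybridSource =
        HybridSource.builder(fileSource)
                .addSource(kafkaSource)
                .build();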



core options:

source-identifiers: A comma-delimited identifier string that indicates the ordered child sources to concatenate. The boundedness of the hybrid source is the boundedness of its last child source. (required)

$identifier.option: The concrete child source options. Because the same connector may be used for several of the concatenated sources, the connector name cannot serve as an option prefix; instead, a user-specified identifier is used as the prefix for each child source's options. The identifier prefix is taken off to recover the real option key, which is then passed to the concrete child source. (required)

note: child source identifiers must match the pattern [A-Za-z0-9_]+.

switched-start-position-enabled: The DataStream API currently exposes two ways to determine the start position of the next source. Use this option to enable the switched start position (default is false, i.e. the hybrid source uses the fixed start position).
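For illustration, a minimal sketch of the identifier prefix handling described above (plain Java, names hypothetical; the real take-off logic lives in HybridTableSourceFactory below):

Code Block
languagejava
firstline1
titleIdentifier prefix handling (sketch)
linenumberstrue
String identifier = "realtime";                  // must match [A-Za-z0-9_]+
String prefixedKey = identifier + "." + "topic"; // appears as 'realtime.topic' in the DDL
// the factory strips the identifier prefix to recover the real child option key:
String realKey = prefixedKey.substring(identifier.length() + 1); // "topic"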


1.ddl (boundedness, just use 2 filesystem sources to explain it)

Code Block
languagesql
firstline1
titleDDL example
linenumberstrue
collapsetrue
create table hybrid_source(
 f0 varchar,
 f1 varchar,
 f2 bigint
) with(
 'connector'='hybrid',
 'source-identifiers'='historical,realtime',
 'historical.connector'='filesystem',
 'historical.path' = '/tmp/a.csv',
 'historical.format' = 'csv',
 'realtime.connector'='filesystem',
 'realtime.path' = '/tmp/a.csv',
 'realtime.format' = 'csv'
);


table api (boundedness, just use 2 filesystem sources to explain it)

Code Block
languagejava
firstline1
titleTable API
linenumberstrue
collapsetrue
TableDescriptor tableDescriptor =
                TableDescriptor.forConnector("hybrid")
                        .schema(
                                Schema.newBuilder()
                                        .column("f0", DataTypes.STRING())
                                        .column("f1", DataTypes.STRING())
                                        .column("f2", DataTypes.BIGINT())
                                        .build())
                        .option("source-identifiers", "historical,realtime")
                        .option("historical.connector", "filesystem")
                        .option("historical.path", "/tmp/a.csv")
                        .option("historical.format", "csv")
                        .option("realtime.connector", "filesystem")
                        .option("realtime.path", "/tmp/b.csv")
                        .option("realtime.format", "csv")
                        .build();
        tEnv.createTable("hybrid_source", tableDescriptor);

        Table table = tEnv.from("hybrid_source").select($("f0"), $("f1"), $("f2"));


2.ddl (unbounded data, use source.monitor-interval to specify that the second source is unbounded)

Code Block
languagesql
firstline1
titleDDL example
linenumberstrue
collapsetrue
create table hybrid_source(
 f0 varchar,
 f1 varchar,
 f2 bigint
) with(
 'connector'='hybrid',
 'source-identifiers'='historical,realtime',
 'historical.connector'='filesystem',
 'historical.path' = '/tmp/a.csv',
 'historical.format' = 'csv',
 'realtime.connector'='filesystem',
 'realtime.path' = '/tmp/a.csv',
 'realtime.format' = 'csv',
 'realtime.source.monitor-interval' = '5s'
);


table api (unbounded data, use source.monitor-interval to specify that the second source is unbounded)

Code Block
languagejava
firstline1
titleTable API
linenumberstrue
collapsetrue
TableDescriptor tableDescriptor =
                TableDescriptor.forConnector("hybrid")
                        .schema(
                                Schema.newBuilder()
                                        .column("f0", DataTypes.STRING())
                                        .column("f1", DataTypes.STRING())
                                        .column("f2", DataTypes.BIGINT())
                                        .build())
                        .option("source-identifiers", "historical,realtime")
                        .option("historical.connector", "filesystem")
                        .option("historical.path", "/tmp/a.csv")
                        .option("historical.format", "csv")
                        .option("realtime.connector", "filesystem")
                        .option("realtime.path", "/tmp/b.csv")
                        .option("realtime.format", "csv")
						.option("realtime.source.monitor-interval", "5s")
                        .build();
        tEnv.createTable("hybrid_source", tableDescriptor);

        Table table = tEnv.from("hybrid_source").select($("f0"), $("f1"), $("f2"));


3.ddl(normal scenario, with kafka)

Code Block
languagesql
firstline1
titleDDL example
linenumberstrue
create table hybrid_source(
 f0 varchar,
 f1 varchar,
 f2 bigint
) with(
 'connector'='hybrid',
 'source-identifiers'='historical,realtime',
 'historical.connector'='filesystem',
 'historical.path' = '/tmp/a.csv',
 'historical.format' = 'csv',
 'realtime.connector'='kafka',
 'realtime.topic' = 'test',
 'realtime.properties.bootstrap.servers' = 'localhost:9092',
 'realtime.properties.group.id' = 'test',
 'realtime.scan.startup.mode' = 'earliest-offset',
 'realtime.format' = 'csv' 
);


table api(normal scenario, with kafka)

Code Block
languagejava
firstline1
titleTable API
linenumberstrue
TableDescriptor tableDescriptor =
                TableDescriptor.forConnector("hybrid")
                        .schema(
                                Schema.newBuilder()
                                        .column("f0", DataTypes.STRING())
                                        .column("f1", DataTypes.STRING())
                                        .column("f2", DataTypes.BIGINT())
                                        .build())
                        .option("source-identifiers", "historical,realtime")
                        .option("historical.connector", "filesystem")
                        .option("historical.path", "/tmp/a.csv")
                        .option("historical.format", "csv")
                        .option("realtime.connector", "kafka")
                        .option("realtime.topic", "test")
                        .option("realtime.properties.bootstrap.servers", "localhost:9092")
						.option("realtime.group.id", "test")
						.option("realtime.scan.startup.mode", "earliest-offset")
                        .build();
        tEnv.createTable("hybrid_source", tableDescriptor);

        Table table = tEnv.from("hybrid_source").select($("f0"), $("f1"), $("f2"));


4.ddl(with more child sources)

Code Block
languagesql
firstline1
titleDDL example
linenumberstrue
collapsetrue
create table hybrid_source(
 f0 varchar,
 f1 varchar,
 f2 bigint
) with(
 'connector'='hybrid',
 'source-identifiers'='historical01,historical02,realtime',
 'historical01.connector'='filesystem',
 'historical01.path' = '/tmp/a.csv',
 'historical01.format' = 'csv',
 'historical02.connector'='filesystem',
 'historical02.path' = '/tmp/a.csv',
 'historical02.format' = 'csv', 
 'realtime.connector'='kafka',
 'realtime.topic' = 'test',
 'realtime.properties.bootstrap.servers' = 'localhost:9092',
 'realtime.properties.group.id' = 'testGroup',
 'realtime.scan.startup.mode' = 'earliest-offset',
 'realtime.format' = 'csv' 
);

table api 

similar to the other cases above


Start position conversion:

Currently, the base SplitEnumerator and the built-in sources do not expose an end timestamp, so we can't pass it to the next streaming source. We may need to update the SplitEnumerator interface to add getEndTimestamp() support.

Then we use the switched-start-position-enabled option to choose between the switched start position and the fixed start position. For how this works, please see the prototype implementation or the PoC PR below.


switched-start-position example:

Code Block
languagesql
firstline1
titleDDL example
linenumberstrue
create table hybrid_source(
 f0 varchar,
 f1 varchar,
 f2 bigint
) with(
 'connector'='hybrid',
 'source-identifiers'='historical,realtime',
 'switched-start-position-enabled'='true',
 'historical.connector'='filesystem',
 'historical.path' = '/tmp/a.csv',
 'historical.format' = 'csv',
 'realtime.connector'='kafka',
 'realtime.topic' = 'test',
 'realtime.properties.bootstrap.servers' = 'localhost:9092',
 'realtime.properties.group.id' = 'testGroup',
  --'realtime.scan.startup.mode' = 'earliest-offset',
 'realtime.format' = 'csv' 
);

table api 

similar to the other cases above

Introduced API Changes and Prototype Implementation


First, the FLIP-150 HybridSource supports deriving the switched start timestamp of the next source from the previous one, but currently no connector can expose its end timestamp, so this capability cannot actually be used. Therefore, this FLIP introduces the following interfaces.


SupportsGetEndTimestamp

Code Block
languagejava
firstline1
linenumberstrue
/**
 * A decorative interface of {@link SplitEnumerator} which allows getting the end timestamp.
 *
 * <p>The split enumerator must implement this interface if it needs to support the switched start
 * position in the hybrid source scenario and other situations where it is needed.
 */
@PublicEvolving
public interface SupportsGetEndTimestamp {

    /** Get the end timestamp for current source or split enumerator. */
    long getEndTimestamp();
}

SupportsSwitchedStartTimestamp

Code Block
languagejava
firstline1
linenumberstrue
/**
 * Unlike a predefined start offset in the source, this interface allows applying a dynamic
 * switched timestamp to a source, which the source can then use to re-initialize its start
 * offsets. However, this depends on the concrete source implementation. A common scenario is
 * that a HybridSource's next source uses this ability to switch over from the previous one.
 */
@PublicEvolving
public interface SupportsSwitchedStartTimestamp {

    /** Apply given switched start timestamp to source. */
    void applySwitchedStartTimestamp(long startTimestamp);
}



We use SupportsGetEndTimestamp and SupportsSwitchedStartTimestamp to get the previous bounded source's end timestamp and apply it to the next streaming source.
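For illustration, a minimal sketch of how the two interfaces cooperate (both classes are hypothetical placeholders; a real enumerator would additionally implement SplitEnumerator, and a real source would additionally implement Source):

Code Block
languagejava
firstline1
titleEnd timestamp handoff (sketch)
linenumberstrue
// hypothetical bounded-side enumerator: tracks the largest timestamp it has seen
class BoundedCsvEnumerator implements SupportsGetEndTimestamp {
    private long endTimestamp = Long.MIN_VALUE;

    void onSplitFinished(long maxRecordTimestamp) {
        endTimestamp = Math.max(endTimestamp, maxRecordTimestamp);
    }

    @Override
    public long getEndTimestamp() {
        return endTimestamp;
    }
}

// hypothetical streaming-side source: re-initializes its start offsets from the
// switched timestamp handed over by the previous enumerator
class StreamingLogSource implements SupportsSwitchedStartTimestamp {
    private long startTimestamp;

    @Override
    public void applySwitchedStartTimestamp(long startTimestamp) {
        // a Kafka-like source would resolve offsets-for-timestamp here
        this.startTimestamp = startTimestamp;
    }
}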

DataStreamScanSourceAbilityProvider


note: HybridSource only supports the new Source API, and its corresponding provider is SourceProvider; we extract the inner child source from the SourceProvider. But currently the KafkaDynamicTableSource uses DataStreamScanProvider, from which we can't get the inner KafkaSource during the planning phase. So here we introduce a DataStreamScanSourceAbilityProvider to let the hybrid source get a DataStreamScanProvider's inner source.

Connector developers can choose this provider as a bridge provider for DataStreamScanProvider.

Code Block
languagejava
firstline1
linenumberstrue
/**
 * Provider that extends {@link DataStreamScanProvider} to let it expose its inner source.
 *
 * <p>Usually a {@link SourceProvider} exposes a {@link Source}, a {@link SourceFunctionProvider}
 * exposes a {@link SourceFunction} and an {@link InputFormatProvider} exposes an {@link
 * InputFormat}, but a {@link DataStreamScanProvider} just produces a datastream, and we cannot get
 * the inner source (note this source could be a SourceFunction, a new Source or an InputFormat).
 * But sometimes we need it; a common scenario is HybridSource, where we need to extract the inner
 * source from the provider.
 *
 * @param <T> source type
 */

@PublicEvolving
public interface DataStreamScanSourceAbilityProvider<T> extends DataStreamScanProvider {

    /**
     * Expose DataStreamScanProvider inner source.
     *
     * @return inner source
     */
    T createSource();
}
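For illustration, a hedged sketch of how a connector could return this bridge provider from getScanRuntimeProvider (the kafkaSource variable is a hypothetical FLIP-27 source the provider wraps):

Code Block
languagejava
firstline1
titleBridge provider usage (sketch)
linenumberstrue
return new DataStreamScanSourceAbilityProvider<Source<RowData, ?, ?>>() {

    @Override
    public Source<RowData, ?, ?> createSource() {
        // expose the inner source so HybridTableSource can extract it
        return kafkaSource;
    }

    @Override
    public DataStream<RowData> produceDataStream(
            ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
        // keep producing the datastream exactly as before
        return execEnv.fromSource(
                kafkaSource, WatermarkStrategy.noWatermarks(), "KafkaSource");
    }

    @Override
    public boolean isBounded() {
        return false; // streaming source
    }
};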


HybridTableSource (supports switched start position & fixed start position)

Note: We let HybridTableSource implement SupportsReadingMetadata & SupportsProjectionPushDown, so HybridTableSource can pass the SupportsReadingMetadata & SupportsProjectionPushDown calls through to the inner sources.

This means the child sources must implement these 2 interfaces as well. We don't support the scenario where one source implements an interface and another does not; it would cause problems under the current source framework.

e.g. the first source supports projection push down and the next source does not. If the ddl has 3 fields and we select 2 fields, pushdown is applied: HybridTableSource and the first source both have 2 fields in their produced data type, but the other source still has 3 fields.

More generally, if neither of the 2 sources supports SupportsProjectionPushDown (but HybridTableSource does), then HybridTableSource's produced data type has 2 fields while the 2 inner sources still have 3 fields.


Code Block
languagejava
firstline1
titleHybridTableSource
linenumberstrue
public class HybridTableSource implements ScanTableSource, SupportsReadingMetadata, SupportsProjectionPushDown {

    private final String tableName;
    private final ResolvedSchema tableSchema;
    private final List<ScanTableSource> childTableSources;
    private final Configuration configuration;

    public HybridTableSource(
            String tableName,
            @Nonnull List<ScanTableSource> childTableSources,
            Configuration configuration,
            ResolvedSchema tableSchema) {
        this.tableName = tableName;
        this.tableSchema = tableSchema;
        Preconditions.checkArgument(
                childTableSources.size() >= 2, "child table sources must at least 2 sources.");
        this.childTableSources = childTableSources;
        this.configuration = configuration;
    }

    @Override
    public DynamicTableSource copy() {
        return new HybridTableSource(tableName, childTableSources, configuration, tableSchema);
    }

    @Override
    public String asSummaryString() {
        return "HybridTableSource";
    }

    @Override
    public ChangelogMode getChangelogMode() {
        Set<RowKind> kinds = new HashSet<>();
        for (ScanTableSource childSource : childTableSources) {
            kinds.addAll(childSource.getChangelogMode().getContainedKinds());
        }
        ChangelogMode.Builder builder = ChangelogMode.newBuilder();
        for (RowKind kind : kinds) {
            builder.addContainedKind(kind);
        }
        return builder.build();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        HybridSource.HybridSourceBuilder<RowData, SplitEnumerator> builder;
        ScanTableSource firstTableSource = childTableSources.get(0);
        ScanTableSource.ScanRuntimeProvider firstProvider =
                validateAndGetProvider(firstTableSource);
        Source<RowData, ?, ?> firstSource = validateAndGetSource(firstProvider);
        builder = HybridSource.builder(firstSource);

        if (configuration.getBoolean(OPTIONAL_SWITCHED_START_POSITION_ENABLED)) {
            // switched start position
            for (int i = 1; i < childTableSources.size(); i++) {
                ScanTableSource nextTableSource = childTableSources.get(i);
                ScanTableSource.ScanRuntimeProvider nextProvider =
                        validateAndGetProvider(nextTableSource);
                Source<RowData, ?, ?> nextSource = validateAndGetSource(nextProvider);
                Boundedness boundedness = nextSource.getBoundedness();
                final Source<RowData, ?, ?> localCopySource;
                try {
                    localCopySource = InstantiationUtil.clone(nextSource);
                } catch (ClassNotFoundException | IOException e) {
                    throw new IllegalStateException("Unable to clone the hybrid child source.", e);
                }
                // builder#addSource below is a serialized-lambda. if we use nextSource, the
                // lambda captured variables will be HybridTableSource and nextSource
                // while nextSource can be serialized, but HybridTableSource can not be
                // serialized, it will cause serialized exception. So here we do a deepCopy to
                // address it.
                builder.addSource(
                        switchContext -> {
                            SplitEnumerator previousEnumerator =
                                    switchContext.getPreviousEnumerator();
                            // how to get and apply timestamp depends on specific enumerator
                            long switchedTimestamp =
                                    validateAndCastSplitEnumerator(previousEnumerator)
                                            .getEndTimestamp();
                            validateAndCastSource(localCopySource)
                                    .applySwitchedStartTimestamp(switchedTimestamp);
                            return localCopySource;
                        },
                        boundedness);
            }
        } else {
            // fixed start position
            for (int i = 1; i < childTableSources.size(); i++) {
                ScanTableSource.ScanRuntimeProvider provider =
                        validateAndGetProvider(childTableSources.get(i));
                builder.addSource(validateAndGetSource(provider));
            }
        }

        return SourceProvider.of(builder.build());
    }

    @Override
    public Map<String, DataType> listReadableMetadata() {
        final Map<String, DataType> metadataMap = new HashMap<>();
        tableSchema.getColumns().stream()
                .filter(column -> column instanceof Column.MetadataColumn)
                .forEach(
                        column ->
                                metadataMap.put(
                                        ((Column.MetadataColumn) column)
                                                .getMetadataKey()
                                                .orElse(column.getName()),
                                        column.getDataType()));
        return metadataMap;
    }

    @Override
    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
        // we pass the ddl metadata fields to each child sources, if child source has this
        // metadata then return its value else metadata value is null.
        for (ScanTableSource childSource : childTableSources) {
            Preconditions.checkState(
                    childSource instanceof SupportsReadingMetadata,
                    "The table source %s must implement "
                            + "SupportsReadingMetadata interface to be used in hybrid source.",
                    childSource.getClass().getName());
            ((SupportsReadingMetadata) childSource)
                    .applyReadableMetadata(metadataKeys, producedDataType);
        }
    }

    @Override
    public void applyProjection(int[][] projectedFields, DataType producedDataType) {
        for (ScanTableSource childSource : childTableSources) {
            Preconditions.checkState(
                    childSource instanceof SupportsProjectionPushDown,
                    "The table source %s must implement "
                            + "SupportsProjectionPushDown interface to be used in hybrid source.",
                    childSource.getClass().getName());
            ((SupportsProjectionPushDown) childSource)
                    .applyProjection(projectedFields, producedDataType);
        }
    }

    @Override
    public boolean supportsNestedProjection() {
        return false;
    }

    // ------------------------------------------------------------------------

    private static ScanTableSource.ScanRuntimeProvider validateAndGetProvider(
            ScanTableSource tableSource) {
        ScanTableSource.ScanRuntimeProvider provider =
                tableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
        Preconditions.checkState(
                provider instanceof SourceProvider
                        || provider instanceof DataStreamScanSourceAbilityProvider,
                "Provider %s is not a SourceProvider or DataStreamScanSourceAbilityProvider.",
                provider.getClass().getName());
        return provider;
    }

    private static Source<RowData, ?, ?> validateAndGetSource(
            ScanTableSource.ScanRuntimeProvider provider) {
        if (provider instanceof DataStreamScanSourceAbilityProvider) {
            Object source = ((DataStreamScanSourceAbilityProvider<?>) provider).createSource();
            Preconditions.checkState(
                    source instanceof Source,
                    "Child source %s is not a new Source.",
                    source.getClass().getName());
            return (Source<RowData, ?, ?>) source;
        } else {
            return ((SourceProvider) provider).createSource();
        }
    }

    private static SupportsGetEndTimestamp validateAndCastSplitEnumerator(
            SplitEnumerator splitEnumerator) {
        Preconditions.checkState(
                splitEnumerator instanceof SupportsGetEndTimestamp,
                "The split enumerator %s must implement "
                        + "SupportsGetEndTimestamp interface to be used in hybrid source when %s "
                        + "is true.",
                splitEnumerator.getClass().getName(),
                OPTIONAL_SWITCHED_START_POSITION_ENABLED.key());
        return (SupportsGetEndTimestamp) splitEnumerator;
    }

    private static SupportsSwitchedStartTimestamp validateAndCastSource(Source source) {
        Preconditions.checkState(
                source instanceof SupportsSwitchedStartTimestamp,
                "The dynamic table source %s must implement "
                        + "SupportsSwitchedStartTimestamp interface"
                        + "to be used in hybrid source when %s is true.",
                source.getClass().getName(),
                OPTIONAL_SWITCHED_START_POSITION_ENABLED.key());
        return (SupportsSwitchedStartTimestamp) source;
    }
 }

HybridTableSource binds the accepted child sources, in the given order, into the final HybridSource.


HybridTableSourceFactory

The core logic: we read the hybrid source schema and the identifier-prefixed child source options, take off each identifier prefix to recover the child source's own options, and generate a new child source schema from them.

Code Block
languagejava
firstline1
titleHybridTableSourceFactory
linenumberstrue
collapsetrue
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
        final FactoryUtil.TableFactoryHelper helper =
                FactoryUtil.createTableFactoryHelper(this, context);
        final Configuration tableOptions = (Configuration) helper.getOptions();

        String tableName = context.getObjectIdentifier().toString();
        ResolvedCatalogTable hybridCatalogTable = context.getCatalogTable();
        ResolvedSchema tableSchema = hybridCatalogTable.getResolvedSchema();

        // process and check source identifiers
        String sourceIdentifiersStr = tableOptions.get(SOURCE_IDENTIFIERS);
        List<String> sourceIdentifiers = Arrays.asList(sourceIdentifiersStr.split(","));
        sourceIdentifiers.forEach(
                identifier ->
                        Preconditions.checkArgument(
                                identifier.matches(SOURCE_IDENTIFIER_REGEX),
                                "source-identifier pattern must be " + SOURCE_IDENTIFIER_REGEX));
        Preconditions.checkArgument(
                sourceIdentifiers.size() >= 2,
                String.format(
                        "hybrid source '%s' option must specify at least 2 sources",
                        SOURCE_IDENTIFIERS.key()));
        Preconditions.checkArgument(
                sourceIdentifiers.stream().distinct().count() == sourceIdentifiers.size(),
                "each source identifier must not be the same");

        // validate params
        Set<ConfigOption<?>> optionalOptions = new HashSet<>();
        for (String optionKey : tableOptions.toMap().keySet()) {
            if (sourceIdentifiers.stream().anyMatch(optionKey::startsWith)) {
                ConfigOption<String> childOption = key(optionKey).stringType().noDefaultValue();
                optionalOptions.add(childOption);
            }
        }
        optionalOptions.addAll(optionalOptions());
        FactoryUtil.validateFactoryOptions(requiredOptions(), optionalOptions, tableOptions);

        Set<String> consumedOptionKeys = new HashSet<>();
        consumedOptionKeys.add(CONNECTOR.key());
        consumedOptionKeys.add(SOURCE_IDENTIFIERS.key());
        optionalOptions.stream().map(ConfigOption::key).forEach(consumedOptionKeys::add);
        FactoryUtil.validateUnconsumedKeys(
                factoryIdentifier(), tableOptions.keySet(), consumedOptionKeys);

        // generate each child table source & concat sources into the hybrid table source
        List<ScanTableSource> childTableSources = new ArrayList<>();
        ClassLoader cl = HybridTableSourceFactory.class.getClassLoader();
        for (String sourceIdentifier : sourceIdentifiers) {
            ResolvedCatalogTable childCatalogTable =
                    hybridCatalogTable.copy(
                            extractChildSourceOptions(
                                    hybridCatalogTable.getOptions(), sourceIdentifier));
            String newTableName =
                    String.join(
                            "_", context.getObjectIdentifier().getObjectName(), sourceIdentifier);
            DynamicTableSource tableSource =
                    FactoryUtil.createDynamicTableSource(
                            null,
                            ObjectIdentifier.of(
                                    context.getObjectIdentifier().getCatalogName(),
                                    context.getObjectIdentifier().getDatabaseName(),
                                    newTableName),
                            childCatalogTable,
                            Collections.emptyMap(),
                            context.getConfiguration(),
                            cl,
                            true);
            childTableSources.add(validateAndCastTableSource(tableSource));
        }

        LOG.info("Generate hybrid child sources with: {}.", childTableSources);

        Preconditions.checkArgument(
                sourceIdentifiers.size() == childTableSources.size(),
                "unmatched source identifiers size and generated child sources size");

        return new HybridTableSource(tableName, childTableSources, tableOptions, tableSchema);
 }
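The extractChildSourceOptions helper referenced above is not spelled out in this FLIP; a minimal sketch of what it could look like (the method name matches the factory code above, the body is an assumption):

Code Block
languagejava
firstline1
titleextractChildSourceOptions (sketch)
linenumberstrue
// sketch of the prefix take-off: keep only the options that start with
// '<identifier>.', strip that prefix, and hand the remainder to the child source
private static Map<String, String> extractChildSourceOptions(
        Map<String, String> tableOptions, String sourceIdentifier) {
    final String prefix = sourceIdentifier + SOURCE_IDENTIFIER_DELIMITER;
    final Map<String, String> childOptions = new HashMap<>();
    for (Map.Entry<String, String> entry : tableOptions.entrySet()) {
        if (entry.getKey().startsWith(prefix)) {
            childOptions.put(entry.getKey().substring(prefix.length()), entry.getValue());
        }
    }
    return childOptions;
}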


HybridTableSourceFactory is discovered via Java SPI, which is how Flink searches for the hybrid source implementation.
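Concretely, like any other DynamicTableSourceFactory it would be registered in the factory service file (the package name here is a placeholder):

Code Block
titleMETA-INF/services/org.apache.flink.table.factories.Factory
org.apache.flink.connector.hybrid.table.HybridTableSourceFactory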

HybridConnectorOptions

Options for creating HybridTableSource.

Code Block
languagejava
firstline1
titleHybridConnectorOptions
linenumberstrue
public class HybridConnectorOptions {

    public static final String SOURCE_IDENTIFIER_REGEX = "[A-Za-z0-9_]+";
    public static final String SOURCE_IDENTIFIER_DELIMITER = ".";

    public static final ConfigOption<String> SOURCE_IDENTIFIERS =
            ConfigOptions.key("source-identifiers")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "Use comma delimiter and identifier indicate child sources that need to be concatenated. "
                                    + "e.g. source-identifiers='historical,realtime'");

    public static final ConfigOption<Boolean> OPTIONAL_SWITCHED_START_POSITION_ENABLED =
            ConfigOptions.key("switched-start-position-enabled")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "Whether to enable switched start position, default is false for using fixed start position. "
                                    + "If it is true, then hybrid source will call the previous source SplitEnumerator#getEndTimestamp "
                                    + "to get end timestamp and pass to next unbounded streaming source. ");
}

Future Work

1. Add a metadata column to indicate the data source type, letting users know whether the data comes from the bounded source or the unbounded source.

For example, with source-identifiers='historical,realtime', historical.connector='filesystem' and realtime.connector='kafka', this metadata will be 'historical' when reading from the filesystem and 'realtime' when reading from kafka.

Users can then use this metadata column in custom SQL logic.

2. Schema mapping: allow the hybrid source's inner child sources to have inconsistent schemas, e.g. different field names.

Compatibility, Deprecation, and Migration Plan

...

Add integration test cases for hybrid source sql ddl & table api.

Rejected Alternatives

to be added.

...