You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 30 Next »

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

As discussed above, currently flink HybridSource is released, but it only be used in DataStream. We Need to add sql and table support for many table & sql end users.

so we propose this flip.

Basic Idea

Add a new built-in hybrid connector. First, In the HybridTableSourceFactory, use 'source-identifiers'(final name could be changed) option to concat ordered some child sources.

Next, we deal with indexed concrete child source option and pass to child source table factory to create child table source instances.

When child table source instances are ready, we use child table source ScanRuntimeProvider to get the actual child Source(FLIP-27 new Source API)

Finally, we bind child sources to HybridSource.


core options:

source-identifiers:Use comma delimiter identifier string to indicate concatenated child sources. It's in order. The boundedness of hybrid source is last child source's boundedness.  (required)

$identifier.option:  The concrete child source options. Because we may use same connectors as connected sources, so we can't use connector name as prefix, so use user-specific identifier as child source option prefix. it will be taken-off the identifier to get real option and pass to concrete child sources.(required)

note:  we use [A-Za-z0-9_]+ as child source identifier pattern.

switched-start-position-enabled: Currently, the DataStream API expose two start ways about next source. Use this option to enable the Switched-Start-Position(default is false).  if it's false, the hybrid will use Fixed-Start-Position. The default is Fixed-Start-Position.


1.ddl (boundedness, just use 2 filesystem sources to explain it)

DDL example
create table hybrid_source(
 f0 varchar,
 f1 varchar,
 f2 bigint
) with(
 'connector'='hybrid',
 'sources'='historical,realtime',
 'historical.connector'='filesystem'
 'historical.path' = '/tmp/a.csv',
 'historical.format' = 'csv',
 'realtime.connector'='filesystem'
 'realtime.path' = '/tmp/a.csv',
 'realtime.format' = 'csv'
);


table api (boundedness, just use 2 filesystem sources to explain it)

Table API
TableDescriptor tableDescriptor =
                TableDescriptor.forConnector("hybrid")
                        .schema(
                                Schema.newBuilder()
                                        .column("f0", DataTypes.STRING())
                                        .column("f1", DataTypes.STRING())
                                        .column("f2", DataTypes.BIGINT())
                                        .build())
                        .option("source-identifiers", "historical,realtime")
                        .option("historical.connector", "filesystem")
                        .option("historical.path", "/tmp/a.csv")
                        .option("historical.format", "csv")
                        .option("realtime.connector", "filesystem")
                        .option("realtime.path", "/tmp/b.csv")
                        .option("realtime.format", "csv")
                        .build();
        tEnv.createTable("hybrid_source", tableDescriptor);

        Table table = tEnv.from("hybrid_source").select($("f0"), $("f1"), $("f2"));


2.ddl (unbounded data, use source.monitor-interval specify the second source is unbounded)

DDL example
create table hybrid_source(
 f0 varchar,
 f1 varchar,
 f2 bigint
) with(
 'connector'='hybrid',
 'sources'='historical,realtime',
 'historical.connector'='filesystem'
 'historical.path' = '/tmp/a.csv',
 'historical.format' = 'csv',
 'realtime.connector'='filesystem'
 'realtime.path' = '/tmp/a.csv',
 'realtime.format' = 'csv'
 'realtime.source.monitor-interval' = '5s'
);


table api (unbounded data, use source.monitor-interval specify the second source is unbounded)

Table API
TableDescriptor tableDescriptor =
                TableDescriptor.forConnector("hybrid")
                        .schema(
                                Schema.newBuilder()
                                        .column("f0", DataTypes.STRING())
                                        .column("f1", DataTypes.STRING())
                                        .column("f2", DataTypes.BIGINT())
                                        .build())
                        .option("source-identifiers", "historical,realtime")
                        .option("historical.connector", "filesystem")
                        .option("historical.path", "/tmp/a.csv")
                        .option("historical.format", "csv")
                        .option("realtime.connector", "filesystem")
                        .option("realtime.path", "/tmp/b.csv")
                        .option("realtime.format", "csv")
						.option("realtime.source.monitor-interval", "5s")
                        .build();
        tEnv.createTable("hybrid_source", tableDescriptor);

        Table table = tEnv.from("hybrid_source").select($("f0"), $("f1"), $("f2"));


3.ddl(normal scenario, with kafka)

DDL example
create table hybrid_source(
 f0 varchar,
 f1 varchar,
 f2 bigint
) with(
 'connector'='hybrid',
 'sources'='historical,realtime',
 'historical.connector'='filesystem'
 'historical.path' = '/tmp/a.csv',
 'historical.format' = 'csv',
 'realtime.connector'='kafka',
 'realtime.topic' = 'test',
 'realtime.properties.bootstrap.servers' = 'localhost:9092',
 'realtime.properties.group.id' = 'test',
 'realtime.scan.startup.mode' = 'earliest-offset',
 'realtime.format' = 'csv' 
);


table api(normal scenario, with kafka)

Table API
TableDescriptor tableDescriptor =
                TableDescriptor.forConnector("hybrid")
                        .schema(
                                Schema.newBuilder()
                                        .column("f0", DataTypes.STRING())
                                        .column("f1", DataTypes.STRING())
                                        .column("f2", DataTypes.BIGINT())
                                        .build())
                        .option("source-identifiers", "historical,realtime")
                        .option("historical.connector", "filesystem")
                        .option("historical.path", "/tmp/a.csv")
                        .option("historical.format", "csv")
                        .option("realtime.connector", "kafka")
                        .option("realtime.topic", "test")
                        .option("realtime.properties.bootstrap.servers", "localhost:9092")
						.option("realtime.group.id", "test")
						.option("realtime.scan.startup.mode", "earliest-offset")
                        .build();
        tEnv.createTable("hybrid_source", tableDescriptor);

        Table table = tEnv.from("hybrid_source").select($("f0"), $("f1"), $("f2"));


4.ddl(with more child sources)

DDL example
create table hybrid_source(
 f0 varchar,
 f1 varchar,
 f2 bigint
) with(
 'connector'='hybrid',
 'sources'='historical01,historical02,realtime',
 'historical01.connector'='filesystem'
 'historical01.path' = '/tmp/a.csv',
 'historical01.format' = 'csv',
 'historical02.connector'='filesystem'
 'historical02.path' = '/tmp/a.csv',
 'historical02.format' = 'csv', 
 'realtime.connector'='kafka',
 'realtime.topic' = 'test',
 'realtime.properties.bootstrap.servers' = 'localhost:9092',
 'realtime.properties.group.id' = 'testGroup',
 'realtime.scan.startup.mode' = 'earliest-offset',
 'realtime.format' = 'csv' 
);

table api 

similar with other cases above


Start position conversion:

Currently, the base SplitEnumerator and built-in source not expose the end timestamp, we can't use it pass to the next streaming source. We may need to update the SplitEnumerator interface to add getEndTimestamp() support.

Then we use switched-start-position-enabled option to choose switched start position or fixed start position. how it works? please the prototype implementation or draft pr below.

Prototype implementation


First, the FLIP-150 HybridSource support switched start timestamp for next source from previous one. but current all connectors  can not get endTimestamp. So, actually we can not use this function.  So, at this flip we offer this interface.


SupportsGetEndTimestamp

/**
 * A decorative interface of {@link SplitEnumerator} which allows to get end timestamp.
 *
 * <p>The split enumerator must implement this interface if it needs to support switched start
 * position in hybrid source scenario and other needed situations.
 */
public interface SupportsGetEndTimestamp {

    /** Get the end timestamp for current source or split enumerator. */
    long getEndTimestamp();
}

SupportsSwitchedStartTimestamp

/**
 * Unlike predefined start offset in table source, this interface allow to apply a dynamic switched
 * timestamp to a {@link ScanTableSource}, then table source can use this switched timestamp to
 * re-initialize the start offsets. However, it depends on concrete source implementation. A common
 * scenario is that HybridSource next source use this ability to switch from previous one.
 *
 * <p>Note: if the table source can use in HybridSource when enable switched-start-position then it
 * must be implemented, otherwise will cause exception.
 */
public interface SupportsSwitchedStartTimestamp {      
	 
     /** Apply given switched start timestamp to source. */
     void applySwitchedStartTimestamp(long startTimestamp);
}


we use SupportsGetEndTimestamp and SupportsSwitchedStartTimestamp to get previous file connector end timestamp and apply it to next streaming source.


HybridTableSource(support Switched start position & Fixed start position)

HybridTableSource
public class HybridTableSource implements ScanTableSource {

    private final String tableName;
    private final ResolvedSchema tableSchema;
    private final List<ScanTableSource> childTableSources;
    private final Configuration configuration;

    public HybridTableSource(
            String tableName,
            @Nonnull List<ScanTableSource> childTableSources,
            Configuration configuration,
            ResolvedSchema tableSchema) {
        this.tableName = tableName;
        this.tableSchema = tableSchema;
        Preconditions.checkArgument(
                childTableSources.size() >= 2, "child table sources must at least 2 sources");
        this.childTableSources = childTableSources;
        this.configuration = configuration;
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        HybridSource.HybridSourceBuilder<RowData, SplitEnumerator> builder;
        ScanTableSource firstTableSource = childTableSources.get(0);
        ScanTableSource.ScanRuntimeProvider firstProvider =
                firstTableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
        Source<RowData, ?, ?> firstSource = checkProviderAndGetSource(firstProvider);
        builder = HybridSource.builder(firstSource);

        if (configuration.getBoolean(OPTIONAL_SWITCHED_START_POSITION_ENABLED)) {
            // switched start position
            for (int i = 1; i < childTableSources.size(); i++) {
                ScanTableSource nextTableSource = childTableSources.get(i);
                Boundedness boundedness =
                        nextTableSource
                                        .getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE)
                                        .isBounded()
                                ? Boundedness.BOUNDED
                                : Boundedness.CONTINUOUS_UNBOUNDED;
                builder.addSource(
                        switchContext -> {
                            SplitEnumerator previousEnumerator =
                                    switchContext.getPreviousEnumerator();
                            // how to get and apply timestamp depends on specific enumerator
                            long switchedTimestamp =
                                    checkAndCastSplitEnumerator(previousEnumerator)
                                            .getEndTimestamp();
                            checkAndCastTableSource(nextTableSource)
                                    .applySwitchedStartTimestamp(switchedTimestamp);
                            ScanTableSource.ScanRuntimeProvider nextProvider =
                                    nextTableSource.getScanRuntimeProvider(
                                            ScanRuntimeProviderContext.INSTANCE);
                            return checkProviderAndGetSource(nextProvider);
                        },
                        boundedness);
            }
        } else {
            // fixed start position
            for (int i = 1; i < childTableSources.size(); i++) {
                ScanTableSource.ScanRuntimeProvider nextProvider =
                        childTableSources
                                .get(i)
                                .getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
                builder.addSource(checkProviderAndGetSource(nextProvider));
            }
        }

        return SourceProvider.of(builder.build());
    }
}

HybridTableSource bind accepted child sources with given order to final HybridSource.


HybridTableSourceFactory

The core logic is that we read hybrid source schema and identifier-prefix child source options and takeoff it's prefix to generate new child source schema.

HybridTableSourceFactory
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
        final FactoryUtil.TableFactoryHelper helper =
                FactoryUtil.createTableFactoryHelper(this, context);
        final Configuration tableOptions = (Configuration) helper.getOptions();

        String tableName = context.getObjectIdentifier().toString();
        ResolvedCatalogTable hybridCatalogTable = context.getCatalogTable();
        ResolvedSchema tableSchema = hybridCatalogTable.getResolvedSchema();

        // process and check source identifiers
        String sourceIdentifiersStr = tableOptions.get(SOURCE_IDENTIFIERS);
        List<String> sourceIdentifiers = Arrays.asList(sourceIdentifiersStr.split(","));
        sourceIdentifiers.forEach(
                identifier ->
                        Preconditions.checkArgument(
                                identifier.matches(SOURCE_IDENTIFIER_REGEX),
                                "source-identifier pattern must be " + SOURCE_IDENTIFIER_REGEX));
        Preconditions.checkArgument(
                sourceIdentifiers.size() >= 2,
                String.format(
                        "hybrid source '%s' option must specify at least 2 sources",
                        SOURCE_IDENTIFIERS.key()));
        Preconditions.checkArgument(
                sourceIdentifiers.stream().distinct().count() == sourceIdentifiers.size(),
                "each source identifier must not be the same");

        // validate params
        Set<ConfigOption<?>> optionalOptions = new HashSet<>();
        for (String optionKey : tableOptions.toMap().keySet()) {
            if (sourceIdentifiers.stream().anyMatch(optionKey::startsWith)) {
                ConfigOption<String> childOption = key(optionKey).stringType().noDefaultValue();
                optionalOptions.add(childOption);
            }
        }
        optionalOptions.addAll(optionalOptions());
        FactoryUtil.validateFactoryOptions(requiredOptions(), optionalOptions, tableOptions);

        Set<String> consumedOptionKeys = new HashSet<>();
        consumedOptionKeys.add(CONNECTOR.key());
        consumedOptionKeys.add(SOURCE_IDENTIFIERS.key());
        optionalOptions.stream().map(ConfigOption::key).forEach(consumedOptionKeys::add);
        FactoryUtil.validateUnconsumedKeys(
                factoryIdentifier(), tableOptions.keySet(), consumedOptionKeys);

        // generate concrete child sources & concat sources to final hybrid source
        List<ScanTableSource> childTableSources = new ArrayList<>();
        ClassLoader cl = HybridTableSourceFactory.class.getClassLoader();
        for (String sourceIdentifier : sourceIdentifiers) {
            ResolvedCatalogTable childCatalogTable =
                    hybridCatalogTable.copy(
                            extractChildSourceOptions(
                                    hybridCatalogTable.getOptions(), sourceIdentifier));
            DynamicTableSource tableSource =
                    FactoryUtil.createDynamicTableSource(
                            null,
                            context.getObjectIdentifier(),
                            childCatalogTable,
                            Collections.emptyMap(),
                            context.getConfiguration(),
                            cl,
                            true);
            childTableSources.add(checkAndCastTableSource(tableSource));
        }

        LOG.info("Generate hybrid child sources with: {}.", childTableSources);

        Preconditions.checkArgument(
                sourceIdentifiers.size() == childTableSources.size(),
                "unmatched source identifiers size and generated child sources size");

        return new HybridTableSource(tableName, childTableSources, tableOptions, tableSchema);
}

private static Source<RowData, ?, ?> checkProviderAndGetSource(
            ScanTableSource.ScanRuntimeProvider provider) {
        Preconditions.checkState(
                provider instanceof SourceProvider
                        || provider instanceof DataStreamScanSourceAbilityProvider,
                "Provider %s is not a SourceProvider or DataStreamScanSourceAbilityProvider.",
                provider.getClass().getName());
        if (provider instanceof DataStreamScanSourceAbilityProvider) {
            Object source = ((DataStreamScanSourceAbilityProvider<?>) provider).createSource();
            Preconditions.checkState(
                    source instanceof Source,
                    "Child source %s is not a new Source.",
                    source.getClass().getName());
            return (Source<RowData, ?, ?>) source;
        } else {
            return ((SourceProvider) provider).createSource();
        }
    }

private static SupportsGetEndTimestamp checkAndCastSplitEnumerator(
        SplitEnumerator splitEnumerator) {
    Preconditions.checkState(
            splitEnumerator instanceof SupportsGetEndTimestamp,
            "The split enumerator %s must implement "
                    + "SupportsGetEndTimestamp interface"
                    + "to be used in hybrid source when %s is true.",
            splitEnumerator.getClass().getName(),
            OPTIONAL_SWITCHED_START_POSITION_ENABLED.key());
    return (SupportsGetEndTimestamp) splitEnumerator;
}

private SupportsSwitchedStartTimestamp checkAndCastTableSource(ScanTableSource tableSource) {
    Preconditions.checkState(
            tableSource instanceof SupportsSwitchedStartTimestamp,
            "The dynamic table source %s must implement "
                    + "SupportsSwitchedStartTimestamp interface"
                    + "to be used in hybrid source when %s is true.",
            tableSource.getClass().getName(),
            OPTIONAL_SWITCHED_START_POSITION_ENABLED.key());
    return (SupportsSwitchedStartTimestamp) tableSource;
}

TableSourceFactory is using for Java SPI to search hybrid source implementation.

HybridConnectorOptions
HybridConnectorOptions
public class HybridConnectorOptions {

    public static final String SOURCE_IDENTIFIER_REGEX = "[A-Za-z0-9_]+";
    public static final String SOURCE_IDENTIFIER_DELIMITER = ".";

    public static final ConfigOption<String> SOURCE_IDENTIFIERS =
            ConfigOptions.key("source-identifiers")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "Use comma delimiter and identifier indicate child sources that need to be concatenated. "
                                    + "e.g. source-identifiers='historical,realtime'");

    public static final ConfigOption<Boolean> OPTIONAL_SWITCHED_START_POSITION_ENABLED =
            ConfigOptions.key("switched-start-position-enabled")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "Whether to enable switched start position, default is false for using fixed start position. "
                                    + "If it is true, then hybrid source will call the previous source SplitEnumerator#getEndTimestamp "
                                    + "to get end timestamp and pass to next unbounded streaming source. ");
}

Options for creating HybridTableSource.

Future Work

1.add a metadata column to indicate the data source type, let user know the data is from bounded source or unbounded source.

for example, sources='historical,realtime'. historical.connector='filesystem', realtime.connector='kafka'. This metadata will be 'historical'  when read from filesystem and 'realtime'  when read from kafka.

Then user will use this metadata column deal some custom sql logics.

2.schema mapping. allow hybrid source inner child sources has inconsistent schema. e.g. different field name

Compatibility, Deprecation, and Migration Plan

It's a new support without migration currently.

Test Plan

Add unit test case for HybridTableSourceFactory and HybridTableSource.

Add integration test cases for hybrid source sql ddl.

Rejected Alternatives

to be added.
























  • No labels