
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

HybridSource has already been released in Flink, but it can currently be used only through the DataStream API. We need to add SQL support for the many Table API and SQL users.

We therefore propose this FLIP.

Basic Idea

Add a new built-in hybrid connector. First, in HybridTableSourceFactory, use the 'sources' option to concatenate the child sources in order.

Next, we extract the index-prefixed options of each child source and pass them to that child's table source factory to create the child table source instances.

Once the child table source instances are ready, we use each child's ScanRuntimeProvider to obtain the actual child Source (the FLIP-27 new Source API).

Finally, we bind the child sources into a HybridSource.


DDL core options:

sources: A comma-delimited string of child source connector names that specifies the concatenated child sources, in order. The boundedness of the hybrid source is the boundedness of the last child source. (required)

$index.option: The options of a specific child source. Because the same connector may appear more than once among the child sources, the connector name cannot serve as the option prefix, so the index of the child source is used instead (open for discussion). (optional)

schema-field-mappings: A JSON array of key-value objects that maps differing child source field names to the DDL field names, to handle inconsistent schemas (an extra feature; the draft PR below shows how it is implemented and how it works). (optional)
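The handling of 'sources' and the index-prefixed options can be sketched in plain Java. This is only an illustration under stated assumptions, not the prototype's code: the class HybridOptionSplitter and the method splitChildOptions are hypothetical names, and the sketch assumes that each child option map is built by stripping the "<index>." prefix and adding a 'connector' entry taken from 'sources'.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not part of Flink): splits hybrid table options per child source. */
final class HybridOptionSplitter {

    private HybridOptionSplitter() {}

    /**
     * Returns one option map per child source, in the order given by 'sources'.
     * Each map holds 'connector' plus every "<index>."-prefixed option with the prefix removed.
     */
    static List<Map<String, String>> splitChildOptions(Map<String, String> tableOptions) {
        String sources = tableOptions.get("sources");
        if (sources == null || sources.trim().isEmpty()) {
            throw new IllegalArgumentException("The 'sources' option is required.");
        }
        String[] connectors = sources.split(",");
        List<Map<String, String>> children = new ArrayList<>();
        for (int i = 0; i < connectors.length; i++) {
            Map<String, String> child = new LinkedHashMap<>();
            // Assumption: the child factory is looked up through the usual 'connector' option.
            child.put("connector", connectors[i].trim());
            String prefix = i + ".";
            for (Map.Entry<String, String> entry : tableOptions.entrySet()) {
                if (entry.getKey().startsWith(prefix)) {
                    child.put(entry.getKey().substring(prefix.length()), entry.getValue());
                }
            }
            children.add(child);
        }
        return children;
    }
}
```

Given the options 'sources'='filesystem,filesystem', '0.path'='/tmp/a.csv' and '1.path'='/tmp/b.csv', the sketch yields two maps, each with 'connector'='filesystem' and its own 'path'. Using the index as the prefix keeps the two filesystem children apart, which a connector-name prefix could not do.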


DDL (normal scenario, bounded data)

DDL example
create table hybrid_source(
 f0 varchar,
 f1 varchar,
 f2 bigint
) with(
 'connector'='hybrid',
 'sources'='filesystem,filesystem',
 '0.path' = '/tmp/a.csv',
 '0.format' = 'csv',
 '1.path' = '/tmp/b.csv',
 '1.format' = 'csv'
);


DDL (normal scenario, unbounded data)

DDL example
create table hybrid_source(
 f0 varchar,
 f1 varchar,
 f2 bigint
) with(
 'connector'='hybrid',
 'sources'='filesystem,filesystem',
 '0.path' = '/tmp/a.csv',
 '0.format' = 'csv',
 '1.path' = '/tmp/b.csv',
 '1.format' = 'csv',
 '1.source.monitor-interval' = '3'
);



DDL (with different field names; this is an optional feature that may not be implemented in the end and needs to be discussed)

a.json

{"name": "lily","gender": "girl","age": 10}

b.json

{"uid": "bob","gender": "boy","age": 12}

The name and uid fields hold the same data under different field names. We can declare the DDL field as name and map the uid field of the second source (b) to it.

DDL example
CREATE TABLE hybrid_source(
  name VARCHAR,
  gender VARCHAR,
  age BIGINT
) WITH (
  'connector' = 'hybrid',
  'sources' = 'filesystem,filesystem',
  'schema-field-mappings'='[{},{"name": "uid"}]',
  '0.path' = '/tmp/a.json',
  '0.format' = 'json',
  '1.path' = '/tmp/b.json',
  '1.format' = 'json'
)


This means the first source (a) needs no mapping, while the second source uses the mappings option to handle the differing field name.
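The mapping step can be sketched as follows. This is a minimal illustration, not the draft PR's implementation: SchemaFieldMapper and resolveSourceFields are hypothetical names, and the sketch assumes the JSON value of 'schema-field-mappings' has already been parsed into one map per child source, keyed by DDL field name with the child source field name as the value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not part of Flink): resolves DDL field names to child source field names. */
final class SchemaFieldMapper {

    private SchemaFieldMapper() {}

    /**
     * For each DDL field, returns the field name to read from the child source.
     * Fields without an entry in the mapping keep their DDL name.
     */
    static List<String> resolveSourceFields(List<String> ddlFields, Map<String, String> mapping) {
        List<String> resolved = new ArrayList<>(ddlFields.size());
        for (String ddlField : ddlFields) {
            resolved.add(mapping.getOrDefault(ddlField, ddlField));
        }
        return resolved;
    }
}
```

With the DDL fields name, gender and age, an empty mapping leaves the first source's fields unchanged, while the mapping {"name": "uid"} makes the second source read uid in place of name.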


Start position conversion:

Currently, FileSource does not expose its end position, so we cannot pass it to the next streaming source.

However, with SQL we can define the start position of the next streaming source explicitly; for example, we can set the Kafka start position.

When the first bounded source has been read completely, the hybrid source switches to reading Kafka from the given start position, or falls back to another default startup strategy.

Prototype implementation


HybridTableSource

HybridTableSource
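public class HybridTableSource implements ScanTableSource {
    private final String tableName;
    private final ResolvedSchema tableSchema;
    private final List<Source<RowData, ?, ?>> childSources;
    private final Configuration configuration;

    public HybridTableSource(
            String tableName,
            @Nonnull List<Source<RowData, ?, ?>> childSources,
            Configuration configuration,
            ResolvedSchema tableSchema) {
        this.tableName = tableName;
        this.tableSchema = tableSchema;
        this.childSources = childSources;
        this.configuration = configuration;
    }

    // ScanTableSource methods (getChangelogMode, getScanRuntimeProvider, copy, asSummaryString) to be added
}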

HybridTableSource binds the accepted child sources, in the given order, into the final HybridSource.


HybridTableSourceFactory

HybridTableSourceFactory
public class HybridTableSourceFactory implements DynamicTableSourceFactory {
   public DynamicTableSource createDynamicTableSource(Context context) {
      // to be added
   }
}

HybridTableSourceFactory is discovered through Java SPI, which is how the hybrid source implementation is found.

HybridConnectorOptions
HybridConnectorOptions
public class HybridConnectorOptions {

    public static final String SOURCE_DELIMITER = ".";

    public static final ConfigOption<String> SOURCES =
            ConfigOptions.key("sources")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "A comma-delimited string of child source connector names, in concatenation order, e.g. sources='filesystem,kafka'");

    public static final ConfigOption<String> OPTIONAL_SCHEMA_FIELD_MAPPINGS =
            ConfigOptions.key("schema-field-mappings")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "A JSON array of key-value objects that maps differing child source field names to DDL fields, e.g. '[{\"f0\":\"A\"},{}]' means that "
                                    + "column A of the first child source maps to DDL column f0 and the second source needs no mapping.");

    private HybridConnectorOptions() {}
}

Options for creating HybridTableSource.

Future Work

Add a metadata column that indicates the type of the data source, so that users know whether a record comes from a bounded or an unbounded source.

For example, with sources='filesystem,kafka', this metadata column will be 'bounded' for records read from the filesystem and 'unbounded' for records read from Kafka.

Users can then use this metadata column to implement custom SQL logic.

Compatibility, Deprecation, and Migration Plan

This is a new feature, so no migration is needed at present.

Test Plan

Add unit test cases for HybridTableSourceFactory and HybridTableSource.

Add integration test cases for the hybrid source SQL DDL.

Rejected Alternatives

To be added.







