
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, Flink's HybridSource has been released, but it can only be used in the DataStream API. We need to add SQL support for the many Table & SQL end users.

Therefore, we propose this FLIP.

Basic Idea

Add a new built-in hybrid connector. First, in the HybridTableSourceFactory, the 'sources' option is used to concatenate an ordered list of child sources.

Next, we handle the identifier-prefixed child source options and pass them to the child source table factories to create the child table source instances.

When the child table source instances are ready, we use each child table source's ScanRuntimeProvider to get the actual child Source (the FLIP-27 new Source API).

Finally, we bind the sources into a HybridSource.


DDL core options:

sources: A comma-delimited string of child source identifiers, listing the concatenated child sources in order. The boundedness of the hybrid source is the boundedness of the last child source. (required)

$identifier.option: The options of the concrete child sources. Because the same connector may be used by several of the connected sources, the connector name cannot serve as the prefix; instead, a user-specified identifier is used as the child source option prefix. The identifier is stripped off to obtain the real option, which is then passed to the concrete child source (see the sketch after this list). (required)

Note: child source identifiers are restricted to [a-z0-9_].

switched-start-position-enabled: The DataStream API currently exposes two ways to determine the start position of the next source. Use this option to enable the switched start position; if it is false, the hybrid source uses the fixed start position. The default is the fixed start position. (optional)

schema-field-mappings: JSON key-value pairs that match differing field names to the DDL fields, to handle inconsistent-schema scenarios. (This is an extra feature; the prototype implementation below shows how it is implemented and works.) (optional)
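
A minimal, self-contained sketch of the prefix stripping described for $identifier.option (the class and method names here are illustrative, not part of the connector):

Prefix stripping (illustrative)
import java.util.HashMap;
import java.util.Map;

public class PrefixStripExample {

    // e.g. 'historical.path' = '/tmp/a.csv'  ->  'path' = '/tmp/a.csv'
    static Map<String, String> stripIdentifierPrefix(
            Map<String, String> tableOptions, String identifier) {
        String prefix = identifier + ".";
        Map<String, String> childOptions = new HashMap<>();
        tableOptions.forEach(
                (key, value) -> {
                    if (key.startsWith(prefix)) {
                        childOptions.put(key.substring(prefix.length()), value);
                    }
                });
        return childOptions;
    }
}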


DDL (bounded data; just two filesystem sources to illustrate the boundedness)

DDL example
create table hybrid_source(
 f0 varchar,
 f1 varchar,
 f2 bigint
) with(
 'connector'='hybrid',
 'sources'='historical,realtime',
 'historical.connector'='filesystem',
 'historical.path' = '/tmp/a.csv',
 'historical.format' = 'csv',
 'realtime.connector'='filesystem',
 'realtime.path' = '/tmp/a.csv',
 'realtime.format' = 'csv'
);


DDL (unbounded data; source.monitor-interval makes the second source unbounded)

DDL example
create table hybrid_source(
 f0 varchar,
 f1 varchar,
 f2 bigint
) with(
 'connector'='hybrid',
 'sources'='historical,realtime',
 'historical.connector'='filesystem',
 'historical.path' = '/tmp/a.csv',
 'historical.format' = 'csv',
 'realtime.connector'='filesystem',
 'realtime.path' = '/tmp/a.csv',
 'realtime.format' = 'csv',
 'realtime.source.monitor-interval' = '3'
);


DDL (typical scenario, with Kafka)

DDL example
create table hybrid_source(
 f0 varchar,
 f1 varchar,
 f2 bigint
) with(
 'connector'='hybrid',
 'sources'='historical,realtime',
 'historical.connector'='filesystem',
 'historical.path' = '/tmp/a.csv',
 'historical.format' = 'csv',
 'realtime.connector'='kafka',
 'realtime.properties.bootstrap.servers' = 'localhost:9092',
 'realtime.properties.group.id' = 'testGroup',
 'realtime.scan.startup.mode' = 'earliest-offset',
 'realtime.format' = 'csv' 
);


DDL (with more child sources)

DDL example
create table hybrid_source(
 f0 varchar,
 f1 varchar,
 f2 bigint
) with(
 'connector'='hybrid',
 'sources'='historical01,historical02,realtime',
 'historical01.connector'='filesystem',
 'historical01.path' = '/tmp/a.csv',
 'historical01.format' = 'csv',
 'historical02.connector'='filesystem',
 'historical02.path' = '/tmp/a.csv',
 'historical02.format' = 'csv', 
 'realtime.connector'='kafka',
 'realtime.properties.bootstrap.servers' = 'localhost:9092',
 'realtime.properties.group.id' = 'testGroup',
 'realtime.scan.startup.mode' = 'earliest-offset',
 'realtime.format' = 'csv' 
);


DDL (with different field names; this is an extra feature that may not be implemented in the end and needs discussion)

a.json

{"name": "lily","gender": "girl","age": 10}

kafka

{"uid": "bob","gender": "boy","age": 12}

In the bounded file the first column is named 'name', while in Kafka it is 'uid'. We define the DDL field as 'name' and let the second (Kafka) source's 'uid' field map to that DDL field.

DDL example
CREATE TABLE hybrid_source(
  name varchar,
  gender varchar,
  age bigint
 ) WITH (
  'connector' = 'hybrid',
  'sources'='historical,realtime',
  'schema-field-mappings'='[{},{"name": "uid"}]',
  'historical.connector'='filesystem',
  'historical.path' = '/tmp/a.json',
  'historical.format' = 'json',
  'realtime.connector' = 'kafka',
  'realtime.properties.bootstrap.servers' = 'localhost:9092',
  'realtime.properties.group.id' = 'testGroup',
  'realtime.scan.startup.mode' = 'earliest-offset',
  'realtime.format' = 'json'   
);


This means the first source needs no mapping, while the second source uses the mappings option to handle the different field name.


Start position conversion:

Currently, neither the base SplitEnumerator nor the built-in sources expose an end position, so we cannot pass it to the next streaming source. We may need to update the SplitEnumerator interface to add getEndTimestamp() support.

The switched-start-position-enabled option then chooses between the switched start position and the fixed start position.

Because a hybrid source can chain more than two child sources, we currently do not support the switched start position for the intermediate bounded sources; only the last, unbounded source (e.g. Kafka) may use the timestamp from its previous source.
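
A sketch of the proposed SplitEnumerator change (the method name getEndTimestamp() and the default-method form are assumptions to be settled in the discussion):

SplitEnumerator (proposed addition)
public interface SplitEnumerator<SplitT extends SourceSplit, CheckpointT>
        extends AutoCloseable, CheckpointListener {

    // ... existing methods such as start(), handleSplitRequest(), addSplitsBack() ...

    /**
     * Returns the end timestamp of the enumerated data, so that hybrid source can derive the
     * start position of the next streaming child source. Throws by default because most
     * enumerators do not track an end timestamp yet.
     */
    default long getEndTimestamp() {
        throw new UnsupportedOperationException(
                "This enumerator does not expose an end timestamp.");
    }
}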

Prototype implementation


HybridTableSource (switched start position & fixed start position)

HybridTableSource
public class HybridTableSource implements ScanTableSource {

    private final String tableName;
    private final ResolvedSchema tableSchema;
    private final List<Source<RowData, ?, ?>> childSources;
    private final Configuration configuration;

    public HybridTableSource(
            String tableName,
            @Nonnull List<Source<RowData, ?, ?>> childSources,
            Configuration configuration,
            ResolvedSchema tableSchema) {
        this.tableName = tableName;
        this.tableSchema = tableSchema;
        this.childSources = childSources;
        this.configuration = configuration;
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        Preconditions.checkArgument(childSources.size() > 0);
        HybridSource<RowData> hybridSource;
        if (configuration.getBoolean(
                HybridConnectorOptions.OPTIONAL_SWITCHED_START_POSITION_ENABLED)) {
            // switched start position: derive the next source's start position from the
            // previous source's enumerator at switch time
            HybridSource.HybridSourceBuilder<RowData, SplitEnumerator> builder =
                    HybridSource.builder(childSources.get(0));
            for (int i = 1; i < childSources.size(); i++) {
                final int sourceIndex = i;
                Boundedness boundedness = childSources.get(sourceIndex).getBoundedness();
                builder.addSource(
                        switchContext -> {
                            SplitEnumerator previousEnumerator =
                                    switchContext.getPreviousEnumerator();
                            // how to pass this to kafka or other connectors? Can we add a
                            // method in the new source api like startTimestamp()?
                            long switchedTimestamp = previousEnumerator.getEndTimestamp();
                            return childSources.get(sourceIndex);
                        },
                        boundedness);
            }
            hybridSource = builder.build();
        } else {
            // fixed start position: each child source keeps its statically configured start
            HybridSource.HybridSourceBuilder<RowData, HybridSourceSplitEnumerator> builder =
                    HybridSource.builder(childSources.get(0));
            for (int i = 1; i < childSources.size(); i++) {
                builder.addSource(childSources.get(i));
            }
            hybridSource = builder.build();
        }
        return SourceProvider.of(hybridSource);
    }
}

HybridTableSource binds the accepted child sources, in the given order, into the final HybridSource.
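
The prototype omits the remaining ScanTableSource/DynamicTableSource methods; a minimal sketch of them, assuming the child sources produce insert-only data:

HybridTableSource (remaining methods, sketch)
    @Override
    public ChangelogMode getChangelogMode() {
        // assumption: hybrid child sources produce insert-only data
        return ChangelogMode.insertOnly();
    }

    @Override
    public DynamicTableSource copy() {
        return new HybridTableSource(tableName, childSources, configuration, tableSchema);
    }

    @Override
    public String asSummaryString() {
        return "HybridTableSource";
    }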


HybridTableSourceFactory

The core logic: read the hybrid source schema and the identifier-prefixed child source options, strip the prefixes, and generate the options and schema for each child source.

HybridTableSourceFactory
@Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        final FactoryUtil.TableFactoryHelper helper =
                FactoryUtil.createTableFactoryHelper(this, context);
        final Configuration tableOptions = (Configuration) helper.getOptions();

        String tableName = context.getObjectIdentifier().toString();
        ResolvedCatalogTable hybridCatalogTable = context.getCatalogTable();
        ResolvedSchema tableSchema = hybridCatalogTable.getResolvedSchema();

        // validate params
        Set<ConfigOption<?>> optionalOptions = new HashSet<>();
        for (String optionKey : tableOptions.toMap().keySet()) {
            if (optionKey.matches(HybridConnectorOptions.SOURCE_OPTION_REGEX)) {
                ConfigOption<String> childOption = key(optionKey).stringType().noDefaultValue();
                optionalOptions.add(childOption);
            }
        }
        optionalOptions.addAll(optionalOptions());
        FactoryUtil.validateFactoryOptions(requiredOptions(), optionalOptions, tableOptions);

        Set<String> consumedOptionKeys = new HashSet<>();
        consumedOptionKeys.add(CONNECTOR.key());
        consumedOptionKeys.add(HybridConnectorOptions.SOURCES.key());
        optionalOptions.stream().map(ConfigOption::key).forEach(consumedOptionKeys::add);
        FactoryUtil.validateUnconsumedKeys(
                factoryIdentifier(), tableOptions.keySet(), consumedOptionKeys);

        final ReadableConfig flinkConf = context.getConfiguration();
        RuntimeExecutionMode runtimeMode = flinkConf.get(ExecutionOptions.RUNTIME_MODE);
        boolean isBatchMode = runtimeMode == RuntimeExecutionMode.BATCH;

        // process source option
        String sourceStr = tableOptions.get(HybridConnectorOptions.SOURCES);
        List<String> sourceList = Arrays.asList(sourceStr.split(","));
        if (sourceList.size() < 2) {
            throw new TableException(
                    String.format(
                            "Hybrid source '%s' option must specify at least 2 sources.",
                            HybridConnectorOptions.SOURCES.key()));
        }

        // parse schema-field-mappings
        ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
        String fieldMappingStr =
                tableOptions.get(HybridConnectorOptions.OPTIONAL_SCHEMA_FIELD_MAPPINGS);
        List<Map<String, String>> fieldMappingsList = null;
        if (!StringUtils.isEmpty(fieldMappingStr)) {
            try {
                fieldMappingsList =
                        mapper.readValue(
                                fieldMappingStr, new TypeReference<List<Map<String, String>>>() {});
            } catch (Exception e) {
                throw new TableException(
                        String.format(
                                "Failed to parse hybrid source '%s' option.",
                                HybridConnectorOptions.OPTIONAL_SCHEMA_FIELD_MAPPINGS.key()),
                        e);
            }
            if (fieldMappingsList.size() != sourceList.size()) {
                throw new IllegalArgumentException(
                        String.format(
                                "Hybrid source '%s' option split nums must equals child source nums.",
                                HybridConnectorOptions.OPTIONAL_SCHEMA_FIELD_MAPPINGS.key()));
            }
        }

        LOG.info("Hybrid source is consisted of {}.", sourceList);

        // generate concrete child sources & concat sources to final hybrid source
        List<Source<RowData, ?, ?>> childSources = new ArrayList<>();
        ClassLoader cl = HybridTableSourceFactory.class.getClassLoader();
        for (int i = 0; i < sourceList.size(); i++) {
            ResolvedCatalogTable childCatalogTable;
            ResolvedSchema childResolvedSchema = null;
            // override schema
            if (fieldMappingsList != null) {
                Map<String, String> fieldMappings = fieldMappingsList.get(i);
                childResolvedSchema =
                        createChildSchemaUsingFieldMappings(tableSchema, fieldMappings);
            }
            if (childResolvedSchema != null) {
                childCatalogTable =
                        new ResolvedCatalogTable(
                                hybridCatalogTable
                                        .getOrigin()
                                        .copy(
                                                extractChildSourceOptions(
                                                        hybridCatalogTable.getOptions(),
                                                        sourceList.get(i))),
                                childResolvedSchema);
            } else {
                childCatalogTable =
                        hybridCatalogTable.copy(
                                extractChildSourceOptions(
                                        hybridCatalogTable.getOptions(), sourceList.get(i)));
            }

            DynamicTableSource tableSource =
                    FactoryUtil.createDynamicTableSource(
                            null,
                            context.getObjectIdentifier(),
                            childCatalogTable,
                            Collections.emptyMap(),
                            context.getConfiguration(),
                            cl,
                            true);
            if (tableSource instanceof ScanTableSource) {
                ScanTableSource.ScanRuntimeProvider provider =
                        ((ScanTableSource) tableSource)
                                .getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
                if (provider instanceof SourceProvider) {
                    Source<RowData, ?, ?> source = ((SourceProvider) provider).createSource();
                    // all child sources except the last one must be bounded.
                    if (i < sourceList.size() - 1
                            && source.getBoundedness() == Boundedness.CONTINUOUS_UNBOUNDED) {
                        throw new TableException(
                                "Hybrid source child sources except the last one must be bounded.");
                    }
                    // in batch mode, all child sources must be bounded; in streaming mode, the
                    // last child source may be bounded or unbounded.
                    if (isBatchMode
                            && source.getBoundedness() == Boundedness.CONTINUOUS_UNBOUNDED) {
                        throw new TableException(
                                "Hybrid source child sources must all be bounded in batch mode.");
                    }
                    childSources.add(source);
                } else {
                    throw new UnsupportedOperationException(
                            provider.getClass().getCanonicalName() + " is unsupported now.");
                }
            } else {
                // hybrid source only support ScanTableSource
                throw new TableException(
                        String.format(
                                "%s is not a ScanTableSource, please check it.",
                                tableSource.getClass().getName()));
            }
        }
        LOG.info("Generate hybrid child sources with: {}.", childSources);

        // post check
        Preconditions.checkArgument(sourceList.size() == childSources.size());

        return new HybridTableSource(tableName, childSources, tableOptions, tableSchema);
    }

    protected Map<String, String> extractChildSourceOptions(
            Map<String, String> originalOptions, String identifier) {
        if (originalOptions == null || originalOptions.isEmpty()) {
            return originalOptions;
        }
        // keep only the options carrying the child source identifier prefix and strip the
        // prefix, e.g. 'historical.connector' -> 'connector'
        String optionPrefix = identifier + HybridConnectorOptions.SOURCE_DELIMITER;
        Map<String, String> sourceOptions =
                originalOptions.entrySet().stream()
                        .filter(entry -> entry.getKey().startsWith(optionPrefix))
                        .collect(
                                Collectors.toMap(
                                        entry ->
                                                StringUtils.removeStart(
                                                        entry.getKey(), optionPrefix),
                                        Map.Entry::getValue));
        return sourceOptions;
    }
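
The helper createChildSchemaUsingFieldMappings referenced above is not spelled out in this prototype; a minimal sketch of a method in HybridTableSourceFactory, assuming it only renames the mapped physical columns:

createChildSchemaUsingFieldMappings (sketch)
    private static ResolvedSchema createChildSchemaUsingFieldMappings(
            ResolvedSchema tableSchema, Map<String, String> fieldMappings) {
        List<Column> childColumns = new ArrayList<>();
        for (Column column : tableSchema.getColumns()) {
            // e.g. the mapping {"name": "uid"} renames DDL column 'name' to child column 'uid'
            String childFieldName = fieldMappings.get(column.getName());
            childColumns.add(
                    childFieldName == null
                            ? column
                            : Column.physical(childFieldName, column.getDataType()));
        }
        return ResolvedSchema.of(childColumns);
    }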

The HybridTableSourceFactory is registered via Java SPI so that the planner can discover the hybrid source implementation.
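
Concretely, the factory class is listed in the table factory service file (the package name below is illustrative):

META-INF/services/org.apache.flink.table.factories.Factory
org.apache.flink.connectors.hybrid.table.HybridTableSourceFactory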

HybridConnectorOptions
HybridConnectorOptions
public class HybridConnectorOptions {

    public static final String SOURCE_DELIMITER = ".";

    // matches identifier-prefixed child source options such as 'historical.path';
    // identifiers are restricted to [a-z0-9_]. (the exact regex is an assumption; the
    // constant is referenced by the factory but was not defined in this draft)
    public static final String SOURCE_OPTION_REGEX = "^[a-z0-9_]+\\..+$";

    public static final ConfigOption<String> SOURCES =
            ConfigOptions.key("sources")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "Comma-delimited list of identifiers for the child sources to concatenate, in order. e.g. sources='historical,realtime'");

    public static final ConfigOption<Boolean> OPTIONAL_SWITCHED_START_POSITION_ENABLED =
            ConfigOptions.key("switched-start-position-enabled")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "Whether to enable the switched start position; the default is false, i.e. the fixed start position. If true, the hybrid source "
                                    + "calls SplitEnumerator#getEndTimestamp of the previous source to get its end timestamp and passes it to the next unbounded streaming source.");

    public static final ConfigOption<String> OPTIONAL_SCHEMA_FIELD_MAPPINGS =
            ConfigOptions.key("schema-field-mappings")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "JSON key-value pairs matching differing field names to the DDL fields. e.g. '[{\"f0\":\"A\"},{}]' means the "
                                    + "first child source's column A maps to DDL column f0, and the second source needs no mapping.");

    private HybridConnectorOptions() {}
}

Options for creating HybridTableSource.

Future Work

Add a metadata column indicating which child source a record came from, so that users know whether the data is from a bounded or an unbounded source.

For example, with sources='historical,realtime', historical.connector='filesystem', and realtime.connector='kafka', the metadata value will be 'historical' when reading from the filesystem and 'realtime' when reading from Kafka.

Users can then apply custom SQL logic based on this metadata column.

Compatibility, Deprecation, and Migration Plan

This is new functionality; no migration is currently needed.

Test Plan

Add unit test cases for HybridTableSourceFactory and HybridTableSource.

Add integration test cases for hybrid source sql ddl.

Rejected Alternatives

To be added.
