THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Page properties |
---|
...
|
...
...
JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
...
|
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
/**
 * Base interface for all kinds of formats.
 */
public interface FormatFactory extends Factory {

    /**
     * Determines the changelog mode for this format.
     *
     * @param tableContext the {@link DynamicTableFactory.Context} passed in by the table factory
     * @return the {@link ChangelogMode} of this format
     */
    ChangelogMode createChangelogMode(DynamicTableFactory.Context tableContext);
}
/**
 * Factory for creating a {@link DeserializationSchema} that returns internal data structures.
 */
public interface DeserializationFormatFactory extends FormatFactory {

    /**
     * Creates {@link DeserializationSchema} for the given produced {@link DataType} considering all
     * contextual information.
     *
     * @param tableContext the {@link DynamicTableFactory.Context} passed in by the table factory
     * @param runtimeContext the {@link ScanTableSource.Context} of the scan source
     * @param producedDataType the data type the created schema has to produce
     * @return a deserialization schema emitting {@link RowData}
     */
    DeserializationSchema<RowData> createDeserializationSchema(
            DynamicTableFactory.Context tableContext,
            ScanTableSource.Context runtimeContext,
            DataType producedDataType);
}
...
/**
 * Example table source factory showing how a connector discovers its key and value format
 * factories, validates the options, and builds the table source.
 */
class KafkaSourceFactory extends DynamicTableSourceFactory {
    // ...

    /**
     * Creates the Kafka table source for the given context.
     *
     * @param context the context handed to this factory
     * @return the table source built from the discovered key and value format factories
     */
    DynamicTableSource createDynamicTableSource(Context context) {
        // perform format factory discovery
        TableFormatFactory keyFormatFactory =
                FactoryUtil.find(TableFormatFactory.class, context, KEY_OPTION);
        TableFormatFactory valueFormatFactory =
                FactoryUtil.find(TableFormatFactory.class, context, VALUE_OPTION);

        // validate using required and optional options of each factory
        // also validates if there are left-over keys
        FactoryUtil.validate(
                context,
                Arrays.asList(KEY_OPTION, VALUE_OPTION),
                Arrays.asList(this, keyFormatFactory, valueFormatFactory));

        // the changelog mode of the source is taken from the value format
        ChangelogMode mode = valueFormatFactory.createChangelogMode(context);

        // construct the source
        return KafkaTableSource.builder(context)
                .changelogMode(mode) // or extract it internally from the valueFormatFactory
                .keyFormatFactory(keyFormatFactory)
                .valueFormatFactory(valueFormatFactory)
                .build();
    }
}
/**
 * Utility methods shared by table factories.
 */
class FactoryUtil {
    // ...

    /**
     * Validates the options of a table against the options declared by the factories in use.
     *
     * <p>Declared {@code static} because callers invoke it as {@code FactoryUtil.validate(...)}.
     *
     * @param context the context of the table being created
     * @param usedOptions the options the calling factory consumed itself
     * @param usedFactories the factories taking part in the creation of the table
     */
    static void validate(
            DynamicTableFactory.Context context,
            List<ConfigOption> usedOptions,
            List<Factory> usedFactories) {
        // perform ConfigOption set arithmetic to check for
        // invalid, missing options etc.
    }
}
...