THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
class KafkaSourceFactory extends DynamicTableSourceFactory {
// ...
DynamicTableSource createDynamicTableSource(Context context) {
// perform format factory discovery
TableFormatFactory keyFormatFactory = FactoryUtil.find(TableFormatFactory.class, context, KEY_OPTION);
TableFormatFactory valueFormatFactory = FactoryUtil.find(TableFormatFactory.class, context, VALUE_OPTION);
// validate using required and optional options of each factory
// also validates if there are left-over keys
FactoryUtil.validate(
context,
Arrays.asList(KEY_OPTION, VALUE_OPTION),
Arrays.asList<thisasList(this, keyFormatFactory, valueFormatFactory));
ChangelogMode mode = valueFormatFactory.createChangelogMode(context);
// construct the source
return KafkaTableSource.builder(context)
.changelogMode(mode) // or extract it internally from the valueFormatFactory
.keyFormatFactory(keyFormatFactory)
.valueFormatFactory(valueFormatFactory)
.build();
}
}
class FactoryUtil {
// ...
void validate(
DynamicTableFactory.Context context,
List<ConfigOption> usedOptions,
List<Factory> usedFactories) {
// perform ConfigOption set arithmetic to check for
// invalid, missing options etc.
}
}
...