Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

class KafkaSourceFactory extends DynamicTableSourceFactory {
	
	// ...

	DynamicTableSource createDynamicTableSource(Context context) {
		// perform format factory discovery
		TableFormatFactory keyFormatFactory = FactoryUtil.find(TableFormatFactory.class, context, KEY_OPTION);
		TableFormatFactory valueFormatFactory = FactoryUtil.find(TableFormatFactory.class, context, VALUE_OPTION);

		// validate using required and optional options of each factory
		// also validates if there are left-over keys
		FactoryUtil.validate(
			context,
			Arrays.asList(KEY_OPTION, VALUE_OPTION),
			Arrays.asList<thisasList(this, keyFormatFactory, valueFormatFactory));

		ChangelogMode mode = valueFormatFactory.createChangelogMode(context);

		// construct the source
		return KafkaTableSource.builder(context)
			.changelogMode(mode) // or extract it internally from the valueFormatFactory
			.keyFormatFactory(keyFormatFactory)
			.valueFormatFactory(valueFormatFactory)
			.build();
	}
}

class FactoryUtil {
	// ...

	void validate(
		DynamicTableFactory.Context context,
		List<ConfigOption> usedOptions,
		List<Factory> usedFactories) {

		// perform ConfigOption set arithmetic to check for
		// invalid, missing options etc.
	}
}

...