Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.


...

Page properties

...


Discussion thread

...

...

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

...

/xp87713sdky9xz51hb2ltt7c4bl3fvxw
Vote thread
JIRA

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-16987

Release


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

/**
 * Base interface for all kinds of formats.
 */
public interface FormatFactory extends Factory {

	/**
	 * Determines the changelog mode for this format.
	 */
	ChangelogMode createChangelogModederiveChangelogMode(DynamicTableFactory.Context tableContext);
}

/**
 * Factory for creating a {@link DeserializationSchema} that returns internal data structures.
 */
public interface DeserializationFormatFactory extends FormatFactory {

	/**
	 * Creates {@link DeserializationSchema} for the given produced {@link DataType} considering all
	 * contextual information.
	 */
	DeserializationSchema<RowData> createDeserializationFormatcreateDeserializationSchema(
		DynamicTableFactory.Context tableContext,
		ScanTableSource.Context runtimeContext,
		DataType producedDataType);
}

...

class KafkaSourceFactory extends DynamicTableSourceFactory {
	
	// ...

	DynamicTableSource createDynamicTableSource(Context context) {
		// perform format factory discovery
		TableFormatFactory keyFormatFactory = FactoryUtil.find(TableFormatFactory.class, context, KEY_OPTION);
		TableFormatFactory valueFormatFactory = FactoryUtil.find(TableFormatFactory.class, context, VALUE_OPTION);

		// validate using required and optional options of each factory
		// also validates if there are left-over keys
		FactoryUtil.validate(
			context,
			Arrays.asList(KEY_OPTION, VALUE_OPTION),
			Arrays.asList<thisasList(this, keyFormatFactory, valueFormatFactory));

		ChangelogMode mode = valueFormatFactory.createChangelogMode(context);

		// construct the source
		return KafkaTableSource.builder(context)
			.changelogMode(mode) // or extract it internally from the valueFormatFactory
			.keyFormatFactory(keyFormatFactory)
			.valueFormatFactory(valueFormatFactory)
			.build();
	}
}

class FactoryUtil {
	// ...

	void validate(
		DynamicTableFactory.Context context,
		List<ConfigOption> usedOptions,
		List<Factory> usedFactories) {

		// perform ConfigOption set arithmetic to check for
		// invalid, missing options etc.
	}
}

...