Discussion thread: https://
Vote thread: thread/xp87713sdky9xz51hb2ltt7c4bl3fvxw
JIRA: FLINK-16987
Release:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Corresponding factory interfaces:

  • Factory
  • DynamicTableFactory extends Factory
  • DynamicTableSourceFactory extends DynamicTableFactory
  • DynamicTableSinkFactory extends DynamicTableFactory
  • FormatFactory extends Factory

Optional interfaces that add further abilities:

...

  • RowData
  • ArrayData
  • MapData
  • StringData
  • DecimalData
  • TimestampData
  • RawValueData
  • GenericRowData implements RowData
  • GenericArrayData implements ArrayData
  • GenericMapData implements MapData

The main interfaces, factory interfaces, some ability interfaces, and data structure interfaces will be discussed below.

Factory interfaces work similarly to the current table source/sink factories. Other abilities remain unchanged.

Source Interfaces

DynamicTableSource

/**
 * Source of a dynamic table from an external storage system.
 *
 * <p>A dynamic table source can be seen as a factory that produces concrete runtime implementation.
 *
 * <p>Depending on the optionally declared interfaces such as {@link SupportsComputedColumnPushDown} or
 * {@link SupportsProjectionPushDown}, the planner might apply changes to instances of this class and thus
 * mutates the produced runtime implementation.
 */
@PublicEvolving
public interface DynamicTableSource {

	/**
	 * Returns a string that summarizes this source for printing to a console or log.
	 */
	String asSummaryString();

	/**
	 * Creates a copy of this instance during planning.
	 */
	DynamicTableSource copy();

	// --------------------------------------------------------------------------------------------
	// Helper Interfaces
	// --------------------------------------------------------------------------------------------

	/**
	 * Converts data structures during runtime.
	 */
	interface DataStructureConverter extends RuntimeConverter {

		/**
		 * Converts the given object into an internal data structure.
		 */
		@Nullable Object toInternal(@Nullable Object externalStructure);
	}
}

...

/**
 * A {@link DynamicTableSource} that scans all rows from an external storage system.
 *
 * <p>Depending on the specified {@link ChangelogMode}, the scanned rows don't need to contain only
 * insertions but can also contain updates and deletions.
 */
@PublicEvolving
public interface ScanTableSource extends DynamicTableSource {

	/**
	 * Returns what kind of changes are produced by this source.
	 *
	 * @see RowKind
	 */
	ChangelogMode getChangelogMode();

	/**
	 * Returns the actual implementation for reading the data.
	 */
	ScanRuntimeProvider getScanRuntimeProvider(Context context);

	// --------------------------------------------------------------------------------------------
	// Helper Interfaces
	// --------------------------------------------------------------------------------------------

	interface Context {

		/**
		 * Returns the user code class loader.
		 */
		ClassLoader getUserClassLoader();

		/**
		 * Creates type information describing the internal data structures of the given
		 * {@link DataType}.
		 */
		TypeInformation<?> createTypeInformation(DataType producedDataType);

		/**
		 * Creates a runtime data structure converter that converts data of the given {@link DataType}
		 * to Flink's internal data structures.
		 *
		 * <p>Allows to implement runtime logic without depending on Flink's internal structures for
		 * timestamps, decimals, and structured types.
		 *
		 * @see LogicalType#supportsInputConversion(Class)
		 */
		DataStructureConverter createDataStructureConverter(DataType producedDataType);
	}

	/**
	 * Actual implementation for reading the data.
	 */
	interface ScanRuntimeProvider {

		/**
		 * Whether the data is bounded or not.
		 */
		boolean isBounded();
	}
}
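
For orientation, here is a minimal sketch of an insert-only, bounded scan source against the interface above. The class name, the `ChangelogMode.insertOnly()` helper, and the lambda-shaped provider are assumptions for illustration; concrete `ScanRuntimeProvider` subtypes are elided in this excerpt.

/**
 * A minimal sketch of an insert-only, bounded scan source. The class name and the
 * ChangelogMode.insertOnly() helper are assumptions for illustration; concrete
 * ScanRuntimeProvider subtypes are elided in this excerpt.
 */
class CsvScanTableSource implements ScanTableSource {

	@Override
	public ChangelogMode getChangelogMode() {
		// a plain file scan only produces insertions
		return ChangelogMode.insertOnly();
	}

	@Override
	public ScanRuntimeProvider getScanRuntimeProvider(Context context) {
		// runtime classes can stay independent of Flink-internal structures by using
		// context.createDataStructureConverter(producedDataType)
		return () -> true; // bounded
	}

	@Override
	public DynamicTableSource copy() {
		return new CsvScanTableSource();
	}

	@Override
	public String asSummaryString() {
		return "CSV scan source";
	}
}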

...

/**
 * Allows to push down filters into a {@link ScanTableSource}.
 */
@PublicEvolving
public interface SupportsFilterPushDown {

	Result applyFilters(List<ResolvedExpression> filters);

	final class Result {
		private final List<ResolvedExpression> acceptedFilters;
		private final List<ResolvedExpression> remainingFilters;

		protected Result(
				List<ResolvedExpression> acceptedFilters,
				List<ResolvedExpression> remainingFilters) {
			this.acceptedFilters = acceptedFilters;
			this.remainingFilters = remainingFilters;
		}

		public List<ResolvedExpression> getAcceptedFilters() {
			return acceptedFilters;
		}

		public List<ResolvedExpression> getRemainingFilters() {
			return remainingFilters;
		}
	}
}

/**
 * Allows to push down (possibly nested) projections into a {@link ScanTableSource}.
 */
@PublicEvolving
public interface SupportsProjectionPushDown {

	boolean supportsNestedProjectionPushedDown();

	void applyProjection(TableSchema schema);
}

...

/**
 * A {@link DynamicTableSource} that looks up rows of an external storage system by one or more
 * keys.
 */
@PublicEvolving
public interface LookupTableSource extends DynamicTableSource {

	/**
	 * Returns the actual implementation for reading the data.
	 */
	LookupRuntimeProvider getLookupRuntimeProvider(Context context);

	// --------------------------------------------------------------------------------------------
	// Helper Interfaces
	// --------------------------------------------------------------------------------------------

	interface Context {

		/**
		 * Returns the key fields that should be used during the lookup.
		 */
		List<FieldReferenceExpression> getKeyFields();

		/**
		 * Creates a runtime data structure converter that converts data of the given {@link DataType}
		 * to Flink's internal data structures.
		 *
		 * <p>Allows to implement runtime logic without depending on Flink's internal structures for
		 * timestamps, decimals, and structured types.
		 *
		 * @see LogicalType#supportsInputConversion(Class)
		 */
		DataStructureConverter createDataStructureConverter(DataType producedDataType);
	}

	interface LookupRuntimeProvider {
		// marker interface
	}
}

/**
 * Uses a {@link TableFunction} during runtime for reading.
 */
public interface TableFunctionProvider<T> extends LookupTableSource.LookupRuntimeProvider {

	TableFunction<T> createTableFunction();

	static <T> TableFunctionProvider<T> of(TableFunction<T> tableFunction) {
		return () -> tableFunction;
	}
}

/**
 * Uses an {@link AsyncTableFunction} during runtime for reading.
 */
public interface AsyncTableFunctionProvider<T> extends LookupTableSource.LookupRuntimeProvider {

	AsyncTableFunction<T> createAsyncTableFunction();

	static <T> AsyncTableFunctionProvider<T> of(AsyncTableFunction<T> tableFunction) {
		return () -> tableFunction;
	}
}
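
As a sketch, a lookup source could wire these providers as follows. The names `JdbcLookupTableSource` and `JdbcRowLookupFunction` are hypothetical and only illustrate the shape of an implementation.

/**
 * A minimal sketch of a lookup source; JdbcLookupTableSource and JdbcRowLookupFunction are
 * hypothetical names used for illustration only.
 */
class JdbcLookupTableSource implements LookupTableSource {

	@Override
	public LookupRuntimeProvider getLookupRuntimeProvider(Context context) {
		// the requested key fields drive the generated point-wise lookup query
		List<FieldReferenceExpression> keyFields = context.getKeyFields();
		return TableFunctionProvider.of(new JdbcRowLookupFunction(keyFields));
	}

	@Override
	public DynamicTableSource copy() {
		return new JdbcLookupTableSource();
	}

	@Override
	public String asSummaryString() {
		return "JDBC lookup table";
	}
}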

Sink Interfaces

`DynamicTableSink` works similarly to `ScanTableSource`. A more complex interface hierarchy is currently not required.

...

/**
 * Sink of a dynamic table to an external storage system.
 *
 * <p>A dynamic table sink can be seen as a factory that produces concrete runtime implementation.
 *
 * <p>Depending on optionally declared interfaces such as {@link SupportsPartitioning}, the planner
 * might apply changes to instances of this class and thus mutates the produced runtime
 * implementation.
 */
@PublicEvolving
public interface DynamicTableSink {

	/**
	 * Returns a string that summarizes this sink for printing to a console or log.
	 */
	String asSummaryString();

	/**
	 * Returns the {@link ChangelogMode} that this writer consumes.
	 *
	 * <p>The runtime can make suggestions but the sink has the final decision on what it requires. If
	 * the runtime does not support this mode, it will throw an error. For example, the sink can
	 * return that it only supports {@link RowKind#INSERT}s.
	 *
	 * @param requestedMode expected kind of changes by the current plan
	 */
	ChangelogMode getChangelogMode(ChangelogMode requestedMode);

	/**
	 * Returns the actual implementation for writing the data.
	 */
	SinkRuntimeProvider getSinkRuntimeProvider(Context context);

	// --------------------------------------------------------------------------------------------
	// Helper Interfaces
	// --------------------------------------------------------------------------------------------

	interface Context {

		/**
		 * Returns the user code class loader.
		 */
		ClassLoader getUserClassLoader();

		/**
		 * Creates a runtime data structure converter that converts Flink's internal data structures
		 * to data of the given {@link DataType}.
		 *
		 * <p>Allows to implement runtime logic without depending on Flink's internal structures for
		 * timestamps, decimals, and structured types.
		 *
		 * @see LogicalType#supportsOutputConversion(Class)
		 */
		DataStructureConverter createDataStructureConverter(DataType consumedDataType);
	}

	/**
	 * Converts data structures during runtime.
	 */
	interface DataStructureConverter extends RuntimeConverter {

		/**
		 * Converts the given object into an external data structure.
		 */
		@Nullable Object toExternal(@Nullable Object internalStructure);
	}

	interface SinkRuntimeProvider {
		// marker interface
	}
}
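
As a sketch of the changelog negotiation described in the Javadoc above, an upsert-style sink could restrict the requested mode to the change kinds it actually needs. The class name and the `ChangelogMode` builder calls are assumptions for illustration, since the `ChangelogMode` definition is elided in this excerpt.

/**
 * A minimal sketch of an upsert-style sink; the class name and the ChangelogMode builder
 * calls are assumptions for illustration only.
 */
class UpsertKeyValueSink implements DynamicTableSink {

	@Override
	public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
		// accept all requested change kinds except UPDATE_BEFORE, which an upsert sink
		// can derive from UPDATE_AFTER and therefore does not need
		ChangelogMode.Builder builder = ChangelogMode.newBuilder();
		for (RowKind kind : requestedMode.getContainedKinds()) {
			if (kind != RowKind.UPDATE_BEFORE) {
				builder.addContainedKind(kind);
			}
		}
		return builder.build();
	}

	@Override
	public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
		// context.createDataStructureConverter(consumedDataType) lets the runtime code
		// emit external data structures instead of Flink-internal ones
		return new SinkRuntimeProvider() {};
	}

	@Override
	public String asSummaryString() {
		return "Upsert key-value sink";
	}
}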

Factory Interfaces

Because we need new factories for the new source and sink interfaces, it is the right time to look at the big picture of how factories, connectors, and their formats play together.

Also, with the recent decision to unify the Flink configuration experience by only using `ConfigOption`, the old factory interfaces with their `DescriptorProperties` util need an update.

Factory

Because `TableFactory` was the superclass of all `flink-table` related factories, it was confusing for users that a table factory had to be used for modules and catalogs, which are not table related. The `supportedProperties` method also required a long list of keys, including schema and format wildcards, and thus caused a lot of duplicate code.

The validation was split between the factory service and the factory itself. We suggest performing the validation only in the factory. By using `CatalogTable` directly, the schema part no longer needs to be validated.

To simplify discovery, we match only by one identifier (+ version). In theory, we could also use the class name itself, but this would make it more difficult to insert a custom factory without changing DDL properties.

/**
 * A factory for creating instances from {@link ConfigOption}s in the table ecosystem. This
 * factory is used with Java's Service Provider Interfaces (SPI) for discovery.
 */
@PublicEvolving
public interface Factory {

	/**
	 * Uniquely identifies this factory when searching for a matching factory. Possibly versioned
	 * by {@link #factoryVersion()}.
	 */
	String factoryIdentifier();

	/**
	 * Extends a {@link #factoryIdentifier()} by a version when searching for a matching factory.
	 */
	Optional<String> factoryVersion();

	/**
	 * Definition of required options for this factory. The information will be used for generation
	 * of documentation and validation. It does not influence the discovery of a factory.
	 */
	Set<ConfigOption<?>> requiredOptions();

	/**
	 * Definition of optional options for this factory. This information will be used for generation
	 * of documentation and validation. It does not influence the discovery of a factory.
	 */
	Set<ConfigOption<?>> optionalOptions();
}
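
For illustration, a connector factory could declare its identifier and options as follows. The option names and the `ConfigOptions` builder calls are examples only and not part of this proposal.

/**
 * A sketch of how a factory could declare its identifier and options; the option names are
 * examples only.
 */
class ExampleKafkaConnectorFactory implements Factory {

	static final ConfigOption<String> TOPIC = ConfigOptions
		.key("topic")
		.stringType()
		.noDefaultValue();

	static final ConfigOption<String> VALUE_FORMAT = ConfigOptions
		.key("value.format")
		.stringType()
		.defaultValue("json");

	@Override
	public String factoryIdentifier() {
		// matched against e.g. the 'connector' option in DDL
		return "kafka";
	}

	@Override
	public Optional<String> factoryVersion() {
		return Optional.empty();
	}

	@Override
	public Set<ConfigOption<?>> requiredOptions() {
		Set<ConfigOption<?>> options = new HashSet<>();
		options.add(TOPIC);
		return options;
	}

	@Override
	public Set<ConfigOption<?>> optionalOptions() {
		Set<ConfigOption<?>> options = new HashSet<>();
		options.add(VALUE_FORMAT);
		return options;
	}
}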

DynamicTableFactory, DynamicTableSourceFactory, DynamicTableSinkFactory

/**
 * Factory for accessing a dynamic table for reading or writing.
 */
public interface DynamicTableFactory extends Factory {

	/**
	 * Information about the accessed table.
	 */
	interface Context {

		/**
		 * Identifier of the table in the catalog.
		 */
		ObjectIdentifier getObjectIdentifier();

		/**
		 * Table information received from the {@link Catalog}.
		 */
		CatalogTable getCatalogTable();

		/**
		 * Configuration of the current session.
		 */
		ReadableConfig getConfiguration();

		/**
		 * Class loader of the current session.
		 */
		ClassLoader getClassLoader();
	}
}

/**
 * Factory for reading a dynamic table.
 */
public interface DynamicTableSourceFactory extends DynamicTableFactory {

	/**
	 * Factory method for creating a {@link DynamicTableSource}.
	 */
	DynamicTableSource createDynamicTableSource(Context context);
}

/**
 * Factory for writing a dynamic table.
 */
public interface DynamicTableSinkFactory extends DynamicTableFactory {

	/**
	 * Factory method for creating a {@link DynamicTableSink}.
	 */
	DynamicTableSink createDynamicTableSink(Context context);
}

FormatFactory, DeserializationFormatFactory

For a smooth interplay, we also need to consider the relationship between connectors and formats.

We propose the following format interfaces and illustrate their usage in the example afterwards.

/**
 * Base interface for all kinds of formats.
 */
public interface FormatFactory extends Factory {

	/**
	 * Determines the changelog mode for this format.
	 */
	ChangelogMode deriveChangelogMode(DynamicTableFactory.Context tableContext);
}

/**
 * Factory for creating a {@link DeserializationSchema} that returns internal data structures.
 */
public interface DeserializationFormatFactory extends FormatFactory {

	/**
	 * Creates {@link DeserializationSchema} for the given produced {@link DataType} considering all
	 * contextual information.
	 */
	DeserializationSchema<RowData> createDeserializationSchema(
		DynamicTableFactory.Context tableContext,
		ScanTableSource.Context runtimeContext,
		DataType producedDataType);
}

For MySQL or Postgres CDC logs, the format should be able to return a `ChangelogMode`. Formats that don't produce changes can simply return an insert-only changelog mode, as sketched below.
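
As a sketch, a hypothetical CDC format factory could derive a full changelog mode, while a plain format returns an insert-only mode. The class names and the `ChangelogMode` helpers below are assumptions for illustration.

class DebeziumJsonFormatFactory implements DeserializationFormatFactory {

	@Override
	public ChangelogMode deriveChangelogMode(DynamicTableFactory.Context tableContext) {
		// a CDC log encodes insertions, updates (before/after), and deletions
		return ChangelogMode.newBuilder()
			.addContainedKind(RowKind.INSERT)
			.addContainedKind(RowKind.UPDATE_BEFORE)
			.addContainedKind(RowKind.UPDATE_AFTER)
			.addContainedKind(RowKind.DELETE)
			.build();
	}

	// createDeserializationSchema(...) and the remaining Factory methods are omitted here
}

class CsvFormatFactory implements DeserializationFormatFactory {

	@Override
	public ChangelogMode deriveChangelogMode(DynamicTableFactory.Context tableContext) {
		// a plain format does not encode any changes
		return ChangelogMode.insertOnly();
	}

	// createDeserializationSchema(...) and the remaining Factory methods are omitted here
}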

Because the final schema that formats need to handle is only known after optimization, we postpone instantiating concrete runtime interfaces such as `DeserializationSchema` to the `getXXXRuntimeProvider` methods.

However, the discovery and validation of the factory are done earlier, as the following example illustrates.


class KafkaSourceFactory implements DynamicTableSourceFactory {
	
	// ...

	DynamicTableSource createDynamicTableSource(Context context) {
		// perform format factory discovery
		DeserializationFormatFactory keyFormatFactory = FactoryUtil.find(DeserializationFormatFactory.class, context, KEY_OPTION);
		DeserializationFormatFactory valueFormatFactory = FactoryUtil.find(DeserializationFormatFactory.class, context, VALUE_OPTION);

		// validate using required and optional options of each factory
		// also validates if there are left-over keys
		FactoryUtil.validate(
			context,
			Arrays.asList(KEY_OPTION, VALUE_OPTION),
			Arrays.asList(this, keyFormatFactory, valueFormatFactory));

		ChangelogMode mode = valueFormatFactory.deriveChangelogMode(context);

		// construct the source
		return KafkaTableSource.builder(context)
			.changelogMode(mode) // or extract it internally from the valueFormatFactory
			.keyFormatFactory(keyFormatFactory)
			.valueFormatFactory(valueFormatFactory)
			.build();
	}
}

class FactoryUtil {
	// ...

	void validate(
		DynamicTableFactory.Context context,
		List<ConfigOption> usedOptions,
		List<Factory> usedFactories) {

		// perform ConfigOption set arithmetic to check for
		// invalid, missing options etc.
	}
}
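
To illustrate the postponed instantiation mentioned above, the source built by the factory sketch could create the `DeserializationSchema` only inside `getScanRuntimeProvider`, once the final (possibly projected) schema is fixed. The fields, constructor, and the unbounded lambda provider below are assumptions for illustration; only the value format is shown.

/**
 * Sketch of how the Kafka source could postpone creating the DeserializationSchema;
 * the fields and the lambda-shaped provider are assumptions for illustration only.
 */
class KafkaTableSource implements ScanTableSource {

	private final ChangelogMode mode;
	private final DeserializationFormatFactory valueFormatFactory;
	private final DynamicTableFactory.Context tableContext;
	private final DataType producedDataType; // final (possibly projected) schema, known after optimization

	KafkaTableSource(
			ChangelogMode mode,
			DeserializationFormatFactory valueFormatFactory,
			DynamicTableFactory.Context tableContext,
			DataType producedDataType) {
		this.mode = mode;
		this.valueFormatFactory = valueFormatFactory;
		this.tableContext = tableContext;
		this.producedDataType = producedDataType;
	}

	@Override
	public ChangelogMode getChangelogMode() {
		return mode;
	}

	@Override
	public ScanRuntimeProvider getScanRuntimeProvider(Context runtimeContext) {
		// the concrete runtime class is only instantiated here, after optimization
		DeserializationSchema<RowData> deserializer =
			valueFormatFactory.createDeserializationSchema(tableContext, runtimeContext, producedDataType);
		// wrapping 'deserializer' into a concrete provider is elided in this excerpt
		return () -> false; // unbounded
	}

	@Override
	public DynamicTableSource copy() {
		return new KafkaTableSource(mode, valueFormatFactory, tableContext, producedDataType);
	}

	@Override
	public String asSummaryString() {
		return "Kafka table source";
	}
}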


Data Structure Interfaces

...

Locations where `TableSource`s are exposed, such as `fromTableSource()`, are already deprecated or will be deprecated. It is recommended that users use the `connect()` API or DDL instead.

The legacy planner has different internal data structures and limited functionality in general. Therefore, we will not support the new interfaces in the legacy planner.

Test Plan

Unit tests will test the interfaces. Existing connector tests will verify the implementation.

...