Status
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Most connectors have been migrated to the FLIP-95[1] interfaces, but the Filesystem and Hive connectors have not been migrated yet. They place some requirements on the table connector API, and users have additional requirements as well:
- Users need to configure a custom parallelism for sources and sinks. [2]
- Some connectors can infer a parallelism, and the inferred parallelism works well in most cases.
- Some connectors need to use a topology to build their source/sink instead of a single function, for example JIRA[3], the Partition Commit feature[4], and the File Compaction feature[5].
Public Interfaces
ParallelismProvider
@PublicEvolving
public interface ParallelismProvider {

    /**
     * Returns the parallelism for this instance. The parallelism denotes
     * how many parallel instances of the user source/sink will be
     * spawned during execution.
     *
     * @return empty if the connector does not want to provide a parallelism,
     *     in which case the planner will decide the number of parallel
     *     instances by itself
     */
    default Optional<Integer> getParallelism() {
        return Optional.empty();
    }
}
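To illustrate the contract, here is a minimal, self-contained sketch of how a provider could implement this interface. The interface body is copied from the proposal above; `FixedParallelismProvider` is a hypothetical example class, not part of the proposed API:

```java
import java.util.Optional;

public class ParallelismProviderExample {

    // Copied from the proposal above (annotation omitted for brevity).
    interface ParallelismProvider {
        default Optional<Integer> getParallelism() {
            return Optional.empty();
        }
    }

    // Hypothetical provider that reports a fixed parallelism.
    static class FixedParallelismProvider implements ParallelismProvider {
        private final int parallelism;

        FixedParallelismProvider(int parallelism) {
            this.parallelism = parallelism;
        }

        @Override
        public Optional<Integer> getParallelism() {
            return Optional.of(parallelism);
        }
    }

    public static void main(String[] args) {
        // Default: empty, so the planner decides the parallelism by itself.
        ParallelismProvider defaults = new ParallelismProvider() {};
        System.out.println(defaults.getParallelism().isPresent()); // false

        // Custom: the connector reports 4 parallel instances.
        System.out.println(new FixedParallelismProvider(4).getParallelism().get()); // 4
    }
}
```

A real connector would typically read the value from a connector option (such as `sink.parallelism` below) rather than hard-coding it.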
Sink Customized parallelism
The following interfaces extend ParallelismProvider:
- SinkFunctionProvider
- OutputFormatProvider
The user specifies the customized parallelism through connector options:
Option | Type | Default value
sink.parallelism | Integer | None (chained: use upstream parallelism; non-chained: use global parallelism setting)
SourceProvider
The ScanRuntimeProvider for FLIP-27 sources:
/**
 * Provider of a {@link Source} instance as a runtime implementation for {@link ScanTableSource}.
 */
@PublicEvolving
public interface SourceProvider extends ScanTableSource.ScanRuntimeProvider {

    /**
     * Helper method for creating a static provider.
     */
    static SourceProvider of(Source<RowData, ?, ?> source) {
        return new SourceProvider() {
            @Override
            public Source<RowData, ?, ?> createSource() {
                return source;
            }

            @Override
            public boolean isBounded() {
                return Boundedness.BOUNDED.equals(source.getBoundedness());
            }
        };
    }

    /**
     * Creates a {@link Source} instance.
     */
    Source<RowData, ?, ?> createSource();
}
The SourceProvider does not need to provide a WatermarkStrategy: there is no need to push the watermark strategy down to the source, because the new Source API is already flexible enough.
DataStream Provider
This is only for advanced connector developers.
DataStream is the only public API that can take this role.
The DataStream providers cannot extend ParallelismProvider, since the parallelism of each operator is already defined on the DataStream topology itself.
Scan
@PublicEvolving
public interface DataStreamScanProvider extends ScanTableSource.ScanRuntimeProvider {

    /**
     * Creates a scan {@link DataStream} from a {@link StreamExecutionEnvironment}.
     */
    DataStream<RowData> createDataStream(StreamExecutionEnvironment execEnv);
}
Can FLIP-27 help in such cases? FLIP-27 can cover all internal connector cases, but user connector cases are less certain.
A clear problem is migration: the connectors that users implemented before are complex topologies constructed with DataStream.
Theoretically they can be migrated to FLIP-27, but the migration may be difficult. For example, connectors developed by platform users often contain very flexible mechanisms, and these mechanisms often rely on DataStream (DataStream can process data more conveniently and flexibly).
And these users need the features of the new TableSource and TableSink.
Sink
@PublicEvolving
public interface DataStreamSinkProvider extends DynamicTableSink.SinkRuntimeProvider {

    /**
     * Consumes the given {@link DataStream} and returns the sink transformation {@link DataStreamSink}.
     */
    DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream);
}
Future Plan
Source parallelism
We will not provide parallelism inference at first, because the DataStream sources do not have this capability yet.
Once the underlying sources provide this capability, we will consider exposing it in the SQL layer.
At present, the final state of the source parallelism setting is not clear, so we will shelve the source parallelism for now and focus on the parallelism setting of the sink (users specify the parallelism on the sink).
Customized Scan parallelism
The following interfaces extend ParallelismProvider:
- SourceFunctionProvider
- InputFormatProvider
- SourceProvider
The user specifies the customized parallelism through connector options:
Option | Type | Default value
scan.parallelism | Integer | None (use global parallelism setting)
Infer Scan parallelism
Connector | Can infer for Source or Sink | How to infer
Kafka | Unbounded source | Infer by number of partitions
Filesystem / Hive / Iceberg | Bounded source | Infer by number of splits
JDBC / HBase | Bounded source | Infer by number of splits
Elasticsearch | None | -
As can be seen, most connectors infer parallelism according to the number of splits, and only source parallelism is inferred. But we cannot rule out that users have customized parallelism inference modes.
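The split-based pattern in the table above can be sketched in a few lines. This is plain Java illustrating the idea, not Flink API; `inferBySplits` is a hypothetical helper name:

```java
import java.util.OptionalInt;

public class SplitBasedInference {

    /**
     * Infers a source parallelism as one task per split, optionally capped
     * by a maximum (e.g. the scan.infer-parallelism.max option below).
     */
    static int inferBySplits(int numSplits, OptionalInt maxParallelism) {
        // At least one task, even for an empty table with zero splits.
        int inferred = Math.max(numSplits, 1);
        return maxParallelism.isPresent()
                ? Math.min(inferred, maxParallelism.getAsInt())
                : inferred;
    }

    public static void main(String[] args) {
        System.out.println(inferBySplits(128, OptionalInt.of(32))); // capped at 32
        System.out.println(inferBySplits(8, OptionalInt.empty()));  // one task per split: 8
    }
}
```

The Kafka case is analogous, with the number of partitions playing the role of the number of splits.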
Users can control the inference logic through connector options:
Option | Type | Default value
scan.infer-parallelism.enabled | Boolean | true
scan.infer-parallelism.max | Integer | None (use global parallelism setting)
(The global parallelism setting is StreamExecutionEnvironment.getParallelism; in the Table API it can be configured via "table.exec.resource.default-parallelism".)
Choose Scan Parallelism
A pseudo code (Scala-style) to show the relationship between the parallelism options:
def chooseScanParallelism(
    `scan.parallelism`: Option[Int],
    `scan.infer-parallelism.enabled`: Boolean,
    `scan.infer-parallelism.max`: Option[Int],
    inferParallelism: () => Int,
    globalParallelism: Int): Int = {
  `scan.parallelism` match {
    case None => `scan.infer-parallelism.enabled` match {
      case true => `scan.infer-parallelism.max` match {
        case None =>
          // default
          Math.min(inferParallelism(), globalParallelism)
        case Some(p) => Math.min(inferParallelism(), p)
      }
      case false => globalParallelism
    }
    case Some(p) => p
  }
}
How do users choose:
- Default (most cases): min(inferredParallelism, globalParallelism):
  - For most Kafka users, this meets the requirements.
- Limit the inferred parallelism by configuring `scan.infer-parallelism.max`:
  - Resources are limited; for example, Filesystem/Hive may produce too high a parallelism, so we can use a smaller parallelism and run the data part by part.
  - The connector's storage cannot handle too many concurrent accesses.
- Fixed parallelism: configure `scan.parallelism` or rely on the global parallelism:
  - For example, clients cannot access the Kafka cluster or the filesystem, so parallelism inference cannot be used.
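The decision rule above can also be written as a small runnable sketch. This is plain Java mirroring the pseudo code, not Flink API; the class and parameter names are illustrative only:

```java
import java.util.OptionalInt;
import java.util.function.IntSupplier;

public class ScanParallelismChooser {

    /**
     * Mirrors the pseudo code: an explicit scan.parallelism always wins;
     * otherwise, if inference is enabled, the inferred value is capped by
     * scan.infer-parallelism.max (or, if unset, by the global parallelism);
     * otherwise the global parallelism is used.
     */
    static int chooseScanParallelism(
            OptionalInt scanParallelism,  // scan.parallelism
            boolean inferEnabled,         // scan.infer-parallelism.enabled
            OptionalInt inferMax,         // scan.infer-parallelism.max
            IntSupplier inferParallelism, // connector-specific inference
            int globalParallelism) {
        if (scanParallelism.isPresent()) {
            return scanParallelism.getAsInt();
        }
        if (!inferEnabled) {
            return globalParallelism;
        }
        int cap = inferMax.orElse(globalParallelism);
        return Math.min(inferParallelism.getAsInt(), cap);
    }

    public static void main(String[] args) {
        // Default case: min(inferredParallelism, globalParallelism).
        System.out.println(chooseScanParallelism(
                OptionalInt.empty(), true, OptionalInt.empty(), () -> 100, 16)); // 16
        // An explicit scan.parallelism overrides everything else.
        System.out.println(chooseScanParallelism(
                OptionalInt.of(8), true, OptionalInt.of(4), () -> 100, 16)); // 8
    }
}
```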
Rejected Alternatives
SupportsParallelismReport
In [6], we discussed the SupportsParallelismReport interface:
> Kurt: Regarding the interface SupportsParallelismReport, first of all, my feeling is that such a mechanism is not like other abilities like SupportsProjectionPushDown. Parallelism of source operators would be decided anyway, the only difference here is whether it's decided purely by framework or by table source itself. So another angle to understand this issue is, we can always assume a table source has the ability to determine the parallelism. The table source can choose to set the parallelism by itself, or delegate it to the framework. This might sound like personal taste, but there is another bad case if we introduce the interface. You may already know we currently have two major table sources, LookupTableSource and ScanTableSource. IIUC it won't make much sense if the user provides a LookupTableSource and also implements SupportsParallelismReport.
Moreover, the parallelism reporting is not orthogonal to DataStream Providers.
[3]https://issues.apache.org/jira/browse/FLINK-18674
[5]https://issues.apache.org/jira/browse/FLINK-19345
Google document: https://docs.google.com/document/d/1S5B4UPQGfzZX2Sb6gYOvFrcRnYireXwZq5XX0W27s_k/edit?usp=sharing
(Please keep the discussion on the mailing list for any substantial opinions.)