Status

Current state: Under Discussion

Page properties

Discussion thread: https://lists.apache.org/thread/x75h151nd028b6c6wm67fg21nnddxndf
Vote thread: https://lists.apache.org/thread/59mj7nwkjn00f140nfftomfqmtrzdpg5
JIRA: FLINK-19719
Release: 1.13
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Most connectors have been migrated to the new interfaces introduced in FLIP-95 [1], but the Filesystem and Hive connectors have not been migrated yet. They have some requirements on the table connector API, and users also have some additional requirements:

  • Users want to configure a customized parallelism for sources and sinks. [2]
  • Some connectors are able to infer a suitable parallelism, and the inferred parallelism works well for most cases.
  • Some connectors need to build their source/sink as a topology instead of a single function, e.g. JIRA [3], the Partition Commit feature [4], and the File Compaction feature [5].

Public Interfaces

ParallelismProvider

@PublicEvolving
public interface ParallelismProvider {

  /**
   * Gets the parallelism for this contract instance. The parallelism
   * denotes how many parallel instances of the user source/sink will be
   * spawned during the execution.
   *
   * @return empty if the connector does not want to provide parallelism,
   * in which case the planner will decide the number of parallel
   * instances by itself.
   */
  default Optional<Integer> getParallelism() {
     return Optional.empty();
  }
}
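A minimal, self-contained sketch of the intended contract (the class names below are hypothetical illustrations, not part of the proposal): a provider can opt in to a fixed parallelism, while the planner falls back to its own default when the optional is empty.

```java
import java.util.Optional;

public class ParallelismProviderExample {

    /** The proposed interface: an empty result means "let the planner decide". */
    interface ParallelismProvider {
        default Optional<Integer> getParallelism() {
            return Optional.empty();
        }
    }

    /** A provider that keeps the default: no parallelism is reported. */
    static class DefaultProvider implements ParallelismProvider {}

    /** A provider that reports a fixed parallelism of 4. */
    static class FixedProvider implements ParallelismProvider {
        @Override
        public Optional<Integer> getParallelism() {
            return Optional.of(4);
        }
    }

    /** Planner side: use the provided parallelism or fall back to a default. */
    static int resolve(ParallelismProvider provider, int plannerDefault) {
        return provider.getParallelism().orElse(plannerDefault);
    }
}
```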

Customized Sink Parallelism

The following interfaces extend ParallelismProvider:

  • SinkFunctionProvider
  • OutputFormatProvider


The user specifies the customized parallelism through connector options:

Option: sink.parallelism
Type: Integer
Default value: None
  - Chained: use the upstream parallelism
  - Non-chained: use the global parallelism setting
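The default-value rule above can be sketched as a self-contained snippet (the method name and signature are illustrative, not part of the proposed API): an explicit "sink.parallelism" wins; otherwise a chained sink inherits the upstream parallelism and a non-chained sink uses the global setting.

```java
import java.util.Optional;

public class SinkParallelismRule {

    /**
     * Resolves the sink parallelism: an explicit 'sink.parallelism' option
     * takes priority; otherwise a chained sink uses the upstream parallelism
     * and a non-chained sink uses the global parallelism setting.
     */
    public static int resolve(
            Optional<Integer> sinkParallelism, // the 'sink.parallelism' option
            boolean chainedWithUpstream,
            int upstreamParallelism,
            int globalParallelism) {
        return sinkParallelism.orElse(
                chainedWithUpstream ? upstreamParallelism : globalParallelism);
    }
}
```

For example, a sink chained to an upstream operator with parallelism 6 runs with 6 parallel instances unless "sink.parallelism" is set explicitly.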

SourceProvider

SourceProvider is the ScanRuntimeProvider for FLIP-27 sources:

/**
 * Provider of a {@link Source} instance as a runtime implementation for {@link ScanTableSource}.
 */
@PublicEvolving
public interface SourceProvider extends ScanTableSource.ScanRuntimeProvider {

  /** Helper method for creating a static provider. */
  static SourceProvider of(Source<RowData, ?, ?> source) {
     return new SourceProvider() {

        @Override
        public Source<RowData, ?, ?> createSource() {
           return source;
        }

        @Override
        public boolean isBounded() {
           return Boundedness.BOUNDED.equals(source.getBoundedness());
        }
     };
  }

  /** Creates a {@link Source} instance. */
  Source<RowData, ?, ?> createSource();
}

The SourceProvider does not need to provide a WatermarkStrategy: there is no need to push the watermark strategy down to the source, because the new Source API is already a flexible interface.

DataStream Provider

This is only intended for advanced connector developers. DataStream is the only public API that can take this role.

Note that DataStream providers cannot extend ParallelismProvider.

Scan


@PublicEvolving
public interface DataStreamScanProvider extends ScanTableSource.ScanRuntimeProvider {

  /** Creates a scan {@link DataStream} from a {@link StreamExecutionEnvironment}. */
  DataStream<RowData> createDataStream(StreamExecutionEnvironment execEnv);
}

Can FLIP-27 help in such cases? FLIP-27 can cover all of the internal connector cases, but it is less clear for user-defined connectors.

A clear problem is migration: the connectors users have implemented so far are complex topologies built with the DataStream API. In theory they can be migrated to FLIP-27, but the migration may be difficult. For example, connectors developed by platform users often contain very flexible mechanisms that rely on DataStream, since DataStream can process data more conveniently and flexibly. At the same time, these users need the features of the new TableSource and TableSink interfaces.

Sink


@PublicEvolving
public interface DataStreamSinkProvider extends DynamicTableSink.SinkRuntimeProvider {

  /** Consumes the {@link DataStream} and returns the sink transformation {@link DataStreamSink}. */
  DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream);
}

Future Plan

Source parallelism

We will not provide parallelism inference at first, because the DataStream sources do not have this capability yet. Once the underlying sources provide it, we will consider exposing it in the SQL layer.

At present, the final shape of the source parallelism setting is not clear, so we propose to shelve source parallelism for now and focus on the sink parallelism setting (users specify the parallelism on the sink).

Customized Scan Parallelism

The following interfaces extend ParallelismProvider:

  • SourceFunctionProvider
  • InputFormatProvider
  • SourceProvider


The user specifies the customized parallelism through connector options:

Option: scan.parallelism
Type: Integer
Default value: None (use the global parallelism setting)

Infer Scan parallelism

Connector                     Can infer (source or sink)   How to infer
Kafka                         Unbounded source             By the number of partitions
Filesystem / Hive / Iceberg   Bounded source               By the number of splits
JDBC / HBase                  Bounded source               By the number of splits
Elasticsearch                 None                         -


As can be seen, most connectors infer parallelism from the number of splits, and only infer source parallelism. Still, we cannot rule out that users have customized parallelism inference modes.

Users can control the inference logic with the following connector options:

Option: scan.infer-parallelism.enabled
Type: Boolean
Default value: true

Option: scan.infer-parallelism.max
Type: Integer
Default value: None (use the global parallelism setting)


(The global parallelism setting is StreamExecutionEnvironment.getParallelism; in the Table API it can be configured via "table.exec.resource.default-parallelism".)

Choose Scan Parallelism

Pseudocode showing the relationship between the parallelism options:

def chooseScanParallelism(
   `scan.parallelism`: Option[Int],
   `scan.infer-parallelism.enabled`: Boolean,
   `scan.infer-parallelism.max`: Option[Int],
   inferParallelism: () => Int,
   globalParallelism: Int): Int = {
 `scan.parallelism` match {
   case None    => `scan.infer-parallelism.enabled` match {
     case true  => `scan.infer-parallelism.max` match {
       case None    =>
         // default
         Math.min(inferParallelism(), globalParallelism)
       case Some(p) => Math.min(inferParallelism(), p)
     }
     case false => globalParallelism
   }
   case Some(p) => p
 }
}
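The same decision logic can be written out in plain Java (a self-contained sketch; the class name and method signature are illustrative, not part of the proposed API):

```java
import java.util.Optional;
import java.util.function.IntSupplier;

public class ScanParallelismChooser {

    /**
     * Resolves the scan parallelism from the connector options.
     * Priority: explicit 'scan.parallelism' > inference (capped by
     * 'scan.infer-parallelism.max', or by the global parallelism when
     * no max is configured) > global parallelism.
     */
    public static int chooseScanParallelism(
            Optional<Integer> scanParallelism, // 'scan.parallelism'
            boolean inferEnabled,              // 'scan.infer-parallelism.enabled'
            Optional<Integer> inferMax,        // 'scan.infer-parallelism.max'
            IntSupplier inferParallelism,      // e.g. split/partition count
            int globalParallelism) {
        if (scanParallelism.isPresent()) {
            return scanParallelism.get();      // user-fixed parallelism wins
        }
        if (!inferEnabled) {
            return globalParallelism;          // inference disabled
        }
        // default cap is the global parallelism setting
        int cap = inferMax.orElse(globalParallelism);
        return Math.min(inferParallelism.getAsInt(), cap);
    }
}
```

For example, with 100 Kafka partitions, inference enabled, no max configured, and a global parallelism of 8, the chosen scan parallelism is min(100, 8) = 8, i.e. the default case of the pseudocode.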

How users choose:

  • Default (most cases), min(inferredParallelism, globalParallelism):
    • For most Kafka users, this meets their requirements.
  • Limit the inferred parallelism by configuring `scan.infer-parallelism.max`:
    • Resources are limited; for example, Filesystem/Hive may produce a very large inferred parallelism, so a smaller parallelism lets the data be processed part by part.
    • The connector storage cannot handle too many concurrent accesses.
  • Fixed parallelism, by configuring `scan.parallelism` or using the global parallelism:
    • For example, clients cannot access the Kafka cluster or the filesystem, so parallelism inference cannot be used.

Rejected Alternatives

SupportsParallelismReport

In [6], the SupportsParallelismReport interface was discussed:

> Kurt: Regarding the interface SupportsParallelismReport, first of all, my feeling is that such a mechanism is not like other abilities like SupportsProjectionPushDown. Parallelism of source operators would be decided anyway, the only difference here is whether it's decided purely by framework or by table source itself. So another angle to understand this issue is, we can always assume a table source has the ability to determine the parallelism. The table source can choose to set the parallelism by itself, or delegate it to the framework. This might sound like personal taste, but there is another bad case if we introduce the interface. You may already know we currently have two major table sources, LookupTableSource and ScanTableSource. IIUC it won't make much sense if the user provides a LookupTableSource and also implements SupportsParallelismReport.

Moreover, parallelism reporting is not orthogonal to the DataStream providers.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces

[2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-source-sink-parallelism-config-in-Flink-sql-td44556.html

[3] https://issues.apache.org/jira/browse/FLINK-18674

[4] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html#partition-commit

[5] https://issues.apache.org/jira/browse/FLINK-19345

[6] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Introduce-SupportsParallelismReport-and-SupportsStatisticsReport-for-Hive-and-Filesystem-td43531.html


Google document: https://docs.google.com/document/d/1S5B4UPQGfzZX2Sb6gYOvFrcRnYireXwZq5XX0W27s_k/edit?usp=sharing

(Please keep the discussion on the mailing list for any substantial opinions.)