
Status

Current state: "Under Discussion"

Discussion thread: http://mail-archives.apache.org/mod_mbox/flink-dev/201712.mbox/%3C8a9d718b-5dae-0fe2-1da6-a8d557d45582%40apache.org%3E

JIRA: FLINK-7594

Release: 1.5


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

SQL is undoubtedly the most widely used language for data analytics. It is declarative and can be optimized and efficiently executed by most query processors. Applying these concepts to stream processors as well is a logical next step for making streaming accessible to a broader audience and enabling faster development without requiring exact knowledge of the underlying runtime.

...

The goal of this FLIP is to have an initial minimum viable product for using Flink with SQL without an IDE. We will use this product to incrementally refine the requirements based on the feedback from users and contributors. Further FLIPs and design documents might follow in order to define REST/JDBC capabilities or materialized view semantics.

Public Interfaces

  • A new Maven module “flink-sql-client” with the SQL client

  • A new binary file for executing the SQL client in embedded mode

  • New default configuration files and library directory

Proposed Changes

General Architecture

The SQL client can be executed in two modes: a gateway mode and an embedded mode. In this FLIP we mostly focus on the embedded mode but also conceptually consider a later gateway mode.

...

  • The user adds catalog information to the configuration files.

  • The user starts the CLI client with custom JAR files and a configuration (--jar, --context).

  • The user enters a SQL query and submits it.

  • The executor submits a Flink job with all involved JAR files.

  • If the query is a SELECT query, the executor materializes the result such that it can be pulled by the CLI client.

  • If the query is an INSERT INTO query, the executor submits the Flink job.

  • The user can stop the running query and submit a new one.

  • Exiting the CLI also stops the running SELECT query but not INSERT INTO queries.
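
For illustration, starting a session in embedded mode could look as follows; the script and file names are hypothetical, only the --jar and --context flags are taken from the workflow above:

  ./bin/sql-client.sh embedded --jar my-udfs.jar --context my-sql-context.conf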

Gateway Mode

(Figure: architecture of the SQL client in gateway mode)

Embedded Mode

(Figure: architecture of the SQL client in embedded mode)

Configuration

 

Independent of the execution mode, the SQL client can be configured globally (sql-defaults.conf) and/or for every CLI session (sql-context.conf). The configuration specifies settings that would programmatically be applied to an ExecutionEnvironment/StreamExecutionEnvironment and TableEnvironment. It contains catalog information as well as job-specific parameters; a hypothetical sketch of such a file follows the list below.
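
To make this mapping concrete, the following Java sketch shows roughly what an executor might apply for such a configuration. MyClicksTableSource and MyNormalizeFunction are hypothetical placeholders; all environment and registration calls are existing Flink APIs:

  import org.apache.flink.api.common.time.Time;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.table.api.StreamQueryConfig;
  import org.apache.flink.table.api.TableEnvironment;
  import org.apache.flink.table.api.java.StreamTableEnvironment;

  public class ApplySessionConfig {
      public static void main(String[] args) {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          env.setParallelism(4);                           // "Parallelism"
          env.setMaxParallelism(128);                      // "Maximum parallelism"
          env.getConfig().setAutoWatermarkInterval(200L);  // "Auto watermark interval"

          StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
          // pre-registered table sources and user-defined functions
          tableEnv.registerTableSource("Clicks", new MyClicksTableSource());  // hypothetical TableSource
          tableEnv.registerFunction("normalize", new MyNormalizeFunction());  // hypothetical ScalarFunction

          // "Query Config": min/max idle state retention time
          StreamQueryConfig queryConfig = tableEnv.queryConfig()
              .withIdleStateRetentionTime(Time.hours(1), Time.hours(24));
      }
  }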

...

  • Pre-registered table sources

    • Name

    • Source

      • Type (e.g. Kafka, Custom)

      • Properties (e.g. topic, connection info, custom TableSource class)

    • Encoding

      • Type (e.g. Avro, JSON)

      • Schema (e.g. Avro class, JSON field names/types)

    • Rowtime descriptor/Proctime

      • Watermark strategy and Watermark properties

      • Time attribute info

    • Bucketization

    • Statistics

  • Pre-registered table sinks

    • Name

    • Sink

      • Type (e.g. Kafka, Custom)

      • Properties (e.g. destination path, output types)

  • External catalogs

    • Name

    • Properties (e.g. connection info, credentials, ExternalCatalog class)

  • User-defined functions

    • Name

    • Parameters (e.g. constructor parameters for a TableFunction)

    • Class

  • [Optional] User-defined types

    • Name

    • Field names and types

  • [Optional] User-defined local variables (@local_variable)

    • Name

    • Value

  • Job parameters

    • Batch or streaming execution

    • Parallelism

    • Maximum parallelism

    • State Backend + parameters

    • Auto watermark interval

    • Restart strategy

    • Query Config (e.g. min/max retention time)

  • [Separate configuration?] SQL client parameters

    • Gateway properties
      (e.g. database properties, server credentials)

    • CLI Client properties
      (e.g. timeouts, credentials for authenticating to gateway, result serving speed)

Result Retrieval

 

In the future, we can use different options for retrieving materialized results, both for debugging purposes and for long-running maintained views. The retrieval is managed by the executor.

...

The supported materialization mode also depends on the query type:


 

Query Type            | Internal Mode              | External Mode*
----------------------|----------------------------|----------------------------------------
Batch                 | collect() -> Heap/Database | File table sink
Append Stream         | collect() -> Heap/Database | Kafka/file table sink
Retract/Upsert Stream | collect() -> Heap/Database | (Compacted Kafka)/Cassandra table sink

We might use regular heap space at the beginning; the internal database can be any JDBC database. External materialization modes (*) are not included in the first version. In the future, Kafka results would be read by general Kafka utility functions, and files likewise, with support for different file systems.
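
The append vs. retract/upsert distinction in the table above is already exposed by the Table API. Assuming result is a Table obtained from a submitted query and tableEnv the session's StreamTableEnvironment, internal-mode materialization can build on these existing methods:

  // append-only queries: rows are only ever added
  DataStream<Row> appendStream = tableEnv.toAppendStream(result, Row.class);

  // updating queries: f0 == true marks an insertion, f0 == false a deletion
  DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(result, Row.class);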

Result Maintenance

While batch queries have bounded results, streaming queries are potentially never-ending and, therefore, require special treatment for keeping the results up to date and consistent. The streaming query can be considered as a view and the running streaming application as the view maintenance. Results might need to be supplied to systems that were not made for streaming queries, e.g., Java applications that read from a JDBC API. In those cases, every requested result set must be a snapshot (materialized view) of the running query at a point in time. The snapshot must be immutable until all result rows have been consumed by the application or a new result set is requested.
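
A minimal Java sketch of this contract, assuming changes arrive as insertion/deletion pairs such as those produced by toRetractStream (the class itself is hypothetical):

  import java.util.ArrayList;
  import java.util.List;
  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.types.Row;

  /** Maintains the result of a running query and serves immutable snapshots. */
  public class MaterializedResult {
      private final List<Row> rows = new ArrayList<>();

      /** Applies one change: (true, row) inserts, (false, row) retracts a row. */
      public synchronized void apply(Tuple2<Boolean, Row> change) {
          if (change.f0) {
              rows.add(change.f1);
          } else {
              rows.remove(change.f1);  // removes one occurrence of an equal row
          }
      }

      /** Returns a snapshot that stays stable while the query keeps running. */
      public synchronized List<Row> snapshot() {
          return new ArrayList<>(rows);
      }
  }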


We distinguish between two types of results that will require different materialization semantics: a materialized view and a materialized result stream.

Materialized View

A consistent materialized view of results for production use cases. Materialized views are not part of this FLIP but might be added in future versions; they will require a separate design document for the DDL statement and its execution. Here are some properties we aim for:

SQL: CREATE MATERIALIZED VIEW ...

  • intended for long-running materialization queries that are updated periodically (e.g., every hour or on successful checkpoints)

  • runs detached from the CLI client

  • retractions are not visible directly, only the materialized result

  • a result can be accessed by JDBC connections or the REST API (e.g. for powering dashboards)

  • materialization operators can cooperate with Flink's checkpointing (e.g. only checkpointed results are exposed through the APIs)

  • a user can specify different parameters for how and how often the view is maintained (see create_mv_refresh)
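
Since the DDL is future work, the following statement is only a hypothetical sketch of what such a view definition might look like; the refresh clause stands in for the create_mv_refresh parameters mentioned above, and all table names are illustrative:

  CREATE MATERIALIZED VIEW TopCustomers
    REFRESH EVERY INTERVAL '1' HOUR      -- hypothetical refresh clause
    AS SELECT customer, SUM(amount) AS revenue
       FROM Orders
       GROUP BY customer;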

Materialized Result Stream

A materialized stream of results for getting immediate insights into the running SQL query.

SQL: SELECT ...

  • intended for debugging during query creation and initial showcases

  • retractions are shown as streams of deletion and insertion

  • no guarantees about checkpointed results

  • the executor abstracts the underlying representation and supplies the interfaces for accessing the materialized stream in a FIFO fashion

  • only one running query per CLI session

  • cancelled if the user cancels it in the CLI or the CLI is closed


We first focus on simple SELECT queries whose results are materialized on the heap of the executor (internal materialization mode).
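
For example, a simple aggregating query (table and data purely illustrative) would be materialized as a stream in which an update appears as a deletion (-) followed by an insertion (+):

  SELECT user, COUNT(*) AS cnt FROM Clicks GROUP BY user;

  + (Bob, 1)
  + (Alice, 1)
  - (Bob, 1)
  + (Bob, 2)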

Compatibility, Deprecation, and Migration Plan

No compatibility changes or other deprecations are necessary.

Implementation Plan

1. Basic Embedded SQL Client 

Add the basic features to play around with Flink's streaming SQL.

  • Add a CLI component that reads the configuration files
    • "Pre-registered table sources"
    • "Job parameters"
  • Add an executor for retrieving pre-flight information and a corresponding CLI SQL parser
    • SHOW TABLES
    • SHOW FUNCTIONS
    • DESCRIBE TABLE
    • EXPLAIN
  • Add streaming append query submission to the executor
    • Submit JARs and run the SELECT query using the ClusterClient
    • Collect results on the heap and serve them on the CLI side (internal mode with SELECT)
    • EXECUTE (for executing a SQL statement stored in a local file)
  • Create the SQL client in embedded mode with results stored on the heap
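
The pre-flight commands of this phase could then be used as follows (the table name is hypothetical):

  SHOW TABLES;
  DESCRIBE TABLE Clicks;
  EXPLAIN SELECT user, COUNT(*) AS cnt FROM Clicks GROUP BY user;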

2. Full Embedded SQL Client

Add important features to fully use the SQL client for a variety of use cases.

  • Add support for streaming retract/upsert queries
  • Add support for batch queries
  • Add user-defined function support
    • Support scalar, aggregate, and table functions
    • Submit corresponding JARs to the cluster
    • DESCRIBE FUNCTION
  • Support INSERT INTO (see the sketch below)
    • Read configuration about "Pre-registered table sinks"
    • Allow submission without collect()
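
As a sketch of the INSERT INTO support, where ClicksPerUser stands for a pre-registered table sink (all names hypothetical):

  INSERT INTO ClicksPerUser
  SELECT user, COUNT(*) AS cnt
  FROM Clicks
  GROUP BY user;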

3. Discuss/design further features

Discuss and prioritize other features that are not part of this FLIP. 

  • Introduce gateway mode with REST API

  • Add support for catalogs

  • Allow creating Materialized Views

  • Support other materialization backends

  • Create a JDBC API

  • Further SQL DDL statements:

    • CREATE TABLE

    • CREATE TYPE

  • Further CLI features:

    • Query history

    • Auto-completion and syntax highlighting

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.