...
The goal of this FLIP is to have an initial minimum viable product for using Flink with SQL without an IDE. We will use this product to incrementally refine the requirements based on the feedback from users and contributors. Further FLIPs and design documents might follow in order to define REST/JDBC capabilities or materialized view semantics.
Public Interfaces
Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.
A public interface is any change to the following:
Binary log formatThe network protocol and api behaviorAny class in the public packages under clientsConfiguration, especially client configurationorg/apache/kafka/common/serializationorg/apache/kafka/commonorg/apache/kafka/common/errorsorg/apache/kafka/clients/producerorg/apache/kafka/clients/consumer (eventually, once stable)
MonitoringCommand line tools and argumentsAnything else that will likely break existing users in some way when they upgrade
Proposed Changes
Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.
Compatibility, Deprecation, and Migration Plan
- What impact (if any) will there be on existing users?
- If we are changing behavior how will we phase out the older behavior?
- If we need special migration tools, describe them here.
- When will we remove the existing behavior?
Test Plan
...
A new Maven module “flink-sql-client” with the SQL client
A new binary file for executing the SQL client in embedded mode
New default configuration files and library directory
Proposed Changes
General Architecture
The SQL Client can be executed in two modes: a gateway and embedded mode. In this FLIP we mostly focus on the embedded mode but also consider a later gateway conceptually.
In gateway mode, a dedicated SQL gateway process offers APIs to which other applications can connect to. A REST API and JDBC API allow to submit queries, cancel jobs, and retrieve results. Both REST and JDBC connect to a common executor that is responsible for communicating with Flink and external catalogs. The executor also keeps state about currently running sessions. The optional SQL CLI client connects to the REST API of the gateway and allows for managing queries via console.
In embedded mode, the SQL CLI is tightly coupled with the executor in a common process. This allows for playing around with Flink quickly and submit jobs without having to start an additional components.
The communication to Flink happens through the ClusterClient. By using the ClusterClient we can ensure that the SQL client will run Flink on YARN, Mesos, and other supported environments without additional effort.
For the embedded mode, a general communication flow would like:
The user adds catalog information to the configuration files.
The user starts CLI client with custom JAR files and configuration (
--jar
,--context
).The user enters SQL query and submits it.
The executor submits Flink job with all involved JAR files.
If a query is a
SELECT
query, the executor materializes the result such that it can be pulled by the CLI client.If a query is a
INSERT INTO
query, the executor submits the Flink job.The user can stop the running query and submit a new one.
Exiting the CLI would also stop the running
SELECT
query but notINSERT INTO
queries.
Gateway Mode
Embedded Mode
Configuration
Independent of the execution mode, the SQL client can be configured globally (sql-defaults.conf
) and/or for every CLI session (sql-context.conf
). The configuration specifies settings that would programmatically be applied to a ExecutionEnvironment
/StreamExecutionEnvironment
and TableEnvironment
. It contains catalog information as well as job specific parameters.
The global configuration is located in a dedicated file and is read during startup. Global configuration applies to all CLI sessions. A context configuration file can be specified when starting the CLI client and is attached to any query executed in the current CLI session. Thus, context properties might overwrite global default properties. In future versions, the configuration might also be refined by using DDL statements such as:
CREATE TYPE...
CREATE TABLE…
DECLARE @CustomerID = 'XYZ'
DDL statements will have highest priority and overwrite the properties specified in configuration files.
Here are some properties that might need to be defined in the long run:
Pre-registered table sources
Name
Source
Type (e.g. Kafka, Custom)
Properties (e.g. topic, connection info, custom TableSource class)
Encoding
Type (e.g. Avro, JSON)
Schema (e.g. Avro class, JSON field names/types)
Rowtime descriptor/Proctime
Watermark strategy and Watermark properties
Time attribute info
Bucketization
Statistics
Pre-registered table sinks
Name
Sink
Type (e.g. Kafka, Custom)
Properties (e.g. destination path, output types)
External catalogs
Name
Properties (e.g. connection info, credentials, ExternalCatalog class)
User-defined functions
Name
Parameters (e.g. constructor parameters for a TableFunction)
Class
[Optional] User-defined types
Name
Field names and types
[Optional] User-defined local variables (@local_variable)
Name
Value
Job parameters
Batch or streaming execution
Parallelism
Maximum parallelism
State Backend + parameters
Auto watermark interval
Restart strategy
Query Config (e.g. min/max retention time)
[Separate configuration?] SQL client parameters
Gateway properties
(e.g. database properties, server credentials)CLI Client properties
(e.g. timeouts, credentials for authenticating to gateway, result serving speed)
Result Retrieval
In the future, we can use different options for retrieving materialized results both for debugging purposes and long-running maintained views. The retrieval is managed by the executor.
There are three materialization modes:
Internal mode (for small results):
collect()
into Heap/Derby/... (we decide internally how this is done)
We use DataSet/DataStream collect in order to retrieve results. They are materialized in the executor and deleted once pulled by the CLI client.External mode (for larger results):
automatic TableSink into Kafka, HDFS, DBMS, etc. (the content is accessible for the executor)
Flink writes larger results into external systems by using pre-defined TableSinks. The executor knows about the format and location and can access the information there.Custom mode (for any result):
custom TableSink into arbitrary systems
Flink writes any result into a destination that is determined by the user using INSERT INTO. The results are not accessible by the executor and can thus not be served via REST or CLI console.
The supported materialization mode also depends on the query type:
Query Type | Materialization Backends |
Batch |
|
Append Stream |
|
Retract/Upsert Stream |
|
We might use usual heap space at the beginning. The internal database can be any JDBC database. External materialization modes (*) are not included in the first version. In the future, Kafka would be read by general Kafka utility functions. Files as well with support for different file systems.
CREATE MATERIALIZED VIEW
[in future versions]
for long running materialization queries (e.g., for powering dashboards)
this could power JDBC connections
checkpointed
different parameters for how and how often the view is maintained
(see create_mv_refresh)it requires another design document for the DDL statement and execution
SELECT
for debugging during query creation and initial show cases
retractions are shown as streams of deletion and insertion
the executor abstracts the underlying representation and supplies the interfaces
only one running query per CLI session
cancelled if cancelled in CLI
We focus on simple SELECT queries first that are materialized on the heap of the executor.
Compatibility, Deprecation, and Migration Plan
No compatibility changes or other deprecation necessary.
Implementation Plan
Create SQL client in embedded mode with results stored on Heap
CLI functionality:
Query Submission
History
[Optional] Highlighter
Included SQL operations:
SELECT
(Batch, Append, Retract/Upsert)SHOW TABLES
SHOW FUNCTIONS
EXECUTE
(for executing a SQL statement stored in a local file)DESCRIBE TABLE
DESCRIBE FUNCTION
EXPLAIN
Support
INSERT INTO
Afterwards: Discuss further features
Such as:Introduce gateway mode with REST API
Add support for catalogs
Allow creating Materialized Views
Support other materialization backends
Create a JDBC API
Further SQL DDL statements:
CREATE TABLE
CREATE TYPE
Further CLI features:
Auto-completion
Rejected Alternatives
If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.