Status

Discussion thread
Vote thread
JIRA


Release: 1.13

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Motivation

Currently there are many problems in the SQL Client. For example, users can't register the table sources/sinks proposed in FLIP-95. The SQL Client is a wrapper around the Table Environment where users can use commands or SQL. Therefore, the SQL Client serves two purposes:

  • The SQL Client works as the entry point for users to submit their SQL jobs;
  • The SQL Client works as an interactive tool;

Besides, the SQL Client should offer users a convenient and reusable way to configure the client settings.

Proposed Changes

Configure the client 

Use commands to configure the client

Currently the SQL Client uses a YAML file to configure the client, which has its own grammar rather than the commands used in the client. This causes overhead for users, because they have to study both grammars, and debugging YAML problems is tricky. Considering that the Table API has already developed a sophisticated grammar, it's better to use the same commands in the client to configure the settings, including setting runtime/table options and registering Catalogs/Modules/Databases/Tables/Functions.


For better understanding, the table below lists the relationship between the YAML entries and the commands in the client.


Category            | YAML setting                                     | Command in the client
Register Settings   | register Catalog                                 | CREATE CATALOG XXXX
                    | register Module                                  | LOAD MODULE XXXX
                    | register Database                                | CREATE DATABASE XXXX
                    | register Table                                   | CREATE TABLE XXXX
Execution Settings  | set planner type                                 | SET table.planner=blink
                    | set execution mode                               | SET execution.runtime-mode=batch
                    | set parallelism                                  | SET parallelism.default=1
                    | set periodic watermarks interval                 | SET pipeline.auto-watermark-interval=200
                    | set max parallelism                              | SET pipeline.max-parallelism=10
                    | set min idle state retention                     | SET table.exec.state.ttl=1000
                    | set max idle state retention                     | No need
                    | set current catalog                              | USE CATALOG XXXX
                    | set current database                             | USE DATABASE XXXX
                    | set restart strategy                             | SET restart-strategy=fixed-delay
                    | set time characteristic                          | No need (the WATERMARK definition in the DDL determines the time characteristic)
Deployment Settings | set savepoint path                               | SET execution.savepoint.path=path
                    | allow to skip savepoint that cannot be restored  | SET execution.savepoint.ignore-unclaimed-state=true
                    | set job name                                     | SET pipeline.name=SqlJobName
                    | set response timeout                             | No need (the response timeout was used when determining the address of the client for the socket sink)
                    | set socket sink target address                   | No need
                    | set socket sink target port                      | No need

Currently the TableEnvironment uses TableResult#collect() to fetch the results. The client uses the JobManager as the man in the middle to communicate with the socket sink, and the JobManager knows the address and port of the client. For more details, please refer to the references [1][2]. After applying these changes to the SQL Client, users no longer need to set the socket sink parameters.
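
For reference, a minimal sketch of how a client can fetch rows through TableResult#collect(); the table name is a placeholder, any registered table works here.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class CollectExample {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // "some_table" is a placeholder; register a real table before running this.
        TableResult result = tEnv.executeSql("SELECT * FROM some_table");

        // collect() streams rows back to the client through the JobManager,
        // so no socket-sink address/port needs to be configured.
        try (CloseableIterator<Row> it = result.collect()) {
            while (it.hasNext()) {
                System.out.println(it.next());
            }
        }
    }
}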

However, some users may need to reset some settings to their defaults. For example, a user submits a job that is restored from a specified savepoint, but the savepoint path lives on in the session, so every job submitted afterwards will also try to load that savepoint. Therefore, we propose to add a new command to the SQL Client.

Flink SQL> RESET <key>;

In the case mentioned above, users can use the following command to clear the savepoint path.

Flink SQL> RESET execution.savepoint.path;

Add -i parameter to specify the initialization files

Users can use the -i/--init parameter to start up the SQL Client with initialization files (separated by commas).

> ./sql-client.sh embedded -i init1.sql,init2.sql

The SQL file looks as follows.

-- set up the default properties
SET execution.runtime-mode=batch;
SET parallelism.default=10;
SET pipeline.auto-watermark-interval=500;

-- set up the catalog
CREATE CATALOG HiveCatalog WITH (
  'type' = 'hive'
);

USE CATALOG HiveCatalog;

-- register the table
CREATE TABLE IF NOT EXISTS pageviews (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews',
  'properties.bootstrap.servers' = '...',
  'format' = 'avro'
);

CREATE TABLE IF NOT EXISTS pageviews_enriched (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP,
  user_region STRING,
  WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews_enriched',
  ...
);

Since the -i parameter is used to set up the environment, INSERT statements and queries are forbidden in the initialization files.

Use the SQL Client as the tool to submit jobs

Add -f parameter to specify the execution file

Currently, users can only submit a single INSERT statement via the -u parameter. We propose to add a new command line parameter -f that lets users specify an execution file. Users can use the following command to start up the SQL Client.

> ./sql-client.sh embedded -i init1.sql,init2.sql -f sqljob.sql

The SQL file looks as follows.

CREATE TEMPORARY TABLE users (
  user_id BIGINT,
  user_name STRING,
  user_level STRING,
  region STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'users',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'csv',
  'value.format' = 'avro'
);

-- set sync mode
SET table.dml-sync=true;

-- set the job name
SET pipeline.name=SqlJob; 

-- set the queue that the job is submitted to
SET yarn.application.queue=root;

-- set the job parallelism
SET parallelism.default=100;

-- restore from the specified savepoint path
SET execution.savepoint.path=/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab;

INSERT INTO pageviews_enriched
SELECT *
FROM pageviews AS p
LEFT JOIN users FOR SYSTEM_TIME AS OF p.proctime AS u
ON p.user_id = u.user_id;

Sometimes users may put multiple INSERT INTO statements into the execution file and expect them to be executed one by one in batch mode. Therefore, we propose to add a new option `table.dml-sync`. The default value of this option is false, which means the client executes the next statement as soon as the current job is submitted. When the option is set to true, the client only executes the next statement after the current job finishes. Note that this option also works for `TableEnvironment#executeSql`, where it has the same effect as invoking `TableResult#await()` automatically.
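
Below is a minimal sketch of the intended semantics against the TableEnvironment API, assuming the proposed `table.dml-sync` option; the table names (sink_a, source_a, ...) are hypothetical placeholders.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DmlSyncSketch {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

        // Asynchronous (default, table.dml-sync=false): executeSql() returns once the
        // job is submitted, so waiting has to be done explicitly via TableResult#await().
        tEnv.executeSql("INSERT INTO sink_a SELECT * FROM source_a").await();

        // Synchronous: with the proposed table.dml-sync=true, the client/TableEnvironment
        // waits for each DML job to finish before executing the next statement,
        // as if await() were invoked automatically.
        tEnv.getConfig().getConfiguration().setBoolean("table.dml-sync", true);
        tEnv.executeSql("INSERT INTO sink_b SELECT * FROM source_b");
        tEnv.executeSql("INSERT INTO sink_c SELECT * FROM source_c");
    }
}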

If the SQL file contained a SELECT statement, it would have to occupy the terminal until the job finishes, because the terminal is the "sink" of the job. However, the behavior is undefined, e.g. how to cancel the job if the source is unbounded. In this version, SELECT statements are therefore not allowed in the SQL file.

Differences between the -i and -f parameters

Users can specify -i and -f together when starting up the SQL Client. In this section, we list the differences between the two parameters.


                     | -i                                           | -f
SQL file restriction | INSERT statements and queries are forbidden  | No restriction
Number of files      | Multiple files                               | Single file
Execution order      | Executed at the beginning of the session     | Executed after the initialization finishes

Simplify the command to start up the client

If users want to start up the SQL Client in embedded mode, they currently have to input the following command.

> ./sql-client.sh embedded

This is a little cumbersome because the gateway mode is not supported yet and users run the embedded client in most cases. Therefore, we propose a simplified command without the `embedded` parameter that starts the client in embedded mode, i.e. embedded becomes the default mode.

> ./sql-client.sh


Use the SQL Client as the interactive tool

Support more interactive commands.

Command | Description | Example
ADD JAR <path>

Add the jar to the list of resources. Jar resources are dynamically added to the Java classpath.

Once a jar is added to a session, you can use UDFs or connectors provided by that jar in queries.

<Note> Only adding local jars is supported.

Flink SQL> ADD JAR /local/test.jar;

[Info] Add the jar into the current session.
REMOVE/DELETE JAR <path>

Delete the jar with the specified path from the list of resources. Jar resources are dynamically removed from the Java classpath.

<Note> The default dialect supports REMOVE JAR and the hive dialect only supports DELETE JAR.

Flink SQL> REMOVE JAR /local/test.jar;

[Info] Delete the jar from the current session.
SHOW/LIST JARS

List the user-added jars.

<Note> The default dialect supports SHOW JARS and the hive dialect only supports LIST JARS.

Flink SQL> SHOW JARS;
/local/addedJar.jar
/local/usedJar.jar

Flink SQL>
SHOW CREATE TABLE [[<catalog>.]<database>.]<table name>

Print the DDL that creates the specified table. Currently, it only supports getting the DDL of a table that was created via a DDL statement.

Flink SQL> SHOW CREATE TABLE MyTable;
CREATE TABLE `MyTable` (
  `id` INT,
  `value` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'users',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'csv',
  'value.format' = 'avro'
)

EXPLAIN [ExplainDetail[, ExplainDetail]*] <statement>

ExplainDetail: {
  ESTIMATED_COST,
  CHANGELOG_MODE,
  JSON_EXECUTION_PLAN
}

Print the plan for the statement with the specified ExplainDetails.

  • ESTIMATED_COST: generates cost information on physical nodes estimated by the optimizer, e.g. TableSourceScan(..., cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory})
  • CHANGELOG_MODE: generates the changelog mode for every physical rel node, e.g. GroupAggregate(..., changelogMode=[I,UA,D])
  • JSON_EXECUTION_PLAN: generates the execution plan of the program in JSON format.

Flink SQL> EXPLAIN ESTIMATED_COST, CHANGELOG_MODE SELECT * FROM MyTable;
...

What's more, we will add a new option 'sql-client.verbose' that allows users to print the exception stack trace when an error happens, because currently only the exception message is shown on the terminal, which is not very helpful for debugging.

Support Multiple DML Statements

We have already discussed the grammar for multi-statement sets [3], which looks as follows.

BEGIN STATEMENT SET;
INSERT INTO ... SELECT ...;
INSERT INTO ... SELECT ...;
END;

In the SQL Client, users submit the statements as follows.

Flink SQL> BEGIN STATEMENT SET;

[Info] Begin the statement set.

Flink SQL> INSERT INTO emps1 SELECT * FROM emps(x, y);

[Info] Add the statement into the statement set.

Flink SQL> INSERT INTO emps2 SELECT * FROM emps(x, y);

[Info] Add the statement into the statement set.

Flink SQL> END;

[Info] Submitting SQL statement set to the cluster... 

If users press CTRL+C after entering BEGIN STATEMENT SET but before entering the END statement, the client will clear the statements in the set and exit statement-set mode.

Implementation Details

In our blueprint, we will maintain only one parser. It will first use regexes to match simple commands, e.g. SET commands. If the regexes don't match any client command, the statement will be delegated to the underlying Calcite parser.
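
The following is a rough, self-contained illustration of the dispatch idea; the class, the patterns and the helper methods are placeholders, not the actual Flink implementation.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Sketch of the proposed single-parser dispatch: simple client commands are matched
 * with regexes first; anything else is handed to the Table Environment, whose
 * Calcite-based parser understands full SQL.
 */
public class StatementDispatcher {

    private static final Pattern SET_PATTERN =
            Pattern.compile("SET\\s+(\\S+?)\\s*=\\s*([^;\\s]+)\\s*;?", Pattern.CASE_INSENSITIVE);
    private static final Pattern ADD_JAR_PATTERN =
            Pattern.compile("ADD\\s+JAR\\s+([^;\\s]+)\\s*;?", Pattern.CASE_INSENSITIVE);

    public void dispatch(String statement) {
        String stmt = statement.trim();

        Matcher set = SET_PATTERN.matcher(stmt);
        if (set.matches()) {
            // SET <key>=<value> is handled without going through Calcite.
            applySessionProperty(set.group(1), set.group(2));
            return;
        }

        Matcher addJar = ADD_JAR_PATTERN.matcher(stmt);
        if (addJar.matches()) {
            // ADD JAR is a pure client command.
            addJarToClasspath(addJar.group(1));
            return;
        }

        // Everything else (DDL, DML, queries, EXPLAIN, ...) goes to the Calcite parser.
        delegateToTableEnvironment(stmt);
    }

    private void applySessionProperty(String key, String value) { /* placeholder */ }

    private void addJarToClasspath(String path) { /* placeholder */ }

    private void delegateToTableEnvironment(String statement) { /* placeholder */ }
}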


For better understanding, the table below demonstrates which commands belong to the SQL Client and which belong to the Table Environment.

Commands                     | TableEnv / SQL Client
CREATE Statements            | Table Environment
DROP Statements              | Table Environment
ALTER Statements             | Table Environment
INSERT Statements            | Table Environment
DESCRIBE Statements          | Table Environment
EXPLAIN Statements           | Table Environment
USE Statements               | Table Environment
SHOW Statements              | Table Environment
SHOW CREATE TABLE Statements | Table Environment
SET (<key>=<value>)          | Table Environment
RESET (<key>)                | Table Environment
ADD JAR <jar path>           | SQL Client
REMOVE/DELETE JAR <jar path> | SQL Client
SHOW/LIST JARS               | SQL Client
HELP                         | SQL Client
EXIT/QUIT                    | SQL Client

When -f is used to submit a job, we will create a DumbTerminal(InputStream, OutputStream). The input stream is a file stream created from the specified file and the output stream is the standard output. In this way, we can unify the execution path.
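
A small sketch of this idea, assuming JLine 3's DumbTerminal(InputStream, OutputStream) constructor; the helper class and method names are illustrative only.

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import org.jline.terminal.Terminal;
import org.jline.terminal.impl.DumbTerminal;

// When -f is given, the SQL file becomes the terminal's input stream and stdout stays
// the output, so file execution can reuse the interactive read-eval loop.
public class FileBackedTerminal {

    public static Terminal open(String sqlFilePath) throws IOException {
        InputStream in = new FileInputStream(sqlFilePath);
        return new DumbTerminal(in, System.out);
    }
}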

Summary

After this FLIP is finished, the SQL Client will have the following options, commands and command line parameters.

TableEnvironment

Added options in the table environment

Option         | Default | Type    | Description
table.planner  | 'blink' | String  | Determine which planner to use. Available values are 'blink' and 'old'. It only takes effect when creating the environment; if it is set after initialization, Flink should throw an exception. Among other things, the planners also differ in time semantics and so on.
table.dml-sync | false   | boolean | Determine how DML statements are executed. By default, the client executes the next statement once the current job has been submitted. If set to true, the client executes the next statement only after the current job has finished.

Add `TableEnvironment.create(Configuration)`

In both the SQL Client and the Table Environment, we can then create a table environment from the options specified in the configuration.
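
A sketch of how the proposed factory method might be used; note that `TableEnvironment.create(Configuration)` is the overload proposed by this FLIP, not an existing API, and the option keys below are the ones introduced in this FLIP.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableEnvironment;

public class CreateFromConfiguration {
    public static void main(String[] args) {
        Configuration configuration = new Configuration();
        configuration.setString("execution.runtime-mode", "batch");
        configuration.setString("table.planner", "blink");   // proposed option, see table above
        configuration.setBoolean("table.dml-sync", true);    // proposed option, see table above

        // Proposed API: build the TableEnvironment directly from the configuration.
        TableEnvironment tEnv = TableEnvironment.create(configuration);
        tEnv.executeSql("SHOW TABLES").print();
    }
}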

Supported options in the SQL Client

Option                                      | Default | Type    | Description
sql-client.execution.max-table-result.rows | 1000000 | int     | Maximum number of maintained rows in 'table' mode.
sql-client.verbose                          | false   | boolean | Determine whether to print verbose output to the console. If set to true, the exception stack trace is printed; otherwise only the cause is printed.
sql-client.execution.result-mode            | `table` | Enum    | Determine the result mode. Available values are `table`, `tableau` and `changelog`.

Supported commands in the SQL Client

We use '+', '-' to identify the added and deprecated commands.

Status | Command                      | Description
       | CREATE Statements            | Create the Catalog/Database/Table/Function or load the module.
       | DROP Statements              | Drop the Catalog/Database/Table/Function or unload the module.
       | ALTER Statements             | Modify a registered table/view/function definition in the catalog.
       | INSERT Statements            | Add rows to a table.
       | DESCRIBE Statements          | Describe the schema of a table.
+      | EXPLAIN Statements           | Explain the plan of a statement. Additionally supports the explain-detail options syntax.
       | USE Statements               | Set the current database or catalog.
       | SHOW Statements              | List all catalogs, or list all databases in the current catalog, or list all tables/views in the current catalog and database, or show the current catalog and database, or list all functions (including temporary system functions, system functions, temporary catalog functions and catalog functions) in the current catalog and database.
+      | SHOW CREATE TABLE Statements | Print the DDL that creates the specified table. Currently, it only supports getting the DDL of a table that was created via a DDL statement.
       | SET (<key>=<value>)          | Set a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' to list all properties.
+      | RESET (<key>)                | Reset a session configuration property. Syntax: 'RESET <key>;'. Use 'RESET;' to reset all session configuration properties.
+      | ADD JAR <jar path>           | Add the jar to the list of resources. Note: only local jars are supported. This is a SQL Client command.
+      | REMOVE/DELETE JAR <jar path> | Delete the jar with the specified path from the list of resources. This is a SQL Client command.
+      | SHOW/LIST JARS               | List the user-added jars. This is a SQL Client command.
       | HELP                         | Print the available commands. This is a SQL Client command.
       | EXIT/QUIT                    | Quit the SQL CLI client. This is a SQL Client command.

Supported command line parameters

We use '+', '-' to identify the added and deprecated parameters.

Status | Command line parameter              | Description
-      | -d,--defaults <environment file>    | The environment properties with which every new session is initialized. Properties might be overwritten by session properties.
-      | -e,--environment <environment file> | The environment properties to be imported into the session. They might overwrite default environment properties.
+      | -i,--init <files>                   | Allow users to specify the initialization files.
+      | -f <file>                           | Allow users to specify the execution file.
       | -h,--help                           | Show the help message with descriptions of all options.
       | -hist,--history <history file path> | The file into which the command history is saved. If not specified, one is auto-generated under the user's home directory.
       | -j,--jar <JAR file>                 | A JAR file to be imported into the session. The file might contain user-defined classes needed for the execution of statements such as functions, table sources, or sinks. Can be used multiple times.
       | -l,--library <JAR directory>        | A JAR file directory with which every new session is initialized. The files might contain user-defined classes needed for the execution of statements such as functions, table sources, or sinks. Can be used multiple times.
       | -pyarch,--pyArchives <arg>          | Add python archive files for the job. The archive files will be extracted to the working directory of the python UDF worker. Currently only zip format is supported. For each archive file, a target directory can be specified. If a target directory name is specified, the archive file will be extracted to a directory with the specified name; otherwise, it will be extracted to a directory with the same name as the archive file. The files uploaded via this option are accessible via relative paths. '#' can be used as the separator between the archive file path and the target directory name. Comma (',') can be used as the separator to specify multiple archive files. This option can be used to upload the virtual environment and the data files used in Python UDFs (e.g. --pyArchives file:///tmp/py37.zip,file:///tmp/data.zip#data --pyExecutable py37.zip/py37/bin/python). The data files can be accessed in Python UDFs, e.g. f = open('data/data.txt', 'r').
       | -pyexec,--pyExecutable <arg>        | Specify the path of the python interpreter used to execute the python UDF worker (e.g. --pyExecutable /usr/local/bin/python3). The python UDF worker depends on Python 3.5+, Apache Beam (version == 2.23.0), Pip (version >= 7.1.0) and SetupTools (version >= 37.0.0). Please ensure that the specified environment meets the above requirements.
       | -pyfs,--pyFiles <pythonFiles>       | Attach custom python files for the job. These files will be added to the PYTHONPATH of both the local client and the remote python UDF worker. Standard python resource file suffixes such as .py/.egg/.zip as well as directories are all supported. Comma (',') can be used as the separator to specify multiple files (e.g. --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip).
       | -pyreq,--pyRequirements <arg>       | Specify a requirements.txt file which defines the third-party dependencies. These dependencies will be installed and added to the PYTHONPATH of the python UDF worker. A directory containing the installation packages of these dependencies can optionally be specified. Use '#' as the separator if the optional parameter exists (e.g. --pyRequirements file:///tmp/requirements.txt#file:///tmp/cached_dir).
       | -s,--session <session identifier>   | The identifier for a session. 'default' is the default identifier.
-      | -u,--update <SQL update statement>  | Experimental (for testing only!): Instructs the SQL Client to immediately execute the given update statement after starting up. The process is shut down after the statement has been submitted to the cluster and returns an appropriate return code. Currently, this feature is only supported for INSERT INTO statements that declare the target sink table.


Compatibility, Deprecation, and Migration Plan

This FLIP is an improvement of the SQL Client. Compatibility is not affected immediately.

The YAML initialization file will be removed in this version and the command line parameters related to the YAML file are deprecated. However, we will continue to support the YAML file if users specify it explicitly. In the future, we may remove support for configuring the client via a YAML file entirely.

References

[1] https://issues.apache.org/jira/browse/FLINK-14807

[2] https://docs.google.com/document/d/13Ata18-e89_hAdfukzEJYreOg2FBZO_Y0RohLDAme6Y/edit#

[3] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-SQL-Syntax-for-Table-API-StatementSet-td42515.html