
Motivation

Currently there are many problems in the SQL client. For example, users can't register the table sources/sinks proposed by FLIP-95. The SQL client is a wrapper around the Table Environment where users can use commands or SQL. Therefore, the SQL client has two purposes:

  • The SQL client works as the entry point for users to submit their SQL jobs;
  • The SQL client works as an interactive tool;

Besides, the SQL client should offer users a convenient, reusable way to configure the client settings.

Proposed Changes

Configure the client 

Use commands to configure the client

Currently the SQL client uses a YAML file to configure the client, which has its own grammar rather than the commands used in the client. This causes overhead for users because they have to learn an additional syntax, and it is very tricky to debug the file line by line. Considering that the Table API has already developed a sophisticated grammar, it's better to let users configure their settings with the same commands in the client, including setting the runtime/table options and registering Catalog/Module/Database/Table/Function.


For better understanding, the chart below lists the relationship between the YAML entries and the commands in the client; a short sketch of the migration follows the table.


YAML | Command in the client

Register Settings
  register Catalog | CREATE CATALOG XXXX
  register Module | LOAD MODULE XXXX
  register Database | CREATE DATABASE XXXX
  register Table | CREATE TABLE XXXX

Execution Settings
  set planner type | SET sql-client.planner=blink
  set execution mode | SET sql-client.execution.mode=batch
  set parallelism | SET parallelism.default=1
  set periodic watermarks interval | SET pipeline.auto-watermark-interval=200
  set max parallelism | SET pipeline.max-parallelism=10
  set min idle state retention | SET table.exec.state.ttl=1000
  set max idle state retention | No need
  set current catalog | USE CATALOG XXXX
  set current database | USE DATABASE XXXX
  set restart strategy | SET restart-strategy=...
  set time characteristic | No need

Deployment Settings
  set savepoint path | SET execution.savepoint.path=<path>
  allow to skip savepoint state that cannot be restored | SET execution.savepoint.ignore-unclaimed-state=true
  set job name | SET pipeline.name=SqlJobName
  set response timeout | No need
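
As a minimal sketch of the migration (the option values, the catalog name MyHiveCatalog, and the legacy YAML section names 'execution' and 'catalogs' are only illustrative assumptions), settings that previously lived in the YAML file would simply be issued as commands in the client or in an initialization file:

-- execution settings, formerly under the YAML 'execution' section
SET sql-client.planner=blink;
SET sql-client.execution.mode=stream;
SET parallelism.default=4;

-- catalog registration, formerly under the YAML 'catalogs' section
CREATE CATALOG MyHiveCatalog WITH ('type' = 'hive');
USE CATALOG MyHiveCatalog;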

However, some users may need to reset some settings to their defaults. For example, a user submits a job that restores from a specified savepoint, but the specified savepoint path lives on in the session, so every job submitted later will also try to load that savepoint. Therefore, we propose to add a new command to the SQL client.

Flink SQL> RESET <key>;

In the case mentioned above, users can use the command as follows.

Flink SQL> RESET execution.savepoint.path;

Add -i parameter to specify the initialization file

Users can use the parameter -i/--init to start up the SQL client with the initialization files.

> ./sql-client.sh embedded -i init1.sql init2.sql

The sql file looks as follows.

-- use specified settings
SET sql-client.execution.mode=batch;
SET parallism.default=100;
SET pipeline.auto-watermark-interval=500;

-- set up the catalog
CREATE CATALOG TestCatalog WITH (
  'type' = 'hive'
);

USE CATALOG TestCatalog;

-- register the table
CREATE TABLE pageviews (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews',
  'properties.bootstrap.servers' = '...',
  'format' = 'avro'
);

CREATE TABLE pageviews_enriched (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP,
  user_region STRING,
  WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews_enriched',
  ...
);

Since the -i parameter is used to set up the environment, it is forbidden to add INSERT statements or queries to the initialization files.

Use the SQL client as a tool to submit jobs

Add -f parameter to specify the execution file

Currently, users can only submit a single INSERT statement as a job via the -u parameter. We propose to add a new command line parameter -f for users to specify an execution file. Users can use the following command to start up the SQL client.

> ./sql-client.sh embedded -i init1.sql init2.sql -f sqljob.sql

The sql file looks as follows.

SET pipeline.name='SqlJob';

CREATE TABLE users (
  user_id BIGINT,
  user_name STRING,
  user_level STRING,
  region STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'users',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'csv',
  'value.format' = 'avro'
);

INSERT INTO pageviews_enriched
SELECT *
FROM pageviews AS p
LEFT JOIN users FOR SYSTEM_TIME AS OF p.proctime AS u
ON p.user_id = u.user_id;

Differences between the -i and -f parameters

Users can specify -i and -f together when starting up the SQL client. In this section, we list the differences between the two parameters.


 | -i | -f
SQL file restriction | DDL, SET and RESET statements are allowed; INSERT statements and queries are forbidden. | DDL, SET, RESET and INSERT statements are allowed; queries are forbidden.
Number of files | Multiple files | Single file
Execution order | Executed after the session is set up | Executed after the initialization finishes


Use the SQL client as an interactive tool

Support more interactive commands.

Command | Description
ADD JAR <path> | Add the jar to the list of resources.
DELETE JAR <path> | Delete the jar with the specified path from the list of resources.
LIST JAR | List the user-added jars.
SHOW CREATE TABLE <table name> | Print the DDL that creates the specified table.
EXPLAIN [ExplainDetail[, ExplainDetail]*] <statement>, where ExplainDetail is one of { COST, CHANGELOG } | Print the plan of the statement with the specified ExplainDetails.


What's more, we propose to add a new option sql-client.verbose to allow users to print the full exception stack trace; an illustrative session is sketched below.
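
For illustration, the following hypothetical session shows how these commands could work together (the jar path, table name, and query are placeholders, and the EXPLAIN keyword set follows the ExplainDetail grammar proposed above):

Flink SQL> SET sql-client.verbose=true;
Flink SQL> ADD JAR '/tmp/my-udf.jar';
Flink SQL> LIST JAR;
Flink SQL> SHOW CREATE TABLE pageviews;
Flink SQL> EXPLAIN CHANGELOG INSERT INTO pageviews_enriched SELECT * FROM pageviews;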

Support Multiple DML

We have already discussed the grammar for multi-line statements; it's time to support it in the SQL parser. In the SQL client, users would submit statements as follows.

Flink SQL> BEGIN STATEMENT SET;
Flink SQL> INSERT INTO emps1 SELECT * FROM emps(x, y);

[Info] Submit the statement into the statement set.

Flink SQL> INSERT INTO emps2 SELECT * FROM emps(x, y);

[Info] Submit the statement into the statement set.

Flink SQL> END;

[Info] Submitting SQL statement set to the cluster... 

Compatibility, Deprecation, and Migration Plan

This FLIP is an improvement to the SQL client. Compatibility is not affected immediately.

The YAML initialization file and the -u parameter will be deprecated. It is recommended that users use `-i` to specify the initialization file and `-f` to specify the execution file, for example:
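
The statement and file names below are only placeholders; the first line shows the existing experimental -u flag, the second the proposed replacement.

> ./sql-client.sh embedded -u "INSERT INTO my_sink SELECT * FROM my_source"
> ./sql-client.sh embedded -i init.sql -f sqljob.sql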

Summary

After this FLIP is finished, the SQL client will have the options, commands and command line parameters listed below.

Supported options in the SQL client

Option | Default | Type | Description
sql-client.planner | 'blink' | String | Determine which planner to use. Available values are 'blink' and 'legacy'.
sql-client.execution.mode | 'stream' | String | Determine the execution mode. Available values are 'stream' and 'batch'.
sql-client.execution.max-table-result.rows | 1000000 | int | Maximum number of maintained rows in 'table' mode.
sql-client.verbose | false | boolean | Determine whether to print verbose output (including the full exception stack trace) to the console.

Supported commands in the SQL client

Command | Description
CREATE Statements | Create a Catalog/Database/Table/Function or load a module.
DROP Statements | Drop a Catalog/Database/Table/Function or unload a module.
ALTER Statements | Modify a registered table/view/function definition in the catalog.
INSERT Statements | Add rows to a table.
DESCRIBE Statements | Describe the schema of a table.
EXPLAIN Statements | Explain the plan of a statement.
USE Statements | Set the current catalog or database.
SHOW Statements | List all catalogs; list all databases in the current catalog; list all tables/views in the current catalog and database; show the current catalog and database; or list all functions (including temporary system functions, system functions, temporary catalog functions and catalog functions) in the current catalog and database.
SET (<key>=<value>) | Set a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' to list all properties.
RESET (<key>) | Reset a session configuration property to its default. Syntax: 'RESET <key>;'. Use 'RESET;' to reset all properties.
ADD JAR <jar path> | Add the jar to the list of resources.
DELETE JAR <jar path> | Delete the jar with the specified path from the list of resources.
LIST JAR | List the user-added jars.
HELP | Print the available commands.
EXIT/QUIT | Quit the SQL CLI client.

Supported command line parameters

Command line parameter | Description
-i,--init <files> | Allow users to specify the initialization files.
-f <file> | Allow users to specify the execution file.
-h,--help | Show the help message with descriptions of all options.
-hist,--history <history file path> | The file which you want to save the command history into. If not specified, we will auto-generate one under your user's home directory.
-j,--jar <JAR file> | A JAR file to be imported into the session. The file might contain user-defined classes needed for the execution of statements such as functions, table sources, or sinks. Can be used multiple times.
-l,--library <JAR directory> | A JAR file directory with which every new session is initialized. The files might contain user-defined classes needed for the execution of statements such as functions, table sources, or sinks. Can be used multiple times.
-pyarch,--pyArchives <arg> | Add python archive files for the job. The archive files will be extracted to the working directory of the python UDF worker. Currently only zip format is supported. For each archive file, a target directory can be specified; if the target directory name is specified, the archive file will be extracted to a directory with the specified name, otherwise it will be extracted to a directory with the same name as the archive file. The files uploaded via this option are accessible via relative path. '#' can be used as the separator between the archive file path and the target directory name. Comma (',') can be used as the separator to specify multiple archive files. This option can be used to upload the virtual environment and the data files used in Python UDFs (e.g. --pyArchives file:///tmp/py37.zip,file:///tmp/data.zip#data --pyExecutable py37.zip/py37/bin/python). The data files can be accessed in Python UDFs, e.g. f = open('data/data.txt', 'r').
-pyexec,--pyExecutable <arg> | Specify the path of the python interpreter used to execute the python UDF worker (e.g. --pyExecutable /usr/local/bin/python3). The python UDF worker depends on Python 3.5+, Apache Beam (version == 2.23.0), Pip (version >= 7.1.0) and SetupTools (version >= 37.0.0). Please ensure that the specified environment meets the above requirements.
-pyfs,--pyFiles <pythonFiles> | Attach custom python files for the job. These files will be added to the PYTHONPATH of both the local client and the remote python UDF worker. Standard python resource file suffixes such as .py/.egg/.zip, as well as directories, are all supported. Comma (',') can be used as the separator to specify multiple files (e.g. --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip).
-pyreq,--pyRequirements <arg> | Specify a requirements.txt file which defines the third-party dependencies. These dependencies will be installed and added to the PYTHONPATH of the python UDF worker. A directory which contains the installation packages of these dependencies can optionally be specified. Use '#' as the separator if the optional parameter exists (e.g. --pyRequirements file:///tmp/requirements.txt#file:///tmp/cached_dir).
-s,--session <session identifier> | The identifier for a session. 'default' is the default identifier.




