Authors
Forward Xu, Jark Wu
Motivation
Currently, Flink SQL does not support the SQL:2016 JSON functions, although much of the data in message middleware such as Kafka is in JSON format. If Flink SQL supported JSON operations, this data could be parsed much more easily. In addition, some relational databases also store and ingest data in JSON format.
SQL/JSON path language
The SQL/JSON path language is a query language used by certain SQL operators (JSON_VALUE, JSON_QUERY, JSON_TABLE and JSON_EXISTS, collectively known as the SQL/JSON query operators) to query JSON text. The SQL/JSON path language is not, strictly speaking, SQL, though it is embedded in these operators within SQL. Lexically and syntactically, the SQL/JSON path language adopts many features of [ECMAscript], though it is neither a subset nor a superset of [ECMAscript]. The semantics of the SQL/JSON path language are primarily SQL semantics.
...
- Non-structural errors outside of a <JSON path predicate> are always unhandled errors, resulting in an exception condition returned from the path engine to the SQL/JSON query operator.
- The SQL/JSON query operators provide an ON ERROR clause to specify the behavior in case of an input conversion error, an unhandled structural error, an unhandled non-structural error, or an output conversion error.
Modes
The path engine has two modes, strict and lax. The motivation for these modes is that strict mode will be used to examine data from a strict schema perspective, for example, to look for data that diverges from an expected schema. Therefore, strict mode raises an error if the data does not strictly adhere to the requirements of a path expression. Lax mode is intended to be more forgiving, so lax mode converts errors to empty SQL/JSON sequences.
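The difference between the two modes can be illustrated with a small Java sketch of a single member-accessor step ($.key). It assumes the JSON text has already been parsed into java.util.Map (objects) and java.util.List (arrays). The names PathModeSketch, member and StructuralError are hypothetical, and this is not the Flink or Calcite path engine. Strict mode throws on a structural error; lax mode returns an empty sequence instead and unwraps one level of array before accessing the member.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: evaluating one member accessor ($.key) in strict and lax mode. */
class PathModeSketch {
    enum Mode { STRICT, LAX }

    /** Raised for structural errors, which only surface in strict mode. */
    static class StructuralError extends RuntimeException {
        StructuralError(String message) {
            super(message);
        }
    }

    /**
     * Applies ".key" to one item and returns the resulting SQL/JSON sequence.
     * Objects are modeled as Maps, arrays as Lists, scalars as plain Java values.
     */
    static List<Object> member(Object item, String key, Mode mode) {
        if (item instanceof Map && ((Map<?, ?>) item).containsKey(key)) {
            return Collections.singletonList(((Map<?, ?>) item).get(key));
        }
        if (mode == Mode.STRICT) {
            // Missing member, or accessor applied to a non-object: a structural error.
            throw new StructuralError("cannot access member '" + key + "'");
        }
        if (item instanceof List) {
            // Lax mode unwraps one level of array and applies the accessor to each element;
            // elements that are not objects or lack the member contribute nothing.
            List<Object> result = new ArrayList<>();
            for (Object element : (List<?>) item) {
                if (element instanceof Map && ((Map<?, ?>) element).containsKey(key)) {
                    result.add(((Map<?, ?>) element).get(key));
                }
            }
            return result;
        }
        // Lax mode converts the structural error into an empty sequence.
        return Collections.emptyList();
    }
}
```

For a document {"a": 1}, member(doc, "b", Mode.LAX) yields an empty sequence, while the same call in Mode.STRICT throws StructuralError.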
...
- Row 5 has a phones array, but it is empty. This case is processed similarly to rows 2 and 3: the underlined columns are null because their paths are empty. The NESTED COLUMNS clause is used, but the array is empty, so this is an outer join with an empty table. Thus, the boxed columns also come up null, and the COALESCE operators combine these nulls to get null. The end result is the same as row 4.
Proposed Changes
This section details how JSON functions will be supported in Flink SQL. The implementation is mainly based on flink-table-planner-blink.
SQL/JSON functions
The Flink SQL JSON functions in this document are mainly based on the SQL JSON functions already implemented in Calcite (JIRA CALCITE-2867).
...
path is a character string containing a JSON path expression; the mode flag strict or lax should be specified at the beginning of path.
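A minimal Java sketch of splitting the leading mode flag from the rest of the path expression. The class name PathSpecSketch, the lowercase-only keywords and the decision to reject a path that has no mode flag are assumptions of this sketch, not behavior taken from Calcite or Flink.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: splits "strict $.a" or "lax $.a" into the mode flag and the path. */
final class PathSpecSketch {
    // Leading mode keyword, at least one whitespace character, then the path expression.
    private static final Pattern PATH_SPEC =
            Pattern.compile("\\s*(strict|lax)\\s+(\\S.*)", Pattern.DOTALL);

    final String mode;  // "strict" or "lax"
    final String path;  // the remaining path expression, e.g. "$.foo"

    private PathSpecSketch(String mode, String path) {
        this.mode = mode;
        this.path = path;
    }

    static PathSpecSketch parse(String spec) {
        Matcher m = PATH_SPEC.matcher(spec);
        if (!m.matches()) {
            // Assumption of this sketch: a path without a leading mode flag is rejected.
            throw new IllegalArgumentException("path must start with 'strict' or 'lax': " + spec);
        }
        return new PathSpecSketch(m.group(1), m.group(2).trim());
    }
}
```

With this helper, parse("strict $.foo") yields mode "strict" and path "$.foo", while parse("$.foo") is rejected.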
Query Functions
Use in SQL:
SQL SYNTAX | DESCRIPTION |
JSON_EXISTS(jsonValue, path [ { TRUE | FALSE | UNKNOWN | ERROR } ON ERROR ] ) | Whether a jsonValue satisfies a search criterion described using JSON path expression path. |
JSON_VALUE(jsonValue, path [ RETURNING type ] [ { ERROR | NULL | DEFAULT expr } ON EMPTY ] [ { ERROR | NULL | DEFAULT expr } ON ERROR ] ) | Extract an SQL scalar from a jsonValue using JSON path expression path. |
JSON_QUERY(jsonValue, path [ { WITHOUT [ ARRAY ] | WITH [ CONDITIONAL | UNCONDITIONAL ] [ ARRAY ] } WRAPPER ] [ { ERROR | NULL | EMPTY ARRAY | EMPTY OBJECT } ON EMPTY ] [ { ERROR | NULL | EMPTY ARRAY | EMPTY OBJECT } ON ERROR ] ) | Extract a JSON object or JSON array from jsonValue using the path JSON path expression. |
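The ON EMPTY and ON ERROR clauses of JSON_VALUE can be sketched as follows. This hypothetical Java helper (JsonValueSketch) receives the SQL/JSON sequence already selected by the path, applies ON EMPTY when that sequence is empty, and applies ON ERROR when the result is not a single scalar. It is a simplification: errors raised while evaluating the path itself, and the RETURNING type conversion, are left out.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of JSON_VALUE's ON EMPTY / ON ERROR handling (not Flink's implementation). */
class JsonValueSketch {
    /** The three behaviors allowed by ON EMPTY and ON ERROR. */
    enum Behavior { ERROR, NULL, DEFAULT }

    /** pathResult is the SQL/JSON sequence selected by the path; objects are Maps, arrays are Lists. */
    static Object jsonValue(List<Object> pathResult,
                            Behavior onEmpty, Object defaultOnEmpty,
                            Behavior onError, Object defaultOnError) {
        if (pathResult.isEmpty()) {
            // Nothing was selected: the ON EMPTY clause decides the result.
            return apply(onEmpty, defaultOnEmpty, "no SQL/JSON item");
        }
        Object item = pathResult.get(0);
        if (pathResult.size() > 1 || item instanceof Map || item instanceof List) {
            // JSON_VALUE must produce exactly one scalar; otherwise ON ERROR decides.
            return apply(onError, defaultOnError, "SQL/JSON scalar required");
        }
        return item;
    }

    private static Object apply(Behavior behavior, Object defaultValue, String reason) {
        switch (behavior) {
            case ERROR:
                throw new IllegalStateException(reason);
            case DEFAULT:
                return defaultValue;
            default:
                return null;  // NULL ON EMPTY / NULL ON ERROR
        }
    }
}
```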
...
OPERATOR | $.A | $.B | $.C |
JSON_EXISTS | true | true | true |
JSON_VALUE | [1, 2] | error | hi |
JSON_QUERY WITHOUT ARRAY WRAPPER | error | [1, 2] | error |
JSON_QUERY WITH UNCONDITIONAL ARRAY WRAPPER | [ "[1,2]" ] | [ [1,2] ] | [ "hi" ] |
JSON_QUERY WITH CONDITIONAL ARRAY WRAPPER | [ "[1,2]" ] | [1,2] | [ "hi" ] |
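The wrapper rows of this table can be reproduced with a small Java sketch. It assumes, consistent with the results above, that $.A selects the JSON string "[1,2]", $.B selects the array [1,2] and $.C selects the string "hi". The class JsonQueryWrapperSketch is hypothetical and handles only a single selected item; when the path selects several items, the wrapper applies to the whole sequence.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of JSON_QUERY's wrapper options for a single selected item. */
class JsonQueryWrapperSketch {
    enum Wrapper { WITHOUT, UNCONDITIONAL, CONDITIONAL }

    /** JSON objects are modeled as Maps and JSON arrays as Lists. */
    static boolean isObjectOrArray(Object item) {
        return item instanceof Map || item instanceof List;
    }

    static Object wrap(Object item, Wrapper wrapper) {
        switch (wrapper) {
            case UNCONDITIONAL:
                // Always wrap the selected item in a new one-element array.
                return Collections.singletonList(item);
            case CONDITIONAL:
                // Wrap only if the item is not already an object or an array.
                return isObjectOrArray(item) ? item : Collections.singletonList(item);
            default:
                // WITHOUT ARRAY WRAPPER: a scalar result is an error.
                if (!isObjectOrArray(item)) {
                    throw new IllegalStateException("JSON_QUERY requires an object or array");
                }
                return item;
        }
    }
}
```

For example, wrap of the array [1,2] with CONDITIONAL returns it unchanged, while wrap of "hi" with CONDITIONAL returns a one-element list, matching the CONDITIONAL row.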
Constructor Functions
Use in SQL:
SQL SYNTAX | DESCRIPTION |
JSON_OBJECT( { [ KEY ] name VALUE value [ FORMAT JSON ] | name : value [ FORMAT JSON ] } * [ { NULL | ABSENT } ON NULL ] ) | Construct a JSON object using a series of key (name) value (value) pairs. |
JSON_OBJECTAGG( { [ KEY ] name VALUE value [ FORMAT JSON ] | name : value [ FORMAT JSON ] } [ { NULL | ABSENT } ON NULL ] ) | Aggregate function to construct a JSON object using a key (name) value (value) pair. |
JSON_ARRAY( { value [ FORMAT JSON ] } * [ { NULL | ABSENT } ON NULL ] ) | Construct a JSON array using a series of values (value). |
JSON_ARRAYAGG( value [ FORMAT JSON ] [ ORDER BY orderItem [, orderItem ]* ] [ { NULL | ABSENT } ON NULL ] ) | Aggregate function to construct a JSON array using a value (value). |
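The NULL ON NULL / ABSENT ON NULL choice is easiest to see on JSON_OBJECT. The Java sketch below, using the hypothetical class JsonObjectSketch, serializes ordered name/value pairs: NULL ON NULL keeps a member whose value is null as the JSON literal null, while ABSENT ON NULL drops that member. FORMAT JSON and nested values are not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of JSON_OBJECT(... { NULL | ABSENT } ON NULL). */
class JsonObjectSketch {
    enum OnNull { NULL, ABSENT }

    /** Builds JSON object text from name/value pairs, in the map's iteration order. */
    static String jsonObject(Map<String, Object> pairs, OnNull onNull) {
        List<String> members = new ArrayList<>();
        for (Map.Entry<String, Object> e : pairs.entrySet()) {
            if (e.getValue() == null && onNull == OnNull.ABSENT) {
                continue;  // ABSENT ON NULL drops the whole member
            }
            members.add(quote(e.getKey()) + ":" + scalar(e.getValue()));
        }
        return "{" + String.join(",", members) + "}";
    }

    /** Renders a scalar SQL value as JSON; a SQL NULL becomes the JSON literal null. */
    static String scalar(Object v) {
        if (v == null) return "null";
        if (v instanceof Number || v instanceof Boolean) return v.toString();
        return quote(v.toString());
    }

    /** Quotes a string, escaping double quotes, backslashes and control characters. */
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.append('"').toString();
    }
}
```

JSON_ARRAY follows the same rule for its elements: under ABSENT ON NULL, null values are left out of the array.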
...
OPERATOR | o_id | AFTER AGG |
JSON_ARRAYAGG(attribute) | 2 | ["color", "fabric"] |
JSON_ARRAYAGG(attribute) | 3 | ["color", "shape"] |
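The aggregation in the table above can be sketched in Java as a GROUP BY o_id that collects attribute values into a JSON array. The input rows (2, color), (2, fabric), (3, color), (3, shape) used in the example are hypothetical, chosen only so that the output matches the table, and the class JsonArrayAggSketch is illustrative. Without ORDER BY, SQL does not guarantee the element order; the sketch simply keeps arrival order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of: SELECT o_id, JSON_ARRAYAGG(attribute) ... GROUP BY o_id. */
class JsonArrayAggSketch {

    /** One input row: an order id and an attribute name (hypothetical schema). */
    static final class Row {
        final int oId;
        final String attribute;

        Row(int oId, String attribute) {
            this.oId = oId;
            this.attribute = attribute;
        }
    }

    /** Groups rows by o_id and renders each group's attributes as JSON array text. */
    static Map<Integer, String> aggregate(List<Row> rows) {
        Map<Integer, List<String>> groups = new TreeMap<>();
        for (Row r : rows) {
            List<String> values = groups.computeIfAbsent(r.oId, k -> new ArrayList<>());
            if (r.attribute != null) {  // ABSENT ON NULL: null inputs are skipped
                // Escapes only backslashes and double quotes, enough for this sketch.
                String escaped = r.attribute.replace("\\", "\\\\").replace("\"", "\\\"");
                values.add("\"" + escaped + "\"");
            }
        }
        Map<Integer, String> result = new TreeMap<>();
        groups.forEach((id, values) -> result.put(id, "[" + String.join(", ", values) + "]"));
        return result;
    }
}
```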
Comparison Operators
Applications will frequently want to ensure that the data they expect to consume as JSON data is, indeed, JSON data. The IS JSON predicate determines whether the value of a specified string does or does not conform to the structural rules for JSON. The syntax of the IS JSON predicate is:
...
tester.checkBoolean("'{}' is json value", true);
tester.checkBoolean("'{]' is json value", false);
tester.checkBoolean("'{}' is json object", true);
tester.checkBoolean("'[]' is json object", false);
tester.checkBoolean("'{}' is json array", false);
tester.checkBoolean("'[]' is json array", true);
tester.checkBoolean("'100' is json scalar", true);
tester.checkBoolean("'[]' is json scalar", false);
tester.checkBoolean("'{}' is not json value", false);
tester.checkBoolean("'{]' is not json value", true);
tester.checkBoolean("'{}' is not json object", false);
tester.checkBoolean("'[]' is not json object", true);
tester.checkBoolean("'{}' is not json array", true);
tester.checkBoolean("'[]' is not json array", false);
tester.checkBoolean("'100' is not json scalar", false);
tester.checkBoolean("'[]' is not json scalar", true);
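The semantics exercised by these tests can be sketched with a tiny validating JSON parser in Java: IS JSON VALUE holds when the whole string parses, and IS JSON OBJECT, ARRAY or SCALAR additionally check the kind of the top-level value. The class IsJsonSketch is hypothetical and is not the Calcite implementation.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch of IS JSON VALUE / OBJECT / ARRAY / SCALAR via a tiny validating parser. */
final class IsJsonSketch {
    enum Kind { OBJECT, ARRAY, SCALAR }

    private static final Pattern NUMBER =
            Pattern.compile("-?(0|[1-9][0-9]*)(\\.[0-9]+)?([eE][+-]?[0-9]+)?");

    private final String s;
    private int pos;

    private IsJsonSketch(String s) { this.s = s; }

    /** Returns the kind of the top-level value, or null if the text is not well-formed JSON. */
    static Kind classify(String text) {
        IsJsonSketch p = new IsJsonSketch(text);
        try {
            Kind kind = p.value();
            p.skipWs();
            return p.pos == text.length() ? kind : null;  // trailing characters are invalid
        } catch (IllegalArgumentException | IndexOutOfBoundsException e) {
            return null;  // syntax error or unexpected end of input
        }
    }

    static boolean isJsonValue(String text)  { return classify(text) != null; }
    static boolean isJsonObject(String text) { return classify(text) == Kind.OBJECT; }
    static boolean isJsonArray(String text)  { return classify(text) == Kind.ARRAY; }
    static boolean isJsonScalar(String text) { return classify(text) == Kind.SCALAR; }

    private void skipWs() {
        while (pos < s.length() && " \t\r\n".indexOf(s.charAt(pos)) >= 0) pos++;
    }

    private Kind value() {
        skipWs();
        char c = s.charAt(pos);
        if (c == '{' || c == '[') {
            Kind kind = c == '{' ? Kind.OBJECT : Kind.ARRAY;
            char close = c == '{' ? '}' : ']';
            pos++;
            skipWs();
            if (s.charAt(pos) == close) { pos++; return kind; }  // empty object or array
            while (true) {
                if (kind == Kind.OBJECT) {  // member name followed by ':'
                    skipWs();
                    string();
                    skipWs();
                    if (s.charAt(pos++) != ':') throw new IllegalArgumentException("expected ':'");
                }
                value();
                skipWs();
                char next = s.charAt(pos++);
                if (next == close) return kind;
                if (next != ',') throw new IllegalArgumentException("expected ',' or " + close);
            }
        }
        if (c == '"') { string(); return Kind.SCALAR; }
        for (String literal : new String[] {"true", "false", "null"}) {
            if (s.startsWith(literal, pos)) { pos += literal.length(); return Kind.SCALAR; }
        }
        Matcher m = NUMBER.matcher(s).region(pos, s.length());
        if (!m.lookingAt()) throw new IllegalArgumentException("unexpected character " + c);
        pos = m.end();
        return Kind.SCALAR;
    }

    /** Consumes one string literal, including escape sequences. */
    private void string() {
        if (s.charAt(pos++) != '"') throw new IllegalArgumentException("expected string");
        while (true) {
            char c = s.charAt(pos++);
            if (c == '"') return;
            if (c < 0x20) throw new IllegalArgumentException("unescaped control character");
            if (c == '\\') {
                char e = s.charAt(pos++);
                if (e == 'u') {
                    for (int i = 0; i < 4; i++) {
                        if ("0123456789abcdefABCDEF".indexOf(s.charAt(pos++)) < 0) {
                            throw new IllegalArgumentException("bad unicode escape");
                        }
                    }
                } else if ("\"\\/bfnrt".indexOf(e) < 0) {
                    throw new IllegalArgumentException("bad escape");
                }
            }
        }
    }
}
```

Each IS NOT JSON form is simply the negation of the corresponding check, which is what the second half of the tests above verifies.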
Implementation Details
Table API interface proposal
As a first step, I suggest that the Table API not implement the PASSING syntax. We start with a simple implementation and add further functionality later if it is required.
...
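import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
import org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase, TestingAppendSink}
import org.apache.flink.types.Row
import org.junit.Assert.assertEquals
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized

import scala.collection.mutable

/**
 * Tests for the JSON Table API.
 */
@RunWith(classOf[Parameterized])
class JsonITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode) {

  // input data: one valid JSON document and one plain string
  val data = List(
    "{\"foo\":\"bar\"}",
    "flink"
  )

  @Test
  def testJsonExists(): Unit = {
    val stream = env.fromCollection(data)
    val table = stream.toTable(tEnv, 'str)
    // proposed Table API counterpart of JSON_EXISTS(str, 'strict $.foo')
    val resultTable = table
      .select('str, 'str.jsonExists("strict $.foo"))

    val sink = new TestingAppendSink
    resultTable.toAppendStream[Row].addSink(sink)
    env.execute()

    val expected = mutable.MutableList("flink,false", "{\"foo\":\"bar\"},true")
    assertEquals(expected.sorted, sink.getAppendResults.sorted)
  }
}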
Migration Plan and Compatibility
This is a new feature for Flink SQL, so no migration is needed. The implementation sequence is as follows:
...
3. Implement Constructor Functions.
References
[1] ISO/IEC TR 19075-6:2017 (c067367_ISO_IEC_TR_19075-6_2017), a more detailed description of the SQL/JSON standard
...
[3] https://calcite.apache.org/docs/reference.html#json-functions