Authors

Forward Xu, Jark Wu

Page properties


Discussion thread
Vote thread
JIRA: FLINK-9477
Release: 1.15


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, Flink SQL does not support the SQL:2016 JSON functions, although much of the data passing through message middleware such as Kafka is in JSON format. Supporting JSON operations in Flink SQL would make it easier to parse such data. In addition, some relational databases also store and ingest data in JSON format.

SQL/JSON path language

The SQL/JSON path language is a query language used by certain SQL operators (JSON_VALUE, JSON_QUERY, JSON_TABLE and JSON_EXISTS, collectively known as the SQL/JSON query operators) to query JSON text. The SQL/JSON path language is not, strictly speaking, SQL, though it is embedded in these operators within SQL. Lexically and syntactically, the SQL/JSON path language adopts many features of [ECMAScript], though it is neither a subset nor a superset of [ECMAScript]. The semantics of the SQL/JSON path language are primarily SQL semantics.

...

  1. Non-structural errors outside of a <JSON path predicate> are always unhandled errors, resulting in an exception condition returned from the path engine to the SQL/JSON query operator.
  2. The SQL/JSON query operators provide an ON ERROR clause to specify the behavior in case of an input conversion error, an unhandled structural error, an unhandled non-structural error, or an output conversion error.

Modes

The path engine has two modes, strict and lax. The motivation for these modes is that strict mode will be used to examine data from a strict schema perspective, for example, to look for data that diverges from an expected schema. Therefore, strict mode raises an error if the data does not strictly adhere to the requirements of a path expression. Lax mode is intended to be more forgiving, so lax mode converts errors to empty SQL/JSON sequences.
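To make the difference between the two modes concrete, the following Java sketch evaluates a single member accessor step ($.key) in strict and lax mode. It is a minimal illustration, not the path engine used by Calcite or Flink: the class MemberAccessor is hypothetical, parsed JSON is assumed to be represented as Map (objects), List (arrays) and plain Java values (scalars), and only one level of lax-mode array unwrapping is modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: one member accessor step ("$.key") in strict and lax mode.
// Parsed JSON is assumed to be Map (object), List (array) or a plain Java value (scalar).
final class MemberAccessor {
    enum Mode { STRICT, LAX }

    private MemberAccessor() {}

    /** Returns the SQL/JSON sequence produced by applying ".key" to item. */
    static List<Object> member(Object item, String key, Mode mode) {
        List<Object> out = new ArrayList<>();
        if (item instanceof Map) {
            Map<?, ?> object = (Map<?, ?>) item;
            if (object.containsKey(key)) {
                out.add(object.get(key));
            } else if (mode == Mode.STRICT) {
                // strict: a missing member is a structural error
                throw new IllegalStateException("structural error: no member '" + key + "'");
            }
            // lax: a missing member simply yields an empty sequence
        } else if (mode == Mode.STRICT) {
            // strict: the accessor may only be applied to an object
            throw new IllegalStateException("structural error: '." + key + "' applied to a non-object");
        } else if (item instanceof List) {
            // lax: unwrap one level of array and apply the accessor to each element;
            // elements that are not objects or lack the member contribute nothing
            for (Object element : (List<?>) item) {
                if (element instanceof Map && ((Map<?, ?>) element).containsKey(key)) {
                    out.add(((Map<?, ?>) element).get(key));
                }
            }
        }
        // lax on a scalar: empty sequence
        return out;
    }
}
```

In strict mode, asking for a missing member raises an error that the ON ERROR clause then handles; in lax mode the same request quietly yields an empty sequence, which is what the ON EMPTY clause handles.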

...

  1. Row 1 has phone# and phonetype as “bare” members of the outermost object. These two members will be picked up by the underlined columns called “phone#” and “phonetype”. The NESTED COLUMNS clause has a path that will find no rows. The default plan for NESTED COLUMNS is an outer join. Thus, there will be effectively a dummy row created with null values for the boxed columns. In the SELECT list, each COALESCE operator is used to choose the non-null values from an underlined column and the corresponding boxed column.
  2. Rows 2 and 3 do not have bare phone# and phonetype; instead they have an array called phones. In these rows, the underlined columns have paths that will find empty sequences, defaulting to the null value. The NESTED COLUMNS clause is used to iterate over the phones array, producing values for the boxed columns, and again, the COALESCE operators in the SELECT list retain the non-null values.
  3. Row 4 has no phone data at all. In this case, the underlined columns have paths that will find nothing (defaulting to null values). The NESTED COLUMNS clause also has a path that finds an empty sequence. Using the default outer join logic, this means that the boxed columns will also be null. The COALESCE operators must coalesce two null values, resulting in null.
  4. Row 5 has a phones array, but it is empty. This case is processed similarly to rows 2 and 3: the underlined columns are null because their paths are empty. The NESTED COLUMNS clause is used, but the array is empty, so this is an outer join with an empty table. Thus, the boxed columns also come up null, and the COALESCE operators combine these nulls to get null. The end result is the same as row 4.
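The outer join plus COALESCE logic described in the list above can be sketched in Java. This is only an illustration under stated assumptions: NestedColumnsSketch, InputRow and Phone are hypothetical names, each input row is reduced to its optional "bare" phone columns plus its nested phones array, and an empty or missing array is represented by an empty list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of JSON_TABLE's default NESTED COLUMNS plan (outer join)
// followed by COALESCE in the SELECT list.
final class NestedColumnsSketch {
    static final class Phone {
        final String number;
        final String type;
        Phone(String number, String type) { this.number = number; this.type = type; }
    }

    static final class InputRow {
        final String barePhone;   // "underlined" column, null if its path finds nothing
        final String bareType;
        final List<Phone> phones; // rows produced by NESTED COLUMNS, may be empty
        InputRow(String barePhone, String bareType, List<Phone> phones) {
            this.barePhone = barePhone;
            this.bareType = bareType;
            this.phones = phones;
        }
    }

    private NestedColumnsSketch() {}

    static String coalesce(String first, String second) {
        return first != null ? first : second;
    }

    /** Returns one "phone/type" string per output row of the outer join. */
    static List<String> expand(InputRow row) {
        // outer join: an empty nested sequence still yields one all-null "boxed" row
        List<Phone> nested = row.phones.isEmpty()
                ? Collections.singletonList(new Phone(null, null))
                : row.phones;
        List<String> out = new ArrayList<>();
        for (Phone p : nested) {
            out.add(coalesce(row.barePhone, p.number) + "/" + coalesce(row.bareType, p.type));
        }
        return out;
    }
}
```

A row with only bare columns (row 1) yields one output row through the dummy null row; a row with a phones array (rows 2 and 3) yields one row per phone; a row with no phone data or an empty array (rows 4 and 5) yields a single row whose coalesced columns are both null.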

Proposed Changes

This section details how JSON functions are supported in Flink SQL. The design is mainly based on flink-table-planner-blink.

SQL/JSON functions

The Flink SQL JSON functions are implemented on top of the SQL/JSON functions already available in Calcite (CALCITE-2867).

...

path is a character string containing a JSON path expression; the mode flag, strict or lax, should be specified at the beginning of the path, for example 'lax $.foo'.
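As a sketch of how an implementation might separate the mode flag from the rest of the path, the hypothetical PathSpec helper below splits a path argument with a regular expression. It assumes, as stated above, that the mode keyword is always present, and it treats the keyword case-insensitively; both are assumptions of this sketch rather than documented Flink or Calcite behavior.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: splits a path argument such as "lax $.foo" into the mode
// flag and the path expression. It assumes the mode keyword is always present.
final class PathSpec {
    enum Mode { STRICT, LAX }

    private static final Pattern SPEC = Pattern.compile(
            "^\\s*(strict|lax)\\s+(\\$.*)$", Pattern.CASE_INSENSITIVE | Pattern.DOTALL);

    final Mode mode;
    final String expression;

    private PathSpec(Mode mode, String expression) {
        this.mode = mode;
        this.expression = expression;
    }

    static PathSpec parse(String path) {
        Matcher m = SPEC.matcher(path);
        if (!m.matches()) {
            throw new IllegalArgumentException("expected '<strict|lax> $...' but got: " + path);
        }
        Mode mode = m.group(1).equalsIgnoreCase("strict") ? Mode.STRICT : Mode.LAX;
        return new PathSpec(mode, m.group(2).trim());
    }
}
```

For example, PathSpec.parse("lax $.foo") yields mode LAX and expression $.foo, while a path without a mode flag is rejected.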

Query Functions

Use in SQL:

SQL SYNTAX

DESCRIPTION

RETURN TYPE

JSON_EXISTS(jsonValue, path [ { TRUE | FALSE | UNKNOWN | ERROR } ON ERROR ] )

Whether a jsonValue satisfies a search criterion described using JSON path expression path.

BOOLEAN

JSON_VALUE(jsonValue, path [ RETURNING type ] [ { ERROR | NULL | DEFAULT expr } ON EMPTY ] [ { ERROR | NULL | DEFAULT expr } ON ERROR ] )

Extract an SQL scalar from a jsonValue using JSON path expression path.

If there is no RETURNING clause, STRING is returned. If a RETURNING clause is given, an attempt is made to cast the result to the specified type. For example, JSON_VALUE('{"foo": 100}', 'lax $.foo' RETURNING INTEGER NULL ON EMPTY) returns an INTEGER.

JSON_QUERY(jsonValue, path [ { WITHOUT [ ARRAY ] | WITH [ CONDITIONAL | UNCONDITIONAL ] [ ARRAY ] } WRAPPER ] [ { ERROR | NULL | EMPTY ARRAY | EMPTY OBJECT } ON EMPTY ] [ { ERROR | NULL | EMPTY ARRAY | EMPTY OBJECT } ON ERROR ] )

Extract a JSON object or JSON array from jsonValue using the path JSON path expression.

STRING


Note: The ON ERROR and ON EMPTY clauses define the fallback behavior of the function when an error is raised (ON ERROR) or when the path expression produces an empty result (ON EMPTY).
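A minimal Java sketch of how JSON_VALUE could resolve these clauses once the path has been evaluated. The class and method names are hypothetical; the path result is modeled as either an exception or a list of items, and a result that is not exactly one scalar is assumed to be routed to ON ERROR. In this sketch ERROR ON EMPTY raises directly instead of passing through ON ERROR; that ordering is an assumption, not a statement about Calcite or Flink.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch of JSON_VALUE's ON EMPTY / ON ERROR resolution.
final class JsonValueFallback {
    enum Behavior { ERROR, NULL, DEFAULT }

    private JsonValueFallback() {}

    /**
     * @param items     the SQL/JSON sequence produced by the path (ignored if pathError != null)
     * @param pathError the error raised while evaluating the path, or null
     */
    static Object resolve(List<?> items, RuntimeException pathError,
                          Behavior onEmpty, Object defaultOnEmpty,
                          Behavior onError, Object defaultOnError) {
        RuntimeException error = pathError;
        if (error == null) {
            if (items.isEmpty()) {
                // ON EMPTY: the path matched nothing
                switch (onEmpty) {
                    case NULL: return null;
                    case DEFAULT: return defaultOnEmpty;
                    default: throw new IllegalStateException("empty result (ERROR ON EMPTY)");
                }
            }
            Object item = items.get(0);
            if (items.size() == 1 && !(item instanceof List) && !(item instanceof Map)) {
                return item; // exactly one scalar: the normal result
            }
            error = new IllegalStateException("JSON_VALUE needs a single scalar, got " + items);
        }
        // ON ERROR: path error, more than one item, or an array/object item
        switch (onError) {
            case NULL: return null;
            case DEFAULT: return defaultOnError;
            default: throw error;
        }
    }
}
```

With DEFAULT 'abc' ON EMPTY, a path that matches nothing returns "abc"; with NULL ON ERROR, a path that matches an array returns null instead of failing the query.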

The ARRAY WRAPPER clause defines how a JSON array result is represented in the JSON_QUERY function. The following examples compare the wrapper behaviors.

Use in Table API:

jsonExists

JAVA/PYTHON/SCALA

DESCRIPTION

RETURN TYPE

STRING.jsonExists(Expression path [, JsonExistsBehavior behavior])

Whether the JSON string satisfies a search criterion described by the JSON path expression path. The Table API currently only supports a path string parameter and has no ON ERROR clause syntax; the error behavior is instead passed as the optional JsonExistsBehavior, built for example as: behavior = JsonExistsBehavior.trueOnError().

BOOLEAN
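The four ON ERROR options of JSON_EXISTS (TRUE, FALSE, UNKNOWN, ERROR) can be sketched in Java as below. JsonExistsFallback is a hypothetical name, the path check is passed in as a Supplier so that any error it raises can be intercepted, and UNKNOWN is represented as a null Boolean following SQL three-valued logic.

```java
import java.util.function.Supplier;

// Hypothetical sketch of JSON_EXISTS's ON ERROR clause.
final class JsonExistsFallback {
    enum OnError { TRUE, FALSE, UNKNOWN, ERROR }

    private JsonExistsFallback() {}

    /** Returns TRUE or FALSE, or null for UNKNOWN. */
    static Boolean resolve(Supplier<Boolean> pathMatches, OnError onError) {
        try {
            return pathMatches.get();
        } catch (RuntimeException e) {
            // the input was not JSON or the path raised an error
            switch (onError) {
                case TRUE: return Boolean.TRUE;
                case FALSE: return Boolean.FALSE;
                case UNKNOWN: return null;
                default: throw e;
            }
        }
    }
}
```

With FALSE ON ERROR, which is the SQL standard default, a non-JSON input such as 'flink' makes JSON_EXISTS return false; this is also the result the JsonITCase test in this document expects.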


jsonValue

JAVA/PYTHON/SCALA

DESCRIPTION

RETURN TYPE

STRING.jsonValue(Expression path [, JsonValueBehavior behavior])

Extract a SQL scalar from the JSON string using the JSON path expression path. The Table API currently only supports a path string parameter and has no ON EMPTY / ON ERROR clause syntax; these behaviors are passed as the optional JsonValueBehavior instead.

The JsonValueBehavior is built as follows:

behavior = JsonValueBehavior.defaultOnEmpty("abc").nullOnError() or

behavior = JsonValueBehavior.returnInt().defaultOnEmpty("abc").nullOnError()

If no return type is specified, STRING is returned. If one is specified (for example with returnInt(), the equivalent of a RETURNING clause), an attempt is made to cast the result to that type. For example, JSON_VALUE('{"foo": 100}', 'lax $.foo' RETURNING INTEGER NULL ON EMPTY) returns an INTEGER.


jsonQuery

JAVA/PYTHON/SCALA

DESCRIPTION

RETURN TYPE

STRING.jsonQuery(Expression path [, JsonQueryBehavior behavior])

Extract a JSON object or JSON array from the JSON string using the JSON path expression path. The Table API currently only supports a path string parameter and has no ON EMPTY / ON ERROR clause syntax; these behaviors are passed as the optional JsonQueryBehavior instead.

The JsonQueryBehavior is built as follows:

behavior = JsonQueryBehavior.withoutArray().nullOnEmpty().nullOnError()

STRING



Example Data:

{"a": "[1,2]", "b": [1,2], "c": "hi"}

...

OPERATOR                                    | $.a         | $.b       | $.c
JSON_EXISTS                                 | true        | true      | true
JSON_VALUE                                  | [1, 2]      | error     | hi
JSON_QUERY WITHOUT ARRAY WRAPPER            | error       | [1, 2]    | error
JSON_QUERY WITH UNCONDITIONAL ARRAY WRAPPER | [ "[1,2]" ] | [ [1,2] ] | [ "hi" ]
JSON_QUERY WITH CONDITIONAL ARRAY WRAPPER   | [ "[1,2]" ] | [1,2]     | [ "hi" ]
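The three wrapper options compared in the table above can be modeled in a few lines of Java. WrapperSketch is a hypothetical name; the path result is assumed to be a non-empty list of parsed items (Map for objects, List for arrays, String for strings), empty results are left to the ON EMPTY clause, and the small serializer escapes only quotes and backslashes.

```java
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical sketch of JSON_QUERY's ARRAY WRAPPER options applied to the
// SQL/JSON sequence produced by the path.
final class WrapperSketch {
    enum Wrapper { WITHOUT, CONDITIONAL, UNCONDITIONAL }

    private WrapperSketch() {}

    static String jsonQuery(List<?> items, Wrapper wrapper) {
        if (items.isEmpty()) {
            throw new IllegalArgumentException("empty result: handled by ON EMPTY");
        }
        boolean singleStructured = items.size() == 1 && isStructured(items.get(0));
        switch (wrapper) {
            case WITHOUT:
                // only a single array or object may be returned as-is
                if (!singleStructured) {
                    throw new IllegalStateException("scalar or multiple items need a wrapper");
                }
                return toJson(items.get(0));
            case CONDITIONAL:
                // wrap only when the result is not already a single array or object
                return singleStructured ? toJson(items.get(0)) : toJson(items);
            default:
                // UNCONDITIONAL: always wrap the whole sequence in an array
                return toJson(items);
        }
    }

    private static boolean isStructured(Object v) {
        return v instanceof List || v instanceof Map;
    }

    /** Minimal serializer (escapes only quotes and backslashes). */
    static String toJson(Object v) {
        if (v == null) return "null";
        if (v instanceof String) {
            return "\"" + ((String) v).replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
        }
        if (v instanceof List) {
            StringJoiner arr = new StringJoiner(",", "[", "]");
            for (Object e : (List<?>) v) arr.add(toJson(e));
            return arr.toString();
        }
        if (v instanceof Map) {
            StringJoiner obj = new StringJoiner(",", "{", "}");
            for (Map.Entry<?, ?> e : ((Map<?, ?>) v).entrySet()) {
                obj.add(toJson(String.valueOf(e.getKey())) + ":" + toJson(e.getValue()));
            }
            return obj.toString();
        }
        return String.valueOf(v); // numbers and booleans
    }
}
```

Applied to the three path results of the example data ($.a yields the string "[1,2]", $.b the array [1,2], $.c the string "hi"), this reproduces the three JSON_QUERY rows of the table, apart from whitespace.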

Constructor Functions

Use in SQL:

SQL SYNTAX

DESCRIPTION

RETURN TYPE

JSON_OBJECT( { [ KEY ] name VALUE value [ FORMAT JSON ] | name : value [ FORMAT JSON ] } * [ { NULL | ABSENT } ON NULL ] )

Constructs a JSON object from a series of key (name) / value (value) pairs.

STRING

JSON_OBJECTAGG( { [ KEY ] name VALUE value [ FORMAT JSON ] | name : value [ FORMAT JSON ] } [ { NULL | ABSENT } ON NULL ] )

Aggregate function to construct a JSON object from a key (name) / value (value) pair.

STRING

JSON_ARRAY( { value [ FORMAT JSON ] } * [ { NULL | ABSENT } ON NULL ] )

Constructs a JSON array from a series of values (value).

STRING

JSON_ARRAYAGG( value [ FORMAT JSON ] [ ORDER BY orderItem [, orderItem ]* ] [ { NULL | ABSENT } ON NULL ] )

Aggregate function to construct a JSON array from a value (value).

STRING


Use in Table API:

jsonObject

JAVA/PYTHON/SCALA

DESCRIPTION

RETURN TYPE

jsonObject(ANY1, ANY2, ...[, SqlJsonConstructorNullClause.NULL_ON_NULL])

Constructs a JSON object from a series of key (name) / value (value) pairs. The Table API currently only supports JSON string operations.

This means NULL ON NULL.

STRING

jsonObject(ANY1, ANY2, ...[, SqlJsonConstructorNullClause.ABSENT_ON_NULL])

This means ABSENT ON NULL.

STRING


jsonObjectAgg

JAVA/PYTHON/SCALA

DESCRIPTION

RETURN TYPE

jsonObjectAgg(ANY1, ANY2[, SqlJsonConstructorNullClause.NULL_ON_NULL])

Aggregate function to construct a JSON object from a key (name) / value (value) pair. This means NULL ON NULL.

STRING

jsonObjectAgg(ANY1, ANY2[, SqlJsonConstructorNullClause.ABSENT_ON_NULL])

This means ABSENT ON NULL.

STRING


jsonArray

JAVA/PYTHON/SCALA

DESCRIPTION

RETURN TYPE

jsonArray(ANY1, ANY2, ...[, SqlJsonConstructorNullClause.NULL_ON_NULL])

Constructs a JSON array from a series of values (value). The Table API currently only supports JSON string operations. This means NULL ON NULL.

STRING

jsonArray(ANY1, ANY2, ...[, SqlJsonConstructorNullClause.ABSENT_ON_NULL])

This means ABSENT ON NULL.

STRING


jsonArrayAgg

JAVA/PYTHON/SCALA

DESCRIPTION

RETURN TYPE

jsonArrayAgg(ANY[, SqlJsonConstructorNullClause.NULL_ON_NULL])

Aggregate function to construct a JSON array from a value (value). The Table API currently only supports JSON string operations. This means NULL ON NULL.

STRING

jsonArrayAgg(ANY[, SqlJsonConstructorNullClause.ABSENT_ON_NULL])

This means ABSENT ON NULL.

STRING


Note: The FORMAT JSON flag indicates that the value is formatted as a JSON character string. When FORMAT JSON is used, the value is parsed from the JSON character string into a SQL structured value, so it is embedded as JSON rather than as a quoted string.

...

OPERATOR                                      | RESULT
JSON_OBJECT('id', 87, 'name', 'carrot')       | {"id": 87, "name": "carrot"}
JSON_ARRAY(1, 'abc', NULL, TRUE NULL ON NULL) | [1, "abc", null, true]
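The NULL ON NULL / ABSENT ON NULL choice and the FORMAT JSON flag can be illustrated with a small Java sketch. JsonConstructorSketch and FormatJson are hypothetical names; FORMAT JSON is modeled by wrapping a string whose text is embedded verbatim (it is assumed to be valid JSON, whereas a real implementation would parse it). The null clause is passed explicitly because the SQL standard defaults differ per function: NULL ON NULL for JSON_OBJECT and ABSENT ON NULL for JSON_ARRAY.

```java
import java.util.StringJoiner;

// Hypothetical sketch of JSON_OBJECT / JSON_ARRAY with an explicit ON NULL clause.
final class JsonConstructorSketch {
    enum OnNull { NULL_ON_NULL, ABSENT_ON_NULL }

    /** Marks a value as FORMAT JSON: its text is embedded as JSON, not as a quoted string. */
    static final class FormatJson {
        final String json; // assumed to be valid JSON; a real implementation would parse it
        FormatJson(String json) { this.json = json; }
    }

    private JsonConstructorSketch() {}

    static String jsonObject(OnNull onNull, Object... keyValues) {
        if (keyValues.length % 2 != 0) {
            throw new IllegalArgumentException("expected name/value pairs");
        }
        StringJoiner out = new StringJoiner(",", "{", "}");
        for (int i = 0; i < keyValues.length; i += 2) {
            Object value = keyValues[i + 1];
            if (value == null && onNull == OnNull.ABSENT_ON_NULL) {
                continue; // ABSENT ON NULL drops the whole member
            }
            out.add(toJson(String.valueOf(keyValues[i])) + ":" + toJson(value));
        }
        return out.toString();
    }

    static String jsonArray(OnNull onNull, Object... values) {
        StringJoiner out = new StringJoiner(",", "[", "]");
        for (Object value : values) {
            if (value == null && onNull == OnNull.ABSENT_ON_NULL) {
                continue; // ABSENT ON NULL drops the element
            }
            out.add(toJson(value));
        }
        return out.toString();
    }

    private static String toJson(Object v) {
        if (v == null) return "null";
        if (v instanceof FormatJson) return ((FormatJson) v).json;
        if (v instanceof String) {
            return "\"" + ((String) v).replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
        }
        return String.valueOf(v); // numbers and booleans
    }
}
```

With NULL_ON_NULL, jsonArray(1, "abc", null, true) keeps the null element; with ABSENT_ON_NULL it yields [1,"abc",true]. Passing "[1,2]" as a plain string produces {"a":"[1,2]"}, while wrapping it in FormatJson produces {"a":[1,2]}.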


Simple Data Table T1:

o_id | attribute | value
2    | color     | red
2    | fabric    | silk
3    | color     | green
3    | shape     | square

...

OPERATOR                 | o_id | AFTER AGG
JSON_ARRAYAGG(attribute) | 2    | ["color", "fabric"]
JSON_ARRAYAGG(attribute) | 3    | ["color", "shape"]
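The grouping above can be emulated in Java to show what JSON_ARRAYAGG computes per group. JsonArrayAggSketch and Row are hypothetical names; the sketch mirrors SELECT o_id, JSON_ARRAYAGG(attribute) FROM T1 GROUP BY o_id, skips null inputs (ABSENT ON NULL, the SQL standard default for JSON_ARRAYAGG), and keeps arrival order because no ORDER BY is given.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

// Hypothetical sketch of SELECT o_id, JSON_ARRAYAGG(attribute) FROM T1 GROUP BY o_id.
final class JsonArrayAggSketch {
    static final class Row {
        final int oId;
        final String attribute;
        final String value;
        Row(int oId, String attribute, String value) {
            this.oId = oId;
            this.attribute = attribute;
            this.value = value;
        }
    }

    private JsonArrayAggSketch() {}

    static Map<Integer, String> arrayAggByOid(List<Row> rows) {
        // accumulate attributes per group in arrival order (no ORDER BY clause)
        Map<Integer, List<String>> groups = new TreeMap<>();
        for (Row row : rows) {
            List<String> acc = groups.computeIfAbsent(row.oId, k -> new ArrayList<>());
            if (row.attribute != null) {
                acc.add(row.attribute); // ABSENT ON NULL: null inputs are skipped
            }
        }
        // emit one JSON array string per group
        Map<Integer, String> result = new TreeMap<>();
        for (Map.Entry<Integer, List<String>> e : groups.entrySet()) {
            StringJoiner array = new StringJoiner(",", "[", "]");
            for (String s : e.getValue()) {
                array.add("\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"");
            }
            result.put(e.getKey(), array.toString());
        }
        return result;
    }
}
```

Fed the four rows of T1, the sketch returns ["color","fabric"] for o_id 2 and ["color","shape"] for o_id 3, matching the table above apart from whitespace. Without ORDER BY, SQL does not guarantee the element order inside each array; the sketch simply keeps arrival order.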


Comparison Operators

Applications will frequently want to ensure that the data they expect to consume as JSON data is, indeed, JSON data. The IS JSON predicate determines whether the value of a specified string does or does not conform to the structural rules for JSON. The syntax of the IS JSON predicate is:

Use in SQL:

SQL SYNTAX

DESCRIPTION

RETURN TYPE

jsonValue IS JSON [ VALUE ]

Whether jsonValue is a JSON value.

BOOLEAN

jsonValue IS NOT JSON [ VALUE ]

Whether jsonValue is not a JSON value.

BOOLEAN

jsonValue IS JSON SCALAR

Whether jsonValue is a JSON scalar value.

BOOLEAN

jsonValue IS NOT JSON SCALAR

Whether jsonValue is not a JSON scalar value.

BOOLEAN

jsonValue IS JSON OBJECT

Whether jsonValue is a JSON object.

BOOLEAN

jsonValue IS NOT JSON OBJECT

Whether jsonValue is not a JSON object.

BOOLEAN

jsonValue IS JSON ARRAY

Whether jsonValue is a JSON array.

BOOLEAN

jsonValue IS NOT JSON ARRAY

Whether jsonValue is not a JSON array.

BOOLEAN


Use in TableApi:

JAVA/PYTHON/SCALA

DESCRIPTION

RETURN TYPE

ANY.isJsonValue()

Whether jsonValue is a JSON value. Table API functions currently only support JSON strings as input parameters.

BOOLEAN

ANY.isNotJsonValue()

Whether jsonValue is not a JSON value. Table API functions currently only support JSON strings as input parameters.

BOOLEAN

ANY.isJsonScalar()

Whether jsonValue is a JSON scalar value. Table API functions currently only support JSON strings as input parameters.

BOOLEAN

ANY.isNotJsonScalar()

Whether jsonValue is not a JSON scalar value. Table API functions currently only support JSON strings as input parameters.

BOOLEAN

ANY.isJsonObject()

Whether jsonValue is a JSON object. Table API functions currently only support JSON strings as input parameters.

BOOLEAN

ANY.isNotJsonObject()

Whether jsonValue is not a JSON object. Table API functions currently only support JSON strings as input parameters.

BOOLEAN

ANY.isJsonArray()

Whether jsonValue is a JSON array. Table API functions currently only support JSON strings as input parameters.

BOOLEAN

ANY.isNotJsonArray()

Whether jsonValue is not a JSON array. Table API functions currently only support JSON strings as input parameters.

BOOLEAN

Test examples:

tester.checkBoolean("'{}' is json value", true);
tester.checkBoolean("'{]' is json value", false);
tester.checkBoolean("'{}' is json object", true);
tester.checkBoolean("'[]' is json object", false);
tester.checkBoolean("'{}' is json array", false);
tester.checkBoolean("'[]' is json array", true);
tester.checkBoolean("'100' is json scalar", true);
tester.checkBoolean("'[]' is json scalar", false);
tester.checkBoolean("'{}' is not json value", false);
tester.checkBoolean("'{]' is not json value", true);
tester.checkBoolean("'{}' is not json object", false);
tester.checkBoolean("'[]' is not json object", true);
tester.checkBoolean("'{}' is not json array", true);
tester.checkBoolean("'[]' is not json array", false);
tester.checkBoolean("'100' is not json scalar", false);
tester.checkBoolean("'[]' is not json scalar", true);
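The IS JSON predicates only need to know whether the text parses and what kind of value sits at the top level. The Java sketch below shows that logic with a hand-written recursive-descent checker; IsJsonSketch is a hypothetical name, the grammar follows RFC 8259, and Calcite's actual implementation may differ on edge cases such as leniency toward non-standard JSON.

```java
// Hypothetical sketch of the IS JSON predicates: a small recursive-descent checker
// following the RFC 8259 grammar that reports the kind of the top-level value.
final class IsJsonSketch {
    enum Kind { OBJECT, ARRAY, SCALAR }

    private final String s;
    private int pos;
    private IsJsonSketch(String s) { this.s = s; }

    /** Returns the kind of the top-level JSON value, or null if the text is not valid JSON. */
    static Kind classify(String text) {
        IsJsonSketch p = new IsJsonSketch(text);
        try {
            p.skipWs();
            Kind kind = p.value();
            p.skipWs();
            return p.pos == text.length() ? kind : null; // trailing characters are invalid
        } catch (IllegalStateException e) {
            return null;
        }
    }

    static boolean isJsonValue(String t)  { return classify(t) != null; }
    static boolean isJsonScalar(String t) { return classify(t) == Kind.SCALAR; }
    static boolean isJsonObject(String t) { return classify(t) == Kind.OBJECT; }
    static boolean isJsonArray(String t)  { return classify(t) == Kind.ARRAY; }

    private Kind value() {
        char c = peek();
        if (c == '{') { members('}', true); return Kind.OBJECT; }
        if (c == '[') { members(']', false); return Kind.ARRAY; }
        if (c == '"') { string(); return Kind.SCALAR; }
        for (String literal : new String[] {"true", "false", "null"}) {
            if (s.startsWith(literal, pos)) { pos += literal.length(); return Kind.SCALAR; }
        }
        number();
        return Kind.SCALAR;
    }

    // Object members ("name": value) or array elements, starting at the opening bracket.
    private void members(char close, boolean isObject) {
        pos++;
        skipWs();
        if (peek() == close) { pos++; return; }
        while (true) {
            skipWs();
            if (isObject) { string(); skipWs(); expect(':'); skipWs(); }
            value();
            skipWs();
            char c = next();
            if (c == close) return;
            if (c != ',') throw new IllegalStateException("expected ',' or '" + close + "'");
        }
    }

    private void string() {
        expect('"');
        while (true) {
            char c = next();
            if (c == '"') return;
            if (c < 0x20) throw new IllegalStateException("control character in string");
            if (c != '\\') continue;
            char e = next();
            if (e == 'u') {
                for (int i = 0; i < 4; i++) {
                    if (Character.digit(next(), 16) < 0) throw new IllegalStateException("bad unicode escape");
                }
            } else if ("\"\\/bfnrt".indexOf(e) < 0) {
                throw new IllegalStateException("bad escape");
            }
        }
    }

    private void number() {
        if (pos < s.length() && s.charAt(pos) == '-') pos++;
        if (pos < s.length() && s.charAt(pos) == '0') pos++; else digits();
        if (pos < s.length() && s.charAt(pos) == '.') { pos++; digits(); }
        if (pos < s.length() && (s.charAt(pos) == 'e' || s.charAt(pos) == 'E')) {
            pos++;
            if (pos < s.length() && (s.charAt(pos) == '+' || s.charAt(pos) == '-')) pos++;
            digits();
        }
    }
    private void digits() {
        int start = pos;
        while (pos < s.length() && s.charAt(pos) >= '0' && s.charAt(pos) <= '9') pos++;
        if (pos == start) throw new IllegalStateException("digit expected");
    }
    private void skipWs() { while (pos < s.length() && " \t\r\n".indexOf(s.charAt(pos)) >= 0) pos++; }
    private char peek() {
        if (pos >= s.length()) throw new IllegalStateException("unexpected end of input");
        return s.charAt(pos);
    }
    private char next() { char c = peek(); pos++; return c; }
    private void expect(char c) { if (next() != c) throw new IllegalStateException("expected " + c); }
}
```

Each of the test examples above holds for this sketch; for instance, classify("{]") returns null, so '{]' IS JSON VALUE is false and '{]' IS NOT JSON VALUE is true.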

Implementation Details

Table API interface proposal

As a first step, I suggest that the Table API not implement the PASSING syntax: start with a simple implementation and add further functionality later if it is required.

...

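import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
import org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase, TestingAppendSink}
import org.apache.flink.types.Row
import org.junit.Assert.assertEquals
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized

import scala.collection.mutable

/**
 * Tests for the JSON Table API functions.
 */
@RunWith(classOf[Parameterized])
class JsonITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode) {

  // input data: one valid JSON object and one plain (non-JSON) string
  val data = List(
    "{\"foo\":\"bar\"}",
    "flink"
  )

  @Test
  def testJsonExists(): Unit = {
    val stream = env.fromCollection(data)
    val table = stream.toTable(tEnv, 'str)

    // jsonExists is the Table API method proposed in this document
    val resultTable = table
      .select('str, 'str.jsonExists("strict $.foo"))

    val sink = new TestingAppendSink
    resultTable.toAppendStream[Row].addSink(sink)
    env.execute()

    // the non-JSON input yields false (FALSE ON ERROR)
    val expected = mutable.MutableList("flink,false", "{\"foo\":\"bar\"},true")
    assertEquals(expected.sorted, sink.getAppendResults.sorted)
  }
}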

Migration Plan and Compatibility

This is a new feature for Flink SQL, so no migration is needed. The implementation sequence is as follows:

...

3. Implement Constructor Functions.

References

[1] A more detailed description of the SQL/JSON standard: ISO/IEC TR 19075-6:2017 (c067367_ISO_IEC_TR_19075-6_2017)

...

[3] https://calcite.apache.org/docs/reference.html#json-functions

Document

https://docs.google.com/document/d/1JfaFYIFOAY8P2pFhOYNCQ9RTzwF4l85_bnTvImOLKMk/edit#heading=h.76mb88ca6yjp