...
Three aspects of path evaluation governed by modes
lax | strict | |
Automatic unnesting of arrays | Certain path steps, such as the member accessor $.key, automatically iterate over SQL/JSON sequences. To make these iterative path steps friendlier for arrays, arrays are automatically unnested prior to performing the iterative path step. This means that the user does not need to use an explicit [*] to unnest an array prior to performing an iterative path step. This facilitates the use case where a field may be either an array or a scalar. | Arrays are not automatically unnested (the user can still write [*] to unnest an array explicitly). |
Automatic wrapping within an array | Subscript path steps, such as $[0] or $[*], may be applied to a non-array. To do this, the non-array is implicitly wrapped in an array prior to applying the subscript operation. This also facilitates the use case where a field may be either an array or a scalar. | There is no automatic wrapping prior to subscript path steps. |
Error han- dling | Many errors related to whether data is or is not an array or scalar are handled by the two preceding features. The remain- ing errors are classified as either struc- tural or non-structural. An example of a structural error is $.name if $ has no member whose key is name. Structural errors are converted to empty SQL/JSON sequences. An example of a non-struc- tural error is divide by zero; such errors are not elided. | Errors are strictly defined in all cases |
Note that the path language mode is orthogonal to the ON ERROR clause. There are numerous use cases for having any combination of ON ERROR clauses combined with either strict or lax modes.
...
Consider the following data, stored in a table called Data:
pk | col |
1 | { name: "Fred", phonetype: "work","phone#": "650-506-2051"} |
2 | { name: "Molly", phones: [ { phonetype: "work","phone#": "650-506-7000" }, { phonetype: "cell","phone#": "650-555-5555" } ] |
3 | { name: "Afu", phones: [ { phonetype: "cell","phone#": "88-888-8888" } ] } |
4 | { name: "Justin"} |
5 | { name: "U La La",phones: []} |
This data has been created with a sloppy schema. If a person has just one phone (row 1), then the phonetype and phone# are members of the JSON object. If a person has more than one phone (row 2), then there is a member called phones whose value is an array holding the phone information. But sometimes a person with just one phone still has a phones array (row 3). Also, some people have no phones, which can be indicated by an absence of the phonetype and phone# members (row 4), or by the presence of a phones array whose value is empty (row 5).
...
The solution to this use case is the following query:
COALESCE (JT."phone#", JT."phones.phone#") AS "phone#", COALESCE (JT."phonetype", JT."phones.phonetype#") AS "phonetype" FROM Data AS D, JSON_TABLE (D.col, 'lax $' COLUMNS ( name VARCHAR(30) PATH 'lax $.name', "phone#" VARCHAR(30) PATH 'lax $.phone#', "phonetype" VARCHAR(30) PATH 'lax $.phonetype', NESTED COLUMNS PATH 'lax $.phones[*]' ( ) ) ) AS JT |
Above, two output columns of the JSON_TABLE have been underlined, and two others are boxed. To understand this query, note the following:
...
Query Functions
Use in SQL:
SQL SYNTAX | DESCRIPTION |
JSON_EXISTS(jsonValue, path [ { TRUE | FALSE | UNKNOWN | ERROR } ON ERROR ] ) | Whether a jsonValue satisfies a search criterion described using JSON path expression path. |
JSON_VALUE(jsonValue, path [ RETURNING type ] [ { ERROR | NULL | DEFAULT expr } ON EMPTY ] [ { ERROR | NULL | DEFAULT expr } ON ERROR ] ) | Extract an SQL scalar from a jsonValue using JSON path expression path. |
JSON_QUERY(jsonValue, path [ { WITHOUT [ ARRAY ] | WITH [ CONDITIONAL | UNCONDITIONAL ] [ ARRAY ] } WRAPPER ] [ { ERROR | NULL | EMPTY ARRAY | EMPTY OBJECT } ON EMPTY ] [ { ERROR | NULL | EMPTY ARRAY | EMPTY OBJECT } ON ERROR ] ) | Extract a JSON object or JSON array from jsonValue using the path JSON path expression. |
Note: The ON ERROR and ON EMPTY clauses define the fallback behavior of the function when an error is thrown or a null value is about to be returned.
The ARRAY WRAPPER clause defines how to represent a JSON array result in JSON_QUERY function. The following examples compare the wrapper behaviors.
Use in TableApi:
JAVA/PYTHON/SCALA | DESCRIPTION |
STRING.jsonExists(path) | Whether a jsonValue satisfies a search criterion described using JSON path expression path.Table API currently only support path string parameter, doesn't support ON ERROR clause. |
STRING.jsonValue(path) | Extract an SQL scalar from a jsonValue using JSON path expression path.Table API currently only support path string parameter, doesn't support ON ERROR clause. |
STRING.jsonQuery(path) | Extract a JSON object or JSON array from jsonValue using the path JSON path expression.Table API currently only support path string parameter, doesn't support ON ERROR clause. |
Example Data:
{"a": "[1,2]", "b": [1,2], "c": "hi"} |
Comparison:
OPERATOR | $.A | $.B | $.C |
JSON_EXISTS | true | true | true |
JSON_VALUE | [1, 2] | error | hi |
JSON QUERY WITHOUT ARRAY WRAPPER | error | [1, 2] | error |
JSON QUERY WITH UNCONDITIONAL ARRAY WRAPPER | [ “[1,2]” ] | [ [1,2] ] | [ “hi” ] |
JSON QUERY WITH CONDITIONAL ARRAY WRAPPER | [ “[1,2]” ] | [1,2] | [ “hi” ] |
Constructor Functions
Use in SQL:
SQL SYNTAX | DESCRIPTION |
JSON_OBJECT( { [ KEY ] name VALUE value [ FORMAT JSON ] | name : value [ FORMAT JSON ] } * [ { NULL | ABSENT } ON NULL ] ) | Construct JSON object using a series of key (name) value (value) pairs. |
JSON_OBJECTAGG( { [ KEY ] name VALUE value [ FORMAT JSON ] | name : value [ FORMAT JSON ] } [ { NULL | ABSENT } ON NULL ] ) | Aggregate function to construct a JSON object using a key (name) value (value) pair. |
JSON_ARRAY( { value [ FORMAT JSON ] } * [ { NULL | ABSENT } ON NULL ] ) | Construct a JSON array using a series of values (value). |
JSON_ARRAYAGG( value [ FORMAT JSON ] [ ORDER BY orderItem [, orderItem ]* ] [ { NULL | ABSENT } ON NULL ] ) | Aggregate function to construct a JSON array using a value (value). |
Use in TableApi:
JAVA/PYTHON/SCALA | DESCRIPTION |
jsonObject(ANY1, ANY2, ...) | Construct JSON object using a series of key (name) value (value) pairs.Table API currently only supports JSON string operations. |
jsonObjectAgg(ANY1,ANY2) | Aggregate function to construct a JSON object using a key (name) value (value) pair.Table API currently only supports JSON string operations. |
jsonArray(ANY1, ANY2, ...) | Construct a JSON array using a series of values (value). Table API currently only supports JSON string operations. |
jsonArrayAgg(ANY) | Aggregate function to construct a JSON array using a value (value). Table API currently only supports JSON string operations. |
Note: The flag FORMAT JSON indicates the value is formatted as JSON character string. When FORMAT JSON is used, the value should be de-parse from JSON character string to a SQL structured value.
...
If ORDER BY clause is provided, JSON_ARRAYAGG sorts the input rows into the specified order before performing aggregation.
Comparison:
OPERATOR | RESULT |
JSON_OBJECT('id', 87, 'name', 'carrot') | {"id": 87, "name": "carrot"} |
JSON_ARRAY(1, "abc", NULL, TRUE) | [1, "abc", null, true] |
Simple Data Table T1:
o_id | attribute | value |
2 | color | red |
2 | fabric | silk |
3 | color | green |
3 | shape | square |
SELECT o_id, JSON_OBJECTAGG(attribute, value) FROM t1 GROUP BY o_id;
OPERATOR | o_id | AFTER AGG |
JSON_OBJECTAGG(attribute, value) | 2 | {"color": "red", "fabric": "silk"} |
JSON_OBJECTAGG(attribute, value) | 3 | {"color": "green", "shape": "square"} |
SELECT o_id, JSON_ARRAYAGG(attribute) AS attributes FROM t1 GROUP BY o_id;
OPERATOR | o_id | AFTER AGG |
JSON_ARRAYAGG(attribute) | 2 | ["color", "fabric"] |
JSON_ARRAYAGG(attribute) | 3 | ["color", "shape"] |
Comparison Operators
Applications will frequently want to ensure that the data they expect to consume as JSON data is, indeed, JSON data. The IS JSON predicate determines whether the value of a specified string does or does not conform to the structural rules for JSON. The syntax of the IS JSON predicate is:
Use in SQL:
SQL SYNTAX | DESCRIPTION |
jsonValue IS JSON [ VALUE ] | Whether jsonValue is a JSON value.Table API functions currently only support JSON strings as input parameters. |
jsonValue IS NOT JSON [ VALUE ] | Whether jsonValue is not a JSON value.Table API functions currently only support JSON strings as input parameters. |
jsonValue IS JSON SCALAR | Whether jsonValue is a JSON scalar value.Table API functions currently only support JSON strings as input parameters. |
jsonValue IS NOT JSON SCALAR | Whether jsonValue is not a JSON scalar value.Table API functions currently only support JSON strings as input parameters. |
jsonValue IS JSON OBJECT | Whether jsonValue is a JSON object.Table API functions currently only support JSON strings as input parameters. |
jsonValue IS NOT JSON OBJECT | Whether jsonValue is not a JSON object.Table API functions currently only support JSON strings as input parameters. |
jsonValue IS JSON ARRAY | Whether jsonValue is a JSON array.Table API functions currently only support JSON strings as input parameters. |
jsonValue IS NOT JSON ARRAY | Whether jsonValue is not a JSON array.Table API functions currently only support JSON strings as input parameters. |
Use in TableApi:
JAVA/PYTHON/SCALA | DESCRIPTION |
ANY.isJsonValue() | Whether jsonValue is a JSON value.Table API functions currently only support JSON strings as input parameters. |
ANY.isNotJsonValue() | Whether jsonValue is not a JSON value.Table API functions currently only support JSON strings as input parameters. |
ANY.isJsonScalar() | Whether jsonValue is a JSON scalar value.Table API functions currently only support JSON strings as input parameters. |
ANY.isNotJsonScalar() | Whether jsonValue is not a JSON scalar value.Table API functions currently only support JSON strings as input parameters. |
ANY.isJsonObject() | Whether jsonValue is a JSON object.Table API functions currently only support JSON strings as input parameters. |
ANY.isNotJsonObject() | Whether jsonValue is not a JSON object.Table API functions currently only support JSON strings as input parameters. |
ANY.isJsonArray() | Whether jsonValue is a JSON array.Table API functions currently only support JSON strings as input parameters. |
ANY.isNotJsonArray() | Whether jsonValue is not a JSON array.Table API functions currently only support JSON strings as input parameters. |
The test example:
tester.checkBoolean("'{}' is json value", true); tester.checkBoolean("'{]' is json value", false); tester.checkBoolean("'{}' is json object", true); tester.checkBoolean("'[]' is json object", false); tester.checkBoolean("'{}' is json array", false); tester.checkBoolean("'[]' is json array", true); tester.checkBoolean("'100' is json scalar", true); tester.checkBoolean("'[]' is json scalar", false); tester.checkBoolean("'{}' is not json value", false); tester.checkBoolean("'{]' is not json value", true); tester.checkBoolean("'{}' is not json object", false); tester.checkBoolean("'[]' is not json object", true); tester.checkBoolean("'{}' is not json array", true); tester.checkBoolean("'[]' is not json array", false); tester.checkBoolean("'100' is not json scalar", false); tester.checkBoolean("'[]' is not json scalar", true); |
Implementation Details
Table Api interface proposal
...
BuiltInFunctionDefinitions.java
// json functions public static final BuiltInFunctionDefinition JSON_EXISTS = new BuiltInFunctionDefinition.Builder() .name("json_exists") .kind(SCALAR) .outputTypeStrategy(TypeStrategies.MISSING) .build(); |
expressionDsl.scala
/** * Returns a boolean of json_exists. * E.g. json_exists("{\"foo\":\"bar\"}".345,'strict $.foo') to true. */ def jsonExists(path: Expression): Expression = unresolvedCall(JSON_EXISTS, expr, path) |
DirectConvertRule.java
// json functions DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.JSON_EXISTS, FlinkSqlOperatorTable.JSON_EXISTS); |
FlinkSqlOperatorTable.java
// JSON FUNCTIONS public static final SqlFunction JSON_EXISTS = SqlStdOperatorTable.JSON_EXISTS; |
BuiltInMethods.scala
val JSON_EXISTS = Types.lookupMethod(classOf[JsonFunctions], "jsonExists", classOf[String], classOf[String]) |
FunctionGenerator.scala
addSqlFunctionMethod( JSON_EXISTS, Seq(VARCHAR, VARCHAR), BuiltInMethods.JSON_EXISTS) |
StringCallGen.scala
case JSON_EXISTS if operands.size == 2 && isCharacterString(operands.head.resultType) && isCharacterString(operands(1).resultType) => methodGen(BuiltInMethods.JSON_EXISTS) |
PlannerExpressionConverter.scala
case JSON_EXISTS => assert(args.size == 2) JsonExists(args.head, args.last) |
jsonExpressions.scala
package org.apache.flink.table.planner.expressions import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.table.planner.typeutils.TypeInfoCheckUtils import org.apache.flink.table.planner.validate.{ValidationFailure, ValidationResult} case class JsonExists(child: PlannerExpression, path: PlannerExpression) extends PlannerExpression with InputTypeSpec { override private[flink] def resultType: TypeInformation[_] = BOOLEAN_TYPE_INFO override private[flink] def children: Seq[PlannerExpression] = Seq(child, path) override private[flink] def expectedTypes: Seq[TypeInformation[_]] = Seq(STRING_TYPE_INFO, STRING_TYPE_INFO) override def toString: String = s"json_exists(${children.mkString(",")})" override private[flink] def validateInput(): ValidationResult = { if (child != null && path != null) { if (!TypeInfoCheckUtils.isString(child.resultType) || !TypeInfoCheckUtils.isString(path.resultType)) { ValidationFailure(s"json_exists num requires int, get " + s"$child : ${child.resultType}, $path : ${path.resultType}") } } TypeInfoCheckUtils.assertNumericExpr(BOOLEAN_TYPE_INFO, s"json_exists base :$child") } } |
ScalarTypesTestBase.scala
testData.setField(55, "{\"foo\":\"bar\"}") /* 55 */ Types.STRING) |
ScalarFunctionsTest.scala
//------------------------------------------------------------------- // JSON functions //------------------------------------------------------------------- @Test def testJsonExists(): Unit = { testAllApis( 'f55.jsonExists("strict $.foo"), "f55.json_exists('strict $.foo')", "json_exists(f55, 'strict $.foo')", "true") } |
JsonITCase.scala
/** * tests for Json Table Api */ @RunWith(classOf[Parameterized]) class JsonITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode) { // input data val data = List( ("{\"foo\":\"bar\"}"), ("flink") ) @Test def testJsonExists(): Unit = { val stream = env.fromCollection(data) val table = stream.toTable(tEnv, 'str) val resultTable = table .select('str, 'str.jsonExists("strict $.foo")) val sink = new TestingAppendSink resultTable.toAppendStream[Row].addSink(sink) env.execute() val expected = mutable.MutableList("flink,false", "{\"foo\":\"bar\"},true") assertEquals(expected.sorted, sink.getAppendResults.sorted) } } |
Migration Plan and Compatibility
...