...
Current state: Under Discussion
Discussion thread: here
JIRA: here
Motivation
Although the connect-json library already provides a converter called "JsonConverter", it is compatible with only a limited range of JSON payloads on the Sink Connector side when converting them into Kafka Connect datatypes. When reading byte arrays from Kafka, the JsonConverter expects its input to be a JSON envelope containing the fields "schema" and "payload"; otherwise it throws a DataException reporting:
...
{"test1":"hi","test2":"there","test3":12,"test4":12.5,"test5":null,"test6":true,"test7":false,"test8":["element1","element2"],"test9":[],"test10":[1]}
The inferred schema should yield a STRUCT with the following fields:
name | schema |
---|---|
test1 | STRING |
test2 | STRING |
test3 | INT64 |
test4 | FLOAT64 |
test5 | STRING |
test6 | BOOLEAN |
test7 | BOOLEAN |
test8 | ARRAY{STRING} |
test9 | ARRAY{STRING} |
test10 | ARRAY{INT64} |
This is achievable by introducing a recursive method, inferSchema(JsonNode jsonValue), which can both 1) infer the schema for simple JSON values and 2) call itself recursively to break down JSON documents with complex data types into their constituent parts and schemas:

    // Recursively infers a Kafka Connect Schema from a Jackson JsonNode.
    private Schema inferSchema(JsonNode jsonValue) {
        switch (jsonValue.getNodeType()) {
            case NULL:
                // No type information is available; assume an optional STRING.
                return Schema.OPTIONAL_STRING_SCHEMA;
            case BOOLEAN:
                return Schema.BOOLEAN_SCHEMA;
            case NUMBER:
                if (jsonValue.isIntegralNumber()) {
                    return Schema.INT64_SCHEMA;
                } else {
                    return Schema.FLOAT64_SCHEMA;
                }
            case ARRAY:
                // The element schema comes from the first element;
                // an empty array falls back to optional STRING elements.
                SchemaBuilder arrayBuilder = SchemaBuilder.array(
                    jsonValue.elements().hasNext()
                        ? inferSchema(jsonValue.elements().next())
                        : Schema.OPTIONAL_STRING_SCHEMA);
                return arrayBuilder.build();
            case OBJECT:
                // Infer a schema for each field and assemble a STRUCT.
                SchemaBuilder structBuilder = SchemaBuilder.struct();
                Iterator<Map.Entry<String, JsonNode>> it = jsonValue.fields();
                while (it.hasNext()) {
                    Map.Entry<String, JsonNode> entry = it.next();
                    structBuilder.field(entry.getKey(), inferSchema(entry.getValue()));
                }
                return structBuilder.build();
            case STRING:
                return Schema.STRING_SCHEMA;
            case BINARY:
            case MISSING:
            case POJO:
            default:
                // Unsupported node types yield no schema.
                return null;
        }
    }

In the current model, when the schema of a JSON value cannot be meaningfully inferred (null, []), the value is assumed to be a STRING: null maps to an optional STRING, and an empty array maps to an array of optional STRING elements.
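To make the inference rules and the STRING fallback easier to try out in isolation, here is a minimal, dependency-free sketch. It is not the proposed implementation: as a simplifying assumption, plain Java values (Map, List, String, Number, Boolean, null) stand in for Jackson's JsonNode, and type names rendered as strings stand in for Kafka Connect Schema objects. The class name SchemaInferenceSketch and the method inferType are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical stand-in for inferSchema: plain Java values replace JsonNode,
// and type names as strings replace Kafka Connect Schema objects.
class SchemaInferenceSketch {

    static String inferType(Object value) {
        if (value == null) {
            return "STRING"; // nothing to infer from: fall back to STRING
        }
        if (value instanceof Boolean) {
            return "BOOLEAN";
        }
        if (value instanceof Integer || value instanceof Long) {
            return "INT64"; // integral numbers
        }
        if (value instanceof Float || value instanceof Double) {
            return "FLOAT64"; // non-integral numbers
        }
        if (value instanceof String) {
            return "STRING";
        }
        if (value instanceof List) {
            List<?> elements = (List<?>) value;
            // The first element decides the element type; an empty list
            // falls back to STRING, mirroring the [] case.
            String elementType = elements.isEmpty() ? "STRING" : inferType(elements.get(0));
            return "ARRAY{" + elementType + "}";
        }
        if (value instanceof Map) {
            // Recurse into each field to describe the STRUCT.
            StringJoiner fields = new StringJoiner(", ", "STRUCT{", "}");
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
                fields.add(entry.getKey() + ": " + inferType(entry.getValue()));
            }
            return fields.toString();
        }
        return null; // unsupported value types yield no type
    }

    public static void main(String[] args) {
        Map<String, Object> document = new LinkedHashMap<>();
        document.put("test3", 12);
        document.put("test4", 12.5);
        document.put("test5", null);
        document.put("test9", Collections.emptyList());
        document.put("test10", Arrays.asList(1));
        // prints: STRUCT{test3: INT64, test4: FLOAT64, test5: STRING, test9: ARRAY{STRING}, test10: ARRAY{INT64}}
        System.out.println(inferType(document));
    }
}
```

Because only the first array element is inspected, a mixed-type array such as [1, "a"] would be described as ARRAY{INT64} in this sketch, which is one of the trade-offs of inferring a schema from a single sample.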
Compatibility, Deprecation, and Migration Plan
...