
Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The connect-json library already provides a converter, "JsonConverter", but it is limited in the range of JSON payloads it can handle on the Sink Connector side when deserializing them into Kafka Connect datatypes. When reading byte arrays from Kafka, the JsonConverter expects its input to be a JSON envelope that contains the fields "schema" and "payload"; otherwise it throws a DataException reporting:


JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

(when schemas.enable is true), or:

JSON value converted to Kafka Connect must be in envelope containing schema

(when schemas.enable is false).
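
The failure is straightforward to reproduce directly against the converter. Below is a minimal sketch (assuming the connect-json and connect-api artifacts are on the classpath; the class name is hypothetical) that feeds a plain JSON record to a JsonConverter configured with schemas.enable=true and prints the resulting DataException message:

PlainJsonFailureDemo.java
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;

 import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.json.JsonConverter;

 public class PlainJsonFailureDemo {
     public static void main(String[] args) {
         JsonConverter converter = new JsonConverter();
         // schemas.enable defaults to true; it is set explicitly here for clarity.
         converter.configure(Collections.singletonMap("schemas.enable", "true"), false);

         byte[] plainJson = "{\"c1\": 10000, \"c2\": \"bar\"}".getBytes(StandardCharsets.UTF_8);
         try {
             converter.toConnectData("my-topic", plainJson);
         } catch (DataException e) {
             // Prints the first error message quoted above.
             System.out.println(e.getMessage());
         }
     }
 }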


For example, suppose your JSON payload looks like this:

{
    "c1": 10000,
    "c2": "bar",
    "create_ts": 1501834166000,
    "update_ts": 1501834166000
}

This payload is not compatible with Sink Connectors that require a schema to ingest data, such as those mapping Kafka Connect datatypes to JDBC datatypes. Instead, the data is expected to be structured like so:

{
    "schema": {
        "type": "struct",
        "fields": [{
            "type": "int32",
            "optional": true,
            "field": "c1"
        }, {
            "type": "string",
            "optional": true,
            "field": "c2"
        }, {
            "type": "int64",
            "optional": false,
            "name": "org.apache.kafka.connect.data.Timestamp",
            "version": 1,
            "field": "create_ts"
        }, {
            "type": "int64",
            "optional": false,
            "name": "org.apache.kafka.connect.data.Timestamp",
            "version": 1,
            "field": "update_ts"
        }],
        "optional": false,
        "name": "foobar"
    },
    "payload": {
        "c1": 10000,
        "c2": "bar",
        "create_ts": 1501834166000,
        "update_ts": 1501834166000
    }
}

The "schema" is a necessary component in order to dictate to the JsonConverter how to map the payload's JSON datatypes to Kafka Connect datatypes on the consumer side, and certain Sink Connectors absolutely require a schema in order for them to function (eg: JdbcSinkConnector). Instead of requiring explicitly defined schemas within all json records from every producer that is part of a data pipeline where a Sink Connector will consume its records, the JsonConverter should have the ability to make inferences based on the contents of the JSON data.

Public Interfaces and Proposed Changes

This proposal introduces a new configuration that would be read by all instances of the JsonConverter. To activate it, individual connectors can set it as a connector property, or users can set it as a worker property:

value.converter.schemas.infer.enable: "true"

This configuration defaults to false, preserving the original behavior of passing SchemaAndValue objects with a null schema to downstream sink connector logic. If it is set to "true", however, the JsonNodeType of each JSON node is used to infer the Kafka Connect datatype that most closely resembles that piece of data. For instance, given the following data:

{"test1":"hi","test2":"there","test3":12,"test4":12.5,"test5":null,"test6":true,"test7":false,"test8":["element1","element2"]}

The inferred schema should yield a STRUCT with the following fields:

name    schema
----    ------
test1   STRING
test2   STRING
test3   INT64
test4   FLOAT64
test5   STRING
test6   BOOLEAN
test7   BOOLEAN
test8   ARRAY
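
Note that schemas.infer.enable does not exist in Apache Kafka today; the sketch below assumes the behavior proposed in this KIP. It also assumes that, as with other converter properties, the Connect worker strips the "value.converter." prefix before handing the configuration to the converter, so the converter itself sees the unprefixed key. The class name is hypothetical:

SchemaInferenceDemo.java
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;

 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.json.JsonConverter;

 public class SchemaInferenceDemo {
     public static void main(String[] args) {
         JsonConverter converter = new JsonConverter();

         Map<String, String> config = new HashMap<>();
         config.put("schemas.enable", "false");
         config.put("schemas.infer.enable", "true"); // proposed by this KIP
         converter.configure(config, false);

         byte[] json = ("{\"test1\":\"hi\",\"test2\":\"there\",\"test3\":12,\"test4\":12.5,"
                 + "\"test5\":null,\"test6\":true,\"test7\":false,"
                 + "\"test8\":[\"element1\",\"element2\"]}").getBytes(StandardCharsets.UTF_8);

         SchemaAndValue connectData = converter.toConnectData("my-topic", json);
         Schema schema = connectData.schema();

         // Under the proposal, the inferred schema mirrors the table above:
         System.out.println(schema.field("test3").schema().type()); // INT64
         System.out.println(schema.field("test4").schema().type()); // FLOAT64
         System.out.println(schema.field("test8").schema().type()); // ARRAY
     }
 }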

This is achievable by introducing a recursive method, inferSchema(JsonNode jsonValue):

JsonConverter.java
 private Schema inferSchema(JsonNode jsonValue) {
        switch (jsonValue.getNodeType()) {
            case NULL:
                // A null value carries no type information; treat it as an optional string.
                return Schema.OPTIONAL_STRING_SCHEMA;
            case BOOLEAN:
                return Schema.BOOLEAN_SCHEMA;
            case NUMBER:
                if (jsonValue.isIntegralNumber()) {
                    return Schema.INT64_SCHEMA;
                } else {
                    return Schema.FLOAT64_SCHEMA;
                }
            case ARRAY:
                if (!jsonValue.elements().hasNext()) {
                    // An empty array offers no element to inspect; fall back to an
                    // optional string element schema (an assumption of this sketch).
                    return SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).build();
                }
                // Infer the element schema from the array's first element.
                return SchemaBuilder.array(inferSchema(jsonValue.elements().next())).build();
            case OBJECT:
                SchemaBuilder structBuilder = SchemaBuilder.struct();
                Iterator<Map.Entry<String, JsonNode>> it = jsonValue.fields();
                while (it.hasNext()) {
                    Map.Entry<String, JsonNode> entry = it.next();
                    structBuilder.field(entry.getKey(), inferSchema(entry.getValue()));
                }
                return structBuilder.build();
            case STRING:
                return Schema.STRING_SCHEMA;
            default:
                // MISSING, BINARY, and POJO nodes have no natural Connect mapping.
                return null;
        }
    }
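
For context, one possible integration point is toConnectData(). The abbreviated sketch below is illustrative rather than final: schemasInferEnable is a hypothetical boolean field parsed from the proposed configuration, enableSchemas stands in for the converter's existing schemas.enable setting, deserializer is the converter's existing JsonDeserializer, and convertToConnect() is the existing private helper in JsonConverter that maps a JsonNode to a Connect value for a given schema:

JsonConverter.java (sketch)
 public SchemaAndValue toConnectData(String topic, byte[] value) {
     JsonNode jsonValue = deserializer.deserialize(topic, value);
     if (jsonValue == null)
         return SchemaAndValue.NULL;

     if (!enableSchemas && schemasInferEnable) {
         // No envelope expected: infer the schema from the payload itself, then
         // convert the payload with the existing convertToConnect() helper.
         Schema inferredSchema = inferSchema(jsonValue);
         return new SchemaAndValue(inferredSchema, convertToConnect(inferredSchema, jsonValue));
     }

     // ... existing envelope / schemaless handling unchanged ...
 }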

Compatibility, Deprecation, and Migration Plan

Because the new configuration defaults to false, existing users will see no change in behavior: the converter continues to hand SchemaAndValue objects with a null schema to downstream sink connectors unless the flag is explicitly enabled. The change is purely additive, so no existing behavior is being phased out and no special migration tools are required.

Rejected Alternatives

None at this time.
