...


For example, suppose your JSON payload looks something like this:

{
  "c1": 10000,
  "c2": "bar",
  "create_ts": 1501834166000,
  "update_ts": 1501834166000
}

 

This payload is not compatible with sink connectors that need a schema during ingest in order to map Kafka Connect data types to, for example, JDBC data types. Such connectors expect the data to be structured like so:

{
  "schema": {
    "type": "struct",
    "fields": [{
      "type": "int32",
      "optional": true,
      "field": "c1"
    }, {
      "type": "string",
      "optional": true,
      "field": "c2"
    }, {
      "type": "int64",
      "optional": false,
      "name": "org.apache.kafka.connect.data.Timestamp",
      "version": 1,
      "field": "create_ts"
    }, {
      "type": "int64",
      "optional": false,
      "name": "org.apache.kafka.connect.data.Timestamp",
      "version": 1,
      "field": "update_ts"
    }],
    "optional": false,
    "name": "foobar"
  },
  "payload": {
    "c1": 10000,
    "c2": "bar",
    "create_ts": 1501834166000,
    "update_ts": 1501834166000
  }
}

 

The "schema" element tells the JsonConverter how to map the payload's JSON data types to Kafka Connect data types on the consumer side, and certain sink connectors (e.g. JdbcSinkConnector) cannot function without a schema. Rather than requiring every producer in a data pipeline to embed an explicit schema in each JSON record that a sink connector will consume, the JsonConverter should be able to infer a schema from the contents of the JSON data.
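
To make the role of the schema concrete, here is a minimal, self-contained sketch in plain Java (no Kafka or Jackson dependencies). The class SchemaDrivenConversionSketch and its convert method are hypothetical and are not part of JsonConverter. They only mimic the idea that the declared "type" and logical "name" of a field decide which Java type a JSON value becomes, assuming the JSON has already been parsed into plain Java values.

```java
import java.util.Date;

// Hypothetical, simplified stand-in for schema-driven conversion.
// This is NOT Kafka Connect code; it only illustrates the idea.
class SchemaDrivenConversionSketch {

    // Logical type name used by the "create_ts" and "update_ts" fields above.
    static final String TIMESTAMP = "org.apache.kafka.connect.data.Timestamp";

    // Converts one parsed JSON value according to the declared field type.
    // logicalName may be null when the field has no "name" attribute.
    static Object convert(String type, String logicalName, Object jsonValue) {
        if (jsonValue == null) {
            return null;
        }
        switch (type) {
            case "int32":
                return Integer.valueOf(((Number) jsonValue).intValue());
            case "int64":
                long raw = ((Number) jsonValue).longValue();
                if (TIMESTAMP.equals(logicalName)) {
                    // With the Timestamp logical type, epoch millis become a Date.
                    return new Date(raw);
                }
                return Long.valueOf(raw);
            case "string":
                return jsonValue.toString();
            default:
                throw new IllegalArgumentException("Type not covered by this sketch: " + type);
        }
    }

    public static void main(String[] args) {
        // The same JSON number is handled differently depending on the schema.
        System.out.println(convert("int64", null, 1501834166000L).getClass().getSimpleName());
        System.out.println(convert("int64", TIMESTAMP, 1501834166000L).getClass().getSimpleName());
        System.out.println(convert("int32", null, 10000L).getClass().getSimpleName());
    }
}
```

Without the schema, nothing in the payload says that 1501834166000 is a timestamp rather than an ordinary 64-bit integer. That is the information a schema-dependent sink connector needs.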

...

The default value for this configuration is false, which preserves the original behavior of passing SchemaAndValue objects with a null schema to the downstream sink connector logic. If the configuration is set to "true", the JsonNodeType of each JSON node is used to infer the Kafka Connect data type that most closely resembles that piece of data. For instance, given the following data:

...

{"test1":"hi","test2":"there","test3":12,"test4":12.5,"test5":null,"test6":true,"test7":false,"test8":["element1","element2"]}

The inferred schema should yield a STRUCT with the following fields:

name     schema
test1    STRING
test2    STRING
test3    INT64
test4    FLOAT64
test5    STRING
test6    BOOLEAN
test7    BOOLEAN
test8    ARRAY

This can be achieved by introducing a recursive method, inferSchema(JsonNode jsonValue), in JsonConverter.java:

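    // Recursively infers a Kafka Connect schema from the shape of a JSON node.
    private Schema inferSchema(JsonNode jsonValue) {
        switch (jsonValue.getNodeType()) {
            case NULL:
                // A null carries no type information, so fall back to an optional string.
                return Schema.OPTIONAL_STRING_SCHEMA;
            case BOOLEAN:
                return Schema.BOOLEAN_SCHEMA;
            case NUMBER:
                // Integral numbers map to INT64; all other numbers map to FLOAT64.
                if (jsonValue.isIntegralNumber()) {
                    return Schema.INT64_SCHEMA;
                } else {
                    return Schema.FLOAT64_SCHEMA;
                }
            case ARRAY:
                // The element schema is inferred from the first element only.
                // Note: elements().next() throws NoSuchElementException for an empty array.
                SchemaBuilder arrayBuilder = SchemaBuilder.array(inferSchema(jsonValue.elements().next()));
                return arrayBuilder.build();
            case OBJECT:
                // Build a STRUCT with one field per JSON property, recursing into each value.
                SchemaBuilder structBuilder = SchemaBuilder.struct();
                Iterator<Map.Entry<String, JsonNode>> it = jsonValue.fields();
                while (it.hasNext()) {
                    Map.Entry<String, JsonNode> entry = it.next();
                    structBuilder.field(entry.getKey(), inferSchema(entry.getValue()));
                }
                return structBuilder.build();
            case STRING:
                return Schema.STRING_SCHEMA;
            default:
                // Remaining node types (BINARY, MISSING, POJO) yield no schema.
                return null;
        }
    }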
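
The recursion can be tried out without Kafka Connect or Jackson on the classpath. The following is a simplified sketch that mirrors the rules above on plain Java values (Map, List, String, Number, Boolean, null) and returns a textual description instead of a Schema object. The class InferenceRulesSketch and its inferType method are hypothetical names introduced for illustration. Two simplifications are assumptions of this sketch: only Integer and Long count as integral, and an empty list is rejected with an explicit exception.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for inferSchema: works on plain Java values
// instead of JsonNode, and returns a type description string.
class InferenceRulesSketch {

    static String inferType(Object value) {
        if (value == null) {
            return "STRING"; // null: falls back to (optional) string, as in the NULL case
        }
        if (value instanceof Boolean) {
            return "BOOLEAN";
        }
        if (value instanceof Integer || value instanceof Long) {
            return "INT64"; // assumption: only Integer/Long are treated as integral here
        }
        if (value instanceof Number) {
            return "FLOAT64";
        }
        if (value instanceof String) {
            return "STRING";
        }
        if (value instanceof List) {
            List<?> list = (List<?>) value;
            if (list.isEmpty()) {
                // Assumption of this sketch: fail explicitly rather than guess.
                throw new IllegalArgumentException("Cannot infer element type of an empty array");
            }
            // As in the ARRAY case, only the first element is inspected.
            return "ARRAY<" + inferType(list.get(0)) + ">";
        }
        if (value instanceof Map) {
            StringBuilder sb = new StringBuilder("STRUCT{");
            String sep = "";
            for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                sb.append(sep).append(e.getKey()).append(':').append(inferType(e.getValue()));
                sep = ",";
            }
            return sb.append('}').toString();
        }
        throw new IllegalArgumentException("Unsupported value: " + value.getClass());
    }

    public static void main(String[] args) {
        // Same data as the example record above, already parsed into Java values.
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("test1", "hi");
        record.put("test2", "there");
        record.put("test3", 12);
        record.put("test4", 12.5);
        record.put("test5", null);
        record.put("test6", true);
        record.put("test7", false);
        record.put("test8", Arrays.asList("element1", "element2"));
        System.out.println(inferType(record));
    }
}
```

The sketch reports the element type of test8 as well (ARRAY<STRING>), which the table above abbreviates to ARRAY.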

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

...