...

JIRA: KAFKA-8713

Motivation

Kafka Connect uses org.apache.kafka.connect.data.Schema to describe the structure of records. Two of its methods, boolean isOptional() and Object defaultValue(), can interact in a confusing way, as the null handling in JsonConverter shows:

Code Block
languagejava
private JsonNode convertToJson(Schema schema, Object value) {
    if (value == null) {
        if (schema == null)
            return null;
        if (schema.defaultValue() != null)
            return convertToJson(schema, schema.defaultValue());
        if (schema.isOptional())
            return JsonNodeFactory.instance.nullNode();
        throw new DataException("Conversion error: null value for field that is required and has no default value");
    }
    ...
}

With the current implementation, when isOptional() returns true and defaultValue() returns a non-null value (in other words, for an optional field that has a default value), serializing a null value for that field with org.apache.kafka.connect.json.JsonConverter always yields defaultValue(). This makes sense in most scenarios.

JsonConverter is used by Connect to convert raw JSON payloads, in bytes, to and from Connect schemas and value objects. When converting Connect objects to bytes, if the value is null and the field has a default value, JsonConverter always emits the default value. In some cases, for example when the value is explicitly set to null, it would be preferable to keep the null value instead.

In particular, suppose we use Debezium to stream changes out of a MySQL table, t1, defined as follows:

Code Block
languagesql
create table t1 (
   name varchar(40) not null,
   create_time datetime default '1999-01-01 11:11:11' null,
   update_time datetime default '1999-01-01 11:11:11' null
);

...

Code Block
languagejs
{
  "name": "kafka",
  "create_time": "1999-01-01 11:11:11",
  "update_time": null
}

However, when Debezium (a Kafka Connect source plugin) reads the binlog and sends the record to Kafka, the JsonConverter always emits:

Code Block
languagejs
{
  "name": "kafka",
  "create_time": "1999-01-01 11:11:11",
  "update_time": "1999-01-01 11:11:11"
}

Generally, when an optional field that has a default value is null, whether it should be treated as null or as the default value depends on the context. It would be beneficial to be able to specify whether fields that have a default value and are set to null should be converted to null or to the default value.

Public Interfaces

Add a configuration to JsonConverter:

Name: serialize.null.to.default
Description: Whether to serialize fields that have a default value and that are null to the default value or to null. When set to true, the default value is used, otherwise null is used.
Type: Boolean
Default: true
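
As a rough illustration of the property described above, the following sketch resolves it from a converter's configuration map and falls back to true when it is absent. The class and method names are hypothetical and use only the Java standard library; this is not the actual JsonConverterConfig code. In a worker or connector configuration, converter properties are normally given with the key.converter. or value.converter. prefix, for example value.converter.serialize.null.to.default=false.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Kafka: shows how the proposed property
// could be resolved from converter configs with a default of true.
final class NullToDefaultConfigSketch {
    // Property name and default value are taken from the proposal text.
    static final String SERIALIZE_NULL_TO_DEFAULT = "serialize.null.to.default";
    static final boolean SERIALIZE_NULL_TO_DEFAULT_DEFAULT = true;

    // Returns the configured value, or the default when the key is absent.
    static boolean serializeNullToDefault(Map<String, ?> configs) {
        Object raw = configs.get(SERIALIZE_NULL_TO_DEFAULT);
        if (raw == null) {
            return SERIALIZE_NULL_TO_DEFAULT_DEFAULT;
        }
        return Boolean.parseBoolean(raw.toString().trim());
    }

    public static void main(String[] args) {
        // Existing deployment: the property is not set, so behavior is unchanged.
        Map<String, String> existing = new HashMap<>();
        // Opt-in deployment: nulls are kept instead of being replaced.
        Map<String, String> optIn = new HashMap<>();
        optIn.put(SERIALIZE_NULL_TO_DEFAULT, "false");

        System.out.println(serializeNullToDefault(existing)); // true
        System.out.println(serializeNullToDefault(optIn));    // false
    }
}
```

Defaulting to true when the key is missing is what keeps existing configurations on the current behavior.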

Proposed Changes

Update JsonConverter and JsonConverterConfig to handle the new configuration.
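
One way the null branch of convertToJson could take the new configuration into account is sketched below. It is a simplified, standalone model: FieldSchema is a hypothetical stand-in for org.apache.kafka.connect.data.Schema, plain Java objects stand in for JsonNode, and the behavior for a required field when the flag is false is an assumption, not something the proposal specifies.

```java
// Standalone sketch, not the actual JsonConverter code.
final class NullHandlingSketch {
    // Hypothetical stand-in for org.apache.kafka.connect.data.Schema.
    static final class FieldSchema {
        final boolean optional;
        final Object defaultValue;

        FieldSchema(boolean optional, Object defaultValue) {
            this.optional = optional;
            this.defaultValue = defaultValue;
        }
    }

    // Decides what to emit for a null value, given the proposed flag.
    static Object resolveNull(FieldSchema schema, boolean serializeNullToDefault) {
        if (schema == null) {
            return null;
        }
        // Current behavior (flag = true): a null is replaced by the default.
        if (schema.defaultValue != null && serializeNullToDefault) {
            return schema.defaultValue;
        }
        // New behavior (flag = false): an optional field keeps its null.
        if (schema.optional) {
            return null;
        }
        // Assumption: a required field with a null value still fails.
        throw new IllegalStateException("Conversion error: null value for required field");
    }

    // Non-null values pass through unchanged in this simplified model.
    static Object serialize(FieldSchema schema, Object value, boolean serializeNullToDefault) {
        return value != null ? value : resolveNull(schema, serializeNullToDefault);
    }

    public static void main(String[] args) {
        // update_time from table t1: optional, with a default value.
        FieldSchema updateTime = new FieldSchema(true, "1999-01-01 11:11:11");

        System.out.println(serialize(updateTime, null, true));  // 1999-01-01 11:11:11
        System.out.println(serialize(updateTime, null, false)); // null
    }
}
```

With the flag left at true, the update_time field of t1 is emitted as its default; with the flag set to false, the explicit null is preserved.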

Compatibility, Deprecation, and Migration Plan

The new configuration keeps the current behavior by default, so it will not break compatibility or affect existing users. Users can opt in to the new behavior by setting the configuration to false.

Rejected Alternatives

  1. Always emit null for an optional null field that has a default value. This would break compatibility and would only cover some of the cases.
  2. Apply the setting to both serialization and deserialization. I have not seen a scenario that needs it in deserialization.
  3. Add the configuration to ConverterConfig.