Status

Current state: Under Discussion

Discussion thread: https://www.mail-archive.com/dev@kafka.apache.org/msg105887.html

JIRA: KAFKA-8713

Motivation

Kafka Connect uses org.apache.kafka.connect.data.Schema to describe the structure of a record. Two of its methods, boolean isOptional() and Object defaultValue(), can be confusing when they are combined.

Code Block
languagejava
private JsonNode convertToJson(Schema schema, Object value) {
    if (value == null) {
        // Schemaless data: there is nothing to fall back on.
        if (schema == null)
            return null;
        // A default value takes precedence over optionality.
        if (schema.defaultValue() != null)
            return convertToJson(schema, schema.defaultValue());
        // Optional field without a default value: emit a JSON null.
        if (schema.isOptional())
            return JsonNodeFactory.instance.nullNode();
        throw new DataException("Conversion error: null value for field that is required and has no default value");
    }
    ...
}

In the current implementation, when isOptional() returns true and defaultValue() returns a non-null value (in other words, an optional field that has a default value), serializing a null value of that field with org.apache.kafka.connect.json.JsonConverter produces defaultValue(). This makes sense in most scenarios.

But there are some exceptions. For example, if we define a table named t1 in MySQL,

Code Block
languagesql
create table t1 (
   name varchar(40) not null,
   create_time datetime default '1999-01-01 11:11:11' null,
   update_time datetime default '1999-01-01 11:11:11' null
);

and then insert one record into table t1,

Code Block
languagesql
INSERT INTO `t1` (`name`, `update_time`) VALUES ('kafka', null);

the record is stored in MySQL as:

Code Block
languagejs
{
  "name": "kafka",
  "create_time": "1999-01-01 11:11:11",
  "update_time": null
}

but if we use Debezium (a Kafka Connect source plugin) to pull the binlog and send the record to Kafka, the record changes to:

Code Block
languagejs
{
  "name": "kafka",
  "create_time": "1999-01-01 11:11:11",
  "update_time": "1999-01-01 11:11:11"
}

In general, when an optional field that has a default value is null, it can reasonably be treated either as null or as the default value; which one is right depends on the context.

Public Interfaces

Add a config property `serialize.accept.optional.null`. It defaults to `false` so that existing behavior is preserved.

Proposed Changes

Config property change

Add a config property `serialize.accept.optional.null` to org.apache.kafka.connect.storage.ConverterConfig, with default false. Among the built-in converters, this property only affects serialization in JsonConverter. Third-party converters can extend ConverterConfig, as JsonConverterConfig does, to read this property and implement the same behavior.
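
As a rough illustration of how a converter might read the flag, the sketch below parses it from the configuration map that a converter receives when it is configured. The class OptionalNullConfig and the method acceptOptionalNull are hypothetical names used only for this example; a real implementation would more likely define the property in ConverterConfig as described above.

```java
import java.util.Map;

// Hypothetical helper for illustration only; not part of Kafka Connect.
final class OptionalNullConfig {
    // Property name proposed in this KIP.
    static final String SERIALIZE_ACCEPT_OPTIONAL_NULL = "serialize.accept.optional.null";

    private OptionalNullConfig() {
    }

    // Reads the flag from a converter configuration map.
    // A missing entry yields false, which is the proposed default.
    static boolean acceptOptionalNull(Map<String, ?> configs) {
        Object raw = configs.get(SERIALIZE_ACCEPT_OPTIONAL_NULL);
        if (raw == null) {
            return false;
        }
        if (raw instanceof Boolean) {
            return (Boolean) raw;
        }
        // Config values often arrive as strings, e.g. from a properties file.
        return Boolean.parseBoolean(raw.toString().trim());
    }

    public static void main(String[] args) {
        System.out.println(acceptOptionalNull(Map.of()));
        System.out.println(acceptOptionalNull(Map.of(SERIALIZE_ACCEPT_OPTIONAL_NULL, "true")));
    }
}
```

Treating a missing entry as false means that configurations written before the property existed keep behaving as they do today.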

JsonConverter change

During serialization (convertToJson), for an optional field that is null and has a default value:
  • if serialize.accept.optional.null=true, the serialized value is null;
  • if serialize.accept.optional.null=false, the serialized value is the default value.
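
The rule above can be sketched in plain Java without any Kafka dependency. This is a simplified model under stated assumptions, not the actual JsonConverter change: FieldSchema is a hypothetical stand-in for org.apache.kafka.connect.data.Schema that keeps only the optional flag and the default value, and resolve is a hypothetical name for the decision made when the value is null.

```java
// Simplified model of the proposed null handling; for illustration only.
final class NullHandlingSketch {

    // Hypothetical stand-in for Schema, reduced to the two properties
    // this proposal is concerned with.
    static final class FieldSchema {
        final boolean optional;
        final Object defaultValue;

        FieldSchema(boolean optional, Object defaultValue) {
            this.optional = optional;
            this.defaultValue = defaultValue;
        }
    }

    private NullHandlingSketch() {
    }

    // Returns the value that would be serialized for a field.
    static Object resolve(FieldSchema schema, Object value, boolean acceptOptionalNull) {
        if (value != null) {
            return value;
        }
        if (schema == null) {
            return null;
        }
        // Proposed behavior: with the flag on, an optional field keeps its null.
        if (acceptOptionalNull && schema.optional) {
            return null;
        }
        // Existing behavior: the default value wins over optionality.
        if (schema.defaultValue != null) {
            return schema.defaultValue;
        }
        if (schema.optional) {
            return null;
        }
        throw new IllegalArgumentException(
                "null value for field that is required and has no default value");
    }

    public static void main(String[] args) {
        // Models the update_time column from the MySQL example.
        FieldSchema updateTime = new FieldSchema(true, "1999-01-01 11:11:11");
        System.out.println(resolve(updateTime, null, false));
        System.out.println(resolve(updateTime, null, true));
    }
}
```

In this model a required field that has a default value still receives the default when the flag is on, since the proposal only changes the handling of optional fields.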

Compatibility, Deprecation, and Migration Plan

The change does not break compatibility: the new property defaults to false, which keeps the current behavior.

Rejected Alternatives

  1. Always take null for an optional null field that has a default value. This would break compatibility and would cover only some of the cases.
  2. Apply the property to both serialization and deserialization. I have not seen a scenario that needs it in deserialization.