You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 6 Next »

Status

Current stateUnder Discussion

Discussion threadhttps://lists.apache.org/thread.html/r69b11ca1d298a5c849f8af1af0b2708abfc5b1c3e44a402917f6646c%40%3Cdev.kafka.apache.org%3E

JIRA Unable to render Jira issues macro, execution error.

Motivation

Kafka Connect use org.apache.kafka.connect.data.Schema to describe record structure. There are two methods boolean isOptional() and Object defaultValue() may cause confused.

private JsonNode convertToJson(Schema schema, Object value) {
    if (value == null) {
        if (schema == null)
            return null;
        if (schema.defaultValue() != null)
            return convertToJson(schema, schema.defaultValue());
        if (schema.isOptional())
            return JsonNodeFactory.instance.nullNode();
        throw new DataException("Conversion error: null value for field that is required and has no default value");
    }
    ...
}

As currently implementation, when isOptional() return true and defaultValue() return not null, in other word, an optional field which has default value, if we use org.apache.kafka.connect.json.JsonConverter to serialize the field, will eventually get defaultValue(). It make sense in most scenario.

But there are some exceptions. For example, if we define a table named t1 in MySQL,

create table t1 {
   name varchar(40) not null,
   create_time datetime default '1999-01-01 11:11:11' null,
   update_time datetime default '1999-01-01 11:11:11' null
}

and then insert one record into table t1,

INSERT INTO `t1` (`name`, `update_time`) VALUES ('kafka', null);

the record will store in MySQL as:

{
  "name": "kafka",
  "create_time": "1999-01-01 11:11:11",
  "update_time": null
}

but if we use debezium (a kafka connect source plugin) pull binlog, and send the record to kafka, the record will change to:

{
  "name": "kafka",
  "create_time": "1999-01-01 11:11:11",
  "update_time": "1999-01-01 11:11:11"
}

Generally, when an optional field which has default value is null, we can treat it as null or default value, it depends on the context.

Public Interfaces

Add a config property accept.optional.nulldefault false, for compatibility.

Proposed Changes

Config property change

Add a config property accept.optional.null in org.apache.kafka.connect.storage.ConverterConfigdefault false. This property would only affect JsonConverter in all build-in converters, both serialization and deserialization. Any third party converters can extend ConverterConfig just like JsonConverterConfig to read this property and implement it.

JsonConverter change

Change private fields TO_CONNECT_CONVERTERS, LOGICAL_CONVERTERS, and private method convertToConnect(Schema schema, JsonNode jsonValue) to no-static.

Serialization(convertToJson), for an optional null field which has default value,

  • if set accept.optional.null=true, take null;
  • if set accept.optional.null=false, take default value.

Deserialization(convertToConnectData), for an optional null field which has default value,

  • if set accept.optional.null=true, take null;
  • if set accept.optional.null=false, take default value.

Compatibility, Deprecation, and Migration Plan

The change will not break the compatibility.

Rejected Alternatives

  1. Always take null on an optional null field which has default value. It will break the compatibility and only cover partial cases.
  • No labels