Status
Current state: Under Discussion
Discussion thread: https://www.mail-archive.com/dev@kafka.apache.org/msg105887.html
JIRA: KAFKA-8713
Motivation
Kafka Connect uses org.apache.kafka.connect.data.Schema to describe record structure. Two of its methods, boolean isOptional() and Object defaultValue(), can interact in a confusing way when a field's value is null, as this excerpt from JsonConverter shows:
Code Block |
---|
|
private JsonNode convertToJson(Schema schema, Object value) {
    if (value == null) {
        if (schema == null)
            return null;
        // A default value takes precedence over null, even for optional fields
        if (schema.defaultValue() != null)
            return convertToJson(schema, schema.defaultValue());
        if (schema.isOptional())
            return JsonNodeFactory.instance.nullNode();
        throw new DataException("Conversion error: null value for field that is required and has no default value");
    }
    ...
} |
In the current implementation, when isOptional() returns true and defaultValue() returns a non-null value (in other words, for an optional field that has a default value), serializing a null value with org.apache.kafka.connect.json.JsonConverter produces the default value instead of null. This makes sense in most scenarios.
But there are exceptions. For example, suppose we define a table named t1 in MySQL,
Code Block |
---|
|
create table t1 (
    name varchar(40) not null,
    create_time datetime default '1999-01-01 11:11:11' null,
    update_time datetime default '1999-01-01 11:11:11' null
); |
and then insert one record into table t1:
Code Block |
---|
|
INSERT INTO `t1` (`name`, `update_time`) VALUES ('kafka', null); |
the record is stored in MySQL as:
Code Block |
---|
|
{
    "name": "kafka",
    "create_time": "1999-01-01 11:11:11",
    "update_time": null
} |
but if we use Debezium (a Kafka Connect source plugin) to read the binlog and send the record to Kafka, the record changes to:
Code Block |
---|
|
{
    "name": "kafka",
    "create_time": "1999-01-01 11:11:11",
    "update_time": "1999-01-01 11:11:11"
} |
In general, when an optional field that has a default value is null, it can legitimately be treated either as null or as the default value, depending on the context. JsonConverter currently always chooses the default value.
Public Interfaces
Add a config property `serialize.accept.optional.null`. It defaults to `false`, which preserves the current behavior for compatibility.
Proposed Changes
Config property change
Add a config property `serialize.accept.optional.null` to org.apache.kafka.connect.storage.ConverterConfig, with a default of false. Among the built-in converters, this property affects serialization only in JsonConverter. Third-party converters can extend ConverterConfig, just as JsonConverterConfig does, to read this property and implement the same behavior.
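As a rough illustration of how a converter might read the property, the sketch below parses it from the configuration map that Kafka Connect passes to `Converter.configure(Map<String, ?> configs, boolean isKey)`. It is only a sketch under assumptions: the class `AcceptOptionalNullConfig` and its members are hypothetical names, and a plain `Map` lookup stands in for `ConverterConfig` and `ConfigDef` so that the example has no Kafka dependency.

```java
import java.util.Map;

/**
 * Hypothetical helper, not part of Kafka: reads the proposed property
 * from a converter configuration map.
 */
final class AcceptOptionalNullConfig {

    /** Property name proposed in this KIP. */
    static final String ACCEPT_OPTIONAL_NULL_CONFIG = "serialize.accept.optional.null";

    /** Default proposed in this KIP; false keeps the existing behavior. */
    static final boolean ACCEPT_OPTIONAL_NULL_DEFAULT = false;

    private AcceptOptionalNullConfig() {
    }

    /**
     * Returns the configured value, or the default when the property is absent.
     * Accepts either a Boolean or a string such as "true" / "false".
     */
    static boolean acceptOptionalNull(Map<String, ?> configs) {
        Object raw = configs.get(ACCEPT_OPTIONAL_NULL_CONFIG);
        if (raw == null) {
            return ACCEPT_OPTIONAL_NULL_DEFAULT;
        }
        if (raw instanceof Boolean) {
            return (Boolean) raw;
        }
        return Boolean.parseBoolean(raw.toString().trim());
    }
}
```

A real implementation would more likely declare the property in a `ConfigDef` so that it is validated and documented alongside the other converter settings.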
JsonConverter change
During serialization (convertToJson), for an optional field that has a default value and whose value is null:
- if serialize.accept.optional.null=true, the field is serialized as null;
- if serialize.accept.optional.null=false, the field is serialized as its default value.
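The rule above can be sketched as a small decision function. This is a minimal, self-contained illustration rather than the actual JsonConverter patch: `NullHandlingSketch`, `resolveNull`, `ConversionException`, and the `acceptOptionalNull` parameter are hypothetical names, plain Java values stand in for `Schema` and `JsonNode`, and the `schema == null` case of the real method is not modeled.

```java
/**
 * Hypothetical sketch of the proposed null handling; not the real JsonConverter code.
 */
final class NullHandlingSketch {

    /** Stand-in for Kafka Connect's DataException, to keep the sketch dependency-free. */
    static final class ConversionException extends RuntimeException {
        ConversionException(String message) {
            super(message);
        }
    }

    private NullHandlingSketch() {
    }

    /**
     * Decides what to serialize when a field's value is null.
     *
     * @param optional           stand-in for Schema.isOptional()
     * @param defaultValue       stand-in for Schema.defaultValue(); may be null
     * @param acceptOptionalNull stand-in for serialize.accept.optional.null
     * @return the value to serialize; a null return represents a JSON null
     */
    static Object resolveNull(boolean optional, Object defaultValue, boolean acceptOptionalNull) {
        // Proposed branch: an optional field keeps its null when the property is enabled.
        if (optional && acceptOptionalNull) {
            return null;
        }
        // Existing behavior: a default value replaces null.
        if (defaultValue != null) {
            return defaultValue;
        }
        // Existing behavior: an optional field without a default stays null.
        if (optional) {
            return null;
        }
        throw new ConversionException(
            "Conversion error: null value for field that is required and has no default value");
    }
}
```

In this sketch the flag changes only the outcome for optional fields that have a default value; required fields and optional fields without a default behave the same with either setting.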
Compatibility, Deprecation, and Migration Plan
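The change does not break compatibility: the new property defaults to false, so the current behavior is kept unless a user explicitly sets it to true.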
Rejected Alternatives
- Always take null for an optional null field that has a default value. This would break compatibility and would cover only some of the cases.
- Apply the property to both serialization and deserialization. I have not seen a scenario that requires it in deserialization.