Status
Current state: Accepted
Discussion thread: https://lists.apache.org/thread/lxfq8y8co76mjwxhowwx6cjl0hw1mxpy
JIRA:
Release: 3.5.0
Motivation
Kafka Connect uses org.apache.kafka.connect.data.Schema to describe record structure. Two of its methods, boolean isOptional() and Object defaultValue(), can interact in a confusing way when a field value is null.
    private JsonNode convertToJson(Schema schema, Object value) {
        if (value == null) {
            if (schema == null)
                return null;
            // A non-null default always wins over null, even for optional fields.
            if (schema.defaultValue() != null)
                return convertToJson(schema, schema.defaultValue());
            if (schema.isOptional())
                return JsonNodeFactory.instance.nullNode();
            throw new DataException("Conversion error: null value for field that is required and has no default value");
        }
        ...
    }
When isOptional() returns true and defaultValue() returns a non-null value, in other words when an optional field has a default value, serializing a null value of that field with org.apache.kafka.connect.json.JsonConverter yields defaultValue() instead of null.
In particular, suppose we use Debezium to stream changes out of a MySQL table t1:

    create table t1 (
        name varchar(40) not null,
        create_time datetime default '1999-01-01 11:11:11' null,
        update_time datetime default '1999-01-01 11:11:11' null
    );
...
    { "name": "kafka", "create_time": "1999-01-01 11:11:11", "update_time": null }
With JsonConverter, Debezium instead emits:

    { "name": "kafka", "create_time": "1999-01-01 11:11:11", "update_time": "1999-01-01 11:11:11" }
When an optional field that has a default value is null, it can be treated either as null or as the default value. Which one is correct depends on the context.
Public Interfaces
Add a configuration to JsonConverter:
Name: replace.null.with.default
Description: Whether to replace fields that have a default value and that are null to the default value. When set to true, the default value is used, otherwise null is used.
Type: Boolean
Default: true
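To illustrate the default, the sketch below resolves the option from a raw converter configuration map. It is a simplified, dependency-free stand-in for what a config class such as JsonConverterConfig might do, not the actual implementation. The class name `ReplaceNullConfigSketch` and the method `replaceNullWithDefault` are hypothetical, introduced only for this example.

```java
import java.util.HashMap;
import java.util.Map;

public class ReplaceNullConfigSketch {

    // Option name as given in this KIP.
    static final String REPLACE_NULL_WITH_DEFAULT = "replace.null.with.default";

    /**
     * Returns the configured value, or true when the option is absent,
     * which matches the default stated above.
     * Assumption: values arrive either as Boolean or as a string such as "false".
     * Unlike a real config definition, this sketch does not reject invalid strings;
     * anything other than "true" (case-insensitive) is read as false.
     */
    static boolean replaceNullWithDefault(Map<String, ?> configs) {
        Object raw = configs.get(REPLACE_NULL_WITH_DEFAULT);
        if (raw == null) {
            return true;
        }
        if (raw instanceof Boolean) {
            return (Boolean) raw;
        }
        return Boolean.parseBoolean(raw.toString().trim());
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        System.out.println(replaceNullWithDefault(configs)); // prints "true"

        configs.put(REPLACE_NULL_WITH_DEFAULT, "false");
        System.out.println(replaceNullWithDefault(configs)); // prints "false"
    }
}
```

Leaving the option unset therefore behaves the same as setting it to true.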
Proposed Changes
Update JsonConverter and JsonConverterConfig to handle the new configuration. It will affect serialization when the converter runs in a source pipeline and deserialization in sink pipelines.
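The effect of the option on null handling during serialization can be sketched without any Kafka dependency. The following is a minimal model under stated assumptions, not the actual JsonConverter code: `FieldSchema` and `resolveNull` are hypothetical names, plain Java objects stand in for JSON nodes, and `IllegalStateException` stands in for Connect's DataException.

```java
public class ReplaceNullSketch {

    /** Hypothetical stand-in for the parts of a Connect Schema that matter here. */
    static final class FieldSchema {
        final boolean optional;
        final Object defaultValue;

        FieldSchema(boolean optional, Object defaultValue) {
            this.optional = optional;
            this.defaultValue = defaultValue;
        }
    }

    /**
     * Decides what a null field value is converted to.
     * With replaceNullWithDefault = true, a non-null default replaces null
     * (the existing behavior). With false, the default is ignored, so an
     * optional field stays null and a required field is rejected.
     */
    static Object resolveNull(FieldSchema schema, boolean replaceNullWithDefault) {
        if (schema == null) {
            return null; // schemaless data: nothing to substitute
        }
        if (replaceNullWithDefault && schema.defaultValue != null) {
            return schema.defaultValue;
        }
        if (schema.optional) {
            return null;
        }
        throw new IllegalStateException(
                "Conversion error: null value for field that is required and has no default value");
    }

    public static void main(String[] args) {
        // Mirrors the update_time column from the Motivation section.
        FieldSchema updateTime = new FieldSchema(true, "1999-01-01 11:11:11");

        System.out.println(resolveNull(updateTime, true));  // prints "1999-01-01 11:11:11"
        System.out.println(resolveNull(updateTime, false)); // prints "null"
    }
}
```

In this model only the second branch depends on the option, which is why keeping the default of true leaves existing pipelines unchanged.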
in org.apache.kafka.connect.storage.ConverterConfig, default false. Any third-party converter can extend ConverterConfig, just like JsonConverterConfig, to read this property and implement it.
Serialization (convertToJson): for an optional null field that has a default value,
...
if serialize.accept.optional.null=true is set, the value is serialized as null;
...
Compatibility, Deprecation, and Migration Plan
The new configuration keeps the current behavior by default, so it will not affect existing users. Users can opt in to the new behavior by setting the configuration to false.
Rejected Alternatives
Always return null for an optional field whose value is null.