Status

Current state: Accepted

Discussion thread: KIP-481 Discussion

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Most JSON data that carries precise decimal values represents them as a decimal number. Connect, on the other hand, only supports a binary BASE64 string encoding. This KIP intends to support both representations so that Connect can better integrate with legacy systems and so that the internal topic data is easier to read and debug.
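
To make the difference between the two representations concrete, here is a minimal sketch using only the Java standard library. It assumes the BASE64 form is the base64 encoding of the decimal's unscaled integer bytes, with the scale carried separately in the schema; the class and method names are hypothetical and are not part of Connect.

```java
import java.math.BigDecimal;
import java.util.Base64;

// Hypothetical illustration class, not Connect code.
class DecimalEncodingSketch {

    // BASE64 form (assumption): base64 of the unscaled integer's
    // big-endian two's-complement bytes. The scale is not in the payload.
    static String toBase64(BigDecimal value) {
        byte[] unscaled = value.unscaledValue().toByteArray();
        return Base64.getEncoder().encodeToString(unscaled);
    }

    // NUMERIC form: the plain decimal digits, written as a JSON number.
    static String toNumeric(BigDecimal value) {
        return value.toPlainString();
    }

    public static void main(String[] args) {
        BigDecimal price = new BigDecimal("10.2345"); // unscaled 102345, scale 4
        System.out.println("{\"price\": \"" + toBase64(price) + "\"}"); // {"price": "AY/J"}
        System.out.println("{\"price\": " + toNumeric(price) + "}");    // {"price": 10.2345}
    }
}
```

The BASE64 string cannot be interpreted without knowing the scale, whereas the numeric form is readable as-is.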

Public Interfaces

Introduce a new configuration on the JsonConverter named decimal.format to control whether source converters serialize decimals in numeric or binary format. The value is case-insensitive and can be either "BASE64" (the default, to maintain compatibility) or "NUMERIC".
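
As a rough sketch of the case-insensitive lookup with a BASE64 default, the configuration value could be resolved as follows. The enum and class names are hypothetical stand-ins, and the plain `Map` replaces Connect's real configuration classes for the sake of a self-contained example.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical illustration class, not Connect code.
class DecimalFormatConfigSketch {

    // Hypothetical enum holding the two allowed values.
    enum DecimalFormatOption { BASE64, NUMERIC }

    // Reads decimal.format, defaulting to BASE64 when the key is absent.
    // Enum.valueOf throws IllegalArgumentException for any other value.
    static DecimalFormatOption parse(Map<String, String> config) {
        String raw = config.getOrDefault("decimal.format", "BASE64");
        return DecimalFormatOption.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(parse(Map.of()));                            // BASE64
        System.out.println(parse(Map.of("decimal.format", "numeric"))); // NUMERIC
    }
}
```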

Proposed Changes

We propose the following changes:

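  1. Define a new decimal.format configuration property on JsonConverter to specify the serialization format for Connect DECIMAL logical type values. The property has two allowed values: BASE64 (the default, which keeps the existing binary encoding) and NUMERIC (which serializes the value as a JSON number).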
  2. The JsonConverter deserialization method currently expects only a BinaryNode, but will be changed to also handle NumericNode by calling NumericNode.decimalValue().
  3. JsonDeserializer will now default floating point deserialization to BigDecimal to avoid losing precision. This may impact performance when deserializing doubles: a JMH microbenchmark on my local MBP estimated about a 3x degradation for deserializing JSON floating points. If the Connect schema is not the DECIMAL logical type, the JsonConverter will convert this BigDecimal value into the corresponding floating point Java object.
  4. Configure the JsonConverter for internal topics with `decimal.format=NUMERIC` so that any DECIMAL types will be serialized in a more natural representation. This is safe since Connect internal topics do not currently use any decimal types.
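
The deserialization side of changes 2 and 3 can be sketched with the standard library alone. This is an illustration under stated assumptions rather than the JsonConverter implementation: the two payload kinds are modeled as plain strings instead of Jackson nodes, the BASE64 payload is assumed to hold the unscaled integer bytes, and all names are hypothetical.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Base64;

// Hypothetical illustration class, not Connect code.
class DecimalDecodingSketch {

    // Binary payload: decode the unscaled bytes, then apply the schema's scale.
    static BigDecimal fromBase64(String encoded, int scale) {
        byte[] unscaled = Base64.getDecoder().decode(encoded);
        return new BigDecimal(new BigInteger(unscaled), scale);
    }

    // Numeric payload: parse the JSON number text directly, keeping every digit.
    static BigDecimal fromNumeric(String jsonNumber) {
        return new BigDecimal(jsonNumber);
    }

    // When the schema is a floating point type rather than DECIMAL,
    // the parsed BigDecimal is narrowed to a double.
    static double toFloat64(BigDecimal parsed) {
        return parsed.doubleValue();
    }

    public static void main(String[] args) {
        System.out.println(fromBase64("AY/J", 4));               // 10.2345
        System.out.println(fromNumeric("10.2345"));              // 10.2345
        System.out.println(toFloat64(new BigDecimal("1.5")));    // 1.5
    }
}
```

Both paths yield the same BigDecimal for a DECIMAL schema; only the binary path needs the scale from the schema.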

To understand behavior of this configuration with and without schemas, refer to the table below.

| Source Schema (Info) | JsonConverter Behavior (Schemas Enabled) | JsonConverter Behavior (Schemas Disabled) | JsonDeserializer Behavior |
|---|---|---|---|
| DECIMAL (BASE64) | returns DECIMAL logical type | throws DataException | stores BinaryNode (byte[] data) |
| DECIMAL (NUMERIC) | returns DECIMAL logical type | returns FLOAT64 (lossy)* | stores NumericNode (BigDecimal data) |
| FLOAT64 (Legacy) | returns FLOAT64 | returns FLOAT64 | stores NumericNode (Double data) |
| FLOAT64 (KIP-481) | returns FLOAT64 | returns FLOAT64 | stores NumericNode (BigDecimal data)** |

* Previously, it was impossible for a sink converter to read decimal data encoded in BASE64 without a schema (a DataException was raised when decoding BINARY data without a schema). After KIP-481, decimal data produced by a source converter with NUMERIC encoding may be deserialized as a float by a sink converter that has schemas disabled, and the value read in that case is lossy. This is acceptable because all existing applications will continue to function exactly as before.

** With this behavior, users will be provided with a different NumericNode subtype than previously (DecimalNode instead of DoubleNode). Since Jackson requires calling a strongly typed method (e.g. floatValue vs. decimalValue) to extract data from a NumericNode, applications relying on JsonDeserializer will not be affected by this change.
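
To illustrate why the schemaless FLOAT64 path is marked lossy, the following sketch checks whether a decimal survives a round trip through a double. It is a standard-library illustration only; the class and method names are hypothetical, and "survives" here means the double's default string form still equals the original decimal value.

```java
import java.math.BigDecimal;

// Hypothetical illustration class, not Connect code.
class LossyFloat64Sketch {

    // Narrows the decimal to a double, converts back through the double's
    // canonical string form, and compares numerically with the original.
    static boolean survivesFloat64(BigDecimal value) {
        double asDouble = value.doubleValue();
        return BigDecimal.valueOf(asDouble).compareTo(value) == 0;
    }

    public static void main(String[] args) {
        // Small values with few digits typically round-trip.
        System.out.println(survivesFloat64(new BigDecimal("0.5")));
        // 19 significant digits exceed what a double can hold.
        System.out.println(survivesFloat64(new BigDecimal("123456789012345678.9")));
    }
}
```

Values with more significant digits than a double can represent come back changed, which is the case the first footnote describes.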

Compatibility, Deprecation, and Migration Plan

Migration Plan. Assuming that the baseline source converter is on Kafka Connect version "A" <= 2.3, that all sink converters are on Kafka Connect version "B" <= 2.3, and that this change makes it into Kafka Connect version "C" >= 2.4, the upgrade path to use this feature is:

  1. Upgrade all sink converters to version "C" or higher and restart the sink converters. No configuration change is needed for the sink converters.
  2. Upgrade the source converters to version "C" or higher.
  3. If the Connect worker uses the JsonConverter for the key and/or value converters, optionally set `decimal.format=NUMERIC` for the key and/or value converter and restart the workers.
  4. If desired, update any source connector configs that use the JsonConverter for key and/or value converters to use `decimal.format=NUMERIC`.
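
For steps 3 and 4, the converter setting is expressed through the usual `key.converter.` and `value.converter.` prefixes. The sketch below loads an example set of worker properties with `java.util.Properties` to show the resulting keys; the property values are illustrative and the class name is hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical illustration class, not Connect code.
class MigrationConfigSketch {

    // Example worker properties after the upgrade (illustrative values).
    static final String WORKER_PROPS = String.join("\n",
        "key.converter=org.apache.kafka.connect.json.JsonConverter",
        "value.converter=org.apache.kafka.connect.json.JsonConverter",
        "key.converter.decimal.format=NUMERIC",
        "value.converter.decimal.format=NUMERIC");

    // Parses properties text into a Properties object.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = load(WORKER_PROPS);
        System.out.println(props.getProperty("value.converter.decimal.format")); // NUMERIC
    }
}
```

Leaving the two `decimal.format` lines out keeps the BASE64 default, which is why the sink converters in step 1 need no configuration change.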

Rejected Alternatives