Comment: Changes to reflect implementation needs.

...

A new org.apache.kafka.connect.header.Header interface will be added and used as the public API for a single header on a record. The interface defines simple getters for the key, the value, and the value's schema, as well as a getter for the raw serialized bytes of the value. Header objects are immutable; the with(...) and rename(...) methods return a new Header object with a different schema and value or with a different key.

Code Block
package org.apache.kafka.connect.header;

import org.apache.kafka.connect.data.Schema;

public interface Header {

    // Access the key and value
    String key(); // never null
    Schema schema(); // may be null
    Object value(); // may be null

    byte[] rawValue(); // may be null
 
    // Methods to create a copy
    Header with(Schema schema, Object value);
    Header rename(String key);
}
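
The immutability contract described above can be illustrated with a minimal sketch. The SketchHeader class below is hypothetical and not part of the proposal: it uses a plain String (schemaName) as a stand-in for Schema so that it compiles without Kafka on the classpath, and it omits rawValue(). It only shows that with(...) and rename(...) return new objects and leave the original untouched.

```java
import java.util.Objects;

// Hypothetical stand-in for an immutable Header implementation.
// Assumption: schemaName (a String) replaces the real Schema type.
final class SketchHeader {
    private final String key;        // never null
    private final String schemaName; // may be null
    private final Object value;      // may be null

    SketchHeader(String key, String schemaName, Object value) {
        this.key = Objects.requireNonNull(key, "key must not be null");
        this.schemaName = schemaName;
        this.value = value;
    }

    String key() { return key; }
    String schemaName() { return schemaName; }
    Object value() { return value; }

    // Returns a copy with the same key but a different schema and value.
    SketchHeader with(String schemaName, Object value) {
        return new SketchHeader(this.key, schemaName, value);
    }

    // Returns a copy with a different key but the same schema and value.
    SketchHeader rename(String key) {
        return new SketchHeader(key, this.schemaName, this.value);
    }
}
```

Because every field is final and both copy methods allocate a new instance, a header can be shared between transforms without defensive copying.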

...

Each Header has a value that can be used by sink connectors and single message transforms (SMTs). However, the type of a header's value depends on how the header was created in the first place and how it was serialized and deserialized. A new set of conversion utility methods will be added to make it easy for SMTs and sink connectors to convert header values into a type that they can easily use. These conversions may require both the original schema and value. The conversions to and from strings use the same mechanism described by the SimpleHeaderConverter above.

For example, an SMT or sink connector might expect a header value to be a long, and can use these utility methods to convert any numeric value (e.g., an int, short, or BigDecimal) or a string representation of a number. Or, a sink connector might expect a Timestamp logical data type, so it can use the Values.convertToTimestamp(s,v) method to convert from an ISO-8601 formatted string representation of a timestamp or date, or from the number of milliseconds past the epoch represented as a long or string.
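
The two conversions in this example can be sketched roughly as follows. This is an illustration of the intended behavior, not the proposed implementation: ConversionSketch, toLong, and toTimestamp are hypothetical names, the schema argument is omitted, IllegalArgumentException stands in for DataException, and only ISO-8601 instants accepted by java.time.Instant.parse (such as 1970-01-01T00:00:01Z) are handled.

```java
import java.math.BigDecimal;
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.Date;

// Hypothetical sketch of lenient conversions; not the proposed Values class.
final class ConversionSketch {
    private ConversionSketch() {}

    // Converts any Number or numeric String to a Long; null stays null.
    static Long toLong(Object value) {
        if (value == null) return null;
        if (value instanceof Long) return (Long) value; // already the desired type
        if (value instanceof Number) return ((Number) value).longValue();
        if (value instanceof String) {
            try {
                // BigDecimal accepts integer, decimal, and exponent forms;
                // longValue() discards any fractional part.
                return new BigDecimal(((String) value).trim()).longValue();
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Cannot convert to long: " + value, e);
            }
        }
        throw new IllegalArgumentException("Unsupported type: " + value.getClass());
    }

    // Converts a Date, epoch milliseconds (Number or String), or an
    // ISO-8601 instant String to a java.util.Date; null stays null.
    static Date toTimestamp(Object value) {
        if (value == null) return null;
        if (value instanceof Date) return (Date) value; // already the desired type
        if (value instanceof Number) return new Date(((Number) value).longValue());
        if (value instanceof String) {
            String text = ((String) value).trim();
            try {
                return new Date(Long.parseLong(text)); // epoch milliseconds as text
            } catch (NumberFormatException notMillis) {
                // Not a plain number; try ISO-8601 next.
            }
            try {
                return Date.from(Instant.parse(text));
            } catch (DateTimeParseException e) {
                throw new IllegalArgumentException("Cannot convert to timestamp: " + value, e);
            }
        }
        throw new IllegalArgumentException("Unsupported type: " + value.getClass());
    }
}
```

Trying the cheapest interpretation first (same type, then number, then string) means a value that is already in the desired type is returned as-is.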

...

Code Block
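package org.apache.kafka.connect.data;

import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.errors.DataException;

public class Values {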

    // All methods return null when value is null, and throw a DataException
    // if the value cannot be converted to the desired type.
    // If the value is already the desired type, these methods simply return it.
    public static Boolean convertToBoolean(Schema schema, Object value) throws DataException {...}
    public static Byte convertToByte(Schema schema, Object value) throws DataException {...}
    public static Short convertToShort(Schema schema, Object value) throws DataException {...}
    public static Integer convertToInteger(Schema schema, Object value) throws DataException {...}
    public static Long convertToLong(Schema schema, Object value) throws DataException {...}
    public static Float convertToFloat(Schema schema, Object value) throws DataException {...}
    public static Double convertToDouble(Schema schema, Object value) throws DataException {...}
    public static String convertToString(Schema schema, Object value) {...}
    public static java.util.Date convertToTime(Schema schema, Object value) throws DataException {...}
    public static java.util.Date convertToDate(Schema schema, Object value) throws DataException {...}
    public static java.util.Date convertToTimestamp(Schema schema, Object value) throws DataException {...}
    public static BigDecimal convertToDecimal(Schema schema, Object value, int scale) throws DataException {...}
 
    // These only support converting from a compatible string form, which is the same
    // format used in the SimpleHeaderConverter described above
    public static List<?> convertToList(Object value) {...}
    public static Map<?, ?> convertToMap(Object value) {...}
 
    // Only supports returning the value if it already is a Struct.
    public static Struct convertToStruct(Object value) {...}
}

...