Status

Current state: "Under Discussion"

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The SerDe stack is an important component of any data pipeline. The current DeserializationSchema and SerializationSchema lack certain features that users have requested or that are needed to implement new features.

...

Emitting deserialized values through a Collector

Problem 1: Handling of deserialization errors. Returning no results

Bad data that cannot be deserialized properly is a reality in real-world use cases. Simply failing the job is not an option, as it usually leads to a failover loop: after recovery the bad data is received again. This was reported already in https://issues.apache.org/jira/browse/FLINK-3679. The solution implemented there treats a null value returned from DeserializationSchema as a deserialization error and ignores the record. The downside is that there is no straightforward way to encode a legitimate null value. Some systems encode control messages as null values; e.g. Kafka treats a null value as a deletion of the value for the corresponding key.

Introducing a Collector interface makes it possible to differentiate between no result (no call to Collector#collect) and a null result (Collector#collect(null)).
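
For illustration, a minimal sketch of how a schema could use the proposed Collector-based method (shown later under "Proposed interfaces"). The class name and the record format (a decimal long encoded as UTF-8 text) are made up for the example:

Code Block
languagejava
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.util.Collector;

// Hypothetical format: each record carries a decimal long as UTF-8 text.
public class TolerantLongDeserializationSchema implements DeserializationSchema<Long> {

    @Override
    public void deserialize(byte[] message, Collector<Long> out) throws IOException {
        if (message == null) {
            // An explicit null value (e.g. a Kafka tombstone) can be forwarded as such.
            out.collect(null);
            return;
        }
        try {
            out.collect(Long.valueOf(new String(message, StandardCharsets.UTF_8).trim()));
        } catch (NumberFormatException e) {
            // Bad data: emit nothing instead of failing the job.
        }
    }

    @Override
    public Long deserialize(byte[] message) throws IOException {
        // Old single-value contract: returning null is the only way to drop a record,
        // which is exactly the ambiguity the Collector variant removes.
        try {
            return message == null ? null : Long.valueOf(new String(message, StandardCharsets.UTF_8).trim());
        } catch (NumberFormatException e) {
            return null;
        }
    }

    @Override
    public boolean isEndOfStream(Long nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Long> getProducedType() {
        return Types.LONG;
    }
}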

Problem 2: Compacted messages. Single source record might produce multiple records

Currently, the suggested workaround is to emit a single container result and flatten the results in a subsequent operator. The downside of this approach is that the subsequent operator cannot emit watermarks in a partition-aware manner. This was reported in https://github.com/apache/flink/pull/3314#issuecomment-376237266

...

The current workaround for that problem is to emit a single "container" object and split it in a subsequent operation. The problem is that it is then not possible to generate per-partition watermarks based on the nested records.
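
As a sketch, assuming a made-up "batch" format in which one source record carries several newline-delimited payloads, the Collector variant lets the schema emit each nested record directly (class and format are hypothetical):

Code Block
languagejava
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.util.Collector;

// Hypothetical "batch" format: one source record contains several
// newline-delimited payloads that are emitted individually.
public class NewlineDelimitedDeserializationSchema implements DeserializationSchema<String> {

    @Override
    public void deserialize(byte[] message, Collector<String> out) throws IOException {
        String batch = new String(message, StandardCharsets.UTF_8);
        for (String line : batch.split("\n")) {
            if (!line.isEmpty()) {
                out.collect(line);   // several records from a single source record
            }
        }
    }

    @Override
    public String deserialize(byte[] message) throws IOException {
        // Expected not to be called once the Collector variant is overridden.
        throw new UnsupportedOperationException("Use deserialize(byte[], Collector) instead.");
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}

With this approach the nested records are emitted by the source itself, so per-partition watermark generation can take them into account.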

Implications

In the current KafkaFetcher implementation the deserialization happens outside of the checkpoint lock, and only the emitting and the state update happen within the lock. This means that updating an offset in Kafka is tied to emitting exactly one deserialized record. If we want to support emitting multiple records from a single source record, we have a few options (the first two were mentioned here):

...

I suggest going with option 1, as it has the fewest implications for current pipelines. At the same time, the main use case for a collector is emitting either zero records or a small number of records whose combined size is no larger than that of the original record (e.g. the Debezium format).

Add open/close methods

Multiple DeserializationSchema and SerializationSchema implementations need to initialize internal state that is often not serializable, establish external connections, and/or perform work that must happen exactly once at the very beginning. Some examples include:

...

The workaround right now is lazy initialization on the deserialization/serialization path. Moreover, it is a common requirement to be able to register metrics in the SerDe stack, as mentioned e.g. here: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Metrics-outside-RichFunctions-td32282.html#a32329

Proposed interfaces:

DeserializationSchema:


Code Block
languagejava
@Public
public interface DeserializationSchema<ResultT> extends Serializable, ResultTypeQueryable<ResultT> {

    @PublicEvolving
    default void open(InitializationContext context) throws Exception {}

    ResultT deserialize(byte[] message) throws IOException;

    @PublicEvolving
    default void deserialize(
            byte[] message,
            Collector<ResultT> out) throws IOException {
        out.collect(deserialize(message));
    }

    @PublicEvolving
    default void close() throws Exception {}

    @PublicEvolving
    interface InitializationContext {
        MetricGroup getMetricGroup();
    }
}
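
For illustration, a sketch of an implementation against this interface; the Jackson-based format, the metric name, and the class name are arbitrary choices for the example. Non-serializable helpers move from the lazy-initialization workaround into open(), and a counter is registered through the InitializationContext:

Code Block
languagejava
import java.io.IOException;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.Collector;

public class MeteredJsonDeserializationSchema implements DeserializationSchema<JsonNode> {

    // Created in open() on the task manager instead of lazily on the hot path.
    private transient ObjectMapper mapper;
    private transient Counter skippedRecords;

    @Override
    public void open(InitializationContext context) throws Exception {
        mapper = new ObjectMapper();
        skippedRecords = context.getMetricGroup().counter("skippedRecords");
    }

    @Override
    public void deserialize(byte[] message, Collector<JsonNode> out) throws IOException {
        try {
            out.collect(mapper.readTree(message));
        } catch (IOException e) {
            skippedRecords.inc();   // track dropped records instead of failing
        }
    }

    @Override
    public JsonNode deserialize(byte[] message) throws IOException {
        return mapper.readTree(message);
    }

    @Override
    public boolean isEndOfStream(JsonNode nextElement) {
        return false;
    }

    @Override
    public TypeInformation<JsonNode> getProducedType() {
        return TypeInformation.of(JsonNode.class);
    }
}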


SerializationSchema:


Code Block
languagejava
@Public
public interface SerializationSchema<InputT> extends Serializable {

    @PublicEvolving
    default void open(InitializationContext context) throws Exception {}

    byte[] serialize(InputT element) throws IOException;

    @PublicEvolving
    default void close() throws Exception {}

    @PublicEvolving
    interface InitializationContext {
        MetricGroup getMetricGroup();
    }
}
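
A corresponding sketch on the serialization side, again with an arbitrary Jackson-based format: the non-serializable ObjectMapper is created in open() rather than lazily on the serialization path:

Code Block
languagejava
import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.flink.api.common.serialization.SerializationSchema;

// Sketch only: serializes any POJO to JSON bytes via Jackson.
public class JsonSerializationSchema<T> implements SerializationSchema<T> {

    private transient ObjectMapper mapper;

    @Override
    public void open(InitializationContext context) throws Exception {
        // ObjectMapper is not Serializable, so it is created here on the task manager.
        mapper = new ObjectMapper();
    }

    @Override
    public byte[] serialize(T element) {
        try {
            return mapper.writeValueAsBytes(element);
        } catch (IOException e) {
            throw new RuntimeException("Could not serialize record: " + element, e);
        }
    }
}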

KafkaDeserializationSchema:

Code Block
languagejava
/**
 * The deserialization schema describes how to turn the Kafka ConsumerRecords
 * into data types (Java/Scala objects) that are processed by Flink.
 *
 * @param <T> The type created by the deserialization schema.
 */
@PublicEvolving
public interface KafkaDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

    default void open(DeserializationSchema.InitializationContext context) throws Exception {}

    /**
     * Method to decide whether the element signals the end of the stream. If
     * true is returned the element won't be emitted.
     *
     * @param nextElement The element to test for the end-of-stream signal.
     * @return True, if the element signals end of stream, false otherwise.
     */
    boolean isEndOfStream(T nextElement);

    /**
     * Deserializes the Kafka record.
     *
     * @param record Kafka record to be deserialized.
     * @return The deserialized message as an object (null if the message cannot be deserialized).
     */
    T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;

    default void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> out) throws Exception {
        out.collect(deserialize(record));
    }

    default void close() throws Exception {}
}
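
As a usage sketch for the Kafka variant (class name and format are made up): a schema that forwards values as UTF-8 strings and uses the Collector method to drop tombstones, i.e. the null values Kafka uses to mark the deletion of a key:

Code Block
languagejava
import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.util.Collector;

// Sketch: forwards record values as UTF-8 strings and silently drops tombstones.
public class SkipTombstonesDeserializationSchema implements KafkaDeserializationSchema<String> {

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> out) {
        if (record.value() != null) {
            out.collect(new String(record.value(), StandardCharsets.UTF_8));
        }
        // Tombstone: emit nothing for this record.
    }

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) {
        // Kept for compatibility; the Collector variant above is expected to be used.
        return record.value() == null ? null : new String(record.value(), StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}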

KafkaSerializationSchema:

Code Block
languagejava
/**
 * A {@link KafkaSerializationSchema} defines how to serialize values of type {@code T} into {@link
 * ProducerRecord ProducerRecords}.
 *
 * <p>Please also implement {@link KafkaContextAware} if your serialization schema needs
 * information about the available partitions and the number of parallel subtasks along with
 * the subtask ID on which the Kafka Producer is running.
 *
 * @param <T> the type of values being serialized
 */
@PublicEvolving
public interface KafkaSerializationSchema<T> extends Serializable {

    default void open(SerializationSchema.InitializationContext context) throws Exception {}

    /**
     * Serializes given element and returns it as a {@link ProducerRecord}.
     *
     * @param element element to be serialized
     * @param timestamp timestamp (can be null)
     * @return Kafka {@link ProducerRecord}
     */
    ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp);

    default void close() throws Exception {}
}

...