
Status

Current state: "Under Discussion"

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The SerDe stack is an important component of any data pipeline. The current DeserializationSchema and SerializationSchema are lacking certain features that were requested by users or are needed to implement new features.

The core issues that we want to address are:

  • A way to initialize the schema
    • establish external connections
    • generate code on startup
    • no need for lazy initialization
  • Emitting deserialized values through a Collector

Emitting deserialized values through a Collector

Problem 1: Handling of deserialization errors. Returning no results

Bad data that cannot be deserialized properly is a reality in real-world use cases. Simply failing the job is not an option, as it usually leads to a failover loop: after recovery the bad data is received again. This was reported already in https://issues.apache.org/jira/browse/FLINK-3679. The solution implemented there was to treat a null value returned from the DeserializationSchema as a deserialization error and ignore the record. This has the downside that there is no straightforward way to encode a legitimate null value. Some systems encode control messages as null values, e.g. Kafka treats a null value as a deletion of the value for the corresponding key.

Introducing a Collector interface will enable differentiating between no result (no call to Collector#collect) and a null result (Collector#collect(null)).
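As a rough illustration only (the class name and the JSON handling are made up for this example and are not part of the proposal), a schema written against the Collector-based method proposed below could skip corrupt records without emitting anything, while still forwarding an explicit null, e.g. for a deletion marker:

import java.io.IOException;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Sketch only: distinguishes "no result" (no call to collect) from a null result.
public class TolerantJsonSchema implements DeserializationSchema<JsonNode> {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public JsonNode deserialize(byte[] message) throws IOException {
        return message == null ? null : mapper.readTree(message);
    }

    @Override
    public void deserialize(byte[] message, Collector<JsonNode> collector) throws IOException {
        if (message == null) {
            collector.collect(null);   // an explicit null, e.g. a Kafka deletion marker
            return;
        }
        try {
            collector.collect(deserialize(message));
        } catch (IOException e) {
            // corrupt record: emit nothing (no call to collect) instead of failing the job
        }
    }

    @Override
    public TypeInformation<JsonNode> getProducedType() {
        return TypeInformation.of(JsonNode.class);
    }
}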

Problem 2: Compacted messages. Single source record might produce multiple records

Currently, the suggested workaround is to emit a single container result and flatten the results in a subsequent operator. The downside of this approach is that the subsequent operator cannot emit watermarks in a partition-aware manner. This was reported in https://github.com/apache/flink/pull/3314#issuecomment-376237266

Another example of a potential use case is the Debezium CDC format (https://cwiki.apache.org/confluence/display/FLINK/FLIP-105%3A+Support+to+Interpret+and+Emit+Changelog+in+Flink+SQL), where we want to split a message like:

{
 "before": {
   "id": 1004,
   "first_name": "Anne",
   "last_name": "Kretchmar",
   "email": "annek@noanswer.org"
 },
 "after": {
   "id": 1004,
   "first_name": "Anne Marie",
   "last_name": "Kretchmar",
   "email": "annek@noanswer.org"
 },
 "source": { ... },
 "op": "u",
 "ts_ms": 1465581029523
}

into two messages, one for UPDATE_BEFORE and one for UPDATE_AFTER.

The current workaround for that problem is to emit a single "container" object and split it in a subsequent operation. The problem is that it is then not possible to generate per-partition watermarks based on the nested records.
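For illustration, a sketch of how the Collector variant proposed below could be used for such a format. The class name and field layout are made up for this example, and Row/RowKind (the changelog types from FLIP-95) are used only as a convenient target type; the actual format implementation is out of scope of this FLIP:

import java.io.IOException;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Sketch only: splits a Debezium update message into an UPDATE_BEFORE and an UPDATE_AFTER record.
public class DebeziumLikeSchema implements DeserializationSchema<Row> {

    private transient ObjectMapper mapper;

    @Override
    public void open(InitializationContext context) {
        mapper = new ObjectMapper();   // not serializable, so created in open()
    }

    @Override
    public Row deserialize(byte[] message) {
        throw new UnsupportedOperationException("Use deserialize(byte[], Collector) instead");
    }

    @Override
    public void deserialize(byte[] message, Collector<Row> out) throws IOException {
        JsonNode node = mapper.readTree(message);
        if ("u".equals(node.get("op").asText())) {
            out.collect(toRow(node.get("before"), RowKind.UPDATE_BEFORE));
            out.collect(toRow(node.get("after"), RowKind.UPDATE_AFTER));
        }
        // "c" (create) and "d" (delete) operations would be handled analogously
    }

    private static Row toRow(JsonNode payload, RowKind kind) {
        Row row = Row.of(
                payload.get("id").asLong(),
                payload.get("first_name").asText(),
                payload.get("last_name").asText(),
                payload.get("email").asText());
        row.setKind(kind);
        return row;
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        return Types.ROW(Types.LONG, Types.STRING, Types.STRING, Types.STRING);
    }
}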

Implications

In the current KafkaFetcher implementation the deserialization happens outside of the checkpoint lock and only the emitting + state update happens within the lock. This means that updating an offset in Kafka is bound to emitting a single deserialized record. If we want to support emitting multiple records from a single source record, we have a few options (first two mentioned here):

  1. Buffer all the records outside of the checkpoint lock and emit them all at once inside the checkpoint lock. The downside is that we would keep all the records in memory, and users might hit OOM exceptions.
  2. Move the deserialization into the checkpoint lock. The downside is that this could delay a checkpoint by a significant amount of time in case of expensive deserialization (e.g. decrypting messages). The underlying problem is that the thread that emits records (the legacy source thread) is different from the thread that triggers the checkpoint barrier (the task thread / mailbox thread). Therefore we would need to hold the checkpoint lock for the duration of the deserialization, so that no checkpoint barrier can be emitted between multiple records produced from a single call to the deserialization schema.
  3. Implement bookkeeping of the number of emitted records and include it in the checkpoint of the Kafka source. This would enable skipping already-emitted records upon restore. The downside of this approach is that it requires the DeserializationSchema to be deterministic, i.e. it must produce the same results in the same order.

I suggest going with option 1, as it has the fewest implications for current pipelines. At the same time, the main use case for the collector is emitting either zero records or a small number of records that combined are not much larger than the original source record (e.g. the Debezium format).
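A simplified, pseudo-code sketch of option 1 follows. It is not the actual KafkaFetcher code; the method shape, the partitionOffsets map and the passed-in sourceContext/deserializer merely stand in for the fetcher's real internals:

// Pseudo-code sketch of option 1 (fragment of a hypothetical fetcher class with type parameter T).
private final Object checkpointLock = new Object();
private final List<T> buffer = new ArrayList<>();
// stands in for the fetcher's real per-partition state
private final Map<TopicPartition, Long> partitionOffsets = new HashMap<>();

private final Collector<T> bufferingCollector = new Collector<T>() {
    @Override
    public void collect(T record) {
        buffer.add(record);
    }

    @Override
    public void close() {
    }
};

void emitRecord(ConsumerRecord<byte[], byte[]> record, SourceContext<T> sourceContext,
        KafkaDeserializationSchema<T> deserializer) throws Exception {
    // possibly expensive deserialization happens outside of the checkpoint lock
    deserializer.deserialize(record, bufferingCollector);

    synchronized (checkpointLock) {
        // emitting the produced records and advancing the offset happen atomically,
        // so no checkpoint barrier can be injected between records of one source record
        for (T element : buffer) {
            sourceContext.collect(element);
        }
        partitionOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset());
    }
    buffer.clear();
}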

Add open/close methods

Many DeserializationSchema and SerializationSchema implementations require initializing internal state that is often not serializable, establishing external connections, and/or performing work that has to happen exactly once at the very beginning. Some examples include:

  • establishing connection to a schema registry
  • instantiating an Avro’s DatumReader/Decoder etc.
  • generating code of the (de)serializer in Table API

The workaround right now is lazy initialization on the deserialization/serialization path. Moreover, it is a common requirement to be able to register metrics in the SerDe stack, as mentioned e.g. here: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Metrics-outside-RichFunctions-td32282.html#a32329
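For illustration, the lazy-initialization workaround typically looks roughly like this (Avro is used only as an example; the class name is made up and the remaining interface methods are omitted):

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;

// Sketch of the current workaround: state is initialized lazily on the hot path.
public class LazyAvroSchema implements DeserializationSchema<GenericRecord> {

    private final String schemaString;
    // not serializable, so it cannot simply be created on the client and shipped
    private transient GenericDatumReader<GenericRecord> datumReader;

    public LazyAvroSchema(String schemaString) {
        this.schemaString = schemaString;
    }

    @Override
    public GenericRecord deserialize(byte[] message) throws IOException {
        if (datumReader == null) {
            // lazy initialization: this check runs for every single record
            datumReader = new GenericDatumReader<>(new Schema.Parser().parse(schemaString));
        }
        Decoder decoder = DecoderFactory.get().binaryDecoder(message, null);
        return datumReader.read(null, decoder);
    }

    // isEndOfStream(...) and getProducedType() omitted for brevity
}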

Proposed interfaces:

DeserializationSchema:


@Public
public interface DeserializationSchema<ResultT> extends Serializable, ResultTypeQueryable<ResultT> {

    @PublicEvolving
    default void open(InitializationContext context) throws Exception {}

    ResultT deserialize(byte[] message) throws IOException;

    @PublicEvolving
    default void deserialize(
            byte[] event,
            Collector<ResultT> collector) throws IOException {
        collector.collect(deserialize(event));
    }

    @PublicEvolving
    default void close() throws Exception {}

    @PublicEvolving
    interface InitializationContext {
        MetricGroup getMetricGroup();
    }
}
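With the proposed interface, the lazy-initialization example from above can move its setup into open() and register custom metrics through the InitializationContext. Again only a sketch with made-up names (the counter name and error handling are illustrative, not part of the proposal):

import java.io.IOException;

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.Collector;

// Sketch against the proposed interface: one-time setup in open(), metrics via the context.
public class EagerAvroSchema implements DeserializationSchema<GenericRecord> {

    private final String schemaString;
    private transient GenericDatumReader<GenericRecord> datumReader;
    private transient Counter corruptRecords;

    public EagerAvroSchema(String schemaString) {
        this.schemaString = schemaString;
    }

    @Override
    public void open(InitializationContext context) {
        // runs exactly once per task, before the first record is deserialized
        datumReader = new GenericDatumReader<>(new Schema.Parser().parse(schemaString));
        corruptRecords = context.getMetricGroup().counter("corruptRecords");
    }

    @Override
    public GenericRecord deserialize(byte[] message) throws IOException {
        Decoder decoder = DecoderFactory.get().binaryDecoder(message, null);
        return datumReader.read(null, decoder);
    }

    @Override
    public void deserialize(byte[] message, Collector<GenericRecord> collector) throws IOException {
        try {
            collector.collect(deserialize(message));
        } catch (IOException | AvroRuntimeException e) {
            corruptRecords.inc();   // count and skip the bad record instead of failing
        }
    }

    // getProducedType() omitted for brevity
}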


SerializationSchema:


@Public
public interface SerializationSchema<InputT> extends Serializable {

    @PublicEvolving
    default void open(InitializationContext context) throws Exception {}

    byte[] serialize(InputT element) throws IOException;

    @PublicEvolving
    default void close() throws Exception {}

    @PublicEvolving
    interface InitializationContext {
        MetricGroup getMetricGroup();
    }
}
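A minimal sketch of how a SerializationSchema could use open() to register a custom metric, addressing the metrics requirement mentioned in the motivation (class and metric names are made up):

import java.nio.charset.StandardCharsets;

import org.apache.flink.metrics.Counter;

// Sketch: registers a metric once in open() instead of lazily on the serialization path.
public class CountingStringSchema implements SerializationSchema<String> {

    private transient Counter serializedRecords;

    @Override
    public void open(InitializationContext context) {
        serializedRecords = context.getMetricGroup().counter("serializedRecords");
    }

    @Override
    public byte[] serialize(String element) {
        serializedRecords.inc();
        return element.getBytes(StandardCharsets.UTF_8);
    }
}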

KafkaDeserializationSchema:

/**
 * The deserialization schema describes how to turn the Kafka ConsumerRecords
 * into data types (Java/Scala objects) that are processed by Flink.
 *
 * @param <T> The type created by the keyed deserialization schema.
 */
@PublicEvolving
public interface KafkaDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

	default void open(DeserializationSchema.InitializationContext context) throws Exception {}

	/**
	 * Method to decide whether the element signals the end of the stream. If
	 * true is returned the element won't be emitted.
	 *
	 * @param nextElement The element to test for the end-of-stream signal.
	 * @return True, if the element signals end of stream, false otherwise.
	 */
	boolean isEndOfStream(T nextElement);

	/**
	 * Deserializes the Kafka record.
	 *
	 * @param record Kafka record to be deserialized.
	 * @return The deserialized message as an object (null if the message cannot be deserialized).
	 */
	T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;

	default void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out) throws Exception {
		out.collect(deserialize(message));
	}

	default void close() throws Exception {}
}
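For completeness, a sketch of the Kafka variant (class name is made up): with the Collector it becomes possible to simply drop Kafka tombstones (null values) without abusing a null return value as an error/ignore marker:

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.util.Collector;

import org.apache.kafka.clients.consumer.ConsumerRecord;

// Sketch only: drops tombstone records without emitting anything.
public class TombstoneAwareSchema implements KafkaDeserializationSchema<String> {

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) {
        throw new UnsupportedOperationException("Use deserialize(record, out) instead");
    }

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> out) {
        if (record.value() == null) {
            return;   // tombstone: emit nothing
        }
        out.collect(new String(record.value(), StandardCharsets.UTF_8));
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}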

KafkaSerializationSchema:

/**
 * A {@link KafkaSerializationSchema} defines how to serialize values of type {@code T} into {@link
 * ProducerRecord ProducerRecords}.
 *
 * <p>Please also implement {@link KafkaContextAware} if your serialization schema needs
 * information
 * about the available partitions and the number of parallel subtasks along with the subtask ID on
 * which the Kafka Producer is running.
 *
 * @param <T> the type of values being serialized
 */
@PublicEvolving
public interface KafkaSerializationSchema<T> extends Serializable {

    default void open(SerializationSchema.InitializationContext context) throws Exception {}

    /**
     * Serializes given element and returns it as a {@link ProducerRecord}.
     *
     * @param element element to be serialized
     * @param timestamp timestamp (can be null)
     * @return Kafka {@link ProducerRecord}
     */
    ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp);

    default void close() throws Exception {}
}



