Discussion thread
Vote thread
JIRA

Release: 1.11

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The SerDe stack is an important component of any data pipeline. The current DeserializationSchema and SerializationSchema lack certain features that were requested by users or are needed to implement new features.

The core issues that we want to address are:

  • A way to initialize the schema
    • establish external connections
    • generate code on startup
    • no need for lazy initialization
  • Emitting deserialized values through a Collector

Emitting deserialized values through a Collector

Problem 1: Handling of deserialization errors. Returning no results

Bad data that cannot be deserialized properly is a reality in real-world use cases. Simply failing the job is not an option, as it usually leads to a failover loop: the bad data will be received again after recovery. This was reported already in https://issues.apache.org/jira/browse/FLINK-3679. The solution implemented there was to treat a null value returned from the DeserializationSchema as a deserialization error and ignore the record. This has the downside that there is no straightforward way to encode a legitimate null value. Some systems encode control messages as null values, e.g. Kafka treats a null value as a deletion of the value for the corresponding key.

Introducing a Collector interface will enable differentiating between no results (no calls to Collector#collect) and null result (Collector#collect(null)).
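
To make the distinction concrete, here is a minimal sketch of a schema using the Collector-based method, assuming the DeserializationSchema interface as proposed below (the class and the parseRecord helper are hypothetical, purely for illustration):

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;

public class TombstoneAwareSchema implements DeserializationSchema<String> {

	@Override
	public String deserialize(byte[] message) throws IOException {
		return message == null ? null : parseRecord(message);
	}

	@Override
	public void deserialize(byte[] message, Collector<String> out) throws IOException {
		if (message == null) {
			// e.g. a Kafka tombstone: an explicit null result, not an error
			out.collect(null);
			return;
		}
		try {
			out.collect(parseRecord(message));
		} catch (IOException e) {
			// bad data: emit nothing (no collect call) instead of failing the job
		}
	}

	// hypothetical parsing logic standing in for a real format
	private String parseRecord(byte[] message) throws IOException {
		return new String(message, StandardCharsets.UTF_8);
	}

	@Override
	public boolean isEndOfStream(String nextElement) {
		return false;
	}

	@Override
	public TypeInformation<String> getProducedType() {
		return TypeInformation.of(String.class);
	}
}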

Problem 2: Compacted messages. A single source record might produce multiple records

Currently, the suggested workaround is to emit a single container result and flatten the results in a subsequent operator. The downside of this approach is that the subsequent operator cannot emit watermarks in a partition-aware manner. This was reported in https://github.com/apache/flink/pull/3314#issuecomment-376237266

Another example of a potential use case is the Debezium CDC format (https://cwiki.apache.org/confluence/display/FLINK/FLIP-105%3A+Support+to+Interpret+and+Emit+Changelog+in+Flink+SQL), where we want to split a message like:

{
 "before": {
   "id": 1004,
   "first_name": "Anne",
   "last_name": "Kretchmar",
   "email": "annek@noanswer.org"
 },
 "after": {
   "id": 1004,
   "first_name": "Anne Marie",
   "last_name": "Kretchmar",
   "email": "annek@noanswer.org"
 },
 "source": { ... },
 "op": "u",
 "ts_ms": 1465581029523
}

into two messages, one for UPDATE_BEFORE and one for UPDATE_AFTER.

As mentioned above, the current workaround is to emit a single "container" object and split it in a subsequent operation. The problem is that it is then not possible to generate per-partition watermarks based on the nested records.
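
For illustration, a sketch of what the Collector-based deserialization could look like for such a changelog format (DebeziumEnvelope, parseEnvelope and the toUpdate*/toRow helpers are hypothetical; this is not the actual FLIP-105 implementation):

@Override
public void deserialize(byte[] message, Collector<Row> out) throws IOException {
	// parse the envelope shown above; parsing details are omitted here
	DebeziumEnvelope envelope = parseEnvelope(message);
	if ("u".equals(envelope.getOp())) {
		// one compacted source record produces two downstream records
		out.collect(toUpdateBefore(envelope.getBefore()));
		out.collect(toUpdateAfter(envelope.getAfter()));
	} else {
		out.collect(toRow(envelope));
	}
}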

Implications

In the current KafkaFetcher implementation the deserialization happens outside of the checkpoint lock and only the emitting + state update happens within the lock. This means that updating an offset in Kafka is tied to emitting a single deserialized record. If we want to support emitting multiple records from a single source record, we have a few options (first two mentioned here):

  1. Buffer all the records produced from a single source record outside of the checkpoint lock and emit them all at once inside the checkpoint lock. The downside is that we keep all those records in memory, and users might hit OOM exceptions.
  2. Move the deserialization inside the checkpoint lock. The downside of this approach is that it could delay emitting a checkpoint by a significant amount of time in case of expensive deserialization (e.g. decrypting messages). The underlying problem is that the thread that emits records (LegacySourceThread) is different from the thread that triggers the checkpoint barrier (Task Thread/Mailbox thread). Therefore we would need to hold the checkpoint lock for the duration of the deserialization so that no checkpoint barrier can be emitted between multiple records produced from a single call to the deserialization schema.
  3. Implement bookkeeping of the number of emitted records and include it in the checkpoint of the KafkaSource. This would enable skipping already-emitted records upon restore. The downside of this approach is that it requires the DeserializationSchema to be deterministic, i.e. that it produces the same results in the same order.

I suggest going with option 1, as it has the fewest implications for current pipelines. At the same time, the main use case for the collector is emitting either zero records or a small number of records whose combined size is no larger than the original source record (e.g. the Debezium format).
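
For illustration, a rough sketch of option 1 in a simplified Kafka-style fetch loop (this is not the actual KafkaFetcher code; sourceContext and updateOffset stand in for the real emitting and offset bookkeeping logic):

List<T> buffer = new ArrayList<>();
Collector<T> bufferingCollector = new Collector<T>() {
	@Override
	public void collect(T record) {
		buffer.add(record);
	}

	@Override
	public void close() {
	}
};

for (ConsumerRecord<byte[], byte[]> record : records) {
	// potentially expensive deserialization stays outside of the checkpoint lock
	deserializer.deserialize(record, bufferingCollector);

	synchronized (checkpointLock) {
		// all records produced from one source record are emitted atomically,
		// together with the offset update, so no checkpoint barrier can split them
		for (T element : buffer) {
			sourceContext.collect(element);
		}
		updateOffset(record.partition(), record.offset());
	}
	buffer.clear();
}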

Add open/close methods

Multiple DeserializationSchema and SerializationSchema implementations require initializing internal state that is often not serializable, establishing external connections, and/or performing work that has to happen exactly once at the very beginning. Some examples include:

  • establishing connection to a schema registry
  • instantiating Avro's DatumReader/Decoder, etc.
  • generating code of the (de)serializer in Table API

The current workaround is lazy initialization on the deserialization/serialization path. Moreover, it is a common requirement to be able to register metrics in the SerDe stack, as mentioned e.g. here: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Metrics-outside-RichFunctions-td32282.html#a32329

Proposed interfaces:

DeserializationSchema:

@Public
public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
	/**
	 * Initialization method for the schema. It is called before the actual working methods
	 * {@link #deserialize} and thus suitable for one time setup work.
	 *
	 * <p>The provided {@link InitializationContext} can be used to access additional features such as
	 * registering user metrics.
	 *
	 * @param context Contextual information that can be used during initialization.
	 */
	@PublicEvolving
	default void open(InitializationContext context) throws Exception {
	}

	/**
	 * Deserializes the byte message.
	 *
	 * @param message The message, as a byte array.
	 *
	 * @return The deserialized message as an object (null if the message cannot be deserialized).
	 */
	T deserialize(byte[] message) throws IOException;

	/**
	 * Deserializes the byte message.
	 *
	 * <p>Can output multiple records through the {@link Collector}. Note that the number and size of
	 * the produced records should be relatively small. Depending on the source implementation, records
	 * can be buffered in memory, or collecting records might delay emitting the next checkpoint barrier.
	 *
	 * @param message The message, as a byte array.
	 * @param out The collector to put the resulting messages.
	 */
	@PublicEvolving
	default void deserialize(byte[] message, Collector<T> out) throws IOException {
		T deserialized = deserialize(message);
		if (deserialized != null) {
			out.collect(deserialized);
		}
	}

	/**
	 * Method to decide whether the element signals the end of the stream. If
	 * true is returned the element won't be emitted.
	 *
	 * @param nextElement The element to test for the end-of-stream signal.
	 * @return True, if the element signals end of stream, false otherwise.
	 */
	boolean isEndOfStream(T nextElement);

	/**
	 * Tear-down method for the user code. It is called after all messages have been processed.
	 * After this method is called there will be no more invocations to {@link #deserialize}.
	 */
	@PublicEvolving
	default void close() throws Exception {
	}

	/**
	 * Contextual information provided to the {@link #open(InitializationContext)} method. It can be used to:
	 * <ul>
	 *     <li>Register user metrics via {@link InitializationContext#getMetricGroup()}</li>
	 * </ul>
	 */
	@PublicEvolving
	interface InitializationContext {
		MetricGroup getMetricGroup();
	}
}
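
As a usage illustration, here is a sketch of a schema that uses the proposed open() hook to create its non-serializable runtime parts and register a metric (the class is hypothetical; only the contract above plus standard Avro and Flink metrics APIs are assumed):

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.metrics.Counter;

public class AvroGenericDeserializationSchema implements DeserializationSchema<GenericRecord> {

	// the schema is kept as a string; the parsed form is created in open()
	private final String schemaString;

	// non-serializable runtime state, initialized once instead of lazily on the hot path
	private transient Schema schema;
	private transient GenericDatumReader<GenericRecord> datumReader;
	private transient Counter corruptedRecords;

	public AvroGenericDeserializationSchema(String schemaString) {
		this.schemaString = schemaString;
	}

	@Override
	public void open(InitializationContext context) throws Exception {
		schema = new Schema.Parser().parse(schemaString);
		datumReader = new GenericDatumReader<>(schema);
		corruptedRecords = context.getMetricGroup().counter("corruptedRecords");
	}

	@Override
	public GenericRecord deserialize(byte[] message) throws IOException {
		try {
			return datumReader.read(null, DecoderFactory.get().binaryDecoder(message, null));
		} catch (IOException e) {
			corruptedRecords.inc();
			return null; // skipped by the default Collector-based method
		}
	}

	@Override
	public boolean isEndOfStream(GenericRecord nextElement) {
		return false;
	}

	@Override
	public TypeInformation<GenericRecord> getProducedType() {
		return TypeInformation.of(GenericRecord.class);
	}
}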

SerializationSchema:

@Public
public interface SerializationSchema<T> extends Serializable {
	/**
	 * Initialization method for the schema. It is called before the actual working methods
	 * {@link #serialize(Object)} and thus suitable for one time setup work.
	 *
	 * <p>The provided {@link InitializationContext} can be used to access additional features such as
	 * registering user metrics.
	 *
	 * @param context Contextual information that can be used during initialization.
	 */
	@PublicEvolving
	default void open(InitializationContext context) throws Exception {
	}

	/**
	 * Serializes the incoming element to a specified type.
	 *
	 * @param element
	 *            The incoming element to be serialized
	 * @return The serialized element.
	 */
	byte[] serialize(T element);

	/**
	 * Tear-down method for the user code. It is called after all messages have been processed.
	 * After this method is called there will be no more invocations to {@link #serialize(Object)}.
	 */
	@PublicEvolving
	default void close() throws Exception {
	}

	/**
	 * Contextual information provided to the {@link #open(InitializationContext)} method. It can be used to:
	 * <ul>
	 *     <li>Register user metrics via {@link InitializationContext#getMetricGroup()}</li>
	 * </ul>
	 */
	@PublicEvolving
	interface InitializationContext {
		MetricGroup getMetricGroup();
	}
}
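
On the serialization side, the same hooks remove the need for lazy initialization of external connections. A sketch (RegistryClient and its methods are hypothetical stand-ins for e.g. a schema registry client):

public class RegistryBackedSerializationSchema implements SerializationSchema<String> {

	private final String registryUrl;

	// the client is not serializable, so it is created in open() rather than in the constructor
	private transient RegistryClient client;

	public RegistryBackedSerializationSchema(String registryUrl) {
		this.registryUrl = registryUrl;
	}

	@Override
	public void open(InitializationContext context) throws Exception {
		// establish the external connection exactly once, before any serialize() call
		client = RegistryClient.connect(registryUrl);
	}

	@Override
	public byte[] serialize(String element) {
		// no lazy-initialization check needed on the hot path anymore
		return client.encode(element);
	}

	@Override
	public void close() throws Exception {
		client.close();
	}
}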

KafkaDeserializationSchema:

/**
 * The deserialization schema describes how to turn the Kafka ConsumerRecords
 * into data types (Java/Scala objects) that are processed by Flink.
 *
 * @param <T> The type created by the deserialization schema.
 */
@PublicEvolving
public interface KafkaDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

	/**
	 * Initialization method for the schema. It is called before the actual working methods
	 * {@link #deserialize} and thus suitable for one time setup work.
	 *
	 * <p>The provided {@link DeserializationSchema.InitializationContext} can be used to access
	 * additional features such as registering user metrics.
	 *
	 * @param context Contextual information that can be used during initialization.
	 */
	default void open(DeserializationSchema.InitializationContext context) throws Exception {
	}

	/**
	 * Method to decide whether the element signals the end of the stream. If
	 * true is returned the element won't be emitted.
	 *
	 * @param nextElement The element to test for the end-of-stream signal.
	 *
	 * @return True, if the element signals end of stream, false otherwise.
	 */
	boolean isEndOfStream(T nextElement);

	/**
	 * Deserializes the Kafka record.
	 *
	 * @param record Kafka record to be deserialized.
	 *
	 * @return The deserialized message as an object (null if the message cannot be deserialized).
	 */
	T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;

	/**
	 * Deserializes the Kafka record.
	 *
	 * <p>Can output multiple records through the {@link Collector}. Note that the number and size of
	 * the produced records should be relatively small. Depending on the source implementation, records
	 * can be buffered in memory, or collecting records might delay emitting the next checkpoint barrier.
	 *
	 * @param message The ConsumerRecord to deserialize.
	 * @param out The collector to put the resulting messages.
	 */
	default void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out) throws Exception {
		T deserialized = deserialize(message);
		if (deserialized != null) {
			out.collect(deserialized);
		}
	}

	/**
	 * Tear-down method for the user code. It is called after all messages have been processed.
	 * After this method is called there will be no more invocations to {@link #deserialize}.
	 */
	default void close() throws Exception {
	}
}

KafkaSerializationSchema:

/**
 * A {@link KafkaSerializationSchema} defines how to serialize values of type {@code T} into {@link
 * ProducerRecord ProducerRecords}.
 *
 * <p>Please also implement {@link KafkaContextAware} if your serialization schema needs
 * information
 * about the available partitions and the number of parallel subtasks along with the subtask ID on
 * which the Kafka Producer is running.
 *
 * @param <T> the type of values being serialized
 */
@PublicEvolving
public interface KafkaSerializationSchema<T> extends Serializable {


	/**
	 * Initialization method for the schema. It is called before the actual working methods
	 * {@link #serialize(Object, Long)} and thus suitable for one time setup work.
	 *
	 * <p>The provided {@link SerializationSchema.InitializationContext} can be used to access
	 * additional features such as registering user metrics.
	 *
	 * @param context Contextual information that can be used during initialization.
	 */
	default void open(SerializationSchema.InitializationContext context) throws Exception {
	}

	/**
	 * Serializes given element and returns it as a {@link ProducerRecord}.
	 *
	 * @param element element to be serialized
	 * @param timestamp timestamp (can be null)
	 * @return Kafka {@link ProducerRecord}
	 */
	ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp);

	/**
	 * Tear-down method for the user code. It is called after all messages have been processed.
	 * After this method is called there will be no more invocations to {@link #serialize(Object, Long)}.
	 */
	default void close() throws Exception {
	}
}

KinesisDeserializationSchema

/**
 * This is a deserialization schema specific for the Flink Kinesis Consumer. Different from the
 * basic {@link DeserializationSchema}, this schema offers additional Kinesis-specific information
 * about the record that may be useful to the user application.
 *
 * @param <T> The type created by the keyed deserialization schema.
 */
@PublicEvolving
public interface KinesisDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

	/**
	 * Initialization method for the schema. It is called before the actual working methods
	 * {@link #deserialize} and thus suitable for one time setup work.
	 *
	 * <p>The provided {@link DeserializationSchema.InitializationContext} can be used to access
	 * additional features such as registering user metrics.
	 *
	 * @param context Contextual information that can be used during initialization.
	 */
	default void open(DeserializationSchema.InitializationContext context) throws Exception {
	}

	/**
	 * Deserializes a Kinesis record's bytes. If the record cannot be deserialized, {@code null}
	 * may be returned. This informs the Flink Kinesis Consumer to process the Kinesis record
	 * without producing any output for it, i.e. effectively "skipping" the record.
	 *
	 * @param recordValue the record's value as a byte array
	 * @param partitionKey the record's partition key at the time of writing
	 * @param seqNum the sequence number of this record in the Kinesis shard
	 * @param approxArrivalTimestamp the server-side timestamp of when Kinesis received and stored the record
	 * @param stream the name of the Kinesis stream that this record was sent to
	 * @param shardId The identifier of the shard the record was sent to
	 *
	 * @return the deserialized message as a Java object ({@code null} if the message cannot be deserialized).
	 * @throws IOException
	 */
	T deserialize(byte[] recordValue, String partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String shardId) throws IOException;

	/**
	 * Deserializes a Kinesis record.
	 *
	 * <p>Can output multiple records through the {@link Collector}. Note that the number and size of
	 * the produced records should be relatively small. Depending on the source implementation, records
	 * can be buffered in memory, or collecting records might delay emitting the next checkpoint barrier.
	 *
	 * @param record a composite Kinesis record
	 * @param out The collector to put the resulting messages.
	 */
	default void deserialize(KinesisRecord record, Collector<T> out) throws IOException {
		T deserialized = deserialize(
			record.getData(),
			record.getPartitionKey(),
			record.getSequenceNumber(),
			record.getApproximateArrivalTimestamp(),
			record.getStream(),
			record.getShardId());
		if (deserialized != null) {
			out.collect(deserialized);
		}
	}

	/**
	 * Tear-down method for the user code. It is called after all messages have been processed.
	 * After this method is called there will be no more invocations to {@link #deserialize}.
	 */
	default void close() throws Exception {
	}

	/**
	 * Method to decide whether the element signals the end of the stream. If
	 * true is returned the element won't be emitted.
	 *
	 * @param nextElement the element to test for the end-of-stream signal
	 * @return true if the element signals end of stream, false otherwise
	 */
	// TODO FLINK-4194 ADD SUPPORT FOR boolean isEndOfStream(T nextElement);
}

interface KinesisRecord {
  /** @return consumed data bytes */
  byte[] getData();

  /** @return source AWS Kinesis stream */
  String getStream();

  /** @return source AWS Kinesis stream shard */
  String getShardId();

  /** @return attached partition key */
  String getPartitionKey();

  /** @return sequence number of the consumed record */
  String getSequenceNumber();

  /**
   * @return approximate arrival timestamp (ingestion time at AWS Kinesis) of the consumed record
   */
  long getApproximateArrivalTimestamp();
}

KinesisSerializationSchema

/**
 * Kinesis-specific serialization schema, allowing users to specify a target stream based
 * on a record's contents.
 * @param <T> The type of the elements to be serialized.
 */
@PublicEvolving
public interface KinesisSerializationSchema<T> extends Serializable {

	/**
	 * Initialization method for the schema. It is called before the actual working methods
	 * {@link #serialize(Object)} and thus suitable for one time setup work.
	 *
	 * <p>The provided {@link SerializationSchema.InitializationContext} can be used to access
	 * additional features such as registering user metrics.
	 *
	 * @param context Contextual information that can be used during initialization.
	 */
	default void open(SerializationSchema.InitializationContext context) throws Exception {
	}

	/**
	 * Serialize the given element into a ByteBuffer.
	 *
	 * @param element The element to serialize
	 * @return Serialized representation of the element
	 */
	ByteBuffer serialize(T element);

	/**
	 * Optional method to determine the target stream based on the element.
	 * Return <code>null</code> to use the default stream.
	 *
	 * @param element The element to determine the target stream from
	 * @return target stream name
	 */
	String getTargetStream(T element);

	/**
	 * Tear-down method for the user code. It is called after all messages have been processed.
	 * After this method is called there will be no more invocations to {@link #serialize(Object)}.
	 */
	default void close() throws Exception {
	}
}

PubSubDeserializationSchema

/**
 * The deserialization schema describes how to turn the PubsubMessages
 * into data types (Java/Scala objects) that are processed by Flink.
 *
 * @param <T> The type created by the deserialization schema.
 */
@PublicEvolving
public interface PubSubDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

	/**
	 * Initialization method for the schema. It is called before the actual working methods
	 * {@link #deserialize} and thus suitable for one time setup work.
	 *
	 * <p>The provided {@link DeserializationSchema.InitializationContext} can be used to access
	 * additional features such as registering user metrics.
	 *
	 * @param context Contextual information that can be used during initialization.
	 */
	default void open(DeserializationSchema.InitializationContext context) throws Exception {
	}

	/**
	 * Method to decide whether the element signals the end of the stream. If
	 * true is returned the element won't be emitted.
	 *
	 * @param nextElement The element to test for the end-of-stream signal.
	 *
	 * @return True, if the element signals end of stream, false otherwise.
	 */
	boolean isEndOfStream(T nextElement);

	/**
	 * Deserializes a PubsubMessage.
	 *
	 * @param message PubsubMessage to be deserialized.
	 *
	 * @return The deserialized message as an object (null if the message cannot be deserialized).
	 */
	T deserialize(PubsubMessage message) throws Exception;

	/**
	 * Deserializes a PubsubMessage.
	 *
	 * <p>Can output multiple records through the {@link Collector}. Note that the number and size of
	 * the produced records should be relatively small. Depending on the source implementation, records
	 * can be buffered in memory, or collecting records might delay emitting the next checkpoint barrier.
	 *
	 * @param message The PubsubMessage to deserialize.
	 * @param out The collector to put the resulting messages.
	 */
	default void deserialize(PubsubMessage message, Collector<T> out) throws Exception {
		T deserialized = deserialize(message);
		if (deserialized != null) {
			out.collect(deserialized);
		}
	}

	/**
	 * Tear-down method for the user code. It is called after all messages have been processed.
	 * After this method is called there will be no more invocations to {@link #deserialize}.
	 */
	default void close() throws Exception {
	}
}