Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

Vote thread:

JIRA: FLINK-17305

Release: 1.11

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The SerDe stack is an important component of any data pipeline. The current DeserializationSchema and SerializationSchema lack certain features that users have requested or that are needed to implement new features.

...

Emitting deserialized values through a Collector

Problem 1: Handling of deserialization errors. Returning no results

Bad data that cannot be deserialized properly is a reality in real-world use cases. Simply failing the job is not an option, as it usually leads to a failover loop: the bad data will be received again after recovery. This was reported already in https://issues.apache.org/jira/browse/FLINK-3679. The solution implemented there was to treat a null value returned from DeserializationSchema as a deserialization error and to ignore the record. This has the downside that there is no straightforward way to encode an actual null value. Some systems encode control messages as null values; e.g. Kafka treats a null value as a deletion of the value for the corresponding key.

Introducing a Collector interface will make it possible to differentiate between no result (no call to Collector#collect) and a null result (Collector#collect(null)).
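
For illustration, a deserializer written against the proposed interface (shown later in this document) could skip malformed records by simply not calling the collector, while still being able to forward an explicit null for control messages. This is only a sketch; the class name and the use of Jackson are assumptions made for the example, and whether a null element can actually flow downstream depends on the connector.

Code Block
languagejava
import java.io.IOException;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;

// Hypothetical example, not part of the proposal: malformed records are skipped
// (no call to collect), while a null payload can still be forwarded explicitly.
public class SkippingJsonDeserializationSchema implements DeserializationSchema<JsonNode> {

	private transient ObjectMapper mapper;

	@Override
	public void open(InitializationContext context) throws Exception {
		mapper = new ObjectMapper();
	}

	@Override
	public JsonNode deserialize(byte[] message) throws IOException {
		// single-record path, kept for compatibility with the old contract
		return message == null ? null : mapper.readTree(message);
	}

	@Override
	public void deserialize(byte[] message, Collector<JsonNode> out) throws IOException {
		if (message == null) {
			out.collect(null); // deliberate null result, e.g. a deletion marker
			return;
		}
		try {
			out.collect(mapper.readTree(message));
		} catch (IOException e) {
			// bad record: emit nothing instead of failing the job
		}
	}

	@Override
	public boolean isEndOfStream(JsonNode nextElement) {
		return false;
	}

	@Override
	public TypeInformation<JsonNode> getProducedType() {
		return TypeInformation.of(JsonNode.class);
	}
}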

Problem 2: Compacted messages. Single source record might produce multiple records

Currently, the suggested workaround is to emit a single container result and flatten the results in a subsequent operator. The downside of this approach is that the subsequent operator cannot emit watermarks in a partition-aware manner. This was reported in https://github.com/apache/flink/pull/3314#issuecomment-376237266

...

The workaround to that problem right now is to emit a single "container" object and split it in a subsequent operation. The problem is that it is not possible to generate per-partition watermarks based on the nested records.
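
With the proposed Collector-based method, such a compound record could instead be split directly in the schema, for example like the sketch below. This is illustrative only; the envelope layout, the "records" field name and the class name are invented for the example.

Code Block
languagejava
import java.io.IOException;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.util.Collector;

// Illustrative only: unpacks a hypothetical "envelope" message that contains an
// array of payload records and emits each of them as a separate element, so the
// source can generate per-partition watermarks for the individual records.
public class EnvelopeDeserializationSchema implements DeserializationSchema<String> {

	private transient ObjectMapper mapper;

	@Override
	public void open(InitializationContext context) throws Exception {
		mapper = new ObjectMapper();
	}

	@Override
	public String deserialize(byte[] message) throws IOException {
		// a compound message cannot be represented as a single record
		throw new UnsupportedOperationException("Use deserialize(byte[], Collector) instead");
	}

	@Override
	public void deserialize(byte[] message, Collector<String> out) throws IOException {
		JsonNode envelope = mapper.readTree(message);
		JsonNode records = envelope == null ? null : envelope.get("records"); // assumed envelope layout
		if (records != null) {
			for (JsonNode record : records) {
				out.collect(record.toString());
			}
		}
	}

	@Override
	public boolean isEndOfStream(String nextElement) {
		return false;
	}

	@Override
	public TypeInformation<String> getProducedType() {
		return Types.STRING;
	}
}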

Implications

In the current KafkaFetcher implementation the deserialization happens outside of the checkpoint lock and only the emitting + state update happens within the lock. This means that updating an offset in Kafka is bound to emitting a single deserialized record. If we want to support emitting multiple records from a single source record, we have a few options (the first two mentioned here):

...

I suggest going with option 1, as it has the fewest implications for current pipelines. At the same time, the main use case for the collector is emitting either zero records or a small number of records whose combined size is not much larger than that of the source record (e.g. the Debezium format).
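
For reference, the structure described above can be sketched roughly as follows. This is a heavily simplified illustration, not the actual KafkaFetcher code; the class, method and parameter names (offsetsState, checkpointLock, runFetchLoop) are placeholders.

Code Block
languagejava
import java.time.Duration;
import java.util.Map;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Heavily simplified sketch of the current single-record structure: the schema is
// invoked outside the checkpoint lock, and exactly one emitted record corresponds
// to one offset update performed under the lock. If one source record can produce
// several elements, they either have to be buffered and emitted together under the
// lock, or the offset handling has to become aware of partially emitted records.
final class FetchLoopSketch {

	<T> void runFetchLoop(
			KafkaConsumer<byte[], byte[]> consumer,
			DeserializationSchema<T> schema,
			SourceFunction.SourceContext<T> sourceContext,
			Map<TopicPartition, Long> offsetsState,
			Object checkpointLock) throws Exception {

		while (true) {
			for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(100))) {
				// outside the lock: one source record -> one deserialized element
				T element = schema.deserialize(record.value());

				synchronized (checkpointLock) {
					// emitting and advancing the offset are atomic with respect to checkpoints
					if (element != null) {
						sourceContext.collect(element);
					}
					offsetsState.put(new TopicPartition(record.topic(), record.partition()), record.offset());
				}
			}
		}
	}
}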

Add open/close methods

Multiple DeserializationSchema and SerializationSchema implementations need to initialize internal state that is often not serializable, establish external connections, and/or perform setup work that has to happen exactly once at the very beginning. Some of the examples include:

...

The workaround right now is lazy initialization on the deserialization/serialization path. Moreover, it is a common requirement to be able to register metrics in the SerDe stack, as mentioned e.g. here: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Metrics-outside-RichFunctions-td32282.html#a32329
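
As an illustration, with the open()/close() hooks proposed below such a schema can do its one-time setup and register a metric up front instead of lazily on the hot path. The class and metric names are invented for this example.

Code Block
languagejava
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.metrics.Counter;

// Sketch of a schema using the proposed lifecycle methods: the metric is registered
// once in open() rather than lazily inside serialize().
public class CountingStringSerializationSchema implements SerializationSchema<String> {

	private transient Counter serializedRecords;

	@Override
	public void open(InitializationContext context) throws Exception {
		// one-time setup before any record is processed
		serializedRecords = context.getMetricGroup().counter("serializedRecords");
	}

	@Override
	public byte[] serialize(String element) {
		serializedRecords.inc();
		return element.getBytes(StandardCharsets.UTF_8);
	}

	@Override
	public void close() throws Exception {
		// release external resources acquired in open(), if any
	}
}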

Proposed interfaces:


DeserializationSchema:

Code Block
languagejava
@Public
public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
	/**
	 * Initialization method for the schema. It is called before the actual working methods
	 * {@link #deserialize} and thus suitable for one time setup work.
	 *
	 * <p>The provided {@link InitializationContext} can be used to access additional features such as e.g.
	 * registering user metrics.
	 *
	 * @param context Contextual information that can be used during initialization.
	 */
	@PublicEvolving
	default void open(InitializationContext context) throws Exception {
	}

	/**
	 * Deserializes the byte message.
	 *
	 * @param message The message, as a byte array.
	 *
	 * @return The deserialized message as an object (null if the message cannot be deserialized).
	 */
	T deserialize(byte[] message) throws IOException;

	/**
	 * Deserializes the byte message.
	 *
	 * <p>Can output multiple records through the {@link Collector}. Note that number and size of the
	 * produced records should be relatively small. Depending on the source implementation records
	 * can be buffered in memory or collecting records might delay emitting checkpoint barrier.
	 *
	 * @param message The message, as a byte array.
	 * @param out The collector to put the resulting messages.
	 */
	@PublicEvolving
	default void deserialize(byte[] message, Collector<T> out) throws IOException {
		T deserialize = deserialize(message);
		if (deserialize != null) {
			out.collect(deserialize);
		}
	}

	/**
	 * Method to decide whether the element signals the end of the stream. If
	 * true is returned the element won't be emitted.
	 *
	 * @param nextElement The element to test for the end-of-stream signal.
	 * @return True, if the element signals end of stream, false otherwise.
	 */
	boolean isEndOfStream(T nextElement);

	/**
	 * Tear-down method for the user code. It is called after all messages have been processed.
	 * After this method is called there will be no more invocations to {@link #deserialize}.
	 */
	@PublicEvolving
	default void close() throws Exception {
	}

	/**
	 * Contextual information provided for the {@link #open(InitializationContext)} method. It can be used to:
	 * <ul>
	 *     <li>Register user metrics via {@link InitializationContext#getMetricGroup()}</li>
	 * </ul>
	 */
	@PublicEvolving
	interface InitializationContext {
		MetricGroup getMetricGroup();
	}
}
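
To make the intended contract concrete, the calling sequence a connector or format is expected to follow could be sketched like this. This is simplified; ListCollector and deserializeAll are ad-hoc helpers for the example, not part of the proposal.

Code Block
languagejava
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.util.Collector;

// Simplified sketch of how a connector would drive the proposed lifecycle:
// open() once, deserialize(message, collector) per source record, close() at the end.
final class DeserializationLifecycleSketch {

	/** Minimal Collector that buffers emitted records in a list (ad-hoc helper). */
	static final class ListCollector<T> implements Collector<T> {
		final List<T> buffer = new ArrayList<>();

		@Override
		public void collect(T record) {
			buffer.add(record);
		}

		@Override
		public void close() {
		}
	}

	static <T> List<T> deserializeAll(
			DeserializationSchema<T> schema,
			DeserializationSchema.InitializationContext context,
			List<byte[]> messages) throws Exception {

		schema.open(context); // one-time setup, e.g. registering metrics
		ListCollector<T> out = new ListCollector<>();
		for (byte[] message : messages) {
			// zero, one or several records may be emitted per source record
			schema.deserialize(message, out);
		}
		schema.close();
		return out.buffer;
	}
}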

SerializationSchema:

Code Block
languagejava
@Public
public interface SerializationSchema<InputT> extends Serializable {
	/**
	 * Initialization method for the schema. It is called before the actual working methods
	 * {@link #serialize(Object)} and thus suitable for one time setup work.
	 *
	 * <p>The provided {@link InitializationContext} can be used to access additional features such as e.g.
	 * registering user metrics.
	 *
	 * @param context Contextual information that can be used during initialization.
	 */
	@PublicEvolving
	default void open(InitializationContext context) throws Exception {
	}

	/**
	 * Serializes the incoming element to a specified type.
	 *
	 * @param element
	 *            The incoming element to be serialized
	 * @return The serialized element.
	 */
	byte[] serialize(InputT element);

	/**
	 * Tear-down method for the user code. It is called after all messages have been processed.
	 * After this method is called there will be no more invocations to {@link #serialize(Object)}.
	 */
	@PublicEvolving
	default void close() throws Exception {
	}

	/**
	 * Contextual information provided for the {@link #open(InitializationContext)} method. It can be used to:
	 * <ul>
	 *     <li>Register user metrics via {@link InitializationContext#getMetricGroup()}</li>
	 * </ul>
	 */
	@PublicEvolving
	interface InitializationContext {
		MetricGroup getMetricGroup();
	}
}

KafkaDeserializationSchema:

Code Block
languagejava
/**
 * The deserialization schema describes how to turn the Kafka ConsumerRecords
 * into data types (Java/Scala objects) that are processed by Flink.
 *
 * @param <T> The type created by the keyed deserialization schema.
 */
@PublicEvolving
public interface KafkaDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

	/**
	 * Initialization method for the schema. It is called before the actual working methods
	 * {@link #deserialize} and thus suitable for one time setup work.
	 *
	 * <p>The provided {@link DeserializationSchema.InitializationContext} can be used to access additional features such as e.g.
	 * registering user metrics.
	 *
	 * @param context Contextual information that can be used during initialization.
	 */
	default void open(DeserializationSchema.InitializationContext context) throws Exception {
	}

	/**
	 * Method to decide whether the element signals the end of the stream. If
	 * true is returned the element won't be emitted.
	 *
	 * @param nextElement The element to test for the end-of-stream signal.
	 *
	 * @return True, if the element signals end of stream, false otherwise.
	 */
	boolean isEndOfStream(T nextElement);

	/**
	 * Deserializes the Kafka record.
	 *
	 * @param record Kafka record to be deserialized.
	 *
	 * @return The deserialized message as an object (null if the message cannot be deserialized).
	 */
	T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;

	/**
	 * Deserializes the Kafka record.
	 *
	 * <p>Can output multiple records through the {@link Collector}. Note that number and size of the
	 * produced records should be relatively small. Depending on the source implementation records
	 * can be buffered in memory or collecting records might delay emitting checkpoint barrier.
	 *
	 * @param message The ConsumerRecord to deserialize.
	 * @param out The collector to put the resulting messages.
	 */
	default void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out) throws Exception {
		T deserialize = deserialize(message);
		if (deserialize != null) {
			out.collect(deserialize);
		}
	}

	/**
	 * Tear-down method for the user code. It is called after all messages have been processed.
	 * After this method is called there will be no more invocations to {@link #deserialize}.
	 */
	default void close() throws Exception {
	}
}

KafkaSerializationSchema:

Code Block
languagejava
/**
 * A {@link KafkaSerializationSchema} defines how to serialize values of type {@code T} into {@link
 * ProducerRecord ProducerRecords}.
 *
 * <p>Please also implement {@link KafkaContextAware} if your serialization schema needs
 * information
 * about the available partitions and the number of parallel subtasks along with the subtask ID on
 * which the Kafka Producer is running.
 *
 * @param <T> the type of values being serialized
 */
@PublicEvolving
public interface KafkaSerializationSchema<T> extends Serializable {


	/**
	 * Initialization method for the schema. It is called before the actual working methods
	 * {@link #serialize(Object, Long)} and thus suitable for one time setup work.
	 *
	 * <p>The provided {@link SerializationSchema.InitializationContext} can be used to access additional
	 * features such as e.g. registering user metrics.
	 *
	 * @param context Contextual information that can be used during initialization.
	 */
	default void open(SerializationSchema.InitializationContext context) throws Exception {
	}

	/**
	 * Serializes given element and returns it as a {@link ProducerRecord}.
	 *
	 * @param element element to be serialized
	 * @param timestamp timestamp (can be null)
	 * @return Kafka {@link ProducerRecord}
	 */
	ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp);

	/**
	 * Tear-down method for the user code. It is called after all messages have been processed.
	 * After this method is called there will be no more invocations to {@link #serialize(Object, Long)}.
	 */
	default void close() throws Exception {
	}
}

KinesisDeserializationSchema

Code Block
languagejava
/**
 * This is a deserialization schema specific for the Flink Kinesis Consumer. Different from the
 * basic {@link DeserializationSchema}, this schema offers additional Kinesis-specific information
 * about the record that may be useful to the user application.
 *
 * @param <T> The type created by the keyed deserialization schema.
 */
@PublicEvolving
public interface KinesisDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

	/**
	 * Initialization method for the schema. It is called before the actual working methods
	 * {@link #deserialize} and thus suitable for one time setup work.
	 *
	 * <p>The provided {@link DeserializationSchema.InitializationContext} can be used to access additional features such as e.g.
	 * registering user metrics.
	 *
	 * @param context Contextual information that can be used during initialization.
	 */
	default void open(DeserializationSchema.InitializationContext context) throws Exception {
	}

	/**
	 * Deserializes a Kinesis record's bytes. If the record cannot be deserialized, {@code null}
	 * may be returned. This informs the Flink Kinesis Consumer to process the Kinesis record
	 * without producing any output for it, i.e. effectively "skipping" the record.
	 *
	 * @param recordValue the record's value as a byte array
	 * @param partitionKey the record's partition key at the time of writing
	 * @param seqNum the sequence number of this record in the Kinesis shard
	 * @param approxArrivalTimestamp the server-side timestamp of when Kinesis received and stored the record
	 * @param stream the name of the Kinesis stream that this record was sent to
	 * @param shardId The identifier of the shard the record was sent to
	 *
	 * @return the deserialized message as an Java object ({@code null} if the message cannot be deserialized).
	 * @throws IOException
	 */
	T deserialize(byte[] recordValue, String partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String shardId) throws IOException;

	/**
	 * Deserializes a Kinesis record.
	 *
	 * <p>Can output multiple records through the {@link Collector}. Note that number and size of the
	 * produced records should be relatively small. Depending on the source implementation records
	 * can be buffered in memory or collecting records might delay emitting checkpoint barrier.
	 *
	 * @param record a composite Kinesis record
	 * @param out The collector to put the resulting messages.
	 */
	default void deserialize(
			KinesisRecord record,
			Collector<T> out) throws IOException {
		T deserialize = deserialize(record.getData(), record.getPartitionKey(), record.getSequenceNumber(), record.getApproximateArrivalTimestamp(), record.getStream(), record.getShardId());
		if (deserialize != null) {
			out.collect(deserialize);
		}
	}

	/**
	 * Tear-down method for the user code. It is called after all messages have been processed.
	 * After this method is called there will be no more invocations to {@link #deserialize}.
	 */
	default void close() throws Exception {
	}

	/**
	 * Method to decide whether the element signals the end of the stream. If
	 * true is returned the element won't be emitted.
	 *
	 * @param nextElement the element to test for the end-of-stream signal
	 * @return true if the element signals end of stream, false otherwise
	 */
	// TODO FLINK-4194 ADD SUPPORT FOR boolean isEndOfStream(T nextElement);
}

interface KinesisRecord {
  /** @return consumed data bytes */
  byte[] getData();

  /** @return source AWS Kinesis stream */
  String getStream();

  /** @return source AWS Kinesis stream shard */
  String getShardId();

  /** @return attached partition key */
  String getPartitionKey();

  /** @return sequence number of the consumed record */
  String getSequenceNumber();

  /**
   * @return approximate arrival timestamp (ingestion time at AWS Kinesis) of the consumed record
   */
  long getApproximateArrivalTimestamp();
}

KinesisSerializationSchema

Code Block
languagejava
/**
 * Kinesis-specific serialization schema, allowing users to specify a target stream based
 * on a record's contents.
 * @param <T>
 */
@PublicEvolving
public interface KinesisSerializationSchema<T> extends Serializable {

	/**
	 * Initialization method for the schema. It is called before the actual working methods
	 * {@link #serialize(Object)} and thus suitable for one time setup work.
	 *
	 * <p>The provided {@link SerializationSchema.InitializationContext} can be used to access additional features such
	 * as e.g. registering user metrics.
	 *
	 * @param context Contextual information that can be used during initialization.
	 */
	default void open(SerializationSchema.InitializationContext context) throws Exception {
	}

	/**
	 * Serialize the given element into a ByteBuffer.
	 *
	 * @param element The element to serialize
	 * @return Serialized representation of the element
	 */
	ByteBuffer serialize(T element);

	/**
	 * Optional method to determine the target stream based on the element.
	 * Return <code>null</code> to use the default stream
	 *
	 * @param element The element to determine the target stream from
	 * @return target stream name
	 */
	String getTargetStream(T element);

	/**
	 * Tear-down method for the user code. It is called after all messages have been processed.
	 * After this method is called there will be no more invocations to {@link #serialize(Object)}.
	 */
	default void close() throws Exception {
	}
}

PubSubDeserializationSchema

Code Block
languagejava
/**
 * The deserialization schema describes how to turn the PubsubMessages
 * into data types (Java/Scala objects) that are processed by Flink.
 *
 * @param <T> The type created by the deserialization schema.
 */
@PublicEvolving
public interface PubSubDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

	/**
	 * Initialization method for the schema. It is called before the actual working methods
	 * {@link #deserialize} and thus suitable for one time setup work.
	 *
	 * <p>The provided {@link DeserializationSchema.InitializationContext} can be used to access additional features such as e.g.
	 * registering user metrics.
	 *
	 * @param context Contextual information that can be used during initialization.
	 */
	 */
	default void open(DeserializationSchema.InitializationContext context) throws Exception {
	}

	/**
	 * Method to decide whether the element signals the end of the stream. If
	 * true is returned the element won't be emitted.
	 *
	 * @param nextElement The element to test for the end-of-stream signal.
	 *
	 * @return True, if the element signals end of stream, false otherwise.
	 */
	boolean isEndOfStream(T nextElement);

	/**
	 * Deserializes a PubsubMessage.
	 *
	 * @param message PubsubMessage to be deserialized.
	 *
	 * @return The deserialized message as an object (null if the message cannot be deserialized).
	 */
	T deserialize(PubsubMessage message) throws Exception;

	/**
	 * Deserializes a PubsubMessage.
	 *
	 * <p>Can output multiple records through the {@link Collector}. Note that number and size of the
	 * produced records should be relatively small. Depending on the source implementation records
	 * can be buffered in memory or collecting records might delay emitting checkpoint barrier.
	 *
	 * @param message The PubsubMessage to deserialize.
	 * @param out The collector to put the resulting messages.
	 */
	default void deserialize(PubsubMessage message, Collector<T> out) throws Exception {
		T deserialize = deserialize(message);
		if (deserialize != null) {
			out.collect(deserialize);
		}
	}

	/**
	 * Tear-down method for the user code. It is called after all messages have been processed.
	 * After this method is called there will be no more invocations to {@link #deserialize}.
	 */
	default void close() throws Exception {
	}
}