Status
Current state: "Under Discussion"
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The SerDe stack is an important component of any data pipeline. The current DeserializationSchema and SerializationSchema lack certain features that were requested by users or are needed to implement new features.
...
- Access to metrics [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Metrics-outside-RichFunctions-td32282.html#a32329]
Emitting deserialized values through a Collector
Problem 1: Handling of deserialization errors. Returning no results
Bad data that cannot be deserialized properly is a reality in real-world use cases. Simply failing the job is not an option, as it usually leads to a failover loop because the bad data is received again. This was already reported in https://issues.apache.org/jira/browse/FLINK-3679. The solution implemented there treats a null value returned from a DeserializationSchema as a deserialization error, and the record is skipped. The downside is that there is no straightforward way to encode a null value. Some systems encode control messages as null values; e.g. Kafka treats a null value as the deletion of the value for the corresponding key.
Introducing a Collector interface will make it possible to differentiate between no results (no calls to Collector#collect) and a null result (Collector#collect(null)).
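A minimal sketch of how a schema could use this distinction, assuming a topic whose values are decimal integers and where a null value is a tombstone. `Collector` below is a local stand-in for `org.apache.flink.util.Collector` so the example compiles on its own, and `NullAwareIntSchema` is hypothetical; neither is the proposed API itself.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for org.apache.flink.util.Collector (only collect() is needed here).
interface Collector<T> {
    void collect(T record);
}

// Hypothetical schema for a topic whose values are decimal integers.
final class NullAwareIntSchema {
    void deserialize(byte[] message, Collector<Integer> out) {
        if (message == null) {
            // A null value (e.g. a Kafka tombstone) is a legitimate record: forward it.
            out.collect(null);
            return;
        }
        try {
            out.collect(Integer.parseInt(new String(message, StandardCharsets.UTF_8).trim()));
        } catch (NumberFormatException e) {
            // Bad data: emit nothing, so the record is skipped instead of failing the job.
        }
    }
}
```

With a single-return method, both the tombstone and the malformed payload would have to be reported as `null`; with a collector, the first produces one `null` record and the second produces none.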
Problem 2: Compacted messages. A single source record might produce multiple records
Currently, the suggested workaround is to emit a single container result and flatten the results in a subsequent operator. The downside of this approach is that the subsequent operator cannot emit watermarks in a partition-aware manner. This was reported in https://github.com/apache/flink/pull/3314#issuecomment-376237266
...
The current workaround for that problem is to emit a single "container" object and split it in a subsequent operation. The problem is that it is then not possible to generate per-partition watermarks based on the nested records.
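A minimal sketch of flattening inside the schema instead, assuming a hypothetical format in which one source record carries several comma-separated readings. `Collector` is a local stand-in for Flink's interface and `CompactedReadingsSchema` is hypothetical. Because each nested value is emitted by the schema itself, the source (and any per-partition watermark logic it runs) sees every nested record rather than one container.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for org.apache.flink.util.Collector.
interface Collector<T> {
    void collect(T record);
}

// Hypothetical schema: one compacted source record -> one output record per reading.
final class CompactedReadingsSchema {
    void deserialize(byte[] message, Collector<Long> out) {
        String payload = new String(message, StandardCharsets.UTF_8);
        for (String part : payload.split(",")) {
            String value = part.trim();
            if (!value.isEmpty()) {
                out.collect(Long.parseLong(value)); // emitted individually, not as a container
            }
        }
    }
}
```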
Implications
In the current KafkaFetcher implementation, deserialization happens outside of the checkpoint lock, and only emitting the record and updating the state happen within the lock. This means that updating an offset in Kafka is bound to emitting a single deserialized record. If we want to support emitting multiple records from a single source record, we have a few options (the first two are mentioned here):
...
I suggest going with option 1, as it has the fewest implications for current pipelines. At the same time, the main use case for a collector is emitting either zero results or a small number of records whose combined size is no larger than that of the source record (e.g. the Debezium format).
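The sketch below shows one way, not necessarily the one proposed here, to keep an offset update atomic with the emission of several records: deserialize into a buffer outside the checkpoint lock, then emit the whole buffer and advance the offset inside it. All names (`BatchDeserializer`, `BufferingCollector`, `PartitionEmitter`) are hypothetical, and the `emitted` list stands in for the source's output.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.apache.flink.util.Collector.
interface Collector<T> {
    void collect(T record);
}

// Hypothetical one-record-in, many-records-out deserializer.
interface BatchDeserializer<T> {
    void deserialize(byte[] payload, Collector<T> out);
}

// Collector that only buffers records; nothing goes downstream yet.
final class BufferingCollector<T> implements Collector<T> {
    final List<T> buffer = new ArrayList<>();

    @Override
    public void collect(T record) {
        buffer.add(record);
    }
}

// Hypothetical per-partition emitter; `emitted` stands in for the source's output.
final class PartitionEmitter<T> {
    private final Object checkpointLock = new Object();
    private final List<T> emitted = new ArrayList<>();
    private long offset = -1L;

    void process(byte[] payload, long recordOffset, BatchDeserializer<T> schema) {
        BufferingCollector<T> collector = new BufferingCollector<>();
        schema.deserialize(payload, collector); // potentially slow: runs without the lock
        synchronized (checkpointLock) {
            emitted.addAll(collector.buffer);   // zero, one or many records ...
            offset = recordOffset;              // ... committed together with the offset
        }
    }

    List<T> emitted() {
        synchronized (checkpointLock) {
            return new ArrayList<>(emitted);
        }
    }

    long offset() {
        synchronized (checkpointLock) {
            return offset;
        }
    }
}
```

The buffer holds every record produced from one source record, so its memory cost grows with that count; this is why a workload dominated by zero or a few small records matters for such a design.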
Add open/close methods
Many DeserializationSchema and SerializationSchema implementations need to initialize internal state that is often not serializable, establish external connections, and/or perform some setup exactly once at the very beginning. Some examples include:
...
The current workaround is lazy initialization on the deserialization/serialization path. Moreover, it is a common requirement to be able to register metrics in the SerDe stack, as mentioned e.g. here: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Metrics-outside-RichFunctions-td32282.html#a32329
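A minimal sketch of what an `open()` hook enables, assuming a hypothetical schema that maps each payload to its SHA-256 digest. `Counter` and `InitializationContext` are simplified, hypothetical stand-ins (the proposal's context exposes a `MetricGroup` instead); `DigestingSchema` is hypothetical too. The non-serializable `MessageDigest` and the metric are created once, so the per-record path needs no lazy null check.

```java
import java.io.Serializable;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical, simplified stand-ins for a metric counter and the proposed InitializationContext.
interface Counter {
    void inc();
}

interface InitializationContext {
    Counter counter(String name);
}

// Hypothetical schema that maps each payload to its SHA-256 digest.
final class DigestingSchema implements Serializable {
    // MessageDigest is not Serializable, so it is created in open() instead of being shipped.
    private transient MessageDigest digest;
    private transient Counter records;

    void open(InitializationContext context) {
        try {
            digest = MessageDigest.getInstance("SHA-256");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
        records = context.counter("records"); // metric registered exactly once
    }

    byte[] deserialize(byte[] message) {
        records.inc();
        return digest.digest(message); // no lazy "if (digest == null)" check per record
    }

    void close() {
        digest = null;
    }
}
```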
Proposed interfaces:
DeserializationSchema:
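```java
import java.io.IOException;
import java.io.Serializable;

import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.util.Collector;

@Public
public interface DeserializationSchema<ResultT> extends Serializable, ResultTypeQueryable<ResultT> {

    @PublicEvolving
    default void open(InitializationContext context) throws Exception {}

    ResultT deserialize(byte[] message) throws IOException;

    // Default: emit exactly the one record produced by deserialize(byte[]).
    @PublicEvolving
    default void deserialize(byte[] event, Collector<ResultT> collector) throws IOException {
        collector.collect(deserialize(event));
    }

    @PublicEvolving
    default void close() throws Exception {}

    @PublicEvolving
    interface InitializationContext {
        MetricGroup getMetricGroup();
    }
}
```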
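Because the collector variant is a default method that delegates to the single-record method, an existing implementation keeps working without changes. A minimal sketch of that delegation, using a hypothetical, simplified mirror of the proposed interface (`SimpleDeserializationSchema`, without annotations, `ResultTypeQueryable`, `open`/`close` or `IOException`), a local `Collector` stand-in, and a hypothetical legacy schema `LengthSchema`:

```java
// Hypothetical stand-in for org.apache.flink.util.Collector.
interface Collector<T> {
    void collect(T record);
}

// Hypothetical, simplified mirror of the proposed DeserializationSchema.
interface SimpleDeserializationSchema<ResultT> {
    ResultT deserialize(byte[] message);

    // Same shape as the proposed default: delegate to the single-record method.
    default void deserialize(byte[] message, Collector<ResultT> collector) {
        collector.collect(deserialize(message));
    }
}

// A "legacy" schema that only implements the single-record method.
final class LengthSchema implements SimpleDeserializationSchema<Integer> {
    @Override
    public Integer deserialize(byte[] message) {
        return message.length;
    }
}
```

A caller that only uses the collector-based method can therefore treat old and new implementations uniformly.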
SerializationSchema:
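```java
import java.io.IOException;
import java.io.Serializable;

import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.metrics.MetricGroup;

@Public
public interface SerializationSchema<InputT> extends Serializable {

    @PublicEvolving
    default void open(InitializationContext context) throws Exception {}

    byte[] serialize(InputT element) throws IOException;

    @PublicEvolving
    default void close() throws Exception {}

    @PublicEvolving
    interface InitializationContext {
        MetricGroup getMetricGroup();
    }
}
```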
KafkaDeserializationSchema:
```java
import java.io.Serializable;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * The deserialization schema describes how to turn the Kafka ConsumerRecords
 * into data types (Java/Scala objects) that are processed by Flink.
 *
 * @param <T> The type created by the keyed deserialization schema.
 */
@PublicEvolving
public interface KafkaDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

    default void open(DeserializationSchema.InitializationContext context) throws Exception {}

    /**
     * Method to decide whether the element signals the end of the stream. If
     * true is returned the element won't be emitted.
     *
     * @param nextElement The element to test for the end-of-stream signal.
     * @return True, if the element signals end of stream, false otherwise.
     */
    boolean isEndOfStream(T nextElement);

    /**
     * Deserializes the Kafka record.
     *
     * @param record Kafka record to be deserialized.
     * @return The deserialized message as an object (null if the message cannot be deserialized).
     */
    T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;

    default void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out) throws Exception {
        out.collect(deserialize(message));
    }

    default void close() throws Exception {}
}
```
KafkaSerializationSchema:
```java
import java.io.Serializable;

import javax.annotation.Nullable;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * A {@link KafkaSerializationSchema} defines how to serialize values of type {@code T} into {@link
 * ProducerRecord ProducerRecords}.
 *
 * <p>Please also implement {@link KafkaContextAware} if your serialization schema needs
 * information about the available partitions and the number of parallel subtasks along with the
 * subtask ID on which the Kafka Producer is running.
 *
 * @param <T> the type of values being serialized
 */
@PublicEvolving
public interface KafkaSerializationSchema<T> extends Serializable {

    default void open(SerializationSchema.InitializationContext context) throws Exception {}

    /**
     * Serializes given element and returns it as a {@link ProducerRecord}.
     *
     * @param element element to be serialized
     * @param timestamp timestamp (can be null)
     * @return Kafka {@link ProducerRecord}
     */
    ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp);

    default void close() throws Exception {}
}
```
...