...
...
...
Discussion thread: https://mail-archives.apache.org/mod_mbox/flink-dev/
JIRA: FLINK-17305
Released: 1.11
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Code Block:
@Public
public interface DeserializationSchema<ResultT> extends Serializable, ResultTypeQueryable<ResultT> {

    @PublicEvolving
    default void open(InitializationContext context) throws Exception {}

    ResultT deserialize(byte[] message) throws IOException;

    @PublicEvolving
    default void deserialize(byte[] event, Collector<ResultT> collector) throws IOException {
        collector.collect(deserialize(event));
    }

    @PublicEvolving
    default void close() throws Exception {}

    @PublicEvolving
    interface InitializationContext {
        MetricGroup getMetricGroup();
    }
}
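For illustration, here is a minimal sketch of a user schema written against the proposed interface (the class name, metric name, and line-splitting logic are made up for this example): open() registers a counter once per instance, and the Collector-based deserialize() emits several records from a single message.
Code Block:
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.Collector;

// Hypothetical example, not part of the proposal.
public class LineSplittingSchema implements DeserializationSchema<String> {

    // registered in open(), hence transient
    private transient Counter linesEmitted;

    @Override
    public void open(InitializationContext context) {
        // one-time setup: register a user metric via the new context
        linesEmitted = context.getMetricGroup().counter("linesEmitted");
    }

    @Override
    public String deserialize(byte[] message) {
        // single-record path, kept for backwards compatibility
        return new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public void deserialize(byte[] message, Collector<String> out) {
        // collector path: one incoming message may produce several records
        for (String line : new String(message, StandardCharsets.UTF_8).split("\n")) {
            out.collect(line);
            linesEmitted.inc();
        }
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}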
SerializationSchema:
Code Block:
@Public
public interface SerializationSchema<InputT> extends Serializable {

    @PublicEvolving
    default void open(InitializationContext context) throws Exception {}

    byte[] serialize(InputT element) throws IOException;

    @PublicEvolving
    default void close() throws Exception {}

    @PublicEvolving
    interface InitializationContext {
        MetricGroup getMetricGroup();
    }
}
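The serialization side gains the same lifecycle hooks. A minimal sketch (class and metric names are illustrative only) of a schema that uses open() to register a metric:
Code Block:
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.metrics.Counter;

// Hypothetical example, not part of the proposal.
public class CountingStringSchema implements SerializationSchema<String> {

    private transient Counter recordsOut;

    @Override
    public void open(InitializationContext context) {
        recordsOut = context.getMetricGroup().counter("recordsOut");
    }

    @Override
    public byte[] serialize(String element) {
        recordsOut.inc();
        return element.getBytes(StandardCharsets.UTF_8);
    }
}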
Full interfaces after the proposed changes:
DeserializationSchema:
Code Block:
@Public
public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    /**
     * Initialization method for the schema. It is called before the actual working methods
     * {@link #deserialize} and thus suitable for one-time setup work.
     *
     * <p>The provided {@link InitializationContext} can be used to access additional features such as
     * registering user metrics.
     *
     * @param context Contextual information that can be used during initialization.
     */
    @PublicEvolving
    default void open(InitializationContext context) throws Exception {
    }

    /**
     * Deserializes the byte message.
     *
     * @param message The message, as a byte array.
     * @return The deserialized message as an object (null if the message cannot be deserialized).
     */
    T deserialize(byte[] message) throws IOException;

    /**
     * Deserializes the byte message.
     *
     * <p>Can output multiple records through the {@link Collector}. Note that the number and size of
     * the produced records should be relatively small. Depending on the source implementation,
     * records can be buffered in memory, or collecting records might delay emitting the checkpoint
     * barrier.
     *
     * @param message The message, as a byte array.
     * @param out The collector to put the resulting messages.
     */
    @PublicEvolving
    default void deserialize(byte[] message, Collector<T> out) throws IOException {
        T deserialized = deserialize(message);
        if (deserialized != null) {
            out.collect(deserialized);
        }
    }

    /**
     * Method to decide whether the element signals the end of the stream. If
     * true is returned the element won't be emitted.
     *
     * @param nextElement The element to test for the end-of-stream signal.
     * @return True, if the element signals end of stream, false otherwise.
     */
    boolean isEndOfStream(T nextElement);

    /**
     * Tear-down method for the user code. It is called after all messages have been processed.
     * After this method is called there will be no more invocations to {@link #deserialize}.
     */
    @PublicEvolving
    default void close() throws Exception {
    }

    /**
     * Contextual information provided to the {@link #open(InitializationContext)} method. It can be used to:
     * <ul>
     *     <li>Register user metrics via {@link InitializationContext#getMetricGroup()}</li>
     * </ul>
     */
    @PublicEvolving
    interface InitializationContext {
        MetricGroup getMetricGroup();
    }
}
SerializationSchema:
Code Block:
@Public
public interface SerializationSchema<InputT> extends Serializable {

    /**
     * Initialization method for the schema. It is called before the actual working method
     * {@link #serialize(Object)} and thus suitable for one-time setup work.
     *
     * <p>The provided {@link InitializationContext} can be used to access additional features such as
     * registering user metrics.
     *
     * @param context Contextual information that can be used during initialization.
     */
    @PublicEvolving
    default void open(InitializationContext context) throws Exception {
    }

    /**
     * Serializes the incoming element to a byte array.
     *
     * @param element The incoming element to be serialized.
     * @return The serialized element.
     */
    byte[] serialize(InputT element);

    /**
     * Tear-down method for the user code. It is called after all messages have been processed.
     * After this method is called there will be no more invocations to {@link #serialize(Object)}.
     */
    @PublicEvolving
    default void close() throws Exception {
    }

    /**
     * Contextual information provided to the {@link #open(InitializationContext)} method. It can be used to:
     * <ul>
     *     <li>Register user metrics via {@link InitializationContext#getMetricGroup()}</li>
     * </ul>
     */
    @PublicEvolving
    interface InitializationContext {
        MetricGroup getMetricGroup();
    }
}
KafkaDeserializationSchema:
Code Block:
/**
 * The deserialization schema describes how to turn the Kafka ConsumerRecords
 * into data types (Java/Scala objects) that are processed by Flink.
 *
 * @param <T> The type created by the keyed deserialization schema.
 */
@PublicEvolving
public interface KafkaDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

    /**
     * Initialization method for the schema. It is called before the actual working methods
     * {@link #deserialize} and thus suitable for one-time setup work.
     *
     * <p>The provided {@link DeserializationSchema.InitializationContext} can be used to access
     * additional features such as registering user metrics.
     *
     * @param context Contextual information that can be used during initialization.
     */
    default void open(DeserializationSchema.InitializationContext context) throws Exception {
    }

    /**
     * Method to decide whether the element signals the end of the stream. If
     * true is returned the element won't be emitted.
     *
     * @param nextElement The element to test for the end-of-stream signal.
     * @return True, if the element signals end of stream, false otherwise.
     */
    boolean isEndOfStream(T nextElement);

    /**
     * Deserializes the Kafka record.
     *
     * @param record Kafka record to be deserialized.
     * @return The deserialized message as an object (null if the message cannot be deserialized).
     */
    T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;

    /**
     * Deserializes the Kafka record.
     *
     * <p>Can output multiple records through the {@link Collector}. Note that the number and size of
     * the produced records should be relatively small. Depending on the source implementation,
     * records can be buffered in memory, or collecting records might delay emitting the checkpoint
     * barrier.
     *
     * @param record The ConsumerRecord to deserialize.
     * @param out The collector to put the resulting messages.
     */
    default void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> out) throws Exception {
        T deserialized = deserialize(record);
        if (deserialized != null) {
            out.collect(deserialized);
        }
    }

    /**
     * Tear-down method for the user code. It is called after all messages have been processed.
     * After this method is called there will be no more invocations to {@link #deserialize}.
     */
    default void close() throws Exception {
    }
}
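As a usage sketch (the class name, metric name, and tombstone-skipping logic are made up for illustration, not part of the proposal), a Kafka schema can combine the null-means-skip contract with a metric registered through the new context:
Code Block:
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical example, not part of the proposal.
public class TombstoneSkippingSchema implements KafkaDeserializationSchema<String> {

    private transient Counter tombstones;

    @Override
    public void open(DeserializationSchema.InitializationContext context) {
        tombstones = context.getMetricGroup().counter("tombstones");
    }

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) {
        if (record.value() == null) {
            tombstones.inc();
            return null; // null is skipped by the default Collector variant
        }
        return new String(record.value(), StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}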
KafkaSerializationSchema:
Code Block:
/**
 * A {@link KafkaSerializationSchema} defines how to serialize values of type {@code T} into {@link
 * ProducerRecord ProducerRecords}.
 *
 * <p>Please also implement {@link KafkaContextAware} if your serialization schema needs information
 * about the available partitions and the number of parallel subtasks along with the subtask ID on
 * which the Kafka Producer is running.
 *
 * @param <T> the type of values being serialized
 */
@PublicEvolving
public interface KafkaSerializationSchema<T> extends Serializable {

    /**
     * Initialization method for the schema. It is called before the actual working method
     * {@link #serialize(Object, Long)} and thus suitable for one-time setup work.
     *
     * <p>The provided {@link SerializationSchema.InitializationContext} can be used to access
     * additional features such as registering user metrics.
     *
     * @param context Contextual information that can be used during initialization.
     */
    default void open(SerializationSchema.InitializationContext context) throws Exception {
    }

    /**
     * Serializes the given element and returns it as a {@link ProducerRecord}.
     *
     * @param element element to be serialized
     * @param timestamp timestamp (can be null)
     * @return Kafka {@link ProducerRecord}
     */
    ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp);

    /**
     * Tear-down method for the user code. It is called after all messages have been processed.
     * After this method is called there will be no more invocations to {@link #serialize(Object, Long)}.
     */
    default void close() throws Exception {
    }
}
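A corresponding producer-side sketch (the topic handling, class name, and metric name are invented for illustration):
Code Block:
import java.nio.charset.StandardCharsets;

import javax.annotation.Nullable;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical example, not part of the proposal.
public class StringProducerSchema implements KafkaSerializationSchema<String> {

    private final String topic;
    private transient Counter recordsOut;

    public StringProducerSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public void open(SerializationSchema.InitializationContext context) {
        recordsOut = context.getMetricGroup().counter("recordsOut");
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
        recordsOut.inc();
        // forward the element's timestamp; partition and key are left unset
        return new ProducerRecord<>(
                topic, null, timestamp, null, element.getBytes(StandardCharsets.UTF_8));
    }
}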
KinesisDeserializationSchema:
Code Block:
/**
 * This is a deserialization schema specific for the Flink Kinesis Consumer. Different from the
 * basic {@link DeserializationSchema}, this schema offers additional Kinesis-specific information
 * about the record that may be useful to the user application.
 *
 * @param <T> The type created by the keyed deserialization schema.
 */
@PublicEvolving
public interface KinesisDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

    /**
     * Initialization method for the schema. It is called before the actual working methods
     * {@link #deserialize} and thus suitable for one-time setup work.
     *
     * <p>The provided {@link DeserializationSchema.InitializationContext} can be used to access
     * additional features such as registering user metrics.
     *
     * @param context Contextual information that can be used during initialization.
     */
    default void open(DeserializationSchema.InitializationContext context) throws Exception {
    }

    /**
     * Deserializes a Kinesis record's bytes. If the record cannot be deserialized, {@code null}
     * may be returned. This informs the Flink Kinesis Consumer to process the Kinesis record
     * without producing any output for it, i.e. effectively "skipping" the record.
     *
     * @param recordValue the record's value as a byte array
     * @param partitionKey the record's partition key at the time of writing
     * @param seqNum the sequence number of this record in the Kinesis shard
     * @param approxArrivalTimestamp the server-side timestamp of when Kinesis received and stored the record
     * @param stream the name of the Kinesis stream that this record was sent to
     * @param shardId the identifier of the shard the record was sent to
     * @return the deserialized message as a Java object ({@code null} if the message cannot be deserialized)
     */
    T deserialize(byte[] recordValue, String partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String shardId) throws IOException;

    /**
     * Deserializes a Kinesis record.
     *
     * <p>Can output multiple records through the {@link Collector}. Note that the number and size of
     * the produced records should be relatively small. Depending on the source implementation,
     * records can be buffered in memory, or collecting records might delay emitting the checkpoint
     * barrier.
     *
     * @param record a composite Kinesis record
     * @param out The collector to put the resulting messages.
     */
    default void deserialize(KinesisRecord record, Collector<T> out) throws IOException {
        T deserialized = deserialize(
                record.getData(),
                record.getPartitionKey(),
                record.getSequenceNumber(),
                record.getApproximateArrivalTimestamp(),
                record.getStream(),
                record.getShardId());
        if (deserialized != null) {
            out.collect(deserialized);
        }
    }

    /**
     * Tear-down method for the user code. It is called after all messages have been processed.
     * After this method is called there will be no more invocations to {@link #deserialize}.
     */
    default void close() throws Exception {
    }

    /**
     * Method to decide whether the element signals the end of the stream. If
     * true is returned the element won't be emitted.
     *
     * @param nextElement the element to test for the end-of-stream signal
     * @return true if the element signals end of stream, false otherwise
     */
    // TODO FLINK-4194 ADD SUPPORT FOR boolean isEndOfStream(T nextElement);
}

interface KinesisRecord {

    /** @return consumed data bytes */
    byte[] getData();

    /** @return source AWS Kinesis stream */
    String getStream();

    /** @return source AWS Kinesis stream shard */
    String getShardId();

    /** @return attached partition key */
    String getPartitionKey();

    /** @return sequence number of the consumed record */
    String getSequenceNumber();

    /** @return approximate arrival timestamp (ingestion time at AWS Kinesis) of the consumed record */
    long getApproximateArrivalTimestamp();
}
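A sketch of a user implementation against the proposed Kinesis interface (class and metric names are illustrative); it only implements the single-record method and inherits the Collector-based default:
Code Block:
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;

// Hypothetical example, not part of the proposal.
public class ShardAwareStringSchema implements KinesisDeserializationSchema<String> {

    private transient Counter recordsIn;

    @Override
    public void open(DeserializationSchema.InitializationContext context) {
        recordsIn = context.getMetricGroup().counter("recordsIn");
    }

    @Override
    public String deserialize(
            byte[] recordValue,
            String partitionKey,
            String seqNum,
            long approxArrivalTimestamp,
            String stream,
            String shardId) throws IOException {
        recordsIn.inc();
        // prefix each record with its shard id, e.g. for debugging
        return shardId + ": " + new String(recordValue, StandardCharsets.UTF_8);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}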
KinesisSerializationSchema:
Code Block:
/**
 * Kinesis-specific serialization schema, allowing users to specify a target stream based
 * on a record's contents.
 *
 * @param <T> the type of elements to be serialized
 */
@PublicEvolving
public interface KinesisSerializationSchema<T> extends Serializable {

    /**
     * Initialization method for the schema. It is called before the actual working method
     * {@link #serialize(Object)} and thus suitable for one-time setup work.
     *
     * <p>The provided {@link SerializationSchema.InitializationContext} can be used to access
     * additional features such as registering user metrics.
     *
     * @param context Contextual information that can be used during initialization.
     */
    default void open(SerializationSchema.InitializationContext context) throws Exception {
    }

    /**
     * Serializes the given element into a ByteBuffer.
     *
     * @param element The element to serialize
     * @return Serialized representation of the element
     */
    ByteBuffer serialize(T element);

    /**
     * Optional method to determine the target stream based on the element.
     * Return <code>null</code> to use the default stream.
     *
     * @param element The element to determine the target stream from
     * @return target stream name
     */
    String getTargetStream(T element);

    /**
     * Tear-down method for the user code. It is called after all messages have been processed.
     * After this method is called there will be no more invocations to {@link #serialize(Object)}.
     */
    default void close() throws Exception {
    }
}
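A sketch of the producer side (the stream names and routing rule are invented for illustration); getTargetStream() demonstrates the per-record stream choice, returning null to fall back to the default stream:
Code Block:
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;

// Hypothetical example, not part of the proposal.
public class TenantRoutingSchema implements KinesisSerializationSchema<String> {

    private transient Counter recordsOut;

    @Override
    public void open(SerializationSchema.InitializationContext context) {
        recordsOut = context.getMetricGroup().counter("recordsOut");
    }

    @Override
    public ByteBuffer serialize(String element) {
        recordsOut.inc();
        return ByteBuffer.wrap(element.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public String getTargetStream(String element) {
        // null means: use the producer's default stream
        return element.startsWith("tenantA:") ? "tenant-a-stream" : null;
    }
}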
PubSubDeserializationSchema:
Code Block:
/**
 * The deserialization schema describes how to turn the PubsubMessages
 * into data types (Java/Scala objects) that are processed by Flink.
 *
 * @param <T> The type created by the deserialization schema.
 */
@PublicEvolving
public interface PubSubDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

    /**
     * Initialization method for the schema. It is called before the actual working methods
     * {@link #deserialize} and thus suitable for one-time setup work.
     *
     * <p>The provided {@link DeserializationSchema.InitializationContext} can be used to access
     * additional features such as registering user metrics.
     *
     * @param context Contextual information that can be used during initialization.
     */
    default void open(DeserializationSchema.InitializationContext context) throws Exception {
    }

    /**
     * Method to decide whether the element signals the end of the stream. If
     * true is returned the element won't be emitted.
     *
     * @param nextElement The element to test for the end-of-stream signal.
     * @return True, if the element signals end of stream, false otherwise.
     */
    boolean isEndOfStream(T nextElement);

    /**
     * Deserializes a PubsubMessage.
     *
     * @param message PubsubMessage to be deserialized.
     * @return The deserialized message as an object (null if the message cannot be deserialized).
     */
    T deserialize(PubsubMessage message) throws Exception;

    /**
     * Deserializes a PubsubMessage.
     *
     * <p>Can output multiple records through the {@link Collector}. Note that the number and size of
     * the produced records should be relatively small. Depending on the source implementation,
     * records can be buffered in memory, or collecting records might delay emitting the checkpoint
     * barrier.
     *
     * @param message PubsubMessage to be deserialized.
     * @param out The collector to put the resulting messages.
     */
    default void deserialize(PubsubMessage message, Collector<T> out) throws Exception {
        T deserialized = deserialize(message);
        if (deserialized != null) {
            out.collect(deserialized);
        }
    }

    /**
     * Tear-down method for the user code. It is called after all messages have been processed.
     * After this method is called there will be no more invocations to {@link #deserialize}.
     */
    default void close() throws Exception {
    }
}
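A final sketch against the Pub/Sub interface above (class and metric names are made up; PubsubMessage is the Google Cloud Pub/Sub client type):
Code Block:
import com.google.pubsub.v1.PubsubMessage;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.metrics.Counter;

// Hypothetical example, not part of the proposal; assumes the
// PubSubDeserializationSchema interface shown above is on the classpath.
public class PubSubStringSchema implements PubSubDeserializationSchema<String> {

    private transient Counter messagesIn;

    @Override
    public void open(DeserializationSchema.InitializationContext context) {
        messagesIn = context.getMetricGroup().counter("messagesIn");
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public String deserialize(PubsubMessage message) {
        messagesIn.inc();
        return message.getData().toStringUtf8();
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}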