Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.


...

Page properties

...


Discussion

...

thread

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA: FLINK-17305 (https://issues.apache.org/jira/browse/FLINK-17305)

...


Vote thread
JIRA

Jira
server: ASF JIRA
serverId: 5aa69414-a9e9-3523-82ec-879b028fb15b
key: FLINK-17305

Release1.11


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Code Block
language: java
/**
 * This is a deserialization schema specific for the Flink Kinesis Consumer. Different from the
 * basic {@link DeserializationSchema}, this schema offers additional Kinesis-specific information
 * about the record that may be useful to the user application.
 *
 * @param <T> The type created by the keyed deserialization schema.
 */
@PublicEvolving
public interface KinesisDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

	/**
	 * Initialization method for the schema. It is called before the actual working methods
	 * {@link #deserialize} and thus suitable for one time setup work.
	 *
	 * <p>The provided {@link DeserializationSchema.InitializationContext} can be used to access additional features such as e.g.
	 * registering user metrics.
	 *
	 * @param context Contextual information that can be used during initialization.
	 */
	default void open(DeserializationSchema.InitializationContext context) throws Exception {
	}

	/**
	 * Deserializes a Kinesis record's bytes. If the record cannot be deserialized, {@code null}
	 * may be returned. This informs the Flink Kinesis Consumer to process the Kinesis record
	 * without producing any output for it, i.e. effectively "skipping" the record.
	 *
	 * @param recordValue the record's value as a byte array
	 * @param partitionKey the record's partition key at the time of writing
	 * @param seqNum the sequence number of this record in the Kinesis shard
	 * @param approxArrivalTimestamp the server-side timestamp of when Kinesis received and stored the record
	 * @param stream the name of the Kinesis stream that this record was sent to
	 * @param shardId The identifier of the shard the record was sent to
	 *
	 * @return the deserialized message as an Java object ({@code null} if the message cannot be deserialized).
	 * @throws IOException
	 */
	T deserialize(byte[] recordValue, String partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String shardId) throws IOException;

	/**
	 * Deserializes the byte message.
	 *
	 * <p>Can output multiple records through the {@link Collector}. Note that number and size of the
	 * produced records should be relatively small. Depending on the source implementation records
	 * can be buffered in memory or collecting records might delay emitting checkpoint barrier.
	 *
	 * @param recordValue the record's value as a byte array
	 * @param partitionKey the record's partition key at the time of writingcomposite Kinesis record
	 * @param seqNumout theThe sequencecollector number of this record in the Kinesis shard
	 * @param approxArrivalTimestamp the server-side timestamp of when Kinesis received and stored the record
	 * @param stream the name of the Kinesis stream that this record was sent to
	 * @param shardId The identifier of the shard the record was sent to
	 * @param out The collector to put the resulting messages.
	 */
	default void deserialize(
			byte[] recordValue,
			String partitionKey,
			String seqNum,
			long approxArrivalTimestamp,
			String stream,
			String shardId,
			Collector<T> out) throws IOException {
		T deserialize = deserialize(recordValue, partitionKey, seqNum, approxArrivalTimestamp, stream, shardId);
		if (deserialize != null) {
			out.collect(deserialize);
		}
	}

	/**
	 * Tear-down method for the user code. It is called after all messages has been processed.
	 * After this method is called there will be no more invocations to {@link #deserialize}.
	 */
	default void close() throws Exception {
	}

	/**
	 * Method to decide whether the element signals the end of the stream. If
	 * true is returned the element won't be emitted.
	 *
	 * @param nextElement the element to test for the end-of-stream signal
	 * @return true if the element signals end of stream, false otherwise
	 */
	// TODO FLINK-4194 ADD SUPPORT FOR boolean isEndOfStream(T nextElementto put the resulting messages.
	 */
	default void deserialize(
			KinesisRecord record,
			Collector<T> out) throws IOException {
		T deserialize = deserialize(record.getData(), record.getPartitionKey(), record.getSequenceNumber(), record.getApproximateArrivalTimestamp(), record.getStream(), record.getShardId());
		if (deserialize != null) {
			out.collect(deserialize);
		}
	}

	/**
	 * Tear-down method for the user code. It is called after all messages has been processed.
	 * After this method is called there will be no more invocations to {@link #deserialize}.
	 */
	default void close() throws Exception {
	}

	/**
	 * Method to decide whether the element signals the end of the stream. If
	 * true is returned the element won't be emitted.
	 *
	 * @param nextElement the element to test for the end-of-stream signal
	 * @return true if the element signals end of stream, false otherwise
	 */
	// TODO FLINK-4194 ADD SUPPORT FOR boolean isEndOfStream(T nextElement);
}

/**
 * A single record consumed from an AWS Kinesis stream: its payload bytes together with the
 * source stream, shard, partition key, sequence number and approximate arrival timestamp.
 */
interface KinesisRecord {

	/** @return the payload bytes of the consumed record */
	byte[] getData();

	/** @return the partition key the record was written with */
	String getPartitionKey();

	/** @return the sequence number of the consumed record */
	String getSequenceNumber();

	/**
	 * @return the approximate arrival timestamp of the consumed record, i.e. its ingestion time at
	 *     AWS Kinesis (NOTE(review): the unit is not stated here, presumably epoch milliseconds;
	 *     confirm against the consumer)
	 */
	long getApproximateArrivalTimestamp();

	/** @return the name of the AWS Kinesis stream the record was read from */
	String getStream();

	/** @return the identifier of the AWS Kinesis shard the record was read from */
	String getShardId();
}

KinesisSerializationSchema

...