Discussion thread: https://lists.apache.org/thread/gyvybodvk4t0grpl6ltywvqqhkgqosfh
Vote thread
JIRA

Release

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Pub/Sub Lite is a low-cost Google Cloud streaming system whose semantics are more familiar to users of OSS streaming systems: it exposes a partitioned log of messages for the user to interact with. Customers have asked us to implement a Flink connector for Pub/Sub Lite so that they can process its streaming data.

Public Interfaces

This connector introduces the following public interfaces. Any of them that can be replaced by a common, generic interface that already exists will be replaced by it.

/**
 * Extracts the timestamp from an incoming message.
 *
 * <p>Kept generic so that the timestamp can be extracted from the message attributes or payload
 * instead of the built-in publish or event time.
 */
public interface MessageTimestampExtractor extends Serializable {
  Instant timestamp(SequencedMessage m);

  static MessageTimestampExtractor publishTimeExtractor() { ... }
  static MessageTimestampExtractor eventTimeExtractor() { ... }
}

public interface PubsubLiteDeserializationSchema<T> extends Serializable {
  void open(DeserializationSchema.InitializationContext context) throws Exception;

  /**
   * Deserializes a Pub/Sub Lite message.
   *
   * <p>If a message cannot be deserialized, the schema can either throw an exception, which will
   * fail the source, or return null, in which case the source will skip the message and
   * proceed.
   *
   * @param message The Pub/Sub Lite message
   * @return The deserialized message as an object (null if the message cannot be deserialized).
   */
  @Nullable T deserialize(SequencedMessage message) throws Exception;

  TypeInformation<T> getProducedType();

  /** Deserialize only the data field to the destination type. */
  static <T> PubsubLiteDeserializationSchema<T> dataOnly(DeserializationSchema<T> schema) { ... }
}

public interface PubsubLiteSerializationSchema<T> extends Serializable {
  void open(SerializationSchema.InitializationContext context) throws Exception;

  PubSubMessage serialize(T value, Instant timestamp);

  /** Only populate the data field and timestamp of the produced message. */
  static <T> PubsubLiteSerializationSchema<T> dataOnly(SerializationSchema<T> schema) { ... }
}
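
The contract described above (a null result skips the message, an exception fails the source) and a custom timestamp extractor can be illustrated with a minimal sketch. It does not use the real connector or Flink classes: `StubMessage`, `StubTimestampExtractor`, `StubDeserializer`, `SchemaSketch`, and the `event_ms` attribute name are all hypothetical stand-ins introduced only for illustration, and the `drain` loop is an assumption about how a source might apply the schema, not the proposed implementation.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for SequencedMessage, holding only the fields used below.
final class StubMessage {
  final byte[] data;
  final Map<String, String> attributes;
  final Instant publishTime;

  StubMessage(byte[] data, Map<String, String> attributes, Instant publishTime) {
    this.data = data;
    this.attributes = attributes;
    this.publishTime = publishTime;
  }
}

// Simplified mirror of MessageTimestampExtractor (assumption: same single method).
interface StubTimestampExtractor {
  Instant timestamp(StubMessage m);
}

// Simplified mirror of PubsubLiteDeserializationSchema#deserialize.
interface StubDeserializer<T> {
  T deserialize(StubMessage m) throws Exception;
}

final class SchemaSketch {
  // Reads the timestamp from a hypothetical "event_ms" attribute, falling back to publish time.
  static final StubTimestampExtractor ATTRIBUTE_TIME = m -> {
    String millis = m.attributes.get("event_ms");
    return millis == null ? m.publishTime : Instant.ofEpochMilli(Long.parseLong(millis));
  };

  // Returns null for an empty payload so that the caller skips the message.
  static final StubDeserializer<String> UTF8_OR_SKIP = m ->
      m.data.length == 0 ? null : new String(m.data, StandardCharsets.UTF_8);

  // Applies the schema to a batch: null results are skipped, and any exception is
  // rethrown unchecked to stand in for failing the source.
  static <T> List<T> drain(List<StubMessage> batch, StubDeserializer<T> schema) {
    List<T> out = new ArrayList<>();
    for (StubMessage m : batch) {
      T value;
      try {
        value = schema.deserialize(m);
      } catch (Exception e) {
        throw new IllegalStateException("deserialization failed", e);
      }
      if (value != null) {
        out.add(value);
      }
    }
    return out;
  }
}
```

Returning null suits messages that are expected to be unreadable now and then, while throwing suits cases where a malformed message indicates a real problem that should stop the job.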


Proposed Changes

The initial implementation will provide a DataStream source and sink for Pub/Sub Lite. A Table API will be considered in the future based on demand; until then, Table users will rely on the DataStream-Table API interop.

The proposed implementation is staged on GitHub.

Compatibility, Deprecation, and Migration Plan

There will be no impact on existing users.

For future users, the service team will provide support at the same level as for other client libraries and connectors. An email alias will route issues from the GitHub repo to the service team and, if possible, will also be used to auto-assign issues in JIRA.

Test Plan

This connector will initially have unit tests for its components and an integration test. Because the source system is closed source, it is not possible to stand up an instance of the implementation at present, so the integration test will run only when credentials are provided. If possible, the standard Google Cloud client CI (Kokoro), which has credentials, will be added to the repo; if not, the tests will be run manually before each release and continually in a separate repo.
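
One way to gate the integration test on credentials is sketched below. This is only an assumption about how the check could be done, not the connector's actual test setup: `IntegrationTestGate` and its methods are hypothetical names, and the sketch assumes credentials are supplied through the `GOOGLE_APPLICATION_CREDENTIALS` environment variable used by Google Application Default Credentials.

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;

// Hypothetical helper: runs an integration test only when credentials are available.
final class IntegrationTestGate {
  // Environment variable read by Google Application Default Credentials.
  static final String CREDENTIALS_VAR = "GOOGLE_APPLICATION_CREDENTIALS";

  // True when the variable is set and points at a readable file.
  static boolean credentialsAvailable(Map<String, String> env) {
    String path = env.get(CREDENTIALS_VAR);
    return path != null && !path.isEmpty() && Files.isReadable(Paths.get(path));
  }

  // Returns "SKIPPED" without running the test when no credentials are found,
  // otherwise runs the test and returns "RAN".
  static String runOrSkip(Map<String, String> env, Runnable integrationTest) {
    if (!credentialsAvailable(env)) {
      return "SKIPPED";
    }
    integrationTest.run();
    return "RAN";
  }
}
```

A test runner could call `IntegrationTestGate.runOrSkip(System.getenv(), test)`, so that builds without credentials still pass while credentialed CI exercises the real service.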