Status

Discussion thread
Vote thread
JIRA

Release: 1.14

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Background and Motivation

As proposed on the dev mailing list, we have developed a Pulsar Flink connector based on Flink 1.9.0 and Pulsar 2.4.0 and want to contribute it back to Flink.

A brief introduction to Apache Pulsar

Apache Pulsar is a multi-tenant, high-performance distributed pub-sub messaging system. Pulsar provides features such as native support for multiple clusters in a single Pulsar instance with seamless geo-replication of messages across clusters, very low publish and end-to-end latency, seamless scalability to over a million topics, and guaranteed message delivery backed by persistent message storage in Apache BookKeeper. Pulsar has been adopted by a growing number of companies.

Proposal

We propose a thorough Pulsar connector implementation in Flink.

Scope

We’ll design and develop a connector with the following features:

  • Pulsar as a streaming source with exactly-once guarantees.
  • Sink streaming results to Pulsar with exactly-once semantics.
  • Build upon Flink's new TableSource and TableSink interfaces (FLIP-95) and metadata columns (FLIP-107).
  • Integrate with Flink's Catalog API (FLIP-30), which enables the use of Pulsar topics as tables in the Table API as well as in the SQL client.
  • Integrate with Flink's new Source API (FLIP-27).
  • Integrate with Flink's new Sink API (FLIP-143).
  • Provide an upsert-pulsar connector, following the upsert-kafka design (FLIP-149).

Overall Design

Source: The exactly-once source implements CheckpointedFunction to persist the reading position (the MessageId, which uniquely identifies a message within a Pulsar topic) to the state store, and implements CheckpointListener to notify Pulsar which messages have been fully consumed.

Sink: The at-least-once sink should implement CheckpointedFunction to flush all messages written by sink tasks to Pulsar on snapshotState.

Table API type system: all data types used in the connector are converted to/from the new type system.

  • Auto deserialization: each source task fetches the topic schema from Pulsar and uses it to automatically deserialize Pulsar Messages into Flink Rows.
  • Auto serialization: each sink task deduces its row schema, converts it to a Pulsar schema, uploads the schema if the target topic does not yet exist, and then serializes each row into a message and sends it to Pulsar.

Table API / Source-Sink factory: enables registering Pulsar data streams as tables in catalogs.

Catalog API: use Pulsar as a catalog, mapping Pulsar topics to Flink tables in Pulsar catalog.


Note: 

A partitioned topic in Pulsar is internally implemented as multiple topics sharing the same prefix:

persistent://public/default/topic-partition-idx

Therefore, a topic partition is actually a topic in Pulsar and we could use it independently.
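For illustration, the per-partition names can be derived with Pulsar's TopicName helper from the pulsar-common module (a minimal sketch, not part of the connector itself):

import org.apache.pulsar.common.naming.TopicName

// A partitioned topic with 3 partitions is just 3 regular topics sharing a prefix:
// persistent://public/default/topic-partition-0, -partition-1, -partition-2.
val base = TopicName.get("persistent://public/default/topic")
(0 until 3).foreach(i => println(base.getPartition(i).toString))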

Source design

For each source task, a topic/partition discoverer thread runs periodically to check for newly added topics/partitions. All source tasks share the same logic for distributing partitions among tasks, so each discoverer can determine whether it is responsible for a newly discovered partition and, if so, start a reader for it.
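A minimal sketch of such an ownership check (the hash-based assignment shown here is an illustrative assumption, not necessarily the exact scheme used by the connector):

// Every source task runs the same deterministic check, so each newly discovered
// topic/partition is picked up by exactly one task, which then starts a reader for it.
def ownsPartition(topicPartition: String, subtaskIndex: Int, parallelism: Int): Boolean = {
  val owner = ((topicPartition.hashCode % parallelism) + parallelism) % parallelism
  owner == subtaskIndex
}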


Pulsar is built upon the log abstraction: messages in each topic/partition are durably stored in order and can be uniquely identified by a message ID, so a topic is replayable by nature. Therefore, whenever a snapshot for a checkpoint is requested, the source task collects the current reading position of each reader thread and adds the (topic-name, message ID) pair to the state. When recovering from failure, the reader threads seek the snapshotted message ID and re-consume all messages after it.
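A minimal sketch of the snapshot logic under these assumptions; the ReaderThread class and the readerThreads accessor are hypothetical names used only for illustration:

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.pulsar.client.api.MessageId

// Hypothetical helper: each reader thread knows the topic it reads and its current position.
class ReaderThread(val topic: String) {
  @volatile var currentId: MessageId = MessageId.earliest
}

abstract class SourceStateSketch extends CheckpointedFunction {
  def readerThreads: Seq[ReaderThread]   // assumed to be maintained by the source task
  private var offsetState: ListState[(String, MessageId)] = _

  override def snapshotState(ctx: FunctionSnapshotContext): Unit = {
    offsetState.clear()
    // One (topic-name, message ID) pair per reader; on recovery the readers seek these IDs.
    readerThreads.foreach(r => offsetState.add((r.topic, r.currentId)))
  }

  override def initializeState(ctx: FunctionInitializationContext): Unit = {
    val descriptor = new ListStateDescriptor("pulsar-offsets", createTypeInformation[(String, MessageId)])
    offsetState = ctx.getOperatorStateStore.getUnionListState(descriptor)
  }
}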

Keep messages alive until a checkpoint is finished

By default, Pulsar brokers delete a message as soon as it has been acknowledged by a consumer. However, we cannot acknowledge messages in the reader thread immediately, because the dataflow graph may fail and we then need to replay the sources from a message ID. In Flink, a completed checkpoint means that all records the source had emitted before the checkpoint have travelled through the streaming dataflow and updated the corresponding operator states, which are snapshotted as well.

From then on, Flink doesn’t need to replay these messages anymore, and Pulsar can safely delete them to save space.

We use a durable cursor per topic/partition to keep un-checkpointed messages alive. Whenever a checkpoint completes, we move the cursor forward to the checkpointed message ID, as illustrated in the figure below.
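A minimal sketch of how the cursor could be advanced on checkpoint completion, assuming one durable subscription (consumer) per topic/partition and cumulative acknowledgement; durableConsumers and offsetsForCheckpoint are hypothetical accessors:

import org.apache.flink.runtime.state.CheckpointListener
import org.apache.pulsar.client.api.{Consumer, MessageId}

abstract class CursorCommitSketch extends CheckpointListener {
  // Hypothetical bookkeeping: one durable consumer (subscription) per topic/partition,
  // plus the message IDs that were stored for each completed checkpoint.
  def durableConsumers: Map[String, Consumer[Array[Byte]]]
  def offsetsForCheckpoint(checkpointId: Long): Map[String, MessageId]

  override def notifyCheckpointComplete(checkpointId: Long): Unit = {
    // Advancing the cursor (cumulative ack) tells the broker it may delete everything
    // up to and including the checkpointed message ID.
    offsetsForCheckpoint(checkpointId).foreach { case (topic, messageId) =>
      durableConsumers(topic).acknowledgeCumulativeAsync(messageId)
    }
  }
}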

Sink design

When you send a message to Pulsar using sendAsync, the message is buffered in a pendingMessages queue and you get back a CompletableFuture handle. You can register a callback on the handle to be notified on completion. The Pulsar producer also offers flush, which sends all messages buffered in the client immediately and waits until they have all been successfully persisted.

We use these two APIs in our Pulsar sink implementation to guarantee at-least-once semantics. For each record the sink receives, we send it to Pulsar with sendAsync and maintain a counter, pendingRecords, of messages that have not yet been persisted. On each checkpoint, we call flush() manually and wait for message acknowledgements from the Pulsar brokers. The checkpoint is considered complete once all acknowledgements have arrived and pendingRecords has dropped to 0, and it is regarded as failed if an exception occurs while persisting messages. By default, a failed checkpoint in Flink raises an exception that results in an application restart; therefore, messages are guaranteed to be persisted at least once.
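A minimal sketch of this bookkeeping (producer creation, topic routing, and the failOnWrite option are elided; the class and field names are illustrative):

import java.util.concurrent.atomic.AtomicLong
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.pulsar.client.api.Producer

// Sketch only: `producer` is assumed to be created in open() from the connector properties.
abstract class PulsarSinkSketch[T] extends RichSinkFunction[T] with CheckpointedFunction {
  protected var producer: Producer[T] = _
  private val pendingRecords = new AtomicLong(0L)
  @volatile private var asyncError: Throwable = _

  override def invoke(value: T): Unit = {
    if (asyncError != null) throw new RuntimeException("A previous send failed", asyncError)
    pendingRecords.incrementAndGet()
    producer.sendAsync(value).whenComplete { (messageId, error) =>
      // Count a record as done once the broker acknowledges it; remember failures.
      if (error != null) asyncError = error
      pendingRecords.decrementAndGet()
    }
  }

  override def snapshotState(ctx: FunctionSnapshotContext): Unit = {
    // Push out everything buffered in the client and block until every pending record
    // has been acknowledged; failing here fails the checkpoint and restarts the job.
    producer.flush()
    while (pendingRecords.get() > 0) Thread.sleep(10)
    if (asyncError != null) throw new RuntimeException("Could not persist all records", asyncError)
  }

  override def initializeState(ctx: FunctionInitializationContext): Unit = ()
}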

Schema and Auto SerDe

Each source task fetches the schema independently through the Pulsar admin API and shares it with all reader threads that read data from Pulsar topics.

Since Pulsar topics have schemas expressed with the Pulsar Schema API, we convert the Pulsar schema to a Flink FieldsDataType. The conversion is twofold (a minimal type-mapping sketch follows the list below):

  • Normal data fields:
    • Primitive SchemaTypes in Pulsar, such as BooleanSchema or LongSchema, are mapped to a single field named `value`.
    • Complex SchemaTypes in Pulsar are expressed as AvroSchema and are converted to the corresponding Flink data types field by field.
  • Metadata fields:
    • Messages in Pulsar carry useful information besides the message payload, so we also expose "__topic", "__key", "__messageId", "__publishTime", and "__eventTime" as fields in the FieldsDataType.
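A minimal sketch of the primitive part of this type mapping (the set of conversions shown is illustrative, not exhaustive):

import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.types.DataType
import org.apache.pulsar.common.schema.SchemaType

// Illustrative subset of the primitive-type mapping; complex (AVRO/JSON) schemas
// are instead translated field by field into a ROW type.
def primitiveToFlinkType(schemaType: SchemaType): DataType = schemaType match {
  case SchemaType.BOOLEAN => DataTypes.BOOLEAN()
  case SchemaType.INT32   => DataTypes.INT()
  case SchemaType.INT64   => DataTypes.BIGINT()
  case SchemaType.FLOAT   => DataTypes.FLOAT()
  case SchemaType.DOUBLE  => DataTypes.DOUBLE()
  case SchemaType.STRING  => DataTypes.STRING()
  case SchemaType.BYTES   => DataTypes.BYTES()
  case other => throw new UnsupportedOperationException(s"Unsupported Pulsar schema type: $other")
}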

The following is the schema of a Pulsar topic with Schema.DOUBLE:

root
 |-- value: DOUBLE
 |-- __key: BYTES
 |-- __topic: STRING
 |-- __messageId: BYTES
 |-- __publishTime: TIMESTAMP(3)
 |-- __eventTime: TIMESTAMP(3)


A Pulsar topic with the AVRO schema `s` defined below is converted to a Flink table with the following schema:

case class Foo(@BeanProperty i: Int, @BeanProperty f: Float, @BeanProperty bar: Bar)
case class Bar(@BeanProperty b: Boolean, @BeanProperty s: String)

val s = Schema.AVRO(classOf[Foo])

root
 |-- i: INT
 |-- f: FLOAT
 |-- bar: ROW<`b` BOOLEAN, `s` STRING>
 |-- __key: BYTES
 |-- __topic: STRING
 |-- __messageId: BYTES
 |-- __publishTime: TIMESTAMP(3)
 |-- __eventTime: TIMESTAMP(3)

Each record's serialization/deserialization is delegated to a type-specific serializer/deserializer that is created once during the initialization procedure.

Catalog and SQL Client

Based on the AbstractCatalog API, we can define a Pulsar instance as a catalog in Flink and regard Pulsar topics as tables in that catalog.

A Pulsar `tenant/namespace` is mapped to a database in Flink, and a Pulsar topic is mapped to a Flink table.

Currently, Pulsar has no notion of table partitions, statistics, views, or functions, so these parts are left unimplemented in the Pulsar catalog.
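A minimal sketch of this mapping using the Pulsar admin client; only the listing side is shown, and the class name is illustrative rather than the actual catalog implementation:

import scala.collection.JavaConverters._
import org.apache.pulsar.client.admin.PulsarAdmin

// Sketch: `tenant/namespace` -> database, topic -> table.
class PulsarCatalogSketch(admin: PulsarAdmin, tenant: String) {
  // Databases are the namespaces of the tenant, returned as "tenant/namespace".
  def listDatabases(): java.util.List[String] =
    admin.namespaces().getNamespaces(tenant)

  // Tables are the topics living in a namespace.
  def listTables(database: String): java.util.List[String] =
    admin.topics().getList(database).asScala.map(topicToTableName).asJava

  // Strip the "persistent://tenant/namespace/" prefix to get the table name.
  private def topicToTableName(topic: String): String =
    topic.substring(topic.lastIndexOf('/') + 1)
}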

Limitations

Currently, one limitation we are facing is that the Table API doesn't support internal columns.

Therefore, when writing a single int row into a topic with an int schema, the Table API complains that the sink stream doesn't have a schema identical to the topic: the sink stream has the schema Row(value int), while the topic being written to has the schema Row(value int, __key bytes, __messageId bytes, ...). There should be a way to mark columns as internal/meta columns and skip schema matching on these columns.

API for the current implementation

Use Pulsar as a streaming source

val env = StreamExecutionEnvironment.getExecutionEnvironment

val props = new Properties()
props.setProperty("service.url", "pulsar://...")
props.setProperty("admin.url", "http://...")
props.setProperty("partitionDiscoveryIntervalMillis", "5000")
props.setProperty("startingOffsets", "earliest")
props.setProperty("topic", "test-source-topic")

val source = new FlinkPulsarSource(props)
// you don't need to provide type information to addSource since FlinkPulsarSource is ResultTypeQueryable
val dataStream = env.addSource(source)(null)

// chain operations on dataStream of Row and sink the output
// end method chaining

env.execute()

Register topics in Pulsar as streaming tables

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

val props = new Properties()
props.setProperty("service.url", "pulsar://...")
props.setProperty("admin.url", "http://...")
props.setProperty("partitionDiscoveryIntervalMillis", "5000")
props.setProperty("startingOffsets", "earliest")
props.setProperty("topic", "test-source-topic")

tEnv
  .connect(new Pulsar().properties(props))
  .inAppendMode()
  .registerTableSource("pulsar-test-table")

Source configurations (mandatory)

Option | Value | Description
------ | ----- | -----------
`topic` | A topic name string | The topic to be consumed. Only one of the `topic`, `topics`, or `topicsPattern` options can be specified for the Pulsar source.
`topics` | A comma-separated list of topics | The topic list to be consumed. Only one of the `topic`, `topics`, or `topicsPattern` options can be specified for the Pulsar source.
`topicsPattern` | A Java regex string | The pattern used to subscribe to topic(s). Only one of the `topic`, `topics`, or `topicsPattern` options can be specified for the Pulsar source.
`service.url` | The service URL of your Pulsar cluster | The Pulsar `serviceUrl` configuration.
`admin.url` | The HTTP service URL of your Pulsar cluster | The Pulsar `serviceHttpUrl` configuration.

Source configurations (optional)

Option | Value | Default | Description
------ | ----- | ------- | -----------
`startingOffsets` | "earliest" (streaming and batch queries), "latest" (streaming queries), or a JSON string, e.g. """ {"topic-1":[8,11,16,101,24,1,32,1],"topic-5":[8,15,16,105,24,5,32,5]} """ | "latest" | Controls where a consumer starts reading data. "earliest": with no valid offset, the consumer reads all data in the partition, starting from the very beginning. "latest": with no valid offset, the consumer reads only records written after it starts running. A JSON string specifies a starting offset for each topic; org.apache.flink.pulsar.JsonUtils.topicOffsets(Map[String, MessageId]) can be used to convert message offsets to a JSON string. Note: "latest" only applies when a new query is started; a resumed query always picks up from where it left off, and partitions newly discovered during a query start at "earliest".
`partitionDiscoveryIntervalMillis` | A long value, or a string that can be converted to a long | -1 | Controls whether the source discovers newly added topics or partitions that match the topic options while the streaming job is running. A positive value l triggers the discoverer every l milliseconds; a negative value turns topic/partition discovery off.
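For illustration, a per-topic starting position could be built and passed as a JSON string roughly as follows (a sketch relying on the JsonUtils helper mentioned above and the MessageId constants of the Pulsar client; `props` refers to the Properties object from the source example):

import org.apache.flink.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId

// Start topic-1 from the very beginning and topic-5 from its current tail.
val startingOffsets: Map[String, MessageId] = Map(
  "topic-1" -> MessageId.earliest,
  "topic-5" -> MessageId.latest)
props.setProperty("startingOffsets", JsonUtils.topicOffsets(startingOffsets))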

 

Create a Pulsar sink for streaming queries

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = .....

val prop = new Properties()
prop.setProperty("service.url", serviceUrl)
prop.setProperty("admin.url", adminUrl)
prop.setProperty("flushOnCheckpoint", "true")
prop.setProperty("failOnWrite", "true")
prop.setProperty("topic", "test-sink-topic")

stream.addSink(new FlinkPulsarSink(prop, DummyTopicKeyExtractor))
env.execute()

Write a streaming table to Pulsar

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

val prop = new Properties()
prop.setProperty("service.url", serviceUrl)
prop.setProperty("admin.url", adminUrl)
prop.setProperty("flushOnCheckpoint", "true")
prop.setProperty("failOnWrite", "true")
prop.setProperty("topic", "test-sink-topic")

tEnv
  .connect(new Pulsar().properties(prop))
  .inAppendMode()
  .registerTableSink("sink-table")

val sql = "INSERT INTO `sink-table` ....."
tEnv.sqlUpdate(sql)
env.execute()

Sink configurations (mandatory)

Option | Value | Description
------ | ----- | -----------
`service.url` | The service URL of your Pulsar cluster | The Pulsar `serviceUrl` configuration.
`admin.url` | The HTTP service URL of your Pulsar cluster | The Pulsar `serviceHttpUrl` configuration.

Sink configurations (optional)

Option | Value | Default | Description
------ | ----- | ------- | -----------
`topic` | A topic name string | None | The topic to write to. If this option is not set, DataStreams or tables written to Pulsar must either provide a TopicKeyExtractor that returns non-null topics or contain a `__topic` field.
`flushOnCheckpoint` | Whether to flush all records written so far and wait for confirmations on each checkpoint | true | At-least-once semantics are achieved when flushOnCheckpoint is set to true and checkpointing is enabled on the execution environment. Otherwise, you get no write guarantee.
`failOnWrite` | Whether to fail the sink when sending records to Pulsar fails | false | None

Use a Pulsar instance as a catalog

In YAML file:

catalogs:
  - name: pulsarcatalog
    type: pulsar
    default-database: tn/ns
    service.url: "pulsar://localhost:6650"
    admin.url: "http://localhost:8080"

Usage in the Flink SQL Client

Flink SQL> USE CATALOG pulsarcatalog;
Flink SQL> USE `public/default`;
Flink SQL> SELECT * FROM topic0;

Document

https://docs.google.com/document/d/1rES79eKhkJxrRfQp1b3u8LB2aPaq-6JaDHDPJIA8kMY/edit#heading=h.28v5v23yeq1u