Status
Current state: Under Discussion
Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)
JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Background and Motivation
As proposed on the dev mailing list, we have developed a Pulsar Flink connector based on Flink 1.9.0 and Pulsar 2.4.0, and we would like to contribute it back to Flink.
A brief introduction to Apache Pulsar
Apache Pulsar is a multi-tenant, high-performance distributed pub-sub messaging system. Its features include native support for multiple clusters in a Pulsar instance with seamless geo-replication of messages across clusters, very low publish and end-to-end latency, seamless scalability to over a million topics, and guaranteed message delivery with persistent message storage provided by Apache BookKeeper. Pulsar is being adopted by a growing number of companies.
Proposal
We propose a thorough Pulsar connector implementation in Flink.
Scope
We’ll design and develop a connector with the following features:
- Pulsar as a streaming source with exactly-once guarantee.
- Sink streaming results to Pulsar with at-least-once semantics.
- Build upon Flink's new TableSource and TableSink interfaces (FLIP-95), and automatically (de)serialize messages with the help of the Pulsar schema and metadata (FLIP-107).
- Integrate with Flink's new Catalog API (FLIP-30), which enables the use of Pulsar topics as tables in the Table API as well as in the SQL client.
- Integrate with Flink's new Source API (FLIP-27).
- Integrate with Flink's new Sink API (FLIP-143).
- Provide an upsert-pulsar connector, analogous to upsert-kafka (FLIP-149).
Overall Design
Source: The exactly-once source should implement CheckpointedFunction to persist its reading position (the Pulsar MessageId, which uniquely identifies a message within a topic) to Flink state, and implement CheckpointListener to notify Pulsar of messages whose consumption is complete.
...
Note:
A partitioned topic in Pulsar is internally implemented as multiple topics sharing the same prefix:
persistent://public/default/topic-partition-<idx>
Therefore, each topic partition is actually a topic in Pulsar and can be used independently.
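To make the naming scheme concrete, here is a small Scala sketch that expands a partitioned topic into its per-partition topic names and recovers the base topic and index from such a name. It assumes the `-partition-<idx>` suffix shown above; `partitionTopics` and `splitPartition` are hypothetical helpers, not part of the Pulsar client API.

```scala
import scala.util.Try

// Suffix used for per-partition topics, as in
// persistent://public/default/topic-partition-<idx>
val PartitionSuffix = "-partition-"

// Hypothetical helper: expand a partitioned topic into its partition topics.
def partitionTopics(topic: String, numPartitions: Int): Seq[String] =
  (0 until numPartitions).map(i => s"$topic$PartitionSuffix$i")

// Hypothetical helper: split a partition topic back into (base topic, index).
// Returns None when the name carries no numeric partition suffix.
def splitPartition(topic: String): Option[(String, Int)] = {
  val pos = topic.lastIndexOf(PartitionSuffix)
  if (pos < 0) None
  else {
    val idx = topic.substring(pos + PartitionSuffix.length)
    if (idx.nonEmpty && idx.forall(_.isDigit))
      Try(idx.toInt).toOption.map(i => (topic.substring(0, pos), i))
    else None
  }
}

val base = "persistent://public/default/topic"
partitionTopics(base, 3).foreach(println)
// persistent://public/default/topic-partition-0
// persistent://public/default/topic-partition-1
// persistent://public/default/topic-partition-2
```

Because each partition is an ordinary topic name, a reader can be opened on any one of these strings without knowing about its siblings.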
Source design
For each source task, a topic/partition discoverer thread runs periodically to check for newly added topics/partitions. All source tasks share the same logic for distributing partitions among tasks, so each discoverer can determine on its own whether it is responsible for a newly discovered partition and, if so, start a reader for it.
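Since every task runs the same deterministic assignment function, no coordination is needed between discoverers. The sketch below assumes a simple hash-modulo rule (the proposal does not specify the exact function); `ownerOf` and `Discoverer` are hypothetical names used only for illustration.

```scala
// Deterministic owner of a topic/partition. String.hashCode is specified by
// the JDK, so every task computes the same owner for the same topic name.
// Hash-modulo is an assumed rule, not necessarily the connector's.
def ownerOf(topic: String, numTasks: Int): Int =
  java.lang.Math.floorMod(topic.hashCode, numTasks)

// Hypothetical per-task discoverer: remembers which topics it already reads
// and returns only newly discovered topics that this task is responsible for.
final class Discoverer(taskIndex: Int, numTasks: Int) {
  private var known = Set.empty[String]

  def discover(allTopics: Set[String]): Set[String] = {
    val mine = allTopics.filter(t => ownerOf(t, numTasks) == taskIndex)
    val fresh = mine -- known
    known ++= fresh
    fresh // the caller starts one reader per returned topic
  }
}
```

Calling `discover` on every task with the same topic list yields disjoint reader sets that together cover all topics, and repeating the call returns nothing until a new topic appears.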
Pulsar is built upon the log abstraction: messages in each topic/partition are durably stored in order and can be uniquely identified by a message ID, so topics are replayable by nature. Therefore, whenever a source task is asked to snapshot its state for a checkpoint, it collects the current reading position of every reader thread and adds each (topic name, message ID) pair to the state. When recovering from a failure, the reader threads seek to the snapshotted message IDs and re-consume all messages after them.
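The snapshot and recovery steps can be modelled in plain Scala. In this sketch a message ID is reduced to a hypothetical ordered `(ledgerId, entryId)` pair and each topic is an in-memory log; `MsgId`, `Reader`, `seek` and `snapshot` are illustrative names only, and a real source would keep Pulsar `MessageId`s in Flink state instead.

```scala
// Simplified, hypothetical message ID: ordered by ledger, then by entry.
final case class MsgId(ledgerId: Long, entryId: Long)
val msgIdOrdering: Ordering[MsgId] = Ordering.by((m: MsgId) => (m.ledgerId, m.entryId))

final case class Message(id: MsgId, payload: String)

// One reader per topic; it remembers the ID of the last message it emitted.
final class Reader(val topic: String, log: Vector[Message]) {
  var lastEmitted: Option[MsgId] = None

  // Emit up to `max` messages strictly after lastEmitted.
  def poll(max: Int): Vector[Message] = {
    val next = log
      .filter(m => lastEmitted.forall(last => msgIdOrdering.gt(m.id, last)))
      .take(max)
    next.lastOption.foreach(m => lastEmitted = Some(m.id))
    next
  }

  // Recovery: continue right after the snapshotted message ID.
  def seek(id: MsgId): Unit = lastEmitted = Some(id)
}

// Snapshot: one (topic name, message ID) pair per reader that emitted data.
def snapshot(readers: Seq[Reader]): Map[String, MsgId] =
  readers.flatMap(r => r.lastEmitted.map(r.topic -> _)).toMap
```

Messages emitted after the snapshot are re-read by a restored reader, which is why downstream state must also be rolled back to the same checkpoint for exactly-once results.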
Keep messages alive until a checkpoint is finished
By default, Pulsar brokers delete messages as soon as they have been acknowledged by a consumer. However, the reader threads cannot acknowledge messages right after reading them: the dataflow may still fail, and we then need to replay the source from a checkpointed message ID. In Flink, a finished checkpoint means that all records the source emitted before the checkpoint have gone through the streaming dataflow and updated the corresponding operator states, which are snapshotted as well.
...
We use a durable cursor for each topic/partition to keep messages that are not yet covered by a completed checkpoint alive. Whenever a checkpoint completes, we move the cursor forward to the checkpointed message IDs, as shown in the figure below.
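A minimal model of this bookkeeping: positions are recorded per checkpoint ID when the snapshot is taken, and the durable cursor moves only when the completion notification for that checkpoint arrives. `CursorTracker` and its methods are hypothetical and positions are simplified to `Long` offsets; the callbacks mirror, but are not, Flink's `CheckpointListener` interface.

```scala
import scala.collection.immutable.SortedMap

// Hypothetical tracker; message positions are simplified to Long offsets.
final class CursorTracker {
  // checkpoint ID -> (topic -> position captured in that snapshot)
  private var pending = SortedMap.empty[Long, Map[String, Long]]

  // Durable cursor per topic: messages up to this position are covered by a
  // completed checkpoint and may be released; later ones are kept alive.
  var durableCursor = Map.empty[String, Long]

  def onSnapshot(checkpointId: Long, positions: Map[String, Long]): Unit =
    pending += checkpointId -> positions

  // A completed checkpoint subsumes all older pending ones, so they are
  // dropped together with it; late notifications for them are ignored.
  def onCheckpointComplete(checkpointId: Long): Unit =
    pending.get(checkpointId).foreach { positions =>
      durableCursor ++= positions
      pending = pending.filter { case (id, _) => id > checkpointId }
    }
}
```

Keeping positions per checkpoint ID (rather than a single "latest" value) matters because a new snapshot can be taken before the previous checkpoint has been confirmed.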
Sink design
When you send a message to Pulsar using sendAsync, the message is buffered in a pendingMessages queue and you get a CompletableFuture handle, on which you can register a callback to be notified on completion. Another Pulsar producer API, flush, sends all messages buffered in the client immediately and waits until all of them have been successfully persisted.
We use these two APIs in our Pulsar sink to guarantee at-least-once semantics. For each record the sink receives, we send it to Pulsar with sendAsync and maintain a count, pendingRecords, of records that have not yet been persisted. On each checkpoint, we call flush() manually and wait for message acknowledgments from the Pulsar brokers. The checkpoint is considered complete when all acknowledgments have arrived and pendingRecords has decreased to 0; it is regarded as failed if an exception occurs while persisting messages. By default, a failed checkpoint in Flink raises an exception that restarts the application; therefore, messages are guaranteed to be persisted at least once.
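The mechanism can be sketched without a broker. `FakeProducer` below is a hypothetical in-memory stand-in that only mimics the two calls described above (`sendAsync` returning a `CompletableFuture`, and `flush`); it is not the Pulsar client. `AtLeastOnceSink` shows the `pendingRecords` bookkeeping and how a failed asynchronous send surfaces as a failed checkpoint.

```scala
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
import java.util.function.BiConsumer
import scala.collection.mutable.ArrayBuffer

// Hypothetical producer: sendAsync only buffers the message and returns a
// future; flush() completes every buffered future, failing those selected
// by `failOn`, much like a broker rejecting a write.
final class FakeProducer(failOn: String => Boolean) {
  private val buffered = ArrayBuffer.empty[(String, CompletableFuture[String])]
  private var nextId = 0L
  val persisted = ArrayBuffer.empty[String]

  def sendAsync(msg: String): CompletableFuture[String] = {
    val future = new CompletableFuture[String]()
    buffered += ((msg, future))
    future
  }

  def flush(): Unit = {
    buffered.foreach { case (msg, future) =>
      if (failOn(msg)) future.completeExceptionally(new RuntimeException(s"write failed: $msg"))
      else {
        persisted += msg
        nextId += 1
        future.complete(s"msg-$nextId")
      }
    }
    buffered.clear()
  }
}

// At-least-once sink logic: count in-flight records, flush on every
// checkpoint, and fail the checkpoint if any asynchronous send failed.
final class AtLeastOnceSink(producer: FakeProducer) {
  val pendingRecords = new AtomicLong(0)
  private val asyncError = new AtomicReference[Throwable](null)

  def invoke(record: String): Unit = {
    checkErroneous()
    pendingRecords.incrementAndGet()
    producer.sendAsync(record).whenComplete(new BiConsumer[String, Throwable] {
      def accept(id: String, err: Throwable): Unit = {
        if (err != null) asyncError.compareAndSet(null, err)
        pendingRecords.decrementAndGet()
      }
    })
  }

  // Called when a checkpoint is taken. With a real client, flush() blocks
  // until all buffered messages have been persisted.
  def snapshotState(): Unit = {
    producer.flush()
    checkErroneous()
    if (pendingRecords.get() != 0)
      throw new IllegalStateException(s"${pendingRecords.get()} records still pending")
  }

  private def checkErroneous(): Unit = {
    val err = asyncError.get()
    if (err != null) throw new RuntimeException("asynchronous send failed", err)
  }
}
```

The error is kept sticky, so once a send has failed every later `invoke` or checkpoint also fails, which pushes the job into the restart path described above.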
Schema and Auto SerDe
Each source task fetches the topic schema through the Pulsar Admin API independently and shares it with all of its reader threads, which read data from the Pulsar topics directly.
...
Serialization and deserialization of each record are delegated to a type-specific serializer/deserializer that is created once during initialization.
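As an illustration of "created once, applied per record", the sketch below picks a deserializer from a schema type obtained at startup and then reuses it for every message. The `SchemaKind` cases are hypothetical stand-ins for Pulsar schema types, and decoding an int as 4 big-endian bytes is an assumption of the sketch.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical, simplified schema kinds standing in for Pulsar schema types.
sealed trait SchemaKind
case object Int32Kind extends SchemaKind
case object StringKind extends SchemaKind
case object BytesKind extends SchemaKind

// Resolved once during task initialization; the returned function is then
// applied to every record without inspecting the schema again.
def deserializerFor(kind: SchemaKind): Array[Byte] => Any = kind match {
  case Int32Kind  => bytes => ByteBuffer.wrap(bytes).getInt // assumes 4 big-endian bytes
  case StringKind => bytes => new String(bytes, StandardCharsets.UTF_8)
  case BytesKind  => bytes => bytes
}
```

Doing the schema dispatch once keeps the per-record path to a single function call.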
Catalog and SQL Client
Following the AbstractCatalog API, we define a Pulsar instance as a catalog in Flink and regard Pulsar topics as tables in that catalog.
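One mapping consistent with the `default-database: tn/ns` setting and the ``USE `public/default` `` statement in the catalog example treats a `tenant/namespace` pair as a database and the short topic name as a table. The helpers below are a hypothetical sketch of that mapping for `persistent://` topic names only.

```scala
// Hypothetical catalog path: database = "tenant/namespace", table = topic.
final case class TablePath(database: String, table: String)

def toTablePath(fullTopic: String): Option[TablePath] = {
  val prefix = "persistent://"
  if (!fullTopic.startsWith(prefix)) None
  else fullTopic.stripPrefix(prefix).split("/", -1) match {
    case Array(tenant, ns, topic) if Seq(tenant, ns, topic).forall(_.nonEmpty) =>
      Some(TablePath(s"$tenant/$ns", topic))
    case _ => None
  }
}

def toTopic(path: TablePath): String =
  s"persistent://${path.database}/${path.table}"
```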
...
Pulsar currently has no notion of partitions, statistics, views, or functions, so these parts of the Pulsar catalog are left unimplemented.
Limitations
Currently, one limitation we face is that the Pulsar table API does not support internal columns.
Therefore, when writing a row with a single int column into a topic with an int schema, the Table API complains that the sink stream's schema does not match the topic's schema: the sink stream has schema Row(value int), while the topic to be written to has schema Row(value int, __key bytes, __messageId bytes, …). There should be a way to mark columns as internal/meta columns and skip schema matching on them.
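The kind of check this asks for can be sketched in a few lines: columns whose names start with `__` (the prefix of `__key`, `__messageId` and `__topic` in this document) are treated as internal and excluded before the sink schema is compared with the topic schema. `Column` and `schemasMatch` are hypothetical; an actual fix would live in the Table API's schema validation.

```scala
// Hypothetical column model: name plus a type label.
final case class Column(name: String, dataType: String)

// Internal/meta columns are recognised by the "__" prefix.
def isInternal(c: Column): Boolean = c.name.startsWith("__")

// Compare only user-visible columns, in order.
def schemasMatch(sink: Seq[Column], topic: Seq[Column]): Boolean =
  sink.filterNot(isInternal) == topic.filterNot(isInternal)

val sinkSchema  = Seq(Column("value", "int"))
val topicSchema = Seq(Column("value", "int"), Column("__key", "bytes"), Column("__messageId", "bytes"))
println(schemasMatch(sinkSchema, topicSchema)) // true
```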
API for the current implementation
Use Pulsar as a streaming source
```scala
import java.util.Properties
import org.apache.flink.streaming.api.scala._
// FlinkPulsarSource is provided by the proposed connector.

val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("service.url", "pulsar://...")
props.setProperty("admin.url", "http://...")
props.setProperty("partitionDiscoveryIntervalMillis", "5000")
props.setProperty("startingOffsets", "earliest")
props.setProperty("topic", "test-source-topic")

val source = new FlinkPulsarSource(props)
// You don't need to provide type information to addSource since
// FlinkPulsarSource is ResultTypeQueryable.
val dataStream = env.addSource(source)(null)

// chain operations on dataStream of Row and sink the output
// end method chaining

env.execute()
```
Register topics in Pulsar as streaming tables
```scala
import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala.StreamTableEnvironment // Flink 1.9 Scala bridge
// The Pulsar table descriptor is provided by the proposed connector.

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
val props = new Properties()
props.setProperty("service.url", "pulsar://...")
props.setProperty("admin.url", "http://...")
props.setProperty("partitionDiscoveryIntervalMillis", "5000")
props.setProperty("startingOffsets", "earliest")
props.setProperty("topic", "test-source-topic")

tEnv
  .connect(new Pulsar().properties(props))
  .inAppendMode()
  .registerTableSource("pulsar-test-table")
```
Source configurations (mandatory)
Option | Value | Description |
`topic` | A topic name string | The topic to be consumed. Only one of `topic`, `topics` or `topicsPattern` options can be specified for Pulsar source. |
`topics` | A comma-separated list of topics | The topic list to be consumed. Only one of `topic`, `topics` or `topicsPattern` options can be specified for Pulsar source. |
`topicsPattern` | A Java regex string | The pattern used to subscribe to topic(s). Only one of `topic`, `topics` or `topicsPattern` options can be specified for Pulsar source. |
`service.url` | A service URL of your Pulsar cluster | The Pulsar `serviceUrl` configuration. |
`admin.url` | A service HTTP URL of your Pulsar cluster | The Pulsar `serviceHttpUrl` configuration. |
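A sketch of checking these mandatory options on a `java.util.Properties` object, as used in the source examples. Only the rules come from the table (exactly one of `topic`, `topics`, `topicsPattern`, plus both URLs); the function name, the regex check and the error messages are hypothetical.

```scala
import java.util.Properties
import java.util.regex.Pattern
import scala.util.Try

// Returns the problems found; an empty list means the options are valid.
def validateSourceOptions(props: Properties): List[String] = {
  def isSet(key: String): Boolean =
    Option(props.getProperty(key)).exists(_.trim.nonEmpty)

  val subscription = List("topic", "topics", "topicsPattern").filter(isSet)
  val subscriptionError =
    if (subscription.size == 1) Nil
    else List(s"exactly one of topic, topics, topicsPattern must be set, found: ${subscription.mkString(", ")}")

  val missingUrls = List("service.url", "admin.url")
    .filterNot(isSet)
    .map(k => s"missing mandatory option: $k")

  // topicsPattern must be a valid Java regex.
  val patternError = Option(props.getProperty("topicsPattern")).toList.flatMap { p =>
    Try(Pattern.compile(p)).failed.toOption.map(e => s"invalid topicsPattern: ${e.getMessage}").toList
  }

  subscriptionError ++ missingUrls ++ patternError
}
```

Collecting all problems instead of throwing on the first one lets a user fix a misconfigured job in one pass.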
Source configurations (optional)
Option | Value | Default | Description |
`startingOffsets` | The following are valid values: "earliest", "latest", … | "latest" | The startingOffsets option controls where a consumer starts reading data from. Note: … |
`partitionDiscoveryIntervalMillis` | A long value or a string which can be converted to long | -1 | The partitionDiscoveryIntervalMillis option controls whether the source discovers newly added topics or partitions that match the topic options while the streaming job is running. A positive value l makes the discoverer run every l milliseconds; negative values turn off topic and partition discovery. |
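A minimal sketch of reading these two optional settings with their documented defaults. Only "earliest" and "latest" appear in this document, so any other `startingOffsets` string is returned as-is for the caller to interpret. Treating an interval of 0 as "disabled" is an assumption, since the table only defines positive and negative values; `SourceSettings` is hypothetical.

```scala
import java.util.Properties

final case class SourceSettings(startingOffsets: String, discoveryIntervalMillis: Option[Long])

def readSourceSettings(props: Properties): SourceSettings = {
  // Default "latest", as in the table.
  val offsets = Option(props.getProperty("startingOffsets")).map(_.trim).getOrElse("latest")

  // Accepts a Long value or a string convertible to long (throws
  // NumberFormatException otherwise); default -1 means discovery is off.
  val interval = Option(props.get("partitionDiscoveryIntervalMillis"))
    .map(_.toString.trim.toLong)
    .getOrElse(-1L)

  // Positive: run the discoverer every `interval` ms. Negative: disabled.
  // Zero is not defined by the table; this sketch treats it as disabled.
  SourceSettings(offsets, if (interval > 0) Some(interval) else None)
}
```

Using `props.get` rather than `getProperty` for the interval lets a caller store either a `java.lang.Long` or a string, matching the "long value or string" wording.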
Create a Pulsar sink for streaming queries
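```scala
import java.util.Properties
import org.apache.flink.streaming.api.scala._
// FlinkPulsarSink and DummyTopicKeyExtractor are provided by the proposed connector;
// serviceUrl and adminUrl are placeholders for your cluster's URLs.

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = ..... // the DataStream to write to Pulsar
val prop = new Properties()
prop.setProperty("service.url", serviceUrl)
prop.setProperty("admin.url", adminUrl)
prop.setProperty("flushOnCheckpoint", "true")
prop.setProperty("failOnWrite", "true")
prop.setProperty("topic", "test-sink-topic")
stream.addSink(new FlinkPulsarSink(prop, DummyTopicKeyExtractor))
env.execute()
```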
Write a streaming table to Pulsar
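```scala
import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala.StreamTableEnvironment // Flink 1.9 Scala bridge
// The Pulsar table descriptor is provided by the proposed connector;
// serviceUrl and adminUrl are placeholders for your cluster's URLs.

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
val prop = new Properties()
prop.setProperty("service.url", serviceUrl)
prop.setProperty("admin.url", adminUrl)
prop.setProperty("flushOnCheckpoint", "true")
prop.setProperty("failOnWrite", "true")
prop.setProperty("topic", "test-sink-topic")

tEnv
  .connect(new Pulsar().properties(prop))
  .inAppendMode()
  .registerTableSink("sink-table")

// Table names containing '-' must be quoted with backticks in SQL.
val sql = "INSERT INTO `sink-table` ....."
tEnv.sqlUpdate(sql)
env.execute()
```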
Sink configurations (mandatory)
Option | Value | Description |
`service.url` | A service URL of your Pulsar cluster | The Pulsar `serviceUrl` configuration. |
`admin.url` | A service HTTP URL of your Pulsar cluster | The Pulsar `serviceHttpUrl` configuration. |
Sink configurations (optional)
Option | Value | Default | Description |
`topic` | A topic name string | None | The topic to write to. If this option is not set, DataStreams or tables written to Pulsar must either provide a TopicKeyExtractor that returns non-null topics or contain a `__topic` field. |
`flushOnCheckpoint` | Whether to flush all records written before a checkpoint and wait for their acknowledgments. | true | At-least-once semantics are achieved when flushOnCheckpoint is set to true and checkpointing is enabled on the execution environment. Otherwise, there is no write guarantee. |
`failOnWrite` | Whether to fail the sink when sending records to Pulsar fails. | false | None |
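A sketch of how a sink might choose the destination topic for each record under these options: the `topic` option wins; otherwise a per-record extractor must supply a non-null topic, or the write fails. `TopicExtractor` is a hypothetical stand-in for the connector's `TopicKeyExtractor`, whose exact signature is not given in this document; the usage shows an extractor that reads a `__topic` field.

```scala
// Hypothetical stand-in for the connector's TopicKeyExtractor.
trait TopicExtractor[T] {
  def topic(record: T): String // may return null
}

// The fixed `topic` option takes precedence; otherwise ask the extractor.
def resolveTopic[T](fixedTopic: Option[String], extractor: Option[TopicExtractor[T]], record: T): String =
  fixedTopic
    .orElse(extractor.flatMap(e => Option(e.topic(record))))
    .getOrElse(throw new IllegalArgumentException(
      s"no topic for record $record: set `topic` or return a non-null topic"))

// Example extractor for map-like rows carrying a `__topic` field.
val byTopicField = new TopicExtractor[Map[String, Any]] {
  def topic(r: Map[String, Any]): String = r.get("__topic").map(_.toString).orNull
}
```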
Use a Pulsar instance as a catalog
In the SQL client YAML file:
```yaml
catalogs:
  - name: pulsarcatalog
    type: pulsar
    default-database: tn/ns
    service.url: "pulsar://localhost:6650"
    admin.url: "http://localhost:8080"
```
...
```sql
Flink SQL> USE CATALOG pulsarcatalog;
Flink SQL> USE `public/default`;
Flink SQL> select * from topic0;
```