Status
Current state: Under Discussion
Discussion thread:
JIRA:
Released: <Flink Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Background and Motivation
...
```java
public class PinotSinkFunction<T> extends RichSinkFunction<T> {

    private SegmentWriter writer;

    @Override
    public void open(Configuration parameters) throws Exception {
        writer = new ...
    }

    @Override
    public void invoke(T value, Context context) throws Exception {
        GenericRow row = pinotRowConverter.convert(value);
        writer.write(row);
        // Flush the buffered rows into a segment once the threshold is reached.
        if (checkThreshold()) {
            writer.flush();
        }
    }

    @Override
    public void close() throws Exception {
        writer.flush();
        writer.close();
    }
}
```
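For context, wiring the sink into a DataStream job would look like the following minimal sketch; the PinotSinkFunction constructor arguments (controller URL and Pinot table name) are assumptions here, since the exact signature is not spelled out above:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PinotSinkJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Attach the sink to a bounded stream; the constructor arguments
        // (controller URL, table name) are illustrative assumptions.
        env.fromElements("event-1", "event-2")
           .addSink(new PinotSinkFunction<>("http://localhost:9000", "myTable"));

        env.execute("pinot-sink-example");
    }
}
```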
Connector Options
| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| connector | Y | none | string | The connector to use; here it shall be 'pinot'. |
| table-name | Y | none | string | Name of the Pinot table. |
| url | Y | none | string | URL of the Pinot controller. |
| sink.buffer-flush.max-size | N | 5mb | string | Maximum size in memory of buffered rows for each segment creation. |
| sink.buffer-flush.max-rows | N | 1000 | int | Maximum number of rows to buffer for each segment creation. |
| sink.parallelism | N | none | int | Defines the parallelism of the Pinot sink operator. By default, the parallelism is determined by the framework, using the same parallelism as the upstream chained operator. |
| segment.name.type | N | simple | string | The type of segment name generator to use. Supported values are simple, normalizedDate, and fixed. |
| segment.name | N | none | string | For the fixed SegmentNameGenerator. Explicitly sets the segment name. |
| segment.name.postfix | N | none | string | For the simple SegmentNameGenerator. The postfix will be appended to all segment names. |
| segment.name.prefix | N | none | string | For the normalizedDate SegmentNameGenerator. The prefix will be prepended to all segment names. |
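As an illustration of how these options fit together, the following sketch registers the sink via a Flink SQL DDL executed from Java; the schema, controller URL, and Pinot table name are assumptions for the example, and the factory identifier 'pinot' follows the table above:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PinotSinkDdlExample {
    public static void main(String[] args) {
        // Batch mode matches the segment-at-a-time write model of the sink.
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());

        // Register a Pinot sink table using the options from the table above.
        tEnv.executeSql(
                "CREATE TABLE pinot_sink (" +
                "  id BIGINT," +
                "  name STRING" +
                ") WITH (" +
                "  'connector' = 'pinot'," +
                "  'url' = 'http://localhost:9000'," +  // assumed controller URL
                "  'table-name' = 'myTable'," +         // assumed Pinot table name
                "  'sink.buffer-flush.max-rows' = '1000'," +
                "  'segment.name.type' = 'simple'" +
                ")");

        // Each flush of the buffered rows creates and uploads one segment.
        tEnv.executeSql("INSERT INTO pinot_sink VALUES (1, 'flink')");
    }
}
```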
Schema and Data Type Mapping
...
Note that, by default, Pinot transforms null values coming from the data source into a default value determined by the type of the corresponding column (or as specified in the schema), per the Pinot guide.
| Flink SQL type | Pinot type | Default value for null |
|---|---|---|
| TINYINT | Integer | 0 |
| SMALLINT | Integer | 0 |
| INT | Integer | 0 |
| BIGINT | Long | 0 |
| DECIMAL | Not supported | Not supported |
| FLOAT | Float | 0.0 |
| BOOLEAN | Integer | 0 |
| DATE | Stores the number of days since epoch as an Integer value | 0 |
| TIME | Stores the milliseconds since epoch as a Long value | 0 |
| TIMESTAMP | Stores the milliseconds since epoch as a Long value | 0 |
| STRING | String | "null" |
| BYTES | Bytes | byte[0] |
| ARRAY | Array | Default value of the array type |
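To make the mapping concrete, a hypothetical converter from Flink's logical types to Pinot's storage types could look like the sketch below; PinotTypeMapper is an illustrative name, while LogicalTypeRoot (Flink) and FieldSpec.DataType (Pinot) are the actual enums involved:

```java
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.pinot.spi.data.FieldSpec;

// Illustrative helper mirroring the mapping table above; not part of the proposal.
public class PinotTypeMapper {

    public static FieldSpec.DataType toPinotType(LogicalType flinkType) {
        switch (flinkType.getTypeRoot()) {
            case TINYINT:
            case SMALLINT:
            case INTEGER:
            case BOOLEAN: // Pinot stores booleans as integers
            case DATE:    // days since epoch, stored as an integer
                return FieldSpec.DataType.INT;
            case BIGINT:
            case TIME_WITHOUT_TIME_ZONE:      // millis since epoch, stored as a long
            case TIMESTAMP_WITHOUT_TIME_ZONE: // millis since epoch, stored as a long
                return FieldSpec.DataType.LONG;
            case FLOAT:
                return FieldSpec.DataType.FLOAT;
            case CHAR:
            case VARCHAR:
                return FieldSpec.DataType.STRING;
            case BINARY:
            case VARBINARY:
                return FieldSpec.DataType.BYTES;
            // ARRAY maps to a Pinot multi-value column of its element type and is
            // resolved separately; DECIMAL is not supported, as noted above.
            default:
                throw new UnsupportedOperationException(
                        "Type not supported by the Pinot sink: " + flinkType);
        }
    }
}
```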
Rejected Alternatives
An alternative to the Pinot sink could be to use Hive as the data source and Kafka as a batch sink, and then have Pinot ingest from the Kafka topic. However, this does not work for the following reasons:
- The segments are managed differently in Pinot's realtime and offline tables. The realtime segments are grouped by Kafka offsets, whereas the offline segments are split based on time ranges. As a result, the realtime segments cannot be replaced if the job reruns.
- It is also less efficient to use Kafka as the intermediate storage for batch processing, compared to creating and uploading the segments directly.
Document
https://docs.google.com/document/d/1GVoFHOHSDPs1MEDKEmKguKwWMqM1lwQKj2e64RAKDf8/edit#heading=h.uvocz0dwkepo