Status

Current state: Under Discussion

Discussion thread

JIRA

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Background and Motivation

...

Code Block
public class PinotSinkFunction<T> extends RichSinkFunction<T> {

  // Writer that buffers converted rows and creates/uploads Pinot segments.
  private transient SegmentWriter writer;

  @Override
  public void open(Configuration parameters) throws Exception {
     writer = new ...
  }

  @Override
  public void invoke(T value, Context context) throws Exception {
     // Convert the incoming record into a Pinot GenericRow and buffer it.
     GenericRow row = pinotRowConverter.convert(value);
     writer.write(row);
     // Flush the buffered rows into a segment once the configured threshold is reached.
     if (checkThreshold()) {
       writer.flush();
     }
  }

  @Override
  public void close() throws Exception {
     writer.flush();
     writer.close();
  }
}
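
For context, a minimal sketch of how such a sink function might be attached to a DataStream job is shown below. The PinotSinkFunction constructor used here (taking a controller URL and a table name) is an assumption for illustration; the exact configuration surface is left open by this proposal.

Code Block
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PinotSinkExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Some upstream source producing the records to be written to Pinot.
    DataStream<String> records = env.fromElements("record-1", "record-2", "record-3");

    // Hypothetical constructor: Pinot controller URL ("url") and table name ("table-name").
    records.addSink(new PinotSinkFunction<>("http://localhost:9000", "myTable"));

    env.execute("pinot-sink-example");
  }
}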



Connector Options


| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| connector | Y | none | string | The connector to use; must be 'pinot'. |
| table-name | Y | none | string | Name of the Pinot table. |
| url | Y | none | string | URL of the Pinot controller. |
| sink.buffer-flush.max-size | N | 5mb | string | Maximum size in memory of buffered rows for creating a segment. |
| sink.buffer-flush.max-rows | N | 1000 | int | Maximum number of rows to buffer for each segment creation. |
| sink.parallelism | N | none | int | Defines the parallelism of the Pinot sink operator. By default, the parallelism is determined by the framework using the same parallelism as the upstream chained operator. |
| segment-name.type | N | simple | string | The type of segment name generator to use. Supported values: simple (the default spec), normalizedDate (use this type when the time column in your data is in String format instead of epoch time), and fixed (the segment name is set explicitly by the user). |
| segment-name.name | N | none | string | For the fixed SegmentNameGenerator. Explicitly sets the segment name. |
| segment.name.postfix | N | none | string | For the simple SegmentNameGenerator. The postfix is appended to all segment names. |
| segment.name.prefix | N | none | string | For the normalizedDate SegmentNameGenerator. The prefix is prepended to all segment names. |
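
To illustrate how these options fit together, the sketch below registers a Pinot sink table via the Table API. The column list and option values are placeholders chosen for this example, not part of the proposal.

Code Block
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PinotDdlExample {
  public static void main(String[] args) {
    TableEnvironment tEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

    // Placeholder schema and option values; only 'connector', 'table-name' and 'url' are required.
    tEnv.executeSql(
        "CREATE TABLE pinot_orders (\n"
            + "  order_id STRING,\n"
            + "  price FLOAT,\n"
            + "  order_time TIMESTAMP(3)\n"
            + ") WITH (\n"
            + "  'connector' = 'pinot',\n"
            + "  'table-name' = 'orders',\n"
            + "  'url' = 'http://localhost:9000',\n"
            + "  'sink.buffer-flush.max-rows' = '1000',\n"
            + "  'segment-name.type' = 'simple'\n"
            + ")");

    // A subsequent "INSERT INTO pinot_orders SELECT ..." would write segments through the sink.
  }
}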


Schema and Data Type Mapping

...

Note that, by default, Pinot transforms null values coming from the data source into a default value determined by the type of the corresponding column (or a value specified in the schema), as described in the Pinot guide.


| Flink SQL type | Pinot type | Default value for null |
|---|---|---|
| TINYINT | Integer | 0 |
| SMALLINT | Integer | 0 |
| INT | Integer | 0 |
| BIGINT | Long | 0 |
| DECIMAL | Not supported | Not supported |
| FLOAT | Float | 0.0 |
| BOOLEAN | Integer | 0 |
| DATE | Stores the number of days since epoch as an Integer value | 0 |
| TIME | Stores the milliseconds since epoch as a Long value | 0 |
| TIMESTAMP | Stores the milliseconds since epoch as a Long value | 0 |
| STRING | String | "null" |
| BYTES | Bytes | byte[0] |
| ARRAY | Array | Default value of the array type |
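
As a sketch of how this mapping could be applied on the write path, the hypothetical converter below (standing in for the pinotRowConverter referenced in the sink function above) copies Flink Row fields into a Pinot GenericRow and leaves null handling to Pinot's default-value substitution described above. The class and its field-name handling are assumptions, not part of this proposal.

Code Block
import org.apache.flink.types.Row;
import org.apache.pinot.spi.data.readers.GenericRow;

// Hypothetical converter standing in for the pinotRowConverter used by PinotSinkFunction.
public class PinotRowConverter {

  private final String[] fieldNames;

  public PinotRowConverter(String[] fieldNames) {
    this.fieldNames = fieldNames;
  }

  public GenericRow convert(Row value) {
    GenericRow row = new GenericRow();
    for (int i = 0; i < fieldNames.length; i++) {
      // Null fields are passed through as-is; per the table above, Pinot replaces
      // them with the column type's default value (e.g. 0 for INT, "null" for STRING).
      row.putValue(fieldNames[i], value.getField(i));
    }
    return row;
  }
}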

Rejected Alternatives

An alternative to a dedicated Pinot sink would be to read from Hive as the data source, write to Kafka with a batch sink, and then have Pinot ingest from the Kafka topic. However, this does not work, for the following reasons:

  • Segments are managed differently in Pinot's realtime and offline tables. Realtime segments are grouped by Kafka offsets, whereas offline segments are split based on time ranges. As a result, the realtime segments cannot be replaced if the job reruns.
  • It is also less efficient to use Kafka as intermediate storage for batch processing, compared to creating and uploading segments directly.

Document

Design document: https://docs.google.com/document/d/1GVoFHOHSDPs1MEDKEmKguKwWMqM1lwQKj2e64RAKDf8/edit#heading=h.uvocz0dwkepo