...

  • lack of backward-compatible Thrift encoding/decoding support in Flink
  • lack of schema inference support in Table DDL

Proposed Changes

Summary

...

The proposed changes touch the following areas:

  • adding flink-thrift to the flink-formats subproject, covering both DataStream API and Table API encoding/decoding as well as table DDL
  • implementing SupportsProjectionPushDown in KafkaDynamicSource, reconstructing valueDecodingFormat with partial deserialization
  • supporting SERDE and SERDEPROPERTIES in the Hive connector (WIP)
  • supporting schema inference and compatibility guarantees for catalog tables / views (WIP)

In a private forked repo, we made a few changes to support large-scale Thrift-formatted Flink DataStream and Flink SQL jobs in production. Diagram 1 below shows the changes needed to support the Thrift format in Flink DataStream/SQL jobs. While we would like to discuss each area of work in detail, we are aware that changes such as inference DDL support or schema upgrades will require longer discussions.

Diagram 1. Project dependency graph for Thrift format support in Flink. Currently, the open-source Flink formats do not support Thrift serialization or deserialization.

Full Serialization/Deserialization

Naively, we started implementing ThriftSerializationSchema and ThriftDeserializationSchema to encode/decode Thrift messages, which users can pass to connectors in DataStream jobs.


public class ThriftSerializationSchema<T extends TBase> implements SerializationSchema<T> {

   private static final Logger LOG = LoggerFactory.getLogger(ThriftSerializationSchema.class);

   public ThriftSerializationSchema(Class<T> recordClazz) {
   }

   public byte[] serialize(T element) {
       byte[] message = null;
       TSerializer serializer = new TSerializer();

...

   }
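For the decoding side, a minimal sketch of what ThriftDeserializationSchema could look like is shown below. This is an illustration of the approach rather than the exact code in the forked repo; error handling and metrics are discussed later.

public class ThriftDeserializationSchema<T extends TBase> implements DeserializationSchema<T> {

   private final Class<T> thriftClazz;

   public ThriftDeserializationSchema(Class<T> recordClazz) {
       this.thriftClazz = recordClazz;
   }

   public T deserialize(byte[] message) throws IOException {
       try {
           // instantiate an empty Thrift struct and populate it from the binary payload
           T record = thriftClazz.newInstance();
           new TDeserializer().deserialize(record, message);
           return record;
       } catch (Exception e) {
           throw new IOException("Failed to deserialize thrift record", e);
       }
   }

   public boolean isEndOfStream(T nextElement) {
       return false;
   }

   public TypeInformation<T> getProducedType() {
       return TypeInformation.of(thriftClazz);
   }
}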

...

This is a lower-change approach: we don't need to run schema inference, but instead keep a helper function or manual updater to alter tables with Thrift class property definitions (a sketch of such a helper follows the DDL example below).

The disadvantage of this approach: the user needs to run a script periodically, manage schema upgrades, and alter the Thrift-format table DDL from time to time. The following example shows what the option 1 DDL looks like:

CREATE TABLE event_base(
`_time` BIGINT,
`eventType` STRING,
`userId` BIGINT,
`objectId` BIGINT,
rowTime AS CAST(from_unixtime(_time/1000000000) as timestamp),
WATERMARK FOR rowTime AS rowTime - INTERVAL '60' SECOND
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'format.type' = 'thrift',
'format.thrift-class'='...schemas.event.Event',
'update-mode' = 'append'
);
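A sketch of the helper mentioned above is shown below: it derives Flink DDL column definitions from the Thrift class metadata so the table definition can be refreshed after a schema upgrade. The class and method names are illustrative, only a few primitive Thrift types are mapped, and nested structs and containers are left out.

@SuppressWarnings({"rawtypes", "unchecked"})
public class ThriftTableDdlHelper {

   /** Render the column section of a CREATE/ALTER TABLE statement from a Thrift class. */
   public static String columnsFor(Class<? extends TBase> thriftClass) {
       StringBuilder columns = new StringBuilder();
       Map<? extends TFieldIdEnum, FieldMetaData> metaDataMap =
               FieldMetaData.getStructMetaDataMap(thriftClass);
       for (FieldMetaData fieldMetaData : metaDataMap.values()) {
           columns.append('`').append(fieldMetaData.fieldName).append("` ")
                  .append(sqlType(fieldMetaData.valueMetaData.type)).append(",\n");
       }
       return columns.toString();
   }

   private static String sqlType(byte thriftType) {
       switch (thriftType) {
           case TType.BOOL:   return "BOOLEAN";
           case TType.I32:    return "INT";
           case TType.I64:    return "BIGINT";
           case TType.DOUBLE: return "DOUBLE";
           case TType.STRING: return "STRING";
           default:           return "STRING"; // structs, maps, lists, etc. need a fuller mapping
       }
   }
}

Such a helper could be run on a schedule or at deploy time to regenerate the column list of event_base above whenever the Thrift struct changes.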


Option 2: do not store Thrift fields in the Hive metastore; only keep computed fields that can't be inferred from the Thrift schema

This approach splits the schema into a dynamically inferred section (e.g. Thrift fields) and a computed section (e.g. watermarks). The advantage of this approach is that the user doesn't need to write or update Thrift schema fields during a schema upgrade. Each job should have the same view of the table schema based on how the Thrift schema jar gets loaded.

The disadvantage of this approach: it splits the table schema in two, and we need to discuss how to support schema inference at the catalog table level. (TBD, might be worth another FLIP)

The following example shows what the option 2 DDL looks like; the HiveCatalog database only stores fields not available in the Thrift schema.

CREATE TABLE event_base(
rowTime AS CAST(from_unixtime(_time/1000000000) as timestamp),
WATERMARK FOR rowTime AS rowTime - INTERVAL '60' SECOND
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'format.type' = 'thrift',
'format.thrift-class'='....schemas.event.Event',
'update-mode' = 'append'
);

Public Interfaces

DataStream API

We propose including flink-thrift in the flink-formats project, with the following classes to support DataStream API encoding/decoding and storing state in Thrift format:

ThriftDeserializationSchema

ThriftSerializationSchema

ThriftPartialDeserializationSchema

PinterestTBaseSerializer

In the flink-thrift pom.xml file, include the Thrift release defined in the flink-parent <properties>:

flink-thrift pom.xml

<dependency>
  <groupId>org.apache.thrift</groupId>
  <artifactId>libthrift</artifactId>
  <version>${thrift.version}</version>
</dependency>

flink-parent pom.xml

<thrift.version>0.5.0-p6</thrift.version>

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// user can opt-in to skip corrupted state
env.addDefaultKryoSerializer(Event.class, PinterestTBaseSerializer.class);

FlinkKafkaConsumer<Event> kafkaConsumer =
   new FlinkKafkaConsumer<Event>(
       topicName,
       new ThriftDeserializationSchema<>(Event.class),
       Configuration.configurationToProperties(kafkaConsumerConfig));

// user can opt-in to deserialize a list of fields
FlinkKafkaConsumer<Event> kafkaConsumer =
   new FlinkKafkaConsumer<Event>(
       topicName,
       new ThriftPartialDeserializationSchema<>(Event.class, <list of fields>),
       Configuration.configurationToProperties(kafkaConsumerConfig));

FlinkKafkaProducer<Event> sink =
   new FlinkKafkaProducer(topicName,
       new ThriftSerializationSchema<Event>(topicName),
       Configuration.configurationToProperties(kafkaProducerConfig),
       FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
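PinterestTBaseSerializer itself is not spelled out in this FLIP; a rough sketch of such a TBase Kryo serializer is shown below. The class name TBaseKryoSerializer, the length-prefixed wire layout, and the error handling are illustrative assumptions, not the actual Pinterest implementation.

public class TBaseKryoSerializer<T extends TBase> extends Serializer<T> {

   @Override
   public void write(Kryo kryo, Output output, T record) {
       try {
           // length-prefixed thrift binary payload
           byte[] bytes = new TSerializer().serialize(record);
           output.writeInt(bytes.length);
           output.writeBytes(bytes);
       } catch (TException e) {
           throw new RuntimeException("Failed to serialize thrift state record", e);
       }
   }

   @Override
   public T read(Kryo kryo, Input input, Class<T> type) {
       try {
           byte[] bytes = input.readBytes(input.readInt());
           T record = type.newInstance();
           new TDeserializer().deserialize(record, bytes);
           return record;
       } catch (Exception e) {
           // a production implementation could opt to skip the corrupted record and bump a metric instead
           throw new RuntimeException("Failed to deserialize thrift state record", e);
       }
   }
}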

Metrics

We propose to introduce the following Gauge metrics (a wiring sketch follows the list):

  1. numFailedDeserializeRecord: the number of messages that failed to deserialize into the specified Thrift payload
  2. numFailedSerializeRecord: the number of messages that failed to serialize into the specified Thrift payload
  3. numFailedKryoThriftRecord: the number of state records that failed to parse into the specified Thrift class when the user explicitly uses PinterestTBaseSerializer
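A sketch of how one of these gauges could be wired up inside ThriftDeserializationSchema is shown below, using the DeserializationSchema#open hook and its metric group; the other two metrics would be registered analogously.

// inside ThriftDeserializationSchema (sketch)
private transient long numFailedDeserializeRecord;

@Override
public void open(InitializationContext context) throws Exception {
   // expose the running failure count as a Gauge
   context.getMetricGroup().gauge("numFailedDeserializeRecord", (Gauge<Long>) () -> numFailedDeserializeRecord);
}

@Override
public T deserialize(byte[] message) throws IOException {
   try {
       T record = thriftClazz.newInstance();
       new TDeserializer().deserialize(record, message);
       return record;
   } catch (Exception e) {
       numFailedDeserializeRecord++;
       return null; // skip the corrupted record rather than failing the job
   }
}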

Table API

We propose adding ThriftRowFormatFactory, ThriftToRowConverter and RowToThriftConverter:

ThriftRowFormatFactory

ThriftToRowConverter

RowToThriftConverter
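As an illustration of ThriftToRowConverter, a rough sketch is shown below, covering top-level primitive fields only; the actual converter needs to handle nested structs, enums, binary fields and containers, and map them to the corresponding Flink types.

@SuppressWarnings({"rawtypes", "unchecked"})
public class ThriftToRowConverter {

   /** Convert a Thrift struct into a Flink Row, one position per Thrift field. */
   public static Row toRow(TBase record) {
       Map<? extends TFieldIdEnum, FieldMetaData> metaDataMap =
               FieldMetaData.getStructMetaDataMap(record.getClass());
       Row row = new Row(metaDataMap.size());
       int pos = 0;
       for (TFieldIdEnum field : metaDataMap.keySet()) {
           // unset fields become NULL; nested types would be converted recursively here
           row.setField(pos++, record.isSet(field) ? record.getFieldValue(field) : null);
       }
       return row;
   }
}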

The new format properties are defined and validated by a ThriftValidator:

@Internal
public class ThriftValidator extends FormatDescriptorValidator {
  public static final String FORMAT_TYPE_VALUE = "thrift";
  public static final String FORMAT_THRIFT_CLASS = "format.thrift-struct";
}

Table DDL

We propose adding format.type = thrift and format.thrift-struct in the table DDL:

CREATE TABLE event_base_kafka_prod(
   `_time` BIGINT,
   `eventType` STRING,
   `userId` BIGINT,
   ...
   rowTime AS CAST(from_unixtime(_time/1000000000) as timestamp),
   WATERMARK FOR rowTime AS rowTime - INTERVAL '60' SECOND
) WITH (
   'connector.type' = 'kafka',
   'connector.version' = 'universal',
   'format.type' = 'thrift',
   'format.thrift-struct' = 'xxx.schemas.event.Event',
   'update-mode' = 'append'
);

Metrics

We propose to introduce the following Gauge metrics:

  1. numFailedToRowDeserializeRecord: the number of messages that failed to deserialize from the Thrift payload into a Row
  2. numFailedToThriftSerializeRow: the number of rows that failed to serialize into the Thrift payload

Schema Compatibility 

We propose that the DataStream API leverage the Thrift protocol, which already handles the added-field scenario with full compatibility (see the sketch after this subsection):

  • data encoded with the old Thrift schema, decoded with the new Thrift schema
  • data encoded with the new Thrift schema, decoded with the old Thrift schema

For the Table API implementation, we propose changes to match the DataStream API's full-compatibility behavior. However, when the user schema introduces breaking changes, we propose surfacing metrics for the failed messages.
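The following sketch illustrates the first case above (data encoded with the old schema, decoded with the new schema). EventV1 and EventV2 are hypothetical generated classes, where EventV2 adds one optional field with a new field id.

public class ThriftCompatibilityExample {
   public static void main(String[] args) throws TException {
       // data encoded with the old generated class ...
       byte[] payload = new TSerializer().serialize(new EventV1());

       // ... decodes fine with the new generated class; the added optional field is simply unset
       EventV2 decoded = new EventV2();
       new TDeserializer().deserialize(decoded, payload);
   }
}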

KafkaDynamicSource

We propose that the Kafka table source implement SupportsProjectionPushDown:

public class KafkaDynamicSource
       implements ScanTableSource, SupportsReadingMetadata, SupportsWatermarkPushDown,
                  SupportsProjectionPushDown {

In the implementation, we only reconstruct the DeserializationSchema to take a list of fields:

@Override
public void applyProjection(int[][] projectedFields, DataType producedDataType) {
   // check valueDecodingFormat is ThriftDeserializationSchema
   // if so reconstruct DeserializationSchema that does partial deserialize
}

protected final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat;
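A rough sketch of the intended applyProjection behavior is shown below, assuming access to the source's physical value row type (physicalDataType). The projectedThriftFields field and the way a ThriftPartialDeserializationSchema is later built from it are illustrative assumptions, not the exact implementation.

// illustrative field that remembers the pushed-down projection
private List<String> projectedThriftFields;

@Override
public boolean supportsNestedProjection() {
   // only top-level projection is handled in this sketch
   return false;
}

@Override
public void applyProjection(int[][] projectedFields, DataType producedDataType) {
   // map the projected field indexes back to the top-level field names of the value row
   List<String> allFieldNames = DataType.getFieldNames(physicalDataType);
   this.projectedThriftFields = new ArrayList<>();
   for (int[] fieldPath : projectedFields) {
       projectedThriftFields.add(allFieldNames.get(fieldPath[0]));
   }
   // when the runtime decoder is created and valueDecodingFormat is the thrift format,
   // a ThriftPartialDeserializationSchema is built from projectedThriftFields
}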

Table / View Inference DDL 

TBD




Compatibility, Deprecation, and Migration Plan

Test Plan

...