
...

  • add flink-thrift to the flink-formats subproject, covering both DataStream API and Table API encoding/decoding as well as table DDL
  • implement SupportsProjectionPushDown in KafkaDynamicSource and reconstruct valueDecodingFormat with partial deserialization
  • support SERDE and SERDEPROPERTIES in the hive connector (WIP)
  • support catalog table/view schema inference and compatibility guarantees (WIP)

In a private forked repo, we made a few changes to support large-scale Thrift-formatted Flink DataStream and Flink SQL jobs in production. Diagram 1 below shows the changes needed to support the thrift format in Flink DataStream/SQL jobs. While we would like to discuss each area of work in a good amount of detail, we are also aware that changes such as inference DDL support or schema upgrades would require more discussion over a longer time frame.

Diagram 1. Project dependency graph for thrift format support in Flink. Currently, the open source Flink formats don’t support thrift serialization and deserialization.

...

Full Serialization/Deserialization

Naively, we started by implementing ThriftSerializationSchema and ThriftDeserializationSchema to encode/decode thrift messages, which users could pass into connectors in DataStream jobs.


public class ThriftSerializationSchema<T extends TBase> implements SerializationSchema<T> {

    private static final Logger LOG = LoggerFactory.getLogger(ThriftSerializationSchema.class);

    public ThriftSerializationSchema(Class<T> recordClazz) {
    }

    public byte[] serialize(T element) {

        byte[] message = null;

...
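For the decoding side, the following is a minimal sketch of what a ThriftDeserializationSchema could look like (illustrative only, not the exact code from the forked repo); it assumes thrift's standard TDeserializer/TBinaryProtocol APIs and Flink's DeserializationSchema interface.

import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.thrift.TBase;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.protocol.TBinaryProtocol;

public class ThriftDeserializationSchema<T extends TBase> implements DeserializationSchema<T> {

    private final Class<T> recordClazz;

    public ThriftDeserializationSchema(Class<T> recordClazz) {
        this.recordClazz = recordClazz;
    }

    @Override
    public T deserialize(byte[] message) throws IOException {
        try {
            // Instantiate the generated thrift class and decode the binary payload into it.
            T record = recordClazz.newInstance();
            // TDeserializer is not serializable; a production version would create it
            // lazily per task rather than per record.
            new TDeserializer(new TBinaryProtocol.Factory()).deserialize(record, message);
            return record;
        } catch (Exception e) {
            throw new IOException("Failed to deserialize thrift record", e);
        }
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeInformation.of(recordClazz);
    }
}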

env.addDefaultKryoSerializer(classOf[Event], classOf[PinterestTBaseSerializer[Event]])
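For reference, a thrift-aware Kryo serializer along the lines of the PinterestTBaseSerializer registered above could look like the following. This is a hedged sketch that simply delegates to thrift's binary protocol; it is not the actual Pinterest implementation, and the class name is illustrative.

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.thrift.TBase;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;

public class ThriftKryoSerializer<T extends TBase> extends Serializer<T> {

    @Override
    public void write(Kryo kryo, Output output, T record) {
        try {
            // Length-prefix the thrift binary payload so it can be read back exactly.
            byte[] bytes = new TSerializer(new TBinaryProtocol.Factory()).serialize(record);
            output.writeInt(bytes.length, true);
            output.writeBytes(bytes);
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize thrift record", e);
        }
    }

    @Override
    public T read(Kryo kryo, Input input, Class<T> type) {
        try {
            int length = input.readInt(true);
            byte[] bytes = input.readBytes(length);
            T record = type.newInstance();
            new TDeserializer(new TBinaryProtocol.Factory()).deserialize(record, bytes);
            return record;
        } catch (Exception e) {
            throw new RuntimeException("Failed to deserialize thrift record", e);
        }
    }
}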

Later, Teja Thotapalli and Lu Niu found that the corruption was actually caused by the default checkpoint local directory pointing to EBS instead of the NVMe drive in AWS. Jun Qin from Ververica has a great post that details the root-causing steps: The Impact of Disks on RocksDB State Backend in Flink: A Case Study.

Hive MetaStore thrift dependencies shading

We noticed that HMS has introduced a specific thrift library version that is not compatible with our internal Flink version, which led to connectivity issues in our Flink SQL jobs. We would like to avoid other users hitting the same dependency issues.

We propose shading libthrift and fb303 in the flink hive connector and moving those two packages to the project root level maven config; users could place their own customized libthrift jar, as well as the thrift schema jar, into the /lib folder during release.

...

       i++;

   }

   return result;

}

Hive SequenceFile Table [WIP]

We propose implementing a thrift file system format factory by extending SequenceFileReader over a thrift-encoded payload. Our read path implementation follows the batch version of the Merced system (Scalable and reliable data ingestion at Pinterest).

We propose supporting SERDE and SERDEPROPERTIES in the hive connector, so users could customize

...

https://nightlies.apache.org/flink/flink-docs-release-1.11/dev/table/hive/hive_dialect.html

...

   WITH SERDEPROPERTIES(

...


Table/View DDL

Thrift format table (Kafka or FileSystem) schemas are inferred from the thrift schema file definition. As users update thrift schema files and build new schema jars, those table/view definitions should evolve on the fly.

Meanwhile, it might make more sense to split that work into a separate FLIP proposal. In this FLIP, we propose providing a helper script that generates/regenerates inferred thrift schema fields from the thrift skeleton.

Users could modify and automate table creation and updates on their own.

flink_thrift_ddl.py dummy.thrift -o dummy_ddl.sql

) STORED AS SEQUENCEFILE LOCATION '...';
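As an illustration of the inference step itself (a hypothetical sketch, not the actual helper; flink_thrift_ddl.py may work directly off the .thrift file rather than the compiled class), the thrift-generated Java class already carries enough metadata to derive such column definitions:

import java.util.Map;

import org.apache.thrift.TBase;
import org.apache.thrift.TFieldIdEnum;
import org.apache.thrift.meta_data.FieldMetaData;
import org.apache.thrift.protocol.TType;

public final class ThriftDdlHelper {

    // Builds the column list portion of a CREATE TABLE statement from the
    // field metadata that the thrift compiler emits into every generated class.
    public static String columnsFor(Class<? extends TBase> thriftClass) {
        StringBuilder columns = new StringBuilder();
        Map<? extends TFieldIdEnum, FieldMetaData> meta =
                FieldMetaData.getStructMetaDataMap(thriftClass);
        for (FieldMetaData field : meta.values()) {
            columns.append('`').append(field.fieldName).append("` ")
                   .append(sqlType(field.valueMetaData.type)).append(",\n");
        }
        return columns.toString();
    }

    // Primitive mapping only; struct, list, set and map fields would map to
    // ROW, ARRAY and MAP types in a full implementation.
    private static String sqlType(byte thriftType) {
        switch (thriftType) {
            case TType.BOOL:   return "BOOLEAN";
            case TType.I32:    return "INT";
            case TType.I64:    return "BIGINT";
            case TType.DOUBLE: return "DOUBLE";
            case TType.STRING: return "STRING";
            default:           return "BYTES";
        }
    }
}

For example, columnsFor(Event.class) would emit one backtick-quoted column per thrift field, which the script can wrap in a CREATE TABLE ... STORED AS SEQUENCEFILE statement.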

Partial Thrift Deserialization

Along with projection push-down in the Kafka source and FileSource, partial thrift deserialization allows Flink to skip thrift fields that are not used in query statements. We propose an additional change: skip constructing the TBase instance altogether and instead use a TBase-field-to-row-field index mapping to write directly into each row field in one pass.
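A minimal sketch of that one-pass decode is shown below (illustrative only; the class name PartialThriftRowDecoder and the field-id-to-position map are assumptions, not the forked-repo code). It walks the thrift protocol stream once, writes projected primitive fields straight into a GenericRowData, and skips everything else.

import java.util.Map;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TField;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolUtil;
import org.apache.thrift.protocol.TType;
import org.apache.thrift.transport.TMemoryInputTransport;

public final class PartialThriftRowDecoder {

    // Thrift field id -> position in the projected row; fields without a mapping are skipped.
    private final Map<Short, Integer> fieldIdToRowPos;

    public PartialThriftRowDecoder(Map<Short, Integer> fieldIdToRowPos) {
        this.fieldIdToRowPos = fieldIdToRowPos;
    }

    public GenericRowData decode(byte[] message) throws TException {
        GenericRowData row = new GenericRowData(fieldIdToRowPos.size());
        TProtocol in = new TBinaryProtocol(new TMemoryInputTransport(message));
        in.readStructBegin();
        while (true) {
            TField field = in.readFieldBegin();
            if (field.type == TType.STOP) {
                break;
            }
            Integer pos = fieldIdToRowPos.get(field.id);
            if (pos == null) {
                // Field not referenced by the query: skip its bytes without materializing it.
                TProtocolUtil.skip(in, field.type);
            } else {
                row.setField(pos, readPrimitive(in, field.type));
            }
            in.readFieldEnd();
        }
        in.readStructEnd();
        return row;
    }

    // Only primitive types are handled here; nested structs and containers would
    // need recursion or a fallback to full deserialization.
    private Object readPrimitive(TProtocol in, byte type) throws TException {
        switch (type) {
            case TType.BOOL:   return in.readBool();
            case TType.I32:    return in.readI32();
            case TType.I64:    return in.readI64();
            case TType.DOUBLE: return in.readDouble();
            case TType.STRING: return StringData.fromString(in.readString());
            default:
                TProtocolUtil.skip(in, type);
                return null;
        }
    }
}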

We propose implementing SupportsProjectionPushDown in KafkaDynamicSource so that the partial thrift deserialization schema constructor limits the list of fields a Kafka source instance needs to load and deserialize into row fields.

Partial deserializer, credit to Bhalchandra Pandit [3].

[3] Improving data processing efficiency using partial deserialization of Thrift | by Pinterest Engineering

Table/View Inference DDL [WIP]

Thrift format table (Kafka or FileSystem) schemas are inferred from the thrift schema file definition. As users update thrift schema files and build new schema jars, those table/view definitions should evolve on the fly. There are several options to consider:

Option 1: keep full schema mapping in hive metastore as flink properties

This is a lower-change approach: we don’t need to run schema inference, but instead keep a helper function or manual updater to alter tables with thrift class property definitions.

The disadvantage of this approach: the user needs to run a script periodically, manage schema upgrades, and alter thrift format table DDL from time to time. The following example shows what option 1 DDL looks like:

CREATE TABLE event_base(
`_time` BIGINT,
`eventType` STRING,
`userId` BIGINT,
`objectId` BIGINT,
rowTime AS CAST(from_unixtime(_time/1000000000) as timestamp),
WATERMARK FOR rowTime AS rowTime - INTERVAL '60' SECOND
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'format.type' = 'thrift',
'format.thrift-class'='...schemas.event.Event',
'update-mode' = 'append'
);

Option 2: hide thrift fields so they are not stored in the Hive metastore; only keep computed fields that can’t be inferred from the thrift schema

This approach splits the schema into a dynamically inferred section (e.g., thrift fields) and a computed section (e.g., watermarks). The pro of this approach is that the user doesn’t have to write or update thrift schema fields during a schema upgrade; each job should have the same view of the table schema based on how the thrift schema jar gets loaded.

The disadvantage of this approach: it splits the table schema in two, and we need to discuss how to support schema inference at the catalog table level (TBD, might be worth another FLIP).

The following example shows what option 2 DDL looks like; the HiveCatalog database only stores fields not available in the thrift schema.

CREATE TABLE event_base(
rowTime AS CAST(from_unixtime(_time/1000000000) as timestamp),
WATERMARK FOR rowTime AS rowTime - INTERVAL '60' SECOND
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'format.type' = 'thrift',
'format.thrift-class'='....schemas.event.Event',
'update-mode' = 'append'
);

Public Interfaces

DataStream API

...