
Status

...

Page properties


Discussion thread

...

...

Released: TBD

...


Release


Motivation

Apache Thrift (along with Protocol Buffers) has been widely adopted as a de facto standard protocol for high-throughput network traffic. Historically, companies like Pinterest have used Thrift to encode strongly typed Kafka messages and to persist them to object storage as sequence files in the data warehouse.

The major benefits of this approach were:

  • Versioned Thrift schema files served as a schema registry, so producers and consumers across languages could encode/decode with the latest schema.
  • Minimal overhead of maintaining translation ETL jobs that flatten the schema or add accessory fields during ingestion.
  • A lower storage footprint.

Apart from missing out on the optimizations that come with storage format conversion, running jobs against the unsupported Thrift format also makes Flink jobs hard to maintain and upgrade, given:

  • the lack of backward-compatible Thrift encoding/decoding support in Flink
  • the lack of support for inferring Table schema DDL

Proposed Changes

In a private fork, we made a few changes to support large-scale Thrift-formatted Flink DataStream and Flink SQL jobs in production. Diagram 1 below shows the changes needed to support the Thrift format in Flink DataStream/SQL jobs. We would like to discuss each area of work in detail. However, we are also aware that changes such as inferred DDL support or schema upgrades will require more discussion over a longer time frame.


Diagram 1. Project dependency graph for supporting the Thrift format in Flink. Currently, the open-source Flink formats do not support Thrift serialization and deserialization.

Full Serialization/Deserialization

The benefits of choosing the Thrift format are simplified schema management in code and lower infrastructure storage cost.

Flink has no built-in support for users who leverage Thrift to encode the events they need to process. In this proposal, we aim to add this support to Flink.

Proposed Changes Summary

Although Thrift support can be a broad topic, we focus on how Thrift encoding/decoding can be natively supported in Flink APIs (DataStream and Table, as well as PyFlink).

DataStream Thrift Serialization/Deserialization

We propose adding ThriftSerializationSchema and ThriftDeserializationSchema. As a first step, we implemented ThriftSerializationSchema and ThriftDeserializationSchema to encode/decode Thrift messages. Users can pass them to connectors in DataStream jobs.

...

should generate the Thrift Java stubs and add the Thrift schema jar dependencies.

Implementation-wise, this should be straightforward and take only a few lines:

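import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;

public class ThriftSerializationSchema<T extends TBase> implements SerializationSchema<T> {

    @Override
    public byte[] serialize(T element) {
        try {
            // TSerializer is not thread-safe, and in recent libthrift versions its
            // constructor throws TTransportException, so it is created inside the try block.
            TSerializer serializer = new TSerializer();
            return serializer.serialize(element);
        } catch (TException e) {
            // mark fail to serialize metric (numFailedSerializeThriftRecord)
            return null;
        }
    }
}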

setDeserializer(new ThriftDeserializationSchema<>(thriftClass))


With the DataStream API, emitted TBase objects fall back to Kryo for serialization/deserialization (unfortunately).


DataStream<? extends TBase> stream = env.addSource(...);

Table API Thrift Serialization/Deserialization

Unlike the DataStream API, the Table API/SQL requires converting byte arrays to and from Row/RowData. Certain mapping rules are required to ensure the correctness and backward compatibility of such conversions.

Over time, we occasionally observed corrupted Thrift binary payloads from both Kafka topics and restored state. These caused jobs to restart repeatedly until engineers stepped in and redeployed from a newer Kafka timestamp. To minimize production job disruption, we introduced a corruption-tolerant way to encode/decode Thrift payloads.

...

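import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.thrift.TBase;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PinterestTBaseSerializer<T extends TBase> extends Serializer<T> {

  private static final Logger LOG = LoggerFactory.getLogger(PinterestTBaseSerializer.class);

  // Created lazily: recent libthrift constructors throw a checked TTransportException.
  private TSerializer serializer;
  private TDeserializer deserializer;

  @Override
  public void write(Kryo kryo, Output output, T tBase) {
    try {
      if (serializer == null) {
        serializer = new TSerializer();
      }
      byte[] serThrift = serializer.serialize(tBase);
      output.writeInt(serThrift.length, true);
      output.writeBytes(serThrift);
    } catch (Exception t) {
      LOG.error("Failed to write due to unexpected exception", t);
      // Keep the stream aligned: write an empty payload that read() turns into null.
      output.writeInt(0, true);
    }
  }

  @Override
  public T read(Kryo kryo, Input input, Class<T> tBaseClass) {
    try {
      // Consume the whole frame first so a later failure cannot misalign the stream.
      int tSize = input.readInt(true);
      byte[] barr = new byte[tSize];
      input.readBytes(barr);
      if (tSize == 0) {
        return null; // a valid struct is at least 1 byte (the STOP marker)
      }
      if (deserializer == null) {
        deserializer = new TDeserializer();
      }
      T prototype = tBaseClass.getDeclaredConstructor().newInstance();
      deserializer.deserialize(prototype, barr);
      return prototype;
    } catch (Throwable t) {
      LOG.error("Failed to read due to unexpected Throwable", t);
      return null;
    }
  }
}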

env.addDefaultKryoSerializer(classOf[Event], classOf[PinterestTBaseSerializer[Event]])
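The framing used by PinterestTBaseSerializer can be sketched with the JDK alone. Each record is a length prefix followed by the payload, and any read failure yields null. This minimal illustration assumes java.io DataOutputStream/DataInputStream in place of Kryo's Output/Input, and a fixed 4-byte length prefix in place of Kryo's variable-length int. TolerantFraming and its MAX_PAYLOAD_BYTES bound are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical helper: length-prefixed frames whose reads tolerate corruption. */
final class TolerantFraming {
    // Hypothetical upper bound that guards against allocating huge arrays for a garbage length.
    static final int MAX_PAYLOAD_BYTES = 16 * 1024 * 1024;

    /** Writes a 4-byte big-endian length followed by the payload bytes. */
    static void write(DataOutputStream out, byte[] payload) throws IOException {
        out.writeInt(payload.length);
        out.write(payload);
    }

    /** Returns the next payload, or null if the frame is corrupt or truncated. */
    static byte[] readOrNull(DataInputStream in) {
        try {
            int size = in.readInt();
            if (size < 0 || size > MAX_PAYLOAD_BYTES) {
                return null; // corrupt length prefix: drop the record instead of failing
            }
            byte[] payload = new byte[size];
            in.readFully(payload); // throws EOFException if the frame is truncated
            return payload;
        } catch (IOException e) {
            return null; // in a Flink job this is where a failure metric would be bumped
        }
    }

    /** Frames a single payload into a byte array. */
    static byte[] frame(byte[] payload) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try {
            write(new DataOutputStream(bytes), payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e); // cannot happen for an in-memory stream
        }
        return bytes.toByteArray();
    }

    /** Reads a single frame from a byte array. */
    static byte[] readOrNull(byte[] framed) {
        return readOrNull(new DataInputStream(new ByteArrayInputStream(framed)));
    }
}
```

The upper bound matters because a corrupt length prefix would otherwise trigger a very large allocation before the read fails.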

Later, Teja Thotapalli and Lu Niu found that the corruption was actually caused by the default checkpoint local directory pointing to EBS instead of the NVMe drive on AWS. Jun Qin from Ververica has a great post that details the root-causing steps: The Impact of Disks on RocksDB State Backend in Flink: A Case Study.

Hive MetaStore thrift dependencies shading

We noticed that the Hive Metastore (HMS) pulls in a specific Thrift library version that is not compatible with our internal Flink version. This led to connectivity issues in our Flink SQL jobs. To keep other users from hitting the same dependency issues, we propose shading libthrift and fb303 in the Flink Hive connector and moving those two packages to the project's root-level Maven config. Users could then place their own customized libthrift jar, as well as their Thrift schema jar, into the /lib folder during release.

TBase & Row Type Mapping

To support Flink SQL workloads reading from Kafka topics, we propose the following data type mapping from the Thrift type system to the Flink Row type system. We favor debuggability and user readability, so we map enums to the string type.

...

 public TType() {
 }
}

[1] Thrift Type system
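The sketch below illustrates such a mapping. It keys Flink SQL type names by the type codes defined in libthrift's org.apache.thrift.protocol.TType, copied here as local constants so the example has no dependencies. Only the enum-to-STRING rule comes from the text above. The other pairings, such as LIST and SET to ARRAY, are assumptions, and ThriftTypeMapping is a hypothetical class.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical mapping from Thrift metadata type codes to Flink SQL type names. */
final class ThriftTypeMapping {
    // Type codes mirror the constants in org.apache.thrift.protocol.TType.
    static final byte BOOL = 2, BYTE = 3, DOUBLE = 4, I16 = 6, I32 = 8, I64 = 10,
            STRING = 11, STRUCT = 12, MAP = 13, SET = 14, LIST = 15, ENUM = 16;

    private static final Map<Byte, String> SCALARS = new HashMap<>();

    static {
        SCALARS.put(BOOL, "BOOLEAN");
        SCALARS.put(BYTE, "TINYINT");
        SCALARS.put(I16, "SMALLINT");
        SCALARS.put(I32, "INT");
        SCALARS.put(I64, "BIGINT");
        SCALARS.put(DOUBLE, "DOUBLE");
        // Thrift binary fields share the STRING code; a real mapping would need to
        // tell them apart and emit BYTES for them.
        SCALARS.put(STRING, "STRING");
        // Enums are surfaced by name for readability and debugging, as proposed above.
        SCALARS.put(ENUM, "STRING");
    }

    /** Maps a scalar type code. Container and struct types also need element info. */
    static String scalar(byte thriftType) {
        String sqlType = SCALARS.get(thriftType);
        if (sqlType == null) {
            throw new IllegalArgumentException("not a scalar Thrift type: " + thriftType);
        }
        return sqlType;
    }

    static String list(String elementType) {
        return "ARRAY<" + elementType + ">"; // assumption
    }

    static String set(String elementType) {
        return "ARRAY<" + elementType + ">"; // assumption: a set loses uniqueness in SQL
    }

    static String map(String keyType, String valueType) {
        return "MAP<" + keyType + ", " + valueType + ">";
    }
}
```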

TBase Field to Row Field Index Matching

When Flink handles a TBase payload and converts it to the Row type, it needs a deterministic mapping from each field in the Thrift payload to a specific row index. We propose using the thrift_id indexes sorted in ascending order (marked in brown below) when mapping from TBase to Row and vice versa. If a field is UNSET, we use NULL as a placeholder so that UNSET fields do not cause index mismatches.

...

Note: for runtime performance, we propose caching the sorted field information per type.
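The following minimal sketch shows the TBase-to-Row index mapping and the per-type cache. It assumes a simplified model: a decoded struct is a Map from Thrift field id to value, with unset fields absent, and the schema is the set of declared field ids. A real implementation would take the ids from the generated class metadata (for example FieldMetaData.getStructMetaDataMap). ThriftRowMapping is a hypothetical class.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: deterministic Thrift-field-id to row-index mapping. */
final class ThriftRowMapping {
    // Sorted field ids per struct type, so sorting happens once per type.
    // (Keyed by name for simplicity; a real cache would key by Class.)
    private static final Map<String, short[]> SORTED_IDS = new ConcurrentHashMap<>();

    /** Returns the declared field ids of a struct type in ascending order (cached). */
    static short[] sortedIds(String structName, Collection<Short> declaredIds) {
        return SORTED_IDS.computeIfAbsent(structName, name -> {
            short[] ids = new short[declaredIds.size()];
            int i = 0;
            for (short id : declaredIds) {
                ids[i++] = id;
            }
            Arrays.sort(ids);
            return ids;
        });
    }

    /** Row position i holds the field with the i-th smallest id. Unset fields become null. */
    static Object[] toRow(short[] sortedIds, Map<Short, Object> setFields) {
        Object[] row = new Object[sortedIds.length];
        for (int i = 0; i < sortedIds.length; i++) {
            row[i] = setFields.get(sortedIds[i]); // null placeholder when the field is unset
        }
        return row;
    }
}
```

With Thrift's Xtruct3 test struct (ids 1, 4, 9, 11) and i32_thing unset, the row comes out as "boo", 0, null, -1.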

Row Field to TBase Field index matching

We propose the reverse of the approach above for Row-to-Thrift payload conversion. If the row arity is smaller than the number of TBase schema fields, we propose using null as a placeholder for the missing fields.

...

Xtruct3
   string_thing = "boo"
   changed = 0
   i32_thing = unset
   i64_thing = -1

Row
   <"boo", 0, null, -1, null>
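The reverse direction can be sketched under the same simplified model, where a struct is a Map from field id to value. Row position i is written back to the field with the i-th smallest id. Positions beyond the row's arity are treated as null, and null values leave the field unset. RowToThriftMapping is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: map a row back to Thrift field ids (unset field = absent key). */
final class RowToThriftMapping {
    static Map<Short, Object> toFields(short[] sortedIds, Object[] row) {
        Map<Short, Object> fields = new HashMap<>();
        for (int i = 0; i < sortedIds.length; i++) {
            // A row shorter than the schema is padded with null placeholders.
            Object value = i < row.length ? row[i] : null;
            if (value != null) {
                fields.put(sortedIds[i], value); // null keeps the Thrift field unset
            }
        }
        return fields;
    }
}
```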

Handling Nested Field in Row

Thrift schemas are commonly highly nested, and they keep growing as more product features are added. To properly decode/encode Thrift payloads, we propose a recursive converter from nested Thrift structs to Flink Row fields. The converter relies on each sub-struct class's schema type mapping along with its field index mapping. With these, it can handle very sophisticated nested Thrift schemas (more than 7 levels deep, with a 24k-character schema string).

...

nested field, we convert to a row of fields sorted by Thrift id.
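A recursive converter of this kind can be sketched as follows. It assumes a hypothetical schema tree (StructSchema) in which each field id maps to a nested struct schema, or to null for non-struct fields, and a decoded struct is a Map from field id to value. Each nested struct becomes a nested Object[] row built with its own sorted-id mapping. Lists and maps of structs would recurse per element and are omitted here.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical schema node: field id -> nested struct schema (null for non-struct fields). */
final class StructSchema {
    final TreeMap<Short, StructSchema> fields = new TreeMap<>(); // iterates in ascending Thrift id order

    StructSchema scalar(int id) {
        fields.put((short) id, null);
        return this;
    }

    StructSchema struct(int id, StructSchema nested) {
        fields.put((short) id, nested);
        return this;
    }
}

final class NestedRowConverter {
    /** Recursively converts a struct (field id -> value) into nested Object[] rows. */
    @SuppressWarnings("unchecked")
    static Object[] toRow(StructSchema schema, Map<Short, Object> struct) {
        Object[] row = new Object[schema.fields.size()];
        int i = 0;
        for (Map.Entry<Short, StructSchema> field : schema.fields.entrySet()) {
            Object value = struct.get(field.getKey());
            if (value != null && field.getValue() != null) {
                // Nested struct: recurse with the sub-struct's own schema and index mapping.
                value = toRow(field.getValue(), (Map<Short, Object>) value);
            }
            row[i++] = value; // unset fields stay null
        }
        return row;
    }
}
```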

Hive MetaStore dependencies conflict

Hive Metastore libraries introduce a specific Thrift library version that is not compatible with our internal Flink version, which led to version conflicts in our Flink SQL jobs. We suggest users upgrade the Hive Metastore version to match the Thrift version, since keeping two Thrift versions makes the Hive connector dependencies complicated to manage.

Thrift format table DDL script

Thrift format table (Kafka or FileSystem) schemas are inferred from the Thrift schema file definition. As users update Thrift schema files and release new schemas, the Thrift format table definitions need to be updated.

Unfortunately, Flink has no built-in support for automated schema inference. We plan to offer a script that generates table schema fields from the Thrift skeleton.

Users can modify the script to automate table creation and updates on their own.

flink_thrift_ddl_gen.py dummy.thrift -o dummy_ddl.sql
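flink_thrift_ddl_gen.py itself is not included here. As an illustration of the kind of output such a generator produces, this Java sketch renders a CREATE TABLE statement from an already-extracted list of (Thrift id, field name, Flink type) entries. It orders columns by Thrift id so they line up with the row index mapping. ThriftDdlGen and Column are hypothetical, and parsing the .thrift IDL is out of scope. The option keys follow the ThriftValidator constants in this proposal.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical DDL generator over pre-extracted Thrift field descriptions. */
final class ThriftDdlGen {
    /** Hypothetical column description extracted from a Thrift struct definition. */
    static final class Column {
        final short id;
        final String name;
        final String sqlType;

        Column(int id, String name, String sqlType) {
            this.id = (short) id;
            this.name = name;
            this.sqlType = sqlType;
        }
    }

    /** Renders CREATE TABLE with columns ordered by ascending Thrift id. */
    static String createTable(String table, String thriftStruct, List<Column> columns) {
        List<Column> sorted = new ArrayList<>(columns);
        sorted.sort(Comparator.comparingInt((Column c) -> c.id));
        String body = sorted.stream()
                .map(c -> "  `" + c.name + "` " + c.sqlType)
                .collect(Collectors.joining(",\n"));
        return "CREATE TABLE " + table + " (\n" + body + "\n) WITH (\n"
                + "  'format.type' = 'thrift',\n"
                + "  'format.thrift-struct' = '" + thriftStruct + "'\n);";
    }
}
```

Connector options and computed columns (such as watermarks) are left for users to append, matching the idea that users customize the generated script.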


Public Interfaces

DataStream API

We propose including a flink-thrift module in the flink-formats project, with three files to support DataStream API encoding/decoding and storing state in the Thrift format:

ThriftDeserializationSchema

ThriftSerializationSchema


In the flink-thrift pom.xml file, include the Thrift release defined in the flink-parent <properties>:

flink-thrift pom.xml

<dependency>
  <groupId>org.apache.thrift</groupId>
  <artifactId>libthrift</artifactId>
  <version>${thrift.version}</version>
</dependency>

flink-parent pom.xml

<thrift.version>0.17.0</thrift.version>

KafkaSource<Event> source =
    KafkaSource.<Event>builder()
        ...
        .setDeserializer(new ThriftDeserializationSchema<>(Event.class))
        ...
        .build();

...

Metrics

We propose to introduce the following Gauge metrics:

  1. numFailedDeserializeThriftRecord: The number of messages that failed to deserialize into a Thrift class instance
  2. numFailedSerializeThriftRecord: The number of messages that failed to serialize into a Thrift binary payload
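The bookkeeping behind these metrics can be sketched without Flink dependencies. A decoder wrapper swallows failures, returns null, and counts them. The count is exposed through a LongSupplier that a Flink Gauge could report, for example one registered on the metric group available when the schema is opened. FailureCountingDecoder and this wiring are assumptions. Because TDeserializer throws a checked TException, a real decode function would also need to wrap it in an unchecked exception.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Hypothetical wrapper that counts decode failures instead of failing the job. */
final class FailureCountingDecoder<T> {
    private final Function<byte[], T> decode;
    private final AtomicLong numFailed = new AtomicLong();

    FailureCountingDecoder(Function<byte[], T> decode) {
        this.decode = decode;
    }

    /** Returns the decoded value, or null (and counts a failure) if decoding throws. */
    T decodeOrNull(byte[] message) {
        try {
            return decode.apply(message);
        } catch (RuntimeException e) {
            numFailed.incrementAndGet(); // e.g. numFailedDeserializeThriftRecord
            return null;
        }
    }

    /** The value a Gauge would report. */
    LongSupplier failedCount() {
        return numFailed::get;
    }
}
```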

Table API

We propose adding ThriftRowFormatFactory, ThriftToRowDataConverter, and RowDataToThriftConverter:

ThriftRowFormatFactory

ThriftToRowConverter

RowToThriftConverter

@Internal
public class ThriftValidator extends FormatDescriptorValidator {
  public static final String FORMAT_TYPE_VALUE = "thrift";
  public static final String FORMAT_THRIFT_CLASS = "format.thrift-struct";
}

protected final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat;

Table DDL

We propose adding the format.type thrift and format.thrift-struct options to the table DDL:

CREATE TABLE event_base_kafka_prod (
   `_time` BIGINT,
   `eventType` STRING,
   `userId` BIGINT,
   ...
   rowTime

Hive SequenceFile Table [WIP]

We propose implementing a Thrift file system format factory by extending SequenceFileReader over Thrift-encoded payloads. Our read path implementation follows the batch version of the Merced system described in Scalable and reliable data ingestion at Pinterest.

We propose supporting SERDE and SERDEPROPERTIES in the Hive connector so that users can customize

...

https://nightlies.apache.org/flink/flink-docs-release-1.11/dev/table/hive/hive_dialect.html

CREATE EXTERNAL TABLE xxx PARTITIONED BY (dt string, hr string)
ROW FORMAT SERDE 'xxx.SafeStringEnumThriftSerDe'
   WITH SERDEPROPERTIES(
 "thrift_struct" = 'xxx.schemas.event.Event'
) STORED AS SEQUENCEFILE LOCATION '...';

Partial Thrift Deserialization

Along with projection push-down in the Kafka source and file source, partial Thrift deserialization allows Flink to skip Thrift fields that are not used in query statements. We additionally propose skipping construction of the TBase instance altogether. Instead, the TBase-field-to-row-field index mapping would be used to write each row field directly in one pass.

We propose implementing SupportsProjectionPushDown in KafkaDynamicSource so that the PartialThriftDeserializationSchema constructor limits the list of fields a Kafka source instance needs to load and deserialize into row fields.


Partial deserializer (credit to Bhalchandra Pandit)

[3] Improving data processing efficiency using partial deserialization of Thrift, Pinterest Engineering
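The one-pass idea can be illustrated with a self-contained sketch. It reads a struct encoded with Thrift's binary protocol, where each field header is a 1-byte type plus a 2-byte big-endian id and a STOP byte (0) ends the struct. Projected fields are written straight into their row slots, and every other field is skipped without materializing a TBase. For brevity, the sketch reads only i32, i64, and string fields and skips only fixed-size or string fields. PartialBinaryReader and its projection map are hypothetical. A real implementation would work against TProtocol and use TProtocolUtil.skip for nested types.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/** Hypothetical one-pass partial reader for flat structs in Thrift's binary protocol. */
final class PartialBinaryReader {
    // Type codes as defined by org.apache.thrift.protocol.TType.
    static final byte STOP = 0, BOOL = 2, BYTE = 3, DOUBLE = 4, I16 = 6, I32 = 8, I64 = 10, STRING = 11;

    /**
     * @param projection Thrift field id -> row index for the fields the query needs
     * @param arity      number of projected row fields
     */
    static Object[] read(byte[] payload, Map<Short, Integer> projection, int arity) {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload));
        Object[] row = new Object[arity]; // projected fields absent from the payload stay null
        try {
            while (true) {
                byte type = in.readByte();
                if (type == STOP) {
                    return row;
                }
                short id = in.readShort();
                Integer index = projection.get(id);
                if (index == null) {
                    skip(in, type); // not referenced by the query: no allocation, no TBase
                } else {
                    row[index] = readValue(in, type);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException("corrupt or unsupported Thrift payload", e);
        }
    }

    private static Object readValue(DataInputStream in, byte type) throws IOException {
        switch (type) {
            case I32:
                return in.readInt();
            case I64:
                return in.readLong();
            case STRING: {
                byte[] bytes = new byte[in.readInt()];
                in.readFully(bytes);
                return new String(bytes, StandardCharsets.UTF_8);
            }
            default:
                throw new IOException("type not handled in this sketch: " + type);
        }
    }

    private static void skip(DataInputStream in, byte type) throws IOException {
        switch (type) {
            case BOOL: case BYTE: in.skipBytes(1); break;
            case I16: in.skipBytes(2); break;
            case I32: in.skipBytes(4); break;
            case I64: case DOUBLE: in.skipBytes(8); break;
            case STRING: in.skipBytes(in.readInt()); break;
            // Structs and containers would need recursive skipping.
            default: throw new IOException("type not handled in this sketch: " + type);
        }
    }
}
```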

Table/View Inference DDL [WIP]

Thrift format table (Kafka or FileSystem) schemas are inferred from the Thrift schema file definition. As users update Thrift schema files and build new schema jars, those table/view definitions should evolve on the fly. There are several options to consider.

Option 1: keep the full schema mapping in the Hive Metastore as Flink properties

This approach requires fewer changes. We don't need to run schema inference. Instead, we keep a helper function or manual updater that alters tables with Thrift class property definitions.

The disadvantage of this approach: the user needs to run a script periodically, manage schema upgrades, and alter the Thrift format table DDL from time to time. The following example shows what the option 1 DDL looks like:

CREATE TABLE event_base (
  `_time` BIGINT,
  `eventType` STRING,
  `userId` BIGINT,
  `objectId` BIGINT,
  rowTime AS CAST(from_unixtime(_time/1000000000) as timestamp),
  WATERMARK FOR rowTime AS rowTime - INTERVAL '60' SECOND
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'format.type' = 'thrift',
  'format.thrift-struct' = '..xxx.schemas.event.Event',
  'update-mode' = 'append'
);

Option 2: hide the Thrift fields so they are not stored in the Hive Metastore, and keep only the computed fields that cannot be inferred from the Thrift schema

This approach splits the schema into an inferred, dynamically generated section (e.g., Thrift fields) and a computed section (e.g., watermarks). The advantage of this approach is that users do not need to write or update Thrift schema fields during a schema upgrade. Each job's view of the table schema follows from the Thrift schema jar it loads.

The disadvantage of this approach: it splits the table schema into two, and we need to discuss how to support schema inference at the catalog table level (TBD; this might be worth another FLIP).

The following example shows what the option 2 DDL looks like. The HiveCatalog database only stores fields that are not available in the Thrift schema:

CREATE TABLE event_base (
  rowTime AS CAST(from_unixtime(_time/1000000000) as timestamp),
  WATERMARK FOR rowTime AS rowTime - INTERVAL '60' SECOND
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'format.type' = 'thrift',
  'format.thrift-struct' = '....schemas.event.Event',
  'update-mode' = 'append'
);


Metrics

We propose to introduce the following Gauge metrics:

  1. numFailedConvertRowDataRecord: The number of messages that failed to convert to RowData
  2. numFailedConvertThriftRecord: The number of rows that failed to convert to a Thrift binary payload

Schema Compatibility 

Our proposal leverages the Thrift protocol's backward and forward compatibility when new fields are added without changing the existing Thrift id sequence. This covers both of the following scenarios:

  • data encoded with the old Thrift schema and decoded with the new Thrift schema
  • data encoded with the new Thrift schema and decoded with the old Thrift schema

One caveat: adding a new enum value that is not defined in the old Thrift schema will break jobs that are still running with the old schema.

For the Table API implementation, we propose changes that match the full compatibility behavior of the DataStream API. However, when a user's schema introduces breaking changes, we propose reporting the failed messages through metrics.
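One way to soften the enum caveat on the Table path is sketched below. This is an assumption, not the proposed behavior. Enum values are resolved to names through the reader's known values, and an unknown value written by a newer schema maps to null and is counted instead of failing the record. SafeEnumNames is a hypothetical helper. In generated Thrift Java code, the lookup would typically be the enum's static findByValue(int), which returns null for values it does not know.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntFunction;

/** Hypothetical tolerant enum-to-STRING conversion for the Table API path. */
final class SafeEnumNames {
    private final IntFunction<String> knownNames; // wire value -> enum name, or null if unknown
    final AtomicLong numUnknownValues = new AtomicLong();

    SafeEnumNames(IntFunction<String> knownNames) {
        this.knownNames = knownNames;
    }

    /** Returns the enum name, or null if the value is unknown to this (older) schema. */
    String toName(int wireValue) {
        String name = knownNames.apply(wireValue);
        if (name == null) {
            numUnknownValues.incrementAndGet(); // surfaced as a metric rather than a job failure
        }
        return name;
    }

    /** Builds a lookup from a value -> name table, standing in for a generated findByValue. */
    static SafeEnumNames of(Map<Integer, String> values) {
        return new SafeEnumNames(values::get);
    }
}
```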

Compatibility, Deprecation, and Migration Plan

Thrift support is a less intrusive change: users opt in with a set of configurations. Table/View inference DDL might also apply to the creation and fetching of other catalog tables.

Test Plan

We plan to follow best practices by adding unit tests and integration tests.

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.

References

Jira: FLINK-11333 (ASF JIRA)