  • lack of backward-compatible thrift encoding/decoding support in Flink
  • lack of Table schema inference DDL support

Proposed Changes

In a private forked repo, we made a few changes to support large-scale Thrift-formatted Flink DataStream and Flink SQL jobs in production. Diagram 1 below shows the changes needed to support the thrift format in Flink DataStream/SQL jobs. While we would like to discuss each area of work in a good amount of detail, we are also aware that changes such as inference DDL support or schema upgrade would require more discussion over a longer time frame.

Diagram 1. Project dependency graph for thrift format support in Flink. Currently, the open source Flink formats do not support thrift serialization and deserialization.

Full Serialization/Deserialization

Naively, we started by implementing ThriftSerializationSchema and ThriftDeserializationSchema to encode/decode thrift messages. Users could pass them to connectors in DataStream jobs.

Wiki Markup
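import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;

public class ThriftSerializationSchema<T extends TBase> implements SerializationSchema<T> {

   private final Class<T> recordClazz;

   public ThriftSerializationSchema(Class<T> recordClazz) {
       this.recordClazz = recordClazz;
   }

   @Override
   public byte[] serialize(T element) {
       try {
           // TSerializer is neither thread-safe nor Serializable, so create it per call
           TSerializer serializer = new TSerializer();
           return serializer.serialize(element);
       } catch (TException e) {
           // SerializationSchema#serialize declares no checked exceptions
           throw new RuntimeException("Failed to serialize thrift record", e);
       }
   }
}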


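import java.io.IOException;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.thrift.TBase;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;

public class ThriftDeserializationSchema<T extends TBase> implements DeserializationSchema<T> {

   private final Class<T> thriftClazz;
   private final ThriftCodeGenerator codeGenerator;

   public ThriftDeserializationSchema(Class<T> recordClazz, ThriftCodeGenerator codeGenerator) {
       this.thriftClazz = recordClazz;
       this.codeGenerator = codeGenerator;
   }

   @Override
   public T deserialize(byte[] message) throws IOException {
       try {
           TDeserializer deserializer = new TDeserializer();
           // Thrift-generated classes have a public no-arg constructor
           T instance = thriftClazz.getDeclaredConstructor().newInstance();
           deserializer.deserialize(instance, message);
           return instance;
       } catch (TException | ReflectiveOperationException e) {
           throw new IOException("Failed to deserialize thrift record", e);
       }
   }

   @Override
   public boolean isEndOfStream(T nextElement) {
       return false;
   }

   @Override
   public TypeInformation<T> getProducedType() {
       return new ThriftTypeInfo<>(thriftClazz, codeGenerator);
   }
}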

Over time, we occasionally observed corrupted thrift binary payloads from both Kafka topics and restored state, causing jobs to keep restarting until engineers stepped in and redeployed with a newer Kafka timestamp. To minimize production job disruption, we introduced a corruption-tolerant way to encode/decode thrift payloads.

PinterestTBaseSerializer (credit to Yu Yang)

Wiki Markup
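import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.thrift.TBase;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PinterestTBaseSerializer<T extends TBase> extends Serializer<T> {

 private static final Logger LOG = LoggerFactory.getLogger(PinterestTBaseSerializer.class);

 private final TSerializer serializer = new TSerializer();
 private final TDeserializer deserializer = new TDeserializer();

 @Override
 public void write(Kryo kryo, Output output, T tBase) {
   try {
     // Length-prefixed thrift bytes: varint length followed by the payload
     byte[] serThrift = serializer.serialize(tBase);
     output.writeInt(serThrift.length, true);
     output.writeBytes(serThrift);
   } catch (Exception e) {
     // Log instead of failing the job
     LOG.error("Failed to write due to unexpected exception", e);
   }
 }

 @Override
 public T read(Kryo kryo, Input input, Class<T> tBaseClass) {
   try {
     T prototype = tBaseClass.getDeclaredConstructor().newInstance();
     int tSize = input.readInt(true);
     byte[] barr = new byte[tSize];
     input.readBytes(barr);
     deserializer.deserialize(prototype, barr);
     return prototype;
   } catch (Throwable t) {
     // A corrupted payload yields null rather than a job restart
     LOG.error("Failed to read due to unexpected Throwable", t);
     return null;
   }
 }
}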

env.addDefaultKryoSerializer(classOf[Event], classOf[PinterestTBaseSerializer[Event]])

Later, Teja Thotapalli and Lu Niu found that the corruption was actually caused by the default local checkpoint directory pointing to EBS instead of the NVMe drive in AWS. Jun Qin from Ververica has a great post that details the root-causing steps: The Impact of Disks on RocksDB State Backend in Flink: A Case Study.

Hive MetaStore thrift dependencies shading

We noticed that HMS introduced a specific thrift library version that is not compatible with our internal Flink version. This led to connectivity issues in our Flink SQL jobs.

To keep other users from hitting the same dependency issues, we propose shading libthrift and fb303 in the Flink Hive connector and moving those two packages to the project root-level Maven config; users could place their own customized libthrift jar as well as their thrift schema jar into the /lib folder during release.

TBase & Row Type Mapping

To support Flink SQL workloads reading from Kafka topics, we propose the following data type mapping from the Thrift type system to the Flink Row type system. We map enum to the string type in favor of debugging and user readability.

Thrift type    Flink type
bool           DataTypes.BOOLEAN()
byte           DataTypes.TINYINT()
i16            DataTypes.SMALLINT()
i32            DataTypes.INT()
i64            DataTypes.BIGINT()
double         DataTypes.DOUBLE()
string         DataTypes.STRING()
enum           DataTypes.STRING()
list           DataTypes.ARRAY()
set            DataTypes.ARRAY()
map            DataTypes.MAP()
struct         DataTypes.ROW()

public final class TType {
  public static final byte STOP = 0;
  public static final byte VOID = 1;
  public static final byte BOOL = 2;
  public static final byte BYTE = 3;
  public static final byte DOUBLE = 4;
  public static final byte I16 = 6;
  public static final byte I32 = 8;
  public static final byte I64 = 10;
  public static final byte STRING = 11;
  public static final byte STRUCT = 12;
  public static final byte MAP = 13;
  public static final byte SET = 14;
  public static final byte LIST = 15;
  public static final byte ENUM = 16;

  public TType() {
  }
}

[1] Thrift Type system
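
The mapping above can be sketched as a simple lookup keyed by the TType byte codes. This is only an illustration using plain Java: the class ThriftToFlinkTypeNames and the method flinkTypeName are hypothetical names, and the sketch returns the name of the DataTypes factory method as a string rather than building a real Flink DataType.

```java
import java.util.Map;

// Hypothetical helper (not part of Flink or Thrift): maps a Thrift TType
// code to the name of the Flink DataTypes factory method in the table above.
final class ThriftToFlinkTypeNames {

    // Byte values mirror the TType constants listed above.
    private static final Map<Byte, String> MAPPING = Map.ofEntries(
        Map.entry((byte) 2, "BOOLEAN"),   // bool
        Map.entry((byte) 3, "TINYINT"),   // byte
        Map.entry((byte) 4, "DOUBLE"),    // double
        Map.entry((byte) 6, "SMALLINT"),  // i16
        Map.entry((byte) 8, "INT"),       // i32
        Map.entry((byte) 10, "BIGINT"),   // i64
        Map.entry((byte) 11, "STRING"),   // string
        Map.entry((byte) 12, "ROW"),      // struct
        Map.entry((byte) 13, "MAP"),      // map
        Map.entry((byte) 14, "ARRAY"),    // set
        Map.entry((byte) 15, "ARRAY"),    // list
        Map.entry((byte) 16, "STRING")    // enum, kept readable as a string
    );

    static String flinkTypeName(byte thriftType) {
        String name = MAPPING.get(thriftType);
        if (name == null) {
            // STOP and VOID have no row representation in this sketch
            throw new IllegalArgumentException("Unsupported thrift type code: " + thriftType);
        }
        return name;
    }
}
```

Both set and list land on ARRAY, so set semantics (uniqueness) are not preserved on the Flink side in this mapping.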

TBase Field to Row Field Index Matching

When handling a tbase payload and converting it to the Row type, Flink needs a deterministic mapping from each field in the thrift payload to a specific row index. We propose using thrift field ids sorted in ascending order (the numbers preceding each field in the struct below) when we map from tbase to row and vice versa. If a field is UNSET, we use NULL as a placeholder so that UNSET fields do not cause index mismatches.

struct Xtruct3
{
  1:  string string_thing,
  4:  i32    changed,
  9:  i32    i32_thing,
  11: i64    i64_thing
}

[2] https://raw.githubusercontent.com/apache/thrift/master/test/ThriftTest.thrift

Example of index matching can be found below

Xtruct3
   string_thing = "boo"
   changed = 0
   i32_thing = unset
   i64_thing = -1

Row
<"boo", 0, null, -1>

Note: for runtime performance, we propose caching the sorted field information per type.
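
A minimal sketch of the index matching and the per-type cache, written in plain Java so it stands alone. The names SortedFieldIndex, sortedIds and toRow are hypothetical; a real implementation would read field ids from the generated class metadata and emit a Flink Row instead of an Object[].

```java
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical sketch: ascending thrift field ids define the row positions.
final class SortedFieldIndex {

    // Cache: struct class -> thrift field ids sorted ascending,
    // so sorting happens once per type instead of once per record.
    private static final Map<Class<?>, short[]> CACHE = new ConcurrentHashMap<>();

    static short[] sortedIds(Class<?> structClass, Supplier<short[]> declaredIds) {
        return CACHE.computeIfAbsent(structClass, c -> {
            short[] ids = declaredIds.get().clone();
            Arrays.sort(ids);
            return ids;
        });
    }

    // setFields holds only the fields that are set: field id -> value.
    static Object[] toRow(short[] sortedIds, Map<Short, Object> setFields) {
        Object[] row = new Object[sortedIds.length];
        for (int i = 0; i < sortedIds.length; i++) {
            // Map.get returns null for unset fields, which is the placeholder
            row[i] = setFields.get(sortedIds[i]);
        }
        return row;
    }
}
```

With the Xtruct3 ids 1, 4, 9 and 11 and i32_thing unset, toRow produces the four-element row from the example, with null in the third position.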

Row Field to TBase Field index matching

We propose the reverse of the approach above for row-to-thrift payload conversion. If the row arity is smaller than the number of tbase schema fields, we propose adding null as a placeholder.

For example, suppose a user updates a thrift schema with additional fields, and Flink SQL jobs are deployed with the new schema but restore from state written with the old schema (without new_thing). Adding the placeholder keeps the job from falling into a restart loop or skipping messages.

struct Xtruct3
{
  1:  string string_thing,
  4:  i32    changed,
  9:  i32    i32_thing,
  11: i64    i64_thing,
  12: string new_thing
}

Xtruct3
   string_thing = "boo"
   changed = 0
   i32_thing = unset
   i64_thing = -1

Row
<"boo", 0, null, -1, null>
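
The placeholder step can be sketched as padding the restored row up to the field count of the new schema. This assumes new fields are only appended with higher field ids, as new_thing is here; the class RowPadding and method padToSchema are hypothetical names, and rejecting a row that is longer than the schema is a choice made for this sketch, not something the proposal specifies.

```java
import java.util.Arrays;

// Hypothetical sketch: align an old-schema row with a newer, longer schema.
final class RowPadding {

    static Object[] padToSchema(Object[] row, int schemaFieldCount) {
        if (row.length > schemaFieldCount) {
            // Assumption of this sketch: fields are never removed
            throw new IllegalArgumentException(
                "Row has " + row.length + " fields, schema has " + schemaFieldCount);
        }
        // Arrays.copyOf fills the added trailing positions with null
        return Arrays.copyOf(row, schemaFieldCount);
    }
}
```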

Handling Nested Field in Row

It's common for thrift schemas to be highly nested and to keep growing as more product features are added. To properly decode/encode thrift payloads, we propose a recursive converter from nested thrift structs to Flink Row fields. Based on each sub-struct's class schema type mapping along with its field index mapping, this recursive converter can handle very sophisticated nested thrift schemas (more than 7 levels deep, 24k characters of schema string).

Wiki Markup
case TType.STRUCT:
 return getRow((TBase) val);
 
public Row getRow(TBase tBase) {
   List<Map.Entry<? extends TFieldIdEnum, FieldMetaData>> entries = ….
 
   // allocate row by thrift field size
   Row result = new Row(entries.size());
   int i = 0;
   String fieldAnnotation = tBase.getClass().toString();
   for (Map.Entry<? extends TFieldIdEnum, FieldMetaData> entry : entries) {
       if (tBase.isSet(entry.getKey())) {
           Object val = tBase.getFieldValue(entry.getKey());
           result.setField(i, getPrimitiveValue(entry.getValue().valueMetaData, val,
             fieldAnnotation + entry.getKey().getFieldName()));
       } else {
           result.setField(i, getDefaultValue(entry.getValue().valueMetaData,
             fieldAnnotation + entry.getKey().getFieldName()));
       }
       i++;
   }
   return result;
}
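
The recursion itself can be illustrated without Thrift or Flink on the classpath. In the sketch below a struct is modeled as a sorted map from field id to value, a nested struct is a nested map, and the output is a nested Object[] standing in for Row; NestedRowSketch and this modeling are assumptions made purely for illustration.

```java
import java.util.SortedMap;

// Hypothetical stand-in for the recursive TBase-to-Row converter.
final class NestedRowSketch {

    // struct: field id -> value, iterated in ascending field id order
    static Object[] toRow(SortedMap<Short, Object> struct) {
        Object[] row = new Object[struct.size()];
        int i = 0;
        for (Object value : struct.values()) {
            row[i++] = convert(value);
        }
        return row;
    }

    @SuppressWarnings("unchecked")
    private static Object convert(Object value) {
        if (value instanceof SortedMap) {
            // Nested struct: recurse, producing a nested row
            return toRow((SortedMap<Short, Object>) value);
        }
        // Primitive or unset (null) field: keep as is
        return value;
    }
}
```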

Hive SequenceFile Table [WIP]

We propose implementing a thrift file system format factory by extending SequenceFileReader over a thrift-encoded payload. Our read path implementation follows the batch version of the Merced system described in Scalable and reliable data ingestion at Pinterest.

We propose supporting SERDE and SERDEPROPERTIES in the Hive connector so that users could customize how a table is (de)serialized, as in the Hive dialect DDL below.

https://nightlies.apache.org/flink/flink-docs-release-1.11/dev/table/hive/hive_dialect.html

CREATE EXTERNAL TABLE xxx PARTITIONED BY (dt string, hr string)
ROW FORMAT SERDE 'xxx.SafeStringEnumThriftSerDe'
   WITH SERDEPROPERTIES(
 "thrift_struct" = 'xxx.schemas.event.Event'
) STORED AS SEQUENCEFILE LOCATION '...';

Partial Thrift Deserialization

Along with projection push-down in the Kafka source and file source, partial Thrift deserialization allows Flink to skip thrift fields not used in query statements. We propose an additional change: skip constructing the tbase instance and instead use the tbase-field-to-row-field index mapping to write directly to each row field in one pass.

We propose implementing SupportsProjectionPushDown in KafkaDynamicSource so that the PartialThriftDeserializationSchema constructor limits the list of fields a Kafka source instance needs to load and deserialize to row fields.

Partial deserializer: credit to Bhalchandra Pandit

[3] Improving data processing efficiency using partial deserialization of Thrift | by Pinterest Engineering
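
The one-pass idea can be sketched as follows, assuming the payload has already been tokenized into (field id, value) pairs. PartialDecodeSketch, Field and project are hypothetical names; a real partial deserializer would read from the thrift protocol and skip the bytes of unprojected fields rather than iterate over pre-decoded values.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of projection-aware decoding into row positions.
final class PartialDecodeSketch {

    // One field as it would be encountered while scanning the payload.
    static final class Field {
        final short id;
        final Object value;

        Field(short id, Object value) {
            this.id = id;
            this.value = value;
        }
    }

    // projectedIds: thrift field ids the query needs, in output row order.
    static Object[] project(List<Field> payload, short[] projectedIds) {
        Map<Short, Integer> rowIndex = new HashMap<>();
        for (int i = 0; i < projectedIds.length; i++) {
            rowIndex.put(projectedIds[i], i);
        }
        Object[] row = new Object[projectedIds.length];
        for (Field f : payload) {              // single pass over the payload
            Integer idx = rowIndex.get(f.id);
            if (idx != null) {
                row[idx] = f.value;            // write straight into the row
            }                                  // otherwise the field is skipped
        }
        return row;
    }
}
```

No intermediate struct instance is built, and projected fields that never appear in the payload stay null.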

Table/View Inference DDL [WIP]

Thrift format table (Kafka or FileSystem) schemas are inferred from thrift schema file definitions. As users update thrift schema files and build new schema jars, those table/view definitions should evolve on the fly. There are several options to consider.

Option 1: keep full schema mapping in hive metastore as flink properties

This approach requires the fewest changes: we don't need to run schema inference, but we keep a helper function or manual updater to alter tables with thrift class property definitions.

The disadvantage of this approach is that the user needs to run a script periodically to manage schema upgrades and alter the thrift format table DDL from time to time. The following example shows what option 1 DDL looks like:

CREATE TABLE event_base(
`_time` BIGINT,
`eventType` STRING,
`userId` BIGINT,
`objectId` BIGINT,
rowTime AS CAST(from_unixtime(_time/1000000000) as timestamp),
WATERMARK FOR rowTime AS rowTime - INTERVAL '60' SECOND
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'format.type' = 'thrift',
'format.thrift-class'='...schemas.event.Event',
'update-mode' = 'append'
);


Option 2: do not store thrift fields in Hive metastore; keep only computed fields that can't be inferred from the thrift schema

This approach splits the schema into a dynamically generated, inferred section (e.g. thrift fields) and a computed section (e.g. watermark). The advantage of this approach is that the user doesn't need to write or update thrift schema fields during a schema upgrade. Each job should have the same view of the table schema, based on how the thrift schema jar gets loaded.

The disadvantage of this approach is that it splits the table schema in two, and we need to discuss how to support schema inference at the catalog table level (TBD, might be worth another FLIP).

The following example shows what option 2 DDL looks like; the HiveCatalog database only stores fields not available in the thrift schema.

CREATE TABLE event_base(
rowTime AS CAST(from_unixtime(_time/1000000000) as timestamp),
WATERMARK FOR rowTime AS rowTime - INTERVAL '60' SECOND
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'format.type' = 'thrift',
'format.thrift-class'='....schemas.event.Event',
'update-mode' = 'append'
);
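
One way to picture option 2 at table-load time is to merge the columns inferred from the thrift class with the computed columns stored in the catalog. The sketch below is only an assumption about how that merge might look: Option2SchemaSketch and effectiveSchema are hypothetical names, columns are modeled as name to SQL definition strings, and failing on a name clash is a choice made for this sketch that the proposal leaves open.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: inferred thrift columns first, stored computed columns after.
final class Option2SchemaSketch {

    // Both maps are column name -> SQL column definition.
    static Map<String, String> effectiveSchema(
            Map<String, String> inferredFromThrift, Map<String, String> storedComputed) {
        // LinkedHashMap keeps a stable column order across jobs
        Map<String, String> result = new LinkedHashMap<>(inferredFromThrift);
        for (Map.Entry<String, String> e : storedComputed.entrySet()) {
            if (result.putIfAbsent(e.getKey(), e.getValue()) != null) {
                throw new IllegalStateException(
                    "Computed column clashes with thrift field: " + e.getKey());
            }
        }
        return result;
    }
}
```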



Compatibility, Deprecation, and Migration Plan

Thrift support is a minimally intrusive change: users opt in with a set of configurations.

Test Plan

We plan to follow best practices by adding unit tests and integration tests.

Table/View inference DDL might apply to other catalog table creations and fetching as well.

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.