...
Motivation
Apache Thrift (along with Protocol Buffers) has been widely adopted as a de facto standard for high-throughput network traffic protocols. Historically, companies like Pinterest have been utilizing Thrift to encode strongly typed Kafka messages and persist them to object storage as sequence files in the data warehouse.
Major benefits of this approach were:
- Versioned thrift schema files served as a schema registry where producers and consumers across languages could encode/decode with the latest schema.
- Minimized overhead of maintaining translation ETL jobs that flatten the schema or add accessory fields during ingestion
- Lower storage footprint
Beyond missing out on the optimizations that come with storage format conversion, running jobs against an unsupported thrift format also poses maintenance and upgrade challenges for Flink jobs, given the:
- lack of backward-compatible thrift encoding/decoding support in Flink
- lack of inferred Table schema DDL support
Proposed Changes
Summary
Overall, we propose changes that touch the following areas:
- adding flink-thrift in the flink-formats subproject, covering both DataStream API and Table API encoding/decoding as well as table DDL
- implementing SupportsProjectionPushDown in KafkaDynamicSource, reconstructing valueDecodingFormat with partial deserialization
In a private forked repo, we made a few changes to support large-scale Thrift-formatted Flink DataStream and Flink SQL jobs in production.
Full Serialization/Deserialization
Benefits of choosing the thrift format are simplified schema management in code and lower infra storage cost.
Flink has no built-in support for users who leverage thrift to encode the events they need to process. In this proposal, we aim to add this support to Flink.
Although Thrift support can be a broad topic, we focus on making thrift format encoding/decoding natively supported in Flink APIs (DataStream and Table, as well as PyFlink).
DataStream Thrift Serialization/Deserialization
Naively, we started implementing ThriftSerializationSchema and ThriftDeserializationSchema to encode/decode thrift messages, which users could pass to connectors in DataStream jobs. Users should generate the thrift Java stubs and add the thrift schema jar dependencies.
Implementation-wise, this should be straightforward in a few lines.
public class ThriftSerializationSchema<T extends TBase> implements SerializationSchema<T> {

    private static final Logger LOG = LoggerFactory.getLogger(ThriftSerializationSchema.class);

    public ThriftSerializationSchema(Class<T> recordClazz) {
    }

    @Override
    public byte[] serialize(T element) {
        try {
            TSerializer serializer = new TSerializer();
            return serializer.serialize(element);
        } catch (TException e) {
            // naive version: propagate serialization failures, which fails the job
            throw new RuntimeException(e);
        }
    }
}
Over time, we observed occasional corrupted thrift binary payloads, from both Kafka topics and restored state, causing jobs to keep restarting until engineers stepped in and redeployed with a newer Kafka timestamp. To minimize production job disruption, we introduced a corruption-tolerant way to encode/decode thrift payloads.
    // corruption-tolerant version of serialize(): mark a fail-to-serialize metric and
    // return an empty payload instead of failing the whole job
    public byte[] serialize(T element) {
        byte[] message = new byte[0];
        try {
            TSerializer serializer = new TSerializer();
            message = serializer.serialize(element);
        } catch (Exception e) {
            // mark fail-to-serialize metric
            LOG.warn("Failed to serialize thrift record", e);
        }
        return message;
    }
}
setDeserializer(new ThriftDeserializationSchema(thrift_class));
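For reference, a minimal sketch of what ThriftDeserializationSchema could look like is shown below; the reflective instantiation, error handling, and produced-type logic here are illustrative assumptions rather than the final implementation.
public class ThriftDeserializationSchema<T extends TBase> implements DeserializationSchema<T> {

    private final Class<T> recordClazz;

    public ThriftDeserializationSchema(Class<T> recordClazz) {
        this.recordClazz = recordClazz;
    }

    @Override
    public T deserialize(byte[] message) throws IOException {
        try {
            // instantiate the generated thrift stub and fill it from the binary payload
            T instance = recordClazz.newInstance();
            new TDeserializer().deserialize(instance, message);
            return instance;
        } catch (Exception e) {
            throw new IOException("Failed to deserialize thrift record", e);
        }
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeInformation.of(recordClazz);
    }
}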
With the DataStream API, emitted TBase objects could be serialized/deserialized via Kryo (unfortunately):
DataStream<? extends TBase> stream = env.addSource(...);
...
}
@Override
...
deserializer.deserialize(instance, message);
return instance;
}
...
}
...
}
}
env.addDefaultKryoSerializer(classOf[Event], classOf[PinterestTBaseSerializer[Event]])
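The elided snippet above presumably belongs to PinterestTBaseSerializer, the custom Kryo serializer registered here. A minimal, self-contained sketch of such a serializer is shown below; the length-prefixed layout and the corruption handling are illustrative assumptions, not the actual Pinterest implementation.
public class PinterestTBaseSerializer<T extends TBase> extends Serializer<T> {

    private static final Logger LOG = LoggerFactory.getLogger(PinterestTBaseSerializer.class);

    @Override
    public void write(Kryo kryo, Output output, T record) {
        try {
            // length-prefix the thrift binary payload so it can be read back
            byte[] bytes = new TSerializer().serialize(record);
            output.writeInt(bytes.length);
            output.writeBytes(bytes);
        } catch (TException e) {
            // corruption-tolerant: write an empty payload and emit a metric instead of failing
            output.writeInt(0);
            LOG.warn("Failed to serialize thrift record into state", e);
        }
    }

    @Override
    public T read(Kryo kryo, Input input, Class<T> type) {
        try {
            byte[] bytes = input.readBytes(input.readInt());
            T record = type.newInstance();
            new TDeserializer().deserialize(record, bytes);
            return record;
        } catch (Exception e) {
            // corruption-tolerant: skip unreadable state records instead of failing the restore
            LOG.warn("Failed to deserialize thrift record from state", e);
            return null;
        }
    }
}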
Table API Thrift Serialization/Deserialization
Unlike the DataStream API, Table API / SQL requires converting byte arrays from and to Row/RowData. Certain mapping rules are required to ensure the correctness and backward compatibility of such conversions.
TBase & Row Type Mapping
...
i++;
}
return result;
}
...
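The elided snippet above appears to iterate over the thrift struct's fields and build the result Row. A minimal sketch of that loop is shown below, assuming the field list comes from the generated class's metadata map; a real implementation would also need a deterministic field order (e.g. sorted by thrift field id), nested/container handling, and per-type value conversion.
@SuppressWarnings({"unchecked", "rawtypes"})
public static Row convertToRow(TBase record) {
    // field metadata generated by the thrift compiler for this struct
    Map<? extends TFieldIdEnum, FieldMetaData> metaDataMap =
            FieldMetaData.getStructMetaDataMap(record.getClass());
    Row result = new Row(metaDataMap.size());
    int i = 0;
    for (TFieldIdEnum fieldId : metaDataMap.keySet()) {
        // copy each top-level thrift field value into the corresponding Row position
        result.setField(i, record.getFieldValue(fieldId));
        i++;
    }
    return result;
}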
Hive MetaStore dependencies conflict
Hive MetaStore libraries introduce a specific thrift library version that is not compatible with our internal Flink version, which led to dependency conflicts and connectivity issues in our Flink SQL jobs. To avoid other users hitting the same dependency issue, we propose shading libthrift and fb303 in the flink hive connector, and moving those two packages to the project root-level Maven config; users could place their own customized libthrift jar as well as the thrift schema jar into the /lib folder during release.
Thrift format table DDL script
Thrift format table (Kafka or FileSystem) schemas are inferred from the thrift schema file definition. As users update thrift schema files and build/release new schema jars, those thrift format table/view definitions need to be updated.
Unfortunately, Flink has no built-in support for automated schema inference, and it might make more sense to address that in a separate FLIP. In this FLIP, we propose a helper script that generates thrift schema fields from the thrift skeleton.
Users could modify and automate table creation and updates on their own.
flink_thrift_ddl_gen.py dummy.thrift -o dummy_ddl.sql
Public Interfaces
DataStream API
...
ThriftDeserializationSchema
ThriftPartialDeserializationSchema
ThriftSerializationSchema
PinterestTBaseSerializer
In the flink-thrift pom.xml file, include the thrift release version defined in the flink-parent <properties>:
...
new FlinkKafkaConsumer<Event>(
    topicName,
    new ThriftDeserializationSchema<>(Event.class),
    Configuration.configurationToProperties(kafkaConsumerConfig));
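Symmetrically, the serialization schema could be handed to a Kafka producer. A minimal usage sketch is shown below; the producer constructor variant and the kafkaProducerConfig name are assumptions for illustration.
new FlinkKafkaProducer<Event>(
    topicName,
    new ThriftSerializationSchema<>(Event.class),
    Configuration.configurationToProperties(kafkaProducerConfig));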
...
We propose to introduce the following Gauge metrics:
- numFailedDeserializeThriftRecord: The number of messages that failed to deserialize to the specific thrift class payload
- numFailedSerializeThriftRecord: The number of messages that failed to serialize to the specific thrift binary payload
- numFailedKryoThriftRecord: The number of state records that failed to parse to the specific thrift class when the user explicitly uses PinterestTBaseSerializer
Table API
We propose adding ThriftRowFormatFactory, ThriftToRowDataConverter and RowDataToThriftConverter.
flink-formats/flink-thrift/
...
public static final String FORMAT_THRIFT_CLASS = "format.thrift-struct";
}
KafkaDynamicSource
We propose that the Kafka table source implement SupportsProjectionPushDown:
public class KafkaDynamicSource
        implements ScanTableSource, SupportsReadingMetadata, SupportsWatermarkPushDown,
                SupportsProjectionPushDown {
In the implementation, we only reconstruct the DeserializationSchema so that it takes the list of projected fields, as sketched below.
@Override
public void applyProjection(int[][] projectedFields, DataType producedDataType) {
    // check valueDecodingFormat is ThriftDeserializationSchema
    // if so, reconstruct a DeserializationSchema that does partial deserialization
}

protected final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat;
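A sketch of how applyProjection could rebuild the value format is shown below. ThriftDecodingFormat and applyFieldProjection are hypothetical names standing in for the thrift decoding format and its projection hook, and only top-level projection is handled.
@Override
public boolean supportsNestedProjection() {
    // only top-level field projection is considered in this sketch
    return false;
}

@Override
public void applyProjection(int[][] projectedFields, DataType producedDataType) {
    if (valueDecodingFormat instanceof ThriftDecodingFormat) {
        // map projected field indexes back to field names of the physical row type
        List<String> allFieldNames = DataType.getFieldNames(this.physicalDataType);
        List<String> projectedNames = new ArrayList<>();
        for (int[] fieldPath : projectedFields) {
            projectedNames.add(allFieldNames.get(fieldPath[0]));
        }
        // hypothetical hook: let the thrift format deserialize only the projected fields
        ((ThriftDecodingFormat) valueDecodingFormat).applyFieldProjection(projectedNames);
        this.producedDataType = producedDataType;
    }
}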
Table DDL
We propose adding format.type thrift and thrift-struct in table DDL.
...
We propose to introduce the following Gauge metrics:
- numFailedConvertRowDataRecord: The number of messages that failed to convert to RowData
- numFailedConvertThriftRecord: The number of rows that failed to convert to the specific thrift binary payload
Schema Compatibility
Our proposal leverages the thrift protocol, which already handles adding new fields (without changing thrift id sequences) with backward and forward compatibility, covering both scenarios below. One caveat: a thrift enum adding a new value not defined in the old thrift schema will break jobs running with the old thrift schema.
- data encoded with the old thrift schema, decoded with the new thrift schema
- data encoded with the new thrift schema, decoded with the old thrift schema
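As an illustration of the forward-compatibility scenario, the sketch below encodes with a newer schema and decodes with an older one; EventV1 and EventV2 are hypothetical generated classes standing in for the old and new schema versions.
static EventV1 decodeWithOldSchema(EventV2 newEvent) throws TException {
    // encode with the new schema
    byte[] payload = new TSerializer().serialize(newEvent);
    // decode with the old schema: fields unknown to EventV1 are skipped by the thrift protocol
    EventV1 decoded = new EventV1();
    new TDeserializer().deserialize(decoded, payload);
    return decoded;
}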
For the Table API implementation, we propose changes to match the DataStream API's compatibility behavior. However, when the user schema introduces breaking changes, we propose surfacing metrics of failed messages.
Table / View Inference DDL
TBD
Compatibility, Deprecation, and Migration Plan
Thrift support is a less intrusive change; users would opt in with a set of configurations. Table/View inference DDL might apply to other catalog table creation and fetching as well.
Test Plan
We plan to follow best practices by adding unit tests and integration tests.
Rejected Alternatives
If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.