Status
Google Doc handy
Motivation
Apache Thrift (along with Protocol Buffers) is widely adopted as a de facto standard serialization protocol for high-throughput network traffic. Historically, companies like Pinterest have used thrift to encode strongly typed Kafka messages and to persist them to object storage as sequence files in the data warehouse.
The major benefits of this approach were:
- Versioned thrift schema files served as a schema registry, so producers and consumers across languages could encode/decode with the latest schema.
- Minimal overhead of maintaining translation ETL jobs that flatten the schema or add accessory fields during ingestion
- Lower storage footprint
Besides missing out on the optimizations that come with storage format conversion, running Flink jobs against the unsupported thrift format also makes maintaining and upgrading those jobs difficult, given the
- lack of backward-compatible thrift encoding/decoding support in Flink
- lack of Table schema DDL inference support
Proposed Changes
Summary
Overall, we propose changes in the following areas:
- adding flink-thrift to the flink-formats subproject, covering DataStream API and Table API encoding/decoding as well as table DDL
- implementing SupportsProjectionPushDown in KafkaDynamicSource, reconstructing valueDecodingFormat with partial deserialization
In a private fork, we made a few changes to support large-scale Thrift-formatted Flink DataStream and Flink SQL jobs in production.
Full Serialization/Deserialization
We started with a straightforward implementation of ThriftSerializationSchema and ThriftDeserializationSchema to encode/decode thrift messages, which users can pass to connectors in DataStream jobs.
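import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
public class ThriftSerializationSchema<T extends TBase> implements SerializationSchema<T> {
    // TSerializer is not Serializable, so it is created lazily on the task side
    private transient TSerializer serializer;
    public ThriftSerializationSchema(Class<T> recordClazz) {
    }
    @Override
    public byte[] serialize(T element) {
        try {
            if (serializer == null) {
                serializer = new TSerializer(); // TBinaryProtocol by default
            }
            return serializer.serialize(element);
        } catch (TException e) {
            throw new RuntimeException("Failed to serialize thrift record", e);
        }
    }
}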
Over time, we occasionally observed corrupted thrift binary payloads, both from Kafka topics and from restored state. These caused jobs to restart repeatedly until engineers stepped in and redeployed from a newer Kafka timestamp. To minimize production job disruption, we introduced a corruption-tolerant way to encode/decode thrift payloads.
PinterestTBaseSerializer (credit to Yu Yang)
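import java.io.IOException;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.thrift.TBase;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;

public class ThriftDeserializationSchema<T extends TBase> implements DeserializationSchema<T> {
    private final Class<T> thriftClazz;
    // ThriftCodeGenerator and ThriftTypeInfo are flink-thrift classes not shown here
    private final ThriftCodeGenerator codeGenerator;
    public ThriftDeserializationSchema(Class<T> recordClazz, ThriftCodeGenerator codeGenerator) {
        this.thriftClazz = recordClazz;
        this.codeGenerator = codeGenerator;
    }
    @Override
    public T deserialize(byte[] message) throws IOException {
        try {
            // fresh instance per record; TDeserializer uses TBinaryProtocol by default
            T instance = thriftClazz.getDeclaredConstructor().newInstance();
            new TDeserializer().deserialize(instance, message);
            return instance;
        } catch (TException e) {
            throw new IOException("Failed to deserialize thrift payload for " + thriftClazz.getName(), e);
        } catch (ReflectiveOperationException e) {
            throw new IOException("Cannot instantiate " + thriftClazz.getName(), e);
        }
    }
    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }
    @Override
    public TypeInformation<T> getProducedType() {
        return new ThriftTypeInfo<>(thriftClazz, codeGenerator);
    }
}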
env.addDefaultKryoSerializer(classOf[Event], classOf[PinterestTBaseSerializer[Event]])
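The corruption tolerance described above can be illustrated without Thrift or Kryo on the classpath. The following is a minimal, stdlib-only sketch built around a hypothetical `TolerantDecoder` wrapper, which is not one of the proposed classes. Instead of throwing, it counts any decoding failure and returns `null`.

In a real ThriftDeserializationSchema, the wrapped call would be `TDeserializer.deserialize`, whose checked `TException` would have to be caught explicitly. The legacy `FlinkKafkaConsumer` documentation states that records for which the deserialization schema returns null are skipped.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// Hypothetical wrapper: counts and skips payloads that fail to decode.
final class TolerantDecoder<T> {
    private final Function<byte[], T> decoder;
    // stands in for a failure metric such as numFailedDeserializeRecord
    private final AtomicLong numFailed = new AtomicLong();

    TolerantDecoder(Function<byte[], T> decoder) {
        this.decoder = decoder;
    }

    /** Returns the decoded value, or null when the payload cannot be decoded. */
    T decode(byte[] payload) {
        try {
            return decoder.apply(payload);
        } catch (RuntimeException e) {
            numFailed.incrementAndGet(); // record the failure instead of failing the job
            return null;
        }
    }

    long failedCount() {
        return numFailed.get();
    }
}
```

The trade-off is deliberate: a skipped record is visible through the counter, whereas an exception would put the job back into the restart loop described above.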
Hive MetaStore thrift dependencies shading
We noticed that HMS pulls in a specific thrift library version that is not compatible with our internal Flink version, which led to connectivity issues in our Flink SQL jobs. To keep other users from hitting the same dependency issue:
We propose shading libthrift and fb303 in the Flink Hive connector and moving those two dependencies to the root-level Maven config. Users could then place their own customized libthrift jar, as well as their thrift schema jar, into the /lib folder at release time.
TBase & Row Type Mapping
To support Flink SQL workloads reading from Kafka topics, we propose the following mapping from the Thrift type system to the Flink Row type system. For ease of debugging and readability, we map enum to string.
Thrift type | Flink type |
bool | DataTypes.BOOLEAN() |
byte | DataTypes.TINYINT() |
i16 | DataTypes.SMALLINT() |
i32 | DataTypes.INT() |
i64 | DataTypes.BIGINT() |
double | DataTypes.DOUBLE() |
string | DataTypes.STRING() |
enum | DataTypes.STRING() |
list | DataTypes.ARRAY(...) |
set | DataTypes.ARRAY(...) |
map | DataTypes.MAP(...) |
struct | DataTypes.ROW(...) |
public final class TType {
public static final byte STOP = 0;
public static final byte VOID = 1;
public static final byte BOOL = 2;
public static final byte BYTE = 3;
public static final byte DOUBLE = 4;
public static final byte I16 = 6;
public static final byte I32 = 8;
public static final byte I64 = 10;
public static final byte STRING = 11;
public static final byte STRUCT = 12;
public static final byte MAP = 13;
public static final byte SET = 14;
public static final byte LIST = 15;
public static final byte ENUM = 16;
public TType() {
}
}
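To make the mapping table concrete, here is a small sketch of a hypothetical `ThriftTypeMapping` helper that turns TType codes into Flink SQL type strings. The byte constants mirror the TType class above. Container types are composed from their element types, and both list and set become ARRAY because Flink SQL has no SET type. Structs are left out, since they need the recursive handling described later.

```java
import java.util.Map;

// Hypothetical helper mirroring the thrift-to-Flink type mapping table.
final class ThriftTypeMapping {
    // TType codes, same values as the TType class above
    static final byte BOOL = 2, BYTE = 3, DOUBLE = 4, I16 = 6, I32 = 8, I64 = 10,
            STRING = 11, STRUCT = 12, MAP = 13, SET = 14, LIST = 15, ENUM = 16;

    private static final Map<Byte, String> SCALARS = Map.of(
            BOOL, "BOOLEAN", BYTE, "TINYINT", I16, "SMALLINT", I32, "INT",
            I64, "BIGINT", DOUBLE, "DOUBLE", STRING, "STRING",
            ENUM, "STRING"); // enums are exposed by name for readability

    /** Flink SQL type for a scalar TType; containers and structs are handled separately. */
    static String scalar(byte ttype) {
        String sqlType = SCALARS.get(ttype);
        if (sqlType == null) {
            throw new IllegalArgumentException("not a scalar TType: " + ttype);
        }
        return sqlType;
    }

    /** Both thrift list and set become ARRAY (Flink SQL has no SET type). */
    static String array(String elementType) {
        return "ARRAY<" + elementType + ">";
    }

    static String map(String keyType, String valueType) {
        return "MAP<" + keyType + ", " + valueType + ">";
    }
}
```

For example, a thrift `map<string, list<i64>>` would compose to `MAP<STRING, ARRAY<BIGINT>>`.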
TBase Field to Row Field Index Matching
When converting a TBase payload to a Row, Flink needs a deterministic mapping from each field in the thrift payload to a specific row index. We propose ordering fields by ascending thrift field id (1, 4, 9 and 11 in the example below) when mapping from TBase to Row and vice versa. To keep UNSET fields from shifting later fields out of position, we use NULL as a placeholder for them.
struct Xtruct3
{
1: string string_thing,
4: i32 changed,
9: i32 i32_thing,
11: i64 i64_thing
}
[2] https://raw.githubusercontent.com/apache/thrift/master/test/ThriftTest.thrift
An example of index matching:
Xtruct3(string_thing = "boo", changed = 0, i32_thing = unset, i64_thing = -1) | Row<"boo", 0, null, -1> |
Note: for runtime performance, we propose caching the sorted field information per thrift type, so that sorting happens once per class rather than once per record.
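The index matching and the per-type cache can be sketched as follows. This is a simplified, stdlib-only model that assumes a struct is represented as a map from thrift field id to value, containing only the set fields. A real TBase exposes the same information through `isSet` and `getFieldValue`. The class `FieldOrder` and its methods are hypothetical.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: thrift-id field ordering with a per-type cache.
final class FieldOrder {
    // struct type name -> its thrift field ids in ascending order, computed once per type
    private static final Map<String, short[]> CACHE = new ConcurrentHashMap<>();

    static short[] sortedIds(String structName, short[] declaredIds) {
        return CACHE.computeIfAbsent(structName, name -> {
            short[] ids = declaredIds.clone();
            Arrays.sort(ids);
            return ids; // shared cached array: callers must not modify it
        });
    }

    /** Row slot i holds the value of the i-th smallest field id, or null if that field is unset. */
    static Object[] toRow(short[] sortedIds, Map<Short, Object> setFields) {
        Object[] row = new Object[sortedIds.length];
        for (int i = 0; i < sortedIds.length; i++) {
            row[i] = setFields.get(sortedIds[i]); // absent (UNSET) field -> null placeholder
        }
        return row;
    }
}
```

With the Xtruct3 ids declared as {11, 1, 9, 4}, `sortedIds` yields {1, 4, 9, 11}. Setting only fields 1, 4 and 11 then produces a row of "boo", 0, null, -1, as in the example above.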
Row Field to TBase Field index matching
We propose the reverse approach for Row-to-thrift conversion. If the row arity is smaller than the number of TBase schema fields, we pad the row with null placeholders.
For example, suppose a user adds a field to a thrift schema and redeploys Flink SQL jobs with the new schema, restoring from state written with the old schema (without new_thing). Padding with placeholders keeps the job from falling into a restart loop or skipping messages.
struct Xtruct3
{
1: string string_thing,
4: i32 changed,
9: i32 i32_thing,
11: i64 i64_thing,
12: string new_thing
}
Xtruct3(string_thing = "boo", changed = 0, i32_thing = unset, i64_thing = -1) | Row<"boo", 0, null, -1, null> |
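A minimal sketch of the padding step, assuming rows are modeled as `Object[]` and using a hypothetical `RowPadding` helper. Padding only at the end is correct when new fields receive thrift ids larger than all existing ones, as `12: string new_thing` does here. A new field that reused a lower, unused id would shift later positions, so the sketch relies on that assumption. Rejecting a row that is wider than the schema, rather than silently truncating it, is a choice of this sketch, not part of the proposal.

```java
import java.util.Arrays;

// Hypothetical helper: align a row restored from old state with a newer, wider thrift schema.
final class RowPadding {
    static Object[] padToSchema(Object[] row, int schemaFieldCount) {
        if (row.length > schemaFieldCount) {
            throw new IllegalArgumentException("row has " + row.length
                    + " fields but the thrift schema only has " + schemaFieldCount);
        }
        // Arrays.copyOf fills the new trailing slots with null placeholders
        return Arrays.copyOf(row, schemaFieldCount);
    }
}
```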
Handling Nested Field in Row
It is common for thrift schemas to be highly nested and to keep growing as product features are added. To properly decode/encode thrift payloads, we propose a recursive converter from nested thrift structs to Flink Row fields. Driven by each sub-struct's type mapping and field index mapping, this converter can handle very sophisticated nested thrift schemas (more than 7 levels deep, with a schema string of 24k characters).
// fragment of the per-field type switch: a nested struct recurses into getRow
case TType.STRUCT:
    return getRow((TBase) val);

public Row getRow(TBase tBase) {
    // field metadata sorted by ascending thrift id (lookup elided)
    List<Map.Entry<? extends TFieldIdEnum, FieldMetaData>> entries = ….
    // allocate row by thrift field size
    Row result = new Row(entries.size());
    int i = 0;
    String fieldAnnotation = tBase.getClass().toString();
    for (Map.Entry<? extends TFieldIdEnum, FieldMetaData> entry : entries) {
        if (tBase.isSet(entry.getKey())) {
            Object val = tBase.getFieldValue(entry.getKey());
            result.setField(i, getPrimitiveValue(entry.getValue().valueMetaData, val,
                    fieldAnnotation + entry.getKey().getFieldName()));
        } else {
            // unset field: still fill slot i so later fields keep their positions
            result.setField(i, getDefaultValue(entry.getValue().valueMetaData,
                    fieldAnnotation + entry.getKey().getFieldName()));
        }
        i++;
    }
    return result;
}
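The recursion can also be illustrated without Thrift classes. The sketch below assumes a hypothetical `StructSchema` node that holds a struct's sorted field ids plus child schemas for struct-typed fields. A struct value is modeled as a map from field id to value.

`NestedConverter.toRow` then mirrors `getRow` above: it descends into nested structs and leaves unset fields as null. Containers of structs (lists or maps whose elements are structs) are not handled in this sketch.

```java
import java.util.Arrays;
import java.util.Map;

// Hypothetical schema node: sorted thrift ids plus child schemas for struct-typed fields.
final class StructSchema {
    final short[] ids;                       // thrift ids in ascending order
    final Map<Short, StructSchema> children; // entries only for fields whose type is a struct

    StructSchema(short[] ids, Map<Short, StructSchema> children) {
        this.ids = ids.clone();
        Arrays.sort(this.ids);
        this.children = children;
    }
}

final class NestedConverter {
    /** Recursively converts a struct (field id -> value map) into a nested Object[] row. */
    @SuppressWarnings("unchecked")
    static Object[] toRow(StructSchema schema, Map<Short, Object> struct) {
        Object[] row = new Object[schema.ids.length];
        for (int i = 0; i < schema.ids.length; i++) {
            Object value = struct.get(schema.ids[i]);
            StructSchema child = schema.children.get(schema.ids[i]);
            if (value != null && child != null) {
                row[i] = toRow(child, (Map<Short, Object>) value); // nested struct -> nested row
            } else {
                row[i] = value; // primitive value, or null when the field is unset
            }
        }
        return row;
    }
}
```

Because each level only needs its own schema node, the depth of nesting is bounded only by the schema itself.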
Table/View DDL
Schemas of thrift-format tables (Kafka or FileSystem) are inferred from the thrift schema file definition. As users update thrift schema files and build new schema jars, those table/view definitions should evolve on the fly.
That said, it may make more sense to move this work to a separate FLIP proposal. In this FLIP, we propose a helper script that generates or regenerates the inferred table fields from a thrift schema file.
Users could modify it and automate table creation and updates on their own.
flink_thrift_ddl.py dummy.thrift -o dummy_ddl.sql
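For reference, here is a Java sketch of the kind of DDL such a generator could emit. It assumes the thrift fields have already been ordered by id and mapped to Flink SQL types. `DdlSketch.createTable` is hypothetical, is not the `flink_thrift_ddl.py` script, and leaves connector options to the user.

```java
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical generator output: column list plus the thrift format options.
final class DdlSketch {
    /**
     * @param table  table name
     * @param fields thrift field name -> Flink SQL type, already ordered by ascending thrift id
     * @param struct fully qualified thrift class for 'format.thrift-struct'
     */
    static String createTable(String table, Map<String, String> fields, String struct) {
        StringJoiner cols = new StringJoiner(",\n", "", "\n");
        fields.forEach((name, type) -> cols.add("  `" + name + "` " + type));
        return "CREATE TABLE " + table + " (\n" + cols
                + ") WITH (\n"
                + "  'format.type' = 'thrift',\n"
                + "  'format.thrift-struct' = '" + struct + "'\n"
                + ")";
    }
}
```

Passing a `LinkedHashMap` keeps the column order identical to the thrift-id order used for row index matching.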
Public Interfaces
DataStream API
We propose adding flink-thrift to the flink-formats project, with the following classes to support DataStream API encoding/decoding and storing state in thrift format:
flink-formats/flink-thrift/
ThriftDeserializationSchema
ThriftSerializationSchema
ThriftPartialDeserializationSchema
PinterestTBaseSerializer
In the flink-thrift pom.xml file, reference the thrift version defined in the flink-parent <properties>:
flink-thrift pom.xml
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>${thrift.version}</version>
</dependency>
flink-parent pom.xml
<thrift.version>0.5.0-p6</thrift.version>
StreamExecutionEnvironment env = …
// user can opt in to skip corrupted state
env.addDefaultKryoSerializer(Event.class, PinterestTBaseSerializer.class);
// full deserialization of each Kafka record
FlinkKafkaConsumer<Event> kafkaConsumer =
    new FlinkKafkaConsumer<>(
        topicName,
        new ThriftDeserializationSchema<>(Event.class),
        Configuration.configurationToProperties(kafkaConsumerConfig));
// alternatively, user can opt in to deserialize only a list of fields
FlinkKafkaConsumer<Event> partialKafkaConsumer =
    new FlinkKafkaConsumer<>(
        topicName,
        new ThriftPartialDeserializationSchema<>(Event.class, <list of fields>),
        Configuration.configurationToProperties(kafkaConsumerConfig));
FlinkKafkaProducer<Event> sink =
    new FlinkKafkaProducer<>(
        topicName,
        new ThriftSerializationSchema<>(Event.class),
        Configuration.configurationToProperties(kafkaProducerConfig),
        FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
Metrics
We propose to introduce the following Gauge metrics:
- numFailedDeserializeRecord: the number of messages that failed to deserialize into the specific thrift class
- numFailedSerializeRecord: the number of messages that failed to serialize from the specific thrift class
- numFailedKryoThriftRecord: the number of state records that failed to parse into the specific thrift class, if the user explicitly registers PinterestTBaseSerializer
Table API
We propose adding ThriftRowFormatFactory, ThriftToRowConverter, and RowToThriftConverter:
flink-formats/flink-thrift/
ThriftRowFormatFactory
ThriftToRowConverter
RowToThriftConverter
@Internal
public class ThriftValidator extends FormatDescriptorValidator {
public static final String FORMAT_TYPE_VALUE = "thrift";
public static final String FORMAT_THRIFT_CLASS = "format.thrift-struct";
}
Table DDL
We propose adding the 'format.type' = 'thrift' and 'format.thrift-struct' options to table DDL:
CREATE TABLE event_base_kafka_prod(
`_time` BIGINT,
`eventType` STRING,
`userId` BIGINT,
...
rowTime AS CAST(from_unixtime(_time/1000000000) as timestamp),
WATERMARK FOR rowTime AS rowTime - INTERVAL '60' SECOND
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'format.type' = 'thrift',
'format.thrift-struct'='xxx.schemas.event.Event',
...
)
Metrics
We propose to introduce the following Gauge metrics:
- numFailedToRowDeserializeRecord: the number of messages that failed to deserialize from the thrift payload into a Row
- numFailedToThriftSerializeRow: the number of rows that failed to serialize into the thrift payload
Schema Compatibility
For the DataStream API, we propose relying on the thrift protocol, which already provides full compatibility when new fields are added:
- data encoded with the old thrift schema, decoded with the new thrift schema
- data encoded with the new thrift schema, decoded with the old thrift schema
For the Table API implementation, we propose changes that match the DataStream API's full-compatibility behavior. When the user's schema introduces breaking changes, we propose exposing metrics for the failed messages.
KafkaDynamicSource
We propose that the Kafka table source (KafkaDynamicSource) implement SupportsProjectionPushDown:
public class KafkaDynamicSource
implements ScanTableSource, SupportsReadingMetadata, SupportsWatermarkPushDown,
SupportsProjectionPushDown {
In the implementation, we only reconstruct the DeserializationSchema so that it takes the list of projected fields:
@Override
public void applyProjection(int[][] projectedFields, DataType producedDataType) {
// check valueDecodingFormat is ThriftDeserializationSchema
// if so reconstruct DeserializationSchema that does partial deserialize
}
protected final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat;
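Before reconstructing a partial deserializer, `applyProjection` would need to translate the projection it receives into thrift field names. Each entry of `projectedFields` is an index path into the row type. With top-level projection only (`supportsNestedProjection()` returning false), every path has length 1.

The sketch below uses a hypothetical `ProjectionToFields` helper that maps those indexes to thrift field names in ascending-id row order. Its output preserves the planner's projection order, which the partially deserialized row would have to follow.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical translation from Flink's projected index paths to thrift field names.
final class ProjectionToFields {
    /**
     * @param projectedFields index paths as passed to applyProjection; top-level paths have length 1
     * @param rowFieldNames   thrift field names in row order (ascending thrift id)
     */
    static List<String> fieldNames(int[][] projectedFields, List<String> rowFieldNames) {
        List<String> names = new ArrayList<>(projectedFields.length);
        for (int[] path : projectedFields) {
            if (path.length != 1) {
                throw new IllegalArgumentException(
                        "nested projection not supported, path length " + path.length);
            }
            names.add(rowFieldNames.get(path[0])); // keep the planner's projection order
        }
        return names;
    }
}
```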
Table / View Inference DDL
TBD
Compatibility, Deprecation, and Migration Plan
Thrift support is a non-intrusive change: users opt in through a set of configurations. Table/view DDL inference might also apply to table creation and fetching in other catalogs.
Test Plan
We plan to follow best practices by adding unit tests and integration tests.
Rejected Alternatives