Status
JIRA: TBD
Motivation
Apache Thrift (along with Protocol Buffers) has been widely adopted as a de facto standard for high-throughput network traffic protocols. Historically, companies like Pinterest have been using thrift to encode strongly typed Kafka messages and persist them to object storage as sequence files in the data warehouse. Benefits of choosing the thrift format are simplified schema management in code and lower infra storage cost.
Major benefits of this approach were:
- Versioned thrift schema files served as a schema registry, so producers and consumers across languages could encode/decode with the latest schema.
- Minimized overhead of maintaining translation ETL jobs that flatten schemas or add accessory fields during ingestion.
- Lower storage footprint.
Other than missing out on the optimizations that come with storage format conversion, running jobs against an unsupported thrift format also makes maintaining and upgrading Flink jobs challenging, given the:
- lack of backward-compatible thrift encoding/decoding support in Flink
- lack of inferred Table schema DDL support
Flink has no built-in support when users leverage thrift to encode the events they need to process. In this proposal, we aim to add this support to Flink.
Proposed Changes Summary
Although thrift support can be a broad topic, we focus on how thrift format encoding/decoding can be natively supported in the Flink APIs (DataStream and Table, as well as PyFlink).
DataStream Thrift Serialization/Deserialization
ThriftSerializationSchema and ThriftDeserializationSchema could be added to encode/decode thrift messages. Users would generate the thrift Java stubs and add the thrift schema jar as a dependency.
Implementation-wise, this is straightforward and only takes a few lines:
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;

public class ThriftSerializationSchema<T extends TBase> implements SerializationSchema<T> {

    @Override
    public byte[] serialize(T element) {
        byte[] message = null;
        try {
            TSerializer serializer = new TSerializer();
            message = serializer.serialize(element);
        } catch (TException e) {
            // mark fail-to-serialize metric
        }
        return message;
    }
}
setDeserializer(new ThriftDeserializationSchema<>(thriftClass))
With the DataStream API, the emitted TBase objects would be serialized/deserialized via Kryo (unfortunately):
DataStream<? extends TBase> stream = env.addSource(...);
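For reference, a minimal sketch of what ThriftDeserializationSchema could look like is shown below (the implementation details and error handling are illustrative, not final):

import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.thrift.TBase;
import org.apache.thrift.TDeserializer;

public class ThriftDeserializationSchema<T extends TBase> implements DeserializationSchema<T> {

    private final Class<T> thriftClass;

    public ThriftDeserializationSchema(Class<T> thriftClass) {
        this.thriftClass = thriftClass;
    }

    @Override
    public T deserialize(byte[] message) throws IOException {
        try {
            T instance = thriftClass.getDeclaredConstructor().newInstance();
            new TDeserializer().deserialize(instance, message);
            return instance;
        } catch (Exception e) {
            // mark fail-to-deserialize metric and skip the record
            return null;
        }
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(thriftClass);
    }
}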
Table API Thrift Serialization/Deserialization
Unlike the DataStream API, the Table API / SQL requires converting byte arrays to and from Row/RowData. Certain mapping rules are required to ensure the correctness and backward compatibility of such conversions.
TBase & Row Type Mapping
In order to support Flink SQL workloads reading from Kafka topics, we propose the following data type mapping from the thrift type system to the Flink Row type system. In favor of debuggability and user readability, we map enum to the string type.
Thrift type | Flink DataType
---|---
bool | DataTypes.BOOLEAN()
byte | DataTypes.TINYINT()
i16 | DataTypes.SMALLINT()
i32 | DataTypes.INT()
i64 | DataTypes.BIGINT()
double | DataTypes.DOUBLE()
string | DataTypes.STRING()
enum | DataTypes.STRING()
list | DataTypes.ARRAY()
set | DataTypes.ARRAY()
map | DataTypes.MAP()
struct | DataTypes.ROW()
public final class TType {
    public static final byte STOP = 0;
    public static final byte VOID = 1;
    public static final byte BOOL = 2;
    public static final byte BYTE = 3;
    public static final byte DOUBLE = 4;
    public static final byte I16 = 6;
    public static final byte I32 = 8;
    public static final byte I64 = 10;
    public static final byte STRING = 11;
    public static final byte STRUCT = 12;
    public static final byte MAP = 13;
    public static final byte SET = 14;
    public static final byte LIST = 15;
    public static final byte ENUM = 16;

    public TType() {
    }
}
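To make the mapping concrete, a sketch of the primitive-type conversion could switch on the TType constants above (container and struct types also need their element/field types from the thrift metadata and are omitted here; the class name is illustrative, not part of the proposed API):

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.thrift.protocol.TType;

public final class ThriftTypeMapping {

    // Map a primitive thrift type constant to the proposed Flink DataType.
    public static DataType toFlinkType(byte thriftType) {
        switch (thriftType) {
            case TType.BOOL:   return DataTypes.BOOLEAN();
            case TType.BYTE:   return DataTypes.TINYINT();
            case TType.I16:    return DataTypes.SMALLINT();
            case TType.I32:    return DataTypes.INT();
            case TType.I64:    return DataTypes.BIGINT();
            case TType.DOUBLE: return DataTypes.DOUBLE();
            case TType.STRING: return DataTypes.STRING();
            case TType.ENUM:   return DataTypes.STRING();
            default:
                throw new UnsupportedOperationException("Unsupported thrift type: " + thriftType);
        }
    }
}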
TBase Field to Row Field Index Matching
When handling a TBase payload and converting it to a Row type, Flink needs a deterministic mapping from each field in the thrift payload to a specific row index. We propose using the thrift ids sorted in ascending order (the ids 1, 4, 9, 11 in the example below) when we map from TBase to Row and vice versa. To avoid UNSET fields causing index mismatches, we use NULL as a placeholder for them.
struct Xtruct3
{
1: string string_thing,
4: i32 changed,
9: i32 i32_thing,
11: i64 i64_thing
}
[2] https://raw.githubusercontent.com/apache/thrift/master/test/ThriftTest.thrift
An example of index matching can be found below:

Thrift value | Row
---|---
Xtruct3 { string_thing = "boo", changed = 0, i32_thing = unset, i64_thing = -1 } | Row<"boo", 0, null, -1>
Note: for runtime performance, we propose caching the sorted field information per thrift type.
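A minimal sketch of such a cache, assuming the sorted order is derived from the generated thrift field metadata (the ThriftFieldOrderCache class is hypothetical, not part of the proposed API):

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.apache.thrift.TBase;
import org.apache.thrift.TFieldIdEnum;
import org.apache.thrift.meta_data.FieldMetaData;

public final class ThriftFieldOrderCache {

    // Cache of thrift class -> field ids sorted in ascending thrift-id order.
    private static final Map<Class<?>, List<TFieldIdEnum>> SORTED_FIELDS =
            new ConcurrentHashMap<>();

    @SuppressWarnings({"unchecked", "rawtypes"})
    public static List<TFieldIdEnum> sortedFields(Class<? extends TBase> thriftClass) {
        return SORTED_FIELDS.computeIfAbsent(
                thriftClass,
                clazz ->
                        FieldMetaData.getStructMetaDataMap((Class<? extends TBase>) clazz)
                                .keySet().stream()
                                .map(f -> (TFieldIdEnum) f)
                                .sorted(Comparator.comparingInt(TFieldIdEnum::getThriftFieldId))
                                .collect(Collectors.toList()));
    }
}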
Row Field to TBase Field Index Matching
We propose the reverse of the approach above during Row-to-thrift payload conversion. If the row arity is smaller than the number of TBase schema fields, we propose adding null as a placeholder, as sketched after the example below.
For example, if a user updates the thrift schema with additional fields, and the Flink SQL job is deployed with the new schema but restored from state written with the old schema (without new_thing), the placeholder keeps the job from falling into a restart loop or skipping messages.
struct Xtruct3
{
1: string string_thing,
4: i32 changed,
9: i32 i32_thing,
11: i64 i64_thing,
12: string new_thing
}
Thrift value | Row
---|---
Xtruct3 { string_thing = "boo", changed = 0, i32_thing = unset, i64_thing = -1 } | Row<"boo", 0, null, -1, null>
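A minimal sketch of the reverse direction, again assuming the hypothetical ThriftFieldOrderCache helper from above (nested Rows and container types are omitted for brevity):

import java.util.List;

import org.apache.flink.types.Row;
import org.apache.thrift.TBase;
import org.apache.thrift.TFieldIdEnum;

public final class RowToThriftSketch {

    // Walk the thrift fields in ascending thrift-id order; a row shorter than the
    // thrift schema simply leaves the trailing fields unset (the null placeholder).
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static void fillStruct(Row row, TBase struct) {
        List<TFieldIdEnum> fields = ThriftFieldOrderCache.sortedFields(struct.getClass());
        for (int pos = 0; pos < fields.size(); pos++) {
            Object value = pos < row.getArity() ? row.getField(pos) : null;
            if (value != null) {
                struct.setFieldValue(fields.get(pos), value);
            }
        }
    }
}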
Handling Nested Field in Row
It is common for thrift schemas to be highly nested and to keep growing as more product features are added. In order to properly decode/encode the thrift payload, we propose a recursive converter from a nested thrift struct to a Flink Row field: each nested struct is converted to a nested Row whose fields are sorted by thrift id, as sketched below.
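A minimal sketch of this recursion, assuming the hypothetical ThriftFieldOrderCache helper sketched above and ignoring containers (list/set/map) of structs for brevity:

import java.util.List;

import org.apache.flink.types.Row;
import org.apache.thrift.TBase;
import org.apache.thrift.TFieldIdEnum;

public final class ThriftToRowSketch {

    // Convert a thrift struct to a Flink Row: fields are ordered by ascending
    // thrift id, UNSET fields become null, nested structs recurse into nested Rows.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static Row toRow(TBase struct) {
        List<TFieldIdEnum> fields = ThriftFieldOrderCache.sortedFields(struct.getClass());
        Row row = new Row(fields.size());
        for (int pos = 0; pos < fields.size(); pos++) {
            TFieldIdEnum field = fields.get(pos);
            Object value = struct.isSet(field) ? struct.getFieldValue(field) : null;
            row.setField(pos, value instanceof TBase ? toRow((TBase) value) : value);
        }
        return row;
    }
}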
Hive MetaStore dependencies conflict
The Hive Metastore libraries introduce a specific thrift library version that is not compatible with our internal Flink version, which led to version conflicts in our Flink SQL jobs. We suggest users upgrade the Hive Metastore version to match the thrift version, since keeping two thrift versions makes the Hive connector dependencies complicated to manage.
Thrift format table DDL script
Thrift format table (Kafka or FileSystem) schemas are inferred from the thrift schema file definition. As users update thrift schema files and release new schemas, the thrift format table definitions need to be updated as well.
Unfortunately, Flink has no built-in support for automated schema inference. We plan to offer a script that generates the table DDL fields from the thrift skeleton.
Users could modify it and automate table creation and updates on their own.
flink_thrift_ddl_gen.py dummy.thrift -o dummy_ddl.sql
Public Interfaces
DataStream API
We propose including a flink-thrift module in the flink-formats project, with the following classes to support DataStream API encoding/decoding and storing state in thrift format:
flink-formats/flink-thrift/
ThriftDeserializationSchema
ThriftSerializationSchema
In the flink-thrift pom.xml, include the thrift release version defined in the flink-parent <properties>:
flink-thrift pom.xml
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>${thrift.version}</version>
</dependency>
flink-parent pom.xml
<thrift.version>0.17</thrift.version>
KafkaSource<Event> source =
KafkaSource.<Event>builder()
...
.setDeserializer(
new ThriftDeserializationSchema<>(Event.class))
...
.build();
...
Metrics
We propose to introduce the following Gauge metrics:
- numFailedDeserializeThriftRecord: The number of messages that failed to deserialize to thrift class payload
- numFailedSerializeThriftRecord: The number of messages that failed to serialize to thrift binary payload
Table API
We propose adding ThriftRowFormatFactory, ThriftToRowDataConverter and RowDataToThriftConverter:
flink-formats/flink-thrift/
ThriftRowFormatFactory
ThriftToRowConverter
RowToThriftConverter
@Internal
public class ThriftValidator extends FormatDescriptorValidator {
public static final String FORMAT_TYPE_VALUE = "thrift";
public static final String FORMAT_THRIFT_CLASS = "format.thrift-struct";
}
protected final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat;
Table DDL
We propose adding format.type = thrift and format.thrift-struct in the table DDL:
CREATE TABLE event_base_kafka_prod (
  `_time` BIGINT,
  `eventType` STRING,
  `userId` BIGINT,
  ...
  rowTime AS CAST(from_unixtime(_time / 1000000000) AS TIMESTAMP),
  WATERMARK FOR rowTime AS rowTime - INTERVAL '60' SECOND
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'format.type' = 'thrift',
  'format.thrift-struct' = 'xxx.schemas.event.Event',
  ...
)
Metrics
We propose to introduce the following Gauge metrics:
- numFailedConvertRowDataRecord: The number of messages that failed to convert to RowData
- numFailedConvertThriftRecord: The number of rows that failed to convert to thrift binary payload
Schema Compatibility
Our proposal leverages the thrift protocol, which handles adding new fields (without changing the thrift id sequence) with backward and forward compatibility:
- data encoded with the old thrift schema, decoded with the new thrift schema
- data encoded with the new thrift schema, decoded with the old thrift schema
One caveat is that adding a new enum value that is not defined in the old thrift schema will break jobs running with the old schema.
For the Table API implementation, we propose changes that match the DataStream API's full compatibility behavior. However, when a user schema introduces breaking changes, we propose exposing metrics for the failed messages.
Compatibility, Deprecation, and Migration Plan
Thrift support is a less intrusive change; users would opt in with a set of configurations.
Test Plan
We plan to follow best practices by adding unit tests and integration tests.
Rejected Alternatives
References
FLINK-11333: https://issues.apache.org/jira/browse/FLINK-11333