...

Unlike the DataStream API, the Table API / SQL requires converting byte arrays to and from Row/RowData. Well-defined mapping rules are required to ensure correctness and backward compatibility of this conversion.
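For context only, the skeleton below sketches where this byte[]-to-RowData conversion has to happen on the SQL read path. ThriftRowDataDeserializationSchema is a hypothetical class name used for illustration, not an existing Flink class; the actual field mapping rules are described in the following sections.

import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;

// Hypothetical skeleton: where the Thrift byte[] -> RowData conversion plugs in for SQL.
public class ThriftRowDataDeserializationSchema implements DeserializationSchema<RowData> {

    private final int rowArity;

    public ThriftRowDataDeserializationSchema(int rowArity) {
        this.rowArity = rowArity;
    }

    @Override
    public RowData deserialize(byte[] message) throws IOException {
        // 1) deserialize 'message' into a TBase instance (e.g. with org.apache.thrift.TDeserializer)
        // 2) copy its fields into the row using the type and index mapping described below
        GenericRowData row = new GenericRowData(rowArity);
        return row;
    }

    @Override
    public boolean isEndOfStream(RowData nextElement) {
        return false;
    }

    @Override
    public TypeInformation<RowData> getProducedType() {
        return TypeInformation.of(RowData.class);
    }
}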

TBase & Row Type Mapping

In order to support Flink SQL workloads reading from the Kafka topic, we propose the following data type mapping from the Thrift type system to the Flink Row type system. In favor of debuggability and user readability, we map enum to string type.
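The sketch below illustrates the primitive portion of such a mapping. Since the full mapping table is elided here, treat the exact choices (other than enum -> string, stated above) as illustrative assumptions; ThriftTypeToFlinkType is a hypothetical helper name.

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.thrift.protocol.TType;

// Illustrative sketch of a primitive Thrift type -> Flink DataType mapping.
public final class ThriftTypeToFlinkType {

    public static DataType toFlinkType(byte thriftType) {
        switch (thriftType) {
            case TType.BOOL:   return DataTypes.BOOLEAN();
            case TType.BYTE:   return DataTypes.TINYINT();
            case TType.I16:    return DataTypes.SMALLINT();
            case TType.I32:    return DataTypes.INT();
            case TType.I64:    return DataTypes.BIGINT();
            case TType.DOUBLE: return DataTypes.DOUBLE();
            case TType.STRING: return DataTypes.STRING();
            // enums are mapped to string for debuggability and user readability (see above)
            case TType.ENUM:   return DataTypes.STRING();
            default:
                throw new UnsupportedOperationException("Unsupported thrift type id: " + thriftType);
        }
    }
}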

...

 public TType() {

 }

}

[1] Thrift Type system

TBase Field to Row Field Index Matching

When handling a TBase payload and converting it to a Row type, Flink needs a deterministic mapping from each field in the Thrift payload to a specific row index. We propose using thrift_id indexes sorted in ascending order (marked in brown below) when mapping from TBase to Row and vice versa. If a field is UNSET, we use NULL as a placeholder so that UNSET fields do not cause index mismatches.
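A minimal sketch of this deterministic ordering is shown below, built on libthrift's FieldMetaData.getStructMetaDataMap; FieldOrder is a hypothetical helper name used only for illustration.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.thrift.TBase;
import org.apache.thrift.TFieldIdEnum;
import org.apache.thrift.meta_data.FieldMetaData;

// Sketch: derive a deterministic TBase-field -> row-index mapping by sorting thrift ids ASC.
public final class FieldOrder {

    public static List<Map.Entry<? extends TFieldIdEnum, FieldMetaData>> sortedFields(
            Class<? extends TBase> thriftClass) {
        Map<? extends TFieldIdEnum, FieldMetaData> metaDataMap =
                FieldMetaData.getStructMetaDataMap(thriftClass);
        List<Map.Entry<? extends TFieldIdEnum, FieldMetaData>> entries =
                new ArrayList<>(metaDataMap.entrySet());
        // ASC thrift_id order => row index i corresponds to the i-th smallest thrift id
        entries.sort((a, b) ->
                Integer.compare(a.getKey().getThriftFieldId(), b.getKey().getThriftFieldId()));
        return entries;
    }
}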

...

Note: for runtime performance, we propose caching the sorted field information per Thrift class, as sketched below.
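One possible shape of such a cache (illustrative only; SortedFieldCache is a hypothetical name, and FieldOrder refers to the sketch above):

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.thrift.TBase;
import org.apache.thrift.TFieldIdEnum;
import org.apache.thrift.meta_data.FieldMetaData;

// Sketch of a per-class cache so the sorted field information is computed only once.
public final class SortedFieldCache {

    private static final Map<Class<?>, List<Map.Entry<? extends TFieldIdEnum, FieldMetaData>>> CACHE =
            new ConcurrentHashMap<>();

    public static List<Map.Entry<? extends TFieldIdEnum, FieldMetaData>> get(
            Class<? extends TBase> thriftClass) {
        return CACHE.computeIfAbsent(thriftClass, c -> FieldOrder.sortedFields(thriftClass));
    }
}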

Row Field to TBase Field Index Matching

We propose the reverse of the approach above for row-to-Thrift payload conversion. If the row arity is smaller than the number of TBase schema fields, we propose adding null as a placeholder, as sketched below.
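The sketch below shows the reverse (Row -> TBase) direction under the same ascending thrift_id ordering. RowToThrift and getThriftValue are hypothetical names used only for illustration; getThriftValue stands in for a converter mirroring getPrimitiveValue in the decoding path.

import java.util.List;
import java.util.Map;

import org.apache.flink.types.Row;
import org.apache.thrift.TBase;
import org.apache.thrift.TFieldIdEnum;
import org.apache.thrift.meta_data.FieldMetaData;
import org.apache.thrift.meta_data.FieldValueMetaData;

// Sketch: write Row fields back into a TBase instance using the ASC thrift_id ordering.
public final class RowToThrift {

    @SuppressWarnings({"unchecked", "rawtypes"})
    public static <T extends TBase> T toThrift(Row row, T reuse,
            List<Map.Entry<? extends TFieldIdEnum, FieldMetaData>> sortedFields) {
        for (int i = 0; i < sortedFields.size(); i++) {
            Map.Entry<? extends TFieldIdEnum, FieldMetaData> entry = sortedFields.get(i);
            // if the row is shorter than the thrift schema, the missing fields are treated as null
            Object value = i < row.getArity() ? row.getField(i) : null;
            if (value != null) {
                reuse.setFieldValue(entry.getKey(),
                        getThriftValue(entry.getValue().valueMetaData, value));
            }
        }
        return reuse;
    }

    private static Object getThriftValue(FieldValueMetaData meta, Object v) {
        // assumed helper: convert the Flink field back to its thrift representation
        // (e.g. enum string -> enum value), omitted here
        return v;
    }
}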

...

Xtruct3 

   string_thing = "boo"

   changed = 0

   i32_thing = unset

   i64_thing = -1

Row

<"boo", 0, null, -1, null>

Handling Nested Field in Row

It's common for Thrift schemas to be highly nested and to keep growing as more product features are added. In order to properly decode/encode the Thrift payload, we propose a recursive converter from nested Thrift structs to Flink Row fields. Based on each sub-struct class's schema type mapping along with its field index mapping, this recursive converter can handle very sophisticated nested fields (more than 7 levels deep, 24k characters of schema string) in a Thrift schema.

case TType.STRUCT:
    return getRow((TBase) val);

public Row getRow(TBase tBase) {
   List<Map.Entry<? extends TFieldIdEnum, FieldMetaData>> entries = ….

   // allocate row by thrift field size

...

       // a nested field is converted to a Row of its fields, sorted by thrift id

       if (tBase.isSet(entry.getKey())) {

...

           result.setField(i, getPrimitiveValue(entry.getValue().valueMetaData, val,

...

       } else {

           result.setField(i, getDefaultValue(entry.getValue().valueMetaData,

...

       }

       i++;

   }

 return result;

}

Hive MetaStore dependencies conflict

The Hive Metastore libraries introduce a specific Thrift library version that is not compatible with our internal Flink version, which led to version conflicts in our Flink SQL jobs. We propose shading libthrift and fb303 in the Flink Hive connector and moving those two packages to the project root-level Maven config; users can also place their own customized libthrift jar, as well as the Thrift schema jar, into the /lib folder during release (see the shading sketch below). We suggest users upgrade their Hive Metastore version to match the Thrift version, since keeping two Thrift versions makes the Hive connector dependencies complicated to manage.
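For illustration only, a maven-shade-plugin relocation along these lines could look as follows; the module it lands in and the org.apache.flink.hive.shaded prefix are assumptions, not part of this proposal.

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <relocations>
          <!-- relocate libthrift classes -->
          <relocation>
            <pattern>org.apache.thrift</pattern>
            <shadedPattern>org.apache.flink.hive.shaded.org.apache.thrift</shadedPattern>
          </relocation>
          <!-- relocate fb303 classes -->
          <relocation>
            <pattern>com.facebook.fb303</pattern>
            <shadedPattern>org.apache.flink.hive.shaded.com.facebook.fb303</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>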

Thrift format table DDL script

...

flink-parent pom.xml

<thrift.version>0.5.0-p6</thrift.version>

StreamExecutionEnvironment env =

// user can opt-in to skip corrupted state 

env.addDefaultKryoSerializer(Event.class, PinterestTBaseSerializer.class);

...

   topicName,

...

   Configuration.configurationToProperties(kafkaConsumerConfig));

// user can opt-in to deserialize list of fields 

...

   topicName,

...

KafkaSource<Event> source =
KafkaSource.<Event>builder()
...
.setDeserializer(
                       new ThriftDeserializationSchema<>(Event.class)

...

)
...
.build();

...

   Configuration.configurationToProperties(kafkaConsumerConfig));

...

       Configuration.configurationToProperties(kafkaProducerConfig),

       FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);

Metrics

We propose to introduce the following Gauge metrics (a registration sketch follows the list):

...

  1. numFailedConvertRowDataRecord: the number of messages that failed to convert to RowData
  2. numFailedConvertThriftRecord: the number of rows that failed to convert to a Thrift binary payload
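A possible registration sketch for these metrics is shown below; ThriftConverterMetrics is a hypothetical holder class, and only the metric names come from this proposal.

import java.util.concurrent.atomic.AtomicLong;

import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

// Sketch: expose the failure counts as Gauge metrics on the converter's metric group.
public class ThriftConverterMetrics {

    private final AtomicLong failedConvertRowDataRecords = new AtomicLong();
    private final AtomicLong failedConvertThriftRecords = new AtomicLong();

    public void register(MetricGroup metricGroup) {
        metricGroup.gauge("numFailedConvertRowDataRecord",
                (Gauge<Long>) failedConvertRowDataRecords::get);
        metricGroup.gauge("numFailedConvertThriftRecord",
                (Gauge<Long>) failedConvertThriftRecords::get);
    }

    public void onRowDataConversionFailure() {
        failedConvertRowDataRecords.incrementAndGet();
    }

    public void onThriftConversionFailure() {
        failedConvertThriftRecords.incrementAndGet();
    }
}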

Schema Compatibility 

Our proposal leverages the Thrift protocol's handling of newly added fields (as long as existing thrift id sequences are unchanged) to provide backward and forward compatibility. One caveat: adding a new enum value that is not defined in the old Thrift schema will break jobs still running with the old schema.

...

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.

References

FLINK-11333 (ASF JIRA): https://issues.apache.org/jira/browse/FLINK-11333