...

It’s common for Thrift schemas to be highly nested, and they keep growing as more product features are added. In order to properly decode/encode Thrift payloads, we propose a recursive converter from a nested Thrift struct to a Flink Row field. Based on each sub-struct class’s schema type mapping, along with a field index mapping, this recursive converter can handle very sophisticated nested Thrift schemas (more than 7 levels deep, with a 24k-character schema string):

case TType.STRUCT:
    return getRow((TBase) val);

public Row getRow(TBase tBase) {
    List<Map.Entry<? extends TFieldIdEnum, FieldMetaData>> entries = ….

   // allocate row by thrift field size

...

    // a nested field is converted to a row of fields sorted by thrift id

       if (tBase.isSet(entry.getKey())) {

...

           result.setField(i, getPrimitiveValue(entry.getValue().valueMetaData, val,

...

       } else {

           result.setField(i, getDefaultValue(entry.getValue().valueMetaData,

...

       }

       i++;

   }

    return result;

}
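To make the converter’s shape concrete, here is a self-contained sketch of the recursion. Only the getRow skeleton above comes from the proposal; getSortedFields, getPrimitiveValue, and getDefaultValue are assumed helpers with simplified signatures (the fragment above passes an extra argument to getPrimitiveValue), filled in for illustration:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

import org.apache.flink.types.Row;
import org.apache.thrift.TBase;
import org.apache.thrift.TFieldIdEnum;
import org.apache.thrift.meta_data.FieldMetaData;
import org.apache.thrift.meta_data.FieldValueMetaData;
import org.apache.thrift.protocol.TType;

public class ThriftRowConverter {

    @SuppressWarnings({"rawtypes", "unchecked"})
    public Row getRow(TBase tBase) {
        List<Map.Entry<? extends TFieldIdEnum, FieldMetaData>> entries = getSortedFields(tBase);

        // allocate row by thrift field size
        Row result = new Row(entries.size());

        int i = 0;
        for (Map.Entry<? extends TFieldIdEnum, FieldMetaData> entry : entries) {
            if (tBase.isSet(entry.getKey())) {
                Object val = tBase.getFieldValue(entry.getKey());
                result.setField(i, getPrimitiveValue(entry.getValue().valueMetaData, val));
            } else {
                result.setField(i, getDefaultValue(entry.getValue().valueMetaData));
            }
            i++;
        }
        return result;
    }

    // Assumed helper: struct metadata from thrift's generated FieldMetaData map,
    // sorted by thrift field id so the Row layout is deterministic.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private List<Map.Entry<? extends TFieldIdEnum, FieldMetaData>> getSortedFields(TBase tBase) {
        Map<? extends TFieldIdEnum, FieldMetaData> metaMap =
                FieldMetaData.getStructMetaDataMap(tBase.getClass());
        List<Map.Entry<? extends TFieldIdEnum, FieldMetaData>> entries =
                new ArrayList<>(metaMap.entrySet());
        entries.sort(Comparator.comparingInt(e -> e.getKey().getThriftFieldId()));
        return entries;
    }

    // Assumed helper: the STRUCT case recurses, which is what lets the converter
    // walk arbitrarily deep schemas one level at a time.
    private Object getPrimitiveValue(FieldValueMetaData meta, Object val) {
        if (val == null) {
            return null;
        }
        switch (meta.type) {
            case TType.STRUCT:
                return getRow((TBase) val);
            default:
                // bool/byte/i16/i32/i64/double/string map directly; containers
                // (list/set/map) would need element-wise conversion in the real code
                return val;
        }
    }

    // Assumed helper: the proposal derives defaults for unset fields; this
    // sketch simply uses null.
    private Object getDefaultValue(FieldValueMetaData meta) {
        return null;
    }
}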

Hive MetaStore dependency conflict

...

flink-parent pom.xml

<thrift.version>0.5.0-p6</thrift.version>
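To make the pin effective for transitive dependencies as well, a dependencyManagement entry along these lines could be used. This is a sketch; the coordinates shown are the stock Apache Thrift ones, and a patched build such as 0.5.0-p6 may be published under different coordinates:

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.thrift</groupId>
      <artifactId>libthrift</artifactId>
      <version>${thrift.version}</version>
    </dependency>
  </dependencies>
</dependencyManagement>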

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// user can opt-in to skip corrupted state
env.addDefaultKryoSerializer(Event.class, PinterestTBaseSerializer.class);

...

   topicName,

...

   Configuration.configurationToProperties(kafkaConsumerConfig));
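For orientation, a minimal consumer wiring consistent with the fragments above might look like the following sketch; the elided lines above remain authoritative, and this sketch assumes ThriftDeserializationSchema implements Flink’s DeserializationSchema:

// sketch: full constructor call assembled from the fragments above
FlinkKafkaConsumer<Event> consumer =
    new FlinkKafkaConsumer<>(
        topicName,
        new ThriftDeserializationSchema<>(Event.class),
        Configuration.configurationToProperties(kafkaConsumerConfig));

DataStream<Event> events = env.addSource(consumer);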

// user can opt-in to deserialize a list of fields

...

   topicName,

...

KafkaSource<Event> source =
    KafkaSource.<Event>builder()
...
        .setDeserializer(
            new ThriftDeserializationSchema<>(Event.class

...

   Configuration.configurationToProperties(kafkaConsumerConfig));

...

))
...
        .build();
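Assembled into one piece, the KafkaSource variant could look roughly like this sketch; the broker address is illustrative, and ThriftDeserializationSchema is assumed to implement KafkaRecordDeserializationSchema so it can be passed to setDeserializer directly, as in the fragment above:

KafkaSource<Event> source =
    KafkaSource.<Event>builder()
        .setBootstrapServers("broker-1:9092")  // illustrative address
        .setTopics(topicName)
        .setProperties(Configuration.configurationToProperties(kafkaConsumerConfig))
        .setDeserializer(new ThriftDeserializationSchema<>(Event.class))
        .build();

DataStream<Event> events =
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "thrift-kafka-source");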

...

       Configuration.configurationToProperties(kafkaProducerConfig),

       FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
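On the producer side, the fragment above suggests the four-argument FlinkKafkaProducer constructor. A sketch, with ThriftSerializationSchema as an assumed serializer counterpart (it would need to implement KafkaSerializationSchema for this constructor):

FlinkKafkaProducer<Event> producer =
    new FlinkKafkaProducer<>(
        topicName,
        new ThriftSerializationSchema<>(Event.class),  // assumed name, not from this doc
        Configuration.configurationToProperties(kafkaProducerConfig),
        FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);

events.addSink(producer);  // events: the DataStream<Event> from the source above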

Metrics

We propose to introduce the following Gauge metrics:

...
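As an illustration of how one of these Gauges could be registered inside an operator (the class and metric name below are placeholders, not part of the proposal):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.util.Collector;

// Placeholder operator showing the Gauge registration pattern; Event is the
// thrift-generated class used throughout this doc.
public class ThriftDecodeCounter extends RichFlatMapFunction<Event, Event> {

    private transient long decodeFailures;

    @Override
    public void open(Configuration parameters) {
        // expose the running failure count as a Gauge
        getRuntimeContext()
            .getMetricGroup()
            .gauge("thriftDecodeFailures", (Gauge<Long>) () -> decodeFailures);
    }

    @Override
    public void flatMap(Event value, Collector<Event> out) {
        if (value == null) {
            decodeFailures++; // count records the deserializer could not decode
        } else {
            out.collect(value);
        }
    }
}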