...

Naively, we started implementing ThriftSerializationSchema and ThriftDeserializationSchema to encode/decode Thrift messages, which users could pass to connectors in DataStream jobs.

...


public class ThriftSerializationSchema<T extends TBase> implements SerializationSchema<T> {

    private static final Logger LOG = LoggerFactory.getLogger(ThriftSerializationSchema.class);

    public ThriftSerializationSchema(Class<T> recordClazz) {
    }

    public byte[] serialize(T element) {
        byte[] message = null;
        try {
            TSerializer serializer = new TSerializer();
            message = serializer.serialize(element);
        } catch (TException e) {
            LOG.error("Failed to serialize thrift element", e);
        }
        return message;
    }
}


Over time, we observed occasional corrupted Thrift binary payloads, both from Kafka topics and from restored state, causing jobs to restart repeatedly until engineers stepped in and redeployed with a newer Kafka timestamp. To minimize production job disruption, we introduced a corruption-tolerant way to encode/decode Thrift payloads.

PinterestTBaseSerializer (credit to Yu Yang)
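To illustrate the corruption-tolerant idea, here is a minimal, self-contained sketch of the pattern (all names are hypothetical, not Pinterest's actual implementation): the decode step is wrapped so that a failure is counted and turned into a null record to be filtered downstream, instead of an exception that restarts the job.

```java
import java.nio.charset.StandardCharsets;

// Sketch of corruption-tolerant decoding (illustrative names only):
// a corrupt payload is skipped and counted rather than failing the job.
public class TolerantDecodeSketch {

    static int corruptRecords = 0; // in Flink this would be a metric counter

    // Stand-in for TDeserializer.deserialize: requires a magic prefix.
    static String decode(byte[] payload) {
        String s = new String(payload, StandardCharsets.UTF_8);
        if (!s.startsWith("OK:")) {
            throw new RuntimeException("corrupted payload");
        }
        return s.substring(3);
    }

    // Tolerant wrapper: never throws; corrupt payloads become null,
    // which the caller filters out instead of restarting the pipeline.
    static String tolerantDecode(byte[] payload) {
        try {
            return decode(payload);
        } catch (Exception e) {
            corruptRecords++;
            return null;
        }
    }

    public static void main(String[] args) {
        String good = tolerantDecode("OK:event".getBytes(StandardCharsets.UTF_8));
        String bad = tolerantDecode(new byte[] {0x00, 0x01});
        System.out.println(good);           // event
        System.out.println(bad == null);    // true
        System.out.println(corruptRecords); // 1
    }
}
```

The same catch-count-and-skip shape applies whether the decode happens in a DeserializationSchema or in a Kryo serializer restoring state.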


public class ThriftDeserializationSchema<T extends TBase> implements DeserializationSchema<T> {

    private Class<T> thriftClazz;
    private ThriftCodeGenerator codeGenerator;

    public ThriftDeserializationSchema(Class<T> recordClazz, ThriftCodeGenerator codeGenerator) {
        this.thriftClazz = recordClazz;
        this.codeGenerator = codeGenerator;
    }

    @Override
    public T deserialize(byte[] message) throws IOException {
        try {
            TDeserializer deserializer = new TDeserializer();
            T instance = thriftClazz.newInstance();
            deserializer.deserialize(instance, message);
            return instance;
        } catch (Exception e) {
            throw new IOException("Failed to deserialize thrift message", e);
        }
    }

    public boolean isEndOfStream(Object nextElement) {
        return false;
    }

    public TypeInformation<T> getProducedType() {
        return new ThriftTypeInfo<>(thriftClazz, codeGenerator);
    }
}

...


env.addDefaultKryoSerializer(classOf[Event], classOf[PinterestTBaseSerializer[Event]])

...

It’s common for Thrift schemas to be highly nested and to keep growing as more product features are added. In order to properly encode/decode Thrift payloads, we propose a recursive converter from nested Thrift structs to Flink Row fields. Based on each sub-struct's schema type mapping along with its field index mapping, this recursive converter can handle very sophisticated nested Thrift schemas (more than 7 levels deep, 24k characters of schema string).

...

case TType.STRUCT:
    return getRow((TBase) val);

...


public Row getRow(TBase tBase) {
    List<Map.Entry<? extends TFieldIdEnum, FieldMetaData>> entries = ….

    // allocate row by thrift field size
    Row result = new Row(entries.size());

    int i = 0;
    String fieldAnnotation = tBase.getClass().toString();

    for (Map.Entry<? extends TFieldIdEnum, FieldMetaData> entry : entries) {
        if (tBase.isSet(entry.getKey())) {
            Object val = tBase.getFieldValue(entry.getKey());
            result.setField(i, getPrimitiveValue(entry.getValue().valueMetaData, val,
                fieldAnnotation + entry.getKey().getFieldName()));
        } else {
            result.setField(i, getDefaultValue(entry.getValue().valueMetaData,
                fieldAnnotation + entry.getKey().getFieldName()));
        }
        i++;
    }
    return result;
}
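To make the recursion concrete, here is a self-contained toy version of the converter above (all names hypothetical): nested structs are modeled as ordered maps, each struct becomes a row (an Object[] here), STRUCT-typed fields recurse as in the TType.STRUCT branch, and unset fields fall back to a default value.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy sketch of the recursive struct-to-row conversion (illustrative only).
public class RowConverterSketch {

    @SuppressWarnings("unchecked")
    static Object[] getRow(Map<String, Object> struct) {
        Object[] row = new Object[struct.size()];
        int i = 0;
        for (Map.Entry<String, Object> entry : struct.entrySet()) {
            Object val = entry.getValue();
            if (val instanceof Map) {
                // nested struct -> recurse, mirroring "case TType.STRUCT: return getRow(...)"
                row[i] = getRow((Map<String, Object>) val);
            } else if (val == null) {
                row[i] = defaultValue(); // unset field -> default value
            } else {
                row[i] = val;            // primitive field -> copy as-is
            }
            i++;
        }
        return row;
    }

    static Object defaultValue() {
        return "";
    }

    public static void main(String[] args) {
        Map<String, Object> inner = new LinkedHashMap<>();
        inner.put("id", 42L);
        Map<String, Object> outer = new LinkedHashMap<>();
        outer.put("name", "event");
        outer.put("payload", inner);
        outer.put("missing", null);

        Object[] row = getRow(outer);
        System.out.println(row.length);              // 3
        System.out.println(((Object[]) row[1])[0]);  // 42
    }
}
```

The real converter additionally consults FieldMetaData to pick type-appropriate defaults and to map Thrift types to Flink Row types, but the control flow is the same.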

Hive SequenceFile Table [WIP]

...

https://nightlies.apache.org/flink/flink-docs-release-1.11/dev/table/hive/hive_dialect.html

CREATE EXTERNAL TABLE xxx PARTITIONED BY (dt string, hr string)
ROW FORMAT SERDE 'xxx.SafeStringEnumThriftSerDe'
WITH SERDEPROPERTIES (
  "thrift_struct" = 'xxx.schemas.event.Event'
) STORED AS SEQUENCEFILE LOCATION '...';

Partial Thrift Deserialization

...