Discussion thread: https://lists.apache.org/list.html?dev@flink.apache.org
Vote thread: https://lists.apache.org/list.html?dev@flink.apache.org
JIRA


Release: 1.18


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This proposal is inspired by https://issues.apache.org/jira/browse/FLINK-16009

VARINT and ZIGZAG encodings are used in the well-known Protocol Buffers format; they add very little compute overhead in exchange for an obvious reduction in data size. The Type-Length-Value (TLV) layout, which is the basis of that encoding, is also necessary.


The BinaryRowData structure currently has the following shortcomings:

  • the 8-byte slots of the fixed-length part are redundant for data types such as INT, TINYINT, etc.
  • when writing VARCHAR data longer than 7 bytes, ensureCapacity can trigger a copy of the whole backing array.
  • the large size of BinaryRowData also causes unnecessary cost in RocksDB serialization, flush and merge, and lowers the cache hit rate (a larger single record means fewer cached records for the same cache size configuration).


So I think introducing the TLV and VARINT/ZIGZAG encoding into the flink-table module can noticeably improve state-related performance. We can create a new kind of RowData called CodedRowData.


Public Interfaces

The affected public interfaces all relate to serializer creation; I list them below:

  • enable creating a VARLEN serializer for state

Add public TypeSerializer<T> createVarLenSerializer(ExecutionConfig config) to org.apache.flink.api.common.typeinfo.TypeInformation.

This method creates the VARLEN-related serializer. The default implementation simply calls createSerializer and returns its result. After that we can change the behaviour of MapTypeInfo and InternalTypeInfo (for the ROW kind) so that they create a CodedRowDataSerializer, as sketched below.
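
A minimal sketch of the proposed method, assuming it is added to TypeInformation with a delegating default implementation (existing members omitted; createVarLenSerializer is the addition this FLIP proposes, not an existing Flink API):

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;

// Simplified sketch of the proposed change to
// org.apache.flink.api.common.typeinfo.TypeInformation.
public abstract class TypeInformation<T> implements java.io.Serializable {

    // Existing factory method.
    public abstract TypeSerializer<T> createSerializer(ExecutionConfig config);

    // Proposed addition: the default simply delegates to createSerializer, so
    // existing TypeInformation implementations are unaffected. Subclasses such
    // as MapTypeInfo or InternalTypeInfo (ROW kind) would override this to
    // return a CodedRowDataSerializer.
    public TypeSerializer<T> createVarLenSerializer(ExecutionConfig config) {
        return createSerializer(config);
    }
}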

  • add a config option to control whether the VARLEN encoding is used

Add a config option, table.exec.enable-coded-rowdata, to let the user decide whether to enable the feature (a declaration sketch follows).
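
A possible declaration of this option; only the key comes from this proposal, while the holder class name, default value, and description text are illustrative assumptions:

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Hypothetical holder class for the proposed option.
public final class CodedRowDataOptions {

    public static final ConfigOption<Boolean> TABLE_EXEC_ENABLE_CODED_ROWDATA =
            ConfigOptions.key("table.exec.enable-coded-rowdata")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "When enabled, keyed state uses CodedRowData "
                                    + "(TLV + VARINT/ZIGZAG encoding) instead of "
                                    + "BinaryRowData.");
}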

  • change the key serializer creation to the VARLEN serializer

org.apache.flink.streaming.runtime.translators.AbstractOneInputTransformationTranslator

org.apache.flink.streaming.runtime.translators.AbstractTwoInputTransformationTranslator

These translators need to change the way they create key serializers: depending on the config option mentioned above, they call either createSerializer or createVarLenSerializer, as sketched below.
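
A sketch of the selection logic the two translators could share; the helper class and the boolean parameter are assumptions of this proposal, and createVarLenSerializer is the new method proposed above:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;

// Hypothetical shared helper for the translators.
public final class KeySerializerFactory {

    public static <K> TypeSerializer<K> createKeySerializer(
            TypeInformation<K> keyType,
            ExecutionConfig executionConfig,
            boolean codedRowDataEnabled) {
        // table.exec.enable-coded-rowdata decides which factory method is used.
        return codedRowDataEnabled
                ? keyType.createVarLenSerializer(executionConfig)
                : keyType.createSerializer(executionConfig);
    }
}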

  • modify the table operator creation logic (NOT PUBLIC)

StreamExec* nodes such as Join and Rank need to modify the SQL utilities, e.g. the key selector and projection, so that they use CodedRowData.

  • change the snapshot restoring logic (NOT PUBLIC)

TypeSerializerSnapshot needs to be able to restore the encoder-related class information.



Proposed Changes



CodedRowRWTest

This test class verifies the encoding and decoding of single elements such as sint32, sint64, string data, etc. (see the sketch below).
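
For reference, a minimal sketch of the ZigZag + VARINT encoding this proposal borrows from Protocol Buffers; the class and method names are illustrative, not the actual CodedRowData API:

import java.io.ByteArrayOutputStream;

// ZigZag + VARINT encoding as used by Protocol Buffers.
public final class VarIntCodec {

    // ZigZag maps signed values to unsigned ones so that small negative
    // numbers also get short VARINT encodings: 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...
    public static int zigZagEncode32(int value) {
        return (value << 1) ^ (value >> 31);
    }

    public static int zigZagDecode32(int encoded) {
        return (encoded >>> 1) ^ -(encoded & 1);
    }

    // VARINT: 7 data bits per byte, the high bit marks "more bytes follow".
    public static void writeVarInt32(ByteArrayOutputStream out, int value) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }
}

For example, a sint32 value of -1 zigzags to 1 and is then written as the single byte 0x01, instead of occupying a full 8-byte slot as in BinaryRowData's fixed-length part.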




CodedRowDataTest

This test class exercises reading and writing CodedRowData, analogous to what BinaryRowDataTest does for BinaryRowData.

TAG mapping

The mapping largely follows the Protocol Buffers wire types (a tag-layout sketch follows the list):

Logical Type → WireType

  • CHAR | VARCHAR → WIRETYPE_LENGTH_DELIMITED
  • TINYINT | INTEGER | SMALLINT → WIRETYPE_VARINT
  • BOOLEAN → WIRETYPE_VARINT
  • BIGINT → WIRETYPE_VARINT
  • FLOAT → WIRETYPE_FIXED32
  • DOUBLE → WIRETYPE_FIXED64
  • TIMESTAMP_WITHOUT_TIME_ZONE → WIRETYPE_TIMESTAMP (7, a value not used as a PB wire type)
  • ROW → WIRETYPE_LENGTH_DELIMITED
  • DECIMAL → WIRETYPE_DECIMAL (6, a value not used as a PB wire type)
  • ARRAY → the tag of the array element, which can appear multiple times; WIRETYPE_NULL_VALUE = 3 (not applicable in PB) represents a null element

TO BE COMPLETED
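
A possible encoding of the tags themselves, assuming the Protocol Buffers convention of packing the wire type into the low 3 bits of a VARINT-encoded tag is reused; the makeTag helper below is an illustrative sketch, not part of the proposal text:

// Wire-type constants; 3, 6 and 7 are the values this proposal repurposes.
public final class WireTypes {

    public static final int WIRETYPE_VARINT = 0;
    public static final int WIRETYPE_FIXED64 = 1;
    public static final int WIRETYPE_LENGTH_DELIMITED = 2;
    public static final int WIRETYPE_NULL_VALUE = 3;   // null ARRAY element
    public static final int WIRETYPE_FIXED32 = 5;
    public static final int WIRETYPE_DECIMAL = 6;      // not used as a PB wire type
    public static final int WIRETYPE_TIMESTAMP = 7;    // not used as a PB wire type

    // Combine a field index and a wire type into a single tag value,
    // which would itself be written as a VARINT.
    public static int makeTag(int fieldIndex, int wireType) {
        return (fieldIndex << 3) | wireType;
    }
}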



Compatibility change

We create a new table config option, table.exec.enable-coded-rowdata, to switch the coded feature on and off.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?

None; the feature is only used when the new config option enables it.

  • If we are changing behavior how will we phase out the older behavior?

The existing behavior is kept and selected via the config option, so nothing needs to be phased out.

  • If we need special migration tools, describe them here.

No special migration tools are needed.

  • When will we remove the existing behavior?

NA

Test Plan

Besides the unit tests, I plan to run a real-world job for at least 2 weeks, monitoring the CPU and RSS changes.

At the same time, I will compare its output against that of a parallel unmodified job to make sure they are 100% identical.

Test Case

I have verified this change in my own 1.16-based Flink SQL environment.

This is a very simple single-TaskManager job:

Two Kafka streams → json_parse UDTF → left join → blackhole sink (to exclude other influencing factors)

The left join stream has 21 VARCHAR fields, the right one has 14 VARCHAR fields.

The left Kafka records are about 5 KB on average; the right Kafka record size is

I have also written the joined stream to an offline big data system and compared the results; they are exactly the same.

I can attach some sanitized SQL code:

simple join task
CREATE FUNCTION binlogParser AS 'x';
CREATE FUNCTION jsonString as 'x';
CREATE FUNCTION jsonParseField AS 'x';


CREATE TABLE `source_refund_order` (
  `messageKey`  VARBINARY,
  `message`  VARBINARY,
  `topic`  VARCHAR METADATA VIRTUAL,
  `partition`  INTEGER METADATA VIRTUAL,
  `offset`  BIGINT METADATA VIRTUAL
) WITH (
  'connector' = 'kafka',

);

CREATE VIEW dw_ord_refund_order AS
SELECT
   BLINK_JSON_VALUE(data, '$.a') AS a,
   BLINK_JSON_VALUE(data, '$.b') AS b,
   BLINK_JSON_VALUE(data, '$.c') AS c,
   BLINK_JSON_VALUE(data, '$.d') AS d,
   BLINK_JSON_VALUE(data, '$.e') AS e,
   BLINK_JSON_VALUE(data, '$.v') AS v,
   BLINK_JSON_VALUE(data, '$.t') AS t,
   BLINK_JSON_VALUE(data, '$.f') AS f,
   BLINK_JSON_VALUE(data, '$.g') AS g,
   BLINK_JSON_VALUE(data, '$.h') AS h,
   BLINK_JSON_VALUE(data, '$.i') AS i,
   BLINK_JSON_VALUE(data, '$.j') AS j,
   BLINK_JSON_VALUE(data, '$.k') AS k,
   BLINK_JSON_VALUE(data, '$.l') AS l,
   BLINK_JSON_VALUE(data, '$.m') AS m,
   BLINK_JSON_VALUE(data, '$.n') AS n,
   BLINK_JSON_VALUE(data, '$.o') AS o,
   BLINK_JSON_VALUE(data, '$.p') AS p,
   BLINK_JSON_VALUE(data, '$.q') AS q,
   BLINK_JSON_VALUE(data, '$.r') AS r,
   BLINK_JSON_VALUE(data, '$.s') AS s,
  `offset` as `offset`
from source_refund_order ,
  LATERAL TABLE (binlogParser (`message`)) as tmp (
    data,
    old_data,
    dts_operation_flag,
    dts_db_name,
    dts_table_name,
    dts_before_flag,
    dts_after_flag
  )where dts_operation_flag <>'D';


CREATE TABLE `source_refund_money` (
  `messageKey`  VARBINARY,
  `message`  VARBINARY,
  `topic`  VARCHAR METADATA VIRTUAL,
  `partition`  INTEGER METADATA VIRTUAL,
  `offset`  BIGINT METADATA VIRTUAL
) WITH (
  'connector' = 'kafka',

);

CREATE VIEW dw_ord_refund_money AS
SELECT
   BLINK_JSON_VALUE(data, '$.a') AS a,
   BLINK_JSON_VALUE(data, '$.b') AS b,
   BLINK_JSON_VALUE(data, '$.c') AS c,
   BLINK_JSON_VALUE(data, '$.d') AS d,
   BLINK_JSON_VALUE(data, '$.e') AS e,
   BLINK_JSON_VALUE(data, '$.f') AS f,
   BLINK_JSON_VALUE(data, '$.g') AS g,
   BLINK_JSON_VALUE(data, '$.h') AS h,
   BLINK_JSON_VALUE(data, '$.i') AS i,
   BLINK_JSON_VALUE(data, '$.j') AS j,
   BLINK_JSON_VALUE(data, '$.k') AS k,
   BLINK_JSON_VALUE(data, '$.l') AS l,
   BLINK_JSON_VALUE(old_data, '$.m') AS m,
   BLINK_JSON_VALUE(old_data, '$.n') AS n,
  o as o,
  `offset` as `refund_money_offset`
from source_refund_money ,
  LATERAL TABLE (binlogParser (`message`)) as tmp (
    data,
    old_data,
    dts_operation_flag,
    dts_db_name,
    dts_table_name,
    dts_before_flag,
    dts_after_flag
  )where dts_operation_flag <>'D';

create view refund_order_money_view AS 
select *
from dw_ord_refund_order a
left join dw_ord_refund_money b 
on a.join_id=b.join_id;


CREATE TABLE `blackholesink` (
  ...
) WITH (
'connector' = 'blackhole'

);

insert into blackholesink
select 
...
  from refund_order_money_view;


Test environment

My environment is an Aliyun-based container with:

kubernetes.taskmanager.cpu = 1.0

The memory configuration is shown below.


RocksDB-related config (missing configs are left at their defaults):

state.backend.rocksdb.block.cache-size: 64m
state.backend.rocksdb.checkpoint.transfer.thread.num: 2
state.backend.rocksdb.memory.partitioned-index-filters: true
state.backend.rocksdb.metrics.block-cache-capacity: true
state.backend.rocksdb.metrics.block-cache-pinned-usage: true
state.backend.rocksdb.metrics.block-cache-usage: true
state.backend.rocksdb.metrics.cur-size-all-mem-tables: true
state.backend.rocksdb.metrics.mem-table-flush-pending: true
state.backend.rocksdb.metrics.num-running-flushes: true
state.backend.rocksdb.metrics.total-sst-files-size: true
state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED


Testing result

Compared to the unchanged 1.16 job, I observed the following changes:


  • when starting from earliest-offset, the block cache fills up more slowly.

This is easy to understand: the encoded records are smaller, so it takes longer to fill the same block cache.

The encoded job took about 10 min to fill the block cache.

The original BinaryRowData job took about 5 min to fill the block cache.


  • when starting from earliest-offset, the checkpoint size is about 30% smaller.

The two jobs started at almost the same time; the encoded one has a much smaller state size.

encoded job

original BinaryRowData job


  • when starting from earliest-offset, the consuming rate is more stable and about 10% faster than that of the BinaryRowData-based job.

The encoded job took 30 s less than the BinaryRowData job.

BinaryRowData job



The encoding/decoding overhead is very small in the overall flame graph (this is a rough demo; I can collect a more accurate flame graph if necessary).


Rejected Alternatives

TBD
