Discussion thread | [here](https://lists.apache.org/list.html?dev@flink.apache.org) |
---|---|
Vote thread | [here](https://lists.apache.org/list.html?dev@flink.apache.org) |
JIRA | |
Release | 1.18 |
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This proposal is inspired by https://issues.apache.org/jira/browse/FLINK-16009
VARINT and ZIGZAG encodings are used in the well-known Protocol Buffers format; they trade a very small amount of compute overhead for a significant reduction in data size. The Type-Length-Value (TLV) format, which is the basis of that encoding, is also needed.
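For readers unfamiliar with these encodings, here is a minimal sketch of ZigZag and VARINT as Protocol Buffers defines them (the class and method names are mine, for illustration only):

```java
public final class VarIntCodec {

    // ZigZag maps signed values to unsigned ones so that small negative
    // numbers also get short varint encodings: 0 -> 0, -1 -> 1, 1 -> 2, ...
    public static int zigZagEncode(int v) {
        return (v << 1) ^ (v >> 31);
    }

    public static int zigZagDecode(int v) {
        return (v >>> 1) ^ -(v & 1);
    }

    // VARINT stores 7 payload bits per byte; the high bit marks that more
    // bytes follow. Small values take 1 byte instead of a fixed 8-byte slot.
    public static int writeVarInt(byte[] buf, int pos, int value) {
        while ((value & ~0x7F) != 0) {
            buf[pos++] = (byte) ((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        buf[pos++] = (byte) value;
        return pos; // new write position
    }
}
```

An INT of value 1, for example, zigzag-encodes to 2 and occupies a single byte on the wire instead of an 8-byte fixed slot.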
The BinaryRowData structure currently has the following shortcomings:
- the 8-byte fixed-length slot per field is redundant for small data types like INT, TINYINT, etc.
- when writing VARCHAR data longer than 7 bytes, ensureCapacity can trigger a copy of the whole backing array.
- the large size of BinaryRowData also adds unnecessary cost to RocksDB serialization, flushes and merges, and lowers the cache hit rate (a larger single record means fewer records can be cached under the same cache size configuration).
So I think introducing the TLV and VARINT/ZIGZAG encoding into the flink-table module can noticeably improve state-related performance. We can create a new kind of RowData called CodedRowData.
Public Interfaces
The affected public interfaces are all related to serializer creation; I list them below:
Enable creating a VARLEN serializer for state
Add public TypeSerializer<T> createVarLenSerializer(ExecutionConfig config) to the class org.apache.flink.api.common.typeinfo.TypeInformation.
This method creates the VARLEN-related serializer. The default implementation simply calls createSerializer and returns its result. We can then change the behaviour of MapTypeInfo and InternalTypeInfo (for the ROW kind) to create a CodedRowDataSerializer.
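A minimal sketch of the proposed method (TypeInformation is an abstract class, so a default implementation can delegate to the existing createSerializer and leave all current subclasses unaffected):

```java
// In org.apache.flink.api.common.typeinfo.TypeInformation<T>:

/**
 * Creates a serializer that uses the variable-length (TLV / VARINT) encoding.
 * The default simply falls back to the fixed-length serializer; MapTypeInfo
 * and InternalTypeInfo (for the ROW kind) would override this to return a
 * CodedRowDataSerializer.
 */
public TypeSerializer<T> createVarLenSerializer(ExecutionConfig config) {
    return createSerializer(config);
}
```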
Add a config that controls whether the VARLEN encoding is used
Add a config option table.exec.enable-coded-rowdata that lets the user decide whether to enable the feature.
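A possible definition of the option, following the usual ConfigOptions pattern (the default value of false, i.e. opt-in, is my assumption):

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public static final ConfigOption<Boolean> TABLE_EXEC_ENABLE_CODED_ROWDATA =
        ConfigOptions.key("table.exec.enable-coded-rowdata")
                .booleanType()
                .defaultValue(false) // assumed: the feature is opt-in
                .withDescription(
                        "Whether state serializers for ROW types use the "
                                + "variable-length CodedRowData encoding "
                                + "instead of BinaryRowData.");
```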
Change the key serializer creation to the VARLEN serializer
org.apache.flink.streaming.runtime.translators.AbstractOneInputTransformationTranslator
org.apache.flink.streaming.runtime.translators.AbstractTwoInputTransformationTranslator
These translators need to change the way they create key serializers: based on the config mentioned above, they decide whether to call createSerializer or createVarLenSerializer.
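A sketch of the selection logic inside the translators (the helper and variable names are illustrative, not existing code):

```java
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;

// Hypothetical helper: pick the key serializer based on the value of
// table.exec.enable-coded-rowdata.
private static <K> TypeSerializer<K> createKeySerializer(
        TypeInformation<K> keyTypeInfo,
        ExecutionConfig executionConfig,
        boolean codedRowDataEnabled) {
    return codedRowDataEnabled
            ? keyTypeInfo.createVarLenSerializer(executionConfig)
            : keyTypeInfo.createSerializer(executionConfig);
}
```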
Modify the table operator creation logic (NOT PUBLIC)
StreamExec* nodes such as Join and Rank need their SQL utilities (key selector, projection) modified so that they use CodedRowData.
Change the snapshot restoring logic (NOT PUBLIC)
TypeSerializerSnapshot needs to be able to restore the encoder-related class information.
Proposed Changes
CodedRowRWTest
This test class covers encoding and decoding of single elements such as sint32, sint64, StringData, etc.
CodedRowDataTest
This test class exercises CodedRowData reads and writes, analogous to what BinaryRowDataTest does for BinaryRowData.
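A hypothetical shape of such a round-trip case, mirroring how BinaryRowDataTest drives BinaryRowData through its writer (CodedRowData and CodedRowDataWriter are proposed names from this FLIP, not existing classes):

```java
import static org.junit.Assert.assertEquals;

import org.apache.flink.table.data.StringData;
import org.junit.Test;

public class CodedRowDataTest {

    @Test
    public void testBasicRoundTrip() {
        // Write an INT and a VARCHAR field, then read them back.
        CodedRowData row = new CodedRowData(2);
        CodedRowDataWriter writer = new CodedRowDataWriter(row);
        writer.writeInt(0, 42);
        writer.writeString(1, StringData.fromString("hello"));
        writer.complete();

        assertEquals(42, row.getInt(0));
        assertEquals("hello", row.getString(1).toString());
    }
}
```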
TAG mapping
The mapping largely follows the Protocol Buffers wire types:
Logical Type | WireType |
---|---|
CHAR / VARCHAR | WIRETYPE_LENGTH_DELIMITED |
TINYINT / INTEGER / SMALLINT | WIRETYPE_VARINT |
BOOLEAN | WIRETYPE_VARINT |
BIGINT | WIRETYPE_VARINT |
FLOAT | WIRETYPE_FIXED32 |
DOUBLE | WIRETYPE_FIXED64 |
TIMESTAMP_WITHOUT_TIME_ZONE | WIRETYPE_TIMESTAMP (7, unused as a PB wire type) |
ROW | WIRETYPE_LENGTH_DELIMITED |
DECIMAL | WIRETYPE_DECIMAL (6, unused as a PB wire type) |
ARRAY | the tag of the array element, which can appear multiple times; WIRETYPE_NULL_VALUE (3) represents a null element (not applicable in PB) |
TO BE COMPLETED | |
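Assuming CodedRowData reuses the Protocol Buffers tag layout, a tag packs the field number and the wire type into a single varint. In PB, wire-type value 3 is the deprecated SGROUP and 6 and 7 are unused, which is why this proposal can repurpose them:

```java
static final int WIRETYPE_VARINT = 0;
static final int WIRETYPE_FIXED64 = 1;
static final int WIRETYPE_LENGTH_DELIMITED = 2;
static final int WIRETYPE_NULL_VALUE = 3;  // repurposed; PB's deprecated SGROUP
static final int WIRETYPE_FIXED32 = 5;
static final int WIRETYPE_DECIMAL = 6;     // repurposed; unused in PB
static final int WIRETYPE_TIMESTAMP = 7;   // repurposed; unused in PB

// PB convention: tag = (field number << 3) | wire type, written as a varint.
static int makeTag(int fieldNumber, int wireType) {
    return (fieldNumber << 3) | wireType;
}
```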
Compatibility change
The new table config table.exec.enable-coded-rowdata (described above) switches this coded feature ON/OFF.
Compatibility, Deprecation, and Migration Plan
- What impact (if any) will there be on existing users?
None; the feature only takes effect when table.exec.enable-coded-rowdata is enabled.
- If we are changing behavior how will we phase out the older behavior?
We are not changing the default behavior, so there is nothing to phase out.
- If we need special migration tools, describe them here.
No special migration tools are needed.
- When will we remove the existing behavior?
NA
Test Plan
Besides the unit tests, I plan to run a real-world job for at least 2 weeks, monitoring CPU and RSS changes.
At the same time, I will compare its output to that of a parallel job with the original encoding to make sure they are 100% identical.
Test Case
I have verified this change in my own Flink 1.16 based SQL environment.
The test is a very simple single-TaskManager job:
two Kafka streams → json_parse UDTF → left join → blackhole sink (to exclude other influencing factors).
The left stream of the join has 21 VARCHAR row elements, the right one has 14 VARCHAR elements.
The left Kafka records are 5 KB on average; the right Kafka records are
I have also written the joined stream to an offline big-data system and verified that the results are exactly the same.
I can attach some sanitized SQL code if needed.
Test environment
My environment is an Aliyun-based container with
kubernetes.taskmanager.cpu = 1.0
and the memory settings as below.
RocksDB-related config (configs not listed are left at their defaults):
Key | Value |
---|---|
state.backend.rocksdb.block.cache-size | 64m |
state.backend.rocksdb.checkpoint.transfer.thread.num | 2 |
state.backend.rocksdb.memory.partitioned-index-filters | true |
state.backend.rocksdb.metrics.block-cache-capacity | true |
state.backend.rocksdb.metrics.block-cache-pinned-usage | true |
state.backend.rocksdb.metrics.block-cache-usage | true |
state.backend.rocksdb.metrics.cur-size-all-mem-tables | true |
state.backend.rocksdb.metrics.mem-table-flush-pending | true |
state.backend.rocksdb.metrics.num-running-flushes | true |
state.backend.rocksdb.metrics.total-sst-files-size | true |
state.backend.rocksdb.predefined-options | FLASH_SSD_OPTIMIZED |
Testing result
Compared to the unchanged 1.16 job, I observed the following changes:
When starting from earliest-offset, the block cache fills up more slowly.
This is easy to understand: encoded records are smaller, so more of them fit before the cache is full.
The encoded job took about 10 minutes to fill the block cache.
The original BinaryRowData job took 5 minutes to fill the block cache.
When starting from earliest-offset, the checkpoint size is about 30% smaller.
The two jobs were started at almost the same time; the encoded one has a much smaller state size.
Encoded job:
Original BinaryRowData job:
When starting from earliest-offset, the consuming rate is more stable and about 10% faster than that of the BinaryRowData-based job.
The encoded job took 30 s less than the BinaryRowData job.
BinaryRowData job:
The encoding compute overhead is very small in the overall flame graph (this is a rough demo; I can collect a more accurate flame graph if necessary).
Rejected Alternatives
TBD