...

Motivation

In Flink 1.2.0, NormalizedKeySorter is used for sort-based operations regardless of the type of the sorting key and the execution environment. As a result, generic operations are used even in situations where more efficient ones could be applied. One example is Unsafe.copyMemory, which lets Flink copy memory of any length but checks memory alignment before actually copying. It therefore spends unnecessary CPU cycles on copies that we already know always fit into a memory segment, such as swapping two records during sorting. Here, we can leverage knowledge of the sorting key from the TypeComparator and generate custom fixed-byte operators to manipulate those records instead of using copyMemory. Based on the same knowledge, we can also unroll the loop during key comparison and remove unnecessary branching if the key fully determines the sort order.

...

The execution environment also matters. Because Flink serializes the key of a record into a MemorySegment in big-endian format regardless of the endianness of the worker, it needs to reverse the bytes when reading the key back for comparison on a little-endian machine. We compensate for this during serialization, so the bytes of each record are reversed only once instead of on every comparison. This reduces the number of byte reversals from O(n log n) to O(n).
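
A minimal sketch of this trade-off follows, assuming an 8-byte key and using java.nio.ByteBuffer in place of Flink's MemorySegment; the class and method names are hypothetical. The buffer is explicitly set to little-endian to model a little-endian worker. The key is first written in big-endian order, as Flink serializes it, and is then either reversed on every comparison or reversed once in place right after writing.

```java
import java.nio.ByteBuffer;

// Illustrative sketch; ByteBuffer stands in for Flink's MemorySegment, and the
// caller sets the buffer to little-endian to model a little-endian worker.
final class EndiannessSketch {
    // Writes an 8-byte key most-significant byte first, i.e. in big-endian format.
    static void writeKeyBigEndian(ByteBuffer segment, int offset, long key) {
        for (int b = 0; b < 8; b++) {
            segment.put(offset + b, (byte) (key >>> (56 - 8 * b)));
        }
    }

    // Per-comparison approach: a little-endian read of a big-endian key yields the
    // byte-reversed value, so both keys are reversed on every comparison.
    static int compareReversingEachTime(ByteBuffer littleEndian, int i, int j) {
        return Long.compareUnsigned(
                Long.reverseBytes(littleEndian.getLong(i)),
                Long.reverseBytes(littleEndian.getLong(j)));
    }

    // One-time approach: reverse the key in place once, right after it is written.
    static void reverseOnceAfterWrite(ByteBuffer littleEndian, int offset) {
        littleEndian.putLong(offset, Long.reverseBytes(littleEndian.getLong(offset)));
    }

    // After reverseOnceAfterWrite, a plain native read already yields the key value.
    static int compareWithoutReversal(ByteBuffer littleEndian, int i, int j) {
        return Long.compareUnsigned(littleEndian.getLong(i), littleEndian.getLong(j));
    }
}
```

Once the bytes have been reversed in place they are no longer in big-endian order, so every code path that compares these keys, not only fixed-width reads like the ones above, has to agree on the new layout.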

...

 

Sorter \ No. of records           10,000     100,000     1,000,000
Original NormalizedKeySorter      4.34 ms    61.79 ms    582.88 ms
FLINK-3722 NormalizedKeySorter    3.00 ms    35.90 ms    386.95 ms
DividedByConstant                 3.49 ms    45.14 ms    436.22 ms
UsingBitwiseOperators             3.33 ms    42.22 ms    419.98 ms

Table 2: Comparison of three approaches to optimizing division and modulo
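
The row names DividedByConstant and UsingBitwiseOperators suggest two ways of avoiding general integer division and modulo when mapping an index entry to its memory segment and byte offset. The sketch below is our reading of those ideas, not the measured implementation; the 12-byte entry size, the 32 KiB segment size (giving 2730 entries per segment), the power-of-two variant, and all names are assumptions made for illustration.

```java
// Illustrative sketch: mapping a logical index entry to (segment number, byte offset).
final class SegmentIndexingSketch {
    // Assumed layout for illustration: 12-byte entries in a 32 KiB segment -> 2730 entries.
    static final int ENTRY_SIZE = 12;
    static final int ENTRIES_PER_SEGMENT = 32 * 1024 / ENTRY_SIZE;

    // Generic form: the divisor is a run-time value, so every lookup pays for an
    // integer division and a modulo.
    static int segmentOf(int index, int entriesPerSegment) {
        return index / entriesPerSegment;
    }

    static int offsetOf(int index, int entriesPerSegment, int entrySize) {
        return (index % entriesPerSegment) * entrySize;
    }

    // Constant-divisor form: generated code can hard-code the divisor, which lets the
    // JIT compiler typically replace the division with cheaper multiply/shift sequences.
    static int segmentOfConstant(int index) {
        return index / ENTRIES_PER_SEGMENT;
    }

    static int offsetOfConstant(int index) {
        return (index % ENTRIES_PER_SEGMENT) * ENTRY_SIZE;
    }

    // Bitwise form: only valid when the number of entries per segment is a power of
    // two (1 << log2Entries) and index is non-negative.
    static int segmentOfPow2(int index, int log2Entries) {
        return index >>> log2Entries;
    }

    static int offsetOfPow2(int index, int log2Entries, int entrySize) {
        return (index & ((1 << log2Entries) - 1)) * entrySize;
    }
}
```

The bitwise form only applies when the number of entries per segment is a power of two, which may leave part of each segment unused when the entry size is not itself a power of two.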

 

Because these optimization techniques depend on the execution context, for example the data type of the sorting key and the endianness of the machine, we need to apply the applicable optimizations on the fly. In other words, we dynamically generate the most efficient sorter for each context on each worker.

External Libraries 

In this proposal, we plan to use the same two external libraries that FLINK-3599 (Code Generation in Serializers) uses:

  • Janino: a minimal Java compiler that we will use to compile the generated code.
  • FreeMarker: a Java template engine that we will use to render the sorter template.


...

This method first checks whether code generation is enabled in the configuration. If it is not, it instantiates the original NormalizedKeySorter. If it is enabled, the method checks whether a constructor for the corresponding generated source code is already in the cache, using templateModel.getGeneratedCodeFilename() as the key. If the constructor is not cached yet, it asks TemplateManager to instantiate the sorter and caches the constructor; otherwise, it uses the cached constructor to instantiate the sorter.
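
The lookup-and-cache flow described above can be sketched as follows. This is a hypothetical, simplified version: the generateAndCompile function stands in for TemplateManager and Janino, the generated class is assumed to have a public no-argument constructor, and instances are returned as Object rather than as a sorter type.

```java
import java.lang.reflect.Constructor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical sketch of the sorter factory logic; names are illustrative.
final class SorterFactorySketch {
    // Cache keyed by the generated class name (cf. getGeneratedCodeFilename()).
    private final Map<String, Constructor<?>> constructorCache = new ConcurrentHashMap<>();
    private final boolean codeGenerationEnabled;
    // Stand-in for TemplateManager + Janino: maps a generated class name to a compiled class.
    private final Function<String, Class<?>> generateAndCompile;

    SorterFactorySketch(boolean codeGenerationEnabled, Function<String, Class<?>> generateAndCompile) {
        this.codeGenerationEnabled = codeGenerationEnabled;
        this.generateAndCompile = generateAndCompile;
    }

    Object createSorter(String generatedClassName, Supplier<Object> originalSorter) {
        if (!codeGenerationEnabled) {
            // Feature disabled: fall back to the original NormalizedKeySorter.
            return originalSorter.get();
        }
        try {
            Constructor<?> constructor = constructorCache.get(generatedClassName);
            if (constructor == null) {
                // Cache miss: generate and compile once, then remember the constructor.
                // Two threads racing here may both compile; the last put wins, which is harmless.
                constructor = generateAndCompile.apply(generatedClassName).getConstructor();
                constructorCache.put(generatedClassName, constructor);
            }
            // The real constructor takes arguments such as the serializer, comparator and
            // memory segments; a no-argument constructor keeps this sketch self-contained.
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + generatedClassName, e);
        }
    }
}
```

Keying the cache by the generated class name means that sort operations with the same key layout share one compiled class, so compilation runs at most once per distinct sorter variant on each TaskManager, apart from the benign race noted in the comments.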

SorterTemplateModel: responsible for constructing a suitable code String for each optimized section of the template file. These Strings are exposed to TemplateManager via getTemplateVariables().

          • Attributes
            • String templateName

            • TypeComparator typeComparator

            • ArrayList<Integer> fixedByteChunks

...

Based on the number of bytes of the sorting key (obtained from typeComparator) and of the record pointer (8 bytes), this method splits the bytes into fixed-byte chunks.
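
One possible chunking strategy is a greedy split into 8-, 4-, 2- and 1-byte pieces, largest first. The sketch below assumes that strategy (the proposal does not state it) and uses a hypothetical class name; it returns an ArrayList<Integer> to mirror the fixedByteChunks attribute.

```java
import java.util.ArrayList;

// Hypothetical sketch of fixed-byte chunk splitting; the greedy strategy is an assumption.
final class ChunkSplitSketch {
    // Size of the record pointer stored next to the normalized key.
    private static final int POINTER_BYTES = 8;

    static ArrayList<Integer> computeChunks(int keyBytes) {
        ArrayList<Integer> chunks = new ArrayList<>();
        int remaining = keyBytes + POINTER_BYTES;
        // Take the widest fixed-size operator that still fits, then move to narrower ones.
        for (int size : new int[] {8, 4, 2, 1}) {
            while (remaining >= size) {
                chunks.add(size);
                remaining -= size;
            }
        }
        return chunks;
    }
}
```

Under this assumption, a 4-byte key plus the 8-byte pointer yields [8, 4] and an 8-byte key yields [8, 8], which would line up with the LongInt and LongLong sorter names used later in this section.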

            • generateSwapProcedures()

Generates a sequence of fixed-byte MemorySegment operators that swap two records.
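
For a 12-byte index entry split into an 8-byte and a 4-byte chunk, the generated swap could look like the following sketch. It uses java.nio.ByteBuffer as a stand-in for MemorySegment (which offers analogous getLong/putLong/getInt/putInt methods) and assumes both records live in the same buffer; the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

// Illustrative sketch: ByteBuffer stands in for Flink's MemorySegment.
final class FixedByteSwapSketch {
    // Swaps two 12-byte entries (an 8-byte chunk followed by a 4-byte chunk) starting at
    // byte offsets i and j, using one fixed-width read/write pair per chunk instead of
    // a generic variable-length memory copy.
    static void swapLongInt(ByteBuffer segment, int i, int j) {
        long first8 = segment.getLong(i);
        segment.putLong(i, segment.getLong(j));
        segment.putLong(j, first8);

        int last4 = segment.getInt(i + 8);
        segment.putInt(i + 8, segment.getInt(j + 8));
        segment.putInt(j + 8, last4);
    }
}
```

In NormalizedKeySorter the two entries can sit in different segments, so generated code would take a segment and an offset for each record, but the chunk-by-chunk pattern stays the same.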

...

Because Flink serializes records in big-endian format, it needs to reverse the bytes when retrieving two records for comparison on a little-endian machine. In that case, it is more efficient to move the byte reversal into the serialization process instead. In essence, on little-endian machines this method generates a sequence of operators that reverse the bytes after a record is written to a MemorySegment.
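
Because the chunk layout is known when the template is rendered, the byte-reversal sequence can be emitted as plain source text. The sketch below is hypothetical (the class, method and variable names are ours): given the key's chunk sizes, it produces one get/put pair per chunk wrapped in the matching reverseBytes call, and skips 1-byte chunks, which have no byte order.

```java
// Hypothetical sketch: emits Java statements (as a String) that reverse the key bytes
// in place, chunk by chunk, right after a key has been written to a segment.
final class ReverseBytesCodeGenSketch {
    static String generate(String segmentVar, String offsetVar, int... keyChunks) {
        StringBuilder code = new StringBuilder();
        int position = 0;
        for (int size : keyChunks) {
            String getter;
            String setter;
            String boxType;
            switch (size) {
                case 8: getter = "getLong"; setter = "putLong"; boxType = "Long"; break;
                case 4: getter = "getInt"; setter = "putInt"; boxType = "Integer"; break;
                case 2: getter = "getShort"; setter = "putShort"; boxType = "Short"; break;
                case 1: position += 1; continue; // a single byte has no byte order
                default: throw new IllegalArgumentException("Unsupported chunk size: " + size);
            }
            String address = offsetVar + " + " + position;
            code.append(segmentVar).append('.').append(setter).append('(').append(address)
                .append(", ").append(boxType).append(".reverseBytes(")
                .append(segmentVar).append('.').append(getter).append('(').append(address)
                .append(")));\n");
            position += size;
        }
        return code.toString();
    }
}
```

For key chunks 8 and 4 with variables seg and off, it emits `seg.putLong(off + 0, Long.reverseBytes(seg.getLong(off + 0)));` followed by the Integer equivalent at `off + 8`.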

            • generateCompareProcedures()

Generates a sequence of fixed-byte MemorySegment operators that retrieve the keys of two records for unsigned comparison. If the key does not fully determine the order, record-comparison operators are appended to the sequence; otherwise, the sequence simply ends with “return 0”.
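
The sketch below shows what the generated comparison could look like for a 12-byte normalized key split into an 8-byte and a 4-byte chunk: the loop over key bytes is unrolled into two unsigned comparisons. It assumes the key bytes are in big-endian order and are read from a big-endian ByteBuffer (a stand-in for MemorySegment); the class name and the recordComparator parameter, a placeholder for the record-comparison operators, are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.function.IntBinaryOperator;

// Illustrative sketch of generated, unrolled key comparison.
final class GeneratedCompareSketch {
    // Unrolled unsigned comparison of two 12-byte keys at offsets i and j.
    // For big-endian bytes, unsigned order of the chunk values equals byte-wise order.
    static int compareKeys(ByteBuffer segment, int i, int j) {
        int c = Long.compareUnsigned(segment.getLong(i), segment.getLong(j));
        if (c != 0) {
            return c;
        }
        return Integer.compareUnsigned(segment.getInt(i + 8), segment.getInt(j + 8));
    }

    // Key fully determines the order: equal keys mean equal records ("return 0").
    static int compareFullyDetermined(ByteBuffer segment, int i, int j) {
        return compareKeys(segment, i, j);
    }

    // Key does not fully determine the order: fall back to comparing the records.
    static int compareNotFullyDetermined(ByteBuffer segment, int i, int j,
                                         IntBinaryOperator recordComparator) {
        int c = compareKeys(segment, i, j);
        return c != 0 ? c : recordComparator.applyAsInt(i, j);
    }
}
```

When the key fully determines the order, the fallback branch disappears entirely from the generated method, which is the branch removal described in the motivation.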

...

This method returns a HashMap containing the generated sequence for each section of NormalizedKeySorter. TemplateManager consumes this map.

            • getGeneratedCodeFilename()

This method determines the name of the generated sorter from the chunk sizes and from whether the key fully determines the order, for example LongIntFullyDeterminedKeySorter or LongLongNonFullyDeterminedKeySorter. Janino uses this name to load the generated code, and it also serves as the cache key.
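
A sketch of this naming scheme follows, assuming 8-, 4-, 2- and 1-byte chunks map to Long, Int, Short and Byte. Only Long and Int appear in the examples above; the Short and Byte names and the class name are our assumptions.

```java
import java.util.List;

// Hypothetical sketch of deriving the generated sorter's class name.
final class SorterNameSketch {
    static String generatedClassName(List<Integer> chunks, boolean keyFullyDeterminesOrder) {
        StringBuilder name = new StringBuilder();
        // One type-name fragment per fixed-byte chunk, in chunk order.
        for (int size : chunks) {
            switch (size) {
                case 8: name.append("Long"); break;
                case 4: name.append("Int"); break;
                case 2: name.append("Short"); break;
                case 1: name.append("Byte"); break;
                default: throw new IllegalArgumentException("Unsupported chunk size: " + size);
            }
        }
        name.append(keyFullyDeterminesOrder ? "FullyDetermined" : "NonFullyDetermined");
        return name.append("KeySorter").toString();
    }
}
```

With chunks [8, 4] and a fully determining key, this yields LongIntFullyDeterminedKeySorter; with [8, 8] and a non-determining key, LongLongNonFullyDeterminedKeySorter.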

...

          • Methods
            • getGeneratedCode(SorterTemplateModel templateModel)

Uses templateModel.getTemplateVariables() to render the corresponding blocks of the “sorter.ftlh” template and stores the result in the directory configured by “taskmanager.tmp.dirs”.
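
The proposal renders the template with FreeMarker. As a dependency-free stand-in, the sketch below substitutes ${name} placeholders (the same interpolation syntax FreeMarker uses) from the template-variable map. It is not FreeMarker and supports none of its directives; the class name and the fail-on-missing-variable behavior are our assumptions.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for template rendering; not the FreeMarker API.
final class TemplateRenderSketch {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{(\\w+)\\}");

    static String render(String template, Map<String, String> variables) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = variables.get(m.group(1));
            if (value == null) {
                // Fail fast instead of silently emitting incomplete source code.
                throw new IllegalArgumentException("Missing template variable: " + m.group(1));
            }
            // quoteReplacement keeps '$' and '\' in generated code from being misread.
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

Failing on a missing variable is in the spirit of FreeMarker's default behavior, which reports undefined interpolations as errors rather than rendering them as empty text.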

sorter-template.ftlh: generic sorter template (based on the NormalizedKeySorter code)

...

2. Changes in existing Flink code

Classes that use NormalizedKeySorter will be updated at the locations where they instantiate it. As a result, code generation will be done on the TaskManager, so instances of generated classes won't need to be serialized and shipped across machines. Here is the list of classes that directly instantiate NormalizedKeySorter and will therefore be directly affected by this improvement.

...