...

Another potential improvement is to eliminate expensive arithmetic operations such as division and modulo, for example when computing the memory segment index. One proposed solution to this problem is Greg Hogan's FLINK-3722. Other possible approaches are using bitwise operators or replacing divisors with constants known at compile time [1]. In our experiments, Greg's solution performed best (Table 2), so we will base our implementation on his pull request.

 

Sorter                          | 10,000 records | 100,000 records | 1,000,000 records
Original NormalizedKeySorter    | 4.34 ms        | 61.79 ms        | 582.88 ms
FLINK-3722 NormalizedKeySorter  | 3.00 ms        | 35.90 ms        | 386.95 ms
DividedByConstant               | 3.49 ms        | 45.14 ms        | 436.22 ms
UsingBitwiseOperators           | 3.33 ms        | 42.22 ms        | 419.98 ms

Table 2: Comparison of three approaches to optimizing division and modulo (sorting time by number of records)
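To make the compared approaches concrete, here is a minimal Java sketch of mapping a logical record index to a memory segment and byte offset. It assumes fixed-size index entries; the class SegmentIndexing, its methods and the constant RECORDS_PER_SEGMENT are hypothetical and are not the code benchmarked in Table 2. The incremental `next` variant only illustrates the general idea of carrying a (segment, offset) pair along instead of recomputing it by division; FLINK-3722 itself should be consulted for the actual change.

```java
/**
 * Sketch: ways to map a logical record index to a (segment, byte offset) pair.
 * All names and parameters are illustrative, not Flink's.
 */
final class SegmentIndexing {

    /** Hypothetical compile-time constant; must match the runtime memory layout. */
    static final int RECORDS_PER_SEGMENT = 256;

    private SegmentIndexing() {}

    /** Baseline: one division and one modulo by a runtime value per lookup. */
    static int[] viaDivision(int index, int recordsPerSegment, int recordSize) {
        int segment = index / recordsPerSegment;
        int offset = (index % recordsPerSegment) * recordSize;
        return new int[] {segment, offset};
    }

    /** Constant divisor: the JIT compiler may replace this with multiply-and-shift. */
    static int[] viaConstantDivisor(int index, int recordSize) {
        int segment = index / RECORDS_PER_SEGMENT;
        int offset = (index % RECORDS_PER_SEGMENT) * recordSize;
        return new int[] {segment, offset};
    }

    /** Bitwise: valid only if recordsPerSegment == 1 << log2RecordsPerSegment and index >= 0. */
    static int[] viaBitwise(int index, int log2RecordsPerSegment, int recordSize) {
        int segment = index >>> log2RecordsPerSegment;
        int offset = (index & ((1 << log2RecordsPerSegment) - 1)) * recordSize;
        return new int[] {segment, offset};
    }

    /**
     * Incremental: advance an existing (segment, offset) pair by one record,
     * so a sequential scan needs no division at all.
     */
    static int[] next(int segment, int offset, int recordsPerSegment, int recordSize) {
        int nextOffset = offset + recordSize;
        if (nextOffset == recordsPerSegment * recordSize) {
            return new int[] {segment + 1, 0}; // crossed into the next segment
        }
        return new int[] {segment, nextOffset};
    }
}
```

For example, with 256 records per segment and 16-byte entries, record 1000 lands in segment 3 at byte offset 3712 under all three direct variants.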

 


Because these optimization techniques depend on knowledge of the execution context, for example the data type of the sorting key and the endianness of the machine, the applicable optimizations have to be chosen at runtime. In other words, we will dynamically generate the most efficient sorter for each context on each worker.

 

External Libraries

In this proposal, we plan to use the same two external libraries that FLINK-3599 (Code Generation in Serializers) uses:

  • Janino: a Java compiler that we will use to compile the generated code.
  • FreeMarker: a Java template engine.


Public Interfaces

  • A configuration option for enabling or disabling the code generation feature.

Proposed Changes

1. Code generation functionality

We plan to separate the code generation functionality from the existing Flink code. To do so, we will create three new classes and one template file.

Class Diagram

[Figure: class diagram]

SorterFactory: responsible for instantiating sorter objects. It acts as the interface between core Flink and the code generation process.

          • Attributes

            • TemplateManager templateManager

            • Janino’s ClassLoader classLoader

            • HashMap<String, Constructor> generatedSorter

          • Methods

            • createSorter(ExecutionConfig config, TypeSerializer serializer, TypeComparator comparator, ArrayList<MemorySegment> memory)

This method first checks whether the code generation option is enabled. If it is not, it instantiates the original NormalizedKeySorter. If it is enabled, the method looks up the constructor of the corresponding generated sorter in the cache, using templateModel.getGeneratedCodeFilename() as the key. If the constructor is not cached yet, the method asks TemplateManager to generate the sorter and caches its constructor; otherwise, it uses the cached constructor to instantiate the sorter.
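The decision flow of createSorter() can be sketched in Java as follows. This is a minimal sketch under stated assumptions: Sorter, DefaultSorter, SorterFactorySketch and the class standing in for a generated sorter are hypothetical, the real method takes ExecutionConfig, TypeSerializer, TypeComparator and memory segments, and the step "generate and load the class" (TemplateManager plus Janino in the proposal) is modelled as a plain function.

```java
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in for the sorter abstraction. */
interface Sorter {
    String name();
}

/** Hypothetical stand-in for the original NormalizedKeySorter. */
final class DefaultSorter implements Sorter {
    @Override
    public String name() {
        return "NormalizedKeySorter";
    }
}

/** Hypothetical class playing the role of a generated sorter. */
final class LongIntFullyDeterminedKeySorter implements Sorter {
    public LongIntFullyDeterminedKeySorter() {}

    @Override
    public String name() {
        return "LongIntFullyDeterminedKeySorter";
    }
}

/** Sketch of createSorter()'s decision flow. */
final class SorterFactorySketch {
    // Cache of constructors, keyed by the generated class name.
    private final Map<String, Constructor<? extends Sorter>> generatedSorters = new HashMap<>();
    // Stands in for TemplateManager + Janino: class name -> loaded class.
    private final Function<String, Class<? extends Sorter>> generator;
    int generations = 0; // number of cache misses, for illustration only

    SorterFactorySketch(Function<String, Class<? extends Sorter>> generator) {
        this.generator = generator;
    }

    Sorter createSorter(boolean codeGenerationEnabled, String generatedClassName) {
        if (!codeGenerationEnabled) {
            return new DefaultSorter(); // feature disabled: use the original sorter
        }
        try {
            Constructor<? extends Sorter> ctor = generatedSorters.get(generatedClassName);
            if (ctor == null) { // cache miss: generate, load and remember the constructor
                ctor = generator.apply(generatedClassName).getDeclaredConstructor();
                generatedSorters.put(generatedClassName, ctor);
                generations++;
            }
            return ctor.newInstance(); // each call still gets a fresh sorter instance
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate " + generatedClassName, e);
        }
    }
}
```

The sketch keeps the plain HashMap from the attribute list; if several tasks on one TaskManager may create sorters concurrently, a ConcurrentHashMap would be the safer choice.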

 

SorterTemplateModel: responsible for constructing the code string for each optimized section of the template file. These strings are exposed to TemplateManager via getTemplateVariables().

          • Attributes

            • String templateName

            • TypeComparator typeComparator

            • ArrayList<Integer> fixedByteChunks

          • Methods

            • generatedSequenceFixedByteChunks()

Based on the number of bytes of the sorting key (obtained from typeComparator) plus the 8-byte record pointer, this method splits the bytes into fixed-size chunks.
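A minimal Java sketch of the chunk splitting, assuming a greedy largest-chunk-first strategy over 8-, 4-, 2- and 1-byte units (long, int, short, byte). The proposal does not fix the strategy, and FixedByteChunks is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: split a byte count into fixed-size chunks, largest first. */
final class FixedByteChunks {
    // Sizes of long, int, short and byte accessors, tried in this order.
    private static final int[] CHUNK_SIZES = {8, 4, 2, 1};

    private FixedByteChunks() {}

    static List<Integer> split(int totalBytes) {
        List<Integer> chunks = new ArrayList<>();
        int remaining = totalBytes;
        for (int size : CHUNK_SIZES) {
            while (remaining >= size) { // take as many chunks of this size as fit
                chunks.add(size);
                remaining -= size;
            }
        }
        return chunks;
    }
}
```

With a 4-byte key plus the 8-byte pointer (12 bytes) this yields [8, 4]; with a 7-byte key (15 bytes) it yields [8, 4, 2, 1].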

            • generateSwapProcedures()

Generates a sequence of fixed-size MemorySegment operations to swap two records.
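A sketch of how a swap sequence could be emitted as Java source from a chunk list. The variable names segI, offI, segJ and offJ in the emitted code, and the class SwapCodeGenerator, are hypothetical; the emitted calls assume MemorySegment's getLong/putLong, getInt/putInt, getShort/putShort and get/put accessors.

```java
import java.util.List;

/** Sketch: emit Java source that swaps two index entries chunk by chunk. */
final class SwapCodeGenerator {
    private SwapCodeGenerator() {}

    /** {Java type, accessor suffix for get<Suffix>/put<Suffix>} per chunk size. */
    private static String[] typeAndSuffix(int size) {
        switch (size) {
            case 8: return new String[] {"long", "Long"};
            case 4: return new String[] {"int", "Int"};
            case 2: return new String[] {"short", "Short"};
            case 1: return new String[] {"byte", ""}; // plain get(int) / put(int, byte)
            default: throw new IllegalArgumentException("unsupported chunk size: " + size);
        }
    }

    static String generateSwap(List<Integer> chunks) {
        StringBuilder code = new StringBuilder();
        int offset = 0;
        for (int i = 0; i < chunks.size(); i++) {
            String[] ts = typeAndSuffix(chunks.get(i));
            String type = ts[0];
            String suffix = ts[1];
            // temp = I; I = J; J = temp, for this chunk only
            code.append(String.format("%s temp%d = segI.get%s(offI + %d);\n", type, i, suffix, offset));
            code.append(String.format("segI.put%s(offI + %d, segJ.get%s(offJ + %d));\n", suffix, offset, suffix, offset));
            code.append(String.format("segJ.put%s(offJ + %d, temp%d);\n", suffix, offset, i));
            offset += chunks.get(i);
        }
        return code.toString();
    }
}
```

For chunks [8, 4] it emits six statements: a long swap at offset 0 followed by an int swap at offset 8.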

            • generateWriteProcedures()

Because Flink serializes records in big-endian format, the key bytes of both records would have to be reversed every time two records are compared on a little-endian machine. It is more efficient to reverse the bytes once, when the record is written. Therefore, on little-endian machines, this method generates a sequence of operations that reverse the bytes right after a record is written to a MemorySegment.
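The following Java sketch illustrates the endianness argument with java.nio.ByteBuffer standing in for MemorySegment. It assumes, as we understand MemorySegment.getLong to behave, that fixed-size reads use the machine's native byte order, which is little-endian on x86. EndiannessDemo and its methods are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Sketch: reversing key bytes once at write time vs. on every comparison. */
final class EndiannessDemo {
    private EndiannessDemo() {}

    /** Lays out an 8-byte key in big-endian order, as the serialized key bytes are. */
    static ByteBuffer writeBigEndian(long key) {
        return ByteBuffer.allocate(8).order(ByteOrder.BIG_ENDIAN).putLong(0, key);
    }

    /** Reads the chunk as a native little-endian load would see it. */
    static long readLittleEndian(ByteBuffer buf) {
        return buf.duplicate().order(ByteOrder.LITTLE_ENDIAN).getLong(0);
    }

    /** Without write-time reversal: every comparison must reverse both operands. */
    static int compareReversingOnRead(ByteBuffer a, ByteBuffer b) {
        return Long.compareUnsigned(Long.reverseBytes(readLittleEndian(a)),
                                    Long.reverseBytes(readLittleEndian(b)));
    }

    /** Write-time fix: reverse the stored bytes once; later raw reads are in key order. */
    static void reverseInPlace(ByteBuffer buf) {
        buf.putLong(0, Long.reverseBytes(buf.getLong(0)));
    }
}
```

Reading the big-endian bytes of keys 1 and 256 with little-endian loads orders 1 after 256; after reverseInPlace, the same loads return 1 and 256 directly, so the comparison needs no per-call reversal.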

            • generateCompareProcedures()

Generates a sequence of fixed-size MemorySegment operations that read the keys of two records and compare them as unsigned values. If the normalized key does not fully determine the order, operations that compare the full records are appended to the sequence; otherwise, the sequence simply ends with “return 0”.
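A sketch of generating the body of compare() from the key's chunk sizes. It assumes the key bytes were already byte-reversed on little-endian machines (so a native read yields the key in numeric order), and it uses hypothetical names throughout: CompareCodeGenerator, the variables segI, keyOffI, segJ, keyOffJ, offI, offJ, and a helper compareRecords(...) that would compare the full records.

```java
import java.util.List;

/** Sketch: emit Java source for an unsigned, chunk-wise key comparison. */
final class CompareCodeGenerator {
    private CompareCodeGenerator() {}

    /** {Java type, MemorySegment getter, unsigned comparison template} per chunk size. */
    private static String[] spec(int size) {
        switch (size) {
            case 8: return new String[] {"long", "getLong", "Long.compareUnsigned(l%1$d, r%1$d)"};
            case 4: return new String[] {"int", "getInt", "Integer.compareUnsigned(l%1$d, r%1$d)"};
            case 2: return new String[] {"short", "getShort", "Integer.compare(l%1$d & 0xFFFF, r%1$d & 0xFFFF)"};
            case 1: return new String[] {"byte", "get", "Integer.compare(l%1$d & 0xFF, r%1$d & 0xFF)"};
            default: throw new IllegalArgumentException("unsupported chunk size: " + size);
        }
    }

    static String generateCompare(List<Integer> keyChunks, boolean keyFullyDetermines) {
        StringBuilder code = new StringBuilder();
        int offset = 0;
        for (int i = 0; i < keyChunks.size(); i++) {
            String[] s = spec(keyChunks.get(i));
            code.append(String.format("%s l%d = segI.%s(keyOffI + %d);\n", s[0], i, s[1], offset));
            code.append(String.format("%s r%d = segJ.%s(keyOffJ + %d);\n", s[0], i, s[1], offset));
            // The first differing chunk decides the order.
            code.append(String.format("if (l%d != r%d) return %s;\n", i, i, String.format(s[2], i)));
            offset += keyChunks.get(i);
        }
        // All key chunks equal: either the key decides (tie) or fall back to the records.
        code.append(keyFullyDetermines ? "return 0;\n" : "return compareRecords(segI, offI, segJ, offJ);\n");
        return code.toString();
    }
}
```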

            • getTemplateVariables()

Returns a HashMap containing the generated sequence for each section of NormalizedKeySorter. It is consumed by TemplateManager.

            • getGeneratedCodeFilename()

Determines the name of the generated sorter from the chunk sizes and from whether the key fully determines the order, for example LongIntFullyDeterminedKeySorter or LongLongNonFullyDeterminedKeySorter. The name is used by Janino to load the generated code and as the cache key.
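A sketch of the naming scheme, assuming the name is the chunk type names concatenated, followed by FullyDetermined or NonFullyDetermined and the suffix KeySorter. This reproduces the two example names above but is otherwise our assumption; SorterNaming is a hypothetical name.

```java
import java.util.List;

/** Sketch: derive a generated sorter's class name from its chunks and key determinism. */
final class SorterNaming {
    private SorterNaming() {}

    static String generatedCodeFilename(List<Integer> chunks, boolean keyFullyDetermines) {
        StringBuilder name = new StringBuilder();
        for (int size : chunks) {
            switch (size) {
                case 8: name.append("Long"); break;
                case 4: name.append("Int"); break;
                case 2: name.append("Short"); break;
                case 1: name.append("Byte"); break;
                default: throw new IllegalArgumentException("unsupported chunk size: " + size);
            }
        }
        name.append(keyFullyDetermines ? "FullyDetermined" : "NonFullyDetermined");
        return name.append("KeySorter").toString();
    }
}
```

Because equal layouts map to equal names, the name can double as the cache key: two jobs with the same key layout on one TaskManager reuse a single generated class.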


TemplateManager: responsible for generating the source code file if it has not been created yet.

          • Attributes
            • String templatePath
            • String generatingPath (based on Flink's temporary directory)
            • HashMap<String, Boolean> generatedSorter
          • Methods
            • getGeneratedCode(SorterTemplateModel templateModel)

Uses templateModel.getTemplateVariables() to render the corresponding blocks of the “sorter.ftlh” template and stores the resulting source file in the temporary directory configured by “taskmanager.temp.dirs”.
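A minimal Java sketch of this render-once-and-store behaviour. Plain "${name}" string substitution stands in for FreeMarker here, which renders templates far more capably; the method takes a class name and a variable map instead of a SorterTemplateModel, and TemplateManagerSketch and render are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

/** Sketch of TemplateManager with simple placeholder substitution instead of FreeMarker. */
final class TemplateManagerSketch {
    private final String template;
    private final Path generatingPath;
    // Which sorter sources have already been written by this instance.
    private final Map<String, Boolean> generatedSorter = new HashMap<>();

    TemplateManagerSketch(String template, Path generatingPath) {
        this.template = template;
        this.generatingPath = generatingPath;
    }

    /** Replaces every "${key}" in the template with its value. */
    static String render(String template, Map<String, String> variables) {
        String source = template;
        for (Map.Entry<String, String> e : variables.entrySet()) {
            source = source.replace("${" + e.getKey() + "}", e.getValue());
        }
        return source;
    }

    /** Writes <className>.java on the first call and returns its path; later calls reuse it. */
    Path getGeneratedCode(String className, Map<String, String> variables) {
        Path file = generatingPath.resolve(className + ".java");
        if (generatedSorter.getOrDefault(className, false)) {
            return file; // already generated
        }
        try {
            Files.write(file, render(template, variables).getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        generatedSorter.put(className, true);
        return file;
    }
}
```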

 

Sequence Diagram of Code Generation

[Figure: sequence diagram of code generation]

Code Generation Logic in SorterTemplateModel

[Figure: code generation logic in SorterTemplateModel]

2. Changes in existing Flink code

 

Classes that use NormalizedKeySorter will be updated where they instantiate it. As a result, code generation will happen on the TaskManager, so instances of generated classes do not need to be serialized and shipped across machines. The following classes directly instantiate NormalizedKeySorter and will therefore be directly affected by this improvement:

Compatibility, Deprecation, and Migration Plan

This change should not affect Flink’s functionality, but it might have a small impact on job startup time; more accurate measurements will be provided later. We will also introduce a configuration option to enable or disable the code generation feature.

Test Plan

We will test correctness by creating several template models, generating the corresponding code, and verifying that the generated sorters sort the data properly.
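One way to verify that generated code sorts properly is a differential test: sort random input with the code under test and with a trusted reference, then compare the results. The sketch below simplifies records to long keys and models a generated sorter as a Consumer<long[]>; SortCorrectnessCheck and its parameters are hypothetical, and a real test would drive the generated sorter through its MemorySegment-based interface instead.

```java
import java.util.Arrays;
import java.util.Random;
import java.util.function.Consumer;

/** Sketch: differential check of a candidate sort against Arrays.sort. */
final class SortCorrectnessCheck {
    private SortCorrectnessCheck() {}

    static boolean agreesWithReference(Consumer<long[]> candidateSort, int n, long seed) {
        Random random = new Random(seed);
        long[] keys = new long[n];
        for (int i = 0; i < n; i++) {
            // Every tenth key is an extreme value to exercise sign handling;
            // the rest come from a small range, which forces many duplicates.
            keys[i] = (i % 10 == 0)
                    ? (random.nextBoolean() ? Long.MIN_VALUE : Long.MAX_VALUE)
                    : random.nextInt(1000) - 500;
        }
        long[] expected = keys.clone();
        Arrays.sort(expected); // trusted reference
        long[] actual = keys.clone();
        candidateSort.accept(actual); // code under test sorts in place
        return Arrays.equals(expected, actual);
    }
}
```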

Rejected Alternatives

Open for discussion.

References

  1. https://blogs.msdn.microsoft.com/devdev/2005/12/12/integer-division-by-constants/

...