Status
Current state:
Discussion thread:
JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
Released: <Flink Version>
Motivation
Scalar Python UDFs (FLIP-58) have been supported since release 1.10, and Python UDTFs will be supported in the coming 1.11 release. In release 1.10 we focused on UDF functionality and did not spend much effort on performance. Although many optimizations have already landed on master, Cython can further improve the performance of Python UDFs significantly.
Background
Cython is an optimising static compiler for both the Python programming language and the extended Cython programming language (based on Pyrex). It makes writing C extensions for Python as easy as Python itself.
Examples
A small example helps illustrate the difference between a module written in pure Python and the same module compiled with Cython.
This example (from the official Cython website) numerically integrates a function f(x) over the interval [a, b].
```python
# Pure Python Code
def f(x):
    return x ** 2 - x

def integrate_f(a, b, N):
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f(a + i * dx)
    return s * dx
```

```cython
# Cython Code
cdef double f(double x):
    return x ** 2 - x

cpdef integrate_f(double a, double b, int N):
    cdef int i
    cdef double s, dx
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f(a + i * dx)
    return s * dx
```
The table below shows how much speedup Cython gives us for different numbers of function calls. We got over 150X speedup with Cython!
| nCalls | Pure Python time (s) | Cython time (s) | Speedup |
| --- | --- | --- | --- |
| 10000 | 2.165466 | 0.012378 | 174x |
| 50000 | 10.803928 | 0.061041 | 176x |
| 100000 | 21.277053 | 0.114681 | 185x |
| 200000 | 41.897756 | 0.219027 | 191x |
| 500000 | 105.50863 | 0.544354 | 193x |
| 1000000 | 218.101658 | 1.07485 | 202x |
As we can see, although Cython's syntax is very similar to Python's, Cython can bring huge performance improvements.
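Timings like those in the table can be reproduced with a small `timeit` harness. The sketch below covers only the pure-Python side (timing the Cython build would work the same way after compiling it); the specific call counts and run counts are illustrative:

```python
import timeit

def f(x):
    return x ** 2 - x

def integrate_f(a, b, N):
    # Approximate the integral of f over [a, b] with N left-endpoint rectangles
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f(a + i * dx)
    return s * dx

if __name__ == '__main__':
    # Time the pure-Python version for one of the call counts from the table;
    # a cythonized integrate_f would be timed with exactly the same harness.
    elapsed = timeit.timeit(lambda: integrate_f(0.0, 1.0, 10000), number=10)
    print("pure Python, N=10000, 10 runs: %.4fs" % elapsed)
```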
Proposed Changes
Overview
- Introduce a Cython implementation of the coders and operations
- Change the process of building the sdist and wheel packages from source code
- Introduce GitHub Actions for building wheel packages
Introduce cython implementation of coder and operations
Workflow Of Processing Data
As we can see from the workflow:
- The PVM is the bottleneck, because the PVM is much slower than the JVM.
- The bundle_processor is part of the Beam portability framework used by PyFlink. Beam has already optimized it heavily, so it is not the part we need to optimize.
- That leaves the Coder and the Python user-defined operations modules as the places where we can improve performance.
By optimizing the data structures and algorithm logic used by these two modules, master is already about 5X faster than PyFlink 1.10. With Cython we can gain a further 6X on top of the master code, i.e. about 30X compared to PyFlink 1.10.
Performance Improvements
Next, let's look at the test code we used and compare the detailed performance numbers of release-1.10, master, and the Cython-optimized code.
Test Code
```python
@udf(input_types=[DataTypes.INT(False)], result_type=DataTypes.INT(False))
def inc(x):
    # body reconstructed for illustration: the benchmark UDF simply increments its input
    return x + 1
```
End To End Performance comparison
The table below shows the speedup Cython gives us for test data with different numbers of rows and columns. With a single column, the inc UDF is called once per row, so almost all of the time is framework overhead. With ten columns, inc is called ten times per row, so a larger share of the time is spent in actual computation, and the end-to-end speedup is therefore smaller than in the one-column case.
| rows, columns | PyFlink 1.10 | master | master-cython | Cython speedup (vs release-1.10) | Cython speedup (vs master) |
| --- | --- | --- | --- | --- | --- |
| 100 million (10kw), 1 | 2154s | 441s | 70s | 30X | 6X |
| 100 million (10kw), 10 | 5697s | 1221s | 254s | 22X | 5X |
From the data in the table, we can see that Cython greatly improves our performance.
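To put the end-to-end numbers in perspective, we can derive the approximate per-row cost from the one-column case ("10kw", i.e. 100 million rows). This is simple arithmetic on the table's figures, not new measurements:

```python
ROWS = 100_000_000  # "10kw" rows in the one-column benchmark

# End-to-end times from the table above (seconds)
times = {"PyFlink 1.10": 2154, "master": 441, "master-cython": 70}

for name, seconds in times.items():
    # Microseconds of total (framework + UDF) cost per processed row
    per_row_us = seconds / ROWS * 1e6
    print("%-14s %6.2f us/row" % (name, per_row_us))
```

So the framework overhead drops from roughly 21.5 µs per row on release-1.10 to under 1 µs per row with Cython.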
Doc Changes for Building Sdist And Wheel Package From Source Code
Docs Changes for Building from Source Code
After introducing the Cython optimization, we need to add two steps at the beginning of the process of building the sdist and wheel packages on the "Building from Source" doc page [1]: installing apache-beam and cython.
```shell
# 1. pip install apache-beam
pip install apache-beam==2.19.0
# 2. pip install cython
pip install cython==0.28.1
```
Next, we can build the sdist and wheel packages in the flink-python directory:

```shell
python setup.py sdist bdist_wheel
```

The sdist and wheel packages will be found under ./flink-python/dist/. Either of them can be used for pip installation, e.g.:

```shell
python -m pip install dist/*.tar.gz
```
NOTE: Although we have added two extra steps to building the sdist and wheel packages from source, users can still simply run `pip install apache-flink`.
Release Changes for Deploy Python artifacts to PyPI
We need to add a step to download the wheels from GitHub Actions and upload the corresponding wheels to PyPI [2].
```shell
# 1. Download the wheels from the GitHub Actions workflow
# 2. Unzip the packages, e.g.
unzip apache_flink-1.11.dev0-cp35-cp35m-linux_x86_64.zip
# you will get apache_flink-1.11.dev0-cp35-cp35m-linux_x86_64.whl
# 3. Upload the wheel packages to PyPI, e.g.
twine upload --repository-url https://upload.pypi.org/legacy/ apache_flink-1.11.dev0-cp35-cp35m-linux_x86_64.whl
```
Solutions For Packages Building
After the introduction of Cython, we will provide a wheel package installation method in addition to the sdist package installation.
After investigating some mainstream Python projects, we found three main solutions for building multi-platform wheel packages for PyFlink:
- Create another project to build the wheel packages. Apache Beam created a beam-wheels repository for the sole purpose of building wheel packages.
- Add the wheel-building logic to Flink's current Travis or Azure DevOps CI.
- Introduce GitHub Actions to build the wheel packages.
| | Pros | Cons |
| --- | --- | --- |
| Solution 1 | We can learn from beam-wheels | 1. Beam has already discussed moving from this solution to GitHub Actions (solution 3). 2. We need to create another repository. |
| Solution 2 | We can directly add the wheel-building logic to the current Travis or Azure DevOps CI | |
| Solution 3 | 1. It won't affect the current CI logic. 2. We can build the wheel packages very simply using existing action tools (actions/setup-java, actions/setup-python, actions/upload-artifact). 3. It is very convenient to download the built wheel packages. | It is very young |
Solution 3 is preferred now, as it won't affect the current CI logic, and many projects such as Spark, Arrow and Beam have already introduced GitHub Actions.
I have configured my test repo so that every push to the master branch and every created release triggers the wheel-building workflow. To save resources, we could also configure a daily build on the master branch instead.
```yaml
on:
  push:
    branches: [ master ]
  release:
    types: [ created ]
```
Once the build succeeds, we can download the corresponding wheel packages from the workflow page of the GitHub Action. The wheel packages automatically expire after 90 days.
Public Interfaces
Coder Cython Implementation
The current implementation of the coder is pure Python. We will add two Cython files to support a Cython implementation of the coder:
- fast_coder_impl.pxd
- fast_coder_impl.pyx
fast_coder_impl.pxd will contain the corresponding declarations of the coder, and fast_coder_impl.pyx will provide the concrete implementation.
```cython
# fast_coder_impl.pxd
cdef class FlattenRowCoderImpl(StreamCoderImpl):
    cdef list _input_field_coders
    cdef list _output_field_coders
    cdef unsigned char* _input_field_type
    cdef unsigned char* _output_field_type
    cdef libc.stdint.int32_t _input_field_count
    cdef libc.stdint.int32_t _output_field_count
    cdef libc.stdint.int32_t _input_leading_complete_bytes_num
    cdef libc.stdint.int32_t _output_leading_complete_bytes_num
    cdef libc.stdint.int32_t _input_remaining_bits_num
    cdef libc.stdint.int32_t _output_remaining_bits_num
    cdef bint* _null_mask
    cdef unsigned char* _null_byte_search_table
    cdef char* _output_data
    cdef char* _output_row_data
    cdef size_t _output_buffer_size
    cdef size_t _output_row_buffer_size
    cdef size_t _output_pos
    cdef size_t _output_row_pos
    cdef size_t _input_pos
    cdef size_t _input_buffer_size
    cdef char* _input_data
    cdef list row

    cpdef _init_attribute(self)
    cdef _consume_input_data(self, WrapperInputElement wrapper_input_element, size_t size)
    cpdef _write_null_mask(self, value)
    cdef _read_null_mask(self)
    cdef _copy_before_data(self, WrapperFuncInputStream wrapper_stream, OutputStream out_stream)
    cdef _copy_after_data(self, OutputStream out_stream)
    cpdef _dump_field(self, unsigned char field_type, CoderType field_coder, item)
    cdef _dump_row(self)
    cdef _dump_byte(self, unsigned char val)
    cdef _dump_smallint(self, libc.stdint.int16_t v)
    cdef _dump_int(self, libc.stdint.int32_t v)
    cdef _dump_bigint(self, libc.stdint.int64_t v)
    cdef _dump_float(self, float v)
    cdef _dump_double(self, double v)
    cdef _dump_bytes(self, char* b)
    cpdef _load_row(self)
    cpdef _load_field(self, unsigned char field_type, CoderType field_coder)
    cdef unsigned char _load_byte(self) except? -1
    cdef libc.stdint.int16_t _load_smallint(self) except? -1
    cdef libc.stdint.int32_t _load_int(self) except? -1
    cdef libc.stdint.int64_t _load_bigint(self) except? -1
    cdef float _load_float(self) except? -1
    cdef double _load_double(self) except? -1
    cdef bytes _load_bytes(self)
```
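The `_write_null_mask`/`_read_null_mask` pair implements a bit-packed null mask (one bit per field, eight fields per byte, most-significant bit first), which is what the `_leading_complete_bytes_num` and `_remaining_bits_num` attributes describe. A pure-Python sketch of the idea is shown below; the function names and exact bit order here are illustrative, not the actual implementation:

```python
def write_null_mask(fields):
    """Pack one null bit per field into bytes, most-significant bit first."""
    mask = bytearray()
    byte, bit = 0, 0
    for value in fields:
        byte = (byte << 1) | (1 if value is None else 0)
        bit += 1
        if bit == 8:        # a "complete byte" of eight null bits
            mask.append(byte)
            byte, bit = 0, 0
    if bit:                 # the "remaining bits" of the last, incomplete byte
        mask.append(byte << (8 - bit))
    return bytes(mask)

def read_null_mask(mask, field_count):
    """Inverse of write_null_mask: recover the per-field null flags."""
    return [bool(mask[i // 8] & (0x80 >> (i % 8))) for i in range(field_count)]
```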
Operation Cython Implementation
Similar to the coder, we will add two Cython files to support a Cython implementation of the operations:
- fast_operations.pxd
- fast_operations.pyx
fast_operations.pxd will contain the corresponding declarations of the operations, and fast_operations.pyx will provide the concrete implementation.
```cython
# fast_operations.pxd
cdef class StatelessFunctionOperation(Operation):
    cdef Operation consumer
    cdef StreamCoderImpl _value_coder_impl
    cdef dict variable_dict
    cdef list user_defined_funcs
    cdef libc.stdint.int32_t _func_num
    cdef libc.stdint.int32_t _constant_num
    cdef object func

    cpdef generate_func(self, udfs)

    @cython.locals(func_args=str, func_name=str)
    cpdef str _extract_user_defined_function(self, user_defined_function_proto)

    @cython.locals(args_str=list)
    cpdef str _extract_user_defined_function_args(self, args)

    @cython.locals(j_type=libc.stdint.int32_t, constant_value_name=str)
    cpdef str _parse_constant_value(self, constant_value)


cdef class ScalarFunctionOperation(StatelessFunctionOperation):
    pass


cdef class TableFunctionOperation(StatelessFunctionOperation):
    pass
```
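Note that `_extract_user_defined_function` returns the UDF call as a string expression; this lets `generate_func` compose several (possibly nested) UDF calls into a single callable that is evaluated once per row. A simplified pure-Python sketch of that idea, with illustrative names and signatures (not the actual PyFlink code):

```python
def generate_func(udf_calls):
    """Compose (callable, argument-expression) pairs into one row-mapper.

    Each callable is bound under a generated name (f0, f1, ...) in the
    namespace, and a single lambda expression calling all of them is eval-ed.
    """
    variable_dict = {}
    expressions = []
    for i, (fn, arg_expr) in enumerate(udf_calls):
        name = "f%d" % i
        variable_dict[name] = fn                      # bind the callable
        expressions.append("%s(%s)" % (name, arg_expr))
    mapper_expr = "lambda value: [%s]" % ", ".join(expressions)
    return eval(mapper_expr, variable_dict)

# Example: two scalar UDFs applied to columns 0 and 1 of each input row
inc = lambda x: x + 1
double = lambda x: x * 2
mapper = generate_func([(inc, "value[0]"), (double, "value[1]")])
print(mapper([3, 4]))  # applies inc to column 0 and double to column 1
```

Because the composition happens once, per-row execution is a single call into an already-built function rather than a walk over the UDF call tree.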
Workflow of Building Wheel Packages
We need to add a workflow to build the Python wheel packages:
```yaml
# This workflow will build PyFlink wheel packages.
name: Build Python Wheel Package

on:
  push:
    branches: [ master ]
  release:
    types: [ created ]

env:
  VERSION: 1.11-SNAPSHOT

jobs:
  build:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        java: [ '1.8' ]
    name: Build Flink - JDK - ${{ matrix.java }}
    steps:
      - uses: actions/checkout@v2
      - name: Set up JDK ${{ matrix.java }}
        uses: actions/setup-java@v1
        with:
          java-version: ${{ matrix.java }}
      - name: Build with Maven
        run: |
          export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=1g"
          export MAVEN_CLI_OPTS="--no-transfer-progress"
          mkdir -p ~/.m2
          mvn $MAVEN_CLI_OPTS clean install -Dmaven.javadoc.skip=true -DskipTests -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN
          rm -rf ~/.m2/repository/org/apache/flink
      - uses: actions/upload-artifact@v1
        with:
          name: maven-result
          path: flink-dist/target/flink-${{ env.VERSION }}-bin/flink-${{ env.VERSION }}
  build-python:
    runs-on: ${{ matrix.os }}
    needs: build
    strategy:
      matrix:
        os: [ubuntu-latest, macos-latest]
        python-version: [3.5, 3.6, 3.7]
    name: Build Python - ${{ matrix.os }}/${{ matrix.python-version }}
    steps:
      - uses: actions/checkout@v2
      - uses: actions/download-artifact@v1
        with:
          name: maven-result
          path: flink-dist/target/flink-${{ env.VERSION }}-bin/flink-${{ env.VERSION }}
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v1
        with:
          python-version: ${{ matrix.python-version }}
      - name: Install dependencies
        run: |
          cd flink-python
          python -m pip install --upgrade pip setuptools
          pip install wheel
          pip install apache-beam==2.19.0
          pip install cython==0.28.1
      - name: Build bdist wheel
        run: |
          cd flink-python
          python setup.py bdist_wheel
      - id: getwheelname
        name: Get wheel name
        run: |
          cd flink-python/dist
          echo "::set-output name=file::$(ls *.whl)"
      - uses: actions/upload-artifact@v1
        with:
          name: ${{ steps.getwheelname.outputs.file }}
          path: flink-python/dist/${{ steps.getwheelname.outputs.file }}
```
Compatibility, Deprecation, and Migration Plan
This FLIP will not break backwards compatibility.
Implementation Plan
- Support the Cython implementation of the coders
- Support the Cython implementation of the operations
- Introduce GitHub Actions to build multi-platform wheel packages
[1] https://ci.apache.org/projects/flink/flink-docs-master/flinkDev/building.html#build-pyflink