
Status

Current state: "Under Discussion"

Discussion thread:

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: 1.11

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Table of Contents

Motivation

Scalar Python UDFs (FLIP-58) are already supported in release 1.10 and Python UDTFs will be supported in the coming 1.11 release. In release 1.10, we focused on supporting the UDF features and did not do much optimization of performance. Although a lot of optimizations have since been made in master, Cython can further greatly improve the performance of Python UDFs.

Background

Cython is an optimizing static compiler for both the Python programming language and the extended Cython programming language (based on Pyrex). It makes writing C extensions for Python as easy as Python itself.

...

We can write a simple example to understand the difference between Cython and a module written in pure Python.

This example (from the Cython official website) is about the integration of the function f(x) over [a, b].
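For illustration, here is a minimal sketch of that kind of example (the exact code on the Cython website may differ): the pure Python version, with the statically typed Cython variant shown as comments.

# Pure Python version: numerical integration of f(x) = x**2 - x over [a, b]
# using the rectangle rule.
def f(x):
    return x ** 2 - x

def integrate_f(a, b, N):
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f(a + i * dx)
    return s * dx

# Cython variant (in a .pyx file): the same logic with static C types, which
# removes most of the interpreter overhead in the inner loop.
#
#   def f(double x):
#       return x ** 2 - x
#
#   def integrate_f(double a, double b, int N):
#       cdef int i
#       cdef double s = 0, dx = (b - a) / N
#       for i in range(N):
#           s += f(a + i * dx)
#       return s * dx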

...

nCalls   | Pure Python code time (s) | Cython code time (s) | Speedup
10000    | 2.165466                  | 0.012378             | 174x
50000    | 10.803928                 | 0.061041             | 176x
100000   | 21.277053                 | 0.114681             | 185x
200000   | 41.897756                 | 0.219027             | 191x
500000   | 105.50863                 | 0.544354             | 193x
1000000  | 218.101658                | 1.07485              | 202x
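The timings above can be reproduced with a simple timeit harness along the following lines. This is a sketch only; the module names integrate_py and integrate_cy are placeholders for the pure Python module and the compiled Cython module.

import timeit

n_calls = 10000
py_time = timeit.timeit("integrate_f(0.0, 10.0, 1000)",
                        setup="from integrate_py import integrate_f", number=n_calls)
cy_time = timeit.timeit("integrate_f(0.0, 10.0, 1000)",
                        setup="from integrate_cy import integrate_f", number=n_calls)
print("pure python: %.6fs, cython: %.6fs, speedup: %.0fx"
      % (py_time, cy_time, py_time / cy_time))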

(Chart: execution time of the pure Python and Cython versions)

As we can see, although the syntax of Cython is similar to Python, Cython can bring huge performance improvements.

Proposed Changes

Overview

  • Introduces Cython implementation of coder and operations
  • Doc changes for building sdist and wheel packages from source code
  • Solutions for building wheel packages

Introduce Cython Implementation of Coder and Operations

Workflow of Processing Data

(Figure: workflow of processing data)

As we can see from the workflow,

...

By optimizing the data structures and algorithm logic used by these two modules, we have achieved about a 5x improvement compared to PyFlink 1.10.

If we use Cython, we gain a further 6x on top of the master code, which is about 30x compared to PyFlink 1.10.

...

Next, let's take a look at the test code we used and compare the detailed performance numbers of release-1.10, master and the code optimized with Cython.

Test Code

import time

from pyflink.table import DataTypes
from pyflink.table.udf import udf

# t_env is a TableEnvironment created elsewhere in the benchmark;
# PrintTableSink and MultiRowColumnTableSource are internal test utilities.

@udf(input_types=[DataTypes.INT(False)], result_type=DataTypes.INT(False))
def inc(x):
    return x + 1


t_env.register_function("inc", inc)
num_rows = 100000000
num_columns = 10

select_list = ["inc(c%s)" % i for i in range(num_columns)]
t_env.register_table_sink(
    "sink",
    PrintTableSink(
        ["c%s" % i for i in range(num_columns)],
        [DataTypes.INT(False)] * num_columns))

t_env.from_table_source(MultiRowColumnTableSource(num_rows, num_columns)) \
    .select(','.join(select_list)) \
    .insert_into("sink")

beg_time = time.time()
t_env.execute("perf_test")
print("consume time: " + str(time.time() - beg_time))

...

rows, columns    | PyFlink 1.10 | master | master + Cython | Cython speedup (vs release-1.10) | Cython speedup (vs master)
100000000, 1     | 2154s        | 441s   | 70s             | 30x                              | 6x
100000000, 10    | 5697s        | 1221s  | 254s            | 22x                              | 5x

(Chart: performance comparison based on the table above)


From the data in the table and the chart, we can see that using Cython greatly improves the performance.

...

Docs Changes for Building from Source Code

After introducing the Cython optimization, we need to add one step at the beginning of the process of building sdist and wheel packages on the "Building from source" doc page[1], which installs the dependencies required for compiling the Cython code.

cd flink-python

# pip install the build dependencies
pip install -r dev/requirements.txt


Next, we can build the sdist and wheel packages in the flink-python directory:

python setup.py sdist bdist_wheel


The sdist and wheel packages will be found under ./flink-python/dist/. Either of them can be used for pip installation, such as:

python -m pip install dist/*.tar.gz

NOTE: Although we have added an extra step to build the sdist and wheel packages from source, users can still simply run pip install apache-flink as before.
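For reference, the mechanism that lets python setup.py sdist bdist_wheel produce the compiled extensions usually looks like the following. This is a hedged sketch, not the actual flink-python setup.py; the module paths are the ones proposed in this FLIP and may differ in the final implementation.

# Sketch of a setup.py that optionally compiles the Cython sources: if Cython
# is available, the .pyx files are compiled into C extensions; otherwise only
# the pure Python sources are packaged.
from setuptools import setup

try:
    from Cython.Build import cythonize
    extensions = cythonize([
        "pyflink/fn_execution/fast_coder_impl.pyx",
        "pyflink/fn_execution/fast_operations.pyx",
    ])
except ImportError:
    extensions = []

setup(
    name="apache-flink",
    ext_modules=extensions,
)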

Release Changes for Deploying Python Artifacts to PyPI

We need to add a step to download the wheels from the Artifacts page of the Azure Pipelines build results and upload the corresponding wheels to PyPI[2].

# 1. Download the wheels from Azure Artifacts

(Screenshot: downloading the wheel packages from the Azure Artifacts page)

# 2. Put the downloaded wheel packages in the dist folder of the flink-python module
cd flink-python
mkdir dist (optional, if it does not exist yet)
## move all downloaded wheel packages to the dist folder (manually)

# 3. Run the provided script to restore executable permissions
## Script files in packages downloaded from Azure will lose executable permissions
dev/restore-executable.sh

# 4. Upload the wheel packages to PyPI, e.g.
twine upload --repository-url https://upload.pypi.org/legacy/ dist/*.whl

Solutions for Building Wheel Packages

...

After investigating some mainstream Python projects, we found that there are mainly three solutions for building cross-platform wheel packages for PyFlink:

  1. Create another project to build wheel packages. Apache Beam created a beam-wheels repository for the sole purpose of building wheel packages.
  2. Introduce GitHub Actions to build wheel packages.
  3. Add the logic of building wheel packages to the current Travis or Azure DevOps CI of Flink.

Solution 1 (create a separate repository)

  Pros: We can learn from beam-wheels.

  Cons: 1. Beam has already discussed migrating this solution to GitHub Actions (Solution 2). 2. We need to create another repository.

Solution 2 (GitHub Actions)

  Pros: 1. GitHub Actions comes with a strong level of integration with GitHub. 2. We can build wheel packages very simply by using many action tools, such as actions/setup-java, actions/setup-python and actions/upload-artifact. 3. It is very convenient to download the built wheel packages.

  Cons: 1. It introduces another CI system (GitHub Actions), which increases the maintenance burden. 2. GitHub Actions is still very young.

Solution 3 (Azure CI)

  Pros: 1. It won't affect the current CI logic. 2. The logic of building wheel packages can be integrated into the current Azure CI directly.

  Cons: N/A

Solution 3 is preferred now, as Flink already has a stable Azure CI and it is convenient to add the logic of building wheel packages to it.




Public Interfaces

Coder Cython Implementation

The current implementation of the coder is a pure Python implementation. We will add two files to support the Cython implementation of the coder.

  • fast_coder_impl.pxd
  • fast_coder_impl.pyx

fast_coder_impl.pxd will define the corresponding declarations of the coder and fast_coder_impl.pyx will provide the concrete implementation.

# fast_coder_impl.pxd

cdef class FlattenRowCoderImpl(StreamCoderImpl):

    cdef list _input_field_coders

    cdef list _output_field_coders

    cdef unsigned char* _input_field_type

    cdef unsigned char* _output_field_type

    cdef libc.stdint.int32_t _input_field_count

    cdef libc.stdint.int32_t _output_field_count

    cdef libc.stdint.int32_t _input_leading_complete_bytes_num

    cdef libc.stdint.int32_t _output_leading_complete_bytes_num

    cdef libc.stdint.int32_t _input_remaining_bits_num

    cdef libc.stdint.int32_t _output_remaining_bits_num

    cdef bint* _null_mask

    cdef unsigned char* _null_byte_search_table

    cdef char* _output_data

    cdef char* _output_row_data

    cdef size_t _output_buffer_size

    cdef size_t _output_row_buffer_size

    cdef size_t _output_pos

    cdef size_t _output_row_pos

    cdef size_t _input_pos

    cdef size_t _input_buffer_size

    cdef char* _input_data

    cdef list row

    cpdef _init_attribute(self)

    cdef _consume_input_data(self, WrapperInputElement wrapper_input_element, size_t size)

    cpdef _write_null_mask(self, value)

    cdef _read_null_mask(self)

    cdef _copy_before_data(self, WrapperFuncInputStream wrapper_stream, OutputStream out_stream)

    cdef _copy_after_data(self, OutputStream out_stream)

    cpdef _dump_field(self, unsigned char field_type, CoderType field_coder, item)

    cdef _dump_row(self)

    cdef _dump_byte(self, unsigned char val)

    cdef _dump_smallint(self, libc.stdint.int16_t v)

    cdef _dump_int(self, libc.stdint.int32_t v)

    cdef _dump_bigint(self, libc.stdint.int64_t v)

    cdef _dump_float(self, float v)

    cdef _dump_double(self, double v)

    cdef _dump_bytes(self, char* b)

    cpdef _load_row(self)

    cpdef _load_field(self, unsigned char field_type, CoderType field_coder)

    cdef unsigned char _load_byte(self) except? -1

    cdef libc.stdint.int16_t _load_smallint(self) except? -1

    cdef libc.stdint.int32_t _load_int(self) except? -1

    cdef libc.stdint.int64_t _load_bigint(self) except? -1

    cdef float _load_float(self) except? -1

    cdef double _load_double(self) except? -1

    cdef bytes _load_bytes(self)
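At runtime the fast coder can be picked up with the usual optional-import pattern. This is a sketch only; whether PyFlink wires it up exactly this way is an implementation detail.

# Prefer the compiled Cython coder and fall back to the pure Python one when
# the extension was not built (e.g. an sdist installed without a C compiler).
try:
    from pyflink.fn_execution.fast_coder_impl import FlattenRowCoderImpl
except ImportError:
    from pyflink.fn_execution.coder_impl import FlattenRowCoderImpl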

Operation Cython Implementation

Similar to the coder, we will add two files to support the Cython implementation of the operations.

...

fast_operations.pxd will define the corresponding declarations of the operations and fast_operations.pyx will provide the concrete implementation.

# fast_operations.pxd

cdef class StatelessFunctionOperation(Operation):

    cdef Operation consumer

    cdef StreamCoderImpl _value_coder_impl

    cdef dict variable_dict

    cdef list user_defined_funcs

    cdef libc.stdint.int32_t _func_num

    cdef libc.stdint.int32_t _constant_num

    cdef object func

    cpdef generate_func(self, udfs)

    @cython.locals(func_args=str, func_name=str)

    cpdef str _extract_user_defined_function(self, user_defined_function_proto)

    @cython.locals(args_str=list)

    cpdef str _extract_user_defined_function_args(self, args)

    @cython.locals(j_type=libc.stdint.int32_t, constant_value_name=str)

    cpdef str _parse_constant_value(self, constant_value)

cdef class ScalarFunctionOperation(StatelessFunctionOperation):

    pass

cdef class TableFunctionOperation(StatelessFunctionOperation):

    pass
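The core idea behind generate_func and _extract_user_defined_function is code generation: the operation builds one expression that applies all UDFs to the input row and compiles it once, so the per-row path avoids repeated Python-level dispatch. A simplified, self-contained illustration (not the actual generated code):

# The operation collects the callables in a variable dict and generates a
# single lambda over the input row; eval() compiles it once and the resulting
# function is then reused for every incoming row.
variable_dict = {"f0": lambda x: x + 1, "f1": lambda x: x * 2}
expr = "lambda value: [f0(value[0]), f1(value[1])]"
func = eval(expr, variable_dict)

print(func([1, 2]))  # [2, 4]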

...

Pipeline of Building Wheel Packages

We need to add a pipeline for building Python wheel packages to the Azure CI of Flink.

# build-python-wheels.yml
# This pipeline will build the PyFlink wheel packages.

jobs:
  # 1. compile Flink source code
  - job: compile_${{parameters.stage_name}}
    steps:
      - script: STAGE=compile ${{parameters.environment}} ./tools/azure_controller.sh compile
        displayName: Build
      - task: PublishPipelineArtifact@1
        inputs:
          path: $(CACHE_FLINK_DIR)
          artifact: FlinkCompileCacheDir-${{parameters.stage_name}}

  # 2. build wheel packages
  - job: BuildWheels
    dependsOn: compile_${{parameters.stage_name}}
    strategy:
      matrix:
        linux:
          vm-label: 'ubuntu-16.04'
        mac:
          vm-label: 'macOS-10.15'
    pool:
      vmImage: $(vm-label)
    steps:
      # download artifacts
      - task: DownloadPipelineArtifact@2
        inputs:
          path: $(CACHE_FLINK_DIR)
          artifact: FlinkCompileCacheDir-${{parameters.stage_name}}
      # recreate "build-target" symlink for python tests
      - script: |
          mkdir -p flink-dist/target/flink-$(VERSION)-bin
          ln -snf $(CACHE_FLINK_DIR)/flink-dist/target/flink-$(VERSION)-bin/flink-$(VERSION) `pwd`/flink-dist/target/flink-$(VERSION)-bin/flink-$(VERSION)
        displayName: Recreate 'build-target' symlink
      # build the wheel packages
      - script: |
          cd flink-python
          bash dev/build-wheels.sh
        displayName: Build wheels
      # publish the wheel packages as pipeline artifacts
      - task: PublishPipelineArtifact@0
        inputs:
          artifactName: 'wheel_$(Agent.OS)_$(Agent.JobName)'
          targetPath: 'flink-python/dist'

We will include build-python-wheels.yml in the nightly builds, so that wheel packages are built daily:

# /tools/azure-pipelines/build-apache-repo.yml

- stage: cron_build

  jobs:

    … # other jobs

    - template: build-python-wheels.yml  # Add the jobs for building wheel packages

When a new version is released, we will manually trigger a nightly build on the release branch. After that, we can download the wheel packages and push them to PyPI.

Compatibility, Deprecation, and Migration Plan

...

Implementation Plan

  1. Support the Cython implementation of the coder
  2. Support the Cython implementation of the operations
  3. Add building wheel packages to Azure CI

[1] https://ci.apache.org/projects/flink/flink-docs-master/flinkDev/building.html#build-pyflink

...