Authors:  Dian Fu, Jincheng Sun

Page properties

Current state: Under Discussion
Discussion thread: https://mail-archives.apache.org/mod_mbox/flink-dev/
JIRA: FLINK-17146
Release: 1.11



Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Pandas dataframe is the de-facto standard for working with tabular data in the Python community. PyFlink table is Flink’s representation of tabular data in Python. It would be nice to provide the ability to convert between PyFlink table and Pandas dataframe in the PyFlink Table API, which has the following benefits:

  • It provides users the ability to switch between PyFlink and Pandas seamlessly when processing data in Python. Users could process data using one execution engine and switch to another seamlessly. For example, it may happen that users have already got a Pandas dataframe at hand and want to perform some expensive transformation on it. Then they could convert it to a PyFlink table and leverage the power of the Flink engine. Users could also convert a PyFlink table to a Pandas dataframe and perform transformations on it with the rich functionalities provided by the Pandas ecosystem.

Goals

  1. Support conversion between PyFlink Table and Pandas DataFrame in the PyFlink Table API

Non-Goals


  • No intermediate connectors are needed when converting between them.

Public Interfaces

We will introduce interfaces `from_pandas` in `TableEnvironment` and `to_pandas` in `Table`.

from_pandas

This interface is used to convert a Pandas dataframe to a PyFlink table.

The signature of `from_pandas` is as follows:

class TableEnvironment(object):

    def from_pandas(self, pdf: pd.DataFrame,
                    schema: Union[RowType, List[str], Tuple[str],
                                  List[DataType], Tuple[DataType]] = None,
                    splits_num: int = None) -> Table:
        pass


The argument `schema` is used to specify the schema of the result table:

  1. If it’s not specified, the schema of the result table will be inferred from the given Pandas dataframe.
  2. If it’s specified and the type is RowType, the specified type will be used as the schema of the result table.
  3. If it’s specified as a list or tuple of str, the specified list or tuple will be used as the column names of the result table and the column types will be inferred from the given Pandas dataframe.
  4. If it’s specified as a list or tuple of DataType, the specified list or tuple will be used as the column types of the result table and the column names will be inferred from the given Pandas dataframe.


The argument `splits_num` is used to specify the number of splits the given Pandas dataframe will be split into. If not specified, the default parallelism will be used.
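For illustration, the following sketch shows the different forms of the `schema` argument. The dataframe content and the column names/types are made up for the example, and a TableEnvironment `t_env` is assumed to exist:

import pandas as pd
from pyflink.table import DataTypes

pdf = pd.DataFrame({'a': [1, 2], 'b': ['x', 'y']})

# 1. No schema: column names and types are inferred from the dataframe.
t1 = t_env.from_pandas(pdf)

# 2. RowType: used directly as the schema of the result table.
t2 = t_env.from_pandas(
    pdf,
    DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT()),
                   DataTypes.FIELD("b", DataTypes.STRING())]))

# 3. List of str: used as the column names; the types are inferred.
t3 = t_env.from_pandas(pdf, ['col_a', 'col_b'])

# 4. List of DataType: used as the column types; the names are inferred.
t4 = t_env.from_pandas(pdf, [DataTypes.BIGINT(), DataTypes.STRING()])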

to_pandas

This interface is used to convert a PyFlink table to a Pandas dataframe.

The signature of `to_pandas` is as follows:

class Table(object):

    def to_pandas(self) -> pd.DataFrame:

        pass
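
A short usage sketch (a TableEnvironment `t_env` is assumed to exist; `from_elements` is only used to build a small table for illustration):

import pandas as pd

table = t_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'msg'])

# Calling to_pandas triggers execution of the job behind the table and
# fetches the result to the client as a Pandas dataframe.
pdf = table.to_pandas()
assert isinstance(pdf, pd.DataFrame)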

Proposed Changes

from_pandas

The basic workflow of `from_pandas` is as follows:



(Figure: from_pandas workflow)

  1. The Pandas dataframe will be split into multiple splits and serialized into Arrow format (see the serialization sketch after this list). The ArrowCoder which has already been introduced for vectorized Python UDF could be reused for the serialization of the Pandas dataframe.
  2. The serialized bytes are used to construct a Java ArrowTableSource. Inside the Arrow source implementation, an ArrowReader is created to deserialize the serialized bytes of the Pandas dataframe, and each instance of the source will only handle a subset of the input splits. The ArrowReader which has already been introduced for vectorized Python UDF could be reused for the deserialization.
  3. The ArrowTableSource could then be used to construct a Flink Table object.
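
The following is a minimal sketch of the client-side part of step 1, written with plain pyarrow for illustration. The actual implementation reuses ArrowCoder; the function name and the splitting strategy here are made up:

import pandas as pd
import pyarrow as pa

def serialize_pandas_splits(pdf, splits_num):
    # Split the dataframe into splits_num chunks (ceiling-sized so that
    # all rows are covered) and serialize each chunk into Arrow format.
    split_size = max(1, -(-len(pdf) // splits_num))
    serialized_splits = []
    for start in range(0, len(pdf), split_size):
        chunk = pdf.iloc[start:start + split_size]
        batch = pa.RecordBatch.from_pandas(chunk, preserve_index=False)
        sink = pa.BufferOutputStream()
        with pa.RecordBatchStreamWriter(sink, batch.schema) as writer:
            writer.write_batch(batch)
        serialized_splits.append(sink.getvalue().to_pybytes())
    return serialized_splits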


ArrowTableSource

Although the length of the Pandas dataframe is limited, ArrowTableSource is designed to support both the streaming mode and the batch mode. This enables users to create a PyFlink table which could run in both the streaming mode and the batch mode. Under streaming mode, it will handle checkpoints properly (e.g. store the split ids which have already been processed into the state) to make sure that the execution results remain correct in case of job failover.
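
The split-id bookkeeping can be illustrated with the following sketch. The class and the snapshot/restore hooks are hypothetical and only illustrate the idea, not the actual Java ArrowTableSource:

class CheckpointedArrowSource(object):
    def __init__(self, serialized_splits):
        self.serialized_splits = serialized_splits
        # Ids of the splits which have already been emitted; this set is
        # what would be stored into the checkpointed state.
        self.processed_split_ids = set()

    def snapshot_state(self):
        return set(self.processed_split_ids)

    def restore_state(self, state):
        self.processed_split_ids = set(state)

    def run(self, emit):
        for split_id, split in enumerate(self.serialized_splits):
            # After a failover, the restored state lets the source skip
            # splits which were already processed, so no duplicate
            # results are produced.
            if split_id in self.processed_split_ids:
                continue
            emit(split)
            self.processed_split_ids.add(split_id)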

to_pandas

The basic workflow of `to_pandas` is as follows:

(Figure: to_pandas workflow)

  1. As we know, a Flink Table is just a logical representation and we have to execute it to retrieve the data it stands for. So when `to_pandas` is called, a Flink Table API job will be constructed and executed which takes an ArrowTableSink as the sink of the table.
  2. Inside the ArrowTableSink, the result data will be serialized into Arrow format using the ArrowWriter which has already been introduced for vectorized Python UDF.
  3. During execution of the job, the result data of the table will be fetched to the client continuously. (Note: this shares the same design for retrieving execution results as Table.collect, which is still under discussion. It is just an implementation detail and can be discussed in the discussion about Table.collect; this design only focuses on the Flink/Pandas conversion related things.)
  4. The serialized data of the table will be used to construct the Pandas dataframe (see the deserialization sketch after this list). The ArrowCoder which has already been introduced for vectorized Python UDF could be reused for the deserialization of the serialized data.
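
A minimal sketch of the client-side part of step 4, written with plain pyarrow for illustration. The actual implementation reuses ArrowCoder; the function name is made up:

import pandas as pd
import pyarrow as pa

def deserialize_to_pandas(serialized_batches):
    # Deserialize the Arrow-format bytes fetched from the sink and
    # concatenate them into a single Pandas dataframe.
    frames = []
    for data in serialized_batches:
        reader = pa.ipc.open_stream(data)
        frames.append(reader.read_pandas())
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)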


ArrowTableSink

ArrowTableSink is designed to support both the streaming mode and the batch mode. This enables users to convert a PyFlink table which runs in either the batch mode or the streaming mode (the size of the source data is limited in this case) to a Pandas dataframe. Under streaming mode, it will handle checkpoints properly (the results will be buffered and only be sent to the client when a checkpoint completes or the job finishes) to make sure that the execution results remain correct in case of job failover.
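
The buffering behavior can be illustrated with the following sketch. The class and the hooks are hypothetical and only illustrate the idea, not the actual Java ArrowTableSink:

class BufferingArrowSink(object):
    def __init__(self):
        # Results serialized since the last completed checkpoint.
        self.buffer = []

    def invoke(self, serialized_batch):
        # Buffer the result instead of sending it to the client
        # immediately; an uncheckpointed result could otherwise be
        # replayed after a failover and show up twice.
        self.buffer.append(serialized_batch)

    def on_checkpoint_complete(self, send_to_client):
        # Only results covered by a completed checkpoint (or the end of
        # the job) are sent to the client.
        for batch in self.buffer:
            send_to_client(batch)
        del self.buffer[:]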

Example

# t_env is an existing TableEnvironment
import pandas as pd
from pyflink.table import DataTypes

# Suppose there is already a Pandas dataframe, e.g.:
pdf = pd.DataFrame({'f1': [1, -1], 'f2': [1.0, 2.0],
                    'f3': [True, False], 'f4': ['Hi', 'Hello']})

row_type = DataTypes.ROW(
    [DataTypes.FIELD("f1", DataTypes.INT()),
     DataTypes.FIELD("f2", DataTypes.FLOAT()),
     DataTypes.FIELD("f3", DataTypes.BOOLEAN()),
     DataTypes.FIELD("f4", DataTypes.STRING())])

# convert the Pandas dataframe to a PyFlink table
table = t_env.from_pandas(pdf, row_type)

# perform operations on the PyFlink table
filtered_table = table.filter("f1 > 0")

# convert the result PyFlink table to a Pandas dataframe
filtered_pdf = filtered_table.to_pandas()

# perform operations using the Pandas dataframe API
filtered_pdf.count()


Compatibility, Deprecation, and Migration Plan

N/A

Test Plan

N/A

Rejected Alternatives

N/A