
Status

Current state: "Under Discussion"

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Pandas dataframe is the de-facto table data format of the Python community, and Flink table is Flink's representation of structured table data. It would be nice to provide the ability to convert between a Flink table and a Pandas dataframe in the Flink Python Table API. This would allow users to process data with both PyFlink and Pandas seamlessly, switching between the two execution engines as needed. For example, users who already have a Pandas dataframe at hand and want to perform an expensive transformation on it could convert it to a Flink table and leverage the power of the Flink engine. Conversely, users could convert a Flink table to a Pandas dataframe and transform it with the rich functionality provided by Pandas and the Python ecosystem.

Goals

  1. Support conversion between Flink Table and Pandas DataFrame in Flink Python Table API

Non-Goals

  1. A Flink table backed by an unbounded source (a source which never ends) can not be converted to a Pandas dataframe

Public Interfaces

We will introduce a `from_pandas` interface in `TableEnvironment` and a `to_pandas` interface in `Table`.

from_pandas

This interface is used to convert a Pandas dataframe to a Flink table. 

The signature of from_pandas is as follows:

class TableEnvironment(object):

    def from_pandas(self, pdf: pd.DataFrame, schema: Union[RowType, list, tuple] = None) -> Table:
        pass


The argument schema is used to specify the schema of the result table:

  1. If it’s not specified, the schema of the result table will be inferred from the given Pandas dataframe.
  2. If it’s specified and the type is RowType, the specified type will be used as the schema of the result table.
  3. If it’s specified as a list or tuple of str, the specified list or tuple will be used as the column names of the result table and the column types will be inferred from the given Pandas dataframe.
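The dtype-based inference described in cases 1 and 3 could be sketched as follows. This is only an illustrative sketch: `infer_schema` and the dtype mapping below are hypothetical and not part of the proposed API; the actual mapping used by `from_pandas` may differ.

```python
import pandas as pd

# Illustrative mapping from Pandas/NumPy dtype names to Flink SQL type names.
# The real from_pandas implementation may map types differently.
_DTYPE_TO_FLINK = {
    "int32": "INT",
    "int64": "BIGINT",
    "float32": "FLOAT",
    "float64": "DOUBLE",
    "bool": "BOOLEAN",
    "object": "STRING",
}

def infer_schema(pdf: pd.DataFrame):
    """Return (column_name, flink_type_name) pairs inferred from the dataframe."""
    return [(name, _DTYPE_TO_FLINK.get(str(dtype), "STRING"))
            for name, dtype in pdf.dtypes.items()]

pdf = pd.DataFrame({"f1": [1, 2], "f2": [0.5, 1.5], "f3": [True, False]})
schema = infer_schema(pdf)
```

When a list or tuple of column names is given instead (case 3), only the names would be taken from the argument while the types would still come from a mapping like the one above.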

to_pandas

This interface is used to convert a Flink table to a Pandas dataframe.

The signature of to_pandas is as follows:

class Table(object):

    def to_pandas(self) -> pd.DataFrame:
        pass


Proposed Changes

from_pandas

The basic architecture of `from_pandas` is as follows:



  1. The Pandas dataframe will be serialized in Arrow format. The ArrowCoder, which has already been introduced for vectorized Python UDF, could be reused for the serialization of the Pandas dataframe.
  2. The serialized bytes are used to construct a Java ArrowDataInputFormat, which extends InputFormat. Inside ArrowDataInputFormat, an ArrowReader is created to deserialize the serialized bytes of the Pandas dataframe. The ArrowReader, which has already been introduced for vectorized Python UDF, could be reused for this purpose.
  3. The ArrowDataInputFormat could then be used to construct a Flink Table object.


NOTE:

  1. The parallelism of the ArrowDataInputFormat could only be 1.
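The data flow above could be illustrated with a toy sketch. Everything here is hypothetical: `ToyArrowDataInputFormat` is not the real class, and `pickle` stands in for Arrow serialization purely to keep the sketch self-contained.

```python
import pickle
import pandas as pd

class ToyArrowDataInputFormat:
    """Toy stand-in for ArrowDataInputFormat: holds the bytes serialized on the
    client side and deserializes them when its single split is read."""

    def __init__(self, serialized: bytes):
        self.serialized = serialized
        self.parallelism = 1  # like the real ArrowDataInputFormat, limited to 1

    def read(self) -> pd.DataFrame:
        # The real implementation would deserialize Arrow record batches
        # via ArrowReader; pickle is only a placeholder here.
        return pickle.loads(self.serialized)

# Client side: serialize the dataframe and hand the bytes to the input format.
pdf = pd.DataFrame({"f1": [1, -2, 3]})
fmt = ToyArrowDataInputFormat(pickle.dumps(pdf))
restored = fmt.read()
```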

to_pandas

The basic architecture of `to_pandas` is as follows:

  1. A Flink Table is just a logical representation and has to be executed to retrieve the data it stands for. So when `to_pandas` is called, a Flink Table API job is constructed and executed which takes an ArrowTableSink as the sink of the Flink table.
  2. Inside the ArrowTableSink, the result data is serialized into Arrow format using the ArrowWriter which has already been introduced for vectorized Python UDF. The serialized data is put into an Accumulator.
  3. When the job finishes, the data of the table is returned as part of the execution result (Accumulator).
  4. The serialized data of the table is then used to construct the Pandas dataframe. The ArrowCoder which has already been introduced for vectorized Python UDF could be reused for the deserialization of the serialized data.


NOTE:

  1. The parallelism of the ArrowTableSink could only be 1.
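The accumulator-based collection path could be sketched as follows. Again, this is purely illustrative: `ToyArrowTableSink` and `toy_to_pandas` are hypothetical names, and `pickle` stands in for the Arrow serialization done by ArrowWriter.

```python
import pickle
import pandas as pd

class ToyArrowTableSink:
    """Toy stand-in for ArrowTableSink: serializes each batch of result rows
    and collects the bytes in an accumulator returned with the job result."""

    def __init__(self):
        self.accumulator = []  # collected serialized batches
        self.parallelism = 1   # like the real ArrowTableSink, limited to 1

    def invoke(self, rows):
        # The real sink would encode an Arrow batch via ArrowWriter.
        self.accumulator.append(pickle.dumps(rows))

def toy_to_pandas(sink: ToyArrowTableSink) -> pd.DataFrame:
    # After the job finishes, deserialize the accumulator contents
    # and assemble the Pandas dataframe on the client.
    batches = [pickle.loads(b) for b in sink.accumulator]
    return pd.DataFrame([row for batch in batches for row in batch])

sink = ToyArrowTableSink()
sink.invoke([{"f1": 1}, {"f1": 2}])
sink.invoke([{"f1": 3}])
result = toy_to_pandas(sink)
```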

Example

# Suppose there is already a Pandas dataframe
pdf = xxx
...

row_type = DataTypes.ROW(
    [DataTypes.FIELD("f1", DataTypes.INT()),
     DataTypes.FIELD("f2", DataTypes.FLOAT()),
     DataTypes.FIELD("f3", DataTypes.BOOLEAN()),
     DataTypes.FIELD("f4", DataTypes.STRING())])

# convert the Pandas dataframe to Flink table
table = t_env.from_pandas(pdf, row_type)

# perform some operations on the Flink table
filtered_table = table.filter("f1 > 0")

# convert the result Flink table to Pandas dataframe
filtered_pdf = filtered_table.to_pandas()


Compatibility, Deprecation, and Migration Plan

N/A

Test Plan

N/A

Rejected Alternatives

N/A
