...

Pandas dataframe is the de-facto standard tabular data format of the Python community. Flink Table is Flink's representation of structured table data. It would be useful to provide the ability to convert between a Flink table and a Pandas dataframe in the Flink Python Table API. This gives users the ability to process data with both PyFlink and Pandas seamlessly: they can process data with one execution engine and switch to the other as needed. For example, users may already have a Pandas dataframe at hand and want to perform an expensive transformation on it. They could then convert it to a Flink table and leverage the power of the Flink engine. Conversely, users could convert a Flink table to a Pandas dataframe and transform it with the rich functionality provided by Pandas and the wider Python ecosystem.

...

Proposed Changes

from_pandas

The basic architecture of the `from_pandas` implementation is as follows:



  1. The Pandas dataframe will be serialized using the Arrow format. The ArrowCoder, which has already been introduced for vectorized Python UDFs, can be reused for the serialization of the Pandas dataframe.
  2. The serialized bytes are used to construct a Java ArrowDataInputFormat, which extends InputFormat. Internally, the ArrowDataInputFormat creates an ArrowReader to deserialize the serialized bytes of the Pandas dataframe. The ArrowReader, which has already been introduced for vectorized Python UDFs, can be reused for this purpose.
  3. The ArrowDataInputFormat can then be used to construct a Flink Table object.

...

  1. The parallelism of the ArrowDataInputFormat can only be 1.

to_pandas

The basic architecture of the `to_pandas` implementation is as follows:


  1. A Flink Table is just a logical representation, and it has to be executed to retrieve the data it stands for. So when `to_pandas` is called, a Flink Table API job is constructed and executed which takes an ArrowTableSink as the sink of the Flink table.
  2. Inside the ArrowTableSink, the result data is serialized into the Arrow format using the ArrowWriter, which has already been introduced for vectorized Python UDFs. The serialized data is put into an Accumulator.
  3. When the job finishes, the data of the table is returned as part of the execution result (the Accumulator).
  4. The serialized data of the table is then used to construct the Pandas dataframe. The ArrowCoder, which has already been introduced for vectorized Python UDFs, can be reused for the deserialization of the serialized data.

...