The Pandas dataframe is the de-facto standard for working with tabular data in the Python community. The PyFlink table is Flink's representation of tabular data in Python. It would be nice to provide the ability to convert between the PyFlink table and the Pandas dataframe in the PyFlink Table API, which has the following benefits:

  • It provides users the ability to switch between PyFlink and Pandas seamlessly when processing data in Python. Users could process data using one execution engine and switch to another seamlessly. For example, users may already have a Pandas dataframe at hand and want to perform some expensive transformation on it. They could then convert it to a PyFlink table and leverage the power of the Flink engine. Users could also convert a PyFlink table to a Pandas dataframe and transform it with the rich functionality provided by the Pandas ecosystem.


  • No third-party connectors are needed when converting between them.

The goal is to support conversion between the PyFlink table and the Pandas dataframe in the PyFlink Table API. A Flink table whose source never ends cannot be converted to a Pandas dataframe.

Public Interfaces

We will introduce interfaces `from_pandas` in `TableEnvironment` and `to_pandas` in `Table`.


The `from_pandas` interface is used to convert a Pandas dataframe to a PyFlink table.

The signature of `from_pandas` is as follows:

class TableEnvironment(object):

    def from_pandas(self, pdf: pd.DataFrame,
                    schema: Union[RowType, List[str], Tuple[str], List[DataType], Tuple[DataType]] = None,
                    splits_num: int = None) -> Table:


The argument `schema` is used to specify the schema of the result table:

  1. If it’s not specified, the schema of the result table will be inferred from the given Pandas dataframe.
  2. If it’s specified and the type is RowType, the specified type will be used as the schema of the result table.
  3. If it’s specified as list or tuple of str, the specified list or tuple will be used as the column names of the result table and the column types are inferred from the given Pandas dataframe.
  4. If it’s specified as list or tuple of DataTypes, the specified list or tuple will be used as the column types of the result table and the column names are inferred from the given Pandas dataframe.
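The four cases above can be sketched in plain Python. This is only an illustration of the resolution rules, not PyFlink's implementation: `DataType` and `RowType` here are hypothetical stand-in classes, and `resolve_schema` and its `inferred` parameter are names invented for this example.

```python
from dataclasses import dataclass
from typing import List, Optional, Sequence, Tuple, Union


@dataclass
class DataType:
    """Hypothetical stand-in for a column type, identified by its name."""
    name: str


@dataclass
class RowType:
    """Hypothetical stand-in for a row type: parallel lists of names and types."""
    names: List[str]
    types: List[DataType]


def resolve_schema(
    inferred: RowType,
    schema: Optional[Union[RowType, Sequence[str], Sequence[DataType]]] = None,
) -> List[Tuple[str, DataType]]:
    """Return (column name, column type) pairs for the result table."""
    if schema is None:
        # Case 1: names and types are both inferred from the dataframe.
        names, types = inferred.names, inferred.types
    elif isinstance(schema, RowType):
        # Case 2: the given row type is used as is.
        names, types = schema.names, schema.types
    elif all(isinstance(item, str) for item in schema):
        # Case 3: names are given, types are inferred.
        names, types = list(schema), inferred.types
    elif all(isinstance(item, DataType) for item in schema):
        # Case 4: types are given, names are inferred.
        names, types = inferred.names, list(schema)
    else:
        raise TypeError("schema must be a RowType, or a list/tuple of str or DataType")
    if len(names) != len(types):
        raise ValueError("schema length does not match the number of columns")
    return list(zip(names, types))
```

Rejecting a schema whose length differs from the number of columns is an assumption of this sketch; the proposal does not say how such a mismatch is handled.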

The argument `splits_num` is used to specify the number of splits the given Pandas dataframe will be split into. If not specified, the default parallelism will be used.
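As a rough sketch of what splitting could look like, the function below cuts a sequence of rows into contiguous, near-equal slices. The slicing strategy, the helper name `split_rows` and its `default_parallelism` parameter are assumptions made for illustration; the proposal only states that the dataframe is divided into `splits_num` splits.

```python
from typing import List, Optional, Sequence, TypeVar

T = TypeVar("T")


def split_rows(rows: Sequence[T], splits_num: Optional[int],
               default_parallelism: int) -> List[List[T]]:
    """Cut rows into contiguous slices whose sizes differ by at most one."""
    # Fall back to the default parallelism when splits_num is not given.
    n = splits_num if splits_num is not None else default_parallelism
    if n < 1:
        raise ValueError("the number of splits must be positive")
    base, extra = divmod(len(rows), n)
    splits: List[List[T]] = []
    start = 0
    for i in range(n):
        # The first `extra` splits take one additional row.
        end = start + base + (1 if i < extra else 0)
        splits.append(list(rows[start:end]))
        start = end
    return splits
```

With fewer rows than splits, this sketch produces some empty splits rather than reducing the split count.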


The `to_pandas` interface is used to convert a PyFlink table to a Pandas dataframe.

The signature of `to_pandas` is as follows:

class Table(object):

    def to_pandas(self) -> pd.DataFrame:


Proposed Changes


The basic workflow of `from_pandas` is as follows:


  1. The Pandas dataframe will be split into multiple splits and serialized into Arrow format. The ArrowCoder, which has already been introduced for vectorized Python UDF, could be reused for the serialization of the Pandas dataframe.
  2. The serialized bytes are used to construct the Java ArrowTableSource. Inside the Arrow source implementation, an ArrowReader is created to deserialize the serialized bytes of the Pandas dataframe, and each instance will only handle a subset of the input splits. The ArrowReader, which has already been introduced for vectorized Python UDF, could be reused for the deserialization.
  3. The ArrowTableSource could then be used to construct a Flink Table object.
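The source side of this workflow can be illustrated with a small stand-alone sketch. It deliberately swaps Arrow for JSON so that it runs with the standard library only, and it assumes a round-robin assignment of splits to parallel source instances; the proposal specifies neither the assignment strategy nor these helper names (`serialize_split`, `read_subtask`).

```python
import json
from typing import Dict, List

Row = Dict[str, object]


def serialize_split(rows: List[Row]) -> bytes:
    """Stand-in for Arrow serialization: encode one split as JSON bytes."""
    return json.dumps(rows).encode("utf-8")


def read_subtask(serialized_splits: List[bytes], subtask_index: int,
                 parallelism: int) -> List[Row]:
    """Deserialize only the splits assigned to one parallel source instance."""
    rows: List[Row] = []
    # Assumed round-robin assignment: instance i reads splits i, i + p, i + 2p, ...
    for data in serialized_splits[subtask_index::parallelism]:
        rows.extend(json.loads(data.decode("utf-8")))
    return rows
```

Each instance touches only its own slice of the serialized splits, so together the instances read every row exactly once.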



Although the size of the Pandas dataframe is limited, ArrowTableSource is designed to support both the streaming mode and the batch mode. This enables users to create a PyFlink table which could run in either mode. Under streaming mode, it will handle checkpoints properly to make sure that the execution results remain correct in case of job failover.




The basic workflow of `to_pandas` is as follows:

  1. A Flink Table is just a logical representation, and it has to be executed to retrieve the data it stands for. So when `to_pandas` is called, a Flink Table API job will be constructed and executed which takes an ArrowTableSink as the sink of the Flink Table.
  2. Inside the ArrowTableSink, the result data will be serialized into Arrow format using ArrowWriter, which has already been introduced for vectorized Python UDF. The serialized data will be put into an Accumulator.
  3. When the job is finished, the accumulator data (the result data of the table) will be returned as part of the execution result.
  4. The serialized data of the table will be used to construct the Pandas dataframe. The ArrowCoder, which has already been introduced for vectorized Python UDF, could be reused for the deserialization of the serialized data.
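The accumulator-based hand-off in steps 2 to 4 can be sketched as follows. This is an illustration under stated assumptions, not Flink's accumulator API: `ListAccumulator`, `sink_write` and `collect_result` are hypothetical names, and JSON again stands in for the Arrow format so the sketch needs only the standard library.

```python
import json
from typing import Dict, Iterable, List

Row = Dict[str, object]


class ListAccumulator:
    """Hypothetical accumulator holding the serialized batches of one sink instance."""

    def __init__(self) -> None:
        self.batches: List[bytes] = []

    def add(self, batch: bytes) -> None:
        self.batches.append(batch)

    def merge(self, other: "ListAccumulator") -> None:
        self.batches.extend(other.batches)


def sink_write(acc: ListAccumulator, rows: List[Row]) -> None:
    """Sink side: serialize a batch of rows (JSON stand-in) and accumulate it."""
    acc.add(json.dumps(rows).encode("utf-8"))


def collect_result(accumulators: Iterable[ListAccumulator]) -> List[Row]:
    """Client side: merge the per-instance accumulators and decode every batch."""
    merged = ListAccumulator()
    for acc in accumulators:
        merged.merge(acc)
    rows: List[Row] = []
    for batch in merged.batches:
        rows.extend(json.loads(batch.decode("utf-8")))
    return rows
```

The decoded rows would then be used to build the dataframe. The order in which batches from different sink instances are merged is a choice of this sketch; the proposal does not state an ordering guarantee.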



ArrowTableSink is designed to support both the streaming mode and the batch mode. This enables users to convert a PyFlink table to a Pandas dataframe whether it runs in batch mode or in streaming mode (the size of the source data must be limited in the streaming case). Under streaming mode, it will handle checkpoints properly to make sure that the execution results remain correct in case of job failover.


# Suppose there is already a Pandas dataframe
pdf = xxx

row_type = DataTypes.ROW(
    [DataTypes.FIELD("f1", DataTypes.INT()),
     DataTypes.FIELD("f2", DataTypes.FLOAT()),
     DataTypes.FIELD("f3", DataTypes.BOOLEAN()),
     DataTypes.FIELD("f4", DataTypes.STRING())])

# convert the Pandas dataframe to a PyFlink table
table = t_env.from_pandas(pdf, row_type)

# perform operations on the PyFlink table
filtered_table = table.filter("f1 > 0")

# convert the result PyFlink table to a Pandas dataframe
filtered_pdf = filtered_table.to_pandas()

# perform operations using the Pandas dataframe API


Compatibility, Deprecation, and Migration Plan
