...

Load Action is primarily suitable for importing local files.

Scenario

Analysts and business development staff often work with small, ad hoc datasets: one or more CSV files, each holding little data and scattered across different systems. The traditional workflow is to upload these files to OSS storage, then use Flink or Hive SQL to create a table and import the data before any analysis can begin. This process is complex and tedious, and it requires specialized knowledge. To simplify it, we introduce the Load Action service, which quickly imports small datasets and creates Paimon tables directly, without depending on any other data processing engine.

...

Code Block
curl --location-trusted -u root -T date -H "label:123" http://abc.com:8030/api/test/date/_load


Overview

How to Load

Users only need to send a single HTTP request that carries the data file and the table creation information. Load Action then:

  • Verifies that the user has import permission.
  • Creates a Paimon table from the table creation information.
  • Reads the uploaded file, splits the data using the specified separators, and imports it in batches.
  • Filters rows and adjusts column positions as specified.
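The read, split, filter, and reorder steps above can be sketched in plain Java. This is an illustrative sketch under stated assumptions, not the proposal's implementation: permission checking, Paimon table creation, and batched writing are omitted (rows are simply returned), the `where` condition is modeled as a `java.util.function.Predicate`, `columns` is modeled as a list of source column indexes, and the names `LoadPipelineSketch` and `parse` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.regex.Pattern;

/** Hypothetical sketch of the read / split / filter / reorder steps of a load. */
final class LoadPipelineSketch {

    private LoadPipelineSketch() {}

    /**
     * Splits content into rows and fields, keeps rows accepted by the filter,
     * and reorders each kept row by columnOrder (indexes into the source row).
     */
    static List<List<String>> parse(
            String content,
            String lineDelimiter,
            String columnSeparator,
            Predicate<List<String>> where,
            List<Integer> columnOrder) {
        List<List<String>> result = new ArrayList<>();
        // Pattern.quote makes separators such as "|" or "." match literally.
        for (String line : content.split(Pattern.quote(lineDelimiter))) {
            if (line.isEmpty()) {
                continue; // skip blank lines
            }
            // A limit of -1 keeps trailing empty fields, e.g. "a,b," has three fields.
            List<String> row = Arrays.asList(line.split(Pattern.quote(columnSeparator), -1));
            if (!where.test(row)) {
                continue; // row rejected by the filter condition
            }
            List<String> reordered = new ArrayList<>(columnOrder.size());
            for (int index : columnOrder) {
                reordered.add(row.get(index));
            }
            result.add(reordered);
        }
        return result;
    }
}
```

For example, with `","` as the separator, a filter on the third column, and the order `[1, 0]`, only rows passing the filter are kept, each with its first two columns swapped.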



Signature parameters

Code Block
curl --location-trusted -u user:passwd [-H "key:value" ...] -T data.file -XPUT http://load_host:http_port/api/{db}/{table}/_load

  • user/passwd: Credentials used to verify the user's identity and import permissions.
  • label: Identifier of the import task.
  • column_separator: Column separator used in the import file. Default: \t.
  • line_delimiter: Line delimiter used in the import file. Default: \n.
  • where: Filter condition applied to the data being imported.
  • columns: Names of the columns in the data being imported.
  • format: Format of the data being imported. Supported values: csv and json. Default: csv.
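For clients that would rather not shell out to `curl`, the request above can be assembled with the JDK's `java.net.http` API (Java 11+). This is a sketch under assumptions: each load parameter is sent as a plain request header, as in the earlier `-H "label:123"` example; credentials are sent as an HTTP Basic `Authorization` header, which is what `curl -u` sends by default; and `LoadRequests.build` is a hypothetical helper. It does not reproduce `--location-trusted`, which makes curl resend credentials when it follows a redirect to another host.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;

/** Hypothetical helper that builds a Load Action request; not part of the proposal. */
final class LoadRequests {

    private LoadRequests() {}

    /**
     * Builds PUT http://{host}:{port}/api/{db}/{table}/_load.
     *
     * @param params load parameters sent as headers, e.g. label, column_separator, format
     * @param body   request body, e.g. HttpRequest.BodyPublishers.ofFile(path) for a local file
     */
    static HttpRequest build(
            String host, int port, String db, String table,
            String user, String password,
            Map<String, String> params, HttpRequest.BodyPublisher body) {
        URI uri = URI.create(
                "http://" + host + ":" + port + "/api/" + db + "/" + table + "/_load");
        // Same credentials `curl -u user:passwd` sends by default: HTTP Basic authentication.
        String token = Base64.getEncoder().encodeToString(
                (user + ":" + password).getBytes(StandardCharsets.UTF_8));
        HttpRequest.Builder builder = HttpRequest.newBuilder(uri)
                .header("Authorization", "Basic " + token)
                // Same as `curl -T data.file -XPUT`: the data is the PUT body.
                .PUT(body);
        // Each parameter becomes one header, as in `-H "label:123"`.
        params.forEach(builder::header);
        return builder.build();
    }
}
```

The request can then be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`, whose body holds the JSON import result.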

...

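For convenience, a single sql header can be supplied in place of the individual parameters above, such as column_separator, line_delimiter, where, and columns. Because the SQL statement names the target table, the request is sent to /api/_http_load rather than to a table-specific URL.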

Code Block
curl --location-trusted -u user:passwd [-H "sql: ${load_sql}"...] -T data.file -XPUT http://load_host:http_port/api/_http_load


# load_sql:
#   insert into db.table (col, ...) select stream_col, ... from http_load("property1"="value1");
#
# http parameters, for example:
# (
#     "column_separator" = ",",
#     "format" = "CSV",
#     ...
# )
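When the sql header is used, the client has to compose the load_sql statement itself. The helper below is a hypothetical sketch that renders the template above from a target table, target columns, source expressions, and http_load properties. Rendering several properties as a comma-separated `"key"="value"` list is an assumption extrapolated from the single-property template, and values are not escaped.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical builder for the load_sql value sent in the "sql" header. */
final class LoadSql {

    private LoadSql() {}

    static String build(
            String db, String table,
            List<String> targetColumns, List<String> sourceColumns,
            Map<String, String> properties) {
        // Render each property as "key"="value", in the map's iteration order.
        String props = properties.entrySet().stream()
                .map(e -> "\"" + e.getKey() + "\"=\"" + e.getValue() + "\"")
                .collect(Collectors.joining(", "));
        return "insert into " + db + "." + table
                + " (" + String.join(", ", targetColumns) + ")"
                + " select " + String.join(", ", sourceColumns)
                + " from http_load(" + props + ");";
    }
}
```

Passing a `LinkedHashMap` keeps the properties in insertion order; the returned string is the value of the sql header on the /api/_http_load request.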

Return Results

Because Load Action is a synchronous import method, the import result is returned directly to the user in the response to the import request.

Example:

Code Block
{
  "Status": "Success",
  "NumberTotalRows": 1000000,
  "NumberFilteredRows": 1,
  "NumberUnselectedRows": 0,
  "LoadTimeMs": 2144
}
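A client will usually check Status before relying on the row counters. The sketch below extracts the fields of the flat JSON object shown above with a regular expression so that it needs only the JDK (Java 16+ for records). It assumes the response has exactly this flat shape, and the `LoadResult` record is hypothetical; a production client should use a proper JSON library.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical view of a Load Action response such as the example above. */
record LoadResult(String status, long totalRows, long filteredRows,
                  long unselectedRows, long loadTimeMs) {

    // Matches "Key": "string" or "Key": integer inside a flat JSON object.
    private static final Pattern FIELD =
            Pattern.compile("\"(\\w+)\"\\s*:\\s*(?:\"([^\"]*)\"|(-?\\d+))");

    static LoadResult parse(String json) {
        Map<String, String> fields = new HashMap<>();
        Matcher m = FIELD.matcher(json);
        while (m.find()) {
            // Group 2 holds a string value, group 3 an integer value.
            fields.put(m.group(1), m.group(2) != null ? m.group(2) : m.group(3));
        }
        return new LoadResult(
                fields.getOrDefault("Status", ""),
                Long.parseLong(fields.getOrDefault("NumberTotalRows", "0")),
                Long.parseLong(fields.getOrDefault("NumberFilteredRows", "0")),
                Long.parseLong(fields.getOrDefault("NumberUnselectedRows", "0")),
                Long.parseLong(fields.getOrDefault("LoadTimeMs", "0")));
    }

    boolean isSuccess() {
        return "Success".equals(status);
    }
}
```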

...

Code
Code Block
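import java.io.Serializable;

import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.sink.BatchTableWrite;

/** Parses uploaded content and writes it to a Paimon table. */
public interface WriteStrategy extends Serializable {

    /** Splits content by columnSeparator and writes the rows through batchTableWrite. */
    void writer(BatchTableWrite batchTableWrite, String content, String columnSeparator)
            throws Exception;

    /** Returns the table schema. */
    Schema retrieveSchema() throws Exception;
}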
/**
 * Establishes and manages connections.
 */
public interface ConnectionManager {

    /**
     * Retrieves a connection from a channel.
     *
     * @param channel The channel from which to get the connection.
     * @return The retrieved connection.
     */
    Connection get(Channel channel);

    /**
     * Removes and closes a connection from a channel.
     *
     * @param channel The channel from which to remove and close the connection.
     * @return The connection that was removed and closed.
     */
    Connection removeAndClose(Channel channel);

    /**
     * Adds a new connection.
     *
     * @param connection The connection to add.
     */
    void add(Connection connection);

    /**
     * Retrieves the number of connections.
     *
     * @return The number of active connections.
     */
    int getConnNum();

    /**
     * Initializes the connection manager.
     */
    void init();

    /**
     * Destroys the connection manager, releasing resources.
     */
    void destroy();
}
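To show how the ConnectionManager contract fits together, here is a minimal in-memory sketch that indexes connections by channel. The proposal does not define Channel or Connection, so the sketch declares hypothetical stand-ins (a Connection that knows its Channel and can be closed) and repeats the interface so that it compiles on its own; `InMemoryConnectionManager` is likewise an illustrative name, not part of the design.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-ins for the Channel and Connection types used by ConnectionManager.
interface Channel {}

interface Connection {
    Channel getChannel();
    void close();
}

// Same methods as the ConnectionManager interface above, repeated so the sketch compiles alone.
interface ConnectionManager {
    Connection get(Channel channel);
    Connection removeAndClose(Channel channel);
    void add(Connection connection);
    int getConnNum();
    void init();
    void destroy();
}

/** Hypothetical in-memory ConnectionManager that indexes connections by their channel. */
final class InMemoryConnectionManager implements ConnectionManager {

    private final Map<Channel, Connection> connections = new ConcurrentHashMap<>();

    @Override
    public Connection get(Channel channel) {
        return connections.get(channel);
    }

    @Override
    public Connection removeAndClose(Channel channel) {
        Connection removed = connections.remove(channel);
        if (removed != null) {
            removed.close();
        }
        return removed;
    }

    @Override
    public void add(Connection connection) {
        Objects.requireNonNull(connection, "connection");
        connections.put(connection.getChannel(), connection);
    }

    @Override
    public int getConnNum() {
        return connections.size();
    }

    @Override
    public void init() {
        // Nothing to set up for an in-memory map.
    }

    @Override
    public void destroy() {
        // Close and forget every tracked connection.
        for (Channel channel : connections.keySet()) {
            removeAndClose(channel);
        }
    }
}
```

A `ConcurrentHashMap` fits if connections are added and removed from different threads; its iterators tolerate concurrent removal, which `destroy()` relies on.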


Compatibility, Deprecation, and Migration Plan

None.

Test Plan

Unit tests (UT) and integration tests (IT).

Rejected Alternatives

...

This is a new feature.