Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Status

Page properties


Discussion thread
Vote thread
JIRA

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-31854

Release


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Background and Motivation


Brief Introduction about Redshift  

...


Overall, the Flink Redshift Connector is a powerful tool for businesses looking to perform real-time data analysis and integrate their Flink and Redshift workflows. By using the connector, businesses can take advantage of the best of both worlds, leveraging the real-time processing power of Flink and the scalability and cost-effectiveness of Redshift.

Proposed Change

We are putting forward a proposal to develop a submodule called "flink-connector-redshift" within the existing framework of "flink-connector-aws" By integrating it into "flink-connector-aws," we can leverage the authentication and essential utilities already present in the AWS-specific modules. This will streamline the development process and enable direct utilization of these resources.


Scope


Phase 1 

  • Integrate with Flink Sink API (FLIP-171)

...

  • Integrate with Table API (FLIP-95)
    1. Build upon Flink New DynamicTableSource and DynamicTableSinkFactory Interfaces.
    2. Build upon Flink New DynamicTableSink and DynamicTableSinkFactory Interfaces 


Overall Design


I) Transactional Guarantees

For general information on Redshift's transactional guarantees, please refer to the documentation on
managing concurrent write operations and serializable isolation. According to the Redshift documentation, although any of the four transaction isolation levels can be used, Amazon Redshift processes all isolation levels as serializable. Amazon Redshift also supports a default automatic commit behavior in which each separately-executed SQL command commits individually. As a result, individual commands such as COPY and UNLOAD are atomic and transactional, while explicit BEGIN and END should only be necessary to enforce the atomicity of multiple commands/queries. When reading from or writing to Redshift, this library reads and writes data in S3. Both Flink and Redshift produce partitioned output, which is stored in multiple files in S3. According to the Amazon S3 Data Consistency Model documentation, S3 bucket listing operations are eventually-consistent. Therefore, flink-connector-redshift needs to take special measures to avoid missing or incomplete data due to this source of eventual-consistency.


II) Guarantees Reading Data From Redshift

  1. Appending to an existing table : In the COPY command, this library will use copy-manifests to guard against certain eventually-consistent S3 operations. As a result, appending to existing tables has the same atomic and transactional properties as regular Redshift COPY commands.

  2. Inserting rows into an existing table : When inserting rows into Redshift, this library uses the COPY command and specifies manifests to guard against certain eventually-consistent S3 operations. As a result, flink-connector-redshiftappends to existing tables have the same atomic and transactional properties as regular Redshift COPY commands.

  3. Creating a new table : Creating a new table is a two-step process, consisting of a CREATE TABLE command followed by a COPY command to append the initial set of rows. Both of these operations are performed in a single transaction.

  4. Overwriting an existing table : By default, this library uses transactions to perform overwrites, which are implemented by deleting the destination table, creating a new empty table, and appending rows to it. Upon configuring required options in flink-connector-redshift, the connector will commit the DELETE TABLE command before appending rows to the new table. This sacrifices the atomicity of the overwrite operation but reduces the amount of staging space that Redshift needs during the overwrite.

  5. Querying Redshift tables :Queries use Redshift's UNLOAD command to execute a query and save its results to S3. This library uses unload-manifests to guard against certain eventually-consistent S3 operations. As a result, queries from the Redshift data source for Flink should have the same consistency properties as regular Redshift queries.


III) Column Encoding

To customize compression encoding for specific columns when creating a table, the connector provides a way to set the
encoding column metadata field. Please refer to the available encodings in the Amazon Redshift documentation.
Redshift allows descriptions to be attached to columns, which can be viewed in most query tools. You can specify a description for individual columns by setting the description column metadata field. This can be done using the COMMENT command.


IV) Source Design

Flink connector redshift will offer 2 modes to read from source :-

...

  1. Configuration:
    i) Source configurations (mandatory)


    Option

    Value

    Description
    read.modejdbc, unload

    hostname

    <redshifthost>


    database

    <name_of_database>


    table

    <name_of_table>


    temp.s3.path

    s3n://path/to/temp/data

    If read mode is unload then `temp.s3.path` becomes mandatory
    username<username>
    password<password>




    ii) Source configurations (optional)


    Option

    Value

    Default

    Description

    query
    String


    Code Block
    SELECT * from <Table_Name> limit <record_size>;



    This will help optimizing the data read from the source itself.
    portInteger5439
    record.sizePositive Integer1000


  2. Sink Design
    Flink connector redshift will offer 2 modes to write to Redshift :-
    a) Directly writing to redshift using JDBC driver.
    b) Flink stream write to a specified s3 path in the format and schema accepted by redshift then use COPY command to write data into redshift table.

    Flink connector redshift will provide users with the ability to configure different modes for the connector sink by utilizing the write.mode option in the source configuration. This configuration flexibility allows users to tailor the behavior of the connector sink to their specific needs.

    Internally, the flink-connector-redshift module will intelligently select and integrate with the appropriate connectors based on the chosen write mode. If the user specifies the write mode as "jdbc" in the source configuration, the flink-connector-redshift module will internally leverages the capabilities of the flink-connector-jdbc. This integration will enable seamless interaction with Redshift using the JDBC protocol, ensuring efficient data transfer from Flink to the Redshift database.

    Similarly, when the write mode is selected as a file-based operation, the flink-connector-redshift module will utilize the flink-connector-filesystem. This connector will facilitate writing data to an S3 bucket, adhering to the specific format and schema requirements outlined by Redshift's COPY command. By utilizing this connector, the flink-connector-redshift module will ensure that data is written in a manner compatible with Redshift's expectations.

    To provide a streamlined sink solution for Flink and Redshift integration, the flink-connector-redshift module will orchestrate and utilize these two connectors, flink-connector-jdbc and flink-filesystem, behind the scenes. It preprocesses the data and seamlessly wraps these connectors to offer a unified sink interface for transferring data from Flink to Redshift.

    In summary, the flink-connector-redshift module offers users the flexibility to configure different modes for the connector sink. By intelligently utilizing the appropriate connectors, such as flink-connector-jdbc for direct JDBC-based interaction and flink-connector-filesystem for file-based operations, the module ensures efficient data transfer to Redshift. Through its preprocessing capabilities and seamless integration, the flink-connector-redshift module acts as a comprehensive sink solution, facilitating the smooth transfer of data from Flink to Redshift.


           i) Sink configurations (mandatory)



Option

Value

Description

host

<redshifthost> 


database

database_name

tabletable_name
write.modejdbc/ copy
aws-iam-rolearn:aws:iam::123456789010:role/redshift_iam_role
username<username>
password<password>

          ii) Sink configurations (optional)

Option

Value

Default

Description
port
5439








          
 
               iii) Sample Source Code:   
    • Sample DataStream API Code:

...

Code Block
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

val prop = new Properties()
prop.setProperty("url","
jdbc:redshift://redshifthost:5439/database?user=<username>&password=<password>")
prop.setProperty("database", "test_db")
prop.setProperty("table", "test_sink_table")
prop.setProperty("write.mode", "jdbc")

tEnv
.connect(new Redshift().properties(props))
.inAppendMode()
.registerTableSource("sink-table")

val sql = "INSERT INTO sink-table ....."
tEnv.sqlUpdate(sql)
env.execute()


V) Authentication to S3 and Redshift

The use of this library involves several connections which must be authenticated / secured, all of which are illustrated in the following diagram:

...

Reference from above diagram
→ Redshift Configured Connections:- Configuring a connection for JDBC driver version 2.1 for Amazon Redshi...
→ SSL enable in Redshift :- Configuring authentication and SSL - Amazon Redshift

This library reads and writes data to S3 when transferring data to/from Redshift. As a result, it requires AWS credentials with read and write access to a S3 bucket (specified using the temp.s3.path configuration parameter).


The following describes how each connection can be authenticated:

  1. Flink to Redshift: The Flink connects to Redshift via JDBC using a username and password.
    Redshift does not support the use of IAM roles to authenticate this connection.
    This connection can be secured using SSL; for more details, see the Encryption section below.
  2. Flink to S3: S3 acts as a middleman to store bulk data when reading from or writing to Redshift.
    Flink connects to S3 using both the Hadoop FileSystem interfaces and directly using the Amazon Java SDK's S3 client.

    This connection can be authenticated using either AWS keys or IAM roles.
  3. Default Authentication: Flink-connector-aws provides different mode of Authentication redshift connector will use CredentialProvider.
    AWS credentials will automatically be retrieved through the
    DefaultAWSCredentialsProviderChain.
    Users can use IAM instance roles to authenticate to S3 (e.g. on EMR, or EC2).



Limitations

  1. Parallelism in flink-connector-redhsift will be limited by Quotas configured for the job in Redshift Connections Quotas and limits in Amazon Redshift - Amazon Redshift.
  2. Speed and latency in source and sink will be regulated by the latency and data transfer rate of UNLOAD and COPY feature from redshift

Compatibility, Deprecation, and Migration Plan

The flink-connector-redshift will be compatible with respect to Flink source and sink interface.
This is new connector(feature) no compatibility, deprecation, and migration plan is expected.

Test Plan

Rejected Alternatives

N/A