...

Page properties


Discussion thread: https://lists.apache.org/thread/wsz4jgdpnlyw1x781f9qpk7y416b45dj
Vote thread: https://lists.apache.org/thread/m2cqp4mc1rt4hz0sxj2n50lxjs7m9wnl
JIRA: FLINK-31854

Release


...

  1. Writing directly to Redshift using the JDBC driver.
  2. Writing the Flink stream to a specified S3 path in the format and schema accepted by Redshift, then using the COPY command to load the data into the Redshift table.

    The Redshift connector lets users choose how the sink writes through the sink.write.mode option in the connector configuration. Internally, the flink-connector-redshift module selects and integrates with the appropriate connector based on the chosen write mode.

    If the write mode is "jdbc", flink-connector-redshift uses the custom Redshift JDBC driver and writes to Redshift over the JDBC protocol.

    If the write mode is file-based (COPY), flink-connector-redshift uses flink-connector-filesystem to write data to an S3 bucket in the format and schema required by Redshift's COPY command, so that the staged data is compatible with what Redshift expects.

    In both cases, flink-connector-redshift preprocesses the data and wraps these connectors behind the scenes, offering a unified sink interface for transferring data from Flink to Redshift.
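The mode selection described above can be illustrated with a small sketch. It is not the connector's implementation: WriteMode, JdbcMode, CopyMode, parse and describePath are hypothetical names introduced here, and only the option name sink.write.mode and the two modes come from this proposal.

```scala
import java.util.Locale

// Hypothetical model of the two write modes named in this proposal.
sealed trait WriteMode
case object JdbcMode extends WriteMode
case object CopyMode extends WriteMode

object WriteMode {
  // Parse the value of `sink.write.mode`, ignoring case and surrounding whitespace.
  def parse(value: String): Either[String, WriteMode] =
    value.trim.toLowerCase(Locale.ROOT) match {
      case "jdbc" => Right(JdbcMode)
      case "copy" => Right(CopyMode)
      case other  => Left(s"Unsupported sink.write.mode: '$other'")
    }

  // Describe the write path that a unified sink would delegate to for each mode.
  def describePath(mode: WriteMode): String = mode match {
    case JdbcMode => "write batches directly through the Redshift JDBC driver"
    case CopyMode => "stage files on S3, then load them with the Redshift COPY command"
  }
}
```

Modelling the mode as a sealed trait lets the compiler check that every mode is handled wherever the sink branches on it.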

Sink Connector Options

| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| hostname | required | none | String | Redshift connection hostname |
| port | required | 5439 | Integer | Redshift connection port |
| username | required | none | String | Redshift user username |
| password | required | none | String | Redshift user password |
| database-name | required | dev | String | Redshift database to connect to |
| table-name | required | none | String | Redshift table name |
| sink.batch-size | optional | 1000 | Integer | Maximum number of buffered records; exceeding it triggers a flush. |
| sink.flush-interval | optional | 1s | Duration | Flush interval; once it elapses, asynchronous threads flush the buffered data. |
| sink.max-retries | optional | 3 | Integer | Maximum number of retries when writing records to the database fails. |
| sink.write.mode | required | JDBC | Enum | Write mode of the sink. Initial support for Redshift COPY and JDBC. |
| sink.write.temp.s3-uri | conditional required | none | String | If sink.write.mode = COPY, the Redshift COPY command requires an S3 URI. |
| aws.iam-role-arn | conditional required | none | String | If sink.write.mode = COPY, the Redshift COPY command requires an IAM role. This role must have the necessary privileges and be attached to the Redshift cluster. |
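The "conditional required" rule in the options above can be sketched as a validation step. This is an illustration under assumptions, not the connector's code: RedshiftSinkOptionValidator and validate are hypothetical names, the IAM role key is assumed to be aws.iam-role-arn, and only options whose default is "none" are treated as unconditionally required.

```scala
import java.util.Locale

// Hypothetical validator for the sink options; not part of any released connector.
object RedshiftSinkOptionValidator {
  val WriteModeKey = "sink.write.mode"
  val TempS3UriKey = "sink.write.temp.s3-uri"
  val IamRoleKey   = "aws.iam-role-arn" // assumed spelling of the IAM role option

  // Options without a default value must always be supplied.
  private val AlwaysRequired = Seq("hostname", "username", "password", "table-name")

  // Returns one message per violated rule; an empty result means the options are valid.
  def validate(options: Map[String, String]): Seq[String] = {
    val missing = AlwaysRequired
      .filterNot(options.contains)
      .map(k => s"Missing required option '$k'")

    // JDBC is the documented default write mode.
    val mode = options.getOrElse(WriteModeKey, "jdbc").trim.toLowerCase(Locale.ROOT)
    val modeErrors = mode match {
      case "jdbc" => Seq.empty[String]
      case "copy" =>
        Seq(TempS3UriKey, IamRoleKey)
          .filterNot(options.contains)
          .map(k => s"Option '$k' is required when $WriteModeKey = COPY")
      case other => Seq(s"Unsupported $WriteModeKey: '$other'")
    }
    missing ++ modeErrors
  }
}
```

Collecting all violations instead of failing on the first one gives users a complete list of what to fix in a single pass.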

Update/Delete Data Considerations: The data is updated and deleted by the primary key.

...

Code Block
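import java.util.Properties
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

// Connection and sink settings for the Redshift connector.
val props = new Properties()
props.setProperty("url", "jdbc:redshift://redshifthost:5439/database?user=<username>&password=<password>")
props.setProperty("database", "test_db")
props.setProperty("table-name", "test_sink_table")
props.setProperty("sink.write.mode", "jdbc")

// `Redshift` is the connector descriptor proposed here.
// Register the Redshift table as an append-only sink named `sink-table`.
tEnv
  .connect(new Redshift().properties(props))
  .inAppendMode()
  .registerTableSink("sink-table")

// The hyphenated table name must be quoted with backticks in SQL.
val sql = "INSERT INTO `sink-table` ....."
tEnv.sqlUpdate(sql)
env.execute()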

Create and sink a table in COPY mode


-- Register a Redshift table `t_user` in Flink SQL.
CREATE TABLE t_user (
    `user_id` BIGINT,
    `user_type` INTEGER,
    `language` STRING,
    `country` STRING,
    `gender` STRING,
    `score` DOUBLE,
    PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
    'connector' = 'redshift',
    'hostname' = 'xxxx.redshift.amazonaws.com',
    'port' = '5439',
    'username' = 'awsuser',
    'password' = 'passwordxxxx',
    'database-name' = 'tutorial',
    'table-name' = 'users',
    'sink.batch-size' = '500',
    'sink.flush-interval' = '1s',
    'sink.max-retries' = '3',
    'sink.write.mode' = 'copy',
    'sink.write.temp.s3-uri' = 's3://bucket-name/key/temp',
    'aws.iam-role-arn' = 'arn:aws:iam::xxxxxxxx:role/xxxxxRedshiftS3Rolexxxxx'
);
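In COPY mode, the staged S3 files are loaded with a Redshift COPY statement. The sketch below shows how such a statement could be assembled from the S3 URI and IAM role options. CopyStatementSketch and build are hypothetical names, and FORMAT AS CSV is an assumption, since this proposal does not fix the staging file format.

```scala
// Hypothetical helper; the connector may build its COPY statement differently.
object CopyStatementSketch {
  // Escape single quotes so a value can be embedded in a SQL string literal.
  private def quote(value: String): String =
    "'" + value.replace("'", "''") + "'"

  // Assemble a COPY statement. CSV is an assumed staging format.
  // The table name is inserted as-is, so it must already be a valid identifier.
  def build(table: String, s3Uri: String, iamRoleArn: String): String =
    s"COPY $table FROM ${quote(s3Uri)} IAM_ROLE ${quote(iamRoleArn)} FORMAT AS CSV"
}
```

With the table and S3 URI from the example above, `CopyStatementSketch.build("users", "s3://bucket-name/key/temp", roleArn)` yields a statement of the form `COPY users FROM 's3://bucket-name/key/temp' IAM_ROLE '<roleArn>' FORMAT AS CSV`.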

Create and sink a table in pure JDBC mode

-- Register a Redshift table `t_user` in Flink SQL.
CREATE TABLE t_user (
    `user_id` BIGINT,
    `user_type` INTEGER,
    `language` STRING,
    `country` STRING,
    `gender` STRING,
    `score` DOUBLE,
    PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
    'connector' = 'redshift',
    'hostname' = 'xxxx.redshift.amazonaws.com',
    'port' = '5439',
    'username' = 'awsuser',
    'password' = 'passwordxxxx',
    'database-name' = 'tutorial',
    'table-name' = 'users',
    'sink.batch-size' = '500',
    'sink.flush-interval' = '1s',
    'sink.max-retries' = '3'
);

-- Write data into the Redshift table from the table `T`.
INSERT INTO t_user
SELECT CAST(`user_id` AS BIGINT), `user_type`, `lang`, `country`, `gender`, `score` FROM T;
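The batch size, flush interval and retry options used in the DDL above can be read as the following buffering behaviour. This is a sketch under assumptions: BatchBuffer and Retry are hypothetical names, flushFn stands in for the real JDBC or COPY write, the caller passes the current time explicitly, and "max retries" is assumed to mean additional attempts after the first failure.

```scala
import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}

// Hypothetical buffer: flushes when the batch is full or the interval has elapsed.
final class BatchBuffer[T](batchSize: Int, flushIntervalMs: Long, flushFn: Seq[T] => Unit) {
  private val buffer = ArrayBuffer.empty[T]
  private var lastFlushMs = 0L

  // Add a record and flush once the buffer reaches the batch size.
  def add(record: T, nowMs: Long): Unit = {
    buffer += record
    if (buffer.size >= batchSize) flush(nowMs)
  }

  // Intended to be called by a periodic timer; flushes pending records
  // once the flush interval has elapsed since the last flush.
  def onTimer(nowMs: Long): Unit =
    if (buffer.nonEmpty && nowMs - lastFlushMs >= flushIntervalMs) flush(nowMs)

  // Hand the buffered records to the writer and reset the interval.
  def flush(nowMs: Long): Unit = {
    if (buffer.nonEmpty) {
      flushFn(buffer.toList)
      buffer.clear()
    }
    lastFlushMs = nowMs
  }
}

object Retry {
  // Run `op`; on a non-fatal failure, retry up to `maxRetries` more times,
  // then rethrow the last error.
  @tailrec
  def withRetries[A](maxRetries: Int)(op: () => A): A =
    Try(op()) match {
      case Success(value)               => value
      case Failure(_) if maxRetries > 0 => withRetries(maxRetries - 1)(op)
      case Failure(error)               => throw error
    }
}
```

A real sink would wrap the write inside flushFn with Retry.withRetries and drive onTimer from an asynchronous scheduler.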


NOTE:

Additional Features that the Flink connector for AWS Redshift can provide on top of using JDBC:

1. Integration with AWS Redshift Workload Management (WLM): AWS Redshift allows you to configure WLM to manage query prioritization and resource allocation. The Flink connector for Redshift will be agnostic to the configured WLM and utilise it for scaling in and out for the source/sink. This means that the connector can leverage the WLM capabilities of Redshift to optimize the execution of queries and allocate resources efficiently based on your defined workload priorities.

2. Abstraction of AWS Redshift Quotas and Limits: AWS Redshift imposes certain quotas and limits on various aspects such as the number of clusters, concurrent connections, queries per second, etc. The Flink connector for Redshift will provide an abstraction layer for users, allowing them to work with Redshift without having to worry about these specific limits. The connector will handle the management of connections and queries within the defined quotas and limits, abstracting away the complexity and ensuring compliance with Redshift's restrictions.
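One way to picture the abstraction over connection quotas is a guard that caps how many Redshift connections the connector uses at once. This is only an illustration: ConnectionLimiter and tryWithConnection are hypothetical names, and this proposal does not specify how quotas will actually be enforced.

```scala
import java.util.concurrent.Semaphore

// Hypothetical guard that caps the number of concurrently used connections.
final class ConnectionLimiter(maxConnections: Int) {
  private val permits = new Semaphore(maxConnections)

  // Run `body` only if a permit is free; return None when the cap is reached.
  // The permit is always released, even if `body` throws.
  def tryWithConnection[A](body: => A): Option[A] =
    if (permits.tryAcquire()) {
      try Some(body)
      finally permits.release()
    } else None

  // Number of connections that could still be opened right now.
  def available: Int = permits.availablePermits()
}
```

Returning None instead of blocking leaves the caller free to back off or queue the work when the cap is reached.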

...

      1. Flink to Redshift: Flink connects to Redshift via JDBC using a username and password.
        Redshift does not support the use of IAM roles to authenticate this connection.
        This connection can be secured using SSL; for more details, see the Encryption section below.
      2. Flink to S3: S3 acts as an intermediary that stores bulk data when reading from or writing to Redshift.
        Flink connects to S3 both through the Hadoop FileSystem interfaces and directly through the Amazon Java SDK's S3 client.
        This connection can be authenticated using either AWS keys or IAM roles.
      3. Default Authentication: flink-connector-aws provides several authentication modes; the Redshift connector will use its CredentialProvider.
        AWS credentials will automatically be retrieved through the DefaultAWSCredentialsProviderChain.
        Users can use IAM instance roles to authenticate to S3 (e.g. on EMR or EC2).
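For the Flink to Redshift connection, the sketch below shows how the hostname, port and database-name options could be combined into a JDBC URL, with SSL enabled and the credentials passed separately rather than embedded in the URL. RedshiftJdbcSketch is a hypothetical name, and the ssl=true URL parameter is assumed from the Redshift JDBC driver's documented options.

```scala
import java.util.Properties

// Hypothetical helper for building the JDBC connection settings.
object RedshiftJdbcSketch {
  // Build the JDBC URL from the hostname, port and database-name options.
  def url(hostname: String, port: Int, database: String, ssl: Boolean): String = {
    val base = s"jdbc:redshift://$hostname:$port/$database"
    if (ssl) s"$base?ssl=true" else base
  }

  // Keep credentials out of the URL by passing them as connection properties.
  def credentials(username: String, password: String): Properties = {
    val props = new Properties()
    props.setProperty("user", username)
    props.setProperty("password", password)
    props
  }
}
```

With the Redshift JDBC driver on the classpath, the two results can be passed to `java.sql.DriverManager.getConnection(url, props)`.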

POC: https://github.com/Samrat002/flink-connector-aws/tree/redshift-connector (Redshift Connector, Table API implementation)

Limitations

  1. Parallelism in flink-connector-redshift will be limited by the connection quotas configured in Redshift (see "Quotas and limits in Amazon Redshift" in the Amazon Redshift documentation).
  2. Speed and latency in the source and sink will be bounded by the latency and data transfer rate of Redshift's UNLOAD and COPY features.
  3. Flink connector redshift sink will only support append-only tables. (no changelog mode support)

...