...

In flink-connector-redshift, the concept of a connector source allows users to configure different modes for data retrieval. This functionality is achieved by utilizing the `scan.read.mode` option available in the source configuration. This flexible configuration empowers users to adapt the connector's behaviour to their specific requirements.

...

  1. Configuration:

    i) Source configurations (mandatory)


    Option           Value                      Description
    scan.read.mode   jdbc, unload
    hostname         <redshifthost>
    database         <name_of_database>
    table.name       <name_of_table>
    temp.s3.path     s3n://path/to/temp/data    Mandatory when scan.read.mode is unload
    username         <username>
    password         <password>

    ii) Source configurations (optional)

    Option        Value              Default   Description
    scan.query    String                       Custom read query, for example: SELECT * from <Table_Name> limit <record_size>; This helps optimise the data read from the source itself.
    port          Integer            5439
    record.size   Positive Integer   1000
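
    The following is a minimal sketch of how these source options could be wired together from the DataStream API, mirroring the sink examples later in this document. The FlinkRedshiftSource class name is an illustrative placeholder; only the option keys are taken from the tables above.

    Code Block
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Mandatory source options (see table above); temp.s3.path is required for unload mode
    val prop = new Properties()
    prop.setProperty("scan.read.mode", "unload")
    prop.setProperty("hostname", "<redshifthost>")
    prop.setProperty("database", "<name_of_database>")
    prop.setProperty("table.name", "<name_of_table>")
    prop.setProperty("temp.s3.path", "s3n://path/to/temp/data")
    prop.setProperty("username", "<username>")
    prop.setProperty("password", "<password>")

    // Optional tuning
    prop.setProperty("record.size", "1000")

    // FlinkRedshiftSource is a hypothetical class used here only to illustrate the configuration
    val stream = env.addSource(new FlinkRedshiftSource(prop))
    stream.print()
    env.execute()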


...

  1. Directly writing to Redshift using the JDBC driver.
  2. Flink streams write to a specified S3 path in the format and schema accepted by Redshift, and then a COPY command loads the data into the Redshift table.

    The flink-connector-redshift module will provide users with the ability to configure different modes for the connector sink by utilizing the sink.write.mode option in the sink configuration. This flexibility allows users to tailor the behaviour of the connector sink to their specific needs. Internally, the module will select and integrate with the appropriate connector based on the chosen write mode.

    If the user specifies the write mode as "jdbc", the flink-connector-redshift module will internally leverage the capabilities of flink-connector-jdbc. This integration enables interaction with Redshift over the JDBC protocol, ensuring efficient data transfer from Flink to the Redshift database.

    When the write mode is file-based ("copy"), the flink-connector-redshift module will utilize flink-connector-filesystem to write data to an S3 bucket, adhering to the format and schema requirements of Redshift's COPY command, so that the data is written in a manner compatible with Redshift's expectations.

    To provide a streamlined sink solution for Flink and Redshift integration, the flink-connector-redshift module will orchestrate these two connectors, flink-connector-jdbc and flink-connector-filesystem, behind the scenes. It preprocesses the data and wraps them to offer a unified sink interface for transferring data from Flink to Redshift.

In summary, the flink-connector-redshift module offers users the flexibility to configure different modes for the connector sink. By intelligently utilizing the appropriate connectors, such as flink-connector-jdbc for direct JDBC-based interaction and flink-connector-filesystem for file-based operations, the module ensures efficient data transfer to Redshift. Through its preprocessing capabilities and seamless integration, the flink-connector-redshift module acts as a comprehensive sink solution, facilitating the smooth transfer of data from Flink to Redshift.
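
To make the delegation concrete, the sketch below shows how a sink factory could branch on sink.write.mode and hand off to the JDBC-based or filesystem-based path. It is an illustration only: buildJdbcSink and buildCopySink are placeholder names, not the actual internals of the module.

Code Block
import java.util.Properties
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.types.Row

// Placeholder builders standing in for the flink-connector-jdbc and
// flink-connector-filesystem based paths described above.
def buildJdbcSink(props: Properties): SinkFunction[Row] = ???
def buildCopySink(props: Properties): SinkFunction[Row] = ???

def createRedshiftSink(props: Properties): SinkFunction[Row] =
  props.getProperty("sink.write.mode", "jdbc").toLowerCase match {
    case "jdbc" =>
      // Write rows directly into Redshift over JDBC.
      buildJdbcSink(props)
    case "copy" =>
      // Stage files on S3 in a Redshift-compatible format, then load them with COPY.
      buildCopySink(props)
    case other =>
      throw new IllegalArgumentException(s"Unsupported sink.write.mode: $other")
  }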

...


Option           Value                                              Description
host             <redshifthost>
database         database_name
table-name       table_name
sink.write.mode  jdbc / copy
aws-iam-role     arn:aws:iam::123456789010:role/redshift_iam_role
username         <username>
password         <password>

...

Code Block
// DataStream API: JDBC write mode
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = .....

val prop = new Properties()
prop.setProperty("url", "jdbc:redshift://redshifthost:5439/database?user=<username>&password=<password>")
prop.setProperty("database", "test_db")
prop.setProperty("table-name", "test_sink_table")
prop.setProperty("sink.write.mode", "jdbc")

stream.addSink(new FlinkRedshiftSink(prop))
env.execute()
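
For the COPY-based write mode, the same sink could be configured by switching sink.write.mode and supplying the IAM role from the configuration table above. This is a sketch that reuses the FlinkRedshiftSink usage shown above; the option values are illustrative.

Code Block
// DataStream API: COPY write mode (sketch)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = .....

val prop = new Properties()
prop.setProperty("url", "jdbc:redshift://redshifthost:5439/database?user=<username>&password=<password>")
prop.setProperty("database", "test_db")
prop.setProperty("table-name", "test_sink_table")
prop.setProperty("sink.write.mode", "copy")
// IAM role that Redshift assumes when executing the COPY command
prop.setProperty("aws-iam-role", "arn:aws:iam::123456789010:role/redshift_iam_role")

stream.addSink(new FlinkRedshiftSink(prop))
env.execute()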

...

Code Block
// Table API: register the Redshift table as a sink and write to it with SQL
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

val prop = new Properties()
prop.setProperty("url", "jdbc:redshift://redshifthost:5439/database?user=<username>&password=<password>")
prop.setProperty("database", "test_db")
prop.setProperty("table-name", "test_sink_table")
prop.setProperty("sink.write.mode", "jdbc")

tEnv
  .connect(new Redshift().properties(prop))
  .inAppendMode()
  .registerTableSink("sink-table")

val sql = "INSERT INTO sink-table ....."
tEnv.sqlUpdate(sql)
env.execute()

...

Additional Features that the Flink connector for AWS Redshift can provide on top of using JDBC:

1. Integration with AWS Redshift Workload Management (WLM): AWS Redshift allows you to configure WLM to manage query prioritization and resource allocation. The Flink connector for Redshift will be agnostic to the configured WLM and will leverage it when scaling the source/sink in and out. This means that the connector can use the WLM capabilities of Redshift to optimize the execution of queries and allocate resources efficiently based on your defined workload priorities.

2. Abstraction of AWS Redshift Quotas and Limits: AWS Redshift imposes certain quotas and limits on various aspects such as the number of clusters, concurrent connections, queries per second, etc. The Flink connector for Redshift will provide an abstraction layer for users, allowing them to work with Redshift without having to worry about these specific limits. The connector will handle the management of connections and queries within the defined quotas and limits, abstracting away the complexity and ensuring compliance with Redshift's restrictions.

...