...
To customize compression encoding for specific columns when creating a table, the connector provides a way to set the `encoding` column metadata field. Refer to the Amazon Redshift documentation for the list of available encodings. Redshift also allows descriptions to be attached to columns, which can be viewed in most query tools. You can specify a description for an individual column by setting the `description` column metadata field; internally, the description is applied with the `COMMENT` command.
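For reference, the Redshift-side DDL behind these two metadata fields looks like the following. This is a minimal JDBC sketch against Redshift directly, with a placeholder endpoint, credentials, and table names; it does not show the connector's own API, which this document leaves open:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class ColumnMetadataExample {
    public static void main(String[] args) throws Exception {
        // Placeholder endpoint and credentials; requires the Redshift JDBC
        // driver on the classpath.
        String url = "jdbc:redshift://my-cluster.abc123.us-east-1.redshift.amazonaws.com:5439/dev";
        try (Connection conn = DriverManager.getConnection(url, "awsuser", "secret");
             Statement stmt = conn.createStatement()) {
            // Per-column compression encoding is chosen with ENCODE.
            stmt.execute(
                    "CREATE TABLE events (" +
                    "  event_id BIGINT ENCODE az64," +
                    "  payload VARCHAR(1024) ENCODE lzo" +
                    ")");
            // Column descriptions are attached with the COMMENT command.
            stmt.execute("COMMENT ON COLUMN events.payload IS 'Raw event body'");
        }
    }
}
```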
IV) Datatype Mapping
Flink Type | Redshift Type |
---|---|
CHAR | VARCHAR |
VARCHAR | VARCHAR |
STRING | VARCHAR |
BOOLEAN | BOOLEAN |
BYTES | Not supported |
DECIMAL | DECIMAL |
TINYINT | SMALLINT |
SMALLINT | SMALLINT |
INTEGER | INTEGER |
BIGINT | BIGINT |
FLOAT | REAL |
DOUBLE | DOUBLE PRECISION |
DATE | DATE |
TIME | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
TIMESTAMP_LTZ | TIMESTAMPTZ |
INTERVAL_YEAR_MONTH | INTEGER |
INTERVAL_DAY_TIME | BIGINT |
ARRAY | Not supported |
MAP | Not supported |
ROW | Not supported |
MULTISET | Not supported |
RAW | Not supported |
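As a small illustration of this mapping (a sketch with made-up table and column names), the Flink DDL below annotates each column with the Redshift type it would map to per the table above:

```java
public class TypeMappingExample {
    public static void main(String[] args) {
        // Each Flink SQL column type is annotated with the Redshift type
        // it maps to, following the mapping table above.
        String flinkDdl =
                "CREATE TABLE sales (" +
                "  id BIGINT," +             // maps to BIGINT
                "  item STRING," +           // maps to VARCHAR
                "  price DECIMAL(10, 2)," +  // maps to DECIMAL
                "  ratio DOUBLE," +          // maps to DOUBLE PRECISION
                "  sold_at TIMESTAMP(3)" +   // maps to TIMESTAMP
                ") WITH (/* connector options, see the sections below */)";
        System.out.println(flinkDdl);
    }
}
```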
V) Source Design
The Flink Redshift connector will offer two modes to read from the source:
...
Configuration:
Source Connector Options
Option | Required | Default | Type | Description |
---|---|---|---|---|
hostname | required | none | String | Redshift connection hostname |
port | required | 5439 | Integer | Redshift connection port |
username | required | none | String | Redshift username |
password | required | none | String | Redshift password |
database.name | required | dev | String | Redshift database to connect to |
table.name | required | none | String | Redshift table name |
source.batch.size | optional | 1000 | Integer | The max flush size; records beyond this are flushed. |
source.flush.interval | optional | 1s | Duration | Over this flush interval, asynchronous threads will flush data. |
source.max.retries | optional | 3 | Integer | The max number of retries when reading records from the database fails. |
scan.read.mode | required | false | Boolean | Whether to use the Redshift UNLOAD command for reads. |
unload.temp.s3.path | conditional required | none | String | If scan.read.mode=true, the Redshift UNLOAD command requires an S3 URI. |
iam-role-arn | conditional required | none | String | If scan.read.mode=true, the Redshift UNLOAD command requires an IAM role, and this role must have the required privileges and be attached to the Redshift cluster. |
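To make these options concrete, here is a hypothetical source table definition using the Flink Table API. The connector identifier ('redshift'), the endpoint, and the credentials are placeholders, and the final option names may differ from this proposal:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RedshiftSourceExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        tEnv.executeSql(
                "CREATE TABLE orders_src (" +
                "  order_id BIGINT," +
                "  amount DECIMAL(10, 2)" +
                ") WITH (" +
                "  'connector' = 'redshift'," +
                "  'hostname' = 'my-cluster.abc123.us-east-1.redshift.amazonaws.com'," +
                "  'port' = '5439'," +
                "  'username' = 'awsuser'," +
                "  'password' = 'secret'," +
                "  'database.name' = 'dev'," +
                "  'table.name' = 'orders'," +
                // Read via UNLOAD: stage results on S3 first.
                "  'scan.read.mode' = 'true'," +
                "  'unload.temp.s3.path' = 's3://my-bucket/tmp/'," +
                "  'iam-role-arn' = 'arn:aws:iam::123456789012:role/redshift-unload'" +
                ")");

        tEnv.executeSql("SELECT * FROM orders_src").print();
    }
}
```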
VI) Sink Design
The Flink Redshift connector will offer two modes to write to Redshift:
- Directly writing to Redshift using the JDBC driver.
- Streaming writes to a specified S3 path, in the format and schema accepted by Redshift, followed by the COPY command to load the data into the Redshift table.
The Flink Redshift connector will provide users with the ability to configure different modes for the connector sink by utilizing the `sink.write.mode` option in the sink configuration. This flexibility allows users to tailor the behaviour of the sink to their specific needs. Internally, the flink-connector-redshift module will select and integrate with the appropriate connector based on the chosen write mode.

If the user specifies the write mode as "jdbc", flink-connector-redshift will use a custom Redshift JDBC driver. This integration enables seamless interaction with Redshift over the JDBC protocol, ensuring efficient data transfer from Flink to the Redshift database. Similarly, when a file-based write mode is selected, the flink-connector-redshift module will utilize flink-connector-filesystem. That connector writes data to an S3 bucket, adhering to the format and schema requirements of Redshift's COPY command, which ensures that the data is written in a manner compatible with Redshift's expectations.

To provide a streamlined sink for Flink-to-Redshift integration, the flink-connector-redshift module will orchestrate flink-filesystem behind the scenes: it preprocesses the data and wraps these connectors to offer a unified sink interface for transferring data from Flink to Redshift.
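A rough sketch of how this dispatch might look internally is shown below; all class names are illustrative placeholders, not the proposal's actual API:

```java
// Illustrative sketch of the sink-side dispatch on sink.write.mode.
interface RedshiftWriter {
    void write(String record);
}

// Writes rows directly through the Redshift JDBC driver.
class JdbcRedshiftWriter implements RedshiftWriter {
    @Override
    public void write(String record) { /* JDBC batch insert */ }
}

// Stages files on S3 via the filesystem connector, then issues COPY.
class CopyRedshiftWriter implements RedshiftWriter {
    @Override
    public void write(String record) { /* buffer, upload, COPY */ }
}

class RedshiftWriterFactory {
    static RedshiftWriter create(String writeMode) {
        switch (writeMode) {
            case "jdbc":
                return new JdbcRedshiftWriter();
            case "copy":
                return new CopyRedshiftWriter();
            default:
                throw new IllegalArgumentException(
                        "Unsupported sink.write.mode: " + writeMode);
        }
    }
}
```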
Sink Connector Options
Option | Required | Default | Type | Description |
---|---|---|---|---|
hostname | required | none | String | Redshift connection hostname |
port | required | 5439 | Integer | Redshift connection port |
username | required | none | String | Redshift username |
password | required | none | String | Redshift password |
database-name | required | dev | String | Redshift database to connect to |
table-name | required | none | String | Redshift table name |
sink.batch-size | optional | 1000 | Integer | The max flush size; records beyond this are flushed. |
sink.flush-interval | optional | 1s | Duration | Over this flush interval, asynchronous threads will flush data. |
sink.max-retries | optional | 3 | Integer | The max number of retries when writing records to the database fails. |
copy-mode | required | false | Boolean | Whether to use the Redshift COPY command for insert/upsert. |
copy-temp-s3-uri | conditional required | none | String | If copy-mode=true, the Redshift COPY command requires an S3 URI. |
iam-role-arn | conditional required | none | String | If copy-mode=true, the Redshift COPY command requires an IAM role, and this role must have the required privileges and be attached to the Redshift cluster. |
Update/Delete Data Considerations: Data is updated and deleted by the primary key.
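Putting the sink options together, a hypothetical sink table could look like the following. The connector identifier, endpoint, and credentials are placeholders, and the PRIMARY KEY (declared NOT ENFORCED, as Flink SQL requires) identifies the key used for updates and deletes:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RedshiftSinkExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        tEnv.executeSql(
                "CREATE TABLE orders_sink (" +
                "  order_id BIGINT," +
                "  amount DECIMAL(10, 2)," +
                // Key used for update/delete; NOT ENFORCED per Flink SQL.
                "  PRIMARY KEY (order_id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'redshift'," +
                "  'hostname' = 'my-cluster.abc123.us-east-1.redshift.amazonaws.com'," +
                "  'port' = '5439'," +
                "  'username' = 'awsuser'," +
                "  'password' = 'secret'," +
                "  'database-name' = 'dev'," +
                "  'table-name' = 'orders'," +
                // COPY mode: stage files on S3, then load via COPY.
                "  'copy-mode' = 'true'," +
                "  'copy-temp-s3-uri' = 's3://my-bucket/tmp/'," +
                "  'iam-role-arn' = 'arn:aws:iam::123456789012:role/redshift-copy'" +
                ")");

        tEnv.executeSql("INSERT INTO orders_sink VALUES (1, 9.99)");
    }
}
```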
...
- Flink to Redshift: Flink connects to Redshift via JDBC using a username and password. Redshift does not support the use of IAM roles to authenticate this connection. This connection can be secured using SSL; for more details, see the Encryption section below.
- Flink to S3: S3 acts as an intermediary to store bulk data when reading from or writing to Redshift. Flink connects to S3 using both the Hadoop FileSystem interfaces and the Amazon Java SDK's S3 client directly. This connection can be authenticated using either AWS keys or IAM roles.
- Default Authentication: flink-connector-aws provides different modes of authentication; the Redshift connector will use its CredentialProvider. AWS credentials will automatically be retrieved through the DefaultAWSCredentialsProviderChain. Users can use IAM instance roles to authenticate to S3 (e.g. on EMR or EC2).
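As a minimal sketch of the default chain (using the AWS SDK for Java v1, which is where the DefaultAWSCredentialsProviderChain class named above lives; whether the connector exposes this directly is an open detail):

```java
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;

public class DefaultCredentialsExample {
    public static void main(String[] args) {
        // Resolves credentials from environment variables, system properties,
        // the shared credentials file, or an instance/container role,
        // in that order.
        AWSCredentials credentials =
                DefaultAWSCredentialsProviderChain.getInstance().getCredentials();
        System.out.println("Access key id: " + credentials.getAWSAccessKeyId());
    }
}
```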
Proof of Concept (POC): https://github.com/Samrat002/flink-connector-aws/tree/redshift-connector
...