Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
There is demand within the community for an Amazon DynamoDB connector. This Jira [1] was created in March 2020 to capture interest. The Async Sink framework was introduced in Flink 1.15 under FLIP-171 [2] and a DynamoDB sink was implemented, but not merged before feature freeze [3].
[1] https://issues.apache.org/jira/browse/FLINK-16504
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
[3] https://issues.apache.org/jira/browse/FLINK-24229
Public Interfaces
Initially we will add a DynamoDB sink via the Async Sink framework, with support for both batch and streaming execution and for the Table API. Source support can be added at a later date.
- Sink:
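For the Table API/SQL integration, the sink could be registered with a CREATE TABLE statement along the following lines. This is a sketch only; the connector identifier and option names shown here are illustrative assumptions, not a finalised API:

```sql
-- Illustrative DDL; the option names ('table-name', 'aws.region', ...) are assumptions
CREATE TABLE dynamodb_sink (
  user_id STRING,
  created_at STRING,
  payload STRING
) WITH (
  'connector' = 'dynamodb',
  'table-name' = 'my-dynamodb-table',
  'aws.region' = 'eu-west-1'
);

INSERT INTO dynamodb_sink
SELECT user_id, created_at, payload FROM source_table;
```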
Proposed Changes
The new sink connector will be based on the Async Sink framework (FLIP-171) and will support both Bounded (Batch) and Unbounded (Streaming) execution, for both the DataStream and Table API/SQL.
The Flink community will create a new connector repo, similar to Elasticsearch [1]. There is a work-in-progress PR [2] from the Flink 1.15 release cycle which will be ported to the new repository and finalised. The repository name will be:
flink-connector-dynamodb
We already have a DynamoDB Streams consumer [3] that currently lives within the Kinesis Data Streams connector [4] module. Since DynamoDB Streams uses the Kinesis Data Streams API, we will leave this source connector where it is to retain backwards compatibility. Once the Kinesis connector is migrated to the new source interfaces [5] (FLIP-27), we can consider moving the DynamoDB Streams source to a new repository.
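For reference, the existing DynamoDB Streams source is used roughly as follows. This is a sketch based on the current Kinesis connector module; the stream ARN and region values are placeholders:

```java
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkDynamoDBStreamsConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

Properties consumerConfig = new Properties();
consumerConfig.setProperty(AWSConfigConstants.AWS_REGION, "eu-west-1"); // placeholder region

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// FlinkDynamoDBStreamsConsumer reads DynamoDB Streams via the Kinesis Data Streams API
DataStream<String> changeStream = env.addSource(
    new FlinkDynamoDBStreamsConsumer<>(
        "arn:aws:dynamodb:eu-west-1:...:table/my-table/stream/...", // placeholder stream ARN
        new SimpleStringSchema(),
        consumerConfig));
```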
A sample using the connector is shown below:
Properties clientProperties = new Properties();

// Supports auth methods and HTTP client configs from flink-connector-aws-base
clientProperties.setProperty(AWS_ACCESS_KEY_ID, ...);
clientProperties.setProperty(AWS_SECRET_ACCESS_KEY, ...);
clientProperties.setProperty(AWS_REGION, ...);

stream.sinkTo(
    DynamoDbSink.<T>builder()
        .setTableName(tableName)
        .setPartitionKey(partitionKey)
        .setSortKey(sortKey)
        .setDynamoDbClientProperties(clientProperties)
        .build());
[1] https://github.com/apache/flink-connector-elasticsearch
[2] https://github.com/apache/flink/pull/18518
[3] https://issues.apache.org/jira/browse/FLINK-4582
[4] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkDynamoDBStreamsConsumer.java
[5] https://issues.apache.org/jira/browse/FLINK-24438
Versioning Strategy
The flink-connector-dynamodb version will be independent of Flink. We will follow the same versioning strategy as Flink in terms of feature freeze windows, release candidates and branching/tagging. We will publish a Flink support matrix in the connector README and also update the Flink documentation to reference supported connectors. The initial release of flink-connector-dynamodb will target version 1.0.0 and support Flink 1.15.x and 1.16.x (if available).
Compatibility, Deprecation, and Migration Plan
The connector is compatible with Amazon DynamoDB. With respect to Flink, this is a new feature, so no compatibility, deprecation, or migration plan is expected.
Test Plan
We will add the following tests:
- Unit tests
- Integration tests that run end-to-end against a DynamoDB LocalStack container
- End-to-end tests that hit the real Amazon DynamoDB service. These tests will be enabled when credentials are defined.
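The LocalStack-based integration tests could follow the usual Testcontainers pattern, roughly as below. This is a sketch under the assumption that Testcontainers and the AWS SDK v2 are used; the container image tag and the shape of the connector test harness are not finalised:

```java
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;

// Start a LocalStack container exposing only the DynamoDB service
LocalStackContainer localstack =
    new LocalStackContainer(DockerImageName.parse("localstack/localstack"))
        .withServices(LocalStackContainer.Service.DYNAMODB);
localstack.start();

// Point a DynamoDB client at the container instead of the real AWS endpoint
DynamoDbClient client = DynamoDbClient.builder()
    .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB))
    .region(Region.of(localstack.getRegion()))
    .credentialsProvider(StaticCredentialsProvider.create(
        AwsBasicCredentials.create(localstack.getAccessKey(), localstack.getSecretKey())))
    .build();

// ... create a table, run a job using the sink, then assert on the written items
```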
Rejected Alternatives
We considered creating the connector in a third-party repository. This approach was rejected since the native Apache Flink repositories provide the best experience for Flink users.