
...

Page properties

...


Discussion thread

...


Vote thread

...


JIRA

...

Jira: FLINK-17688 (ASF JIRA)

...

Release: 1.12


Table of Contents

Motivation

...

The FlinkKinesisConsumer is configured via Java Properties, allowing arbitrary properties to be supported without breaking existing applications. The properties object is passed to the constructor of the consumer. The following properties will be added to support the two consumption options. Property keys are in line with current Flink naming conventions and will be exposed via constants:

Key | Description | Type | Default
----|-------------|------|--------
flink.stream.recordpublisher | Select RecordPublisher mechanism (efo|polling) | string | polling
flink.stream.efo.consumername | The name of the consumer to register with KDS | string | *see below
flink.stream.efo.registration | Determines how and when consumer de-/registration is performed (lazy|eager|none) | string | lazy
flink.stream.efo.consumerarn.<stream-name> | The Consumer ARN for a given stream name | string | *see below
flink.stream.registerstreamconsumer.maxretries | Maximum number of attempts after recoverable exception | int | 10
flink.stream.registerstreamconsumer.backoff.base | The base backoff time between attempts | long | 200
flink.stream.registerstreamconsumer.backoff.max | The maximum backoff time between attempts | long | 1000
flink.stream.registerstreamconsumer.backoff.expconst | The power constant for exponential backoff between attempts | double | 1.5
flink.stream.deregisterstreamconsumer.maxretries | Maximum number of attempts after recoverable exception | int | 10
flink.stream.deregisterstreamconsumer.backoff.base | The base backoff time between attempts | long | 200
flink.stream.deregisterstreamconsumer.backoff.max | The maximum backoff time between attempts | long | 1000
flink.stream.deregisterstreamconsumer.backoff.expconst | The power constant for exponential backoff between attempts | double | 1.5
flink.stream.liststreamconsumers.maxretries | Maximum number of attempts after recoverable exception | int | 10
flink.stream.liststreamconsumers.backoff.base | The base backoff time between attempts | long | 200
flink.stream.liststreamconsumers.backoff.max | The maximum backoff time between attempts | long | 1000
flink.stream.liststreamconsumers.backoff.expconst | The power constant for exponential backoff between attempts | double | 1.5
flink.shard.subscribetoshard.maxretries | Maximum number of attempts after recoverable exception | int | 5
flink.shard.subscribetoshard.backoff.base | The base backoff time between attempts | long | 1000
flink.shard.subscribetoshard.backoff.max | The maximum backoff time between attempts | long | 2000
flink.shard.subscribetoshard.backoff.expconst | The power constant for exponential backoff between attempts | double | 1.5

 * No default. Provided by customer application
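To illustrate how the properties above would be supplied, the sketch below builds a configuration enabling EFO consumption. The specific values ("efo", the consumer name, the retry overrides) are illustrative assumptions; the keys are those from the table.

```java
import java.util.Properties;

public class EfoConfigExample {

    // Builds an illustrative EFO configuration using the property keys
    // from the table above. Values here are assumptions for the example.
    public static Properties efoProperties() {
        Properties config = new Properties();
        // Switch from the default polling publisher to enhanced fan-out.
        config.setProperty("flink.stream.recordpublisher", "efo");
        // Required for EFO: must be unique per independent consumer of the stream.
        config.setProperty("flink.stream.efo.consumername", "my-flink-efo-consumer");
        // Optional tuning of SubscribeToShard retries (defaults shown in the table).
        config.setProperty("flink.shard.subscribetoshard.maxretries", "5");
        config.setProperty("flink.shard.subscribetoshard.backoff.base", "1000");
        return config;
    }

    public static void main(String[] args) {
        Properties config = efoProperties();
        // In a real application this Properties object would be passed to
        // the FlinkKinesisConsumer constructor.
        System.out.println(config.getProperty("flink.stream.recordpublisher"));
    }
}
```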

...

A consumer name must be provided by the user application within the configuration property map. The presence of this value will be validated when EFO consumption is enabled. The consumer name should be unique for each independent consumer of a given stream; reusing an active consumer when calling SubscribeToShard will result in active subscriptions being terminated. Therefore the customer would need to set this configuration carefully should they require multiple Flink EFO consumers (applications) reading from the same stream.

...

Registration/De-registration Configuration

Flink does not yet provide a single entry and exit point in which to perform consumer registration and de-registration. This leaves the following strategies, all of which will be offered via different configuration options:

  • Registration
    • Lazy: Performed during task startup (default option)
      • Pro: Registration is performed from within the tasks running on Flink Task Managers during startup
      • Con: Parallel tasks competing to register may result in an increased start-up time
    • Eager: Performed when the FlinkKinesisConsumer is constructed during application main method
      • Pro: Registration is performed once per Flink job
      • Con: Client execution environment may not have access to external resources
    • None: Performed externally by the user from the AWS CLI (user supplies consumer ARN to connector)
      • Pro: Application start-up time will be reduced
      • Con: Additional configuration to supply and up-front setup to be performed by customer
  • De-registration
    • Lazy/Eager: Performed during task tear-down (default option)
      • Pro: De-registration is performed from within the Flink cluster
      • Con: Parallel tasks competing to de-register may result in an increased tear-down time
    • None: Performed externally by the user (user supplies consumer ARN to connector)
      • Pro: Application tear-down time will be reduced
      • Con: Consumer is still registered once the job ends and user would need to manually de-register to avoid additional cost

Properties will be validated, and de-/registration will be performed based on the supplied configuration:

flink.stream.efo.registrar | flink.stream.efo.consumerarn.<stream-name> | Registration | Deregistration
-------------------------|---------------------------------------------|--------------|---------------
lazy | Not required | task startup | task tear-down
eager | Not required | FlinkKinesisConsumer constructor | task tear-down
none | Required | none | none
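For the "none" registrar option, the consumer ARN obtained externally (e.g. via the AWS CLI) must be supplied per stream. A minimal sketch, in which the method name, stream name, and ARN are illustrative assumptions:

```java
import java.util.Properties;

public class ExternalRegistrationExample {

    // Configuration for externally managed registration: the connector
    // performs no de-/registration and is given the consumer ARN directly.
    public static Properties externallyRegistered(String streamName, String consumerArn) {
        Properties config = new Properties();
        config.setProperty("flink.stream.recordpublisher", "efo");
        config.setProperty("flink.stream.efo.registrar", "none");
        // One ARN property per stream, keyed by stream name.
        config.setProperty("flink.stream.efo.consumerarn." + streamName, consumerArn);
        return config;
    }
}
```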

Stream Consumer Registration

The user will select their registration strategy based on the application they are deploying. Generally speaking, applications with high levels of parallelism would benefit from an eager/none registration to reduce quota contention and application startup/tear-down time.

Stream consumer registration can be invoked:

  • Lazy: During task start-up, in a multi-threaded/process environment. RegisterStreamConsumer will be invoked from tasks running on Task Manager slots during startup. Due to the parallel nature of Flink jobs, there will be n distributed threads competing to register the consumer. The first task to invoke the service will win; therefore tasks will poll ListStreamConsumers and attempt to register when the consumer is not found. Invocations will be staggered by the backoff time to reduce the number of failures due to competing requests. The backoff time will be calculated using the configuration values specified when creating the consumer.
  • Eager: Within the FlinkKinesisConsumer constructor from a single threaded environment. The Consumer ARN(s) will be passed into the tasks via the connector configuration.
  • None: The user application will supply the Consumer ARN(s) with the connector properties when registration is disabled.
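The lazy strategy above can be sketched as follows. This is a hypothetical abstraction, not the connector's actual API: the ConsumerRegistrar interface, method names, and the jitter choice are assumptions; the real implementation would call ListStreamConsumers/RegisterStreamConsumer via AWS SDK v2.x, and the backoff formula follows the base/max/expconst properties from the configuration table.

```java
import java.util.concurrent.ThreadLocalRandom;

public class LazyRegistrationSketch {

    // Backoff matching the *.backoff.* properties:
    // delay = min(base * expconst^attempt, max), jittered so that
    // parallel tasks stagger their competing register calls.
    static long backoffMillis(long base, long max, double expConst, int attempt) {
        double delay = base * Math.pow(expConst, attempt);
        long capped = (long) Math.min(delay, max);
        return ThreadLocalRandom.current().nextLong(capped + 1);
    }

    // Hypothetical abstraction over the Kinesis proxy (names are illustrative).
    interface ConsumerRegistrar {
        boolean consumerExists(String stream, String consumerName);
        void register(String stream, String consumerName);
    }

    // Poll for an existing consumer; register if absent; back off on
    // recoverable failures, up to maxRetries attempts.
    static void lazyRegister(ConsumerRegistrar registrar, String stream, String name,
                             int maxRetries, long base, long max, double expConst)
            throws InterruptedException {
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            if (registrar.consumerExists(stream, name)) {
                return; // another parallel task won the race
            }
            try {
                registrar.register(stream, name);
                return;
            } catch (RuntimeException recoverable) {
                Thread.sleep(backoffMillis(base, max, expConst, attempt));
            }
        }
        throw new IllegalStateException(
                "Failed to register consumer after " + maxRetries + " attempts");
    }
}
```

The full jitter keeps n competing tasks from retrying in lockstep, which is the stated goal of staggering invocations.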


Ingestion

Most of the existing functionality within the KinesisDataFetcher and ShardConsumer can be reused. The code that consumes records from KDS will be abstracted out to support EFO in addition to the existing Polling mechanism.

In the current implementation a ShardConsumer is responsible for pulling data from KDS and passing it on to the Flink application, via the KinesisDataFetcher. A shard consumer is composed of the following components:

...
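The planned abstraction might take roughly the following shape. All names here are illustrative assumptions, not the final connector API; the point is only that the shard-reading code is hidden behind an interface with one polling-based and one fan-out-based implementation.

```java
import java.util.List;

public class RecordPublisherSketch {

    // Hypothetical abstraction over the record-consumption mechanism.
    interface RecordPublisher {
        // Reads a batch from the shard and hands it to the consumer;
        // reports whether the shard has been fully consumed.
        RecordPublisherRunResult run(RecordBatchConsumer consumer);
    }

    enum RecordPublisherRunResult { INCOMPLETE, COMPLETE }

    interface RecordBatchConsumer {
        void accept(List<byte[]> records);
    }

    // A PollingRecordPublisher would wrap GetRecords (AWS SDK v1.x), while a
    // FanOutRecordPublisher would wrap SubscribeToShard (AWS SDK v2.x); the
    // ShardConsumer would depend only on the RecordPublisher interface.
}
```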

Sequence Diagrams



Tear Down

De-registration is enabled by default, and whenever registration is set to lazy or eager within the connector properties. DeregisterStreamConsumer will be invoked from tasks running on Task Manager slots during tear-down. Due to the parallel nature of Flink jobs, there will be n distributed threads competing to deregister the consumer. The first task to invoke the service will win; therefore tasks will poll ListStreamConsumers and attempt to deregister while the consumer is active. Invocations will be staggered by the backoff time to reduce the number of failures due to competing requests. The backoff time will be calculated using the configuration values specified when creating the consumer.


Error Handling

...

The following phased approach will be taken to deliver EFO support:

  •  [Documentation] FLIP created, reviewed and approved
  •  [Jira sub task/Pull Request] Improved test coverage for existing Polling consumption implementation 
  •  [Jira sub task/Pull Request] Refactor existing code to support RecordPublisher. A single PollingRecordPublisher will be included. Behaviour of the consumer will be unchanged. Include relevant documentation.
  •  [Jira sub task/Pull Request] Add AWS SDK v2.x dependency along with FanOutKinesisProxy
  •  [Jira sub task/Pull Request] Configuration validation and deserialisation for Fan Out consumers
  •  [Jira sub task/Pull Request] Stream consumer registration and de-registration strategies
  •  [Jira sub task/Pull Request] Add FanOutRecordPublisher to add EFO support. Include relevant documentation and end to end tests
  •  [Sample Applications] Publish sample applications and additional documentation.

...

EFO requires AWS SDK v2.x in order to utilise HTTP/2 and invoke SubscribeToShard. The existing FlinkKinesisConsumer implementation uses AWS SDK v1.x. This design proposes using AWS SDK v1.x and v2.x side by side [5]. It is not currently possible to remove AWS SDK v1.x from the Flink Kinesis Connectors project because the Kinesis Producer Library (KPL) and DynamoDBStreamConsumer do not yet support AWS SDK v2.x. Therefore, to minimise change and reduce risk, AWS SDK v2.x will only be used by the Fan Out Kinesis Proxy.

Register Stream Consumer(s) During Connector Initialisation

Performing stream consumer registration in the FlinkKinesisConsumer constructor was considered, as it would eliminate the competing-request issue. However, this approach would execute code within the client application, and Flink aims to keep as much code server side as possible. A potential issue is that the client may not have access to the KDS, resulting in a non-recoverable error.

Waiting for the refactored Source Interfaces

...