Status

Current state: Under Discussion

Discussion thread

...

Page properties


Discussion thread
Vote thread
JIRA: FLINK-15776

...

Release
Reason: Obsoleted by FLINK-143


Motivation

While implementing a JDBC exactly-once sink, I found that the current abstraction (TwoPhaseCommitSinkFunction) doesn’t suit this use case. To avoid code duplication, I propose a new abstraction with the following goals in mind:

  1. accommodate the needs of the existing Kafka sinks
  2. accommodate the needs of the new JDBC sink:
    1. commits are retried in case of transient failures instead of failing the job
    2. rollbacks are retried
    3. transactions started during this run must be distinguished from those restored from state; commit failures (with reason “unknown”) are ignored for the latter; this is a consequence of the lack of timeouts
    4. when committing a group of transactions: an option to stop as soon as one commit fails; otherwise consistency can be violated (if the failure was transient, the failed commit and all subsequent commits are retried later)
    5. transaction timeouts aren’t used to ignore commit failures, as most DBs don’t support them
    6. state will probably need to include all to-commit transactions (as union list)
    7. minor API changes required
  3. accommodate the needs of other 2PC sinks in the future; these could be the existing file sink, WAL, or potential DynamoDB or Pulsar sinks
  4. and non-sinks (see this question)
  5. support batch jobs, in which sinks may no longer be running when the job finishes and pre-committed checkpoints still need to be committed
  6. improve testability; currently, TwoPhaseCommitSinkFunction requires a lot of mocking
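
The JDBC commit requirements in goal 2 (retry transient failures, ignore “unknown” failures for restored transactions, optionally stop at the first failed commit) can be illustrated with a minimal sketch. All names here (CommitResult, TxHandle, Committer, GroupCommit) are hypothetical and are not part of the proposed API. Failing on an “unknown” result for a transaction started in the current run is also an assumption, since the proposal only specifies the behaviour for restored transactions.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical outcome of a single commit attempt. */
enum CommitResult { SUCCESS, TRANSIENT_FAILURE, UNKNOWN }

/** Hypothetical handle for a pre-committed transaction. */
final class TxHandle {
    final String id;
    final boolean restored; // true if recovered from state, false if started during this run

    TxHandle(String id, boolean restored) {
        this.id = id;
        this.restored = restored;
    }
}

/** Hypothetical committer; a JDBC implementation would commit the XA transaction here. */
interface Committer {
    CommitResult commit(TxHandle tx);
}

final class GroupCommit {
    /**
     * Commits the transactions in order and returns those that must be retried later.
     * With stopOnFirstFailure set, nothing after the first transient failure is attempted,
     * so later transactions cannot become visible before earlier ones.
     */
    static List<TxHandle> commitAll(
            List<TxHandle> txs, Committer committer, boolean stopOnFirstFailure) {
        List<TxHandle> toRetry = new ArrayList<>();
        for (int i = 0; i < txs.size(); i++) {
            TxHandle tx = txs.get(i);
            CommitResult result = committer.commit(tx);
            if (result == CommitResult.SUCCESS) {
                continue;
            }
            if (result == CommitResult.UNKNOWN) {
                if (tx.restored) {
                    continue; // ignored: a restored transaction may already have been committed
                }
                // Assumption: an unknown failure for a transaction of this run is a real error.
                throw new IllegalStateException("Unknown commit failure for " + tx.id);
            }
            // Transient failure: keep the transaction for a later retry instead of failing the job.
            toRetry.add(tx);
            if (stopOnFirstFailure) {
                toRetry.addAll(txs.subList(i + 1, txs.size()));
                break;
            }
        }
        return toRetry;
    }
}
```

Returning the list of transactions to retry, rather than throwing, keeps the retry policy (backoff, maximum attempts) outside the commit loop, in line with the goal of customizing aspects independently.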

...

This enables independent customization of the various aspects and finer-grained testing.

Extracting the 2PC Resource also allows running it outside a Sink (this might be needed for batch jobs, to commit the final pre-committed transactions when Tasks are no longer running).


Serialization can be viewed as an implementation detail of StateHandler, though an API to build it, or some default implementation, should be provided.

...