Status

Current state: Under Discussion

Discussion thread

...

Page properties


Discussion thread
Vote thread
JIRA: FLINK-15776

...

Release
Reason: Obsoleted by FLINK-143


Motivation

While implementing an exactly-once JDBC sink, I found that the current abstraction (TwoPhaseCommitSinkFunction) doesn’t suit this use case. To avoid code duplication, I propose a new abstraction with the following goals in mind:

  1. accommodate the needs of the existing Kafka sinks
  2. accommodate the needs of the new JDBC sink:
    1. commits are retried in case of transient failures instead of failing the job
    2. rollbacks are retried
    3. distinguish between transactions started during this run and those restored from state, and ignore commit failures (with reason “unknown”) for the latter; this is needed because transaction timeouts are not available
    4. when committing a group of transactions, provide an option to stop as soon as one commit fails; otherwise consistency can be violated (if the failure was transient, the failed commit and all subsequent ones will be retried later)
    5. transaction timeouts aren’t used to ignore commit failures, as most DBs don’t support them
    6. state will probably need to include all transactions that are yet to be committed (as a union list)
    7. minor API changes are required
  3. accommodate the needs of other 2PC sinks in the future: existing ones such as the file sink and the WAL sink, or potential ones such as DynamoDB or Pulsar
  4. accommodate the needs of non-sinks (see this question)
  5. support batch jobs, in which sinks may no longer be running when the job finishes while pre-committed checkpoints still need to be committed
  6. improve testability; currently, TwoPhaseCommitSinkFunction requires a lot of mocking
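To make the JDBC requirements above more concrete (collecting transiently failed commits for a later retry, ignoring “unknown” failures for restored transactions, and optionally stopping at the first failed commit), here is a minimal Java sketch of committing a group of pending transactions. Everything in it is hypothetical and not part of Flink’s API: `GroupCommitSketch`, `Txn`, `Committer` and the `CommitResult` values are invented for illustration, and any failure other than “unknown” on a restored transaction is simply treated as transient.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: committing a group of pending transactions. */
final class GroupCommitSketch {

    /** Outcome reported by a hypothetical commit call. */
    enum CommitResult { OK, TRANSIENT_FAILURE, UNKNOWN }

    /** A pending transaction; restored == true means it was recovered from state. */
    record Txn(String id, boolean restored) {}

    /** Hypothetical stand-in for the external system's commit operation. */
    interface Committer {
        CommitResult commit(Txn txn);
    }

    /**
     * Commits transactions in order and returns those that must be retried later.
     * With stopOnFailure, the first failure leaves that transaction and all later
     * ones pending, so commits are never applied out of order.
     */
    static List<Txn> commitAll(List<Txn> pending, Committer committer, boolean stopOnFailure) {
        List<Txn> retryLater = new ArrayList<>();
        for (int i = 0; i < pending.size(); i++) {
            Txn txn = pending.get(i);
            CommitResult result = committer.commit(txn);
            if (result == CommitResult.OK) {
                continue;
            }
            if (result == CommitResult.UNKNOWN && txn.restored()) {
                // Restored transaction with unknown status: without DB-side timeouts,
                // assume it was already committed before the restart and ignore it.
                continue;
            }
            // Any other failure is treated as transient in this sketch.
            if (stopOnFailure) {
                retryLater.addAll(pending.subList(i, pending.size()));
                return retryLater;
            }
            retryLater.add(txn);
        }
        return retryLater;
    }
}
```

With `stopOnFailure` disabled, only the individually failed transactions are kept for a retry, which is faster but may commit later transactions before earlier ones.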

Public Interfaces

All new abstractions (see below) should eventually become public (@Public or @PublicEvolving) to allow extension for different use cases.

Proposed Changes

Introduce a new set of abstractions to interact with two-phase commit systems. The main idea is a more fine-grained separation of concerns:

...

This allows various aspects to be customized independently and tested at a finer granularity.

Extracting the 2PC Resource also allows running it outside of a Sink (this might be needed for batch jobs to commit the final pre-committed transactions when Tasks are no longer running).
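One possible shape for an extracted 2PC Resource, as a minimal Java sketch: the interface name `TwoPhaseCommitResource`, its four methods, `commitRemaining` and `InMemoryResource` are hypothetical illustrations, not the proposal’s actual API. Because the resource has no dependency on a sink, `commitRemaining` could be called from a batch job’s finalization step, and the in-memory fake shows how such a resource could be tested without mocking.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical decomposition: the 2PC resource knows nothing about Flink sinks. */
final class ResourceSketch {

    /** Hypothetical 2PC resource; TXN is the resource-specific transaction handle. */
    interface TwoPhaseCommitResource<TXN> {
        TXN beginTransaction();
        void preCommit(TXN txn);
        void commit(TXN txn);
        void rollback(TXN txn);
    }

    /** Commits final pre-committed transactions, e.g. after all sink tasks have finished. */
    static <TXN> void commitRemaining(TwoPhaseCommitResource<TXN> resource, List<TXN> preCommitted) {
        for (TXN txn : preCommitted) {
            resource.commit(txn);
        }
    }

    /** In-memory fake that records which transactions were committed or rolled back. */
    static final class InMemoryResource implements TwoPhaseCommitResource<String> {
        final List<String> committed = new ArrayList<>();
        final List<String> rolledBack = new ArrayList<>();
        private int counter;

        @Override public String beginTransaction() { return "txn-" + counter++; }
        @Override public void preCommit(String txn) { /* nothing to flush in memory */ }
        @Override public void commit(String txn) { committed.add(txn); }
        @Override public void rollback(String txn) { rolledBack.add(txn); }
    }
}
```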


Serialization can be viewed as an implementation detail of StateHandler. However, an API to build it, or some default implementation, should still be provided.
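A minimal sketch of the “API to build it” idea, assuming a hypothetical `StateHandler` that maps pending transactions to opaque byte entries and back. The `of` factory builds a default handler from per-transaction encode/decode functions; all names and signatures here are illustrative assumptions rather than an existing Flink interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class StateHandlerSketch {

    /** Hypothetical StateHandler: turns pending transactions into state entries and back. */
    interface StateHandler<TXN> {
        List<byte[]> snapshot(List<TXN> pending);
        List<TXN> restore(List<byte[]> state);

        /** Hypothetical factory: a default handler built from per-transaction codecs. */
        static <TXN> StateHandler<TXN> of(Function<TXN, byte[]> encoder, Function<byte[], TXN> decoder) {
            return new StateHandler<>() {
                @Override public List<byte[]> snapshot(List<TXN> pending) {
                    List<byte[]> entries = new ArrayList<>(pending.size());
                    for (TXN txn : pending) {
                        entries.add(encoder.apply(txn));   // one state entry per transaction
                    }
                    return entries;
                }

                @Override public List<TXN> restore(List<byte[]> state) {
                    List<TXN> txns = new ArrayList<>(state.size());
                    for (byte[] entry : state) {
                        txns.add(decoder.apply(entry));
                    }
                    return txns;
                }
            };
        }
    }
}
```

Keeping serialization behind the handler means a sink can swap the encoding without touching commit logic.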

FlinkKafkaProducer state

Currently, transactions are effectively grouped by the subtask that produced them.

...

In general, changes to existing state should be minimized.

Write-Ahead-Log sink over 2PC

The current WAL sink implementation has some similarities with 2PC and can therefore reuse parts of it. However, it is much simpler in the sense that it has nothing to roll back: there are no external transactions, and the current state handle is discarded automatically if it is not closed properly.
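A minimal Java sketch, with hypothetical names throughout, of the WAL sink expressed as 2PC operations: a “transaction” is only the buffer of records for one checkpoint, commit replays it to a downstream writer (modelled here as a `Consumer<String>`), and rollback is a no-op because nothing external was started.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical WAL sink expressed as 2PC steps; rollback has nothing to do. */
final class WalResourceSketch {

    /** The "transaction" is just the records buffered for one checkpoint. */
    static final class WalTxn {
        final List<String> records = new ArrayList<>();
    }

    private final Consumer<String> downstream; // hypothetical external system writer

    WalResourceSketch(Consumer<String> downstream) {
        this.downstream = downstream;
    }

    WalTxn beginTransaction() {
        return new WalTxn();
    }

    void write(WalTxn txn, String record) {
        txn.records.add(record);
    }

    void preCommit(WalTxn txn) {
        // In a real sink the buffer would be persisted as part of the checkpoint here.
    }

    void commit(WalTxn txn) {
        // Replay the logged records once the checkpoint has completed.
        txn.records.forEach(downstream);
    }

    void rollback(WalTxn txn) {
        // Nothing to do: no external transaction exists, and an unclosed
        // state handle is discarded automatically.
    }
}
```

Since `rollback` is empty, the WAL case only exercises the begin, pre-commit and commit parts of a 2PC abstraction.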

...

- if the checkpoint was already taken, then NEW data is discarded

Naming

  • Epoch vs TwoPhaseCommit vs Transaction
    • Transaction is not necessarily serial, and for files there is no transaction
    • TwoPhaseCommit is long and ambiguous (2PC is also used internally for checkpoints)
    • Epoch is too general


Compatibility, Deprecation, and Migration Plan

  • TwoPhaseCommitSinkFunction can be
    • ported to the new abstraction
    • left intact, keeping the new abstraction in “stealth mode” for an “incubation period”
    • deprecated and eventually removed
  • depending on what happens to TwoPhaseCommitSinkFunction, KafkaProducerSinks can either be left intact or rebased onto the new function
    • state compatibility needs to be provided for KafkaTransactionState (depending on de/serialization)
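State compatibility usually comes down to the serializer being able to read every earlier layout. A minimal sketch with hypothetical names: version 2 of a transaction state entry adds the index of the producing subtask, and the reader dispatches on the version recorded with the restored state (similar in spirit to Flink’s serializer snapshot versioning, though this code does not use that API). The `-1` placeholder for a missing owner is an assumption.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical versioned serializer for a transaction state entry. */
final class VersionedTxnStateSerializer {

    static final int CURRENT_VERSION = 2;
    static final int UNKNOWN_OWNER = -1; // assumed placeholder for entries written by v1

    /** Hypothetical state entry; v2 adds the subtask that produced the transaction. */
    record TxnState(String transactionalId, int ownerSubtask) {}

    /** Always writes the current (v2) layout. */
    static byte[] serialize(TxnState state) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(state.transactionalId());
            out.writeInt(state.ownerSubtask());
        }
        return bytes.toByteArray();
    }

    /** Reads both layouts; the version comes from metadata stored with the state. */
    static TxnState deserialize(int version, byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String id = in.readUTF();
            return switch (version) {
                case 1 -> new TxnState(id, UNKNOWN_OWNER); // v1 had no owner field
                case 2 -> new TxnState(id, in.readInt());
                default -> throw new IOException("Unsupported state version: " + version);
            };
        }
    }
}
```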

Test Plan

Existing Kafka tests and new JDBC tests shouldn’t be affected and should cover the changes.

Rejected Alternatives

  1. Use TwoPhaseCommitSinkFunction with minimal modifications, losing some functionality of the JDBC sink:
    1. adds technical debt by having more code based on the wrong abstraction
    2. cuts JDBC sink functionality
    3. still requires (slightly) changing the existing abstraction
  2. Use two different implementations for now, gather feedback, possibly design other 2PC sinks to understand their needs, and then provide an abstraction based on this.
    1. duplication of logic in the intermediate state
    2. lack of confidence that the intermediate state won’t become the final one