Status

...


Discussion thread

...

...

JIRA: FLINK-4256

...

Release: 1.9

...


Phase 1 (Released in 1.3)

...

This leaves us with the following pseudo-code for the core backtracking logic, which starts at a given task, backtracks upstream towards blocking result partitions, and from there moves downstream to all consumers:


// entry point for failover strategies
onTaskFailure(task):
    containingRegion = determineFailoverRegion(task)
    failoverRegion(containingRegion)

// alternatively return collection of vertices
private failoverRegion(region):
    if (!hasRegionBeenScheduled(region)) {
       // nothing to do
       return;
    }

    // upstream: make sure every consumed blocking partition can be read again
    resultPartitions = determineNeededResultPartitions(region)
    for (resultPartition : resultPartitions) {
       if (isPartitionStillAvailable(resultPartition)) {
           // data still available, so in theory don't have to do anything
           // exact details depend on shuffle service implementation and
           // whether we can consume data from a TM without
           // a task being deployed on it
       } else {
           // data lost, so the producing region has to run again first
           producerRegion = getProducerRegion(resultPartition)
           failoverRegion(producerRegion)
       }
    }

    reschedule(region)

    // downstream: restart all consumer regions that could be affected by this failover
    // make behavior configurable?
    consumerRegions = getConsumersForRegion(region)
    for (consumerRegion : consumerRegions) {
       failoverRegion(consumerRegion)
    }
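
The backtracking above can be tried out on a toy region graph. The following Java sketch is illustrative only: `RegionFailoverSketch`, `Region`, `Partition` and `regionsToRestart` are hypothetical names, not Flink classes. It follows the "return a collection" variant mentioned in the pseudo-code comment, and it adds a visited set (an assumption that is not part of the pseudo-code) so that each region is handled at most once even though producers and consumers reference each other.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of the backtracking logic; all types here are hypothetical, not Flink classes. */
final class RegionFailoverSketch {

    /** A blocking result partition produced by exactly one region. */
    static final class Partition {
        final Region producer;
        boolean available; // stands in for isPartitionStillAvailable(...)

        Partition(Region producer, boolean available) {
            this.producer = producer;
            this.available = available;
        }
    }

    /** A failover region with the partitions it reads and the regions reading from it. */
    static final class Region {
        final String name;
        boolean scheduled = true; // stands in for hasRegionBeenScheduled(...)
        final List<Partition> consumed = new ArrayList<>();
        final List<Region> consumers = new ArrayList<>();

        Region(String name) {
            this.name = name;
        }

        /** Registers this region as a consumer of the given partition. */
        void consume(Partition partition) {
            consumed.add(partition);
            partition.producer.consumers.add(this);
        }
    }

    /** Returns the names of all regions to restart, in the order they would be rescheduled. */
    static List<String> regionsToRestart(Region failed) {
        List<String> order = new ArrayList<>();
        failoverRegion(failed, new HashSet<>(), order);
        return order;
    }

    private static void failoverRegion(Region region, Set<Region> visited, List<String> order) {
        // Nothing to do for regions that never ran; the visited set (an assumption
        // of this sketch) prevents handling the same region twice.
        if (!region.scheduled || !visited.add(region)) {
            return;
        }
        // Upstream: a lost input partition forces its producer region to run again first.
        for (Partition partition : region.consumed) {
            if (!partition.available) {
                failoverRegion(partition.producer, visited, order);
            }
        }
        // Corresponds to reschedule(region) in the pseudo-code.
        order.add(region.name);
        // Downstream: restart every consumer region that could be affected.
        for (Region consumer : region.consumers) {
            failoverRegion(consumer, visited, order);
        }
    }
}
```

For a chain of regions A, B and C in which B fails and the partition produced by A has been lost, `regionsToRestart(b)` yields `[A, B, C]`. If A's partition is still available, only `[B, C]` are restarted.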


Partition life-cycle management

...