...

Current state: Under Discussion

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-141-Intra-Slot-Managed-Memory-Sharing-td44146.html

JIRA:

Released: 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

  • Add two interfaces to ExecutionConfig
    • setManagedMemoryUseCaseWeights
    • getManagedMemoryUseCaseWeights
  • Introduce class MemoryUseCase for argument/return type of the new interfaces

See Calculate Fractions for Use Cases for details.

In addition, this FLIP contains changes to existing public interfaces involving configuration options for Python use cases. See Compatibility, Deprecation, and Migration Plan for details.

Proposed Changes

...

To be specific, the proposed interface is as follows.

@Public
public class ExecutionConfig {
    // ...
    public ExecutionConfig setManagedMemoryUseCaseWeights(Map<MemoryUseCase, Integer> weights);
    public Map<MemoryUseCase, Integer> getManagedMemoryUseCaseWeights();
}

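public enum MemoryUseCase {
    // Scope of each use case (assumed here): batch operators reserve memory
    // per operator, while RocksDB and Python share memory per slot.
    BATCH_OP(Scope.OP),
    ROCKSDB(Scope.SLOT),
    PYTHON(Scope.SLOT);

    public final Scope scope;

    MemoryUseCase(Scope scope) {
        this.scope = scope;
    }

    public enum Scope {
        SLOT,
        OP
    }
}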

If not explicitly specified, the following weights will be used by default.

{
    BATCH_OP : 70,
    ROCKSDB : 70,
    PYTHON : 30
}

The default weights should preserve the current behavior in existing scenarios (those without Python operators). In streaming jobs, all managed memory should be used for the RocksDB state backend, since it is the only use case that occurs with a non-zero weight. The same holds for batch operators in batch jobs.
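To illustrate why the defaults keep existing jobs unchanged, here is a minimal sketch. It assumes that a use case's fraction is its weight divided by the total weight of the use cases that actually occur in the slot; the exact rule is defined in Calculate Fractions for Use Cases and is not reproduced here. The class WeightNormalizationSketch and its methods are hypothetical names, not part of the proposal.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; none of these names are part of the proposed API.
final class WeightNormalizationSketch {

    // Simplified stand-in for the proposed MemoryUseCase enum (scope omitted).
    enum MemoryUseCase { BATCH_OP, ROCKSDB, PYTHON }

    // Assumption: a use case's fraction is its weight divided by the total
    // weight of the use cases that actually occur in the slot.
    static Map<MemoryUseCase, Double> slotFractions(
            Map<MemoryUseCase, Integer> weights, Set<MemoryUseCase> occurring) {
        int total = 0;
        for (MemoryUseCase useCase : occurring) {
            total += weights.getOrDefault(useCase, 0);
        }
        Map<MemoryUseCase, Double> fractions = new EnumMap<>(MemoryUseCase.class);
        for (MemoryUseCase useCase : occurring) {
            int weight = weights.getOrDefault(useCase, 0);
            fractions.put(useCase, total == 0 ? 0.0 : (double) weight / total);
        }
        return fractions;
    }

    // The default weights listed above.
    static Map<MemoryUseCase, Integer> defaultWeights() {
        Map<MemoryUseCase, Integer> weights = new EnumMap<>(MemoryUseCase.class);
        weights.put(MemoryUseCase.BATCH_OP, 70);
        weights.put(MemoryUseCase.ROCKSDB, 70);
        weights.put(MemoryUseCase.PYTHON, 30);
        return weights;
    }

    public static void main(String[] args) {
        // Streaming job without Python: RocksDB is the only occurring use case.
        System.out.println(
                slotFractions(defaultWeights(), EnumSet.of(MemoryUseCase.ROCKSDB)));
        // Streaming job with Python operators: memory is split 70 : 30.
        System.out.println(
                slotFractions(
                        defaultWeights(),
                        EnumSet.of(MemoryUseCase.ROCKSDB, MemoryUseCase.PYTHON)));
    }
}
```

Under this assumption, a streaming job without Python operators gives ROCKSDB a fraction of 1.0, matching the previous behavior, while adding Python operators splits the slot's managed memory 0.7 to 0.3.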

...

To be specific, we propose to replace Transformation.setManagedMemoryWeight with the following interface.

@Internal
public class Transformation {
    // ...
    // replacing: public void setManagedMemoryWeight(int managedMemoryWeight);
    public void declareManagedMemoryUseCase(MemoryUseCase memoryUseCase, int operatorWeight);
}

operatorWeight is used to compute per-operator fractions for per-op use cases; it is ignored for per-slot use cases.
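As a rough illustration of how operatorWeight might enter the calculation, the sketch below assumes that the fraction of a per-op use case is divided among the operators that declared it, in proportion to their operator weights. This proportional rule, the class OperatorFractionSketch, and the operator names are assumptions made for illustration only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch; not part of the proposed API.
final class OperatorFractionSketch {

    // Assumption: within a slot, a per-op use case's fraction is divided among
    // the declaring operators in proportion to their operatorWeight.
    static Map<String, Double> operatorFractions(
            double useCaseFraction, Map<String, Integer> operatorWeights) {
        int total = 0;
        for (int weight : operatorWeights.values()) {
            total += weight;
        }
        Map<String, Double> fractions = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> entry : operatorWeights.entrySet()) {
            double share = total == 0 ? 0.0 : (double) entry.getValue() / total;
            fractions.put(entry.getKey(), useCaseFraction * share);
        }
        return fractions;
    }

    public static void main(String[] args) {
        // Two batch operators in one slot, declared with weights 1 and 3.
        Map<String, Integer> weights = new LinkedHashMap<>();
        weights.put("sort", 1);
        weights.put("hash-join", 3);
        // With the whole slot given to BATCH_OP (fraction 1.0),
        // the operators receive 0.25 and 0.75 respectively.
        System.out.println(operatorFractions(1.0, weights));
    }
}
```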

...

The calculated fractions are stored in each StreamConfig as a map whose keys are use cases. For a per-slot use case, the value is the fraction for the use case, which is shared by all operators in the slot. For a per-op use case, the value is the operator’s fraction for the use case, which is dedicated to that operator.

@Internal
public class StreamConfig {
    // ...
    // replacing: public void setManagedMemoryFraction(double managedMemFraction);
    public void setManagedMemoryFractions(Map<MemoryUseCase, Double> managedMemoryFractions);

    // replacing: public double getManagedMemoryFraction();
    public Map<MemoryUseCase, Double> getManagedMemoryFractions();
}
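A consumer of this map might turn a fraction into a memory budget along the following lines. This is only a sketch: the class FractionLookupSketch, the method managedMemoryBytes, and the slotManagedMemoryBytes parameter are hypothetical, and treating a missing use case as a fraction of 0.0 is an assumption rather than something the proposal specifies.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical sketch; not part of the proposed API.
final class FractionLookupSketch {

    // Simplified stand-in for the proposed MemoryUseCase enum (scope omitted).
    enum MemoryUseCase { BATCH_OP, ROCKSDB, PYTHON }

    // Converts the fraction stored for a use case into a byte budget.
    // Assumption: a use case missing from the map has a fraction of 0.0.
    static long managedMemoryBytes(
            Map<MemoryUseCase, Double> fractions,
            MemoryUseCase useCase,
            long slotManagedMemoryBytes) {
        double fraction = fractions.getOrDefault(useCase, 0.0);
        // The cast truncates, so the budget never exceeds the exact share.
        return (long) (slotManagedMemoryBytes * fraction);
    }

    public static void main(String[] args) {
        Map<MemoryUseCase, Double> fractions = new EnumMap<>(MemoryUseCase.class);
        fractions.put(MemoryUseCase.ROCKSDB, 0.75);
        fractions.put(MemoryUseCase.PYTHON, 0.25);

        long slotBytes = 1024L;
        System.out.println(managedMemoryBytes(fractions, MemoryUseCase.ROCKSDB, slotBytes));
        System.out.println(managedMemoryBytes(fractions, MemoryUseCase.PYTHON, slotBytes));
        System.out.println(managedMemoryBytes(fractions, MemoryUseCase.BATCH_OP, slotBytes));
    }
}
```

Keying the map by the enum means a use case added later needs no change to lookups like this one.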

Extensibility

The proposed design is extensible to future managed memory use cases. With the introduction of enum class MemoryUseCase and usage of maps for passing weights and fractions, adding a new use case should be as simple as adding a new value to the enum.

...