Status

Current state: Under Discussion

Discussion thread

JIRA:

...


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

This FLIP introduces new public APIs.

  • Introduce the configuration option `taskmanager.memory.managed.consumer-weights` for specifying the weights of managed memory use cases. See Configure Weights for Use Cases for details.
  • Introduce the enum MemoryUseCase, identifying the managed memory use cases and their scopes.

In addition, this FLIP contains changes to existing public APIs, involving configuration options for python use cases. See Compatibility, Deprecation, and Migration Plan for details.

Proposed Changes

This proposal extends the fraction-based managed memory sharing to work with various use cases at the same time. Managed memory is first distributed across all use cases. Then, for per-op use cases, the memory distributed to the use case is further distributed across the operators. The resulting memory distribution plan is converted into fractions of the slot’s total managed memory, which can be accessed at execution time for allocating/reserving memory accordingly.

Calculate Fractions for Use Cases

...

...

During the compiling stage, StreamJobGraphGenerator should first calculate a fraction for each managed memory use case, indicating what proportion of the slot’s managed memory each use case should take.

...

  • f_u is the fraction of use case u,
  • w_u is the weight of use case u, and
  • D is the set of all use cases that are declared by operators in the slot.
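Under the assumption that each use case's fraction is simply its weight divided by the sum of weights of all declared use cases (consistent with the definitions above; class and method names here are illustrative, not part of this FLIP), the calculation can be sketched as:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class UseCaseFractions {

    // Hypothetical helper: f_u = w_u / (sum of w_v for all v in D),
    // where D is the set of use cases declared by operators in the slot.
    static Map<String, Double> calculateFractions(
            Map<String, Integer> weights, Set<String> declared) {
        int total = declared.stream().mapToInt(weights::get).sum();
        Map<String, Double> fractions = new HashMap<>();
        for (String useCase : declared) {
            fractions.put(useCase, (double) weights.get(useCase) / total);
        }
        return fractions;
    }

    public static void main(String[] args) {
        Map<String, Integer> weights = Map.of("DATAPROC", 70, "PYTHON", 30);
        // Both use cases declared in the slot: DATAPROC gets 0.7, PYTHON gets 0.3.
        System.out.println(calculateFractions(weights, Set.of("DATAPROC", "PYTHON")));
        // Only DATAPROC declared: it receives the full managed memory (fraction 1.0).
        System.out.println(calculateFractions(weights, Set.of("DATAPROC")));
    }
}
```

Note that undeclared use cases receive no fraction at all, so their weights do not dilute the memory available to the use cases that actually occur in the slot.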

Configure Weights for Use Cases

We propose to configure weights of managed memory use cases via configuration options. The weights can be configured as a map, where keys are the use cases and values are their managed memory weights.

  • `taskmanager.memory.managed.consumer-weights`: A map of managed memory use cases and their integer weights. At the moment, the valid use cases are:
    • DATAPROC: A combined weight for the RocksDB state backend in streaming jobs and for batch operators in batch jobs, which are guaranteed not to be mixed.
    • PYTHON: Weight for python processes.
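Assuming the option uses Flink's standard map-style syntax (the exact syntax is not spelled out in this FLIP), overriding the defaults in the cluster configuration might look like:

```yaml
# Give python processes a larger share of managed memory (illustrative values).
taskmanager.memory.managed.consumer-weights: DATAPROC:60,PYTHON:40
```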

Exposing the weights through configuration options will allow users to tune the weights easily without touching the code. The limitation is that users cannot specify different weights for jobs running in the same Flink cluster, which should be fair enough as a first step. If we later see that specifying different weights per job is needed, we can expose the weights via ExecutionConfig, so that users can overwrite the weights for a specific job.

To be specific, the proposed interface is as follows.

...

...

If not explicitly specified, the following weights will be used by default.

{
    DATAPROC : 70,
    PYTHON : 30
}

The default weights should result in the same behavior as before in existing scenarios (without python operators). All managed memory should be used for the RocksDB state backend in streaming jobs, since it is the only occurring use case with a non-zero weight. The same holds for batch operators in batch jobs.

...

To be specific, we propose to replace Transformation.setManagedMemoryWeight with the following interface.

@Internal
public class Transformation {
    // ...
    // replacing: public void setManagedMemoryWeight(int managedMemoryWeight);
    public void declareManagedMemoryUseCase(MemoryUseCase memoryUseCase, int operatorWeight);
}

public enum MemoryUseCase {
    // The scope shown for PYTHON is an assumption for illustration;
    // BATCH_OP is per-op and ROCKSDB is per-slot as described above.
    BATCH_OP(Scope.OP),
    ROCKSDB(Scope.SLOT),
    PYTHON(Scope.SLOT);

    public final Scope scope;

    MemoryUseCase(Scope scope) {
        this.scope = scope;
    }

    public enum Scope {
        SLOT,
        OP
    }
}

The operatorWeight is used for further computing operator fractions for per-op use cases, and ignored for per-slot use cases.
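A minimal sketch of that further distribution, assuming each operator's share of a per-op use case is its declared operator weight over the sum of all operator weights declared for that use case in the slot (helper and operator names are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

public class OperatorFractions {

    // Hypothetical helper: splits a per-op use case's slot fraction among
    // operators proportionally to their declared operator weights.
    static Map<String, Double> distribute(
            double useCaseFraction, Map<String, Integer> operatorWeights) {
        int total = operatorWeights.values().stream().mapToInt(Integer::intValue).sum();
        Map<String, Double> result = new HashMap<>();
        operatorWeights.forEach((op, w) -> result.put(op, useCaseFraction * w / total));
        return result;
    }

    public static void main(String[] args) {
        // BATCH_OP takes 0.7 of the slot's managed memory; two operators declared
        // it with weights 300 and 100, so they get 0.525 and 0.175 respectively.
        System.out.println(distribute(0.7, Map.of("sortA", 300, "joinB", 100)));
    }
}
```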

...

The calculated fractions are set on StreamConfigs as a map, keyed by use case. For a per-slot use case, the value is the fraction for the use case, which is shared by all operators in the slot. For a per-op use case, the value is the operator’s fraction for the use case, which is dedicated to the operator.

@Internal
public class StreamConfig {
    // ...
    // replacing: public void setManagedMemoryFraction(double managedMemFraction);
    public void setManagedMemoryFractions(Map<MemoryUseCase, Double> managedMemoryFractions);

    // replacing: public double getManagedMemoryFraction();
    public Map<MemoryUseCase, Double> getManagedMemoryFractions();
}
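To illustrate how these fractions could be consumed at execution time (the helper name and numbers below are illustrative, not part of this FLIP), a use case's fraction is turned into a byte budget against the slot's total managed memory:

```java
import java.util.Map;

public class MemoryReservation {

    // Hypothetical runtime-side helper: converts a use case's fraction into
    // bytes of the slot's managed memory, e.g. for reserving RocksDB memory.
    // A use case absent from the map gets no managed memory.
    static long bytesFor(
            Map<String, Double> fractions, String useCase, long slotManagedMemoryBytes) {
        return (long) (fractions.getOrDefault(useCase, 0.0) * slotManagedMemoryBytes);
    }

    public static void main(String[] args) {
        long slotManaged = 512L * 1024 * 1024; // 512 MB managed memory in the slot
        Map<String, Double> fractions = Map.of("ROCKSDB", 1.0);
        // With fraction 1.0, RocksDB may reserve the full 512 MB.
        System.out.println(bytesFor(fractions, "ROCKSDB", slotManaged));
    }
}
```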

Extensibility

The proposed design is extensible to future managed memory use cases. With the introduction of the MemoryUseCase enum and the use of maps for passing weights and fractions, adding a new use case should be as simple as adding a new value to the enum.

Implementation Steps

...

  1. Introduce the memory weights configuration option.
  2. Implement the new fraction calculation logic. This also means migrating the batch operator use cases.
  3. Make RocksDB respect the calculated fraction.
  4. Make python processes use the memory manager's shared resources, with respect to the calculated fractions.

Compatibility, Deprecation, and Migration Plan

...

  • Changes to public APIs are all additions. No modifications or removals.
  • The changes should not require any configuration changes for use cases without python UDFs.
  • The changes do not affect DataSet use cases, where fractions are configured, calculated and passed to task execution in a completely different code path.

Test Plan

The changes introduced by this FLIP should be verified with unit tests.

Rejected Alternatives

The alternative described in this section is not yet rejected. It is still open to discussion.

This FLIP proposes to first distribute managed memory to use cases, according to their configured weights and whether they are declared by operators, and to then further distribute the memory of per-op use cases to the operators.

...

  • Users do not need to understand how operators share slots.
  • Changing of slot sharing groups will not require changing of memory weights.
  • For jobs split into multiple slot sharing groups, it allows independent memory distribution across different slots.

...

  • It requires defining weights consistently across various use cases.
    • It would be hard to decide the relative relations between different use cases, such as “if rocksdb uses 100MB, then the python process should use 50MB”.
  • Memory configuration for different use cases is closely coupled.
    • It would make memory tuning hard, since changing the weight of one operator for one use case would result in memory changes for all other operators and use cases.

The benefits of the alternative approach are strongly based on the assumption that operator weights are properly configured, which is unlikely. Therefore, we have chosen the proposed approach, which might be less optimal but is easier to use.

...