Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The FLIP-27 source introduced SourceCoordinator and SourceReader: the coordinator runs on the job manager and the parallel readers run on task managers. SourceCoordinatorContext was created to facilitate communication between the coordinator and the readers.
SourceCoordinatorContext can also be useful to other operators that need communication between an operator coordinator and its subtasks. For example, in the shuffling support of the Flink Iceberg sink [1], shuffle subtasks can calculate local traffic statistics and send them to the shuffle coordinator, which calculates globally aggregated statistics. The shuffle coordinator can then send the globally aggregated statistics back to the shuffle subtasks so that they can make shuffling decisions. (A separate proposal for the shuffling support of the Flink Iceberg sink [1] will be submitted later. For more details about how shuffling would use CoordinatorContextBase, see the appendix.)
Public Interfaces
CoordinatorContextBase and CoordinatorExecutorThreadFactory are used internally only, so there is no change to public interfaces.
Proposed Changes
1. Extract most of the SourceCoordinatorContext code into an abstract CoordinatorContextBase class, with the following methods moved over.
protected <V> V callInCoordinatorThread(Callable<V> callable, String errorMessage);

public void runInCoordinatorThread(Runnable runnable);

public void runInEventLoop(
        final ThrowingRunnable<Throwable> action,
        final String actionName,
        final Object... actionNameFormatParameters);

public void sendEventToOperator(int subtaskId, OperatorEvent event);

public void subtaskReady(OperatorCoordinator.SubtaskGateway gateway);

public void subtaskNotReady(int subtaskIndex);
2. Extract the private class CoordinatorExecutorThreadFactory from SourceCoordinator into a public class.
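To illustrate the threading contract behind callInCoordinatorThread and runInCoordinatorThread, here is a minimal, Flink-free sketch. The class names CoordinatorThreadSketch and NamedThreadFactory are hypothetical stand-ins for CoordinatorContextBase and CoordinatorExecutorThreadFactory, and the use of a plain single-thread ExecutorService with a RuntimeException wrapper is an assumption made for the example, not the actual Flink code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/** Hypothetical, Flink-free stand-in for the proposed base class. */
class CoordinatorThreadSketch implements AutoCloseable {

    /** Hypothetical thread factory that remembers the coordinator thread it created. */
    static final class NamedThreadFactory implements ThreadFactory {
        private final String threadName;
        private volatile Thread coordinatorThread;

        NamedThreadFactory(String threadName) {
            this.threadName = threadName;
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, threadName);
            t.setDaemon(true);
            coordinatorThread = t;
            return t;
        }

        boolean isCurrentThreadCoordinatorThread() {
            return Thread.currentThread() == coordinatorThread;
        }
    }

    private final NamedThreadFactory threadFactory;
    private final ExecutorService coordinatorExecutor;

    CoordinatorThreadSketch(String threadName) {
        this.threadFactory = new NamedThreadFactory(threadName);
        this.coordinatorExecutor = Executors.newSingleThreadExecutor(threadFactory);
    }

    /**
     * Runs the callable on the coordinator thread and waits for its result.
     * If the caller is already on the coordinator thread, the callable runs
     * inline so that the single-threaded executor cannot deadlock on itself.
     */
    <V> V callInCoordinatorThread(Callable<V> callable, String errorMessage) {
        try {
            if (threadFactory.isCurrentThreadCoordinatorThread()) {
                return callable.call();
            }
            return coordinatorExecutor.submit(callable).get();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(errorMessage, e);
        }
    }

    /** Schedules the runnable on the coordinator thread without waiting for it. */
    void runInCoordinatorThread(Runnable runnable) {
        coordinatorExecutor.execute(runnable);
    }

    @Override
    public void close() {
        coordinatorExecutor.shutdownNow();
    }
}
```

Funneling all state changes through one thread means a subclass would not need its own locking for coordinator state, which is the main reason to share this logic rather than reimplement it per operator.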
Compatibility, Deprecation, and Migration Plan
This FLIP doesn’t introduce any incompatible change. There is no need for deprecation or migration.
Test Plan
We will use unit tests to make sure the original functionality is not affected.
Rejected Alternatives
None.
References
Appendix
Where to use CoordinatorContextBase
In the Flink Iceberg sink shuffling support [1], shuffling subtasks will compute traffic statistics from their local traffic and send them to the shuffling coordinator for aggregation. The shuffling coordinator will calculate global statistics from the subtasks' local statistics and send them back to the subtasks, which use them to decide how to distribute data to the sink subtasks. The communication from the shuffling coordinator to the shuffling subtasks needs the RPC call in CoordinatorContextBase.
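The aggregation step described above could look roughly like the following sketch. It is not part of this proposal: the class GlobalStatisticsSketch, the representation of statistics as a map from key to record count, and the rule of emitting a global result only once every subtask has reported are all assumptions chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** Hypothetical coordinator-side aggregator; statistics are assumed to be key -> count maps. */
class GlobalStatisticsSketch {
    private final int parallelism;
    private final Map<Integer, Map<String, Long>> localBySubtask = new HashMap<>();

    GlobalStatisticsSketch(int parallelism) {
        this.parallelism = parallelism;
    }

    /**
     * Records the latest local statistics of one subtask. Once all subtasks
     * have reported, returns the summed global statistics and starts a new
     * round; otherwise returns an empty Optional.
     */
    Optional<Map<String, Long>> onLocalStatistics(int subtaskId, Map<String, Long> local) {
        localBySubtask.put(subtaskId, new HashMap<>(local));
        if (localBySubtask.size() < parallelism) {
            return Optional.empty();
        }
        Map<String, Long> global = new TreeMap<>();
        for (Map<String, Long> stats : localBySubtask.values()) {
            stats.forEach((key, count) -> global.merge(key, count, Long::sum));
        }
        localBySubtask.clear();
        return Optional.of(global);
    }
}
```

In a real coordinator, the returned global statistics would then be sent to every subtask through the context's event-sending method, from within the coordinator thread.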
How to use CoordinatorContextBase in next step
ShuffleCoordinatorContext will extend CoordinatorContextBase to reuse the RPC call logic.
public class ShuffleCoordinatorContext extends CoordinatorContextBase {

    void sendDataStatisticsToSubtasks(DataStatistics dataStatistics) {
        callInCoordinatorThread(
                () -> {
                    ...
                },
                msg);
    }

    ...
}
ShuffleCoordinator will implement the OperatorCoordinator interface and take a ShuffleCoordinatorContext as a constructor argument.
public class ShuffleCoordinator implements OperatorCoordinator {
    ...

    public ShuffleCoordinator(
            String operatorName,
            ExecutorService coordinatorExecutor,
            ShuffleCoordinatorContext context);

    ...
}
ShuffleOperator will extend AbstractStreamOperator and implement OneInputStreamOperator and OperatorEventHandler.
public class ShuffleOperator<IN, OUT> extends AbstractStreamOperator<OUT>
        implements OneInputStreamOperator<IN, OUT>, OperatorEventHandler {
    ...
}