Status: rejected
Reason: concerns on exposing it as public
Discussion thread: https://lists.apache.org/thread/lsdmc2zvqddrm5nvcdh6y2qy0qqgzj7v
Vote thread: N/A
JIRA: https://issues.apache.org/jira/browse/FLINK-27405
Release: N/A

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The FLIP-27 source introduced SourceCoordinator and SourceReader, where the coordinator runs on the job manager and the parallel readers run on task managers. SourceCoordinatorContext was created to facilitate communication between the coordinator and the readers.

SourceCoordinatorContext could also be useful to other operators that need communication between an operator coordinator and its subtasks. For example, in the shuffling support for the Flink Iceberg sink [1], shuffle subtasks can calculate local traffic statistics and send them to the shuffle coordinator, which computes globally aggregated statistics. The shuffle coordinator can then send the globally aggregated statistics back to the shuffle subtasks so that they can make shuffling decisions. (A separate proposal for the shuffling support of the Flink Iceberg sink [1] will be submitted later. For details on how shuffling would use CoordinatorContextBase, see the appendix.)

Public Interfaces

CoordinatorContextBase and CoordinatorExecutorThreadFactory are used internally. No change to public interfaces.

Proposed Changes



  1. Extract most of the SourceCoordinatorContext code into an abstract CoordinatorContextBase class, with the following methods moved over:
protected <V> V callInCoordinatorThread(Callable<V> callable, String errorMessage);

public void runInCoordinatorThread(Runnable runnable);

public void runInEventLoop(
            final ThrowingRunnable<Throwable> action,
            final String actionName,
            final Object... actionNameFormatParameters);

public void sendEventToOperator(int subtaskId, OperatorEvent event);
                        
public void subtaskReady(OperatorCoordinator.SubtaskGateway gateway);

public void subtaskNotReady(int subtaskIndex);      
      

  2. Extract the private class CoordinatorExecutorThreadFactory from SourceCoordinator into a public class.
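
The two steps above can be illustrated with a small, Flink-free sketch of the underlying pattern: a thread factory that remembers the coordinator thread, and a base context that funnels work onto that single thread. All class names here (SketchCoordinatorThreadFactory, SketchCoordinatorContextBase, EchoContext) are hypothetical stand-ins, and the bodies are assumptions about how such a base class could behave, not the actual Flink implementation.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/** Hypothetical stand-in for CoordinatorExecutorThreadFactory. */
class SketchCoordinatorThreadFactory implements ThreadFactory {
    private final String threadName;
    private volatile Thread thread;

    SketchCoordinatorThreadFactory(String threadName) {
        this.threadName = threadName;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Remember the created thread so callers can tell whether they are already on it.
        thread = new Thread(r, threadName);
        return thread;
    }

    boolean isCurrentThreadCoordinatorThread() {
        return Thread.currentThread() == thread;
    }
}

/** Hypothetical stand-in for CoordinatorContextBase (assumed behavior, no Flink types). */
abstract class SketchCoordinatorContextBase implements AutoCloseable {
    private final SketchCoordinatorThreadFactory threadFactory;
    private final ExecutorService coordinatorExecutor;

    protected SketchCoordinatorContextBase(String threadName) {
        this.threadFactory = new SketchCoordinatorThreadFactory(threadName);
        this.coordinatorExecutor = Executors.newSingleThreadExecutor(threadFactory);
    }

    /** Runs the callable on the coordinator thread and blocks until it returns. */
    protected <V> V callInCoordinatorThread(Callable<V> callable, String errorMessage) {
        if (threadFactory.isCurrentThreadCoordinatorThread()) {
            // Already on the coordinator thread: call directly instead of waiting on ourselves.
            try {
                return callable.call();
            } catch (Exception e) {
                throw new RuntimeException(errorMessage, e);
            }
        }
        try {
            return coordinatorExecutor.submit(callable).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(errorMessage, e);
        } catch (ExecutionException e) {
            throw new RuntimeException(errorMessage, e.getCause());
        }
    }

    /** Schedules the runnable on the coordinator thread without waiting for it. */
    public void runInCoordinatorThread(Runnable runnable) {
        coordinatorExecutor.execute(runnable);
    }

    @Override
    public void close() {
        coordinatorExecutor.shutdown();
    }
}

/** Hypothetical subclass that reports which thread executed its call. */
class EchoContext extends SketchCoordinatorContextBase {
    EchoContext(String threadName) {
        super(threadName);
    }

    String coordinatorThreadName() {
        return callInCoordinatorThread(() -> Thread.currentThread().getName(), "call failed");
    }
}
```

In this sketch, a subclass such as EchoContext never touches the executor directly. It only calls the inherited helpers, which is the kind of reuse the extraction is meant to enable for operators other than the source.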

Compatibility, Deprecation, and Migration Plan

This FLIP doesn’t introduce any incompatible change. There is no need for deprecation and migration.

Test Plan

We will use unit tests to verify that the existing functionality is not affected.

Rejected Alternatives

None.

References

  1. https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo


Appendix

Where to use CoordinatorContextBase

In the Flink Iceberg sink shuffling support [1], shuffling subtasks will collect traffic statistics from their local traffic and send them to the shuffling coordinator for aggregation. The shuffling coordinator will calculate global statistics from the subtasks' local statistics and send them back to the subtasks, which use them to decide how to distribute data to the sink subtasks. The communication from the shuffling coordinator to the shuffling subtasks needs the RPC call in CoordinatorContextBase.
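
As a rough illustration of the aggregation step described above, the following sketch merges per-subtask statistics into a global view. The proposal does not define what the statistics contain, so this assumes, purely for illustration, that each subtask reports a record count per key. SketchStatisticsAggregator is a hypothetical name and is not part of Flink or Iceberg.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper; assumes local statistics are per-key record counts. */
final class SketchStatisticsAggregator {

    private SketchStatisticsAggregator() {}

    /** Sums the per-key counts reported by every subtask into one global map. */
    static Map<String, Long> aggregate(List<Map<String, Long>> localStatistics) {
        Map<String, Long> global = new HashMap<>();
        for (Map<String, Long> local : localStatistics) {
            // merge() inserts the count for a new key or adds it to the existing total.
            local.forEach((key, count) -> global.merge(key, count, Long::sum));
        }
        return global;
    }
}
```

In the proposed design, the coordinator would run a step like this after receiving statistics from the subtasks and then send the result back to them through the context.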


How to use CoordinatorContextBase in next step

ShuffleCoordinatorContext will extend CoordinatorContextBase to reuse the RPC call logic.

public class ShuffleCoordinatorContext extends CoordinatorContextBase {

  void sendDataStatisticsToSubtasks(DataStatistics dataStatistics) {
    callInCoordinatorThread(
        () -> {
          ...
        }, msg);
  }
  ...
}


ShuffleCoordinator will implement the OperatorCoordinator interface and take a ShuffleCoordinatorContext as a constructor argument.

public class ShuffleCoordinator implements OperatorCoordinator {

  ...

  public ShuffleCoordinator(String operatorName,
                            ExecutorService coordinatorExecutor,
                            ShuffleCoordinatorContext context);
  ...
}


ShuffleOperator will extend AbstractStreamOperator and implement OneInputStreamOperator and OperatorEventHandler.

public class ShuffleOperator<IN, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT>, OperatorEventHandler {
    ...
}






2 Comments

  1. public void subtaskReady(OperatorCoordinator.SubtaskGateway gateway); public void subtaskNotReady(int subtaskIndex);
    I think these are called attemptReady and attemptFailed?
  2. runInEventLoop
    This method is part of SourceCoordinator, not SourceCoordinatorContext.