
...

  1. Hide the choice of a specific ApplicationRunner implementation behind configuration instead of source code.
  2. Define a unified API that allows users to describe the processing logic of both high- and low-level API applications in all environments (i.e. with all ApplicationRunners).

  3. Define a set of standard application execution methods to deploy both low- and high-level API applications in YARN and standalone environments.

  4. Define a standard processor lifecycle-aware API that allows users' customized logic to be injected before and after starting/stopping the processors in both YARN and standalone environments.

Note that we need to define the following concepts clearly: 

  1. Application execution methods: ApplicationRunner defines a set of standard execution methods (i.e. run/status/kill/waitForFinish) that change the deployment status of a runtime instance of an application. Each ApplicationRunner implementation provides the deployment-environment-specific behavior of these methods.
  2. Processor lifecycle-aware methods: user-defined functions that inject customized logic to be executed before or after the framework starts or stops the stream processing logic defined in the user application (i.e. beforeStart/afterStart/beforeStop/afterStop are called when the StreamProcessors are started/stopped on the local host).

...

  1. Define a unified interface, ApplicationBase, as a single entry point for users to implement all user-customized logic for both the high-level and low-level APIs
    1. The user implements a single describe() method containing all user processing logic, before the runtime application instance is created

      1. Sub-classes StreamApplication and TaskApplication provide specific describe() methods for the high-level and low-level APIs, respectively

  2. Define a unified API class, ApplicationDescriptor, that contains:

    1. The high- and low-level processing logic defined via ApplicationBase.describe(). Sub-classes StreamAppDescriptor and TaskAppDescriptor are used for the high- and low-level APIs, respectively.

    2. A user-implemented ProcessorLifecycleListener class that includes customized logic to be invoked before and after starting/stopping the StreamProcessor(s) in the user application

      1. Methods are beforeStart/afterStart/beforeStop/afterStop

    3. Other user-defined objects in an application (e.g. configuration and context)

  3. Expand ApplicationRunner with a mandatory constructor that takes an ApplicationDescriptor object as its parameter

    1. An ApplicationRunner is now constructed with an ApplicationDescriptor, which contains all user-customized logic.

    2. The ApplicationRunner deploys and runs the user code. The specific ApplicationRunner class is instantiated from the configuration and is not exposed to the user at all.

    3. The set of standard execution methods of an ApplicationRunner includes run/kill/status/waitForFinish.
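The configuration-driven choice of runner, combined with the mandatory descriptor constructor, can be sketched with plain Java reflection. This is a minimal sketch under stated assumptions: AppDescriptor, the two runner classes and RunnerSelectionSketch are simplified, hypothetical stand-ins named after the runners in this proposal, and the config key app.runner.class is an assumption for illustration only. It shows that user code never names the runner class and that only runners exposing a constructor taking the descriptor are accepted.

```java
import java.lang.reflect.Constructor;
import java.util.Map;

// Hypothetical, simplified stand-in for AppDescriptorImpl: only carries config.
class AppDescriptor {
  final Map<String, String> config;

  AppDescriptor(Map<String, String> config) {
    this.config = config;
  }
}

interface ApplicationRunner {
  void run();
}

class LocalApplicationRunner implements ApplicationRunner {
  final AppDescriptor appDesc;

  // Mandatory constructor: every runner is built from the application descriptor.
  public LocalApplicationRunner(AppDescriptor appDesc) {
    this.appDesc = appDesc;
  }

  @Override
  public void run() {
    // a real local runner would launch StreamProcessors in this process
  }
}

class RemoteApplicationRunner implements ApplicationRunner {
  final AppDescriptor appDesc;

  public RemoteApplicationRunner(AppDescriptor appDesc) {
    this.appDesc = appDesc;
  }

  @Override
  public void run() {
    // a real remote runner would only submit the application to the cluster
  }
}

class RunnerSelectionSketch {
  // Assumed config key name for this sketch only.
  static final String RUNNER_CLASS_KEY = "app.runner.class";

  static ApplicationRunner getRunner(AppDescriptor appDesc) {
    String className = appDesc.config.get(RUNNER_CLASS_KEY);
    if (className == null) {
      throw new IllegalArgumentException("Missing config: " + RUNNER_CLASS_KEY);
    }
    try {
      Class<?> runnerClass = Class.forName(className);
      if (!ApplicationRunner.class.isAssignableFrom(runnerClass)) {
        throw new IllegalArgumentException(className + " does not implement ApplicationRunner");
      }
      // Only a public constructor taking the descriptor is accepted.
      Constructor<?> ctor = runnerClass.getConstructor(AppDescriptor.class);
      return (ApplicationRunner) ctor.newInstance(appDesc);
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot load ApplicationRunner class " + className, e);
    }
  }
}
```

Switching between local and remote execution then becomes a configuration change rather than a code change, which is the property the test plan asks to verify.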

A high-level overview of the proposed changes is illustrated below: 

Figure-1: high-level user programming model 


Figure-2: Interaction and lifecycle of runtime API objects (using StreamApplication in LocalApplicationRunner as an example). 

...

  1. Defined a unified ApplicationBase interface for both high- and low-level APIs in different deployment environments. All user code is now implemented in one of the sub-classes (i.e. StreamApplication or TaskApplication).
    1. All processing logic is implemented in the standard describe() method in either StreamApplication or TaskApplication.

    2. All user-customized logic to start/stop contextual objects in the application process is in the standard lifecycle listener methods (i.e. beforeStart/afterStart/beforeStop/afterStop) defined in ProcessorLifecycleListener.

  2. Construction of the ApplicationRunner object is implemented by Samza framework code, which hides:

    1. The choice of a specific ApplicationRunner for each environment, via configuration

    2. The association of a user application with a specific ApplicationRunner instance, passed to the runner's constructor via the ApplicationDescriptor

    3. The initialization of user processing logic when constructing the ApplicationDescriptor object, before the ApplicationRunner executes the application

    4. The invocation of user-defined lifecycle listener methods when running/killing the application via an ApplicationRunner in the local process (i.e. LocalApplicationRunner*)

      1. Note that RemoteApplicationRunner only submits the application without launching the StreamProcessors. Hence, lifecycle listener methods are not invoked in RemoteApplicationRunner.

      2. Note that this also depends on refactoring LocalContainerRunner so that:

        1. It implements run/kill/status w/ proper async implementation 

        2. It launches StreamProcessor instead of directly running SamzaContainer 
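Where a local runner invokes the user's listener can be sketched as follows. This is a minimal sketch, assuming simplified copies of ProcessorContext and ProcessorLifecycleListener; LocalRunnerSketch and RecordingListener are hypothetical and do not start real StreamProcessors. It also assumes beforeStop is called on both the clean-stop and the failure path, which the proposal does not spell out. A remote runner would have no equivalent of these calls, since it only submits the application.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the proposed ProcessorContext and ProcessorLifecycleListener.
interface ProcessorContext {
  String getProcessorId();
}

interface ProcessorLifecycleListener {
  default void beforeStart(ProcessorContext pc) {}
  default void afterStart(ProcessorContext pc) {}
  default void beforeStop(ProcessorContext pc) {}
  default void afterStop(ProcessorContext pc, Throwable t) {}
}

// Hypothetical local runner: only shows where the user's listener is invoked
// around starting and stopping each processor.
class LocalRunnerSketch {
  private final ProcessorLifecycleListener listener;

  LocalRunnerSketch(ProcessorLifecycleListener listener) {
    this.listener = listener;
  }

  void startProcessor(String processorId) {
    ProcessorContext pc = () -> processorId;
    listener.beforeStart(pc);
    // ... a real runner would start the StreamProcessor here ...
    listener.afterStart(pc);
  }

  // error == null means a clean stop (e.g. via kill()); non-null means a failure.
  void stopProcessor(String processorId, Throwable error) {
    ProcessorContext pc = () -> processorId;
    listener.beforeStop(pc);
    // ... a real runner would stop the StreamProcessor here ...
    listener.afterStop(pc, error);
  }
}

// Records the order of callbacks so it can be inspected.
class RecordingListener implements ProcessorLifecycleListener {
  final List<String> calls = new ArrayList<>();

  @Override
  public void beforeStart(ProcessorContext pc) {
    calls.add("beforeStart:" + pc.getProcessorId());
  }

  @Override
  public void afterStart(ProcessorContext pc) {
    calls.add("afterStart:" + pc.getProcessorId());
  }

  @Override
  public void beforeStop(ProcessorContext pc) {
    calls.add("beforeStop:" + pc.getProcessorId());
  }

  @Override
  public void afterStop(ProcessorContext pc, Throwable t) {
    calls.add("afterStop:" + pc.getProcessorId() + (t == null ? ":ok" : ":failed"));
  }
}
```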

...

The reason is: when deploying via RemoteApplicationRunner in YARN, we run the managed main() method implemented by Samza in the NodeManager, which does not have the ability to invoke a customized constructor for the user application. Hence, we expect every user application to implement a default constructor. This is the same behavior we expect today from any user implementing a high- or low-level application.
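The constraint can be illustrated with plain Java reflection. This is a sketch under the assumption that the framework-managed main() only knows the application's class name (for example from configuration); AppInstantiationSketch, MyStreamApp and AppWithArgs are hypothetical. A class with a public no-argument constructor can be instantiated generically, while one that only has a parameterized constructor cannot.

```java
// Hypothetical stand-in for the user-facing entry point.
interface ApplicationBase {
}

class MyStreamApp implements ApplicationBase {
  // Required: a public constructor without parameters.
  public MyStreamApp() {
  }
}

class AppWithArgs implements ApplicationBase {
  // Only user code knows how to supply this argument, so the framework cannot call it.
  AppWithArgs(String somethingOnlyUserCodeKnows) {
  }
}

class AppInstantiationSketch {
  // Mimics a framework-managed main(): it has only the class name, so it can only
  // call the no-argument constructor.
  static ApplicationBase instantiate(String appClassName) {
    try {
      Class<?> cls = Class.forName(appClassName);
      return (ApplicationBase) cls.getDeclaredConstructor().newInstance();
    } catch (NoSuchMethodException e) {
      throw new IllegalStateException(appClassName + " has no default constructor", e);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot instantiate " + appClassName, e);
    }
  }
}
```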

...

  1. The user application class implements a default constructor w/o any parameters 

  2. Creation of the ApplicationRunner in main() uses Samza-provided methods (i.e. ApplicationRunners.getApplicationRunner())

Simple code examples of high- and low-level API applications: 

Code Block
public class PageViewCounterExample implements StreamApplication {

  // local execution mode
  public static void main(String[] args) {
    CommandLine cmdLine = new CommandLine();
    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
    ApplicationRunner runner = ApplicationRunners.getApplicationRunner(ApplicationClassUtils.fromConfig(config), config);
    runner.run();
    runner.waitForFinish();
  }

  @Override
  public void describe(StreamAppDescriptor appDesc) {
    // TODO: replace "pageViewEventStream" with pveStreamDescriptor when SEP-14 is implemented
    MessageStream<PageViewEvent> pageViewEvents =
        appDesc.getInputStream("pageViewEventStream", new JsonSerdeV2<>(PageViewEvent.class));
    OutputStream<KV<String, PageViewCount>> pageViewEventPerMemberStream =
        appDesc.getOutputStream("pageViewEventPerMemberStream",
            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewCount.class)));

    SupplierFunction<Integer> initialValue = () -> 0;
    FoldLeftFunction<PageViewEvent, Integer> foldLeftFn = (m, c) -> c + 1;
    pageViewEvents
        .window(Windows.keyedTumblingWindow(m -> m.memberId, Duration.ofSeconds(10), initialValue, foldLeftFn,
            null, null)
            .setEarlyTrigger(Triggers.repeat(Triggers.count(5)))
            .setAccumulationMode(AccumulationMode.DISCARDING), "tumblingWindow")
        .map(windowPane -> KV.of(windowPane.getKey().getKey(), new PageViewCount(windowPane)))
        .sendTo(pageViewEventPerMemberStream);
  }
}

...

Code Block
public class TaskApplicationExample implements TaskApplication {

  public static void main(String[] args) {
    CommandLine cmdLine = new CommandLine();
    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
    ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new TaskApplicationExample(), config);
    runner.run();
    runner.waitForFinish();
  }

  @Override
  public void describe(TaskAppDescriptor appDesc) {
    // add input and output streams
    // TODO: replace "myinput" with inputStreamDescriptor and "myoutput" with outputStreamDescriptor when SEP-14 is implemented
    appDesc.addInputStreams(Collections.singletonList("myinput"));
    appDesc.addOutputStreams(Collections.singletonList("myoutput"));
    TableDescriptor td = new RocksDbTableDescriptor("mytable");
    appDesc.addTables(Collections.singletonList(td));
    // create the task factory based on configuration
    appDesc.setTaskFactory(TaskFactoryUtil.createTaskFactory(appDesc.getConfig()));
  }

}

...

Code Block
public interface ApplicationBase<T extends ApplicationDescriptor> {
  void describe(T appDesc);
} 

StreamApplication: extends ApplicationBase with a typed describe() method for high-level user applications

Code Block
public interface StreamApplication extends ApplicationBase<StreamAppDescriptor> {
}

TaskApplication: extends ApplicationBase with a typed describe() method to initialize the low-level user application

Code Block
public interface TaskApplication extends ApplicationBase<TaskAppDescriptor> {
}
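To make the typed entry point concrete, here is a minimal sketch assuming heavily simplified stand-ins for ApplicationBase, StreamAppDescriptor and TaskAppDescriptor. The members addInputStream, addInputStreams and setTaskFactory (taking plain strings) and the DescribeSketch helper are hypothetical; they only record what the user declares. The point shown is that the user implements one describe() method, and the generic parameter gives each API its own typed descriptor.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; the real descriptors also carry config, serdes,
// a ContextManager and a ProcessorLifecycleListener.
interface ApplicationDescriptor {
}

// Single entry point: all user processing logic goes into describe().
interface ApplicationBase<T extends ApplicationDescriptor> {
  void describe(T appDesc);
}

// High-level descriptor stand-in: only records declared input streams.
class StreamAppDescriptor implements ApplicationDescriptor {
  final List<String> inputStreams = new ArrayList<>();

  // Hypothetical helper, not the real StreamAppDescriptor API.
  void addInputStream(String streamId) {
    inputStreams.add(streamId);
  }
}

// Low-level descriptor stand-in: records input streams and a task factory name.
class TaskAppDescriptor implements ApplicationDescriptor {
  final List<String> inputStreams = new ArrayList<>();
  String taskFactoryName;

  void addInputStreams(List<String> streams) {
    inputStreams.addAll(streams);
  }

  void setTaskFactory(String factoryName) {
    taskFactoryName = factoryName;
  }
}

// The generic parameter is what gives each API its own typed describe() method.
interface StreamApplication extends ApplicationBase<StreamAppDescriptor> {
}

interface TaskApplication extends ApplicationBase<TaskAppDescriptor> {
}

class DescribeSketch {
  // The framework, not the user, creates the descriptor and calls describe() on it.
  static StreamAppDescriptor describeHighLevel(StreamApplication app) {
    StreamAppDescriptor appDesc = new StreamAppDescriptor();
    app.describe(appDesc);
    return appDesc;
  }

  static TaskAppDescriptor describeLowLevel(TaskApplication app) {
    TaskAppDescriptor appDesc = new TaskAppDescriptor();
    app.describe(appDesc);
    return appDesc;
  }
}
```

Because each interface in this sketch inherits exactly one abstract method, a test can pass a lambda; a deployable application would still be a class with a default constructor.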

ProcessorLifecycleListener: defines the unified processor lifecycle-aware methods that allow users to inject customized logic before/after starting/stopping the StreamProcessor(s) in an application

...

Code Block
public interface ProcessorLifecycleListener {
  /**
   * User defined initialization before a StreamProcessor is started
   *
   * @param pc the {@link ProcessorContext} in which this callback method is invoked.
   */
  default void beforeStart(ProcessorContext pc) {}

  /**
   * User defined callback after a StreamProcessor is started
   *
   * @param pc the {@link ProcessorContext} in which this callback method is invoked.
   */
  default void afterStart(ProcessorContext pc) {}

  /**
   * User defined callback before a StreamProcessor is stopped
   *
   * @param pc the {@link ProcessorContext} in which this callback method is invoked.
   */
  default void beforeStop(ProcessorContext pc) {}

  /**
   * User defined callback after a StreamProcessor is stopped
   *
   * @param pc the {@link ProcessorContext} in which this callback method is invoked.
   * @param t the error causing the stop of the StreamProcessor. null value of this parameter indicates a successful completion.
   */
  default void afterStop(ProcessorContext pc, Throwable t) {}
}


/**
 * The context for a StreamProcessor. This is a stub interface that only includes the method to identify the current StreamProcessor.
 *
 */
public interface ProcessorContext extends Serializable {
  String getProcessorId();
}
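As an illustration of the user side of this API, here is a sketch of a listener that manages a per-processor resource. It assumes simplified copies of the two interfaces above; ExternalClient and ClientLifecycleListener are hypothetical. The listener only overrides the callbacks it needs and relies on the default no-op methods for the rest.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-ins for the proposed interfaces.
interface ProcessorContext {
  String getProcessorId();
}

interface ProcessorLifecycleListener {
  default void beforeStart(ProcessorContext pc) {}
  default void afterStart(ProcessorContext pc) {}
  default void beforeStop(ProcessorContext pc) {}
  default void afterStop(ProcessorContext pc, Throwable t) {}
}

// Hypothetical resource shared by user code for a StreamProcessor's lifetime.
class ExternalClient implements AutoCloseable {
  volatile boolean open = true;

  @Override
  public void close() {
    open = false;
  }
}

class ClientLifecycleListener implements ProcessorLifecycleListener {
  // Concurrent maps, in case several processors call back from different threads.
  final Map<String, ExternalClient> clients = new ConcurrentHashMap<>();
  final Map<String, Throwable> failures = new ConcurrentHashMap<>();

  @Override
  public void beforeStart(ProcessorContext pc) {
    // Create the client before the processor starts consuming messages.
    clients.put(pc.getProcessorId(), new ExternalClient());
  }

  @Override
  public void afterStop(ProcessorContext pc, Throwable t) {
    ExternalClient client = clients.remove(pc.getProcessorId());
    if (client != null) {
      client.close();
    }
    // A non-null Throwable means the processor stopped because of an error.
    if (t != null) {
      failures.put(pc.getProcessorId(), t);
    }
  }
}
```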


B. Samza framework implemented runtime objects

The Samza framework implements two sets of runtime classes that are directly exposed to the user: the ApplicationDescriptor classes, which include all user-defined logic and configuration for an application, and the ApplicationRunner classes, which deploy and run the application.

ApplicationDescriptor classes  

ApplicationDescriptor: this is the base interface for both high- and low-level applications.


Code Block
public interface ApplicationDescriptor<T extends ApplicationBase> {
  /**
   * Get the global unique application ID in the runtime process
   * @return globally unique application ID
   */
  String getGlobalAppId();

  /**
   * Get the user defined {@link Config}
   * @return config object
   */
  Config getConfig();

  /**
   * Sets the {@link ContextManager} for this application.
   * <p>
   * The provided {@link ContextManager} can be used to setup shared context between the operator functions
   * within a task instance
   *
   * @param contextManager the {@link ContextManager} to use for the application
   * @return the {@link ApplicationDescriptor} with {@code contextManager} set as its {@link ContextManager}
   */
  ApplicationDescriptor<T> withContextManager(ContextManager contextManager);

  /**
   * Sets the lifecycle listener for user customized logic before and after starting/stopping
   * StreamProcessors in the application
   *
   * @param listener the user-implemented {@link ProcessorLifecycleListener}
   * @return the {@link ApplicationDescriptor} with {@code listener} set
   */
  ApplicationDescriptor<T> withProcessorLifecycleListener(ProcessorLifecycleListener listener);
}

StreamAppDescriptor: this extends ApplicationDescriptor for a high-level application, including all methods to describe a high-level application in StreamGraph.

Code Block
public interface StreamAppDescriptor extends ApplicationDescriptor<StreamApplication>, StreamGraph { 
} 


TaskAppDescriptor: this extends ApplicationDescriptor for a low-level application, including the user-defined TaskFactory and the corresponding lists of input and output streams and tables.

Code Block
public interface TaskAppDescriptor extends ApplicationDescriptor<TaskApplication> {

  void setTaskFactory(TaskFactory factory);

  // TODO: the following two interface methods depend on SEP-14
  void addInputStreams(List<InputStreamDescriptor> inputStreams);
  void addOutputStreams(List<OutputStreamDescriptor> outputStreams);

  void addTables(List<TableDescriptor> tables);

}


ApplicationRunner classes

ApplicationRunner 

This is an interface that defines the standard execution methods to deploy an application. It is used by users and is not intended to be implemented by users.

Code Block
public interface ApplicationRunner {
  /**
   * Start a runtime instance of the application
   */
  void run();

  /**
   * Stop a runtime instance of the application
   */
  void kill();

  /**
   * Get the {@link ApplicationStatus} of a runtime instance of the application
   * @return the runtime status of the application
   */
  ApplicationStatus status();

  /**
   * Wait for the runtime instance of the application to complete.
   * This method will block until the application completes.
   */
  void waitForFinish();

  /**
   * Wait for the runtime instance of the application to complete, with a {@code timeout}
   *
   * @param timeout the time to block to wait for the application to complete
   * @return true if the application completes within timeout; false otherwise
   */
  boolean waitForFinish(Duration timeout);

  /**
   * Method to add a set of customized {@link MetricsReporter}s in the application runtime instance
   *
   * @param metricsReporters the map of customized {@link MetricsReporter}s objects to be used
   */
  void addMetricsReporters(Map<String, MetricsReporter> metricsReporters);

}
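How the execution methods combine in user code can be sketched with a toy runner. This is a minimal sketch under stated assumptions: Status is a hypothetical enum (not Samza's ApplicationStatus), ThreadApplicationRunner simply runs a Runnable on a background thread, and RunnerUsageSketch.runWithDeadline is a hypothetical helper showing a common pattern: run(), wait with waitForFinish(Duration), and kill() if the deadline passes.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical status values for this sketch only.
enum Status { NEW, RUNNING, SUCCEEDED, FAILED, KILLED }

// The execution methods from the proposal, minus addMetricsReporters.
interface ApplicationRunner {
  void run();
  void kill();
  Status status();
  void waitForFinish();
  boolean waitForFinish(Duration timeout);
}

// Toy runner that executes the "application" on a background thread.
class ThreadApplicationRunner implements ApplicationRunner {
  private final Runnable app;
  private final CountDownLatch finished = new CountDownLatch(1);
  private Status status = Status.NEW;
  private volatile Thread worker;

  ThreadApplicationRunner(Runnable app) {
    this.app = app;
  }

  // Moves to `to` only from `from`, so a kill is not overwritten by a late finish.
  private synchronized void transition(Status from, Status to) {
    if (status == from) {
      status = to;
    }
  }

  @Override
  public void run() {
    transition(Status.NEW, Status.RUNNING);
    Thread t = new Thread(() -> {
      try {
        app.run();
        transition(Status.RUNNING, Status.SUCCEEDED);
      } catch (RuntimeException e) {
        transition(Status.RUNNING, Status.FAILED);
      } finally {
        finished.countDown();
      }
    });
    t.setDaemon(true);
    worker = t;
    t.start();
  }

  @Override
  public void kill() {
    transition(Status.RUNNING, Status.KILLED);
    Thread t = worker;
    if (t != null) {
      t.interrupt();  // the toy app is expected to stop when interrupted
    }
  }

  @Override
  public synchronized Status status() {
    return status;
  }

  @Override
  public void waitForFinish() {
    try {
      finished.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  @Override
  public boolean waitForFinish(Duration timeout) {
    try {
      return finished.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}

class RunnerUsageSketch {
  // Run, wait with a deadline, and kill the application if the deadline passes.
  static Status runWithDeadline(ApplicationRunner runner, Duration timeout) {
    runner.run();
    if (!runner.waitForFinish(timeout)) {
      runner.kill();
      runner.waitForFinish();
    }
    return runner.status();
  }
}
```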


ApplicationRunners 

A factory class provided by the Samza framework to instantiate an ApplicationRunner for a user application.

Code Block
public class ApplicationRunners {

  private ApplicationRunners() {

  }

  public static final ApplicationRunner getApplicationRunner(ApplicationBase userApp, Config config) {
    if (userApp instanceof StreamApplication) {
      return getRunner(new StreamAppDescriptorImpl((StreamApplication) userApp, config));
    }
    if (userApp instanceof TaskApplication) {
      return getRunner(new TaskAppDescriptorImpl((TaskApplication) userApp, config));
    }
    throw new IllegalArgumentException(String.format("User application instance has to be either StreamApplication or TaskApplication. "
        + "Invalid userApp class %s.", userApp.getClass().getName()));
  }

...
  /**
   * Static method to get the {@link ApplicationRunner}
   *
   * @param appDesc  {@link AppDescriptorImpl} object that contains all user-customized application logic and configuration
   * @return the configuration-driven {@link ApplicationRunner} to run the user-defined stream applications
   */
  public static ApplicationRunner getRunner(AppDescriptorImpl appDesc) {
    AppRunnerConfig appRunnerCfg = new AppRunnerConfig(appDesc.getConfig());
    try {
      Class<?> runnerClass = Class.forName(appRunnerCfg.getAppRunnerClass());
      if (ApplicationRunner.class.isAssignableFrom(runnerClass)) {
        // mandate AppDescriptorImpl as the parameter to the constructor
        Constructor<?> constructor = runnerClass.getConstructor(AppDescriptorImpl.class);
        return (ApplicationRunner) constructor.newInstance(appDesc);
      }
    } catch (Exception e) {
      throw new ConfigException(String.format("Problem in loading ApplicationRunner class %s",
          appRunnerCfg.getAppRunnerClass()), e);
    }
    throw new ConfigException(String.format("Class %s does not extend ApplicationRunner properly",
        appRunnerCfg.getAppRunnerClass()));
  }
}


Implementation and Test Plan

...

  1. ApplicationBase/StreamApplication/TaskApplication interfaces and user code examples for high- and low-level APIs. The StreamApplication and TaskApplication interfaces don’t have default implementations, so the main effort is to write user code examples implementing them. We need to port all existing high-level user code examples in the samza-test module and also add low-level user code examples.

  2. Implementation of the runtime public API classes: AppDescriptorImpl/StreamAppDescriptorImpl/TaskAppDescriptorImpl/ApplicationRunners. These classes are implemented by the Samza framework and used directly by users. Hence, they need both implementations and user code examples.

  3. Internal implementation of ApplicationRunners: the ApplicationRunner implementations need to be refactored to run different ApplicationDescriptors, based on whether the ApplicationDescriptor is a StreamAppDescriptor or a TaskAppDescriptor. All ApplicationRunner classes need to be refactored to support TaskAppDescriptor.

  4. Local application runners need to support invocation of the ProcessorLifecycleListener methods before and after starting/stopping the StreamProcessor(s)

    1. This requires refactoring LocalContainerRunner to properly launch a StreamProcessor instead of directly running SamzaContainer

...

  1. Changes in all ApplicationRunners need to be covered by unit tests, including tests for TaskAppDescriptor.

  2. Applications written in high-level API need to be included in AbstractIntegrationTestHarness for testing. 

  3. Applications written in low-level API also need to be included in AbstractIntegrationTestHarness for testing. 

  4. Applications using different runners via config change also need to be tested. 

...

  1. The StreamApplication.init() is replaced by StreamApplication.describe(). 

  2. The StreamAppDescriptor class replaces StreamGraph to describe the high-level API application

  3. The public ApplicationRunners class replaces user instantiation of a specific ApplicationRunner implementation

  4. Changed the mandatory parameter to construct an ApplicationRunner from Config to AppDescriptorImpl

Addition-only changes: 

  1. Added TaskApplication interface 

  2. Added TaskAppDescriptor interface

  3. Added ProcessorLifecycleListener interface 

...

The main incompatible change is with high-level API applications:


Rejected Alternatives

...