You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 3 Next »

Status

Current state: [ UNDER DISCUSSION | ACCEPTED | REJECTED ]

Discussion thread<link to mailing list DISCUSS thread>

JIRA: SAMZA-TBD

Released: 

Problem

In the current implementation of ApplicationRunner, there are a few issues: 

  1. Instantiation of specific implementation of ApplicationRunner is exposed to the user, which requires user to choose a specific implementation of ApplicationRunner in source code, depending on the deployment environment (I.e. YARN vs standalone). 
  2. ApplicationRunner only supports high-level API and does not fully support low-level API: 

    1. In standalone environment, user's program written in StreamTask/AsyncStreamTask classes is only supported in LocalApplicationRunner w/ a run() method 

    2. In YARN, RemoteApplicationRunner only support high-level API applications and falls back to JobRunner for low-level API applications. 

  3. There is no unified API to allow user to specify high-level API and low-level API in initialization either. 

  4. There is no defined standard lifecycle of a user application process in both YARN and standalone deployment. Hence, no consistent pattern to insert user code into the application’s full lifecycle 

    1. There is no standard method to insert user-defined application initialization sequence 

      1. In YARN, all application processeare initialized (i.e. configure re-writer, stream processor initialization, etc.) by the build-in main functions in Samza framework (i.e. ApplicationRunnerMain on launch host and LocalContainerRunner on NodeManagers). 

      2. In standalone, user can put arbitrary code in user main function to initialize the application process. 

    2. There is no defined method to allow user to inject customized logic before start/stop the user defined processors (I.e. StreamProcessors defined by user applicationeither, in addition to initialization 

Motivation

Our goal is to allow users to write high-level or low-level API applications once and deploy in both YARN and standalone environments without code change. The following requirements are necessary to achieve our goal:

  1. Hide the choice of specific implementation of ApplicationRunner via configuration, not in source code. 
  2. Define a unified API to allow user to specify processing logic in high- and low-level API in all environment (I.e. all ApplicationRunners) w/ all lifecycle methods 

  3. Define a set of standard application execution methods to deploy both low- and high-level APIs application in YARN and standalone environments. 

  4. Define a standard processor life-cycle aware API to allow user’s customized logic to be injected before and after start/stop the processors in both YARN and standalone environments. 

Note that we need to define the following concepts clearly: 

  1. Application’s execution methods in runtime are the functions to change the deployment status of an application runtime instance (I.e. run/status/kill/waitForFinish) 
    1. ApplicationRunner’s execution methods are the deployment environment specific implementation to the above API methods 
  2. Application’s processor lifecycle aware methods are the user-defined functions to inject customized logic to be executed before or after we start or stop the stream processing logic defined by the user application (I.e. beforeStart/afterStart/beforeStop/afterStop are called when we start/stop the StreamProcessors in local host) 

Proposed Changes

The proposed changes are the followings: 

  1. Define a unified API ApplicationBase as a single entry point for users to implement all user-customized logic, for both high-level API and low-level API 
    1. User implements a single describe() method to implement all user processing logic before creating the runtime application instance 

      1. Sub-classes StreamApplication and TaskApplication provide specific describe(methods for high-level API and low-level API, respectively 

  2. Define a unified API class ApplicationSpec to contain 

    1. High- and low-level processing logic defined via ApplicationBase.describe() 

    2. User implemented ProcessorLifecycleListener class that includes customized logic to be invoked before and after starting/stopping the StreamProcessor(s) in the user application 

      1. Methods are beforeStart/afterStart/beforeStop/afterStop 

  3. Define the class ApplicationRuntime to represent a runtime instance of a user application, with a set of standard execution methods to change the deployment status of the runtime application instance in different environments (I.e. YARN and standalone)

    1. A runtime instance of user application is defined as a bundle of both 

      1. ApplicationSpec that contains all user customized logic. 

      2. ApplicationRunner that deploys and runs the user code. This would be instantiated from the configuration, not exposed to user at all. 

    2. The set of standard execution methods for an ApplicationRuntime object includes run/kill/status/waitForFinish 

A high-level overview of the proposed changes is illustrated below: 

Figure-1: high-level user programming model 


Figure-2: Interaction and lifecycle of runtime API objects (using StreamApplication in LocalApplicationRunner as an example). 


The above design achieves the following goals: 

  1. Defined a unified LifecycleAwareApplication interface for both high- and low-level APIs in different deployment environments. All user code is now included in one of the sub-classes (I.e. StreamApplicaiton or TaskApplication).  

    1. All processing logic is implemented in the standard describe() method in either StreamApplication or TaskApplication. 

    2. All user customized logic to create/start/stop contextual objects in their application are in standard lifecycle aware methods (I.e. beforeStart/afterStart/beforeStop/afterStop) 

  2. Construction of ApplicationRuntime object is implemented by Samza framework code, which hides: 

    1. Choice of a specific ApplicationRunner for different environment via configuration 

    2. Association of a user application to the specific instance of ApplicationRunner 

    3. Initialization of user processing logic before ApplicationRunner executes the application 

    4. Invoking user-defined lifecycle aware methods when run/kill the application via ApplicationRunner 

Note that the application main() method in the cross-functional swimlane chart is marked with a different color, since there could be options for the user to use either a user-defined main() or a Samza build-in main() functions. We consider the above two options in three different runtime environments: 

  1. Standalone environment: user will use LocalApplicationRunner to launch the application in the same JVM process 

  2. YARN application launch host: user will use RemoteApplicationRunner to submit the application to a remote cluster 

  3. YARN NodeManagerSamza will run a build-in runner to launch the container in the same JVM process 

For Samza system build-in main method (as in ApplicationRunnerMain#main()), we require the user application class to have a default constructor w/o any parameters: 

Class<LifecycleAwareApplication> appClass = (Class<LifecycleAwareApplication>) Class.forName(appConfig.getAppClass());
if (StreamApplication.class.isAssignableFrom(appClass) || TaskApplication.class.isAssignableFrom(appClass)) {
  return appClass.newInstance();
}

The reason is: when deploying via RemoteApplicationRunner, in the NodeManager, we will run the managed main() method implemented by Samza, which don’t have the ability to invoke customized constructor for user application. Hence, we expect a default constructor implemented by any user application. This is the same behavior as we expected today from any user implementing a high- or low-level application.  

For user-defined main() applications, we can run it in both standalone and YARN, as long as: 

  1. The user application class implements a default constructor w/o any parameters 

  2. Creation of ApplicationRuntime in main is using Samza provided methods (I.e. ApplicationRuntimes.getApplicationRuntime()) 

Simple code examples of high- and low-level API applications: 

public class PageViewCounterExample implements StreamApplication {

  // local execution mode
  public static void main(String[] args) {
    CommandLine cmdLine = new CommandLine();
    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
    ApplicationRuntime appRuntime = ApplicationRuntimes.getApplicationRuntime(ApplicationClassUtils.fromConfig(config), config);
    appRuntime.start();
    appRuntime.waitForFinish();
  }

  @Override
  public void describe(StreamApplicationSpec graph) {
      MessageStream<PageViewEvent> pageViewEvents = null;
      pageViewEvents = graph.getInputStream("pageViewEventStream", new JsonSerdeV2<>(PageViewEvent.class));
      OutputStream<KV<String, PageViewCount>> pageViewEventPerMemberStream =
          graph.getOutputStream("pageViewEventPerMemberStream",
              KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewCount.class)));

      SupplierFunction<Integer> initialValue = () -> 0;
      FoldLeftFunction<PageViewEvent, Integer> foldLeftFn = (m, c) -> c + 1;
      pageViewEvents
          .window(Windows.keyedTumblingWindow(m -> m.memberId, Duration.ofSeconds(10), initialValue, foldLeftFn,
              null, null)
              .setEarlyTrigger(Triggers.repeat(Triggers.count(5)))
              .setAccumulationMode(AccumulationMode.DISCARDING), "tumblingWindow")
          .map(windowPane -> KV.of(windowPane.getKey().getKey(), new PageViewCount(windowPane)))
          .sendTo(pageViewEventPerMemberStream);
  }
}
public class TaskApplicationExample implements TaskApplication {

  public static void main(String[] args) {
    CommandLine cmdLine = new CommandLine();
    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
    ApplicationRuntime appRuntime = ApplicationRuntimes.getApplicationRuntime(new TaskApplicationExample(), config);
    appRuntime.start();
    appRuntime.waitForFinish();
  }

  @Override
  public void describe(TaskApplicationSpec appBuilder) {
    // add input and output streams
    appBuilder.addInputStreams(Collections.singletonList("myinput"));
    appBuilder.addOutputStreams(Collections.singletonList("myoutput"));
    TableDescriptor td = new RocksDbTableDescriptor("mytable");
    appBuilder.addTables(Collections.singletonList(td));
    // create the task factory based on configuration
    appBuilder.setTaskFactory(TaskFactoryUtil.createTaskFactory(appBuilder.getConfig()));
  }

}


Public Interfaces

There are two types of public API classes that are exposed to the user: a) user-implemented interface classes that allows users to inject customized code; b) Samza framework implemented runtime objects that allows users to start/stop a runtime application. 

A) user-implemented interface classes include the followings: 

LifecycleAwareApplication: defines the basic life-cycle aware methods to allow users to inject customized logic before and after the lifecycle methods of an application 



public interface LifecycleAwareApplication<T extends ApplicationSpec> { 
  void describe(T appSpec); 
  default void beforeStart() {} 
  default void afterStart() {} 
  default void beforeStop() {} 
  default void afterStop() {} 
} 


StreamApplicationextends LifecycleAwareApplication with a typed describe() method for high-level user application 




public interface StreamApplication extends LifecycleAwareApplication<StreamApplicationSpec> { 
}



TaskApplicationextends LifecycleAwareApplication with a typed describe() method to initialize the low-level user application 

public interface TaskApplication extends LifecycleAwareApplication<TaskApplicationSpec> { 
}

BSamza framework implemented runtime objects 

Samza framework generates two sets of runtime classes that are directly exposed to the user. One set of classes are ApplicationSpec that includes all user-defined logic and configuration for an application; the other set of classes are runnable application instances that are a bundle of the ApplicationSpec and the corresponding ApplicationRunner. 

ApplicationSpec classes  

ApplicationSpecthis is a base interface for both high- and low-level applications. 

 

public interface ApplicationSpec<T extends LifecycleAwareApplication> { 
  /** 
   * Get the global unique application ID in the runtime process 
   * @return globally unique application ID 
   */ 
  String getGlobalAppId(); 
 
  /** 
   * Get the user defined {@link Config} 
   * @return config object 
   */ 
  Config getConfig(); 
 
  /** 
   * Sets the {@link ContextManager} for this {@link StreamApplicationSpec}. 
   * <p> 
   * The provided {@link ContextManager} can be used to setup shared context between the operator functions 
   * within a task instance 
   * 
   * @param contextManager the {@link ContextManager} to use for the {@link StreamApplicationSpec} 
   * @return the {@link StreamApplicationSpec} with {@code contextManager} set as its {@link ContextManager} 
   */ 
  ApplicationSpec<T> withContextManager(ContextManager contextManager); 
 
} 

StreamApplicationSpecthis extends ApplicationSpec for a high-level application, including all methods to describe a high-level application in StreamGraph. 



public interface StreamApplicationSpec extends ApplicationSpec<StreamApplication>, StreamGraph { 
} 




TaskApplicationSpecthis extends ApplicationSpec for a low-level application, including the user-defined TaskFactory and the corresponding list of input and output streams and tables. 




public interface TaskApplicationSpec extends ApplicationSpec<TaskApplication> { 
 
  void setTaskFactory(TaskFactory factory); 
 
  void addInputStreams(List<String> inputStreams); 
 
  void addOutputStreams(List<String> outputStreams); 
 
  void addTables(List<TableDescriptor> tables); 
 
} 


ApplicationRuntime classes 


ApplicationRuntime 

This is an interface class that defines the life-cycle methods available in an runtime instance of application. It is used by users and not intend to be implemented by users. 



public interface ApplicationRuntime { 
  /** 
   * Start an application 
   */ 
  void start(); 
 
  /** 
   * Stop an application 
   */ 
  void stop(); 
 
  /** 
   * Get the {@link ApplicationStatus} of an application 
   * @return the runtime status of the application 
   */ 
  ApplicationStatus status(); 
 
  /** 
   * Wait the application to complete. 
   * This method will block until the application completes. 
   */ 
  void waitForFinish(); 
 
  /** 
   * Wait the application to complete with a {@code timeout} 
   *  
  * @param timeout the time to block to wait for the application to complete 
   * @return true if the application completes within timeout; false otherwise 
   */ 
  boolean waitForFinish(Duration timeout); 
 
  /** 
   * Method to add a set of customized {@link MetricsReporter}s in the application 
   * 
   * @param metricsReporters the map of customized {@link MetricsReporter}s objects to be used 
   */ 
  void addMetricsReporters(Map<String, MetricsReporter> metricsReporters); 
 
} 


ApplicationRuntimes 

Samza framework provided factory class to allow instantiation of ApplicationRuntime for user applications. 



public class ApplicationRuntimes { 
 
  private ApplicationRuntimes() { 
 
  } 
 
  public static final ApplicationRuntime getApplicationRuntime(LifecycleAwareApplication userApp, Config config) { 
    if (userApp instanceof StreamApplication) { 
      return new AppRuntimeImpl(new StreamAppSpecImpl((StreamApplication) userApp, config)); 
    } 
    if (userApp instanceof TaskApplication) { 
      return new AppRuntimeImpl(new TaskAppSpecImpl((TaskApplication) userApp, config)); 
    } 
    throw new IllegalArgumentException(String.format("User application instance has to be either StreamApplicationFactory or TaskApplicationFactory. " 
        + "Invalid userApp class %s.", userApp.getClass().getName())); 
  } 
} 



Implementation and Test Plan

The implementation of the above API changes involves the following sections: 

  1. UserApplication/StreamApplication/TaskApplication interfaces and user code examples for high- and low-level APIs. Interface classes of StreamApplication and TaskApplication don’t have default implementation. The main effort is to write user code examples implementing those interfaces. We need to port all existing high-level user code examples in samza-test module and also add low-level user code examples. 

  2. Implementation of runtime public API classes: AppSpecImpl/StreamAppSpecImplTaskAppSpecImplApplicationRuntimes/ ApplicationRuntime. Those classes are implemented by Samza framework and directly used by users. Hence, it needs both implementation and user code examples. 

  3. Internal implementation of ApplicationRunners: implementation of ApplicationRunners need to be refactored to support running different ApplicationSpec, based on whether the ApplicationSpec is StreamApplicationSpec or TaskApplicationSpec. All ApplicationRunner classes need to be refactored to support TaskApplicationSpec. 

Test plans: 

  1. Changes in all ApplicationRunners need to be included in unit tests. Adding tests for TaskApplicationSpec as well. 

  2. Applications written in high-level API need to be included in AbstractIntegrationTestHarness for testing. 

  3. Applications written in low-level API also need to be included in AbstractIntegrationTestHarness for testing. 

  4. Applications using different runners via config change also need to be tested. 

Compatibility, Deprecation, and Migration Plan

The proposed changes the existing API classes: 

Incompatible changes: 

  1. The StreamApplication.init() is replaced by StreamApplication.describe(). 

  2. StreamApplicationSpec class replaces StreamGraph to describe the high-level API application 

  3. Use ApplicationRuntimes/ApplicationRuntime public classes to replace the usage of ApplicationRunner classes 

Addition-only changes: 

  1. Added TaskApplication interface 

  2. Added TaskApplicationSpec interface 

  3. Added LifecycleAwareApplication.beforeStart/afterStart/beforeStop/afterStop interfaces 

There is no configuration change that is backward incompatible w/ current API. 

The main in-compatible change is with high-level API applications: 

Rejected Alternatives

The rejected alternatives is to always run user’s main() function for high- and low-level APIs in YARN and standalone. The reasons to reject this option are the following: 

  1. In legacy low-level APIs, user doesn’t have main() function implemented. 

  2. In applications launched via standard lifecycle management framework like Spring, users don’t write main() function either. 

  3. In YARN environment, we want to manage the main() function to be launched in the NodeManager (to avoid launching arbitrary user code in NodeManager). 

  


 

  • No labels