...

A high-level overview of the proposed changes is illustrated below: 

Figure-1: high-level user programming model 


Figure-2: Interaction and lifecycle of runtime API objects (using StreamApplication in LocalApplicationRunner as an example). 

...

The above design achieves the following goals: 

  1. Defined a unified ApplicationBase interface for both high- and low-level APIs in different deployment environments.

  2. All user code is now implemented in one of the sub-classes (i.e. StreamApplication or TaskApplication).  
    1. All processing logic is implemented in the standard describe() method in either StreamApplication or TaskApplication. 

    2. All user-customized logic that starts/stops contextual objects in the application process lives in the standard lifecycle listener methods (i.e. beforeStart/afterStart/beforeStop/afterStop). 

  3. Construction of the ApplicationRuntime object is implemented by Samza framework code, which hides: 

    1. The choice of a specific ApplicationRunner for each environment via configuration 

    2. The association of a user application with a specific ApplicationRunner instance 

    3. The initialization of user processing logic before the ApplicationRunner executes the application 

    4. The invocation of user-defined lifecycle listener methods when running/killing the application via an ApplicationRunner in a local process (i.e. LocalApplicationRunner*) 

      1. Note that RemoteApplicationRunner only submits the application without launching the StreamProcessors. Hence, lifecycle listener methods are not invoked in RemoteApplicationRunner. 

      2. Note that this also depends on a pending refactoring item: LocalContainerRunner needs to be refactored so that 

        1. It implements run/kill/status with a proper async implementation 

        2. It launches StreamProcessor instead of directly running SamzaContainer 
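
The framework-side construction in goal 3 can be sketched as follows. This is a minimal sketch under stated assumptions, not Samza's implementation:

- The interfaces are simplified stand-ins.
- `RuntimeFactorySketch` and the `app.runner` config key with its `local`/`remote` values are hypothetical.
- `Config` is modeled as a `Map<String, String>`.

It shows the hidden steps: initializing user logic via describe(), choosing a runner from configuration, and binding the application to that runner instance.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Simplified stand-ins for the types in this design (not the real Samza interfaces).
interface ApplicationSpec {
  Map<String, String> getConfig(); // stand-in for Samza's Config
}

interface ApplicationBase {
  void describe(ApplicationSpec appSpec);
}

interface ApplicationRunner {
  void run();
}

class LocalApplicationRunner implements ApplicationRunner {
  private final ApplicationSpec spec; // the application this runner instance is bound to
  LocalApplicationRunner(ApplicationSpec spec) { this.spec = spec; }
  public void run() { /* launch StreamProcessor(s) in this process */ }
}

class RemoteApplicationRunner implements ApplicationRunner {
  private final ApplicationSpec spec; // the application this runner instance is bound to
  RemoteApplicationRunner(ApplicationSpec spec) { this.spec = spec; }
  public void run() { /* submit the job to the cluster */ }
}

// Hypothetical framework-side factory: user code never picks a runner class directly.
final class RuntimeFactorySketch {
  static final String RUNNER_KEY = "app.runner"; // hypothetical config key

  private static final Map<String, Function<ApplicationSpec, ApplicationRunner>> RUNNERS = new HashMap<>();
  static {
    RUNNERS.put("local", LocalApplicationRunner::new);
    RUNNERS.put("remote", RemoteApplicationRunner::new);
  }

  static ApplicationRunner create(ApplicationBase app, Map<String, String> config) {
    ApplicationSpec spec = () -> config;
    // Initialize the user's processing logic before any runner executes it.
    app.describe(spec);
    // Choose the runner from configuration, defaulting to the local runner.
    String name = config.getOrDefault(RUNNER_KEY, "local");
    Function<ApplicationSpec, ApplicationRunner> factory = RUNNERS.get(name);
    if (factory == null) {
      throw new IllegalArgumentException("Unknown runner: " + name);
    }
    // Bind the described application to this specific runner instance.
    return factory.apply(spec);
  }
}
```

In this shape, switching between local and remote execution is a configuration change rather than a code change.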

Note that the application main() method in the cross-functional swimlane chart is marked with a different color, since the user can choose either a user-defined main() or the Samza built-in main() function. We consider these two options in three different runtime environments: 

...

For the Samza built-in main method (as in ApplicationRunnerMain#main()), we require the user application class to have a default constructor without any parameters: 

Code Block
// Load the user application class named in the config and instantiate it via its no-arg constructor
Class<ApplicationBase> appClass = (Class<ApplicationBase>) Class.forName(appConfig.getAppClass());
if (StreamApplication.class.isAssignableFrom(appClass) || TaskApplication.class.isAssignableFrom(appClass)) {
  return appClass.newInstance();
}
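Below is a self-contained illustration of the no-arg constructor requirement. It is a sketch only:

- The interfaces are empty stand-ins.
- `AppLoaderSketch`, `WordCountApp` and `NeedsArgApp` are hypothetical.
- The `app.class` key is assumed.
- It uses `getDeclaredConstructor().newInstance()`, the replacement for `Class.newInstance()`, which is deprecated since Java 9. With it, a missing default constructor surfaces as `NoSuchMethodException`.

```java
import java.util.Map;

// Empty stand-ins for the Samza interfaces (hypothetical, for illustration only).
interface ApplicationBase {}
interface StreamApplication extends ApplicationBase {}
interface TaskApplication extends ApplicationBase {}

// Hypothetical user application with the required public no-arg constructor.
class WordCountApp implements StreamApplication {
  public WordCountApp() {}
}

// Hypothetical user application WITHOUT a no-arg constructor: the built-in main() cannot create it.
class NeedsArgApp implements TaskApplication {
  public NeedsArgApp(String name) {}
}

final class AppLoaderSketch {
  static final String APP_CLASS_KEY = "app.class"; // assumed config key

  static ApplicationBase load(Map<String, String> config) throws ReflectiveOperationException {
    Class<?> appClass = Class.forName(config.get(APP_CLASS_KEY));
    if (!StreamApplication.class.isAssignableFrom(appClass)
        && !TaskApplication.class.isAssignableFrom(appClass)) {
      throw new IllegalArgumentException(
          appClass.getName() + " is neither a StreamApplication nor a TaskApplication");
    }
    // Looks up the no-arg constructor; throws NoSuchMethodException if the class has none.
    return (ApplicationBase) appClass.getDeclaredConstructor().newInstance();
  }
}
```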

...

A) User-implemented interface classes include the following: 

ApplicationBase: the base interface for both high- and low-level user applications; it defines the describe() method in which all processing logic is implemented 

Code Block
public interface ApplicationBase<T extends ApplicationSpec> {
  void describe(T appSpec);
}

StreamApplication: extends ApplicationBase with a typed describe() method for a high-level user application 

Code Block
public interface StreamApplication extends ApplicationBase<StreamApplicationSpec> { 
}

TaskApplication: extends ApplicationBase with a typed describe() method to initialize a low-level user application 

Code Block
public interface TaskApplication extends ApplicationBase<TaskApplicationSpec> { 
}

ProcessorLifecycleListener: defines the unified processor lifecycle-aware methods that allow users to inject customized logic before/after starting/stopping the StreamProcessor(s) in an application 


Code Block
public interface ProcessorLifecycleListener { 
  default void beforeStart() {} 
  default void afterStart() {} 
  default void beforeStop() {} 
  default void afterStop() {} 
}
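The local runner is expected to invoke these callbacks in a fixed order around processor start/stop. This is a minimal sketch of that order, with these assumptions:

- It uses simplified stand-in interfaces.
- `LocalRunnerSketch` is a hypothetical class.
- It runs synchronously for readability, whereas the async run/kill behavior is a separate refactoring item in this design.

```java
import java.util.List;

// Simplified stand-ins for the interfaces in this design (hypothetical).
interface ProcessorLifecycleListener {
  default void beforeStart() {}
  default void afterStart() {}
  default void beforeStop() {}
  default void afterStop() {}
}

interface StreamProcessor { // stand-in for a Samza StreamProcessor
  void start();
  void stop();
}

// Hypothetical local runner that wraps processor start/stop with the listener callbacks.
final class LocalRunnerSketch {
  private final List<StreamProcessor> processors;
  private final ProcessorLifecycleListener listener;

  LocalRunnerSketch(List<StreamProcessor> processors, ProcessorLifecycleListener listener) {
    this.processors = processors;
    this.listener = listener;
  }

  void run() {
    listener.beforeStart();                      // e.g. open shared clients before any processor starts
    processors.forEach(StreamProcessor::start);
    listener.afterStart();
  }

  void kill() {
    listener.beforeStop();
    processors.forEach(StreamProcessor::stop);
    listener.afterStop();                        // e.g. close shared clients after all processors stop
  }
}
```

A remote runner in this sketch would simply never call the listener, matching the note that RemoteApplicationRunner only submits the application.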


B) Samza framework-implemented runtime objects 

...

ApplicationSpec: the base interface for both high- and low-level applications.  


Code Block
public interface ApplicationSpec<T extends ApplicationBase> {
  /**
   * Get the globally unique application ID in the runtime process
   * @return globally unique application ID
   */
  String getGlobalAppId();

  /**
   * Get the user defined {@link Config}
   * @return config object
   */
  Config getConfig();

  /**
   * Sets the user defined {@link ContextManager} for this {@link ApplicationSpec}.
   * <p>
   * The provided {@link ContextManager} can be used to setup shared context between the operator functions
   * within a task instance
   *
   * @param contextManager the {@link ContextManager} to use for the {@link ApplicationSpec}
   * @return the {@link ApplicationSpec} with {@code contextManager} set as its {@link ContextManager}
   */
  ApplicationSpec<T> withContextManager(ContextManager contextManager);

  /**
   * Sets the lifecycle listener for user customized logic before and after starting/stopping
   * the StreamProcessors in the application
   *
   * @param listener the {@link ProcessorLifecycleListener} to use for the {@link ApplicationSpec}
   * @return the {@link ApplicationSpec} with {@code listener} set as its {@link ProcessorLifecycleListener}
   */
  ApplicationSpec<T> withProcessorLifecycleListener(ProcessorLifecycleListener listener);
}
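A framework-side AppSpecImpl could store whatever describe() sets through these fluent methods, so the runner can read it later. The sketch below makes these assumptions:

- Stand-in types: `Config` is modeled as `Map<String, String>`; `ContextManager`, `ProcessorLifecycleListener` and `ApplicationBase` are empty marker interfaces.
- Each setting has a no-op default.
- The two extra getters are hypothetical framework-only accessors.

```java
import java.util.Map;
import java.util.Objects;

// Empty stand-ins for the Samza types referenced by ApplicationSpec (hypothetical).
interface ContextManager {}
interface ProcessorLifecycleListener {}
interface ApplicationBase {}

interface ApplicationSpec<T extends ApplicationBase> {
  String getGlobalAppId();
  Map<String, String> getConfig(); // stand-in for Samza's Config
  ApplicationSpec<T> withContextManager(ContextManager contextManager);
  ApplicationSpec<T> withProcessorLifecycleListener(ProcessorLifecycleListener listener);
}

// Sketch of a framework-side spec that records what user code sets in describe().
class AppSpecImpl<T extends ApplicationBase> implements ApplicationSpec<T> {
  private final String globalAppId;
  private final Map<String, String> config;
  private ContextManager contextManager = new ContextManager() {};                 // no-op default
  private ProcessorLifecycleListener listener = new ProcessorLifecycleListener() {}; // no-op default

  AppSpecImpl(String globalAppId, Map<String, String> config) {
    this.globalAppId = globalAppId;
    this.config = config;
  }

  @Override public String getGlobalAppId() { return globalAppId; }
  @Override public Map<String, String> getConfig() { return config; }

  @Override
  public ApplicationSpec<T> withContextManager(ContextManager contextManager) {
    this.contextManager = Objects.requireNonNull(contextManager, "contextManager");
    return this; // fluent: allows chaining inside describe()
  }

  @Override
  public ApplicationSpec<T> withProcessorLifecycleListener(ProcessorLifecycleListener listener) {
    this.listener = Objects.requireNonNull(listener, "listener");
    return this;
  }

  // Hypothetical framework-only accessors, not part of the user-facing interface.
  ContextManager getContextManager() { return contextManager; }
  ProcessorLifecycleListener getProcessorLifecycleListener() { return listener; }
}
```

Returning `this` from each setter keeps the user-facing API fluent, while the runner reads the stored values through accessors that user code does not see.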

StreamApplicationSpec: extends ApplicationSpec for a high-level application, including all methods to describe a high-level application in a StreamGraph. 

Code Block
public interface StreamApplicationSpec extends ApplicationSpec<StreamApplication>, StreamGraph { 
} 

...

The implementation of the above API changes involves the following sections: 

  1. ApplicationBase/StreamApplication/TaskApplication interfaces and user code examples for high- and low-level APIs. The StreamApplication and TaskApplication interfaces don't have default implementations, so the main effort is writing user code examples that implement them. We need to port all existing high-level user code examples in the samza-test module and also add low-level user code examples. 

  2. Implementation of runtime public API classes: AppSpecImpl/StreamAppSpecImpl/TaskAppSpecImpl and ApplicationRuntimes/ApplicationRuntime. These classes are implemented by the Samza framework and used directly by users. Hence, they need both implementations and user code examples. 

  3. Internal implementation of ApplicationRunners: ApplicationRunners need to be refactored to run different ApplicationSpecs, depending on whether the ApplicationSpec is a StreamApplicationSpec or a TaskApplicationSpec. In particular, all ApplicationRunner classes need to be refactored to support TaskApplicationSpec. 

  4. Local application runners need to support invoking the ProcessorLifecycleListener API methods before and after starting/stopping the StreamProcessor(s) 

    1. This requires refactoring LocalContainerRunner to properly launch StreamProcessor instead of directly running SamzaContainer 
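
Runner-side dispatch on the spec type (item 3) can be sketched as below. In this sketch:

- `RunnerDispatchSketch`, `createSpec` and `run` are hypothetical names.
- The spec interfaces are empty stand-ins.
- The string results only mark which code path was taken; a real runner would plan and launch the job instead.

```java
// Empty stand-ins for the spec and application interfaces in this design (hypothetical).
interface ApplicationSpec {}
interface StreamApplicationSpec extends ApplicationSpec {}
interface TaskApplicationSpec extends ApplicationSpec {}

interface ApplicationBase<T extends ApplicationSpec> {
  void describe(T appSpec);
}
interface StreamApplication extends ApplicationBase<StreamApplicationSpec> {}
interface TaskApplication extends ApplicationBase<TaskApplicationSpec> {}

final class RunnerDispatchSketch {
  // Build the spec type matching the application and let user code describe into it.
  static ApplicationSpec createSpec(ApplicationBase<?> app) {
    if (app instanceof StreamApplication) {
      StreamApplicationSpec spec = new StreamApplicationSpec() {};
      ((StreamApplication) app).describe(spec);
      return spec;
    }
    if (app instanceof TaskApplication) {
      TaskApplicationSpec spec = new TaskApplicationSpec() {};
      ((TaskApplication) app).describe(spec);
      return spec;
    }
    throw new IllegalArgumentException("Unsupported application: " + app.getClass().getName());
  }

  // Choose the execution path from the spec type, as each ApplicationRunner would.
  static String run(ApplicationSpec spec) {
    if (spec instanceof StreamApplicationSpec) {
      return "high-level"; // e.g. plan the described StreamGraph into a job
    }
    if (spec instanceof TaskApplicationSpec) {
      return "low-level"; // e.g. run the user-provided task logic directly
    }
    throw new IllegalArgumentException("Unsupported spec: " + spec.getClass().getName());
  }
}
```

Because describe() is typed per application kind, user code can only see the spec methods that match its API level, and the runner branches once on the spec type.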

Test plans: 

  1. Changes in all ApplicationRunners need to be covered by unit tests, including tests for TaskApplicationSpec. 

  2. Applications written in high-level API need to be included in AbstractIntegrationTestHarness for testing. 

  3. Applications written in low-level API also need to be included in AbstractIntegrationTestHarness for testing. 

  4. Applications that switch runners via a config change also need to be tested. 

...

  1. Added TaskApplication interface 

  2. Added TaskApplicationSpec interface 

  3. Added ProcessorLifecycleListener interface 

There are no configuration changes that are backward incompatible with the current API. 

...