...
A high-level overview of the proposed changes is illustrated below:
Figure-1: high-level user programming model
Figure-2: Interaction and lifecycle of runtime API objects (using StreamApplication in LocalApplicationRunner as an example).
...
The above design achieves the following goals:
- Defines a unified ApplicationBase interface for both high- and low-level APIs in different deployment environments.
- All user code now implements one of the sub-interfaces (i.e. StreamApplication or TaskApplication):
  - All processing logic is implemented in the standard describe() method of either StreamApplication or TaskApplication.
  - All user-customized logic to start/stop contextual objects in the application process is implemented in the standard lifecycle listener methods (i.e. beforeStart/afterStart/beforeStop/afterStop).
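To make the split between describe() and the lifecycle listener concrete, here is a minimal sketch of a user application. It is self-contained, so it declares simplified stand-ins for ApplicationBase, ApplicationSpec and ProcessorLifecycleListener modeled on this proposal (not the real Samza classes); SimpleSpec, MyStreamApp and the recorded `events` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins modeled on this proposal's interfaces; not the real Samza classes.
interface ProcessorLifecycleListener {
  default void beforeStart() {}
  default void afterStart() {}
  default void beforeStop() {}
  default void afterStop() {}
}

interface ApplicationSpec {
  ApplicationSpec withProcessorLifecycleListener(ProcessorLifecycleListener listener);
}

interface ApplicationBase<T extends ApplicationSpec> {
  void describe(T appSpec);
}

// Hypothetical spec that simply records what the application declares.
class SimpleSpec implements ApplicationSpec {
  final List<String> operators = new ArrayList<>();
  ProcessorLifecycleListener listener = new ProcessorLifecycleListener() {};

  @Override
  public SimpleSpec withProcessorLifecycleListener(ProcessorLifecycleListener listener) {
    this.listener = listener;
    return this;
  }
}

// Hypothetical user application.
class MyStreamApp implements ApplicationBase<SimpleSpec> {
  static final List<String> events = new ArrayList<>();

  @Override
  public void describe(SimpleSpec appSpec) {
    // Processing logic: stands in for building the high-level processing graph.
    appSpec.operators.add("filter");
    // Start/stop logic: registered as a listener instead of being written as app methods.
    appSpec.withProcessorLifecycleListener(new ProcessorLifecycleListener() {
      @Override public void beforeStart() { events.add("open client"); }
      @Override public void afterStop() { events.add("close client"); }
    });
  }
}

class MyStreamAppDemo {
  public static void main(String[] args) {
    SimpleSpec spec = new SimpleSpec();
    new MyStreamApp().describe(spec);
    // A local runner would invoke these around starting/stopping the StreamProcessors.
    spec.listener.beforeStart();
    spec.listener.afterStart();  // default no-op
    spec.listener.afterStop();
    System.out.println(spec.operators + " " + MyStreamApp.events);
  }
}
```

Because every listener method has a default no-op body, the application only overrides the hooks it actually needs.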
Construction of the ApplicationRuntime object is implemented by Samza framework code, which hides:
- the choice of a specific ApplicationRunner for each environment via configuration;
- the association of a user application with a specific ApplicationRunner instance;
- the initialization of user processing logic before the ApplicationRunner executes the application;
- the invocation of user-defined lifecycle listener methods when the application is run/killed via an ApplicationRunner in a local process (i.e. LocalApplicationRunner*).
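The first two hidden steps (choosing a runner from configuration and binding the user application to it) can be sketched roughly as follows. This is an illustration under stated assumptions, not the proposal's API: the config key `runner.type`, the `RunnerSelector` helper, the `LocalRunner`/`RemoteRunner` classes and `WordCountApp` are all hypothetical, and a lookup table stands in for the reflective class loading a real implementation would likely use.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Simplified stand-ins; not the real Samza interfaces.
interface ApplicationBase {}

interface ApplicationRunner {
  String run();
}

class LocalRunner implements ApplicationRunner {
  private final ApplicationBase app;
  LocalRunner(ApplicationBase app) { this.app = app; }
  @Override public String run() { return "local:" + app.getClass().getSimpleName(); }
}

class RemoteRunner implements ApplicationRunner {
  private final ApplicationBase app;
  RemoteRunner(ApplicationBase app) { this.app = app; }
  @Override public String run() { return "remote:" + app.getClass().getSimpleName(); }
}

// Hypothetical framework helper: the user never picks or constructs a runner directly.
class RunnerSelector {
  // Hypothetical config key; the real key may differ.
  static final String RUNNER_KEY = "runner.type";

  // Lookup table standing in for loading a runner class by name via reflection.
  private static final Map<String, Function<ApplicationBase, ApplicationRunner>> RUNNERS = new HashMap<>();
  static {
    RUNNERS.put("local", LocalRunner::new);
    RUNNERS.put("remote", RemoteRunner::new);
  }

  // Picks the runner named in config (assumed default: local) and binds the app to it.
  static ApplicationRunner getRunner(ApplicationBase app, Map<String, String> config) {
    String type = config.getOrDefault(RUNNER_KEY, "local");
    Function<ApplicationBase, ApplicationRunner> factory = RUNNERS.get(type);
    if (factory == null) {
      throw new IllegalArgumentException("Unknown runner type: " + type);
    }
    return factory.apply(app);
  }
}

// Hypothetical user application.
class WordCountApp implements ApplicationBase {}

class RunnerSelectorDemo {
  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put(RunnerSelector.RUNNER_KEY, "remote");
    System.out.println(RunnerSelector.getRunner(new WordCountApp(), config).run());
  }
}
```

Switching environments then only requires a config change, which is what the test plan's "different runners via config change" item would exercise.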
Note that RemoteApplicationRunner only submits the application without launching the StreamProcessors; hence, lifecycle listener methods are not invoked in RemoteApplicationRunner.
Note also that this is pending a refactoring of LocalContainerRunner so that it:
- implements run/kill/status with a proper asynchronous implementation;
- launches StreamProcessor instead of directly running SamzaContainer.
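The difference between local and remote runners with respect to lifecycle listeners can be sketched as below. The sketch assumes a `Processor` interface as a hypothetical stand-in for StreamProcessor, calls the listener once around all processors (one possible choice, not prescribed here), and runs synchronously for brevity even though the proposal asks for a proper asynchronous run/kill/status.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified stand-in for the proposal's listener interface.
interface ProcessorLifecycleListener {
  default void beforeStart() {}
  default void afterStart() {}
  default void beforeStop() {}
  default void afterStop() {}
}

// Hypothetical stand-in for a StreamProcessor.
interface Processor {
  void start();
  void stop();
}

// Local runner sketch: launches processors in-process and brackets start/stop with listener calls.
// Calls are synchronous here for brevity; a real runner would be asynchronous.
class LocalRunnerSketch {
  private final List<Processor> processors;
  private final ProcessorLifecycleListener listener;

  LocalRunnerSketch(List<Processor> processors, ProcessorLifecycleListener listener) {
    this.processors = processors;
    this.listener = listener;
  }

  void run() {
    listener.beforeStart();
    for (Processor p : processors) {
      p.start();
    }
    listener.afterStart();
  }

  void kill() {
    listener.beforeStop();
    for (Processor p : processors) {
      p.stop();
    }
    listener.afterStop();
  }
}

// Remote runner sketch: only submits the application, so no listener method is ever called.
class RemoteRunnerSketch {
  private final List<String> submitted = new ArrayList<>();

  void run(String appName) {
    submitted.add(appName);
  }

  List<String> submitted() {
    return submitted;
  }
}

class LifecycleDemo {
  public static void main(String[] args) {
    List<String> log = new ArrayList<>();
    Processor processor = new Processor() {
      @Override public void start() { log.add("start"); }
      @Override public void stop() { log.add("stop"); }
    };
    ProcessorLifecycleListener listener = new ProcessorLifecycleListener() {
      @Override public void beforeStart() { log.add("beforeStart"); }
      @Override public void afterStop() { log.add("afterStop"); }
    };
    LocalRunnerSketch local = new LocalRunnerSketch(Collections.singletonList(processor), listener);
    local.run();
    local.kill();
    new RemoteRunnerSketch().run("my-app");  // does not touch the listener
    System.out.println(log);  // [beforeStart, start, stop, afterStop]
  }
}
```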
Note that the application main() method in the cross-functional swimlane chart is marked with a different color, since the user can use either a user-defined main() or a Samza built-in main() function. We consider these two options in three different runtime environments:
...
For the Samza built-in main method (as in ApplicationRunnerMain#main()), we require the user application class to have a default constructor without any parameters:
```java
// Load the user application class named in the application config
Class<ApplicationBase> appClass = (Class<ApplicationBase>) Class.forName(appConfig.getAppClass());
// Only StreamApplication and TaskApplication implementations are accepted
if (StreamApplication.class.isAssignableFrom(appClass) || TaskApplication.class.isAssignableFrom(appClass)) {
  // Requires an accessible no-arg (default) constructor
  return appClass.newInstance();
}
```
...
A) User-implemented interface classes include the following:
ApplicationBase: the base interface for both high- and low-level applications; it defines the describe() method through which a user application declares its processing logic on an ApplicationSpec.
```java
public interface ApplicationBase<T extends ApplicationSpec> {
  void describe(T appSpec);
}
```
StreamApplication: extends ApplicationBase with a describe() method typed on StreamApplicationSpec, for high-level user applications
```java
public interface StreamApplication extends ApplicationBase<StreamApplicationSpec> {
}
```
TaskApplication: extends ApplicationBase with a describe() method typed on TaskApplicationSpec, used to initialize low-level user applications
```java
public interface TaskApplication extends ApplicationBase<TaskApplicationSpec> {
}
```
ProcessorLifecycleListener: defines the unified processor lifecycle methods that allow users to inject customized logic before/after starting/stopping the StreamProcessor(s) in an application
```java
public interface ProcessorLifecycleListener {
  default void beforeStart() {}
  default void afterStart() {}
  default void beforeStop() {}
  default void afterStop() {}
}
```
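The generic parameter on ApplicationBase is what gives StreamApplication and TaskApplication their typed describe() methods: each application only sees the spec type that matches its API level. A minimal sketch, using simplified stand-ins; the spec methods `addInput` and `setTaskClass` and the application classes are hypothetical, chosen only to make the two spec types distinguishable.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the proposal's interfaces (not the real Samza classes).
interface ApplicationSpec {}

interface ApplicationBase<T extends ApplicationSpec> {
  void describe(T appSpec);
}

// Hypothetical spec methods chosen only for illustration.
class StreamApplicationSpec implements ApplicationSpec {
  final List<String> inputs = new ArrayList<>();
  StreamApplicationSpec addInput(String streamId) { inputs.add(streamId); return this; }
}

class TaskApplicationSpec implements ApplicationSpec {
  String taskClass;
  TaskApplicationSpec setTaskClass(String className) { taskClass = className; return this; }
}

interface StreamApplication extends ApplicationBase<StreamApplicationSpec> {}
interface TaskApplication extends ApplicationBase<TaskApplicationSpec> {}

// Hypothetical high-level app: describe() only sees high-level spec methods.
class PageViewFilterApp implements StreamApplication {
  @Override
  public void describe(StreamApplicationSpec appSpec) {
    appSpec.addInput("page-views");
  }
}

// Hypothetical low-level app: describe() only sees low-level spec methods.
class PageViewTaskApp implements TaskApplication {
  @Override
  public void describe(TaskApplicationSpec appSpec) {
    appSpec.setTaskClass("PageViewTask");
  }
}

class TypedDescribeDemo {
  public static void main(String[] args) {
    StreamApplicationSpec streamSpec = new StreamApplicationSpec();
    new PageViewFilterApp().describe(streamSpec);
    TaskApplicationSpec taskSpec = new TaskApplicationSpec();
    new PageViewTaskApp().describe(taskSpec);
    // new PageViewFilterApp().describe(taskSpec);  // would not compile: wrong spec type
    System.out.println(streamSpec.inputs + " " + taskSpec.taskClass);
  }
}
```

Mixing up the two API levels is therefore caught by the compiler rather than at runtime.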
B) Runtime objects implemented by the Samza framework
...
ApplicationSpec: the base spec interface for both high- and low-level applications.
```java
public interface ApplicationSpec<T extends ApplicationBase> {
  /**
   * Get the global unique application ID in the runtime process
   * @return globally unique application ID
   */
  String getGlobalAppId();

  /**
   * Get the user defined {@link Config}
   * @return config object
   */
  Config getConfig();

  /**
   * Sets the {@link ContextManager} for this {@link ApplicationSpec}.
   * <p>
   * The provided {@link ContextManager} can be used to setup shared context between the operator functions
   * within a task instance
   *
   * @param contextManager the {@link ContextManager} to use for the {@link ApplicationSpec}
   * @return the {@link ApplicationSpec} with {@code contextManager} set as its {@link ContextManager}
   */
  ApplicationSpec<T> withContextManager(ContextManager contextManager);

  /**
   * Sets the lifecycle listener for user customized logic before and after starting/stopping
   * StreamProcessors in the application
   *
   * @param listener the user-implemented {@link ProcessorLifecycleListener}
   * @return the {@link ApplicationSpec} with {@code listener} set as its {@link ProcessorLifecycleListener}
   */
  ApplicationSpec<T> withProcessorLifecycleListener(ProcessorLifecycleListener listener);
}
```
StreamApplicationSpec: extends ApplicationSpec for high-level applications and includes all the StreamGraph methods used to describe a high-level application.
```java
public interface StreamApplicationSpec extends ApplicationSpec<StreamApplication>, StreamGraph {
}
```
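As a rough idea of what a framework-side base implementation in the spirit of AppSpecImpl might look like, the sketch below implements a simplified ApplicationSpec with fluent setters that return the spec itself so calls can be chained. Several assumptions apply: Config is modeled as a `Map<String, String>`, ContextManager as an empty marker interface, the class name `AppSpecSketch` is hypothetical, and deriving the global app ID from assumed `app.name`/`app.id` entries with a `name-id` format is illustrative only.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins: Config is modeled as a String map, ContextManager as a marker interface.
interface ContextManager {}

interface ProcessorLifecycleListener {
  default void beforeStart() {}
  default void afterStart() {}
  default void beforeStop() {}
  default void afterStop() {}
}

interface ApplicationBase {}

interface ApplicationSpec<T extends ApplicationBase> {
  String getGlobalAppId();
  Map<String, String> getConfig();
  ApplicationSpec<T> withContextManager(ContextManager contextManager);
  ApplicationSpec<T> withProcessorLifecycleListener(ProcessorLifecycleListener listener);
}

// Hypothetical base implementation in the spirit of AppSpecImpl.
class AppSpecSketch<T extends ApplicationBase> implements ApplicationSpec<T> {
  private final Map<String, String> config;
  // Defaults: no context manager, and a listener whose methods are all no-ops.
  private ContextManager contextManager = null;
  private ProcessorLifecycleListener listener = new ProcessorLifecycleListener() {};

  AppSpecSketch(Map<String, String> config) {
    this.config = config;
  }

  @Override
  public String getGlobalAppId() {
    // Assumption: ID derived from assumed app.name/app.id config entries.
    return config.getOrDefault("app.name", "unknown") + "-" + config.getOrDefault("app.id", "1");
  }

  @Override
  public Map<String, String> getConfig() {
    return config;
  }

  @Override
  public AppSpecSketch<T> withContextManager(ContextManager contextManager) {
    this.contextManager = contextManager;
    return this;  // fluent style: calls can be chained
  }

  @Override
  public AppSpecSketch<T> withProcessorLifecycleListener(ProcessorLifecycleListener listener) {
    this.listener = listener;
    return this;
  }

  // Accessors a runner would use when launching processors.
  ContextManager getContextManager() { return contextManager; }
  ProcessorLifecycleListener getProcessorLifecycleListener() { return listener; }
}

class AppSpecDemo {
  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put("app.name", "wordcount");
    config.put("app.id", "2");
    AppSpecSketch<ApplicationBase> spec = new AppSpecSketch<>(config);
    spec.withContextManager(new ContextManager() {})
        .withProcessorLifecycleListener(new ProcessorLifecycleListener() {});
    System.out.println(spec.getGlobalAppId());
  }
}
```

Returning the concrete spec type from the setters (a covariant override) keeps chained calls typed without extra casts.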
...
The implementation of the above API changes involves the following sections:
- ApplicationBase/StreamApplication/TaskApplication interfaces and user code examples for the high- and low-level APIs. The StreamApplication and TaskApplication interfaces have no default implementation, so the main effort is writing user code examples that implement them: all existing high-level user code examples in the samza-test module need to be ported, and low-level user code examples need to be added.
- Implementation of the runtime public API classes: AppSpecImpl/StreamAppSpecImpl/TaskAppSpecImpl/ApplicationRuntimes/ApplicationRuntime. These classes are implemented by the Samza framework and used directly by users; hence, they need both implementations and user code examples.
- Internal implementation of ApplicationRunners: all ApplicationRunner classes need to be refactored to run different kinds of ApplicationSpec, depending on whether the ApplicationSpec is a StreamApplicationSpec or a TaskApplicationSpec, which includes adding support for TaskApplicationSpec.
- Local application runners need to invoke the ProcessorLifecycleListener methods before and after starting/stopping the StreamProcessor(s).
  - This requires refactoring LocalContainerRunner to properly launch StreamProcessor instead of directly running SamzaContainer.
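The runner refactoring described above amounts to branching on the kind of ApplicationSpec a runner receives. A minimal sketch, assuming a hypothetical abstract `AbstractRunnerSketch` with one method per spec type; the returned strings are placeholders for the real execution paths, and the spec interfaces are simplified stand-ins.

```java
// Simplified stand-ins for the proposal's spec interfaces.
interface ApplicationSpec {}
interface StreamApplicationSpec extends ApplicationSpec {}
interface TaskApplicationSpec extends ApplicationSpec {}

// Hypothetical runner skeleton that branches on the kind of spec it was given.
abstract class AbstractRunnerSketch {
  final String run(ApplicationSpec spec) {
    if (spec instanceof StreamApplicationSpec) {
      return runStreamApp((StreamApplicationSpec) spec);
    } else if (spec instanceof TaskApplicationSpec) {
      return runTaskApp((TaskApplicationSpec) spec);
    }
    throw new IllegalArgumentException("Unsupported ApplicationSpec: " + spec.getClass().getName());
  }

  // Placeholder for the high-level execution path.
  abstract String runStreamApp(StreamApplicationSpec spec);

  // Placeholder for the low-level execution path that every runner now has to support.
  abstract String runTaskApp(TaskApplicationSpec spec);
}

// Hypothetical concrete runner; each real runner would supply its own environment-specific logic.
class LocalRunnerSketch extends AbstractRunnerSketch {
  @Override String runStreamApp(StreamApplicationSpec spec) { return "local stream app"; }
  @Override String runTaskApp(TaskApplicationSpec spec) { return "local task app"; }
}

class DispatchDemo {
  public static void main(String[] args) {
    AbstractRunnerSketch runner = new LocalRunnerSketch();
    System.out.println(runner.run(new StreamApplicationSpec() {}));
    System.out.println(runner.run(new TaskApplicationSpec() {}));
  }
}
```

Keeping the branch in one base method means each concrete runner only fills in the two paths, and an unknown spec type fails fast instead of being silently ignored.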
Test plans:
- Changes in all ApplicationRunners need to be covered by unit tests, including new tests for TaskApplicationSpec.
- Applications written in the high-level API need to be included in AbstractIntegrationTestHarness for testing.
- Applications written in the low-level API also need to be included in AbstractIntegrationTestHarness for testing.
- Applications that switch between runners via config changes also need to be tested.
...
Added TaskApplication interface
Added TaskApplicationSpec interface
Added ProcessorLifecycleListener interface
There is no configuration change that is backward-incompatible with the current API.
...