...
- Define a unified API ApplicationBase as a single entry point for users to implement all user-customized logic, for both the high-level API and the low-level API
  - User implements a single describe() method to implement all user processing logic before creating the runtime application instance
  - Sub-classes StreamApplication and TaskApplication provide specific describe() methods for the high-level API and the low-level API, respectively
- Define a unified API class ApplicationDescriptor to contain
  - High- and low-level processing logic defined via ApplicationBase.describe(). Sub-classes StreamAppDescriptor and TaskAppDescriptor are used for the high- and low-level APIs, respectively.
  - User-implemented ProcessorLifecycleListenerFactory interface that creates a ProcessorLifecycleListener, which includes customized logic to be invoked before and after starting/stopping the StreamProcessor(s) in the user application
    - Methods are beforeStart/afterStart/beforeStop/afterStop
  - Other user-defined objects in an application (e.g. configuration and context)
- Expand ApplicationRunner with a mandatory constructor that takes an ApplicationDescriptor object as its parameter
  - An ApplicationRunner is now constructed with an ApplicationDescriptor as the parameter
    - ApplicationDescriptor contains all user-customized logic.
  - ApplicationRunner deploys and runs the user code. This would be instantiated from the configuration, not exposed to the user at all.
A high-level overview of the proposed changes is illustrated below:
Figure-1: high-level user programming model
Figure-2: Interaction and lifecycle of runtime API objects (using StreamApplication in LocalApplicationRunner as an example).
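The flow described above (user logic goes into describe(), the descriptor carries it, and the runner is constructed from the descriptor) can be sketched with simplified stand-in types. Everything in this block is hypothetical: the stand-in interfaces only mimic the names used in this proposal, and the describe(T) signature, SketchDescriptor, WordCountApp and SketchRunner are assumptions made for illustration, not the Samza API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins: they mimic names from this proposal but are NOT the Samza types.
interface ApplicationDescriptor {
  String getGlobalAppId();
}

interface ApplicationBase<T extends ApplicationDescriptor> {
  // Assumed signature: the single entry point receives the descriptor to populate.
  void describe(T appDesc);
}

// Hypothetical descriptor that records the user logic as plain step names.
class SketchDescriptor implements ApplicationDescriptor {
  private final String appId;
  final List<String> steps = new ArrayList<>();

  SketchDescriptor(String appId) {
    this.appId = appId;
  }

  @Override
  public String getGlobalAppId() {
    return appId;
  }

  SketchDescriptor addStep(String step) {
    steps.add(step);
    return this;
  }
}

// User code: all customized logic lives in describe().
class WordCountApp implements ApplicationBase<SketchDescriptor> {
  @Override
  public void describe(SketchDescriptor appDesc) {
    appDesc.addStep("read").addStep("count").addStep("write");
  }
}

// Hypothetical runner: constructed with the already-populated descriptor.
class SketchRunner {
  private final SketchDescriptor descriptor;

  SketchRunner(SketchDescriptor descriptor) {
    this.descriptor = descriptor;
  }

  String run() {
    return descriptor.getGlobalAppId() + ":" + String.join(">", descriptor.steps);
  }
}

class ProgrammingModelSketch {
  static String demo() {
    SketchDescriptor descriptor = new SketchDescriptor("word-count-1");
    // The user logic is described before any runtime instance exists.
    new WordCountApp().describe(descriptor);
    return new SketchRunner(descriptor).run();
  }
}
```

In this sketch the runner never sees the application class itself, only the descriptor, which is the separation the proposal aims for.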
...
Code Block |
---|
public interface TaskApplication extends ApplicationBase<TaskAppDescriptor> { } |
ProcessorLifecycleListenerFactory: defines the factory interface to create ProcessorLifecycleListener in an application
Code Block |
---|
public interface ProcessorLifecycleListenerFactory extends Serializable { /** * Create an instance of {@link ProcessorLifecycleListener} for the StreamProcessor * * @param pContext the context of the corresponding StreamProcessor * @param config the configuration of the corresponding StreamProcessor * @return the {@link ProcessorLifecycleListener} callback object for the StreamProcessor */ ProcessorLifecycleListener createInstance(ProcessorContext pContext, Config config); } /** * The context for a StreamProcessor. This is a stub class, just include the method to identify the current StreamProcessor. * */ public interface ProcessorContext extends Serializable { String getProcessorId(); } |
ProcessorLifecycleListener: defines the unified processor lifecycle aware methods to allow users to inject customized logic before/after start/stop the StreamProcessor(s) in an application
Code Block |
---|
public interface ProcessorLifecycleListener { /** * User defined initialization before a StreamProcessor is started */ default void beforeStart(ProcessorContext pc) {} /** * User defined callback after a StreamProcessor is started * */ default void afterStart() {} /** * User defined callback before a StreamProcessor is stopped * */ default void beforeStop() {} /** * User defined callback after a StreamProcessor is stopped * * @param t the error causing the stop of the StreamProcessor. null value of this parameter indicates a successful completion. */ default void afterStop(Throwable t) {} } |
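A minimal sketch of how a user might implement the factory and listener pair follows. The interfaces are re-declared as stand-ins shaped after the ones listed in this section so that the block compiles on its own; the Config class here is a hypothetical placeholder for Samza's Config, and RecordingListener, RecordingListenerFactory and LifecycleSketch are illustrative names, not part of the proposal.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

// Stand-ins shaped after the interfaces in this section (not the real Samza classes).
interface ProcessorContext extends Serializable {
  String getProcessorId();
}

// Hypothetical placeholder for Samza's Config: a plain string map.
class Config extends HashMap<String, String> {
}

interface ProcessorLifecycleListener {
  default void beforeStart(ProcessorContext pc) {}
  default void afterStart() {}
  default void beforeStop() {}
  default void afterStop(Throwable t) {}
}

interface ProcessorLifecycleListenerFactory extends Serializable {
  ProcessorLifecycleListener createInstance(ProcessorContext pContext, Config config);
}

// Illustrative listener: records an event after start and after stop.
class RecordingListener implements ProcessorLifecycleListener {
  final List<String> events = new ArrayList<>();
  private final String processorId;

  RecordingListener(String processorId) {
    this.processorId = processorId;
  }

  @Override
  public void afterStart() {
    events.add(processorId + " started");
  }

  @Override
  public void afterStop(Throwable t) {
    // A null error indicates a successful completion.
    events.add(processorId + (t == null ? " completed" : " failed: " + t.getMessage()));
  }
}

// Illustrative factory: creates one listener per StreamProcessor.
class RecordingListenerFactory implements ProcessorLifecycleListenerFactory {
  @Override
  public ProcessorLifecycleListener createInstance(ProcessorContext pContext, Config config) {
    return new RecordingListener(pContext.getProcessorId());
  }
}

class LifecycleSketch {
  // Simulates the order in which a runtime would invoke the callbacks.
  static List<String> demo() {
    ProcessorContext ctx = () -> "processor-0";
    RecordingListener listener =
        (RecordingListener) new RecordingListenerFactory().createInstance(ctx, new Config());
    listener.beforeStart(ctx);
    listener.afterStart();
    listener.beforeStop();
    listener.afterStop(null);
    return listener.events;
  }
}
```

Because the factory receives the ProcessorContext and Config at creation time, the listener can capture whatever it needs up front, which is why most callbacks take no arguments.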
B) Samza framework implemented runtime objects
...
Code Block |
---|
public interface ApplicationDescriptor<T extends ApplicationBase> { /** * Get the global unique application ID in the runtime process * @return globally unique application ID */ String getGlobalAppId(); /** * Get the user defined {@link Config} * @return config object */ Config getConfig(); /** * TODO: this needs to be replaced with proper SharedContextFactory when SAMZA-1714 is completed. * we have to keep it here to enable the current samza-sql implementation. * * Sets the {@link ContextManager} for this application. * <p> * The provided {@link ContextManager} can be used to setup shared context between the operator functions * within a task instance * * @param contextManager the {@link ContextManager} to use for the {@link StreamApplicationSpec} * @return the {@link StreamApplicationSpec} with {@code contextManager} set as its {@link ContextManager} */ ApplicationDescriptor<T> withContextManager(ContextManager contextManager); /** * Sets the lifecycle listener factory for user customized logic before and after starting/stopping * StreamProcessors in the application */ ApplicationDescriptor<T> withProcessorLifecycleListenerFactory(ProcessorLifecycleListenerFactory listener); } |
StreamAppDescriptor: this extends ApplicationDescriptor for a high-level application, including all methods to describe a high-level application in StreamGraph.
...
Added TaskApplication interface
Added TaskApplicationSpec interface
Added ProcessorLifecycleListenerFactory interface
There is no configuration change that is backward incompatible with the current API.
...