Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  1. Define a unified API ApplicationBase as a single entry point for users to implement all user-customized logic, for both high-level API and low-level API 
    1. User implements a single describe() method to implement all user processing logic before creating the runtime application instance 

      1. Sub-classes StreamApplication and TaskApplication provide specific describe(methods for high-level API and low-level API, respectively 

  2. Define a unified API class ApplicationDescriptor to contain 

    1. High- and low-level processing logic defined via ApplicationBase.describe(). Sub-class StreamAppDescriptor and TaskAppDescriptor are used for high- and low-level APIs respectively.

    2. User implemented ProcessorLifecycleListener class that ProcessorLifecycleListenerFactory interface that creates a ProcessorLifecycleListener which includes customized logic to be invoked before and after starting/stopping the StreamProcessor(s) in the user application 

      1. Methods are beforeStart/afterStart/beforeStop/afterStop

    3. Other used-defined objects in an application (e.g. configuration and context)

  3. Expand ApplicationRunner with a mandatory constructor with ApplicationDescriptor object as parameter

    1. An ApplicationRunner is now constructed with an ApplicationDescriptor as the parameter

      1. ApplicationDescriptor contains all user customized logic. 

      2. ApplicationRunner deploys and runs the user code. This would be instantiated from the configuration, not exposed to user at all. 

A high-level overview of the proposed changes is illustrated below: 
Image RemovedImage Added

Figure-1: high-level user programming model Image Removed

Image Added

Figure-2: Interaction and lifecycle of runtime API objects (using StreamApplication in LocalApplicationRunner as an example). 

...

Code Block
public interface TaskApplication extends ApplicationBase<TaskAppDescriptor> { 
}

ProcessorLifecycleListenerProcessorLifecycleListenerFactorydefines the unified processor lifecycle aware methods to allow users to inject customized logic before/after start/stop the StreamProcessor(s) factory interface to create ProcessorLifecycleListener in an application 


Code Block
public interface ProcessorLifecycleListenerProcessorLifecycleListenerFactory extends Serializable {
  /**
   * UserCreate an definedinstance initializationof before{@link aProcessorLifecycleListener} StreamProcessorfor isthe startedStreamProcessor
   *
   * @param pcpContext the {@linkcontext ProcessorContext}of inthe which this callback method is invoked.corresponding StreamProcessor
   */
 @param defaultconfig voidthe beforeStart(ProcessorContext pc) {}

  /**configuration of the corresponding StreamProcessor
   * User defined@return the {@link ProcessorLifecycleListener} callback object afterfor athe StreamProcessor is started
   */
  ProcessorLifecycleListener * @param pc the {@link ProcessorContext} in which this callback method is invoked.
   */
  default void afterStart(ProcessorContext pc) {}

  /**
   * User defined callback before a StreamProcessor is stopped
   createInstance(ProcessorContext pContext, Config config);
}


/**
 * The context for a StreamProcessor. This is a stub class, just include the method to identify the current StreamProcessor.
 *
 */
public interface ProcessorContext extends Serializable {
  String getProcessorId();
}


ProcessorLifecycleListenerdefines the unified processor lifecycle aware methods to allow users to inject customized logic before/after start/stop the StreamProcessor(s) in an application 


Code Block
public interface ProcessorLifecycleListener {
  /**
   * @paramUser pcdefined theinitialization {@linkbefore ProcessorContext} in which this callback methoda StreamProcessor is invoked.started
   */
  default void beforeStopbeforeStart(ProcessorContext pc) {}

  /**
   * User defined callback after a StreamProcessor is stoppedstarted
   *
   */
  @paramdefault pcvoid theafterStart() {@link ProcessorContext} in which this callback method is invoked.
   * @param t the error causing the stop of the StreamProcessor. null value of this parameter indicates a successful completion.}

  /**
   * User defined callback before a StreamProcessor is stopped
   *
   */
  default void afterStop(ProcessorContext pc, Throwable tbeforeStop() {}
}
  /**

/**
   * User defined callback after a StreamProcessor is stopped
   *
   * The@param contextt forthe aerror StreamProcessor.causing Thisthe isstop aof stubthe class,StreamProcessor. justnull includevalue theof methodthis toparameter identifyindicates thea currentsuccessful StreamProcessorcompletion.
 *
  */
public  interfacedefault ProcessorContextvoid extends Serializable {
  String getProcessorId();afterStop(Throwable t) {}
}


B
Samza framework implemented runtime objects 

...

Code Block
public interface ApplicationDescriptor<T extends ApplicationBase> { 
  /** 
   * Get the global unique application ID in the runtime process 
   * @return globally unique application ID 
   */ 
  String getGlobalAppId(); 
 
  /** 
   * Get the user defined {@link Config} 
   * @return config object 
   */ 
  Config getConfig(); 
 
  /** 
   * TODO: this needs to be replaced with proper SharedContextFactory when SAMZA-1714 is completed.
   * we have to keep it here to enable the current samza-sql implementation.
   *
   * Sets the {@link ContextManager} for this application. 
   * <p> 
   * The provided {@link ContextManager} can be used to setup shared context between the operator functions 
   * within a task instance 
   * 
   * @param contextManager the {@link ContextManager} to use for the {@link StreamApplicationSpec} 
   * @return the {@link StreamApplicationSpec} with {@code contextManager} set as its {@link ContextManager} 
   */ 
  ApplicationDescriptor<T> withContextManager(ContextManager contextManager); 


  /** 
   * Sets the lifecycle listener factory for user customized logic before and after starting/stopping 
   * StreamProcessors in the application 
   */  
  ApplicationDescriptor<T> withProcessorLifecycleListenerwithProcessorLifecycleListenerFactory(ProcessorLifecycleListenerProcessorLifecycleListenerFactory listener); 
} 

StreamAppDescriptorthis extends ApplicationDescriptor for a high-level application, including all methods to describe a high-level application in StreamGraph. 

...

  1. Added TaskApplication interface 

  2. Added TaskApplicationSpec interface 

  3. Added ProcessorLifecycleListener interfaceFactory interface 

There is no configuration change that is backward incompatible w/ current API. 

...