Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

A high-level overview of the proposed changes is illustrated below: 
Image Removed Image Added

Figure-1: high-level user programming model 

...

Image Added

Figure-2: Interaction and lifecycle of runtime API objects (using StreamApplication as an example). 

 


The above design achieves the following goals: 

  1. Defined a unified LifecycleAwareApplication interface for both high- and low-level APIs in different deployment environments. All user code is now included in one of the sub-classes (I.e. StreamApplicaiton or TaskApplication).  

    1. All processing logic is implemented in the standard describe() method in either StreamApplication or TaskApplication. 

    2. All user customized logic to create/start/stop contextual objects in their application are in standard lifecycle aware methods (I.e. beforeStart/afterStart/beforeStop/afterStop) 

  2. Construction of ApplicationRuntime object is implemented by Samza framework code, which hides: 

    1. Choice of a specific ApplicationRunner for different environment via configuration 

    2. Association of a user application to the specific instance of ApplicationRunner 

    3. Initialization of user processing logic before ApplicationRunner executes the application 

    4. Invoking user-defined lifecycle aware methods when run/kill the application via ApplicationRunner 

...

LifecycleAwareApplication: defines the basic life-cycle aware methods to allow users to inject customized logic before and after the lifecycle methods of an application 




Code Block
public interface LifecycleAwareApplication<T extends ApplicationSpec> { 
  void describe(T appSpec); 
  default void beforeStart() {} 
  default void afterStart() {} 
  default void beforeStop() {} 
  default void afterStop() {} 
} 


StreamApplicationextends LifecycleAwareApplication with a typed describe() method for high-level user application 





Code Block
public interface StreamApplication extends LifecycleAwareApplication<StreamApplicationSpec> { 
}




TaskApplicationextends LifecycleAwareApplication with a typed describe() method to initialize the low-level user application 

...

StreamApplicationSpecthis extends ApplicationSpec for a high-level application, including all methods to describe a high-level application in StreamGraph. 




Code Block
public interface StreamApplicationSpec extends ApplicationSpec<StreamApplication>, StreamGraph { 
} 




TaskApplicationSpecthis extends ApplicationSpec for a low-level application, including the user-defined TaskFactory and the corresponding list of input and output streams and tables. 





Code Block
public interface TaskApplicationSpec extends ApplicationSpec<TaskApplication> { 
 
  void setTaskFactory(TaskFactory factory); 
 
  void addInputStreams(List<String> inputStreams); 
 
  void addOutputStreams(List<String> outputStreams); 
 
  void addTables(List<TableDescriptor> tables); 
 
} 


ApplicationRuntime classes 

...

This is an interface class that defines the life-cycle methods available in an runtime instance of application. It is used by users and not intend to be implemented by users. 




Code Block
public interface ApplicationRuntime { 
  /** 
   * Start an application 
   */ 
  void start(); 
 
  /** 
   * Stop an application 
   */ 
  void stop(); 
 
  /** 
   * Get the {@link ApplicationStatus} of an application 
   * @return the runtime status of the application 
   */ 
  ApplicationStatus status(); 
 
  /** 
   * Wait the application to complete. 
   * This method will block until the application completes. 
   */ 
  void waitForFinish(); 
 
  /** 
   * Wait the application to complete with a {@code timeout} 
   *  
  * @param timeout the time to block to wait for the application to complete 
   * @return true if the application completes within timeout; false otherwise 
   */ 
  boolean waitForFinish(Duration timeout); 
 
  /** 
   * Method to add a set of customized {@link MetricsReporter}s in the application 
   * 
   * @param metricsReporters the map of customized {@link MetricsReporter}s objects to be used 
   */ 
  void addMetricsReporters(Map<String, MetricsReporter> metricsReporters); 
 
} 


ApplicationRuntimes 

Samza framework provided factory class to allow instantiation of ApplicationRuntime for user applications. 




Code Block
public class ApplicationRuntimes { 
 
  private ApplicationRuntimes() { 
 
  } 
 
  public static final ApplicationRuntime getApplicationRuntime(LifecycleAwareApplication userApp, Config config) { 
    if (userApp instanceof StreamApplication) { 
      return new AppRuntimeImpl(new StreamAppSpecImpl((StreamApplication) userApp, config)); 
    } 
    if (userApp instanceof TaskApplication) { 
      return new AppRuntimeImpl(new TaskAppSpecImpl((TaskApplication) userApp, config)); 
    } 
    throw new IllegalArgumentException(String.format("User application instance has to be either StreamApplicationFactory or TaskApplicationFactory. " 
        + "Invalid userApp class %s.", userApp.getClass().getName())); 
  } 
} 




Implementation and Test Plan

...

The main in-compatible change is with high-level API applications: 

Image RemovedImage Added

Rejected Alternatives

...