...
A high-level overview of the proposed changes is illustrated below:
Figure-1: high-level user programming model
...
Figure-2: Interaction and lifecycle of runtime API objects (using StreamApplication as an example).
The above design achieves the following goals:
Defined a unified LifecycleAwareApplication interface for both high- and low-level APIs in different deployment environments. All user code is now included in one of the sub-classes (I.e. StreamApplicaiton or TaskApplication).
All processing logic is implemented in the standard describe() method in either StreamApplication or TaskApplication.
All user customized logic to create/start/stop contextual objects in their application are in standard lifecycle aware methods (I.e. beforeStart/afterStart/beforeStop/afterStop)
Construction of ApplicationRuntime object is implemented by Samza framework code, which hides:
Choice of a specific ApplicationRunner for different environment via configuration
Association of a user application to the specific instance of ApplicationRunner
Initialization of user processing logic before ApplicationRunner executes the application
Invoking user-defined lifecycle aware methods when run/kill the application via ApplicationRunner
...
LifecycleAwareApplication: defines the basic life-cycle aware methods to allow users to inject customized logic before and after the lifecycle methods of an application
Code Block |
---|
public interface LifecycleAwareApplication<T extends ApplicationSpec> {
void describe(T appSpec);
default void beforeStart() {}
default void afterStart() {}
default void beforeStop() {}
default void afterStop() {}
} |
StreamApplication: extends LifecycleAwareApplication with a typed describe() method for high-level user application
Code Block |
---|
public interface StreamApplication extends LifecycleAwareApplication<StreamApplicationSpec> {
} |
TaskApplication: extends LifecycleAwareApplication with a typed describe() method to initialize the low-level user application
...
StreamApplicationSpec: this extends ApplicationSpec for a high-level application, including all methods to describe a high-level application in StreamGraph.
Code Block |
---|
public interface StreamApplicationSpec extends ApplicationSpec<StreamApplication>, StreamGraph {
} |
TaskApplicationSpec: this extends ApplicationSpec for a low-level application, including the user-defined TaskFactory and the corresponding list of input and output streams and tables.
Code Block |
---|
public interface TaskApplicationSpec extends ApplicationSpec<TaskApplication> {
void setTaskFactory(TaskFactory factory);
void addInputStreams(List<String> inputStreams);
void addOutputStreams(List<String> outputStreams);
void addTables(List<TableDescriptor> tables);
} |
ApplicationRuntime classes
...
This is an interface class that defines the life-cycle methods available in an runtime instance of application. It is used by users and not intend to be implemented by users.
Code Block |
---|
public interface ApplicationRuntime {
/**
* Start an application
*/
void start();
/**
* Stop an application
*/
void stop();
/**
* Get the {@link ApplicationStatus} of an application
* @return the runtime status of the application
*/
ApplicationStatus status();
/**
* Wait the application to complete.
* This method will block until the application completes.
*/
void waitForFinish();
/**
* Wait the application to complete with a {@code timeout}
*
* @param timeout the time to block to wait for the application to complete
* @return true if the application completes within timeout; false otherwise
*/
boolean waitForFinish(Duration timeout);
/**
* Method to add a set of customized {@link MetricsReporter}s in the application
*
* @param metricsReporters the map of customized {@link MetricsReporter}s objects to be used
*/
void addMetricsReporters(Map<String, MetricsReporter> metricsReporters);
} |
ApplicationRuntimes:
Samza framework provided factory class to allow instantiation of ApplicationRuntime for user applications.
Code Block |
---|
public class ApplicationRuntimes {
private ApplicationRuntimes() {
}
public static final ApplicationRuntime getApplicationRuntime(LifecycleAwareApplication userApp, Config config) {
if (userApp instanceof StreamApplication) {
return new AppRuntimeImpl(new StreamAppSpecImpl((StreamApplication) userApp, config));
}
if (userApp instanceof TaskApplication) {
return new AppRuntimeImpl(new TaskAppSpecImpl((TaskApplication) userApp, config));
}
throw new IllegalArgumentException(String.format("User application instance has to be either StreamApplicationFactory or TaskApplicationFactory. "
+ "Invalid userApp class %s.", userApp.getClass().getName()));
}
} |
Implementation and Test Plan
...
The main in-compatible change is with high-level API applications:
Rejected Alternatives
...