THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block |
---|
public class TaskApplicationExample implements TaskApplication { public static void main(String[] args) { CommandLine cmdLine = new CommandLine(); Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); ApplicationRunner runner = ApplicationRuntimes.getApplicationRunner(new TaskApplicationExample(), config); runner.run(); runner.waitForFinish(); } @Override public void describe(TaskAppDescriptor appDesc) { // add input and output streams // TODO: replace "myinput" with inputStreamDescriptor and "myoutput" with outputStreamDescriptor when SEP-14 is implemented appDesc.addInputStreamsaddInputStream(Collections.singletonList("myinput")); appDesc.addOutputStreamsaddOutputStream(Collections.singletonList("myoutput")); TableDescriptor td = new RocksDbTableDescriptor("mytable"); appDesc.addTablesaddTable(Collections.singletonList(td)); // create the task factory based on configuration appDesc.setTaskFactory(TaskFactoryUtil.createTaskFactory(appBuilderappDesc.getConfig())); } } |
Public Interfaces
...
Code Block |
---|
public interface ApplicationDescriptor<T extends ApplicationBase> { /** * Get the globaluser uniquedefined application ID in the runtime process{@link Config} * @return globallyconfig uniqueobject application ID */ StringConfig getGlobalAppIdgetConfig(); /** * GetTODO: thethis userneeds definedto {@linkbe Config}replaced with proper SharedContextFactory *when @returnSAMZA-1714 configis object completed. */ we have Config getConfig(); /** * TODO: this needs to be replaced with proper SharedContextFactory when SAMZA-1714 is completed. * we have to to keep it here to enable the current samza-sql implementation. * * Sets the {@link ContextManager} for this application. * <p> * The provided {@link ContextManager} can be used to setup shared context between the operator functions * within a task instance * * @param contextManager the {@link ContextManager} to use for the {@link StreamApplicationSpec} * @return the {@link StreamApplicationSpec} with {@code contextManager} set as its {@link ContextManager} */ ApplicationDescriptor<T> withContextManager(ContextManager contextManager); /** * Sets the lifecycle listener factory for user customized logic before and after starting/stopping * StreamProcessors in the application */ ApplicationDescriptor<T> withProcessorLifecycleListenerFactory(ProcessorLifecycleListenerFactory listener); } |
...
Code Block |
---|
public interface TaskAppDescriptor extends ApplicationSpec<TaskApplication> { void setTaskFactory(TaskFactory factory); // TODO: the following two interface methods depend on SEP-14 void addInputStreams(List<InputStreamDescriptor>InputStreamDescriptor inputStreamsinputStream); void addOutputStreamsaddBroadcastStreams(List<OutputStreamDescriptor>InputStreamDescriptor outputStreamsbroadcastStream); void addOutputStreams(OutputStreamDescriptor outputStream); void addTables(List<TableDescriptor>TableDescriptor tablestable); } |
ApplicationRunner classes
...
Code Block |
---|
public class ApplicationRunners { private ApplicationRunners() { } /** public static final ApplicationRunner getApplicationRunner(ApplicationBase userApp, Config config) { * Get the {@link ApplicationRunner} that runs the {@code userApp} * * if@param (userApp instanceof StreamApplication) {the user application object * @param config return getRunner(new StreamAppDescriptorImpl((StreamApplication) userApp, config)); } if (userApp instanceof TaskApplication) { return getRunner(new TaskAppDescriptorImpl((TaskApplication) userApp, config)); } throw new IllegalArgumentException(String.format("User application instance has to be either StreamApplicationFactory or TaskApplicationFactory. " + "Invalid userApp class %s.", userApp.getClass().getName())); } ... /** * Static method to get the {@link ApplicationRunner} *the configuration for this application * @param@return appDescthe {@link AppDescriptorImplApplicationRunner} object that containswill all user-customized application logic and configuration * @return the configure-driven {@link ApplicationRunner} to run the user-defined stream applicationsrun the {@code userApp} */ public static final ApplicationRunner getRunner(AppDescriptorImpl appDescgetApplicationRunner(ApplicationBase userApp, Config config) { AppRunnerConfig appRunnerCfgrunnerConfig = new AppRunnerConfig(appDesc.getConfig(config)); try { Class<?> runnerClass = Class.forName(appRunnerCfgrunnerConfig.getAppRunnerClass()); if (ApplicationRunner.class.isAssignableFrom(runnerClass)) { // mandate AppDescritorImpl as the parameter to constructor Constructor<?> constructor = runnerClass.getConstructor(AppDescriptorImplApplicationBase.class, Config.class); // *sigh* return (ApplicationRunner) constructor.newInstance(appDescuserApp, config); } } catch (Exception e) { throw new ConfigException(String.format("Problem in loading ApplicationRunner class %s", appRunnerCfgrunnerConfig.getAppRunnerClass()), e); } throw new ConfigException(String.format( "Class %s does not extend ApplicationRunner properly", appRunnerCfgrunnerConfig.getAppRunnerClass())); } } |
Implementation and Test Plan
...