Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
/**
 * Example of a low-level {@link TaskApplication}: it declares its input stream, output stream and table,
 * and installs a task factory that is created from the application configuration.
 */
public class TaskApplicationExample implements TaskApplication {

  /**
   * Loads the {@link Config} from the command-line arguments, obtains an {@link ApplicationRunner}
   * for this application, starts it and blocks until it finishes.
   *
   * @param args command-line arguments, parsed by {@link CommandLine}
   */
  public static void main(String[] args) {
    CommandLine cmdLine = new CommandLine();
    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
    // NOTE(review): the factory class shown elsewhere on this page is named ApplicationRunners;
    // confirm whether ApplicationRuntimes is still the current name.
    ApplicationRunner runner = ApplicationRuntimes.getApplicationRunner(new TaskApplicationExample(), config);
    runner.run();
    runner.waitForFinish();
  }

  /**
   * Describes the application: registers the "myinput" input stream, the "myoutput" output stream and
   * the "mytable" RocksDB table, then sets the task factory built from the descriptor's configuration.
   *
   * @param appDesc the descriptor to populate
   */
  @Override
  public void describe(TaskAppDescriptor appDesc) {
    // add input and output streams
    // TODO: replace "myinput" with inputStreamDescriptor and "myoutput" with outputStreamDescriptor when SEP-14 is implemented
    // NOTE(review): these singular add* methods are still passed singleton lists; confirm the
    // intended argument types against the TaskAppDescriptor interface.
    appDesc.addInputStream(Collections.singletonList("myinput"));
    appDesc.addOutputStream(Collections.singletonList("myoutput"));
    TableDescriptor td = new RocksDbTableDescriptor("mytable");
    appDesc.addTable(Collections.singletonList(td));
    // create the task factory based on configuration
    appDesc.setTaskFactory(TaskFactoryUtil.createTaskFactory(appDesc.getConfig()));
  }

}


Public Interfaces

...

Code Block
public interface ApplicationDescriptor<T extends ApplicationBase> { 
  /** 
   * Get the globaluser uniquedefined application ID in the runtime process{@link Config} 
   * @return globallyconfig uniqueobject application ID 
   */ 
  StringConfig getGlobalAppIdgetConfig(); 
 
  /** 
   * GetTODO: thethis userneeds definedto {@linkbe Config}replaced 
with proper SharedContextFactory *when @returnSAMZA-1714 configis object completed.
   */ 
we have Config getConfig(); 
 
  /** 
   * TODO: this needs to be replaced with proper SharedContextFactory when SAMZA-1714 is completed.
   * we have to to keep it here to enable the current samza-sql implementation.
   *
   * Sets the {@link ContextManager} for this application. 
   * <p> 
   * The provided {@link ContextManager} can be used to setup shared context between the operator functions 
   * within a task instance 
   * 
   * @param contextManager the {@link ContextManager} to use for the {@link StreamApplicationSpec} 
   * @return the {@link StreamApplicationSpec} with {@code contextManager} set as its {@link ContextManager} 
   */ 
  ApplicationDescriptor<T> withContextManager(ContextManager contextManager); 


  /** 
   * Sets the lifecycle listener factory for user customized logic before and after starting/stopping 
   * StreamProcessors in the application 
   */  
  ApplicationDescriptor<T> withProcessorLifecycleListenerFactory(ProcessorLifecycleListenerFactory listener); 
} 

...

Code Block
public interface TaskAppDescriptor extends ApplicationSpec<TaskApplication> { 
 
  void setTaskFactory(TaskFactory factory); 
 
  // TODO: the following two interface methods depend on SEP-14
  void addInputStreams(List<InputStreamDescriptor>InputStreamDescriptor inputStreamsinputStream);  
  void addOutputStreamsaddBroadcastStreams(List<OutputStreamDescriptor>InputStreamDescriptor outputStreamsbroadcastStream); 
 
  void addOutputStreams(OutputStreamDescriptor outputStream); 
 
  void addTables(List<TableDescriptor>TableDescriptor tablestable); 
 
} 


ApplicationRunner classes

...

Code Block
public class ApplicationRunners {

  private ApplicationRunners() {

  }

  /**
  public static final ApplicationRunner getApplicationRunner(ApplicationBase userApp, Config config) { * Get the {@link ApplicationRunner} that runs the {@code userApp}
   *
   * if@param (userApp instanceof StreamApplication) {the user application object
   * @param config return getRunner(new StreamAppDescriptorImpl((StreamApplication) userApp, config));
    }
    if (userApp instanceof TaskApplication) {
      return getRunner(new TaskAppDescriptorImpl((TaskApplication) userApp, config));
    }
    throw new IllegalArgumentException(String.format("User application instance has to be either StreamApplicationFactory or TaskApplicationFactory. "
        + "Invalid userApp class %s.", userApp.getClass().getName()));
  }

...
  /**
   * Static method to get the {@link ApplicationRunner}
   *the configuration for this application
   * @param@return appDescthe  {@link AppDescriptorImplApplicationRunner} object that containswill all user-customized application logic and configuration
   * @return  the configure-driven {@link ApplicationRunner} to run the user-defined stream applicationsrun the {@code userApp}
   */
  public static final ApplicationRunner getRunner(AppDescriptorImpl appDescgetApplicationRunner(ApplicationBase userApp, Config config) {
    AppRunnerConfig appRunnerCfgrunnerConfig = new AppRunnerConfig(appDesc.getConfig(config));
    try {
      Class<?> runnerClass = Class.forName(appRunnerCfgrunnerConfig.getAppRunnerClass());
      if (ApplicationRunner.class.isAssignableFrom(runnerClass)) {
        // mandate AppDescritorImpl as the parameter to constructor
        Constructor<?> constructor = runnerClass.getConstructor(AppDescriptorImplApplicationBase.class, Config.class); // *sigh*
        return (ApplicationRunner) constructor.newInstance(appDescuserApp, config);
      }
    } catch (Exception e) {
      throw new ConfigException(String.format("Problem in loading ApplicationRunner class %s",
          appRunnerCfgrunnerConfig.getAppRunnerClass()), e);
    }
    throw new ConfigException(String.format(
        "Class %s does not extend ApplicationRunner properly",
        appRunnerCfgrunnerConfig.getAppRunnerClass()));
  }

}


Implementation and Test Plan

...