Status

Current state: IMPLEMENTED

Discussion thread: http://mail-archives.apache.org/mod_mbox/samza-dev/201807.mbox/%3CCAFvExu3_nmaSQTy=5SypzwmqGA7S9+Txa=QkyERQ+hT3JZ29ig@mail.gmail.com%3E

...

  1. The instantiation of a specific ApplicationRunner implementation is exposed to the user, which requires the user to choose a specific ApplicationRunner implementation in source code, depending on the deployment environment (i.e. YARN vs. standalone). 
  2. ApplicationRunner only supports the high-level API and does not fully support the low-level API: 

    1. In the standalone environment, a user program written with StreamTask/AsyncStreamTask classes is only supported by LocalApplicationRunner via its run() method. 

    2. In YARN, RemoteApplicationRunner only supports high-level API applications and falls back to JobRunner for low-level API applications. 

  3. There is no unified API that allows users to specify initialization logic for both high-level and low-level API applications. 

  4. There is no standard lifecycle defined for a user application process in either YARN or standalone deployment. Hence, there is no consistent pattern for inserting user code into the application's full lifecycle: 

    1. There is no standard method to insert a user-defined application initialization sequence. 

      1. In YARN, all application processes are initialized (i.e. config rewriters, StreamProcessor initialization, etc.) by the built-in main functions in the Samza framework (i.e. ApplicationRunnerMain on the launch host and LocalContainerRunner on the NodeManagers). 

      2. In standalone, users can put arbitrary code in their main function to initialize the application process. 

    2. In addition to initialization, there is no defined method that allows users to inject customized logic before starting/stopping the user-defined processors (i.e. the StreamProcessors defined by the user application). 

...

  1. Hide the choice of a specific ApplicationRunner implementation behind configuration, rather than in source code. 
  2. Define a unified API that allows users to describe the processing logic in both the high- and low-level APIs in all environments (i.e. with all ApplicationRunners). 

  3. Expand the ApplicationRunner to run both low- and high-level API applications in YARN and standalone environments. 

  4. Define a standard processor-lifecycle-aware API that allows users to inject customized logic before and after starting/stopping the processors in both YARN and standalone environments. 

...

The proposed changes are the following: 

  1. Define a unified API, SamzaApplication, as the single entry point for users to implement all user-customized logic, for both the high-level and low-level APIs. 
    1. Users implement a single describe() method containing all user processing logic, before the runtime application instance is created. 

      1. The sub-interfaces StreamApplication and TaskApplication provide specific describe() methods for the high-level and low-level APIs, respectively. 

  2. Define a unified API class, ApplicationDescriptor, to contain: 

    1. The high- and low-level processing logic defined via SamzaApplication.describe(). The sub-interfaces StreamApplicationDescriptor and TaskApplicationDescriptor are used for the high- and low-level APIs, respectively.

    2. A user-implemented ProcessorLifecycleListenerFactory interface that creates a ProcessorLifecycleListener, which includes customized logic to run before and after starting/stopping the StreamProcessor(s) in the user application. 

      1. The listener methods are beforeStart/afterStart/afterStop/afterFailure.

    3. Other user-defined objects in an application (e.g. configuration and context). (NOTE: this will be updated by SEP-15.)

  3. Expand ApplicationRunner with a mandatory constructor that takes a SamzaApplication and a Config object as parameters. 

    1. An ApplicationRunner is now constructed with a user-implemented SamzaApplication and a Config as its parameters. 

      1. The SamzaApplication and Config together define all user-customized logic and configuration for an application. 

      2. The ApplicationRunner deploys and runs the user code. Its specific implementation is chosen via configuration and is not exposed to the user at all. 
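To make the shape of this programming model concrete, here is a minimal, self-contained Java sketch. All type names in it (AppDescriptor, SamzaApp, Runner, SimpleAppDescriptor, SimpleRunner) are hypothetical stand-ins invented for illustration, and the configuration is modeled as a plain Map. It only shows the intended ordering: the runner is constructed from (application, config) and calls describe() itself, so the processing logic is captured before run() is ever invoked.

```java
import java.util.Map;

// Hypothetical, simplified stand-ins for the proposed types (not Samza's real classes).
interface AppDescriptor {
  Map<String, String> getConfig();
}

// Stand-in for SamzaApplication<S>: all user processing logic is declared in describe().
interface SamzaApp<S extends AppDescriptor> {
  void describe(S appDesc);
}

// Stand-in for ApplicationRunner.
interface Runner {
  void run();
}

final class SimpleAppDescriptor implements AppDescriptor {
  private final Map<String, String> config;
  // Records what happened, so the call order is observable.
  final StringBuilder log = new StringBuilder();

  SimpleAppDescriptor(Map<String, String> config) {
    this.config = config;
  }

  @Override
  public Map<String, String> getConfig() {
    return config;
  }
}

final class SimpleRunner implements Runner {
  final SimpleAppDescriptor appDesc;

  // Mirrors the proposed mandatory constructor: (user application, config).
  SimpleRunner(SamzaApp<SimpleAppDescriptor> app, Map<String, String> config) {
    this.appDesc = new SimpleAppDescriptor(config);
    // The framework, not the user, calls describe() while building the descriptor.
    app.describe(appDesc);
  }

  @Override
  public void run() {
    appDesc.log.append("run:").append(appDesc.getConfig().get("app.name"));
  }
}
```

Because describe() runs inside the runner's constructor in this sketch, the user never calls it directly, which matches the proposal's goal of keeping descriptor construction inside framework code.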

A high-level overview of the proposed changes is illustrated below: 

Figure-1: high-level user programming model 

...


Figure-2: Interaction and lifecycle of runtime API objects (using StreamApplication in LocalApplicationRunner as an example). 

...

The above design achieves the following goals: 

  1. Defined a unified SamzaApplication interface for both the high- and low-level APIs in different deployment environments. All user code is now implemented in one of its sub-interfaces (i.e. StreamApplication or TaskApplication). 
    1. All processing logic is implemented in the standard describe() method of either StreamApplication or TaskApplication. 

    2. All user-customized logic to start/stop contextual objects in the application process is placed in the standard lifecycle listener methods defined in ProcessorLifecycleListener. 

  2. Construction of the ApplicationRunner object is implemented by Samza framework code, which hides: 

    1. The choice of a specific ApplicationRunner for each environment, via configuration. 

    2. The association of a user application with the specific ApplicationRunner instance, as a parameter to its constructor. 

    3. The initialization of user processing logic, performed when constructing the ApplicationDescriptor object before the ApplicationRunner executes the application. 

    4. The invocation of user-defined lifecycle listener methods when running/killing the application via an ApplicationRunner in a local process (i.e. LocalApplicationRunner*). 

      1. Note that RemoteApplicationRunner only submits the application without launching the StreamProcessors. Hence, lifecycle listener methods are not invoked in RemoteApplicationRunner. 

      2. Note that this also depends on a pending refactoring of LocalContainerRunner so that: 

        1. It implements run/kill/status with a proper asynchronous implementation. 

        2. It launches a StreamProcessor instead of directly running a SamzaContainer. 

...

For the Samza built-in main method (as in ApplicationRunnerMain#main()), we require the user application class to have a default constructor without any parameters: 

Code Block
// load the user application class named in the configuration
Class<SamzaApplication> appClass = (Class<SamzaApplication>) Class.forName(appConfig.getAppClass());
if (StreamApplication.class.isAssignableFrom(appClass) || TaskApplication.class.isAssignableFrom(appClass)) {
  // requires a public no-argument constructor
  return appClass.newInstance();
}
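The default-constructor requirement can be seen in isolation in the sketch below. StreamApp, TaskApp, GoodApp, NoDefaultCtorApp and AppLoader are hypothetical names used only for illustration. The sketch calls getDeclaredConstructor().newInstance(), the non-deprecated equivalent of Class#newInstance() since Java 9, which fails with NoSuchMethodException when the class has no no-argument constructor.

```java
// Hypothetical marker interfaces standing in for StreamApplication and TaskApplication.
interface StreamApp {}
interface TaskApp {}

// An application with a public no-argument constructor: loadable.
class GoodApp implements StreamApp {
  public GoodApp() {}
}

// An application that only has a constructor with parameters: not loadable.
class NoDefaultCtorApp implements TaskApp {
  NoDefaultCtorApp(String required) {}
}

final class AppLoader {
  // Loads the application class named in configuration and instantiates it through its
  // no-argument constructor, as a framework-provided main method would have to.
  static Object load(String className) throws ReflectiveOperationException {
    Class<?> appClass = Class.forName(className);
    if (!StreamApp.class.isAssignableFrom(appClass) && !TaskApp.class.isAssignableFrom(appClass)) {
      throw new IllegalArgumentException(className + " is neither a StreamApp nor a TaskApp");
    }
    // Throws NoSuchMethodException if the class has no no-argument constructor.
    return appClass.getDeclaredConstructor().newInstance();
  }
}
```

The framework cannot know what arguments a user constructor would need, which is why the requirement is a parameterless constructor rather than any particular signature.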

...

Code Block
public class PageViewCounterExample implements StreamApplication {

  public static void main(String[] args) {
    CommandLine cmdLine = new CommandLine();
    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
    ApplicationRunner runner = ApplicationRunners.getApplicationRunner(ApplicationClassUtils.fromConfig(config), config);
    runner.run();
    runner.waitForFinish();
  }

  @Override
  public void describe(StreamApplicationDescriptor appDesc) {
    // Detailed examples on how to create InputDescriptor and OutputDescriptor are in SEP-14;
    // getInputDescriptor()/getOutputDescriptor() stand in for that logic here.
    InputDescriptor<PageViewEvent, ?> pveStreamDescriptor =
        getInputDescriptor("pageViewEventStream", new JsonSerdeV2<>(PageViewEvent.class));
    OutputDescriptor<KV<String, PageViewCount>, ?> outputDescriptor =
        getOutputDescriptor("pageViewEventPerMemberStream",
            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewCount.class)));

    MessageStream<PageViewEvent> pageViewEvents = appDesc.getInputStream(pveStreamDescriptor);
    OutputStream<KV<String, PageViewCount>> pageViewEventPerMemberStream =
        appDesc.getOutputStream(outputDescriptor);

    // count page views per member in 10-second tumbling windows
    SupplierFunction<Integer> initialValue = () -> 0;
    FoldLeftFunction<PageViewEvent, Integer> foldLeftFn = (m, c) -> c + 1;
    pageViewEvents
        .window(Windows.keyedTumblingWindow(m -> m.memberId, Duration.ofSeconds(10), initialValue, foldLeftFn,
            null, null)
            .setEarlyTrigger(Triggers.repeat(Triggers.count(5)))
            .setAccumulationMode(AccumulationMode.DISCARDING), "tumblingWindow")
        .map(windowPane -> KV.of(windowPane.getKey().getKey(), new PageViewCount(windowPane)))
        .sendTo(pageViewEventPerMemberStream);
  }
}

...

Code Block
public class TaskApplicationExample implements TaskApplication {

  public static void main(String[] args) {
    CommandLine cmdLine = new CommandLine();
    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
    ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new TaskApplicationExample(), config);
    runner.run();
    runner.waitForFinish();
  }

  @Override
  public void describe(TaskApplicationDescriptor appDesc) {
    // Detailed examples on how to create InputDescriptor and OutputDescriptor are in SEP-14;
    // getInputDescriptor()/getOutputDescriptor() stand in for that logic here.
    InputDescriptor<PageViewEvent, ?> pveStreamDescriptor =
        getInputDescriptor("pageViewEventStream", new JsonSerdeV2<>(PageViewEvent.class));
    OutputDescriptor<KV<String, PageViewCount>, ?> outputDescriptor =
        getOutputDescriptor("pageViewEventPerMemberStream",
            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewCount.class)));
    // register the input/output streams and the table used by the low-level task
    appDesc.addInputStream(pveStreamDescriptor);
    appDesc.addOutputStream(outputDescriptor);
    TableDescriptor td = new RocksDbTableDescriptor("mytable");
    appDesc.addTable(td);
    // create the task factory based on configuration
    appDesc.setTaskFactory(TaskFactoryUtil.createTaskFactory(appDesc.getConfig()));
  }

}


Public Interfaces

...

A) User-implemented interface classes include the following: 

SamzaApplication: the base interface for all user applications. Users implement its describe() method to describe all of the application's processing logic via an ApplicationDescriptor. 

Code Block
public interface SamzaApplication<S extends ApplicationDescriptor> { 
  void describe(S appDesc); 
} 

StreamApplication: extends SamzaApplication with a typed describe() method for high-level user applications 

Code Block
public interface StreamApplication extends SamzaApplication<StreamApplicationDescriptor> { 
}

TaskApplication: extends SamzaApplication with a typed describe() method for low-level user applications 

Code Block
public interface TaskApplication extends SamzaApplication<TaskApplicationDescriptor> { 
}

ProcessorLifecycleListenerFactory: defines the factory interface to create a ProcessorLifecycleListener in an application

...

Code Block
public interface ProcessorLifecycleListener {
  /**
   * User defined initialization before a StreamProcessor is started
   */
  default void beforeStart() {}

  /**
   * User defined callback after a StreamProcessor is started
   *
   */
  default void afterStart() {}

  /**
   * User defined callback after a StreamProcessor is stopped successfully
   */
  default void afterStop() {}

  /**
   * User defined callback after a StreamProcessor is stopped with failure
   *
   * @param t the error causing the stop of the StreamProcessor
   */
  default void afterFailure(Throwable t) {}
}
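The sketch below shows one way a local runner could drive these callbacks around a single processor's lifetime. LifecycleListener, Processor and LocalProcessorHarness are hypothetical simplifications, not Samza classes. In particular, a real StreamProcessor starts asynchronously, whereas this sketch runs the processing logic synchronously between the callbacks.

```java
// Hypothetical copy of the proposed listener interface: every callback defaults to a no-op.
interface LifecycleListener {
  default void beforeStart() {}
  default void afterStart() {}
  default void afterStop() {}
  default void afterFailure(Throwable t) {}
}

// Hypothetical stand-in for the main processing logic run by a StreamProcessor.
interface Processor {
  void process() throws Exception;
}

final class LocalProcessorHarness {
  // Invokes the listener around one processor's lifetime: beforeStart/afterStart around startup,
  // then exactly one of afterStop (clean stop) or afterFailure (stop caused by an error).
  static void runProcessor(Processor processor, LifecycleListener listener) {
    listener.beforeStart();
    // A real processor starts asynchronously; this sketch treats startup as immediate.
    listener.afterStart();
    try {
      processor.process();
    } catch (Exception e) {
      listener.afterFailure(e);
      return;
    }
    listener.afterStop();
  }
}
```

Keeping afterStop() and afterFailure(Throwable) separate means a listener never has to inspect a possibly-null error to decide how the processor ended.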


B) Samza framework implemented runtime objects 

The Samza framework provides two sets of runtime classes that are directly exposed to the user. One set is the ApplicationDescriptor/ApplicationDescriptorImpl classes, which include all user-defined logic and configuration for an application; the other set is the ApplicationRunner classes. 

ApplicationDescriptor and ApplicationDescriptorImpl classes

ApplicationDescriptor: this is the base interface for both high- and low-level applications. The corresponding implementation class is ApplicationDescriptorImpl.


Code Block
public interface ApplicationDescriptor<S extends ApplicationDescriptor> {
  /**
   * Get the {@link Config} of the application
   * @return config of the application
   */
  Config getConfig();

  /**
   * Sets the {@link ContextManager} for this application.
   * <p>
   * Setting the {@link ContextManager} is optional. The provided {@link ContextManager} can be used to build the shared
   * context between the operator functions within a task instance
   *
   * TODO: this should be replaced by the shared context factory when SAMZA-1714 is fixed.
   *
   * @param contextManager the {@link ContextManager} to use for the application
   * @return type {@code S} of {@link ApplicationDescriptor} with {@code contextManager} set as its {@link ContextManager}
   */
  S withContextManager(ContextManager contextManager);

  /**
   * Sets the {@link ProcessorLifecycleListenerFactory} for this application.
   *
   * <p>Setting a {@link ProcessorLifecycleListenerFactory} is optional to a user application. It allows users to
   * plug in optional code to be invoked in different stages before/after the main processing logic is started/stopped in
   * the application.
   *
   * @param listenerFactory the user implemented {@link ProcessorLifecycleListenerFactory} that creates lifecycle listener
   *                        with callback methods before and after the start/stop of each StreamProcessor in the application
   * @return type {@code S} of {@link ApplicationDescriptor} with {@code listenerFactory} set as its {@link ProcessorLifecycleListenerFactory}
   */
  S withProcessorLifecycleListenerFactory(ProcessorLifecycleListenerFactory listenerFactory);

  /**
   * Sets a set of customized {@link MetricsReporterFactory}s in the application
   *
   * @param reporterFactories the map of customized {@link MetricsReporterFactory}s to be used
   * @return type {@code S} of {@link ApplicationDescriptor} with {@code reporterFactories}
   */
  S withMetricsReporterFactories(Map<String, MetricsReporterFactory> reporterFactories);
}
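The `S with...()` methods above use a self-referential type parameter so that chained calls on a sub-interface keep their concrete type. The sketch below isolates that pattern. Descriptor, StreamDescriptor and StreamDescriptorImpl are hypothetical names, the context manager and system are modeled as plain strings, and the sketch uses the fully bounded form `S extends Descriptor<S>` rather than the raw bound shown in the interface above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the self-typed generic pattern behind ApplicationDescriptor<S ...>:
// each with...() method returns S, so a chain keeps the concrete descriptor type.
interface Descriptor<S extends Descriptor<S>> {
  S withContextManager(String contextManager);
}

// The sub-interface binds S to itself, as StreamApplicationDescriptor does.
interface StreamDescriptor extends Descriptor<StreamDescriptor> {
  StreamDescriptor withDefaultSystem(String systemName);
}

final class StreamDescriptorImpl implements StreamDescriptor {
  // Records the settings in the order they were applied.
  final List<String> settings = new ArrayList<>();

  @Override
  public StreamDescriptor withContextManager(String contextManager) {
    settings.add("contextManager=" + contextManager);
    return this;
  }

  @Override
  public StreamDescriptor withDefaultSystem(String systemName) {
    settings.add("defaultSystem=" + systemName);
    return this;
  }
}
```

Without the type parameter, withContextManager(...) would return the base type and a caller would need a cast before calling a high-level-only method such as withDefaultSystem(...).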

StreamApplicationDescriptor: this extends ApplicationDescriptor for a high-level application, including all methods to describe a high-level application in a graph. The corresponding implementation is StreamApplicationDescriptorImpl.

Code Block
public interface StreamApplicationDescriptor extends ApplicationDescriptor<StreamApplicationDescriptor> { 
  /**
   * Sets the default SystemDescriptor to use for intermediate streams. This is equivalent to setting
   * {@code job.default.system} and its properties in configuration.
   * <p>
   * If the default system descriptor is set, it must be set <b>before</b> creating any input/output/intermediate streams.
   * <p>
   * If an input/output stream is created with a stream-level Serde, they will be used, else the serde specified
   * for the {@code job.default.system} in configuration will be used.
   * <p>
   * Providing an incompatible message type for the intermediate streams that use the default serde will result in
   * {@link ClassCastException}s at runtime.
   *
   * @param defaultSystemDescriptor the default system descriptor to use
   * @return type {@code S} of {@link ApplicationDescriptor} with {@code defaultSystemDescriptor} set as its default system
   */
  StreamApplicationDescriptor withDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor);

  /**
   * Gets the input {@link MessageStream} corresponding to the {@code inputDescriptor}.
   * <p>
   * A {@code MessageStream<KV<K, V>>}, obtained by calling this method with a descriptor with a {@code KVSerde<K, V>},
   * can receive messages of type {@code KV<K, V>}. An input {@code MessageStream<M>}, obtained using a descriptor with
   * any other {@code Serde<M>}, can receive messages of type M - the key in the incoming message is ignored.
   * <p>
   * A {@code KVSerde<NoOpSerde, NoOpSerde>} or {@code NoOpSerde} may be used for the descriptor if the
   * {@code SystemConsumer} deserializes the incoming messages itself, and no further deserialization is required from  
   * the framework.
   * <p>
   * Multiple invocations of this method with the same {@code inputDescriptor} will throw an
   * {@link IllegalStateException}.
   *
   * @param inputDescriptor the descriptor for the stream
   * @param <M> the type of messages in the input {@link MessageStream}
   * @return the input {@link MessageStream}
   * @throws IllegalStateException when invoked multiple times with the same {@code inputDescriptor}
   */
  <M> MessageStream<M> getInputStream(InputDescriptor<M, ?> inputDescriptor);

  /**
   * Gets the {@link OutputStream} corresponding to the {@code outputDescriptor}.
   * <p>
   * An {@code OutputStream<KV<K, V>>}, obtained by calling this method with a descriptor with a {@code KVSerde<K, V>},
   * can send messages of type {@code KV<K, V>}. An {@code OutputStream<M>}, obtained using a descriptor with any
   * other {@code Serde<M>}, can send messages of type M without a key.
   * <p>
   * A {@code KVSerde<NoOpSerde, NoOpSerde>} or {@code NoOpSerde} may be used for the descriptor if the
   * {@code SystemProducer} serializes the outgoing messages itself, and no prior serialization is required from
   * the framework.
   * <p>
   * When sending messages to an {@code OutputStream<KV<K, V>>}, messages are partitioned using their serialized key.
   * When sending messages to any other {@code OutputStream<M>}, messages are partitioned using a null partition key.
   * <p>
   * Multiple invocations of this method with the same {@code outputDescriptor} will throw an
   * {@link IllegalStateException}.
   *
   * @param outputDescriptor the descriptor for the stream
   * @param <M> the type of messages in the {@link OutputStream}
   * @return the {@link OutputStream}
   * @throws IllegalStateException when invoked multiple times with the same {@code outputDescriptor}
   */
  <M> OutputStream<M> getOutputStream(OutputDescriptor<M, ?> outputDescriptor);

  /**
   * Gets the {@link Table} corresponding to the {@link TableDescriptor}.
   * <p>
   * Multiple invocations of this method with the same {@link TableDescriptor} will throw an
   * {@link IllegalStateException}.
   *
   * @param tableDescriptor the {@link TableDescriptor}
   * @param <K> the type of the key
   * @param <V> the type of the value
   * @return the {@link Table} corresponding to the {@code tableDescriptor}
   * @throws IllegalStateException when invoked multiple times with the same {@link TableDescriptor}
   */
  <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDescriptor);
} 
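getInputStream() and getOutputStream() both promise an IllegalStateException when called twice with the same descriptor. The following is a minimal sketch of how an implementation might enforce that for inputs. InputDescriptorStub, MessageStreamStub and GraphBuilderStub are hypothetical names, and the sketch assumes a descriptor is identified by its stream ID.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-ins; a descriptor is identified here by its stream ID.
final class InputDescriptorStub {
  final String streamId;

  InputDescriptorStub(String streamId) {
    this.streamId = streamId;
  }
}

final class MessageStreamStub {
  final String streamId;

  MessageStreamStub(String streamId) {
    this.streamId = streamId;
  }
}

final class GraphBuilderStub {
  // Streams already handed out, keyed by stream ID.
  private final Map<String, MessageStreamStub> inputStreams = new HashMap<>();

  MessageStreamStub getInputStream(InputDescriptorStub inputDescriptor) {
    if (inputStreams.containsKey(inputDescriptor.streamId)) {
      throw new IllegalStateException(
          "getInputStream must not be called multiple times with the same input descriptor: "
              + inputDescriptor.streamId);
    }
    MessageStreamStub stream = new MessageStreamStub(inputDescriptor.streamId);
    inputStreams.put(inputDescriptor.streamId, stream);
    return stream;
  }
}
```

Failing fast at describe() time keeps two operators from silently consuming the same input as if it were two independent streams.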


TaskApplicationDescriptor: this extends ApplicationDescriptor for a low-level application, including the user-defined TaskFactory and the corresponding list of input and output streams and tables. The corresponding implementation is TaskApplicationDescriptorImpl.

Code Block
public interface TaskApplicationDescriptor extends ApplicationDescriptor<TaskApplicationDescriptor> { 
  /**
   * Sets the {@link TaskFactory} for the user application. The {@link TaskFactory#createInstance()} creates task instance
   * that implements the main processing logic of the user application.
   *
   * @param factory the {@link TaskFactory} including the low-level task processing logic. The only allowed task factory
   *                classes are {@link org.apache.samza.task.StreamTaskFactory} and {@link org.apache.samza.task.AsyncStreamTaskFactory}.
   */
  void setTaskFactory(TaskFactory factory);

  /**
   * Adds the input stream to the application.
   *
   * @param isd the {@link InputDescriptor}
   */
  void addInputStream(InputDescriptor isd);

  /**
   * Adds the output stream to the application.
   *
   * @param osd the {@link OutputDescriptor} of the output stream
   */
  void addOutputStream(OutputDescriptor osd);

  /**
   * Adds the {@link TableDescriptor} used in the application
   *
   * @param table {@link TableDescriptor}
   */
  void addTable(TableDescriptor table);
}
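The setTaskFactory() Javadoc restricts the factory to StreamTaskFactory or AsyncStreamTaskFactory. The sketch below shows one way that restriction could be checked when the factory is set. TaskFactory, StreamTaskFactory and AsyncStreamTaskFactory here are hypothetical stand-ins that only mimic the documented names, and TaskDescriptorStub is invented for illustration.

```java
// Hypothetical stand-ins for Samza's task factory types (not the real interfaces).
interface TaskFactory<T> {
  T createInstance();
}

interface StreamTaskFactory extends TaskFactory<Object> {}

interface AsyncStreamTaskFactory extends TaskFactory<Object> {}

final class TaskDescriptorStub {
  private TaskFactory<?> taskFactory;

  // Accepts only the two documented factory kinds; in this sketch anything else is
  // rejected immediately, during describe(), rather than when tasks are created.
  void setTaskFactory(TaskFactory<?> factory) {
    if (!(factory instanceof StreamTaskFactory) && !(factory instanceof AsyncStreamTaskFactory)) {
      throw new IllegalArgumentException("Unsupported TaskFactory: " + factory.getClass().getName());
    }
    this.taskFactory = factory;
  }

  TaskFactory<?> getTaskFactory() {
    return taskFactory;
  }
}
```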


ApplicationRunner classes 

ApplicationRunner 

This is an interface class that defines the standard execution methods to deploy an application. It is used by users and is not intended to be implemented by users. 

Code Block
public interface ApplicationRunner {
  /**
   * Start a runtime instance of the application
   */
  void run();

  /**
   * Stop a runtime instance of the application
   */
  void kill();

  /**
   * Get the {@link ApplicationStatus} of a runtime instance of the application
   *
   * @return the runtime status of the application
   */
  ApplicationStatus status();

  /**
   * Wait for the runtime instance of the application to complete.
   * This method will block until the application completes.
   */
  void waitForFinish();

  /**
   * Wait for the runtime instance of the application to complete with a {@code timeout}
   *
   * @param timeout the time to block to wait for the application to complete
   * @return true if the application completes within timeout; false otherwise
   */
  boolean waitForFinish(Duration timeout);
}
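The blocking and timed waitForFinish() variants can both be backed by a single java.util.concurrent.CountDownLatch that is released when the application finishes. The sketch below is a hypothetical local runner (LatchRunner and the reduced Status enum are invented names, not Samza's ApplicationStatus) that matches the interface's method shapes, including not declaring InterruptedException.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical, reduced status values (Samza's ApplicationStatus carries more information).
enum Status { NEW, RUNNING, SUCCESSFUL_FINISH }

// Hypothetical runner: a CountDownLatch is released once the application finishes,
// which gives both the blocking and the timed waitForFinish() variants.
final class LatchRunner {
  private final CountDownLatch finished = new CountDownLatch(1);
  private volatile Status status = Status.NEW;

  void run() {
    // A real runner would start StreamProcessors or submit the job here.
    status = Status.RUNNING;
  }

  void kill() {
    // This sketch treats a clean kill as a successful finish.
    status = Status.SUCCESSFUL_FINISH;
    finished.countDown();
  }

  Status status() {
    return status;
  }

  void waitForFinish() {
    try {
      finished.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
    }
  }

  // Returns true if the application finished within the timeout, false otherwise.
  boolean waitForFinish(Duration timeout) {
    try {
      return finished.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

Restoring the interrupt flag instead of swallowing it lets the calling thread still observe that it was interrupted.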


ApplicationRunners 

A Samza framework-provided factory class that instantiates the ApplicationRunner for user applications. 

Code Block
public class ApplicationRunners {

  // configuration key used to select the ApplicationRunner implementation
  private static final String APP_RUNNER_CFG = "app.runner.class";
  // assumed default; the design does not state which runner is used when app.runner.class is unset
  private static final String DEFAULT_APP_RUNNER = "org.apache.samza.runtime.RemoteApplicationRunner";

  private ApplicationRunners() {

  }

  /**
   * Get the {@link ApplicationRunner} that runs the {@code userApp}
   *
   * @param userApp the user application object
   * @param config the configuration for this application
   * @return the {@link ApplicationRunner} object that will run the {@code userApp}
   */
  public static final ApplicationRunner getApplicationRunner(SamzaApplication userApp, Config config) {
    String appRunnerClassName = getAppRunnerClass(config);
    try {
      Class<?> runnerClass = Class.forName(appRunnerClassName);
      if (!ApplicationRunner.class.isAssignableFrom(runnerClass)) {
        throw new ConfigException(
            String.format("Class %s does not extend ApplicationRunner properly", appRunnerClassName));
      }
      // mandate (SamzaApplication, Config) as the constructor parameters
      Constructor<?> constructor = runnerClass.getConstructor(SamzaApplication.class, Config.class);
      return (ApplicationRunner) constructor.newInstance(userApp, config);
    } catch (ConfigException ce) {
      // this is thrown due to invalid app.runner.class configuration
      throw ce;
    } catch (Exception e) {
      // other types of exception during class loading and construction of new instance
      throw new ConfigException(String.format("Could not load ApplicationRunner class %s", appRunnerClassName), e);
    }
  }

  private static String getAppRunnerClass(Config config) {
    return config.getOrDefault(APP_RUNNER_CFG, DEFAULT_APP_RUNNER);
  }
}
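The core of the factory is a config lookup followed by a reflective call to a fixed two-argument constructor. The self-contained sketch below reproduces that mechanism with hypothetical stand-ins (App, AppRunner, LocalRunnerStub, RunnerFactory) and a plain Map for configuration. Unlike the class above, it has no default runner, so a missing key is an error.

```java
import java.lang.reflect.Constructor;
import java.util.Map;

// Hypothetical stand-ins for SamzaApplication and ApplicationRunner; config is a plain Map.
interface App {}

interface AppRunner {
  String appName();
}

// A runner implementation exposing the mandated (application, config) constructor.
final class LocalRunnerStub implements AppRunner {
  private final Map<String, String> config;

  public LocalRunnerStub(App app, Map<String, String> config) {
    this.config = config;
  }

  @Override
  public String appName() {
    return config.get("app.name");
  }
}

final class RunnerFactory {
  // Mirrors the app.runner.class key; this sketch has no default runner.
  static final String APP_RUNNER_CFG = "app.runner.class";

  static AppRunner getRunner(App app, Map<String, String> config) {
    String className = config.get(APP_RUNNER_CFG);
    if (className == null) {
      throw new IllegalArgumentException(APP_RUNNER_CFG + " is not set");
    }
    try {
      Class<?> runnerClass = Class.forName(className);
      if (!AppRunner.class.isAssignableFrom(runnerClass)) {
        throw new IllegalArgumentException(className + " does not implement AppRunner");
      }
      // Look up the (application, config) constructor and invoke it reflectively.
      Constructor<?> ctor = runnerClass.getConstructor(App.class, Map.class);
      return (AppRunner) ctor.newInstance(app, config);
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Could not load runner class " + className, e);
    }
  }
}
```

Fixing the constructor signature is what lets a new runner be plugged in purely through configuration: any class with that constructor can be selected without changing user code.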


Implementation and Test Plan

The implementation of the above API changes involves the following sections: 

  1. SamzaApplication/StreamApplication/TaskApplication interfaces and user code examples for the high- and low-level APIs. The StreamApplication and TaskApplication interfaces have no default implementation, so the main effort is writing user code examples that implement them. We need to port all existing high-level user code examples in the samza-test module and also add low-level user code examples. 

  2. Implementation of the runtime public API classes: ApplicationDescriptorImpl/StreamApplicationDescriptorImpl/TaskApplicationDescriptorImpl/ApplicationRunners. These classes are implemented by the Samza framework and used directly by users. Hence, they need both an implementation and user code examples. 

  3. Internal implementation of ApplicationRunners: the ApplicationRunner implementations need to be refactored to support running both StreamApplicationDescriptor and TaskApplicationDescriptor. In particular, all ApplicationRunner classes need to be refactored to support TaskApplicationDescriptor. 

  4. Local application runners need to support creating ProcessorLifecycleListener instances and invoking the ProcessorLifecycleListener API methods before and after starting/stopping the StreamProcessor(s). 

    1. This requires refactoring LocalContainerRunner to properly launch a StreamProcessor instead of directly running a SamzaContainer. 

...

  1. The StreamApplication.init() is replaced by StreamApplication.describe(). 

  2. StreamApplicationDescriptor replaces StreamGraph to describe the high-level API application. 

  3. The public ApplicationRunners class replaces the user's instantiation of a specific ApplicationRunner implementation. 

  4. The mandatory constructor parameter of an ApplicationRunner changed from Config to (SamzaApplication, Config).

Addition-only changes: 

  1. Added TaskApplication interface 

  2. Added TaskApplicationDescriptor interface 

  3. Added ProcessorLifecycleListenerFactory interface 

...

The main incompatible change is with high-level API applications: 

Rejected Alternatives

The rejected alternative is to always run the user’s main() function for high- and low-level APIs in YARN and standalone. The reasons for rejecting this option are the following: 

...