...

Samza Yarn follows a multi-stage deployment model: the Job Runner, which runs on the submission host, reads configuration, performs planning, and persists the config in the coordinator stream before submitting the job to the Yarn cluster. In Yarn, the Application Master (AM) reads the config from the coordinator stream before spinning up containers to execute. This split of responsibility between the job runner and the AM is operationally confusing and makes debugging the pipeline difficult, with multiple points of failure. In addition, since planning invokes user code, the runner usually requires isolation from a security perspective to guard the framework against malicious user code; otherwise a malicious user could gain access to other users' jobs running on the same runner.

...

We will simplify the job runner to only submit the Samza job to Yarn with the provided submission-related configs, without invoking other complex logic such as config retrieval, planning, DAG generation, or coordinator stream persistence.

...

  • configs directly related to job submission, such as yarn.package.path and job.name.
  • configs needed by the config loader on the AM to fetch the job config, such as the path to the property file in the tarball; all such configs will have a job.config.loader.properties prefix.
  • configs that users would like to override.

In addition, these configs can only be supplied via --config; the job runner will no longer read configs from a local file. We will use the term "submission config" to refer to the configs provided via --config during submission.
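
For illustration, a submission could then look like the following invocation; the script name, the loader class key, and the paths are assumptions rather than final names:

Code Block
# Hypothetical invocation; the loader-related keys are illustrative.
deploy/samza/bin/run-app.sh \
  --config job.name=my-job \
  --config yarn.package.path=hdfs://namenode:8020/path/to/my-job.tgz \
  --config job.config.loader.class=org.apache.samza.config.loaders.PropertiesConfigLoader \
  --config job.config.loader.properties.path=config/my-job.properties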

This approach is favored because we no longer need to maintain the old launch workflow, and we eliminate the need to read configs multiple times. It is also consistent with other stream processing frameworks, such as Flink, Spark, and Dataflow.

In order to simplify the job runner, we need to move the config retrieval logic from the runner to the AM, since config retrieval is a prerequisite for planning and DAG generation. We will achieve this by providing a new config loader interface, which the AM will use to fetch config directly. The AM will invoke the config loader to fetch the config, perform planning, generate the DAG, and persist the final config back to the coordinator stream for backward compatibility. This implies that the AM may need extra CPU and/or memory compared to the existing flow.
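
Sketched below is how this AM-side flow could look; readSubmissionConfigFromEnv, createLoader, and planAndPersist are hypothetical placeholders for illustration, not actual Samza APIs:

Code Block
// Hedged sketch of the AM startup sequence; the helpers are hypothetical.
private void startJobCoordinator() {
  Config submissionConfig = readSubmissionConfigFromEnv();  // set by YarnJob#buildEnvironment
  ConfigLoader loader = createLoader(submissionConfig);     // class named in the submission config
  // Hand the loader its wrapped properties, e.g. the property file path.
  Config fullConfig = loader.getConfig(submissionConfig.subset("job.config.loader.properties."));
  planAndPersist(fullConfig);  // planning, DAG generation, coordinator stream write
}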

We will force users to update how they start their Samza jobs. In case of problems, users should be able to roll back to Samza 1.3; see Compatibility, Deprecation, and Migration Plan for more details.

Public Interfaces

The following submission config will be introduced to configure the loader class on the AM to fetch config; it points to a ConfigLoader class:

...

Interface which the AM relies on to read configuration. It takes in a properties map, which defines the variables it needs in order to get the proper config.
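
A minimal sketch of what this interface could look like, assuming the properties map is passed directly to the load call (the final signature may differ):

Code Block
import java.util.Map;

import org.apache.samza.config.Config;

public interface ConfigLoader {
  /**
   * Loads the full job config. The properties map defines the variables the
   * loader needs, e.g. the path to a property file inside the job tarball.
   */
  Config getConfig(Map<String, String> properties);
}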

...

JobConfig

We will add one new config in JobConfig, as well as a config prefix that wraps the properties needed for the loader to load config:
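
For illustration, these additions might take the following shape; the constant and key names are assumptions, not the final definitions:

Code Block
// Hypothetical constant and key names for illustration only.
public static final String CONFIG_LOADER_CLASS = "job.config.loader.class";
public static final String CONFIG_LOADER_PROPERTIES_PREFIX = "job.config.loader.properties.";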

...

Default implementation of ConfigLoader, which reads "path" from the submission config, pointing to a property file.
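
A minimal sketch of such a loader, assuming the interface shape above; the error handling and Config construction here are illustrative:

Code Block
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;

public class PropertiesConfigLoader implements ConfigLoader {
  @Override
  public Config getConfig(Map<String, String> properties) {
    // "path" points to the property file shipped inside the job tarball.
    String path = properties.get("path");
    Properties props = new Properties();
    try (InputStream in = new FileInputStream(path)) {
      props.load(in);
    } catch (IOException e) {
      throw new RuntimeException("Failed to load config from " + path, e);
    }
    Map<String, String> map = new HashMap<>();
    for (String key : props.stringPropertyNames()) {
      map.put(key, props.getProperty(key));
    }
    return new MapConfig(map);
  }
}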

...

RemoteApplicationRunner#run will simply submit the job to Yarn given the submission configs.

Code Block
@Override
public void run(ExternalContext externalContext) {
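  // Submit the job directly; config retrieval, planning, and DAG generation no longer happen on the runner.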
  JobRunner runner = new JobRunner(config);
  runner.getJobFactory().getJob(config).submit();
}

...

YarnJob#buildEnvironment will wrap the provided submission config as an environment variable to pass to Yarn.

Code Block
private[yarn] def buildEnvironment(config: Config, yarnConfig: YarnConfig,
    jobConfig: JobConfig): Map[String, String] = {
    val envMapBuilder = Map.newBuilder[String, String]

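    // Serialize the submission config into the ENV_CONFIG environment variable for the AM.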
    envMapBuilder += ShellCommandConfig.ENV_CONFIG ->
      Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(config))

...