...

These configs localize the framework resources in a YARN environment. If using a different execution environment, it will be necessary to specify localization configs specific to that environment for the framework API and framework infrastructure resources, since other environments may have a different way of specifying resource locations.

  • yarn.resources.__samzaFrameworkApi.path: Path to the Samza framework API resource
  • yarn.resources.__samzaFrameworkApi.*: Any other YARN resource configurations for the Samza framework API resource
  • yarn.resources.__samzaFrameworkInfrastructure.path: Path to the Samza framework infrastructure resource
  • yarn.resources.__samzaFrameworkInfrastructure.*: Any other YARN resource configurations for the Samza framework infrastructure resource

Existing JAR management

Currently, Samza infrastructure code and dependencies are included in the tarball with the Samza application. This means that conflicting dependencies between the application and Samza are resolved at build time before the tarball is created, which can cause a certain version of a dependency to be excluded. All JARs in the tarball are installed into a single directory for classpath generation and execution.

...

Generating the Samza API whitelist

In order to load the Samza API classes from the API classloader, we need to tell cytodynamics what those classes are. We can do this by providing a whitelist of packages/classes when building the cytodynamics classloader. All public interfaces/classes inside of samza-api should be considered an API class. One way to generate this whitelist is to use a Gradle task to find all the classes from samza-api and put that list in a file. Then, that file can be read by Samza when constructing the cytodynamics classloader. The Gradle task should also include classes from samza-kv.
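
As an illustrative sketch (the class and method names here are hypothetical, not the actual Samza implementation), the generated whitelist file could simply contain one fully-qualified class name per line, which Samza reads into a set when constructing the cytodynamics classloader:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

public class ApiWhitelist {
  private final Set<String> apiClassNames;

  public ApiWhitelist(Set<String> apiClassNames) {
    this.apiClassNames = apiClassNames;
  }

  /** Parse a whitelist file: one fully-qualified class name per line; blank lines ignored. */
  public static ApiWhitelist parse(BufferedReader reader) throws IOException {
    Set<String> names = new HashSet<>();
    String line;
    while ((line = reader.readLine()) != null) {
      line = line.trim();
      if (!line.isEmpty()) {
        names.add(line);
      }
    }
    return new ApiWhitelist(names);
  }

  /** True if the class should be loaded from the API classloader. */
  public boolean isApiClass(String className) {
    return apiClassNames.contains(className);
  }
}
```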

...

Handling SamzaApplication.describe

The infrastructure classloader will include the concrete descriptors, and we will build an additional application classloader which can delegate to the infrastructure classloader when running describe. Since the application code calls the descriptors directly, the application classloader used for SamzaApplication.describe needs to be able to delegate to the infrastructure classloader. However, we do not want to delegate for every class: we only want to delegate to infrastructure for certain concrete descriptor components (e.g. system descriptors, serdes). We don't want to delegate for application dependencies or classes which are only implemented by the application.

Flow for loading a class from the additional application classloader for SamzaApplication.describe:

  1. If a class is a Samza API class, then load it from the API classloader.
  2. If the class is on the infrastructure classpath and it is in the infrastructure whitelist (e.g. descriptor), load it from the infrastructure classloader.
  3. If the class is on the application classpath, load it from the application classloader.
  4. Otherwise, throw ClassNotFoundException.
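
The delegation order above could be sketched as follows. This is a simplified stand-in, not the real implementation (which builds on the cytodynamics delegating classloader); the class name and the whitelist/classpath predicates are hypothetical:

```java
import java.util.Set;
import java.util.function.Predicate;

/** Simplified sketch of the delegation order for the describe-time application classloader. */
public class DescribeClassLoaderPolicy {
  private final Set<String> apiClasses;              // Samza API whitelist
  private final Set<String> infraWhitelist;          // e.g. concrete descriptors
  private final Predicate<String> onInfraClasspath;  // membership in the infrastructure classpath
  private final Predicate<String> onAppClasspath;    // membership in the application classpath

  public DescribeClassLoaderPolicy(Set<String> apiClasses, Set<String> infraWhitelist,
      Predicate<String> onInfraClasspath, Predicate<String> onAppClasspath) {
    this.apiClasses = apiClasses;
    this.infraWhitelist = infraWhitelist;
    this.onInfraClasspath = onInfraClasspath;
    this.onAppClasspath = onAppClasspath;
  }

  /** Returns which classloader should load the class, mirroring steps 1-4 above. */
  public String resolve(String className) throws ClassNotFoundException {
    if (apiClasses.contains(className)) {
      return "api";
    }
    if (onInfraClasspath.test(className) && infraWhitelist.contains(className)) {
      return "infrastructure";
    }
    if (onAppClasspath.test(className)) {
      return "application";
    }
    throw new ClassNotFoundException(className);
  }
}
```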

A consequence of this structure is that there are "multiple" application classloaders on the job coordinator: one in this describe flow and the one described above at "Application" classloader. Therefore, any classes loaded by one of the application classloaders cannot be used by the classes of the other application classloader.

An example of when this could happen is in the low-level API. The application's TaskFactory implementation will be loaded by the application classloader described above, but the Kafka events deserialized into Avro objects will be loaded by the other application classloader. Even though the Avro objects are the same class (even associated with the same binary), the TaskFactory implementation won't be able to use the Avro objects since a different classloader instance was used.

We can solve this by serializing the components specified through the descriptor and deserializing those components using the classloader that is used for the rest of the AM. This is consistent with the strategy to be able to serialize the whole job description. The interfaces have already been marked as Serializable.
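
The serialize-then-deserialize step can be sketched with standard Java serialization; the key detail is that deserialization resolves classes against the target classloader. The helper class below is hypothetical, not Samza code:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

/** Sketch: serialize a descriptor-specified component to bytes so it can later
 *  be deserialized by a different classloader. */
public class ComponentSerde {
  public static byte[] serialize(Serializable component) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(component);
    }
    return bytes.toByteArray();
  }

  /** Deserialize, resolving classes through the given classloader. */
  public static Object deserialize(byte[] data, ClassLoader classLoader)
      throws IOException, ClassNotFoundException {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data)) {
      @Override
      protected Class<?> resolveClass(ObjectStreamClass desc)
          throws IOException, ClassNotFoundException {
        // Resolve against the classloader used for the rest of the AM
        return Class.forName(desc.getName(), false, classLoader);
      }
    }) {
      return in.readObject();
    }
  }
}
```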

Pros

  • API classloader stays simpler
  • Allows application to delegate to infrastructure for describe, and infrastructure to delegate to application for processing

Cons

  • Additional classloader component adds complexity
    • Includes having multiple classloaders associated with the same application classpath
  • Need to serialize and deserialize components of the application description
    • Currently, some application descriptions are not actually Serializable (e.g. application context factory for both SQL and Beam)

Classloader wiring

By using the special classloader to instantiate the "main" class, any dependencies will then be loaded using that classloader. Then Java will automatically propagate the special classloader through the rest of Samza. We can modify the "main" method to use reflection to load the "main" class and then trigger the actual Samza startup.

Code Block
public static void main(String[] args) throws Exception {
  ClassLoader isolatingClassLoader = buildIsolatingClassLoader();
  // Load the real main class through the isolating classloader so that all of
  // its dependencies are also loaded through that classloader.
  Class<?> isolatedClass = Class.forName(MainClass.class.getName(), true, isolatingClassLoader);
  isolatedClass.getDeclaredMethod("doMain").invoke(null);
}

Pros

  • Cytodynamics provides an explicit and granular way to specify if a class should be from the parent classpath (i.e. API)
  • Classloader propagation allows the correct external dependencies to be used, even if infrastructure and the application use different versions of the same dependency
  • Do not need to modify existing Samza API classes
  • Do not need to explicitly wire classloader through Samza

Cons

  • Need to ensure proper specification of Samza API classes
    • Are there any classes that are not owned by Samza but are used as part of the Samza API? (e.g. java.lang)
  • Need to generate separate classpaths for each classloader
  • Having multiple classloaders is not obvious, so certain assumptions become invalid (e.g. static variables are not shared across classloaders)
  • Extra dependency for Samza
    • Seems like a very lightweight dependency though

Making the necessary JARs available for running the job coordinator

Packaging the job coordinator JARs

The API and infrastructure classloaders each need a package of JARs which is isolated from the application. Those packages need to be built separately from an application. They need to include the core Samza components (e.g. samza-api, samza-core), and they can contain any pluggable components used across many applications (e.g. samza-kafka). The directory structure of the API and infrastructure packages should be the same as the structure for the application (e.g. scripts in the "bin" directory, libraries in the "lib" directory).

The packaging is left to the group of Samza jobs that are using the same set of job coordinator JARs, as different components may be included by different jobs. There are multiple tools that exist for building the packages (e.g. Gradle, Maven).

An example of packaging will be included in the samza-hello-samza project.

Dependencies

API classloader dependencies

  • (required) samza:samza-api
  • (required) samza:samza-kv: includes KeyValueStorageEngine, which is a base class for StorageEngine
  • (optional; if using samza-log4j2 as infrastructure) log4j2 API/core

Infrastructure classloader dependencies

  • (required) samza:samza-core: job coordinator code, default groupers
  • (required) samza:samza-shell (launch scripts)
  • (optional; if using samza-log4j2 as infrastructure) samza:samza-log4j2
  • (optional; if using samza-kafka as infrastructure) samza:samza-kafka: Kafka checkpoint manager implementation
  • (optional; if using samza-kv-rocksdb as infrastructure) samza:samza-kv-rocksdb: RocksDB storage engine
  • (optional; if using samza-yarn as infrastructure) samza:samza-yarn: YARN resource manager factory
  • Other Samza modules or custom modules can be included here if they should be treated as infrastructure.

Localizing the job coordinator JARs

When making a request to YARN, clients are allowed to pass a map of resources to localize on the container. Currently, the "yarn.package.path" config is used to localize the application package, and this includes the Samza infrastructure code. Applications will need to add other resources using "yarn.resources.*.path" configs.

...

table functions). The framework descriptor components will be added as part of the framework API whitelist which will be checked when loading classes in the application classloader, so that the application classloader will delegate to the framework API classloader for framework descriptors. The descriptors are used to generate configs through the descriptor API classes, so concrete framework descriptors and custom descriptors will both work.

Table functions get serialized into configs by the table descriptors that they are contained in. They only need to be deserialized for processing logic, so the job coordinator does not need to deserialize them. On the processing containers, they can get deserialized using the framework infrastructure classloader, so that they can access application classes (e.g. schemas) if necessary. The infrastructure classloader will not delegate to the API classloader for the concrete descriptors.

Flow for loading a class from the application classloader:

  1. If a class is a framework API class, load it from the framework API classloader.
  2. If a class is a framework descriptor class, load it from the framework API classloader.
  3. Load the class from the application classpath.

Classloader wiring

By using the special classloader to instantiate the "main" class, any dependencies will then be loaded using that classloader. Then Java will automatically propagate the special classloader through the rest of Samza. We can modify the "main" method to use reflection to load the "main" class and then trigger the actual Samza startup.

Code Block
public static void main(String[] args) throws Exception {
  ClassLoader isolatingClassLoader = buildIsolatingClassLoader();
  // Load the real main class through the isolating classloader so that all of
  // its dependencies are also loaded through that classloader.
  Class<?> isolatedClass = Class.forName(MainClass.class.getName(), true, isolatingClassLoader);
  isolatedClass.getDeclaredMethod("doMain").invoke(null);
}

Pros

  • Cytodynamics provides an explicit and granular way to specify if a class should be from the parent classpath (i.e. API)
  • Classloader propagation allows the correct external dependencies to be used, even if infrastructure and the application use different versions of the same dependency
  • Do not need to modify existing Samza API classes
  • Do not need to explicitly wire classloader through Samza

Cons

  • Need to ensure proper specification of Samza API classes
    • Are there any classes that are not owned by Samza but are used as part of the Samza API? (e.g. java.lang)
  • Need to generate separate classpaths for each classloader
  • Having multiple classloaders is not obvious, so certain assumptions become invalid (e.g. static variables are not shared across classloaders)
  • Extra dependency for Samza
    • Seems like a very lightweight dependency though

Making the necessary JARs available for running the job coordinator

Packaging the job coordinator JARs

The API and infrastructure classloaders each need a package of JARs which is isolated from the application. Those packages need to be built separately from an application. They need to include the core Samza components (e.g. samza-api, samza-core), and they can contain any pluggable components used across many applications (e.g. samza-kafka). The directory structure of the API and infrastructure packages should be the same as the structure for the application (e.g. scripts in the "bin" directory, libraries in the "lib" directory).

The packaging is left to the group of Samza jobs that are using the same set of job coordinator JARs, as different components may be included by different jobs. There are multiple tools that exist for building the packages (e.g. Gradle, Maven).

An example of packaging will be included in the samza-hello-samza project.

Dependencies

API classloader dependencies

  • (required) samza:samza-api
  • (required) samza:samza-kv: includes KeyValueStorageEngine, which is a base class for StorageEngine
  • (optional; if using samza-log4j2 as infrastructure) log4j2 API/core

Infrastructure classloader dependencies

  • (required) samza:samza-core: job coordinator code, default groupers
  • (required) samza:samza-shell (launch scripts)
  • (optional; if using samza-log4j2 as infrastructure) samza:samza-log4j2
  • (optional; if using samza-kafka as infrastructure) samza:samza-kafka: Kafka checkpoint manager implementation
  • (optional; if using samza-kv-rocksdb as infrastructure) samza:samza-kv-rocksdb: RocksDB storage engine
  • (optional; if using samza-yarn as infrastructure) samza:samza-yarn: YARN resource manager factory
  • Other Samza modules or custom modules can be included here if they should be treated as infrastructure.

Localizing the job coordinator JARs

When making a request to YARN, clients are allowed to pass a map of resources to localize on the container. Currently, the "yarn.package.path" config is used to localize the application package, and this includes the Samza infrastructure code. Applications will need to add framework resources using "yarn.resources.*.path" configs.

  1. Continue to use "yarn.package.path" for the application package.
  2. Set "yarn.resources.__samzaFrameworkApi.path" to the path for the API package.
  3. Set "yarn.resources.__samzaFrameworkInfrastructure.path" to the path for the infrastructure package.
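
As an illustration (the HDFS paths below are made up, not real deployment values), the three configs together might look like:

```properties
yarn.package.path=hdfs://namenode:9000/apps/my-app/my-app-1.0.tar.gz
yarn.resources.__samzaFrameworkApi.path=hdfs://namenode:9000/samza/framework-api.tar.gz
yarn.resources.__samzaFrameworkInfrastructure.path=hdfs://namenode:9000/samza/framework-infrastructure.tar.gz
```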

Samza will look in specific locations on the file system for the JARs for setting up the classpaths for the different classloaders. The framework API classpath will come from "${user.dir}/__samzaFrameworkApi", the framework infrastructure classpath will come from "${user.dir}/__samzaFrameworkInfrastructure", and the application classpath will come from "${user.dir}/__package". When using the above 3 configs, YARN will place the resources into the desired locations.

In non-YARN execution environments, the "yarn" localization configurations won't apply. Other environments will have their own localization flows. If those other environments are unable to localize the resources into the desired file locations, then we can add a generic way (e.g. configuration or environment variables) to specify the file locations to get the classpath resources. The file location variables would apply to any Samza job; only the environment-specific localization flows would be different.

...


Generating classpaths for the JARs

...

The current working directory can be obtained from System.getProperty("user.dir"), and we can find the separate JAR directories from there in code. We can also generate the classpaths in code by finding all of the JAR files in a given directory.
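The directory resolution and classpath generation described above can be sketched as follows (the class and method names are hypothetical, for illustration only):

```java
import java.io.File;
import java.util.StringJoiner;

public class ClasspathGenerator {
  /** Resolve the JAR directory for a localized package, e.g. "__samzaFrameworkApi". */
  public static File resolveLibDirectory(String packageDirName) {
    // Packages follow the application layout: libraries live in the "lib" directory
    return new File(new File(System.getProperty("user.dir"), packageDirName), "lib");
  }

  /** Build a classpath string from all JAR files directly inside a directory. */
  public static String classpathForDirectory(File libDirectory) {
    StringJoiner classpath = new StringJoiner(File.pathSeparator);
    File[] jars = libDirectory.listFiles((dir, name) -> name.endsWith(".jar"));
    if (jars != null) {
      for (File jar : jars) {
        classpath.add(jar.getAbsolutePath());
      }
    }
    return classpath.toString();
  }
}
```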

Pros

  • Easier to localize Samza infrastructure on its own, since it is separate from applications
  • Evolves well into general split deployment, since can just localize different Samza packages to do an upgrade
  • Leverages existing flow for localizing JARs
  • Samza infrastructure can define the full runtime package of JARs (including dependencies) at build time

Cons

  • Need to ensure that the framework packages have versions consistent with the version of Samza used within the application
  • Need to localize artifacts to multiple places
  • Not all jobs use all infrastructure plugins, so this would localize more JARs than necessary for each job

...

For more context about why these changes are needed, see 135861549.

Pros

  • Able to isolate log4j2 pluggable components built by Samza
  • Can override Samza infrastructure logging configuration

Cons

  • Samza ends up controlling log4j2 API version
  • No support for isolation for log4j1 pluggable components, so existing apps would need to migrate to log4j2 to get isolation

...