Overview

A deployable Samza application currently consists of JARs for Samza infrastructure code (and dependent JARs) and JARs for application-specific code (and dependent JARs). The full deployable package is determined at build time. When deploying an application, the built package of JARs is placed on the necessary node(s), which includes the job coordinator and the processing containers. This build-time packaging has benefits, as it simplifies the deployment responsibilities of Samza infrastructure – the package built by the application has everything needed to run a Samza application. Application owners (who may not be the same as the owners of the Samza infrastructure) choose the version of Samza to use and do the packaging.

...

Config key | Description
samza.cluster.based.job.coordinator.dependency.isolation.enabled | Set to "true" to enable cluster-based job coordinator dependency isolation

YARN-specific

These configs are for localizing the framework resources in a YARN environment. If using a different execution environment, then it will be necessary to specify localization configs specific to that environment for the framework API and framework infrastructure resources, since other environments may have a different way of specifying the resource locations.

Config key | Description
yarn.resources.__samzaFrameworkApi.path | Path to the Samza framework API resource
yarn.resources.__samzaFrameworkApi.* | Any other YARN resource configurations for the Samza framework API resource
yarn.resources.__samzaFrameworkInfrastructure.path | Path to the Samza framework infrastructure resource
yarn.resources.__samzaFrameworkInfrastructure.* | Any other YARN resource configurations for the Samza framework infrastructure resource
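As a sketch, the configs from the tables above could be assembled like this for a YARN deployment. The helper class and the package paths passed to it are hypothetical placeholders; "yarn.package.path" is the existing config for the application package described under localization below.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that collects the dependency isolation configs for a YARN job. */
final class IsolationConfigExample {
  private IsolationConfigExample() {}

  static Map<String, String> isolationConfigs(String appPackagePath, String apiPackagePath, String infraPackagePath) {
    Map<String, String> config = new LinkedHashMap<>();
    // Turn on cluster-based job coordinator dependency isolation
    config.put("samza.cluster.based.job.coordinator.dependency.isolation.enabled", "true");
    // The application package is still localized through the existing config
    config.put("yarn.package.path", appPackagePath);
    // Framework packages are localized as extra YARN resources under fixed resource names
    config.put("yarn.resources.__samzaFrameworkApi.path", apiPackagePath);
    config.put("yarn.resources.__samzaFrameworkInfrastructure.path", infraPackagePath);
    return config;
  }
}
```

In practice these entries would simply be lines in the job's properties file; the Java form only makes the key/value pairs explicit.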

Existing JAR management

Currently, Samza infrastructure code and dependencies are included in the tarball with the Samza application. This means that conflicting dependencies between the application and Samza are resolved at build time before the tarball is created, which can cause a certain version of a dependency to be excluded. All JARs in the tarball are installed into a single directory for classpath generation and execution.

...

Generating the Samza API whitelist

In order to load the Samza API classes from the API classloader, we need to tell cytodynamics what those classes are. We can do this by providing a whitelist of packages/classes when building the cytodynamics classloader. All public interfaces/classes inside of samza-api should be considered an API class. One way to generate this whitelist is to use a Gradle task to find all the classes from samza-api and put that list in a file. Then, that file can be read by Samza when constructing the cytodynamics classloader. The Gradle task should also include classes from samza-kv.
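The format of the generated file is not specified here. As an assumption, the sketch below treats it as plain text with one entry per line, where an entry ending in ".*" matches every class under that package prefix (the form used for "org.apache.logging.log4j.*" in the logging section) and any other entry is an exact class name. ApiWhitelist is a hypothetical helper that only shows the matching logic; it does not use the cytodynamics API, which is configured with its own patterns.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

/** Hypothetical matcher for a generated API whitelist file (one entry per line). */
final class ApiWhitelist implements Predicate<String> {
  private final Set<String> exactClassNames = new HashSet<>();
  private final List<String> packagePrefixes = new ArrayList<>();

  private ApiWhitelist(List<String> entries) {
    for (String rawEntry : entries) {
      String entry = rawEntry.trim();
      if (entry.isEmpty() || entry.startsWith("#")) {
        continue; // skip blank lines and comments
      }
      if (entry.endsWith(".*")) {
        // "org.apache.logging.log4j.*" becomes the prefix "org.apache.logging.log4j." (subpackages included)
        packagePrefixes.add(entry.substring(0, entry.length() - 1));
      } else {
        exactClassNames.add(entry);
      }
    }
  }

  static ApiWhitelist fromLines(List<String> lines) {
    return new ApiWhitelist(lines);
  }

  static ApiWhitelist fromFile(Path whitelistFile) throws IOException {
    return new ApiWhitelist(Files.readAllLines(whitelistFile, StandardCharsets.UTF_8));
  }

  @Override
  public boolean test(String className) {
    // Nested classes ("Outer$Inner") are whitelisted together with their outer class
    int nestedSeparator = className.indexOf('$');
    String outerClassName = nestedSeparator >= 0 ? className.substring(0, nestedSeparator) : className;
    if (exactClassNames.contains(outerClassName)) {
      return true;
    }
    for (String prefix : packagePrefixes) {
      if (className.startsWith(prefix)) {
        return true;
      }
    }
    return false;
  }
}
```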

...

  • The pluggable classes implement Samza API interfaces (e.g. SystemFactory), and the classes corresponding to those interfaces need to be loaded by the API classloader. Implementations of plugin interfaces can be on both the "infrastructure" and "application" classpaths, and all components need to use interfaces loaded by the same classloader (i.e. the API classloader); otherwise an implementation would not be assignable to the interface type that its caller sees.
  • Object deserialization (e.g. Avro) may be used within "infrastructure plugins" code, but the application must provide the classes for the concrete deserialized objects at runtime, since the application will be using those deserialized objects. For this case, the "infrastructure plugins" classloader will load the infrastructure plugin classes, but it will need to delegate to the application classloader for the deserialized object classes.
    • Note that object deserialization is not used on the job coordinator, so it is less of a concern in the scope of this SEP. However, we do need to consider it for applying isolation mode to the processing containers (in a future SEP), so it will be good if the strategy used in job coordinator isolation carries over to the processing containers.
    • For the Avro case: since the Avro objects need to be used by the application code, the application will need to be able to choose the version of Avro. The infrastructure code will also delegate to the application classloader for the Avro classes, which means that the Avro version chosen by the application needs to be compatible with the Avro version used by the infrastructure.
    • This also applies to other serdes such as SerializableSerde and JsonSerdeV2, whose serialized classes come from the application side.

Flow for loading a class from the infrastructure classloader:

  1. If a class is a Samza API class, then load it from the API classloader.
  2. If the class is on the infrastructure classpath, load it from the infrastructure classloader.
  3. If the class is on the application classpath, load it from the application classloader.
  4. ClassNotFoundException

This can be achieved with cytodynamics. The API classloader will be the parent of the infrastructure classloader, using a FULL isolation level and a regex specifying that all Samza API classes are preferred from the API classloader. A FULL isolation level means that a class will be loaded from the parent if the class matches the parent-preferred regex. This achieves Step 1 above. The application classloader will also be a parent of the infrastructure classloader, using a NONE isolation level. A NONE isolation level means that a class will be preferred to be loaded from the child, but the parent will be used as a fallback. This achieves Steps 2-3 above.

An effect of using this ordering is that a pluggable class implemented by the application will be used when that class is not provided by the infrastructure plugins.
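To make the lookup order concrete, here is a minimal sketch of the same four steps written directly against the JDK's URLClassLoader rather than cytodynamics. InfrastructureClassLoader and the isApiClass predicate (which could be backed by the generated API whitelist) are hypothetical names. Because the JVM resolves every class referenced by an infrastructure class through this loadClass method, a Samza API type referenced by infrastructure code always resolves to the API classloader's copy.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Predicate;

/** Hypothetical JDK-only sketch of the infrastructure classloader's lookup order (not the cytodynamics API). */
final class InfrastructureClassLoader extends URLClassLoader {
  private final ClassLoader apiClassLoader;
  private final ClassLoader applicationClassLoader;
  private final Predicate<String> isApiClass;

  InfrastructureClassLoader(URL[] infrastructureClasspath, ClassLoader apiClassLoader,
      ClassLoader applicationClassLoader, Predicate<String> isApiClass) {
    // Null parent: only bootstrap (JDK core) classes are consulted before the infrastructure classpath
    super(infrastructureClasspath, null);
    this.apiClassLoader = apiClassLoader;
    this.applicationClassLoader = applicationClassLoader;
    this.isApiClass = isApiClass;
  }

  @Override
  protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
    // Step 1: Samza API classes always come from the API classloader, with no fallback
    if (isApiClass.test(name)) {
      return apiClassLoader.loadClass(name);
    }
    try {
      // Step 2: bootstrap classes, then the infrastructure classpath
      return super.loadClass(name, resolve);
    } catch (ClassNotFoundException notOnInfrastructureClasspath) {
      // Step 3: fall back to the application classloader
      // Step 4: it throws ClassNotFoundException if the class is not there either
      return applicationClassLoader.loadClass(name);
    }
  }
}
```

One caveat of the null parent: on Java 9 and later, some platform modules (e.g. java.sql) are defined by the platform classloader rather than the bootstrap classloader, so a real implementation may want ClassLoader.getPlatformClassLoader() as the parent instead.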

"Application" classloader

There are also many pluggable classes which are owned by an application owner. In the job coordinator, an example of this would be a custom SystemFactory implementation.

Similarly to the infrastructure classloader, this classloader needs to load Samza API interfaces from the API classloader.

Flow for loading a class from the application classloader:

  1. If a class is a Samza API class, then load it from the API classloader.
  2. If the class is on the application classpath, load it from the application classloader.
  3. ClassNotFoundException

This can be achieved with cytodynamics. The application classloader will have the API classloader as a parent, using a FULL isolation level and a whitelist of Samza API classes. This gives the lookup order above.

This structure means that if the application classloader needs a class which is an infrastructure plugin (e.g. custom system factory using KafkaSystemFactory as an "underlying system implementation"), then it will load that class from the application classpath, not the infrastructure classpath. This is reasonable, because the application is providing the implementation of the pluggable class directly, so we will just treat the infrastructure plugin class as a regular library at that point.

The classpath for this classloader will be the package of JARs built by the application.
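A minimal JDK-only sketch of this lookup order follows; like any illustration here, it is not the cytodynamics API. ApplicationClassLoader and the isApiClass predicate are hypothetical names. The key difference from the infrastructure side is that there is no fallback: a class that is neither an API class nor on the application classpath fails with ClassNotFoundException.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Predicate;

/** Hypothetical JDK-only sketch of the application classloader's lookup order (not the cytodynamics API). */
final class ApplicationClassLoader extends URLClassLoader {
  private final ClassLoader apiClassLoader;
  private final Predicate<String> isApiClass;

  ApplicationClassLoader(URL[] applicationClasspath, ClassLoader apiClassLoader, Predicate<String> isApiClass) {
    // Null parent: only bootstrap (JDK core) classes are consulted before the application classpath
    super(applicationClasspath, null);
    this.apiClassLoader = apiClassLoader;
    this.isApiClass = isApiClass;
  }

  @Override
  protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
    // Step 1: Samza API classes come from the API classloader
    if (isApiClass.test(name)) {
      return apiClassLoader.loadClass(name);
    }
    // Steps 2-3: bootstrap classes, then the application classpath; otherwise ClassNotFoundException.
    // There is deliberately no fallback to the infrastructure classpath.
    return super.loadClass(name, resolve);
  }
}
```

For the SamzaApplication.describe handling described below, the predicate would also need to match framework descriptor classes, for example by combining two predicates with Predicate.or.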

Handling SamzaApplication.describe

The SamzaApplication.describe method needs to be able to delegate to the framework for certain concrete descriptor components (e.g. system descriptors, table functions). The framework descriptor components will be added as part of the framework API whitelist which will be checked when loading classes in the application classloader, so that the application classloader will delegate to the framework API classloader for framework descriptors. The descriptors are used to generate configs through the descriptor API classes, so concrete framework descriptors and custom descriptors will both work.

Table functions get serialized into configs by the table descriptors that they are contained in. They only need to be deserialized for processing logic, so the job coordinator does not need to deserialize them. On the processing containers, they can get deserialized using the framework infrastructure classloader, so that they can access application classes (e.g. schemas) if necessary. The infrastructure classloader will not delegate to the API classloader for the concrete descriptors.

Flow for loading a class from the application classloader:

  1. If a class is a framework API class, load it from the framework API classloader.
  2. If a class is a framework descriptor class, load it from the framework API classloader.
  3. Load the class from the application classpath.

Classloader wiring

By using the special classloader to load the "main" class, any classes that the "main" class depends on will also be loaded through that classloader, because the JVM resolves a class's references using the classloader that defined it. In this way, the special classloader automatically propagates through the rest of Samza. We can modify the "main" method to use reflection to load the "main" class through the special classloader and then trigger the actual Samza startup.

Code Block
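public static void main(String[] args) throws Exception {
  // buildIsolatingClassLoader() sets up the API/infrastructure/application classloader hierarchy
  ClassLoader isolatingClassLoader = buildIsolatingClassLoader();
  // Load and initialize the "main" class through the isolating classloader, so its dependencies are resolved by it too
  Class<?> isolatedClass = Class.forName(MainClass.class.getName(), true, isolatingClassLoader);
  // Invoke the static startup method reflectively; a direct call would use the copy from the original classloader
  isolatedClass.getDeclaredMethod("doMain").invoke(null);
}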
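Class linking follows the defining classloader, but code that looks classes up through the thread context classloader (for example, java.util.ServiceLoader.load(Class)) does not; step 1 of the logging changes below sets the context classloader to the infrastructure classloader for that kind of lookup. Below is a minimal scoped helper, assuming the previous context classloader should be restored afterwards; ContextClassLoaders is a hypothetical name.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper: run code with a given thread context classloader, then restore the previous one. */
final class ContextClassLoaders {
  private ContextClassLoaders() {}

  static <T> T callWith(ClassLoader contextClassLoader, Callable<T> action) throws Exception {
    Thread currentThread = Thread.currentThread();
    ClassLoader previous = currentThread.getContextClassLoader();
    currentThread.setContextClassLoader(contextClassLoader);
    try {
      return action.call();
    } finally {
      // Restore even if the action throws, so later code on this thread is unaffected
      currentThread.setContextClassLoader(previous);
    }
  }
}
```

A "main" method like the one above could wrap its reflective doMain call in callWith(isolatingClassLoader, ...) so that context-classloader lookups also see the isolated classpath.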

Pros

  • Cytodynamics provides an explicit and granular way to specify if a class should be from the parent classpath (i.e. API)
  • Classloader propagation allows the correct external dependencies to be used, even if infrastructure and the application use different versions of the same dependency
  • Do not need to modify existing Samza API classes
  • Do not need to explicitly wire classloader through Samza

Cons

  • Need to ensure proper specification of Samza API classes
    • Are there any classes that are not owned by Samza but are used as part of the Samza API? (e.g. java.lang)
  • Need to generate separate classpaths for each classloader
  • Using multiple classloaders is not obvious, so certain assumptions no longer hold (e.g. static variables are not shared across classloaders)
  • Extra dependency for Samza
    • Seems like a very lightweight dependency though

Making the necessary JARs available for running the job coordinator

Packaging the job coordinator JARs

The API and infrastructure classloaders each need a package of JARs which is isolated from the application. Those packages need to be built separately from an application. They need to include the core Samza components (e.g. samza-api, samza-core), and they can contain any pluggable components used across many applications (e.g. samza-kafka). The directory structure of the API and infrastructure packages should be the same as the structure for the application (e.g. scripts in the "bin" directory, libraries in the "lib" directory).

The packaging is left to the group of Samza jobs that are using the same set of job coordinator JARs, as different components may be included by different jobs. There are multiple tools that exist for building the packages (e.g. Gradle, Maven).

...

  • (required) samza:samza-core: job coordinator code, default groupers
  • (required) samza:samza-shell: launch scripts
  • (optional; if using samza-log4j2 as infrastructure) samza:samza-log4j2
  • (optional; if using samza-kafka as infrastructure) samza:samza-kafka: Kafka checkpoint manager implementation
  • (optional; if using samza-kv-rocksdb as infrastructure) samza:samza-kv-rocksdb: RocksDB storage engine
  • (optional; if using samza-yarn as infrastructure) samza:samza-yarn: YARN resource manager factory
  • Other Samza modules or custom modules can be included in here if they want to be considered as infrastructure.

Localizing the job coordinator JARs

When making a request to YARN, clients are allowed to pass a map of resources to localize on the container. Currently, the "yarn.package.path" config is used to localize the application package, and this includes the Samza infrastructure code. Applications will need to add framework resources using "yarn.resources.*.path" configs.

  1. Continue to use "yarn.package.path" for the application package.
  2. Set "yarn.resources.__samzaFrameworkApi.path" to the path for the API package.
  3. Set "yarn.resources.__samzaFrameworkInfrastructure.path" to the path for the infrastructure package.

Samza will look in specific locations on the file system for the JARs for setting up the classpaths for the different classloaders. The framework API classpath will come from "${user.dir}/__samzaFrameworkApi", the framework infrastructure classpath will come from "${user.dir}/__samzaFrameworkInfrastructure", and the application classpath will come from "${user.dir}/__package". When using the above 3 configs, YARN will place the resources into the desired locations.
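A sketch of how the three classpaths could be assembled from those locations. It assumes each localized package keeps its JARs in a "lib" subdirectory, matching the package layout described in "Packaging the job coordinator JARs"; FrameworkClasspaths is a hypothetical helper, and sorting the JARs is an arbitrary choice made only to get a deterministic order.

```java
import java.io.IOException;
import java.net.URL;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper that turns a localized package directory into classpath URLs. */
final class FrameworkClasspaths {
  // Directory names from the design; YARN localizes each resource into ${user.dir}/<name>
  static final String FRAMEWORK_API_DIR = "__samzaFrameworkApi";
  static final String FRAMEWORK_INFRASTRUCTURE_DIR = "__samzaFrameworkInfrastructure";
  static final String APPLICATION_DIR = "__package";

  private FrameworkClasspaths() {}

  /** Collects <baseDir>/<resourceDir>/lib/*.jar as URLs (assumed "lib" layout). */
  static URL[] classpathFor(Path baseDir, String resourceDir) throws IOException {
    Path libDir = baseDir.resolve(resourceDir).resolve("lib");
    List<Path> jars = new ArrayList<>();
    try (DirectoryStream<Path> stream = Files.newDirectoryStream(libDir, "*.jar")) {
      for (Path jar : stream) {
        jars.add(jar);
      }
    }
    Collections.sort(jars); // deterministic classpath order
    URL[] urls = new URL[jars.size()];
    for (int i = 0; i < jars.size(); i++) {
      urls[i] = jars.get(i).toUri().toURL();
    }
    return urls;
  }

  /** Resolves the resource directory relative to ${user.dir}. */
  static URL[] classpathFor(String resourceDir) throws IOException {
    return classpathFor(Paths.get(System.getProperty("user.dir")), resourceDir);
  }
}
```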

In non-YARN execution environments, the "yarn" localization configurations won't apply. Other environments will have their own localization flows. If those other environments are unable to localize the resources into the desired file locations, then we can add a generic way (e.g. configuration or environment variables) to specify the file locations to get the classpath resources. The file location variables would apply to any Samza job; only the environment-specific localization flows would be different.

...

Generating classpaths for the JARs

...

  1. Set the context classloader to be the infrastructure classloader
  2. The framework packaging requires a specific setup. The following steps are for supporting log4j2 as the logging implementation for slf4j. It should be possible to support other logging implementations by adding the correct dependencies and specifying the correct classes on the API whitelist.
    1. Include log4j2 dependencies in the framework API package (org.apache.logging.log4j:log4j-api, org.apache.logging.log4j:log4j-core, org.apache.logging.log4j:log4j-slf4j-impl, org.apache.logging.log4j:log4j-1.2-api).
    2. Add the classes from log4j-api and log4j-core to the API whitelist. This can be done by just adding "org.apache.logging.log4j.*" to the whitelist.
    3. Include samza-log4j2 as a dependency for the framework infrastructure package.
    4. Include log4j2 dependencies in the framework infrastructure package. These should already be included transitively through samza-log4j2.
    5. Exclude all log4j v1 dependencies from all classpaths (org.slf4j:slf4j-log4j12, log4j:log4j).
    6. (Recommended) Add a default log4j2.xml configuration file if there are cases in which the application does not provide one.
  3. When setting the system property for the log4j2 configuration file location ("log4j.configurationFile"), use the application's log4j2.xml if it exists; otherwise, use a default log4j2.xml configuration from the framework infrastructure. This can be done by passing an extra environment variable to the job coordinator execution that points to the "application lib directory" (which may contain the application's log4j2.xml), and then reading that environment variable in the run-class.sh script when setting the log4j configuration system property.
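The design performs this selection in run-class.sh; the same rule is expressed in Java below only for illustration. Log4j2ConfigSelector is a hypothetical helper, and the caller is assumed to resolve applicationLibDir from the extra environment variable (whose name is not specified here) and pass the result as -Dlog4j.configurationFile=<path>.

```java
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper mirroring the log4j2 configuration-file selection rule. */
final class Log4j2ConfigSelector {
  static final String LOG4J2_CONFIG_FILE_NAME = "log4j2.xml";

  private Log4j2ConfigSelector() {}

  /**
   * Returns the application's log4j2.xml if it exists in the application lib directory,
   * otherwise the framework infrastructure's default configuration.
   */
  static Path select(Path applicationLibDir, Path frameworkDefaultConfig) {
    if (applicationLibDir != null) {
      Path applicationConfig = applicationLibDir.resolve(LOG4J2_CONFIG_FILE_NAME);
      if (Files.isRegularFile(applicationConfig)) {
        return applicationConfig;
      }
    }
    // No application directory or no application config: use the framework default
    return frameworkDefaultConfig;
  }
}
```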

For more context about why these changes are needed, see "Details around necessary changes for logging".

Pros

  • Able to isolate log4j2 pluggable components built by Samza
  • Can override Samza infrastructure logging configuration

...

  1. Locally build the framework tarballs for API and infrastructure. It would be useful to put an example somewhere for how to build those tarballs.
  2. Deploy Zookeeper, Kafka, and YARN locally (https://samza.apache.org/startup/hello-samza/latest/).
  3. Fill in certain configs (see the config tables above). These will go into the properties file passed to the run-app.sh script.
  4. Create the tarball for the application (https://samza.apache.org/startup/hello-samza/latest/). For testing local changes, remember to run the "publishToMavenLocal" command.

Automated integration test

...

  • This will require multiple configs, including the location of the framework artifacts for YARN resources (see the config tables above).

...

Changes can also be committed to samza-hello-samza to automatically execute the steps above.

Automated integration test

We could also consider writing an integration test using the integration test framework (which uses real YARN).

...

Full YARN cluster testing

It will also be useful to deploy some test jobs into a full YARN cluster with multiple nodes in order to verify the functionality.

Alternative solutions

Alternative solutions for SEP-24

...