It would be helpful to be able to isolate the dependencies of the Samza infrastructure from the dependencies of the application. This SEP covers how to achieve this for the cluster-based job coordinator, which is used when running Samza jobs in resource management systems like YARN.


cluster-based job coordinator: process that is responsible for managing the processing containers of a Samza job (e.g. starting containers, keeping correct # of containers running) when running Samza with a resource management system
YARN: a resource management system which can be used to run Samza jobs
application master: a cluster-based job coordinator in the context of YARN
application runner: Samza component which is responsible for launching an application
application (or application-specific): code and dependencies which are specific to a particular Samza application, as opposed to Samza infrastructure
pluggable (or plugin) class: class which is specified by an application through configuration (e.g. system factory, grouper)


  • Application dependencies should not be able to impact the Samza cluster-based job coordinator
  • The solution should be reusable for the Samza logic running on processing containers


Existing JAR management

Currently, Samza infrastructure code and dependencies are included in the tarball with the Samza application. This means that conflicting dependencies between the application and Samza are resolved at build time before the tarball is created, which can cause a certain version of a dependency to be excluded. All JARs in the tarball are installed into a single directory for classpath generation and execution.

Decoupling job coordinator JARs from application JARs

In order to isolate the job coordinator JARs from the application JARs, we will use multiple classloaders, associated with different classpaths. The JARs needed by the "application" will be in a different classpath than the JARs needed for the infrastructure. The separated classpaths will allow duplicate dependencies to be used within the same JVM. The functionality we build will need to ensure that the correct dependency is used for a given class (e.g. infrastructure dependency for infrastructure class vs. application dependency for application class).

Useful details about classloaders:

  • If a class A is directly defined by a classloader CL (CL is called the "defining loader"), then classloader CL will also be called to load (i.e. call loadClass) any dependencies of class A (even when using reflection). If a classloader CL delegates to another classloader CL1 for actually defining class A, and classloader CL1 actually defines class A, then classloader CL1 will be called to load dependencies of class A.
  • If a class A is directly loaded by classloader CL and class A is also directly loaded by classloader CL1, then an instance corresponding to the first class A cannot be cast to the second class A. This means that all classes (or instances of classes) that are shared "across" classloaders must be loaded from the same classloader. An interface loaded by a common classloader is sufficient to allow sharing of a concrete object, even if the concrete class comes from a different classloader than the interface.
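The two points above can be observed with plain JDK classloaders. The following is a minimal sketch, not part of the proposed implementation: the class name Greeter and the helper ClassIdentityDemo are made up for illustration, and the demo compiles a tiny class at runtime, so it assumes a JDK (not just a JRE) is available.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;

final class ClassIdentityDemo {
  /** Outcome of loading the same class file through two unrelated classloaders. */
  static final class Result {
    final boolean sameClassObject;
    final boolean castableAcrossLoaders;
    final boolean usableThroughSharedInterface;

    Result(boolean sameClassObject, boolean castableAcrossLoaders, boolean usableThroughSharedInterface) {
      this.sameClassObject = sameClassObject;
      this.castableAcrossLoaders = castableAcrossLoaders;
      this.usableThroughSharedInterface = usableThroughSharedInterface;
    }
  }

  static Result run() {
    try {
      // Compile a throwaway class (hypothetical name "Greeter") into a temp directory.
      Path dir = Files.createTempDirectory("class-identity-demo");
      Path source = dir.resolve("Greeter.java");
      Files.writeString(source, "public class Greeter implements Runnable { public void run() {} }");
      JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
      if (compiler == null
          || compiler.run(null, null, null, "-d", dir.toString(), source.toString()) != 0) {
        throw new IllegalStateException("could not compile the demo class (is a JDK available?)");
      }
      URL[] classpath = {dir.toUri().toURL()};
      // A null parent means "bootstrap only": each loader defines its own Greeter,
      // while java.lang.Runnable is shared because both delegate to the bootstrap loader.
      try (URLClassLoader first = new URLClassLoader(classpath, null);
           URLClassLoader second = new URLClassLoader(classpath, null)) {
        Class<?> fromFirst = first.loadClass("Greeter");
        Class<?> fromSecond = second.loadClass("Greeter");
        Object instance = fromFirst.getDeclaredConstructor().newInstance();
        return new Result(
            fromFirst == fromSecond,          // same bytes, but two distinct Class objects
            fromSecond.isInstance(instance),  // so an instance of one is not an instance of the other
            instance instanceof Runnable);    // the commonly loaded interface still works
      }
    } catch (Exception e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    Result r = run();
    System.out.println("same Class object: " + r.sameClassObject);
    System.out.println("castable across loaders: " + r.castableAcrossLoaders);
    System.out.println("usable through shared interface: " + r.usableThroughSharedInterface);
  }
}
```

This is the reason the design keeps API interfaces in a single common classloader: implementations can then come from either the infrastructure or the application classpath and still be passed around as the shared interface type.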

We will leverage the cytodynamics library to help manage classloaders. The cytodynamics library provides a way to annotate or whitelist certain classes as "API classes", to be loaded through a parent classpath. The rest will be loaded by a child classpath. The cytodynamics library also provides a way to load classes from the child classpath before checking the parent. These are useful features for ensuring that dependencies are chosen correctly at runtime.

This design involves using three separate classloaders: API, infrastructure, and application. API is associated with the classes that might be implemented or used outside of Samza, such as SamzaApplication or SystemFactory. Infrastructure consists of the core implementation of Samza (e.g. ClusterBasedJobCoordinator) and built-in plugin implementations (e.g. KafkaSystemFactory). Application is the code provided by the application.

"API" classloader

This classloader is responsible for loading the following categories of classes:

  • Basic Samza API interfaces/classes (e.g. StreamApplication, TaskApplication), since Samza processes interact directly with those Samza API classes
  • Base classes which can be used to help build pluggable classes (e.g. BlockingEnvelopeMap, KeyValueStorageEngine), in order to isolate the base logic
  • Utility libraries provided by Samza to help build applications

The classpath that is associated with this classloader will not contain specific implementations of any API interfaces (e.g. KafkaSystemFactory). Those classes will be accessed through the "infrastructure" or "application" classloaders.

This classloader can be a URLClassLoader with the bootstrap classloader as the parent.

Generating the Samza API whitelist

In order to load the Samza API classes from the API classloader, we need to tell cytodynamics what those classes are. We can do this by providing a whitelist of packages/classes when building the cytodynamics classloader. All public interfaces/classes inside of samza-api should be considered an API class. One way to generate this whitelist is to use a Gradle task to find all the classes from samza-api and put that list in a file. Then, that file can be read by Samza when constructing the cytodynamics classloader. The Gradle task should also include classes from samza-kv.

Other than classes that are explicitly provided by Samza as API, there are some other classes which need to be loaded by a common classloader so that they can be shared across classloaders. For some cases, like log4j2, instead of including each specific class name, cytodynamics accepts wildcard entries for the whitelist (e.g. "org.apache.logging.log4j.*").

samza-api: main API classes
samza-kv: some classes from here are used by implementations of pluggable classes
org.apache.logging.log4j:log4j-api: see Logging below for more information
org.apache.logging.log4j:log4j-core: see Logging below for more information
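To make the whitelist idea concrete, here is a small sketch of how a generated whitelist could be turned into a "is this an API class?" check. It does not use the cytodynamics API; the file format (one entry per line, "#" comments, a trailing ".*" meaning "everything under this package") and the class name ApiWhitelist are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

final class ApiWhitelist {
  /**
   * Builds a predicate over fully qualified class names from whitelist entries.
   * Assumed format: exact class names, or package wildcards ending in ".*".
   */
  static Predicate<String> fromEntries(List<String> entries) {
    Set<String> exactClasses = new HashSet<>();
    List<String> packagePrefixes = new ArrayList<>();
    for (String raw : entries) {
      String entry = raw.trim();
      if (entry.isEmpty() || entry.startsWith("#")) {
        continue; // skip blank lines and comments (comment syntax is an assumption)
      }
      if (entry.endsWith(".*")) {
        // Keep the trailing dot so "a.b.*" does not match the unrelated package "a.bc".
        packagePrefixes.add(entry.substring(0, entry.length() - 1));
      } else {
        exactClasses.add(entry);
      }
    }
    return className -> exactClasses.contains(className)
        || packagePrefixes.stream().anyMatch(className::startsWith);
  }

  public static void main(String[] args) {
    Predicate<String> isApi = fromEntries(Arrays.asList(
        "org.apache.samza.system.SystemFactory",
        "org.apache.logging.log4j.*"));
    System.out.println(isApi.test("org.apache.logging.log4j.core.Appender"));
    System.out.println(isApi.test("com.example.MyTask"));
  }
}
```

In this sketch a wildcard entry also covers subpackages, which is what the log4j2 case needs (log4j-core classes live under org.apache.logging.log4j.core). Whether nested classes of an exactly listed class should match as well is left open here.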

"Infrastructure" classloader

This classloader is responsible for loading the following categories of classes:

  • Classes which are used directly when starting up a Samza process (e.g. ClusterBasedJobCoordinator)
  • Implementations of plugin classes which are owned by the Samza team (e.g. KafkaSystemFactory)
    • These might not directly come from the Samza project. For example, a custom system implementation can be included here if it is desirable to consider it as "framework code".

This classloader will need to be able to delegate to the other classloaders in some cases.

  • The pluggable classes implement Samza API interfaces (e.g. SystemFactory), and the classes corresponding to those interfaces need to be loaded by the API classloader. Implementations of plugin interfaces can be on both the "infrastructure" and "application" classpaths, and all components need to use interfaces loaded by the same classloader (i.e. API classloader).
  • Object deserialization (e.g. Avro) may be used within "infrastructure plugins" code, but the application must provide the classes for the concrete objects at runtime. For this case, the "infrastructure plugins" classloader needs to delegate to the application classloader.
  • Samza provides SerializableSerde and JsonSerdeV2 as serdes, but the classes being used are from the application side (this is similar to Avro).

Flow for loading a class from the infrastructure classloader:

  1. If a class is a Samza API class, then load it from the API classloader.
  2. If the class is on the infrastructure classpath, load it from the infrastructure classloader.
  3. If the class is on the application classpath, load it from the application classloader.
  4. ClassNotFoundException

This can be achieved with cytodynamics. The API classloader will be the parent of the infrastructure classloader, using a FULL isolation level and a regex specifying that all Samza API classes are preferred from the API classloader. A FULL isolation level means that a class will be loaded from the parent if the class matches the parent-preferred regex. This achieves Step 1 above. The application classloader will also be a parent of the infrastructure classloader, using a NONE isolation level. A NONE isolation level means that a class will be preferred to be loaded from the child, but the parent will be used as a fallback. This achieves Steps 2-3 above.

An effect of using this ordering is that a pluggable class implemented by the application will be used when that class is not provided by the infrastructure plugins.
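The infrastructure loading flow can also be written out with plain JDK classes, which may help when reasoning about the delegation order. This is only an illustrative sketch of the four steps; the design itself relies on cytodynamics, and the class name InfrastructureFlowClassLoader and its constructor parameters are hypothetical.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Predicate;

final class InfrastructureFlowClassLoader extends URLClassLoader {
  private final ClassLoader apiLoader;
  private final ClassLoader applicationLoader;
  private final Predicate<String> isApiClass;

  InfrastructureFlowClassLoader(URL[] infrastructureClasspath, ClassLoader apiLoader,
      ClassLoader applicationLoader, Predicate<String> isApiClass) {
    // Null parent: only JDK bootstrap classes are visible besides the infrastructure classpath.
    super(infrastructureClasspath, null);
    this.apiLoader = apiLoader;
    this.applicationLoader = applicationLoader;
    this.isApiClass = isApiClass;
  }

  @Override
  protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
    synchronized (getClassLoadingLock(name)) {
      Class<?> loaded;
      if (isApiClass.test(name)) {
        // Step 1: Samza API classes always come from the API classloader.
        loaded = apiLoader.loadClass(name);
      } else {
        try {
          // Step 2: bootstrap (JDK) classes, then the infrastructure classpath.
          loaded = super.loadClass(name, false);
        } catch (ClassNotFoundException notOnInfrastructureClasspath) {
          // Step 3: fall back to the application classloader.
          // Step 4: if that also fails, its ClassNotFoundException propagates.
          loaded = applicationLoader.loadClass(name);
        }
      }
      if (resolve) {
        resolveClass(loaded);
      }
      return loaded;
    }
  }

  /** Convenience for experiments: returns null instead of throwing when a class is missing. */
  static Class<?> tryLoad(ClassLoader loader, String name) {
    try {
      return loader.loadClass(name);
    } catch (ClassNotFoundException e) {
      return null;
    }
  }
}
```

The application flow is the same sketch without the Step 3 fallback: API classes first, then the application classpath, then failure.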

"Application" classloader

There are also many pluggable classes which are owned by an application owner. In the job coordinator, an example of this would be a custom SystemFactory implementation.

Similarly to the infrastructure classloader, this classloader needs to load Samza API interfaces from the API classloader.

Flow for loading a class from the application classloader:

  1. If a class is a Samza API class, then load it from the API classloader.
  2. If the class is on the application classpath, load it from the application classloader.
  3. ClassNotFoundException

This can be achieved with cytodynamics. The application classloader will be associated with the API classloader as a parent, using a FULL isolation level and a whitelist of Samza API classes. This gives us the desired loading.

This structure means that if the application classloader needs a class which is an infrastructure plugin (e.g. custom system factory using KafkaSystemFactory as an "underlying system implementation"), then it will load that class from the application classpath, not the infrastructure classpath. This is reasonable, because the application is providing the implementation of the pluggable class directly, so we will just treat the infrastructure plugin class as a regular library at that point.

The classpath for this classloader will be the package of JARs built by the application.

Handling SamzaApplication.describe

It is currently unnecessary to generate a whitelist for the infrastructure or the application classloader. The delegation strategy between the different classloaders allows us to avoid specifying other classes in a whitelist.

Since SamzaApplication.describe does not currently run on the job coordinator, we do not yet need handling for classes used within SamzaApplication.describe, such as descriptors or serdes. The handling for components used in SamzaApplication.describe will be discussed further in other designs.

Classloader wiring

By using the special classloader to instantiate the "main" class, any dependencies will then be loaded using that classloader. Then Java will automatically propagate the special classloader through the rest of Samza. We can modify the "main" method to use reflection to load the "main" class and then trigger the actual Samza startup.

Code Block
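public static void main(String[] args) throws Exception {
  ClassLoader isolatingClassLoader = buildIsolatingClassLoader();
  Class<?> isolatedClass = Class.forName(MainClass.class.getName(), true, isolatingClassLoader);
  // Trigger the actual startup reflectively ("doMain" is a placeholder name for a static entry point)
  isolatedClass.getDeclaredMethod("doMain", String[].class).invoke(null, (Object) args);
}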


Pros

  • Cytodynamics provides an explicit and granular way to specify if a class should be from the parent classpath (i.e. API)
  • Classloader propagation allows the correct external dependencies to be used, even if infrastructure and the application use different versions of the same dependency
  • Do not need to modify existing Samza API classes
  • Do not need to explicitly wire classloader through Samza


Cons

  • Need to ensure proper specification of Samza API classes
    • Are there any classes that are not owned by Samza but are used as part of the Samza API? (e.g. java.lang)
  • Need to generate separate classpaths for each classloader
  • The use of multiple classloaders is not obvious to developers, so certain common assumptions become invalid (e.g. static variables are not shared across classloaders)
  • Extra dependency for Samza
    • Seems like a very lightweight dependency though

Making the necessary JARs available for running the job coordinator

Packaging the job coordinator JARs

The API and infrastructure classloaders each need a package of JARs which is isolated from the application. Those packages need to be built separately from an application. They need to include the core Samza components (e.g. samza-api, samza-core), and they can contain any pluggable components used across many applications (e.g. samza-kafka). This packaging is left to the Samza user (or group of users), as different components may be included by different users. There are multiple tools that exist for building the packages (e.g. Gradle, Maven).


API classloader dependencies

  • (required) samza:samza-api
  • (required) samza:samza-kv: includes KeyValueStorageEngine, which is a base class for StorageEngine
  • (optional; if using samza-log4j2 as infrastructure) log4j2 API/core

Infrastructure classloader dependencies

  • (required) samza:samza-core: job coordinator code, default groupers
  • (required) samza:samza-shell (launch scripts)
  • (optional; if using samza-log4j2 as infrastructure) samza:samza-log4j2
  • (optional; if using samza-kafka as infrastructure) samza:samza-kafka: Kafka checkpoint manager implementation
  • (optional; if using samza-kv-rocksdb as infrastructure) samza:samza-kv-rocksdb: RocksDB storage engine
  • (optional; if using samza-yarn as infrastructure) samza:samza-yarn: YARN resource manager factory
  • Other Samza modules or custom modules can be included in here if they want to be considered as infrastructure.

Localizing the job coordinator JARs

When making a request to YARN, clients are allowed to pass a map of resources to localize on the container. Currently, the "yarn.package.path" config is used to localize the application package, and this includes the Samza infrastructure code. Applications will need to add other resources using "yarn.resources.*.path" configs.

  1. Continue to use "yarn.package.path" for the application package.
  2. Set "yarn.resources.__api.path" to the path for the API package.
  3. Set "yarn.resources.__infrastructure.path" to the path for the infrastructure package.
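As a small illustration of the three settings above, the sketch below assembles them into a config map. Only the config keys come from this document; the helper name IsolationResourceConfigs and the package paths in main are placeholders.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class IsolationResourceConfigs {
  /** Returns the localization configs for the application, API, and infrastructure packages. */
  static Map<String, String> build(String applicationPackage, String apiPackage,
      String infrastructurePackage) {
    Map<String, String> configs = new LinkedHashMap<>();
    configs.put("yarn.package.path", applicationPackage);
    configs.put("yarn.resources.__api.path", apiPackage);
    configs.put("yarn.resources.__infrastructure.path", infrastructurePackage);
    return configs;
  }

  public static void main(String[] args) {
    // Placeholder paths; real values depend on where the packages are published.
    build("hdfs:///path/to/my-app.tgz",
          "hdfs:///path/to/samza-api.tgz",
          "hdfs:///path/to/samza-infrastructure.tgz")
        .forEach((key, value) -> System.out.println(key + "=" + value));
  }
}
```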

Generating classpaths for the JARs

Before this design, an application just had a single classpath, so we could specify that classpath through the "classpath" option for the "java" command.

This design introduces multiple classloaders, and each one has its own classpath. The cytodynamics library accepts a classpath for building a classloader. Therefore, we will need a way to generate the classpath for each separate classloader to be made accessible to the Java process. We will have different directories for each category of JARs (i.e. API, infrastructure, application), so the classpath for a certain classloader can consist of a list of all JARs in the corresponding directory.

The current working directory can be obtained from System.getProperty("user.dir"), and we can find the separate JAR directories from there in code. We can also generate the classpaths in code by finding all of the JAR files in a given directory.
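A minimal sketch of generating a classpath in code, assuming each category of JARs sits in its own directory under the working directory. The directory name passed to libDirectory and the class name ClasspathScanner are illustrative; the actual directory layout is not specified here.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class ClasspathScanner {
  /** Resolves a JAR directory relative to the current working directory. */
  static Path libDirectory(String directoryName) {
    return Paths.get(System.getProperty("user.dir"), directoryName);
  }

  /** Lists the JAR files directly inside the directory, in a stable (sorted) order. */
  static List<URL> jarUrls(Path directory) {
    try (Stream<Path> entries = Files.list(directory)) {
      return entries
          .filter(p -> Files.isRegularFile(p) && p.getFileName().toString().endsWith(".jar"))
          .sorted()
          .map(ClasspathScanner::toUrl)
          .collect(Collectors.toList());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Builds a plain URLClassLoader over the JARs, with only the bootstrap loader as parent. */
  static URLClassLoader bootstrapParentedLoader(Path directory) {
    return new URLClassLoader(jarUrls(directory).toArray(new URL[0]), null);
  }

  private static URL toUrl(Path path) {
    try {
      return path.toUri().toURL();
    } catch (IOException e) { // MalformedURLException is an IOException
      throw new UncheckedIOException(e);
    }
  }
}
```

Sorting keeps the classpath order deterministic across runs, which makes duplicate-class surprises within a single directory easier to reproduce.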


Pros

  • Easier to localize Samza infrastructure on its own, since it is separate from applications
  • Evolves well into general split deployment, since we can just localize different Samza packages to do an upgrade
  • Leverages existing flow for localizing JARs
  • Samza infrastructure can define the full runtime package of JARs (including dependencies) at build time


Cons

  • Need to ensure that the framework packages have versions consistent with the version of Samza used within the application
  • Need to localize artifacts to multiple places
  • Not all jobs use all infrastructure plugins, so this would localize more JARs than necessary for each job


Logging

In the single classloader case, all classes were easily able to use logging through static access to the logging API (e.g. slf4j). Samza did have to do a little bit of management for using the log4j binding vs. the log4j2 binding for slf4j.

With multiple classloaders, we have to be more careful, since static contexts are not shareable if they get loaded by different classloaders.

It would be good if we can split deploy Samza implementations of log4j pluggable components (e.g. StreamAppender).

Useful notes:

  • Log4j searches for a configuration file specified by the "log4j.configuration" system property (or "log4j.configurationFile" for log4j2). If that property is not specified, then log4j will try to find a log4j.xml (or log4j2.xml for log4j2) file on the classpath. Note that log4j2 will also look for a log4j2.xml if the file specified at "log4j.configurationFile" is not found. See LogManager for the log4j implementation and ConfigurationFactory for the log4j2 implementation.
    • Samza does specify the "log4j.configuration" property in
    • If the "log4j.configuration" system property is an accessible file, then all classloaders will be able to load it.
    • The log4j.xml file will only be searched for through the current classloader.
  • When initializing a class that has a static slf4j Logger field, the LoggerFactory and some core log4j components/interfaces will be loaded from the "current" classloader. However, some pluggable log4j components (e.g. Appender) will be loaded by the context classloader (Thread.getContextClassLoader) and then passed back to the "current" classloader. If the context classloader loads core log4j components separately from the "current" classloader, then the appenders can't be shared, since the Appender interface would need to come from the same classloader.
    • A config "log4j.ignoreTCL" does exist to ignore the context classloader. Log4j will fall back to using the current classloader if the context class loader is not found or ignored (see org.apache.log4j.helpers.Loader). Samza doesn't currently set the context class loader, although it might be possible that the context class loader gets set by some system using Samza.
  • We should not instantiate multiple instances of RollingFileAppender which write to the same file at the same time due to concurrency issues. Usually, this isn't something to worry about since logging is initialized statically, but when there are multiple classloaders, it is possible to instantiate multiple appenders at the same time.
  • Log4j2 does some special resource loading involving looking at the parent classloader of the context classloader (see ProviderUtil), so we need to be careful if log4j-core is on both the API and infrastructure classpaths, since it might lead to using the same class from both classloaders.
    • This can lead to error logs of the form "Unrecognized format specifier" and "Unrecognized conversion specifier", since plugins get loaded from one classloader and get sent to the other.
  • If a context classloader is set, then all log4j2 plugins are loaded from that classloader. Otherwise, it will load from the "current classloader".


  1. To use the pluggable components from the infrastructure classloader, the context classloader needs to be set to the infrastructure classloader.
  2. Framework API module will include slf4j and log4j2 dependencies (including log4j2 binding). Only log4j-api and log4j-core classes will be in the API whitelist.
    1. slf4j dependencies are just needed for the classes in the API module which use logging.
      1. We should not add the slf4j API nor any slf4j binding to the parent-preferred whitelist for the API classloader. If the application does not want to use the logging framework that is used by API/infrastructure, then that should be allowed. This does mean that the application will always need to include an slf4j binding on its classpath if it is using slf4j, even if it is the slf4j to log4j2 binding. If the slf4j to log4j2 binding is included by the application, then it will delegate to the API classloader for log4j-api classes.
    2. log4j-api is included in the API whitelist so the log4j2 concrete classes which implement log4j-api classes (e.g. LoggerContextFactory) and are loaded by the context classloader would be compatible with the application layer
    3. log4j-core is included in the API whitelist since there are some log4j2 concrete classes which implement log4j-core interfaces (e.g. Appender) and come from the application classloader, and those need to be compatible with the infrastructure layer
  3. Infrastructure module will include slf4j and log4j2 dependencies (including log4j2 binding). It will also include samza-log4j2.
    1. slf4j-api and log4j-slf4j-impl are needed for the classes in the API module which use logging.
    2. log4j-api classes will end up getting loaded from the API classloader, so it's not necessary to include it, but it will be transitively pulled in and it is not necessary to exclude it.
    3. log4j-core is needed for base log4j2 functionality and for being able to use custom Samza log4j2 plugins
    4. samza-log4j2 is for including custom Samza log4j2 plugins
  4. When setting the log4j2 configuration file ("log4j.configurationFile" system property), we need to use the application's log4j2.xml if it exists. If the application does not provide that file, then we need to provide a default log4j2.xml in the infrastructure classpath.
    1. This can be done by passing an extra environment variable which is the "application lib directory" which may contain the application's log4j2.xml file to the job coordinator execution, and then reading that environment variable in the script when setting the log4j configuration system property.
  5. All classloaders (API, infrastructure, application) need to exclude "log4j:log4j" (i.e. log4j1) from the classpath and use "org.apache.logging.log4j:log4j-1.2-api" (i.e. bridge from log4j1 to log4j2). This means log4j1 will not be supported, and log4j2 must be used.
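Item 1 above relies on the thread context classloader. A minimal sketch of setting it around a piece of startup logic is shown below. The helper name ContextClassLoaders is hypothetical, and restoring the previous loader afterwards is a choice made for this example; a long-running process such as the job coordinator may instead set it once at startup and leave it in place.

```java
import java.util.function.Supplier;

final class ContextClassLoaders {
  /**
   * Runs the action with the given loader installed as the thread context classloader,
   * then restores whatever was installed before.
   */
  static <T> T callWith(ClassLoader loader, Supplier<T> action) {
    Thread current = Thread.currentThread();
    ClassLoader previous = current.getContextClassLoader();
    current.setContextClassLoader(loader);
    try {
      return action.get();
    } finally {
      current.setContextClassLoader(previous);
    }
  }

  public static void main(String[] args) {
    // Stand-in for the infrastructure classloader.
    ClassLoader infrastructure = new java.net.URLClassLoader(new java.net.URL[0], null);
    boolean installed = callWith(infrastructure,
        () -> Thread.currentThread().getContextClassLoader() == infrastructure);
    System.out.println("context classloader installed during action: " + installed);
  }
}
```

Note that the context classloader is per thread, so threads created before it is set keep their own value, while threads created afterwards inherit it from their creating thread.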


Pros

  • Able to split deploy log4j2 pluggable components built by Samza
  • Can override Samza infrastructure logging configuration
  • Applications can choose their own logging API


Cons

  • Samza ends up controlling log4j2 API version
  • Need to figure out how to manage configuration files for log4j2 correctly
  • No support for log4j1, so existing apps would need to migrate to log4j2

External context

On the job coordinator, no ExternalContext is built, so there should be no conflict between Samza infrastructure and application. Therefore, we don't need to do anything for isolation for ExternalContext usage in the job coordinator.

We will need to consider the conflict on the application runners when using ExternalContext in SamzaApplication.describe, and we need to ensure that the pattern we choose works with general split deployment in which we need to consider the ExternalContext usage on processing containers. This will be discussed in other designs.


Some Beam infrastructure code runs on Samza RAIN hosts when a deployment is requested. This is needed for creating the SamzaApplication in order to call the Samza RemoteApplicationRunner. Beam can have its own container dependency which includes the Samza infrastructure JARs. This allows the Beam applications to not explicitly specify a Samza version.

No Beam-specific code runs on the application master, so we do not need to make additional changes for that part.


Currently, Samza SQL applications just consist of SQL statements (i.e. text in a file).

The functionality provided by this document should not currently be leveraged by Samza SQL, since Samza SQL requires general split deployment and isolation is not needed due to the non-existence of application JARs. We still need to ensure that the new functionality does not break the existing Samza SQL functionality. One area to watch out for is that Samza SQL currently uses the SQL framework code as the main classpath, so that should not break.

In the future, UDFs should be able to be specified by applications. We should be able to leverage the separate classloader solution for this. Also, it is possible in the future that the job coordinator will need to run SQL-specific code. This would likely be a pluggable component, so we should be able to handle that by including it on the Samza infrastructure classpath.

Backward Compatibility

If this feature is off, this is backwards compatible, because we will use the old single-classpath model.

If this feature is on, then there is some runtime impact:

  • Previously, when Samza-owned components were packaged with the application, their runtime dependencies were determined by the application's dependencies, so they might not match their build-time dependencies. In this design, the Samza-owned components on the job coordinator are able to use their actual build-time dependencies as runtime dependencies. However, the old behavior will continue to exist in the application runners and on the processing containers. Therefore, this design will introduce an inconsistency between the dependencies used across the runners, the job coordinator, and the processing containers. If any flow requires the same set of dependencies to be used across all three pieces, then there would be a problem. This is a general problem for any solution which only isolates job coordinator dependencies. For example, Java serialization might be used to serialize a class from a transitive dependency on the application runner, and the object is then deserialized on the job coordinator, where the version of that class differs from the version on the runner. Although this could break something, it seems very unlikely in practice. The inter-process flows we currently have involving the job coordinator should be using objects defined within Samza (same Samza version is used across components), simple objects (e.g. strings), or serialization technologies that have good compatibility concepts built-in (e.g. JSON). Once we have general split deployment, this will no longer be a problem.
    • The Jackson JSON library itself could still be inconsistent due to application packaging, but the application should only be able to override the minor version of what Samza uses (i.e. Jackson 1.*), since Jackson 2.* has a different artifact name and class namespace.