...

Config key | Description
samza.cluster.based.job.coordinator.dependency.isolation.enabled | Set to "true" to enable cluster-based job coordinator dependency isolation
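
For example, a job's configuration might enable the flag like this (properties-style snippet; only the key and the value "true" come from this proposal):

    samza.cluster.based.job.coordinator.dependency.isolation.enabled=true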

YARN-specific

These configs are for localizing the framework resources in a YARN environment. If a different execution environment is used, it will be necessary to specify localization configs specific to that environment for the framework API and framework infrastructure resources, since other environments may have a different way of specifying resource locations.

Config key | Description
yarn.resources.__samzaFrameworkApi.path | Path to the Samza framework API resource
yarn.resources.__samzaFrameworkApi.* | Any other YARN resource configurations for the Samza framework API resource
yarn.resources.__samzaFrameworkInfrastructure.path | Path to the Samza framework infrastructure resource
yarn.resources.__samzaFrameworkInfrastructure.* | Any other YARN resource configurations for the Samza framework infrastructure resource
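
For example, a YARN deployment might point the two framework resources at locations like these (the paths below are hypothetical placeholders, not values defined by this SEP):

    yarn.resources.__samzaFrameworkApi.path=hdfs://<namenode>/frameworks/samza-framework-api.tgz
    yarn.resources.__samzaFrameworkInfrastructure.path=hdfs://<namenode>/frameworks/samza-framework-infrastructure.tgz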

Existing JAR management

Currently, Samza infrastructure code and dependencies are included in the tarball with the Samza application. This means that conflicting dependencies between the application and Samza are resolved at build time before the tarball is created, which can cause a certain version of a dependency to be excluded. All JARs in the tarball are installed into a single directory for classpath generation and execution.

...

Generating the Samza API whitelist

In order to load the Samza API classes from the API classloader, we need to tell cytodynamics what those classes are. We can do this by providing a whitelist of packages/classes when building the cytodynamics classloader. All public interfaces/classes inside samza-api should be considered API classes. One way to generate this whitelist is to use a Gradle task to find all the classes from samza-api and put that list in a file. That file can then be read by Samza when constructing the cytodynamics classloader. The Gradle task should also include classes from samza-kv.
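
As a rough sketch of the class-listing step (the proposal uses a Gradle task; the plain-Java snippet below only illustrates the core idea, and the jar path and output file are arguments supplied by the build rather than anything defined in this SEP):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Enumeration;
    import java.util.List;
    import java.util.jar.JarEntry;
    import java.util.jar.JarFile;

    public class ApiWhitelistGenerator {
      public static void main(String[] args) throws IOException {
        // args[0]: path to the samza-api (or samza-kv) jar; args[1]: whitelist output file
        List<String> apiClasses = new ArrayList<>();
        try (JarFile apiJar = new JarFile(args[0])) {
          Enumeration<JarEntry> entries = apiJar.entries();
          while (entries.hasMoreElements()) {
            String entryName = entries.nextElement().getName();
            // convert "org/apache/samza/system/SystemFactory.class" to "org.apache.samza.system.SystemFactory";
            // a real implementation would also filter the list down to the public API classes
            if (entryName.endsWith(".class")) {
              apiClasses.add(entryName.substring(0, entryName.length() - ".class".length()).replace('/', '.'));
            }
          }
        }
        Collections.sort(apiClasses);
        // the whitelist file is later read when constructing the cytodynamics classloader
        Files.write(Paths.get(args[1]), apiClasses);
      }
    }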

...

  • The pluggable classes implement Samza API interfaces (e.g. SystemFactory), and the classes corresponding to those interfaces need to be loaded by the API classloader. Implementations of plugin interfaces can be on both the "infrastructure" and "application" classpaths, and all components need to use interfaces loaded by the same classloader (i.e. the API classloader).
  • Object deserialization (e.g. Avro) may be used within "infrastructure plugins" code, but the application must provide the classes for the concrete deserialized objects at runtime, since the application will be using those deserialized objects. For this case, the "infrastructure plugins" classloader will load the infrastructure plugin classes, but it will need to delegate to the application classloader for the deserialized object classes (see the sketch after this list).
    • Note that object deserialization is not used on the job coordinator, so it is less of a concern in the scope of this SEP. However, we do need to consider it for applying isolation mode to the processing containers (in a future SEP), so it will be good if the strategy used in job coordinator isolation carries over to the processing containers. 
    • For the Avro case: since the Avro objects need to be used by the application code, the application needs to be able to choose the version of Avro. The infrastructure code will delegate to the application classloader for the Avro classes as well, which means that the Avro version chosen by the application needs to be compatible with the Avro version used by the infrastructure.
    • Samza provides SerializableSerde and JsonSerdeV2 as serdes, but the classes being used come from the application side, so the same considerations apply to those serdes (this is similar to Avro).
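
As a simplified illustration of that delegation (this is a plain-Java sketch, not the actual cytodynamics API; the class and parameter names are made up for this example):

    import java.net.URL;
    import java.net.URLClassLoader;

    // "Infrastructure plugins" classloader that loads its own classes first, but falls back to the
    // application classloader for classes it does not contain (e.g. application-defined Avro record classes).
    public class InfraPluginsClassLoader extends URLClassLoader {
      private final ClassLoader applicationClassLoader;

      public InfraPluginsClassLoader(URL[] infraPluginJars, ClassLoader applicationClassLoader) {
        super(infraPluginJars, null); // null parent: no implicit visibility of the application classpath
        this.applicationClassLoader = applicationClassLoader;
      }

      @Override
      protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
          Class<?> loaded = findLoadedClass(name);
          if (loaded == null) {
            try {
              loaded = findClass(name); // search the infrastructure plugin jars first
            } catch (ClassNotFoundException e) {
              loaded = applicationClassLoader.loadClass(name); // delegate, e.g. for deserialized object classes
            }
          }
          if (resolve) {
            resolveClass(loaded);
          }
          return loaded;
        }
      }
    }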

Flow for loading a class from the infrastructure classloader:

...

A consequence of this structure is that there are "multiple" application classloaders on the job coordinator: the one in the flow described here and the one described above as the "application" classloader. Therefore, any classes loaded by one of the application classloaders cannot be used by classes loaded by the other application classloader. An example of when this could happen is in the low-level API: the application's TaskFactory implementation will be loaded by the application classloader described above, but the Kafka events deserialized into Avro objects will be loaded by the other application classloader. Even though the Avro objects are the same class (even backed by the same binary), the TaskFactory implementation won't be able to use them, since a different classloader instance was used. We can solve this by serializing the components specified through the descriptor and deserializing those components using the classloader that is used for the rest of the AM. This is consistent with the strategy of serializing the whole job description. The interfaces have already been marked as Serializable.
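
A minimal sketch of that serialize/deserialize step, assuming standard Java serialization (the class and method names below are illustrative, not from the Samza codebase):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.ObjectStreamClass;
    import java.io.Serializable;

    public final class ClassLoaderAwareSerde {
      // Serialize a Serializable component, e.g. the application's TaskFactory from the descriptor.
      public static byte[] serialize(Serializable component) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
          out.writeObject(component);
        }
        return bytes.toByteArray();
      }

      // Deserialize so that the component's classes are resolved by the classloader used for the rest of the AM.
      public static Object deserialize(byte[] bytes, ClassLoader targetClassLoader)
          throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
          @Override
          protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
            try {
              return Class.forName(desc.getName(), false, targetClassLoader);
            } catch (ClassNotFoundException e) {
              return super.resolveClass(desc); // fall back, e.g. for primitive type descriptors
            }
          }
        }) {
          return in.readObject();
        }
      }
    }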

...