Status

Current state: Accepted

Discussion thread: http://mail-archives.apache.org/mod_mbox/samza-dev/201703.mbox/browser

JIRA: SAMZA-1126

Released:  0.13.0

Problem

In Samza 0.12, we introduced the StreamProcessor API to support the Samza-as-a-library model. This introduced a "processorId" that is synonymous with the logical "containerId" that Samza assigns in a fixed-set container model.
However, an "int" processorId is insufficient to uniquely identify a processor across the various runtime environments. This proposal defines proper semantics for processorId and how it will be used in the Samza tech stack.

...

In the Yarn runtime environment, we support a fixed set of processors, whereas in the Zookeeper runtime environment, we support an elastic model in which the number of processors can vary during the lifetime of the job (Note: the number of processors within a job is limited by the number of tasks in the Samza job).

Furthermore, the StreamProcessor API requires the user to specify a unique processorId. This places an undue burden on the user to guarantee uniqueness of the identifier within the job when processors can be added or removed at any time.
Hence, we need a more flexible model in which the processorId can be generated by the Samza framework or custom-configured by the user.

Proposed Changes

  • Converts "processorId" parameter in StreamProcessor to a String
  • Introduces a configurable ProcessorIdGenerator interface that can be used by JobCoordinator to generate a unique processorId for each processor
  • Job Coordinator has the responsibility of generating a unique processorId within the runtime environment. Hence, this needs to be a pluggable component. (For more details, see SAMZA-881)
  • "processorId" is a String, contrasting to "containerId" that is an integer

Note: With the introduction of ApplicationRunner in SAMZA-1067, ApplicationRunner will be the Samza user-api, instead of StreamProcessor. 

...

Code Block: JobCoordinator.java (Java)
public interface JobCoordinator {
	...

	// Before
	int getProcessorId();

	// After
	String getProcessorId();
	...
}

...

Code Block: ProcessorIdGenerator.java (Java)
import java.util.UUID;

/**
  * To be used by the JobCoordinator to generate the processorId.
  * It can be configured using the app.processor-id-generator.class configuration.
  */
public interface ProcessorIdGenerator {
	/**
	* Generates a String representation to identify the processor instance.
	* This value can be representative of its current executing environment. It can also be custom-managed by the user.
	* **Note**: processorId has to be unique among the processors within a job
	* @return String, identifier for the processor
	*/
	String generateProcessorId();
}
 
// Default (after 0.13+)
public class UUIDGenerator implements ProcessorIdGenerator {
	@Override
	public String generateProcessorId() {
	    // UUID.randomUUID() returns a UUID, so convert it to a String explicitly
	    return UUID.randomUUID().toString();
	}
}

...

processor.id - In the absence of the app.processor-id-generator.class configuration, we expect a processor.id configuration to be set in the config. This config exists purely for backward compatibility with existing users of the StreamProcessor API. It ONLY applies to stream applications that are composed of a single job.

If neither processor.id nor app.processor-id-generator.class is set, Samza should throw an Exception for the unknown processorId.
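The lookup order described above (generator class first, then processor.id, otherwise an exception) could be sketched roughly as follows. This is only an illustration under stated assumptions, not Samza's implementation: a plain Map stands in for Samza's Config, ProcessorIdResolver and UuidProcessorIdGenerator are hypothetical names, the interface is redeclared locally so the sketch compiles on its own, and the generator class is assumed to have a no-argument constructor.

```java
import java.util.Map;
import java.util.UUID;

// Local stand-in for the proposed ProcessorIdGenerator interface.
interface ProcessorIdGenerator {
    String generateProcessorId();
}

// Example generator used for illustration (hypothetical name).
class UuidProcessorIdGenerator implements ProcessorIdGenerator {
    @Override
    public String generateProcessorId() {
        return UUID.randomUUID().toString();
    }
}

// Hypothetical helper, not part of Samza.
final class ProcessorIdResolver {
    static final String GENERATOR_CLASS_KEY = "app.processor-id-generator.class";
    static final String PROCESSOR_ID_KEY = "processor.id";

    private ProcessorIdResolver() {}

    // Assumption: config is a plain key/value map rather than Samza's Config.
    static String resolve(Map<String, String> config) {
        String generatorClass = config.get(GENERATOR_CLASS_KEY);
        if (generatorClass != null && !generatorClass.isEmpty()) {
            try {
                // Assumes the configured class has a no-argument constructor.
                ProcessorIdGenerator generator = (ProcessorIdGenerator) Class.forName(generatorClass)
                        .getDeclaredConstructor()
                        .newInstance();
                return generator.generateProcessorId();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot instantiate " + generatorClass, e);
            }
        }
        // Backward-compatible fallback for existing StreamProcessor users.
        String processorId = config.get(PROCESSOR_ID_KEY);
        if (processorId != null && !processorId.isEmpty()) {
            return processorId;
        }
        throw new IllegalStateException(
                "Unknown processorId: set " + GENERATOR_CLASS_KEY + " or " + PROCESSOR_ID_KEY);
    }
}
```

With only processor.id set, resolve returns that value unchanged; with both keys set, the generator wins, which matches the "in the absence of" wording above.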

...

  • Introduce the new interface
  • Add unit tests to test/verify compatibility
  • JobModel compatibility - The data model hierarchy used "int" as containerId everywhere (as a key in JobModel and as a member variable in ContainerModel). JobModel is read from / written to CoordinatorStream using Jackson-based serializers/deserializers. These need to be changed to handle deserialization of old "int" containerIds, in order not to break existing Yarn jobs.
  • Certain parts of the code implicitly assume the "comparable" nature of ContainerModel, which relies on integer comparison of containerId. An example would be the TaskNameGrouper - GroupByContainerCount. In general, the TaskNameGrouper should take in a list of processorIds, as opposed to assuming that processorId always falls within the 0 to N-1 range.
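To make the last point concrete, here is a minimal sketch of a grouper that accepts an explicit list of String processorIds instead of assuming ids 0 to N-1. It is not Samza's TaskNameGrouper or GroupByContainerCount: ProcessorIdTaskGrouper is a hypothetical name, task names are plain Strings, and round-robin is chosen purely for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not Samza's TaskNameGrouper.
final class ProcessorIdTaskGrouper {
    private ProcessorIdTaskGrouper() {}

    // Assigns tasks round-robin to the given processorIds, in list order.
    static Map<String, List<String>> group(List<String> taskNames, List<String> processorIds) {
        if (processorIds.isEmpty()) {
            throw new IllegalArgumentException("At least one processorId is required");
        }
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String processorId : processorIds) {
            // processorIds must be unique within the job.
            if (assignment.putIfAbsent(processorId, new ArrayList<>()) != null) {
                throw new IllegalArgumentException("Duplicate processorId: " + processorId);
            }
        }
        for (int i = 0; i < taskNames.size(); i++) {
            String owner = processorIds.get(i % processorIds.size());
            assignment.get(owner).add(taskNames.get(i));
        }
        return assignment;
    }
}
```

Because the assignment depends on the order of the list, callers that need every processor to compute the same result would presumably pass the ids in an agreed order, for example sorted.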

Compatibility, Deprecation, and Migration Plan

Breaking Code Compatibility: In version 0.13, we will remove the current constructor overloads in the StreamProcessor class. Hence, StreamProcessor API users who are upgrading to a Samza version > 0.12 should change their code to use the new constructors. Otherwise, they will encounter compile-time errors.

Existing users of StreamProcessor API should also set the processor.id config to the value previously assigned in the constructor.

Note: When more than one StreamProcessor runs within the same JVM, each StreamProcessor instance still requires a unique processorId. A custom ProcessorIdGenerator can then keep a static counter that is incremented on each call to generateProcessorId and append the counter value to the identifier.
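The counter-based approach in the note above might look like the sketch below. CountingProcessorIdGenerator is a hypothetical name, the interface is redeclared locally so the example compiles on its own, and the use of a per-JVM random UUID as the base identifier is an assumption made here for illustration (a host name or other environment-specific value could serve equally well).

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

// Local stand-in for the proposed ProcessorIdGenerator interface.
interface ProcessorIdGenerator {
    String generateProcessorId();
}

// Hypothetical custom generator for several StreamProcessors in one JVM.
final class CountingProcessorIdGenerator implements ProcessorIdGenerator {
    // Assumption: a random UUID chosen once per JVM serves as the base identifier.
    private static final String JVM_ID = UUID.randomUUID().toString();

    // Static, so the count is shared by all generator instances in this JVM.
    private static final AtomicInteger COUNTER = new AtomicInteger();

    @Override
    public String generateProcessorId() {
        // Append the counter value to the base identifier.
        return JVM_ID + "-" + COUNTER.getAndIncrement();
    }
}
```

Using an AtomicInteger keeps the ids distinct even if StreamProcessor instances are created from different threads.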

processor.id - to be deprecated in Samza 0.14.

Rejected Alternatives

None so far