Status
Current state: Under Discussion
Discussion thread: <link to mailing list DISCUSS thread>
JIRA: SAMZA-1126
Released:
Problem
In Samza 0.12, we introduced the StreamProcessor API to support the Samza-as-a-library model. This introduced a "processorId" that is synonymous with the logical "containerId" that Samza assigns in the fixed-set container model.
However, an "int" processorId is insufficient to uniquely identify a processor across the different runtime environments. This proposal defines proper semantics for processorId and how it will be used in the Samza tech stack.
NOTE: containerId and processorId may be used interchangeably.
Motivation
Current uses of containerId are:
- Logical Identifier that is used as a key in the JobModel
- Container Level Metrics are prefixed with the containerId
In the Yarn runtime environment, we support a fixed set of processors, whereas in the Zookeeper runtime environment we support an elastic model in which the number of processors can vary during the lifetime of the job (Note: the number of processors within a job is limited by the number of tasks in the Samza job). Furthermore, the StreamProcessor API requires the user to specify a unique processorId. This places an undue burden on the user to guarantee that the identifier is unique within the job when processors can be added or removed at any time.
Hence, we need a more flexible model in which the processorId can either be generated by the Samza framework or be custom-configured by the user.
Proposed Changes
* Convert the "processorId" parameter in StreamProcessor to a String
* Introduce a ProcessorIdGenerator interface that users of the StreamProcessor API can use to generate a unique processorId for each processor
**Note**: With the introduction of ApplicationRunner in SAMZA-1067, ApplicationRunner will be the Samza user-api, instead of StreamProcessor.
Public Interfaces
Interface Change
// Existing Constructor(s) - to be deprecated
@Deprecated
StreamProcessor (int processorId, Config config, ... )

// New Constructor(s)
StreamProcessor (String processorId, Config config, ... )
New Interface
/**
 * To be used in ApplicationRunner.java
 * It can be configured by using a configuration - processor.idGenerator.class
 */
public interface ProcessorIdGenerator {
  /**
   * Generates a String representation to identify the processor instance
   * This value can be representative of its current executing environment. It can also be custom-managed by the user.
   * **Note**: processorId has to be unique among the processors within a job
   * @return String, identifier for the processor
   */
  String generateProcessorId();
}

// Example
import java.util.UUID;

public class NaiveIdGenerator implements ProcessorIdGenerator {
  // Interface methods are implicitly public, so the implementation must be declared public
  @Override
  public String generateProcessorId() {
    // randomUUID() returns a UUID object; convert it to the String the interface expects
    return UUID.randomUUID().toString();
  }
}
New Configuration
processor.idGenerator.class - Class used to generate a processorId such that each StreamProcessor in a Samza job has a unique identifier. In the absence of this configuration, we expect a processor.id configuration to be set, which provides compatibility with the existing StreamProcessor usage. If neither processor.id nor processor.idGenerator.class is set, Samza should throw an exception for the unknown processorId.
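The lookup order described above could be sketched roughly as follows. This is only an illustration, not the Samza implementation: ProcessorIdResolver is a hypothetical helper, a plain Map<String, String> stands in for Samza's Config, the local ProcessorIdGenerator merely mirrors the interface proposed in this document, the generator is assumed to have a no-argument constructor, and the choice of IllegalStateException is an assumption since the proposal does not name the exception type.

```java
import java.util.Map;

/** Stand-in that mirrors the ProcessorIdGenerator interface proposed in this document. */
interface ProcessorIdGenerator {
  String generateProcessorId();
}

/** Hypothetical helper (not part of Samza) showing one possible resolution order. */
final class ProcessorIdResolver {
  static final String ID_GENERATOR_CLASS = "processor.idGenerator.class";
  static final String PROCESSOR_ID = "processor.id";

  private ProcessorIdResolver() { }

  /** A plain Map is assumed here in place of Samza's Config. */
  static String resolve(Map<String, String> config) {
    // 1. Prefer the configured generator class, if present.
    String generatorClass = config.get(ID_GENERATOR_CLASS);
    if (generatorClass != null && !generatorClass.isEmpty()) {
      try {
        // Assumes the generator exposes a no-argument constructor.
        ProcessorIdGenerator generator = (ProcessorIdGenerator) Class.forName(generatorClass)
            .getDeclaredConstructor()
            .newInstance();
        return generator.generateProcessorId();
      } catch (ReflectiveOperationException | ClassCastException e) {
        throw new IllegalStateException("Unable to use id generator " + generatorClass, e);
      }
    }

    // 2. Fall back to an explicitly configured processor.id.
    String processorId = config.get(PROCESSOR_ID);
    if (processorId != null && !processorId.isEmpty()) {
      return processorId;
    }

    // 3. Neither setting is present, so the processorId is unknown.
    throw new IllegalStateException(
        "Unknown processorId: set either " + PROCESSOR_ID + " or " + ID_GENERATOR_CLASS);
  }
}
```

In this sketch the generator takes precedence when both settings are present, so an existing job that sets only processor.id keeps its current identifier.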
Implementation and Test Plan
- Introduce the new interface
- Add unit tests to test/verify compatibility
Compatibility, Deprecation, and Migration Plan
In version 0.13, we will deprecate the int-based constructor overloads and keep them as compatible constructors that use String.valueOf(<int>) to generate the String representation of processorId.
It is recommended that StreamProcessor API users who are upgrading to a Samza version later than 0.12 change their code to use the new constructor overloads. Otherwise, they will encounter build-time errors on future upgrades.
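As a minimal sketch of that compatibility path, the deprecated overload can simply delegate to the String-based one. StreamProcessorSketch is a hypothetical, simplified stand-in: the real StreamProcessor constructors take further arguments that are elided in this document, and a plain Map<String, String> is assumed here in place of Config.

```java
import java.util.Map;
import java.util.Objects;

/** Hypothetical, simplified stand-in for StreamProcessor; not the real class. */
final class StreamProcessorSketch {
  private final String processorId;
  private final Map<String, String> config;

  /** Existing int-based overload, kept only for source compatibility. */
  @Deprecated
  StreamProcessorSketch(int processorId, Map<String, String> config) {
    // Delegate to the String overload using the conversion named in the plan.
    this(String.valueOf(processorId), config);
  }

  /** New String-based overload. */
  StreamProcessorSketch(String processorId, Map<String, String> config) {
    this.processorId = Objects.requireNonNull(processorId, "processorId");
    this.config = config;
  }

  String getProcessorId() {
    return processorId;
  }
}
```

Callers that still pass an int continue to compile, with a deprecation warning, and end up with the same identifier in String form.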
Rejected Alternatives
None so far