...
Current state: Under Discussion
Discussion thread: http://mail-archives.apache.org/mod_mbox/samza-dev/201703.mbox/browser
JIRA: SAMZA-1126
Released:
...
In the YARN runtime environment, we support a fixed set of processors, whereas in the ZooKeeper runtime environment, we support an elastic model in which the number of processors can vary during the lifetime of the job (note: the number of processors within a job is limited by the number of tasks in the Samza job). Furthermore, the StreamProcessor API requires the user to specify a unique processorId. This places an undue burden on the user to guarantee that the identifier is unique within the job when processors can be added or removed at any time.
Hence, we need a more flexible model in which the processorId can either be generated by the Samza framework or be custom-configured by the user.
Proposed Changes
...
- Converts "processorId" parameter in StreamProcessor to a String
...
- Introduces a ProcessorIdGenerator interface that users of the StreamProcessor API can use to generate a unique processorId for each processor
**Note**: With the introduction of ApplicationRunner in SAMZA-1067, ApplicationRunner will be the Samza user-api, instead of StreamProcessor.
...
Code Block | ||||
---|---|---|---|---|
| ||||
// Existing Constructor(s) - to be deprecated
@Deprecated
StreamProcessor(int processorId, Config config, ... )
// New Constructor(s)
StreamProcessor(String processorId, Config config, ... ) |
Code Block | ||||
---|---|---|---|---|
| ||||
public interface JobCoordinator {
  ...
  // Interface methods cannot be declared final, so the helper is static only
  static ProcessorIdGenerator getProcessorIdGeneratorFromConfig(Config config) {
    // Static helper to load ProcessorIdGenerator class from config
  }
  String getProcessorId();
  ...
} |
New Interface
Code Block | ||||
---|---|---|---|---|
| ||||
import java.util.UUID;
/**
 * To be used by the JobCoordinator to generate the processorId.
 * It can be configured by using the configuration app.processor-id-generator.class
 */
public interface ProcessorIdGenerator {
  /**
   * Generates a String representation to identify the processor instance.
   * This value can be representative of its current executing environment. It can also be custom-managed by the user.
   * **Note**: processorId has to be unique among the processors within a job
   * @return String, identifier for the processor
   */
  String generateProcessorId();
}
// Example implementation
public class UUIDGenerator implements ProcessorIdGenerator {
  @Override
  public String generateProcessorId() {
    // UUID.randomUUID() returns a UUID object, so convert it to a String
    return UUID.randomUUID().toString();
  }
} |
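As an illustration of an identifier that is "representative of its current executing environment", the following is a minimal sketch of a custom generator. The class name `HostAwareIdGenerator` and the `<hostname>-<UUID>` format are assumptions made for this example and are not part of the proposal; the interface is redeclared locally only so that the sketch compiles on its own.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.UUID;

// Local stand-in that mirrors the proposed interface, so this sketch is self-contained.
interface ProcessorIdGenerator {
  String generateProcessorId();
}

// Hypothetical generator (not part of the proposal): combines the local
// hostname with a random UUID, so the id hints at where the processor runs
// while remaining unique across processors on the same host.
class HostAwareIdGenerator implements ProcessorIdGenerator {
  @Override
  public String generateProcessorId() {
    String host;
    try {
      host = InetAddress.getLocalHost().getHostName();
    } catch (UnknownHostException e) {
      // Fall back to a fixed label; the UUID suffix still keeps the id unique.
      host = "unknown-host";
    }
    return host + "-" + UUID.randomUUID();
  }
}
```

Because the random UUID suffix carries the uniqueness guarantee, the hostname prefix is purely informational and can be swapped for any other environment detail.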
New Configuration
app.processor-id-generator.class - Class that is used to generate a processorId such that each StreamProcessor composing a Samza job has a unique identifier.
processor.id - In the absence of the app.processor-id-generator.class configuration, we expect a processor.id configuration to be set in the config. This config exists purely for backward compatibility with existing users of the StreamProcessor API. It ONLY applies to stream applications that are composed of a single job.
If neither processor.id nor app.processor-id-generator.class is set, Samza should throw an exception for the unknown processorId.
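The lookup order described above (generator class first, then processor.id, otherwise an exception) can be sketched as follows. This is only an illustration under stated assumptions: `ProcessorIdResolver` is a hypothetical helper name, a plain `Map<String, String>` stands in for Samza's `Config`, the interface and `UUIDGenerator` are redeclared locally so the block is self-contained, and the exception types are arbitrary choices rather than what Samza throws.

```java
import java.util.Map;
import java.util.UUID;

// Local stand-ins mirroring the proposed interface and example generator.
interface ProcessorIdGenerator {
  String generateProcessorId();
}

class UUIDGenerator implements ProcessorIdGenerator {
  @Override
  public String generateProcessorId() {
    return UUID.randomUUID().toString();
  }
}

// Hypothetical helper: resolves the processorId from a config map.
final class ProcessorIdResolver {
  static final String GENERATOR_CLASS_KEY = "app.processor-id-generator.class";
  static final String PROCESSOR_ID_KEY = "processor.id";

  private ProcessorIdResolver() {}

  static String resolve(Map<String, String> config) {
    // 1. Prefer the configured generator class, if any.
    String generatorClass = config.get(GENERATOR_CLASS_KEY);
    if (generatorClass != null && !generatorClass.isEmpty()) {
      try {
        ProcessorIdGenerator generator = (ProcessorIdGenerator)
            Class.forName(generatorClass).getDeclaredConstructor().newInstance();
        return generator.generateProcessorId();
      } catch (ReflectiveOperationException e) {
        throw new IllegalArgumentException("Cannot load generator " + generatorClass, e);
      }
    }
    // 2. Fall back to the explicit processor.id (backward compatibility).
    String processorId = config.get(PROCESSOR_ID_KEY);
    if (processorId != null && !processorId.isEmpty()) {
      return processorId;
    }
    // 3. Neither config is present: the processorId is unknown.
    throw new IllegalStateException(
        "Unknown processorId: set " + GENERATOR_CLASS_KEY + " or " + PROCESSOR_ID_KEY);
  }
}
```

Checking the generator class before processor.id follows the wording of the processor.id description, which treats the explicit id as the fallback.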
Implementation and Test Plan
- Introduce the new interface
- Add unit tests to test/verify compatibility
Compatibility, Deprecation, and Migration Plan
Breaking Compatibility: In version 0.13, we will deprecate the current constructor overloads in the StreamProcessor class by providing compatible constructors that use String.valueOf(<int>) to generate the String representation of processorId. Hence, it is recommended that StreamProcessor API users who are upgrading to a Samza version > 0.12 change their code to use the new constructors. If not, they will encounter compile-time errors on future upgrades.
Existing users of StreamProcessor API should also set the processor.id config to the value previously assigned in the constructor.
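A minimal sketch of that migration step, assuming the job configuration can be treated as a `Map<String, String>`: the integer previously passed to the constructor is converted with `String.valueOf` and stored under processor.id. The helper name `ProcessorIdMigration` is hypothetical and used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical migration helper (not part of Samza): carries the integer id
// that used to be passed to the StreamProcessor constructor over into config.
final class ProcessorIdMigration {
  private ProcessorIdMigration() {}

  static Map<String, String> withLegacyProcessorId(Map<String, String> config, int legacyProcessorId) {
    // Copy first so the caller's map is left untouched.
    Map<String, String> migrated = new HashMap<>(config);
    migrated.put("processor.id", String.valueOf(legacyProcessorId));
    return migrated;
  }
}
```

For example, a processor that was created with id 7 would end up with processor.id set to the string "7".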
Rejected Alternatives
None so far