Status

Current state: Under Discussion

Discussion thread: <link to mailing list DISCUSS thread>

JIRA: SAMZA-1126

Released: 

Problem

In Samza 0.12, we introduced the StreamProcessor API to support the "Samza as a library" model. This API introduced a "processorId", which is synonymous with the logical "containerId" that Samza assigns in the fixed-set container model.
However, an int processorId is insufficient to uniquely identify a processor across the various runtime environments. This proposal defines proper semantics for processorId and describes how it will be used in the Samza tech stack.

NOTE: In this document, containerId and processorId are used interchangeably.

Motivation

Current uses of containerId are:

  1. A logical identifier used as a key in the JobModel
  2. Container-level metrics are prefixed with the containerId

In the YARN runtime environment, we support a fixed set of processors, whereas in the ZooKeeper runtime environment we support an elastic model in which the number of processors can vary over the lifetime of the job (Note: the number of processors in a job is bounded by the number of tasks in the Samza job). Furthermore, the StreamProcessor API requires the user to specify a unique processorId. This places an undue burden on the user, who must guarantee that identifiers are unique within the job even though processors can be added or removed at any time.
Hence, we need a more flexible model in which the processorId can either be generated by the Samza framework or be custom-configured by the user.

Proposed Changes

* Convert the "processorId" parameter in StreamProcessor to a String
* Introduce a ProcessorIdGenerator interface that users of the StreamProcessor API can use to generate a unique processorId for each processor

**Note**: With the introduction of ApplicationRunner in SAMZA-1067, ApplicationRunner, rather than StreamProcessor, will be the user-facing Samza API.

Public Interfaces

Interface Change

StreamProcessor.java
// Existing Constructor(s) - to be deprecated
@Deprecated
StreamProcessor (int processorId, Config config, ... )

// New Constructor(s)
StreamProcessor (String processorId, Config config, ... )

New Interface

ProcessorIdGenerator.java
/**
  * Used by ApplicationRunner to obtain a processorId.
  * The implementation class is configured via processor.idGenerator.class.
  */
public interface ProcessorIdGenerator {
	/**
	* Generates a String representation to identify the processor instance
	* This value can be representative of its current executing environment. It can also be custom-managed by the user.
	* **Note**: processorId has to be unique among the processors within a job
	* @return a String identifier for the processor
	*/
	String generateProcessorId();
}
 
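// Example
import java.util.UUID;

public class NaiveIdGenerator implements ProcessorIdGenerator {
	@Override
	public String generateProcessorId() {
	    // A random UUID is unique within the job with overwhelming probability
	    return UUID.randomUUID().toString();
	}
}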
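The Javadoc above notes that a processorId may reflect the processor's execution environment. As a minimal sketch of that idea, the hypothetical HostAwareIdGenerator below (not part of this proposal) combines the host name, the JVM's process ID, and a per-JVM counter. It assumes Java 9+ (for ProcessHandle) and that host names are distinct across the machines running the job; the interface is restated so the snippet compiles on its own.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.atomic.AtomicInteger;

// Restated from the proposal so this sketch compiles on its own.
interface ProcessorIdGenerator {
  String generateProcessorId();
}

/**
 * Hypothetical generator (not part of the proposal) that derives the processorId
 * from the execution environment as host-pid-counter.
 * Assumes Java 9+ and host names that are distinct across the job's machines.
 */
class HostAwareIdGenerator implements ProcessorIdGenerator {
  // Shared across instances so that several StreamProcessors in one JVM get distinct IDs.
  private static final AtomicInteger COUNTER = new AtomicInteger();

  @Override
  public String generateProcessorId() {
    return hostName() + "-" + ProcessHandle.current().pid() + "-" + COUNTER.getAndIncrement();
  }

  private static String hostName() {
    try {
      return InetAddress.getLocalHost().getHostName();
    } catch (UnknownHostException e) {
      // Fallback (an assumption): uniqueness then rests on the pid and counter alone.
      return "unknown-host";
    }
  }
}
```

Unlike a random UUID, an ID built this way can be traced back to the host and process that own it, which may be convenient because container-level metrics are prefixed with this identifier.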

New Configuration

processor.idGenerator.class - The class used to generate a processorId, such that each StreamProcessor in a Samza job has a unique identifier. If this configuration is absent, processor.id must be set instead; this preserves compatibility with existing StreamProcessor usage. If neither processor.id nor processor.idGenerator.class is set, Samza should throw an exception because the processorId is unknown.
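The fallback rules for these two settings can be sketched as follows. This is an illustrative resolver, not Samza code: ProcessorIdResolver is a hypothetical name, a plain Map<String, String> stands in for Samza's Config, and it assumes the generator class has a no-argument constructor. Following the wording above, a configured generator is consulted before processor.id.

```java
import java.util.Map;
import java.util.UUID;

// Restated from the proposal so this sketch compiles on its own.
interface ProcessorIdGenerator {
  String generateProcessorId();
}

// The proposal's example generator.
class NaiveIdGenerator implements ProcessorIdGenerator {
  @Override
  public String generateProcessorId() {
    return UUID.randomUUID().toString();
  }
}

/** Hypothetical resolver illustrating the configuration rules; not Samza's implementation. */
final class ProcessorIdResolver {
  static final String ID_GENERATOR_CLASS = "processor.idGenerator.class";
  static final String PROCESSOR_ID = "processor.id";

  private ProcessorIdResolver() {}

  /** A plain Map stands in for Samza's Config. */
  static String resolve(Map<String, String> config) {
    String generatorClass = config.get(ID_GENERATOR_CLASS);
    if (generatorClass != null) {
      // 1. A configured generator is instantiated reflectively and consulted first.
      try {
        ProcessorIdGenerator generator = Class.forName(generatorClass)
            .asSubclass(ProcessorIdGenerator.class)
            .getDeclaredConstructor()
            .newInstance();
        return generator.generateProcessorId();
      } catch (ReflectiveOperationException | ClassCastException e) {
        throw new IllegalArgumentException("Cannot instantiate " + generatorClass, e);
      }
    }
    // 2. Otherwise fall back to an explicit processor.id (existing usage).
    String processorId = config.get(PROCESSOR_ID);
    if (processorId != null) {
      return processorId;
    }
    // 3. Neither is set: the processorId is unknown.
    throw new IllegalStateException(
        "Unknown processorId: set " + ID_GENERATOR_CLASS + " or " + PROCESSOR_ID);
  }
}
```

Failing fast in step 3 surfaces a misconfigured job at startup rather than letting two processors silently share an identifier.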

Implementation and Test Plan

  • Introduce the new interface and configuration
  • Add unit tests to verify backward compatibility

Compatibility, Deprecation, and Migration Plan

In version 0.13, we will deprecate the int-based constructor overloads while keeping them as compatible constructors that use String.valueOf(int) to derive the String representation of the processorId.
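A minimal sketch of this deprecation path, using a hypothetical StreamProcessorSketch class in place of StreamProcessor: the real constructors take additional arguments (elided as "..." in the interface change), and a Map stands in for Config. The int overload simply delegates to the String overload via String.valueOf; the non-empty check is an illustrative assumption.

```java
import java.util.Map;

/**
 * Hypothetical, simplified stand-in for StreamProcessor that shows only the
 * processorId handling; the real constructors take further arguments.
 */
class StreamProcessorSketch {
  private final String processorId;
  private final Map<String, String> config;

  // New constructor: the processorId is an opaque String.
  StreamProcessorSketch(String processorId, Map<String, String> config) {
    if (processorId == null || processorId.isEmpty()) {
      throw new IllegalArgumentException("processorId must be non-empty");
    }
    this.processorId = processorId;
    this.config = config;
  }

  // Deprecated overload kept for compatibility: converts the int and delegates.
  @Deprecated
  StreamProcessorSketch(int processorId, Map<String, String> config) {
    this(String.valueOf(processorId), config);
  }

  String getProcessorId() {
    return processorId;
  }
}
```

Because the deprecated overload delegates rather than duplicating logic, existing callers keep working unchanged, and removing it later only affects code that still passes an int.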

StreamProcessor API users upgrading to a Samza version later than 0.12 are advised to switch to the new String-based constructor overloads; otherwise, they will encounter build-time errors in future upgrades, once the deprecated constructors are removed.

Rejected Alternatives

None so far