Status

Current state: Accepted

Discussion thread: http://mail-archives.apache.org/mod_mbox/samza-dev/201703.mbox/browser

JIRA: SAMZA-1126

Released:  0.13.0

Problem

In Samza 0.12, we introduced the StreamProcessor API to support the Samza-as-a-library model. This introduced a "processorId" that is synonymous with the logical "containerId" that Samza assigns in a fixed-set container model.
However, an "int" processorId is insufficient to uniquely identify a processor across different runtime environments. This proposal defines the semantics of processorId and how it is used in the Samza tech stack.

...

JobCoordinator.java
public interface JobCoordinator {
	...
 
	// Before
	// int getProcessorId();
 
	String getProcessorId();
	...
}

New Interface

ProcessorIdGenerator.java
/**
  * Used by the JobCoordinator to generate the processorId.
  * It can be configured through the app.processor-id-generator.class configuration.
  */
public interface ProcessorIdGenerator {
	/**
	* Generates a String that identifies the processor instance.
	* This value can be representative of its current executing environment. It can also be custom-managed by the user.
	* **Note**: processorId has to be unique among the processors within a job
	* @return a String identifier for the processor
	*/
	String generateProcessorId();
}
 
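// Default (0.13+)
public class UUIDGenerator implements ProcessorIdGenerator {
	@Override
	public String generateProcessorId() {
	    // randomUUID() returns a java.util.UUID, so convert it to a String explicitly
	    return java.util.UUID.randomUUID().toString();
	}
}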

...

Existing users of StreamProcessor API should also set the processor.id config to the value previously assigned in the constructor.

Note: When more than one StreamProcessor runs within the same JVM, each StreamProcessor instance must still use a unique processorId. A custom ProcessorIdGenerator can then keep a static counter that is incremented on each call to generateProcessorId and append the counter value to the identifier.
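The counter-based approach in the note above could be sketched roughly as follows. This is a minimal illustration, not part of Samza: the class name CountingProcessorIdGenerator, the prefix parameter, and the "prefix-N" id format are assumptions made for the example, and the interface is redeclared only so that the snippet compiles on its own.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Mirrors the interface proposed above; redeclared here only to keep the sketch self-contained.
interface ProcessorIdGenerator {
    String generateProcessorId();
}

// Hypothetical generator: the name and the id format are illustrative assumptions.
class CountingProcessorIdGenerator implements ProcessorIdGenerator {
    // Static, so every generator instance in this JVM draws from the same sequence.
    private static final AtomicInteger COUNTER = new AtomicInteger(0);

    // Assumed to identify the JVM or host; supplied by the caller.
    private final String prefix;

    CountingProcessorIdGenerator(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public String generateProcessorId() {
        // incrementAndGet is atomic, so concurrent callers never receive the same suffix.
        return prefix + "-" + COUNTER.incrementAndGet();
    }
}
```

The counter only distinguishes processors inside one JVM. Uniqueness across the whole job still depends on the prefix differing between JVMs, so the caller would need to derive it from something like the host name or another environment-specific value.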

...