...

Proposed Changes

  • Introduces a configurable ProcessorIdGenerator interface that the JobCoordinator can use to generate a unique processorId for each processor.
  • The JobCoordinator is responsible for generating a processorId that is unique within the runtime environment. Hence, this needs to be a pluggable component. (For more details, see SAMZA-881.)
  • "processorId" is a String, in contrast to "containerId", which is an integer.

Note: With the introduction of ApplicationRunner in SAMZA-1067, ApplicationRunner, rather than StreamProcessor, will be the user-facing Samza API.

...

Code Block
language: java
title: JobCoordinator.java
public interface JobCoordinator {
	...
	
	default ProcessorIdGenerator getProcessorIdGeneratorFromConfig(Config config) {
		// Default helper that loads the ProcessorIdGenerator class named in the config (body elided)
	}
 
	// Before
	// int getProcessorId();
 
	String getProcessorId();
	...
}

...

Code Block
language: java
title: ProcessorIdGenerator.java
/**
  * To be used by the JobCoordinator to generate a processorId.
  * It can be configured through the app.processor-id-generator.class configuration.
  */
public interface ProcessorIdGenerator {
 
	/** 
	* Static helper to load ProcessorIdGenerator class from config
	*/
	static ProcessorIdGenerator createInstance(Config config) {
		... 
	}
	/**
	* Generates a String representation to identify the processor instance
	* This value can be representative of its current executing environment. It can also be custom-managed by the user.
	* **Note**: processorId has to be unique among the processors within a job
	* @return a String identifier for the processor
	*/
	String generateProcessorId();
}
 
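// Default implementation (requires: import java.util.UUID;)
public class UUIDGenerator implements ProcessorIdGenerator {
	@Override
	public String generateProcessorId() {
	    // randomUUID() returns a UUID object, so convert it to a String
	    return UUID.randomUUID().toString();
	}
}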
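
As an illustration of an id that is "representative of its current executing environment", here is a minimal sketch of a custom generator. HostPidProcessorIdGenerator is a hypothetical name and is not part of this proposal; the interface is re-declared in reduced form (without the Config-based loader) only so the sketch compiles on its own. It assumes Java 9+ for ProcessHandle.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in for the proposed interface, reduced to the single method used here.
interface ProcessorIdGenerator {
    String generateProcessorId();
}

// Hypothetical implementation (an assumption for illustration, not from the proposal).
class HostPidProcessorIdGenerator implements ProcessorIdGenerator {
    // Distinguishes multiple processors created inside the same JVM.
    private static final AtomicInteger SEQUENCE = new AtomicInteger();

    @Override
    public String generateProcessorId() {
        String host;
        try {
            host = InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            // Fallback weakens uniqueness across machines; a real deployment
            // would likely prefer to fail here instead.
            host = "unknown-host";
        }
        long pid = ProcessHandle.current().pid();
        return host + "-" + pid + "-" + SEQUENCE.incrementAndGet();
    }
}
```

Whether host name plus process id is unique enough depends on the runtime environment (for example, containers that share host names), which is the reason the generator is pluggable.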

...

processor.id - In the absence of the app.processor-id-generator.class configuration, we expect processor.id to be set in the config. This config exists purely for backward compatibility with existing users of the StreamProcessor API. It applies ONLY to stream applications that are composed of a single job.

If neither processor.id nor app.processor-id-generator.class is set, Samza should throw an exception for an unknown processorId.
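
The lookup order described above can be sketched as follows. This is only an illustration under stated assumptions: ProcessorIdResolver and resolveProcessorId are hypothetical names, a plain Map stands in for Samza's Config, IllegalStateException stands in for whatever exception type Samza would actually throw, and the generator key uses the spelling from the interface Javadoc (app.processor-id-generator.class).

```java
import java.util.Map;
import java.util.UUID;

// Stand-in for the proposed interface, reduced to the single method used here.
interface ProcessorIdGenerator {
    String generateProcessorId();
}

// Mirrors the default generator from the proposal.
class UUIDGenerator implements ProcessorIdGenerator {
    @Override
    public String generateProcessorId() {
        return UUID.randomUUID().toString();
    }
}

// Hypothetical helper (an assumption for illustration, not from the proposal).
final class ProcessorIdResolver {
    static final String GENERATOR_CLASS_KEY = "app.processor-id-generator.class";
    static final String PROCESSOR_ID_KEY = "processor.id";

    static String resolveProcessorId(Map<String, String> config) {
        // 1. Prefer the pluggable generator when its class is configured.
        String generatorClass = config.get(GENERATOR_CLASS_KEY);
        if (generatorClass != null && !generatorClass.isEmpty()) {
            try {
                ProcessorIdGenerator generator = (ProcessorIdGenerator)
                        Class.forName(generatorClass).getDeclaredConstructor().newInstance();
                return generator.generateProcessorId();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot load " + generatorClass, e);
            }
        }
        // 2. Fall back to the backward-compatible processor.id setting.
        String processorId = config.get(PROCESSOR_ID_KEY);
        if (processorId != null && !processorId.isEmpty()) {
            return processorId;
        }
        // 3. Neither is set: the processorId is unknown.
        throw new IllegalStateException("Unknown processorId: set "
                + GENERATOR_CLASS_KEY + " or " + PROCESSOR_ID_KEY);
    }
}
```

For example, a config containing only processor.id=p-1 resolves to "p-1", while an empty config results in the exception.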

...

Existing users of StreamProcessor API should also set the processor.id config to the value previously assigned in the constructor. 

processor.id - to be deprecated in Samza 0.14.

Rejected Alternatives

None so far