Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Samza provides complete flexibility in the data types used for input streams; this framework will likewise provide complete flexibility in the use of primitive and derived data types.


 

Public Interfaces

Code Block
languagejava
themeEclipse
titleTestTask
/**
 * Test runner for low level API tasks. An instance is obtained through one of the static
 * {@code create} factories, configured through the chained setters and stream registrations
 * (each returns a {@code TestTask}), and finally executed with {@link #run()}.
 */
public class TestTask {
  // Constructors are private; instances are created through the static factories below
  private TestTask(String systemName, StreamTask task, HashMap<String, String> config) {...}
  private TestTask(String systemName, AsyncStreamTask task, HashMap<String, String> config) {...}

  // Static factories to configure and create a runner for the low level API
  public static TestTask create(String systemName, StreamTask task, HashMap<String, String> config) {...}
  public static TestTask create(String systemName, AsyncStreamTask task, HashMap<String, String> config) {...}

  // Multithreading configs for users
  public TestTask setTaskMaxConcurrency(Integer value) {...}
  public TestTask setTaskCallBackTimeoutMS(Integer value) {...}
  // Declared once here: the usage example on this page calls setJobContainerThreadPoolSize(4)
  public TestTask setJobContainerThreadPoolSize(Integer value) {...}

  // Configures an input stream for the Samza system that the task can consume from
  public TestTask addInputStream(CollectionStream stream) {...}
  // Configures an output stream for the Samza system that the task can produce to
  public TestTask addOutputStream(CollectionStream stream) {...}
  // Runs the configured test; terminal call of the chain
  public void run() {...}
}

 


Code Block
languagejava
themeEclipse
titleFileStream
/**
 * Stream handle with element type {@code T}, obtained through the static factory
 * {@link #of(String)}.
 * NOTE(review): presumably describes a stream backed by the file at the given URI; the
 * implementation is elided in this sketch, so confirm against the final design.
 */
public class FileStream<T> {
  // Static factory: returns a FileStream for the given file URI (body elided)
  public static <T> FileStream<T> of(String fileUri) {...}
}
Code Block
languagejava
themeEclipse
titleCollectionStream
/**
 * CollectionStream provides utilities to build an in-memory input stream from collections
 * (list, map). It also supports the initialization of partitions.
 *
 * @param <T> type of the messages held by the stream
 */
public class CollectionStream<T> {
  // Stream over a single collection with an explicit partition count
  private CollectionStream(String systemStream, List<T> collection, Integer partitionCount) {...}
  // Stream over pre-partitioned data: one inner list per partition, so the partition count is
  // partitions.size(). Needed because a List<List<T>> cannot be passed where List<T> is expected.
  private CollectionStream(String systemStream, List<List<T>> partitions) {...}
  // Stream with no initial data
  private CollectionStream(String systemStream) {...}

  /** Creates a stream without any initial messages, e.g. to be used as an output stream. */
  public static <T> CollectionStream<T> empty(String systemStream) {
    return new CollectionStream<>(systemStream);
  }

  /** Creates a single-partition stream holding the messages of {@code collection}. */
  public static <T> CollectionStream<T> of(String systemStream, List<T> collection) {
    return new CollectionStream<>(systemStream, collection, 1);
  }

  /** Creates a stream with one partition per inner list of {@code collection}. */
  public static <T> CollectionStream<T> ofPartitions(String systemStream, List<List<T>> collection) {
    return new CollectionStream<>(systemStream, collection);
  }

  /** Creates a stream of key-value messages from the entries of {@code elems} (body elided). */
  public static <K, V> CollectionStream<KV<K, V>> of(String systemStream, Map<K, V> elems) {...}
}

 

 

Code Block
languagejava
themeEclipse
titleSimple StreamTask Test
linenumberstrue
/**
 * Sample test case using a collection as the input of a low level application. It demonstrates
 * the setup and verification of a test with minimal (here: none) configs. The task reads an
 * input stream of integers and multiplies each integer by 10.
 */

// Create a StreamTask
StreamTask myTask = new StreamTask() {
  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
    Integer obj = (Integer) envelope.getMessage();
    collector.send(new OutgoingMessageEnvelope(new SystemStream("test-samza", "output"), obj * 10));
  }
};

// This test needs no configs, so an empty map is passed to the factory
HashMap<String, String> config = new HashMap<>();

// Initialize and run the test framework
TestTask
    .create("test-samza", myTask, config)
    .setJobContainerThreadPoolSize(4)
    .addInputStream(CollectionStream.of("input", Arrays.asList(1, 2, 3, 4)))
    .addOutputStream(CollectionStream.empty("output"))
    .run();

// Assertions on the outputs; the stream name matches the "output" stream the task writes to
TaskAssert.that("test-samza", "output").containsInAnyOrder(Arrays.asList(10, 30, 20, 40));

...