Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
themeEclipse
titleTestTask
/**
* TestTask provides static factory for quick setup of Samza environment for testing low level api, users can configure input streams 
* they consume from and output streams they produce to, users pass in their Samza job api logic via StreamTask and AsyncStreamTask
*/ 

public class TestTask {
  
  // Static Factory to config & create runner for low level api 
  // Mode is either SingleContainer or MultiContainer
  public static TestTask create(StreamTask task, Map<String, String> config, Mode mode) {...}
  public static TestTask create(AsyncStreamTask task, Map<String, String> config, Mode mode) {...}
  
  // MultithreadingAdd/Ovveride configsany forcustom usersconfigs
  public TestTask setTaskMaxConcurrencyaddOverrideConfigs(IntegerMap<String,String> valueconfig) {...}
 
  public TestTask setTaskCallBackTimeoutMS(Integer value) {...}// Set container mode either single container or multi container
  public TestTask setTaskMaxConcurrencysetContainerMode(IntegerMode valuemode) {...}
 
  // ConfigureMultithreading anconfigs input stream for samza system, that task can consume fromusers
  public TestTask addInputStreamsetTaskMaxConcurrency(CollectionStreamInteger streamvalue) {...}
  
public TestTask // Configures an output stream for samza system, that task can producer tosetTaskCallBackTimeoutMS(Integer value) {...}
  public TestTask addOutputStreamsetTaskMaxConcurrency(CollectionStreamInteger streamvalue) {...}
   
  // Configure Runstate thefor appapplication
  public voidTestApplication runaddState(String storeName) {...}
}

 
Code Block
languagejava
themeEclipse
titleTestApplication
/**
* TestApplication provides static factory for quick setup of Samza environment for testing High Level Api, users can configure input  
* streams and then apply various operators on streams and run the application
*/
 
public class TestApplication<T> {  
  // Configure an input stream for samza system, that task can consume from
  public TestTask addInputStream(CollectionStream stream) {...}
  
  // Configures an output stream for samza system, that task can producer to
  public TestTask addOutputStream(CollectionStream stream) {...}
  
  // Run the app
  public void run() {...}
}

 
Code Block
languagejava
themeEclipse
titleTestApplication
/**
* TestApplication provides static factory for quick setup of Samza environment for testing High Level Api, users can configure input  
* streams and then apply various operators on streams and run the application
*/
 
public class TestApplication<T> {  
  // Static factory to config & create runner for High level api 
  public static TestApplication create(Map<String, String> config) {...}

  // Add/Ovveride any custom configs
  public TestTask addOverrideConfigs(Map<String,String> config) {...}

  // Set container mode either single container or multi container
  public TestTask setContainerMode(Mode mode) {...}

  // Multithreading configs for users
  public TestTask setTaskMaxConcurrency(Integer value Static factory to config & create runner for High level api 
  public static TestApplication create(Map<String, String> config, Mode mode) {...}
  
public TestTask // Configure state for applicationsetTaskCallBackTimeoutMS(Integer value) {...}
  public TestApplicationTestTask addStatesetTaskMaxConcurrency(Serde<?>Integer serdevalue) {...}  
  // Configure any kind of input stream for samza system, and get the handle of message stream to apply operators on
  public <T> MessageStream<T> getInputStream(EventStream<T> stream) {...}
  public <T> MessageStream<T> getInputStream(CollectionStream<T> stream) {...}
  public <T> MessageStream<T> getInputStream(FileStream<T> stream) {...}
  
  // Run the app
  public void run() {...};
}

...

Code Block
languagejava
themeEclipse
titleCollectionStream
/**
* CollectionStream provides utilities to build an in memory input stream of collections(list, map). It also supports initialization of 
* multiple partitions for an input stream 
*/
 
public class CollectionStream<T> { 
  // Create an empty stream that a Samza task can produce to
  public static <T> CollectionStream<T> empty(String steamId) {...}
  
  // Create a stream of messages from input list with single partition
  public static <T> CollectionStream<T> of(String steamId, Iterable<T> collection){...}
  
  // Create a stream of messages from input list with multiple partition, key of partitions map is partitionId
  public static <T> CollectionStream<T> of(String steamId, Map<Integer,Iterable> partitions){...}
  
  // Create a stream of messages from input list with multiple partition (REMOVE THIS ??????) of partitions map is partitionId
  public static <K, V> CollectionStream<KV<K, V>><T> CollectionStream<T> of(String steamId, Map<KMap<Integer,Iterable> V> elemspartitions) {...}
  
}
Code Block
languagejava
themeEclipse
titleEventStream
/**
* EventStream provides utilities to build an in memory input stream of events. It helps mimic run time environment of your job, 
* advancing * time for windowing functions  
*/
 
public class EventStream<T> {
  public static abstract class Builder<T> {
    public abstract Builder addElement();
    public abstract Builder addException();
    public abstract Builder advanceTimeTo(long time);
    public abstract EventStream<T> build(); 
 }

 public static <T> Builder<T> builder() {...}
}

...

Code Block
languagejava
themeEclipse
titleSimple StreamTask Test
/**
* Simple Test case using a collection as an input for a low level application. It demonstrates set up and comparison of a test with 
* minimal(eg. here none) configs and it reads an input stream of integers and multiplies each integer with 10
*/
 
// Create a StreamTask
StreamTask myTask = new StreamTask() {
  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
    Integer obj = (Integer) envelope.getMessage();
    collector.send(new OutgoingMessageEnvelope(new SystemStream("test-samza","output"), obj*10));
  }
};
 
// Initialize and run the test framework
TestTask
    .create(myTask, config, mode.SINGLE_CONTAINER)
    .setJobContainerThreadPoolSize(4)
    .addInputStream(CollectionStream.of("test-samza.input", {1,2,3,4}))
    .addOutputStream(CollectionStream.empty("test-samza.output"))
    .run();
 
// Assertions on the outputs
StreamAssert.that("test-samza.output").contains({10,30,20,40});
Code Block
languagejava
themeEclipse
titleSimple AsyncStreamTask Test
/**
* Simple Test case using a collection as an input for a low level application in the async mode
*/

// 
public class AsyncRestTask implements AsyncStreamTask, InitableTask, ClosableTask {
  private Client client;
  private WebTarget target; 

  @Override
  public void init(Config config, TaskContext taskContext) throws Exception {
    // Your initialization of web client code goes here
    client = ClientBuilder.newClient();
    target = client.target("http://example.com/resource/").path("hello");
  }

  @Override
  public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator, final TaskCallback callback) {
    target.request().async().get(new InvocationCallback<Response>() {
      @Override
      public void completed(Response response) {
        Integer obj = (Integer) envelope.getMessage();
        collector.send(new OutgoingMessageEnvelope(new SystemStream("test-samza","output"), obj*10));
        callback.complete();
      }

      @Override
      public void failed(Throwable throwable) {
        System.out.println("Invocation failed.");
        callback.failure(throwable);
      }
    });
  }

  @Override
  public void close() throws Exception {
    client.close();
  }
}
 

// Initialize and run the test framework
TestTask
    .create(new AsyncRestTask(), config, Mode.SINGLE_CONTAINER)
	.setTaskCallBackTimeoutMS(200)
	.setTaskMaxConcurrency(4)
	.addInputStream(CollectionStream.of("test-samza.Input", {1,2,3,4}))
	.addOutputStream(CollectionStream.empty("test-samza.Output"))
	.run();
 
// Assertions on the outputs
StreamAssert.that("test-samza.Output").containsInAnyOrder({10,20,30,40});
Code Block
languagejava
themeEclipse
titleSimple High Level Api Test
/**
* Simple Test case using a collection as an input for a High level application
*/
 
// Initialize and run the test framework
TestApplication
	.create(config, mode.SINGLE_CONTAINER);
    .getInputStream(CollectionStream.of("test-samza.input", {1,2,3}))
    .map(s -> "processed " + s)
    .run();
 
// Assertions on the outputs
StreamAssert.that("test-samza.input").contains(Arrays.asList("processed 1", "processed 2", "processed 4"));

...