
Status

Current state: [ UNDER DISCUSSION ]

Discussion thread: <link to mailing list DISCUSS thread>

JIRA: <link to JIRA issue>

Released: 

Problem:

Samza today provides several APIs: the high level (fluent) API for chaining complex jobs, the low level (task) API with configuration-driven deployment (e.g. Samza on YARN), and Samza as a library (Samza standalone). In the near future Samza will also support an in-memory system for in-memory streams that users can produce to and consume from. Even so, Samza users still have to get their hands dirty with configuration details and depend on external systems (like Kafka) to write integration tests. To alleviate this, we intend to provide a fluent test API that lets users set up quick tests against their system without depending on an underlying incoming stream, e.g. a Kafka topic. The goal of this SEP is to provide a standardized framework to easily set up continuous integration tests for the high level API and the low level API, in both single-container and multi-container configurations (Samza standalone using ZooKeeper). Users won't have to worry about underlying Samza config details or understand implementation details. This abstraction will increase developer productivity and help clients make their Samza systems robust.

Motivation

Addition of this continuous integration test framework will address:

  1. The lack of a standardized way for users to set up integration tests

  2. The lack of brevity in test code, and the detailed understanding of low level Samza config details currently required to set up Samza systems

  3. The lack of a test system pluggable with any Samza single-container or multi-container configuration

  4. The lack of a set of robust tests for the current Samza APIs

Assumptions 

  1. The framework depends on the in-memory system (SEP-8) to spin up in-memory consumers and producers

  2. Single-container tests run with SingleContainerGrouperFactory, meaning these tests are expected to run in a single-container environment

  3. Multi-container tests leverage Samza as a library (Samza standalone) using ZooKeeper as the coordination service

  4. To provide a testing API with the least config management, basic configs are set up for users, with the flexibility to add or override custom configs

Proposed Changes

We propose a three step process for configuring an integration test for a Samza job: data injection, transformation and validation. The injection phase configures the input sources for the Samza job. The transformation phase is the job's application logic (low level or high level API). The validation phase asserts the expected results against the actual results computed after running the job.

Data Injection:

Since this is a test, the initial data stream is always bounded, and it may originate from one of the following sources:

  • In Memory Data Stream
    Users have the ability to produce to and consume from in-memory system partitions after SEP-8. The in-memory system alleviates the need for serialization/deserialization of data, since it does not persist data. We take advantage of this and provide succinct stream classes to serve as input data sources:
    • Collection Stream
      Users can plug in a collection (either a List or a Map) to create an in-memory input stream
      e.g:  CollectionStream.of(...,{1,2,3,4})
    • Event Builder Stream
      The event builder helps users mimic the runtime Samza processing environment in their tests, for example injecting an exception into the stream or advancing time for window functions
  • File Stream
    Users can create an input stream that reads from and writes to a local file
    e.g:  FileStream.of("/path/to/file")

  • Local Kafka Stream
    Users can also consume a bounded stream from a Kafka topic as the initial input. Samza already provides APIs to consume from and produce to Kafka. Kafka streams require serde configuration

Data Transformation:

The framework supports Samza job application logic written with either the low level API (StreamTask and AsyncStreamTask) or the high level API. Users write their job logic the same way as for a real Samza job, but instead of writing verbose configs, they just pass a class instance implementing this logic to the API.

Data Validation: 

With the low level API, once the job has run, users can assert on data from any intermediate stream the job produced to, as well as the final output stream. The Samza fluent API chains operations, so in that case only the final expected output can be compared.
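The order-insensitive comparison implied by assertions like containsInAnyOrder can be sketched as multiset equality: same elements with the same multiplicities, order ignored. This is a minimal self-contained illustration of those semantics, not the framework's actual assertion class:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class AssertSketch {
  // Hypothetical semantics for a containsInAnyOrder-style check: sort copies of
  // both lists and compare, so [10,30,20,40] matches [10,20,30,40] but a missing
  // or duplicated element fails.
  static <T extends Comparable<T>> boolean containsInAnyOrder(List<T> actual, List<T> expected) {
    List<T> a = new ArrayList<>(actual);
    List<T> e = new ArrayList<>(expected);
    Collections.sort(a);
    Collections.sort(e);
    return a.equals(e);
  }
}
```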

Data Types & Partitions:

Samza provides complete flexibility in the data types used for input streams, and this framework does the same for primitive and derived data types. The test framework provides APIs for initializing input streams (data injection), reading from and writing to single-partition and multi-partition bounded streams (data transformation), and verifying expected against actual results (data validation).
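One simple way to picture a multi-partition bounded stream is a round-robin layout of a collection across N partitions. The helper below is an illustrative sketch of such a layout, with assumed names; the actual partition assignment in the framework may differ:

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionSketch {
  // Hypothetical helper: distribute a bounded collection round-robin across
  // partitionCount partitions, one way CollectionStream data could be laid out
  // for a multi-partition test.
  static <T> List<List<T>> toPartitions(List<T> elems, int partitionCount) {
    List<List<T>> partitions = new ArrayList<>();
    for (int i = 0; i < partitionCount; i++) {
      partitions.add(new ArrayList<>());
    }
    for (int i = 0; i < elems.size(); i++) {
      partitions.get(i % partitionCount).add(elems.get(i));
    }
    return partitions;
  }
}
```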

Flexible Config

Traditionally we ask users to set up config for any Samza job; for test purposes, we set up the basic config boilerplate for users and still provide the flexibility to add any custom config (rarely needed). The API exposes functions to configure single-container or multi-container mode, and APIs to configure the concurrency semantics of the job.
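The boilerplate-plus-override behavior amounts to merging user-supplied entries over framework defaults. A minimal sketch, with purely illustrative key names and default values (not the framework's actual defaults):

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigDefaults {
  // Hypothetical defaults the framework might pre-populate; the keys and
  // values below are illustrative only.
  static Map<String, String> withDefaults(Map<String, String> custom) {
    Map<String, String> config = new HashMap<>();
    config.put("job.name", "test-job");
    config.put("task.max.concurrency", "1");
    config.putAll(custom); // user-supplied entries win over the boilerplate
    return config;
  }
}
```

The key point is precedence: anything the user passes in overrides the boilerplate, so custom config stays optional.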

Public Interfaces

The two APIs for writing tests are the low level test API (TestTask) and the high level test API (TestApplication).

 

TestTask
public class TestTask {
  private TestTask(String systemName, StreamTask task, HashMap<String, String> config) {...}
  private TestTask(String systemName, AsyncStreamTask task, HashMap<String, String> config) {...}
  
  // Static Factory to config & create runner for low level api 
  public static TestTask create(String systemName, StreamTask task, HashMap<String, String> config) {...}
  public static TestTask create(String systemName, AsyncStreamTask task, HashMap<String, String> config) {...}
  
  // Multithreading configs for users
  public TestTask setTaskMaxConcurrency(Integer value) {...}
  public TestTask setTaskCallBackTimeoutMS(Integer value) {...}
  public TestTask setJobContainerThreadPoolSize(Integer value) {...}
 
  // Configure an input stream for the Samza system, that the task can consume from
  public TestTask addInputStream(CollectionStream stream) {...}
  
  // Configure an output stream for the Samza system, that the task can produce to
  public TestTask addOutputStream(CollectionStream stream) {...}
  
  // Run the app
  public void run() {...}
}

 
TestApplication
public class TestApplication<T> {
  
  // Configure an input stream for the Samza system, that the app can apply high level operators on
  public <T> MessageStream<T> getInputStream(EventStream<T> stream) {...}
  public <T> MessageStream<T> getInputStream(CollectionStream<T> stream) {...}
  public <T> MessageStream<T> getInputStream(FileStream<T> stream) {...}
  
  // Run the app
  public void run() {...}
}

Types of Input Streams

 

CollectionStream
/**
* CollectionStream provides utilities to build an in-memory input stream from collections (List,
* Map). It also supports initialization of partitions
*/
 
public class CollectionStream<T> {
  private CollectionStream(String systemStream, List<T> collection, Integer partitionCount) {...}
  private CollectionStream(String systemStream) {...}
  
  public static <T> CollectionStream<T> empty(String systemStream) {
    return new CollectionStream<>(systemStream);
  }
  
  public static <T> CollectionStream<T> of(String systemStream, List<T> collection){
    return new CollectionStream<>(systemStream, collection, 1);
  }
  
  public static <T> CollectionStream<T> ofPartitions(String systemStream, List<List<T>> partitions) {...}
  
  public static <K, V> CollectionStream<KV<K, V>> of(String systemStream, Map<K, V> elems) {...}
}
EventStream
public class EventStream<T> {
  public static abstract class Builder<T> {
    public abstract Builder<T> addElement(T element);
    public abstract Builder<T> addException(Exception exception);
    public abstract Builder<T> advanceTimeTo(long time);
    public abstract EventStream<T> build();
  }

 public static <T> Builder<T> builder() {...}
}
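To make the builder pattern above concrete, here is a self-contained sketch of how such a builder could script a sequence of events (payloads, injected failures, time advances) for a test to replay. All names are illustrative; this is not the proposed implementation:

```java
import java.util.ArrayList;
import java.util.List;

public class EventStreamSketch<T> {
  // Illustrative only: the builder records events in order; a test harness
  // could later replay them against a task, raising the recorded exceptions
  // and advancing the clock where scripted.
  private final List<Object> events;

  private EventStreamSketch(List<Object> events) { this.events = events; }

  public List<Object> getEvents() { return events; }

  public static class Builder<T> {
    private final List<Object> events = new ArrayList<>();

    public Builder<T> addElement(T element) { events.add(element); return this; }
    public Builder<T> addException(Exception exception) { events.add(exception); return this; }
    public Builder<T> advanceTimeTo(long time) { events.add("advanceTimeTo:" + time); return this; }
    public EventStreamSketch<T> build() { return new EventStreamSketch<>(events); }
  }
}
```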
FileStream
public class FileStream<T> {
  public static <T> FileStream<T> of(String fileUri) {...}
}


A Sample Test:


Simple StreamTask Test
/**
* Sample test case using a collection as the input for a low level application. It demonstrates
* setting up and verifying a test with minimal (here, none) config: it reads an input stream of
* integers and multiplies each integer by 10
*/
 
// Create a StreamTask
StreamTask myTask = new StreamTask() {
  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
    Integer obj = (Integer) envelope.getMessage();
    collector.send(new OutgoingMessageEnvelope(new SystemStream("test-samza","output"), obj*10));
  }
};
 
// Initialize and run the test framework
TestTask
    .create("test-samza", myTask, config)
    .setJobContainerThreadPoolSize(4)
    .addInputStream(CollectionStream.of("input", Arrays.asList(1, 2, 3, 4)))
    .addOutputStream(CollectionStream.empty("output"))
    .run();
 
// Assertions on the outputs
TaskAssert.that("test-samza", "output").containsInAnyOrder(Arrays.asList(10, 30, 20, 40));

 


Implementation and Test Plan


Compatibility, Deprecation, and Migration Plan

 

Rejected Alternatives

 
