Status

Current state: [ UNDER DISCUSSION  ACCEPTED ]

Discussion thread: <link to mailing list DISCUSS thread>

JIRA: SAMZA-1629

Released: 

Problem:

Samza today provides several APIs. The High Level (fluent/application) API chains complex jobs. The Low Level (task) API supports deployment options such as Samza on YARN and Samza as a library (Samza standalone). In the near future, Samza will also support an in-memory system with in-memory streams that users can produce to and consume from. Even so, Samza users still have to deal with too many configuration details, and they depend on external systems (such as Kafka) to write integration tests. To alleviate this, we intend to provide a fluent test API. It will help users set up quick tests against their jobs without depending on an incoming stream from another system, e.g. a Kafka topic.

The goal of this SEP is to provide a standardized framework for setting up continuous integration tests. It covers the High Level API and the Low Level API, in both single-container and multi-container configurations (Samza standalone using ZooKeeper). Users won't have to worry about the underlying Samza config or understand implementation details. This abstraction will increase developer productivity and help clients make their Samza jobs robust.

...

  • Samza job owners: to make their Samza jobs robust
  • Samza dev teams: to help them make the Samza infrastructure robust

This SEP targets a two-phase development of the framework:

  • PHASE I: testing Samza's public APIs, targeting Samza job owners
  • PHASE II: testing various components of Samza, targeting the Samza dev team

Terminology

  • p1: APIs targeted for development in Phase I
  • p2: APIs targeted for development in Phase II

Motivation

Adding this Integration Test Framework will address:

  1. The lack of a standardized way for users to set up integration tests

  2. Verbose test code that requires a detailed understanding of low-level configs to set up Samza systems

  3. The lack of a pluggable test system that works with any Samza configuration (single-container and multi-container)

  4. The lack of a comprehensive set of tests for the Samza APIs

Assumptions

  1. The framework depends on the in-memory system to spin up in-memory system consumers and producers for in-memory data streams

  2. Single-container mode runs with SingleContainerGrouperFactory

  3. Multi-container mode uses Samza as a library (Samza standalone), with ZooKeeper as the coordination service for setting up the test

  4. To keep config management minimal, basic configs are set up for users, with the flexibility to add or override custom configs

  5. Tests always use bounded streams, terminated by an EndOfStreamMessage

  6. The framework should consume directly from a file stream or a local Kafka stream without any additional configs. The file stream implementation is out of scope for this SEP, although this SEP suggests what its design should look like

  7. Bounded message streams from Kafka (using either a latch or an EndOfStreamMessage) are beyond the scope of this SEP

  8. For stateful testing, the framework may use RocksDB, since it ships with Samza, or keep the state in memory
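Assumption 4 can be pictured as a map of framework defaults that user-supplied entries overwrite. The sketch below is a hypothetical illustration, not the framework's implementation. The class name `TestConfigSketch` and the default values are assumptions. Only the config keys `job.default.system` and `task.max.concurrency`, and the method names `addConfigs`/`addOverrideConfig`, come from elsewhere in this SEP.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: framework defaults that a test can add to or override. */
final class TestConfigSketch {
  private final Map<String, String> configs = new HashMap<>();

  TestConfigSketch() {
    // Assumed defaults for illustration only; this SEP does not fix these values.
    configs.put("job.default.system", "test");
    configs.put("task.max.concurrency", "1");
  }

  /** Adds a single config; a user value replaces a default with the same key. */
  TestConfigSketch addOverrideConfig(String key, String value) {
    configs.put(key, value);
    return this;
  }

  /** Adds many configs at once, with the same "user value wins" rule. */
  TestConfigSketch addConfigs(Map<String, String> userConfigs) {
    configs.putAll(userConfigs);
    return this;
  }

  /** Returns an immutable snapshot of the merged configuration. */
  Map<String, String> build() {
    return Collections.unmodifiableMap(new HashMap<>(configs));
  }
}
```

Keeping the defaults in the same map that overrides write into means that a test only states what differs from the baseline.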

Design

We propose a three-step process for configuring an integration test for a Samza job. The three facets are data injection, data transformation and data validation.

...

(Diagram: Flow New)

Data Injection:

In the context of this test framework, an input stream is a stream that a Samza job consumes from, and an output stream is a stream that a Samza job produces to. Since this is a test, the initial source of data is always bounded. It may be one of the following:

  • In Memory Data Stream
    After SEP-8, users can produce to and consume from in-memory system partitions. The in-memory system removes the need to serialize/deserialize data because it does not persist data. We take advantage of this and provide succinct stream classes to serve as input data sources:
    • Collection Stream (p1)
      Users can plug in a collection (either a List or a Map) to create an in-memory input stream
      e.g: CollectionStream.of(..., Arrays.asList(1, 2, 3, 4))
    • Event Builder Stream (p2)
      The event builder helps users mimic the runtime Samza processing environment in their tests, for example by adding an exception to the stream or advancing time for window functions.
  • File Stream
    Users can create a stream that reads from or writes to a local file
    e.g: FileStream.of("/path/to/file")
  • Local Kafka Stream
    Users can also consume bounded streams from a Kafka topic, which serves as the initial input. Samza already provides APIs to consume from and produce to Kafka. Kafka streams need serde configurations. However, implementing bounded streams in Kafka (using a latch message or an EndOfStreamMessage) is beyond the scope of this SEP
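The Event Builder Stream (p2) is only described in prose. One hypothetical shape for it is a builder that records an ordered script of events: plain elements, injected exceptions, and time advances for window functions. All names below (`EventScriptSketch`, `addElement`, `addException`, `advanceTimeTo`) are illustrative assumptions, not a committed API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of an event-builder stream: an ordered script of test events. */
final class EventScriptSketch<T> {

  /** One scripted step; which fields are meaningful depends on the kind. */
  static final class Event<T> {
    enum Kind { ELEMENT, EXCEPTION, ADVANCE_TIME }

    final Kind kind;
    final T element;
    final RuntimeException exception;
    final long timeMs; // simulated clock value when the step was recorded

    private Event(Kind kind, T element, RuntimeException exception, long timeMs) {
      this.kind = kind;
      this.element = element;
      this.exception = exception;
      this.timeMs = timeMs;
    }
  }

  private final List<Event<T>> events;

  private EventScriptSketch(List<Event<T>> events) {
    this.events = Collections.unmodifiableList(new ArrayList<>(events));
  }

  List<Event<T>> events() {
    return events;
  }

  static <T> Builder<T> builder() {
    return new Builder<>();
  }

  static final class Builder<T> {
    private final List<Event<T>> events = new ArrayList<>();
    private long currentTimeMs = 0L;

    /** Appends a regular message to the stream. */
    Builder<T> addElement(T element) {
      events.add(new Event<>(Event.Kind.ELEMENT, element, null, currentTimeMs));
      return this;
    }

    /** Appends a failure for the simulated runtime to surface at this point. */
    Builder<T> addException(RuntimeException e) {
      events.add(new Event<>(Event.Kind.EXCEPTION, null, e, currentTimeMs));
      return this;
    }

    /** Moves the simulated clock forward, e.g. to close a window. */
    Builder<T> advanceTimeTo(long timeMs) {
      if (timeMs < currentTimeMs) {
        throw new IllegalArgumentException("time cannot move backwards");
      }
      currentTimeMs = timeMs;
      events.add(new Event<>(Event.Kind.ADVANCE_TIME, null, null, timeMs));
      return this;
    }

    EventScriptSketch<T> build() {
      return new EventScriptSketch<>(events);
    }
  }
}
```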

For in-memory streams, the API initializes the stream by spinning up an InMemorySystemProducer, which writes the collection of data or events into the stream. It also configures any output streams the user has added.
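The initialization described above can be sketched as a producer loop over the input partitions. This is a toy model, not Samza's InMemorySystemProducer. `InMemoryStreamSketch` and its `END_OF_STREAM` sentinel are hypothetical stand-ins for the in-memory system and for the EndOfStreamMessage of Assumption 5.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical toy model of an in-memory system that a test framework writes input into. */
final class InMemoryStreamSketch {
  /** Sentinel standing in for Samza's EndOfStreamMessage. */
  static final Object END_OF_STREAM = new Object();

  private final Map<Integer, List<Object>> partitions = new TreeMap<>();

  /** Appends one message to a partition, creating the partition on first use. */
  void produce(int partitionId, Object message) {
    partitions.computeIfAbsent(partitionId, id -> new ArrayList<>()).add(message);
  }

  /** Writes every element of every input partition, then terminates each partition. */
  <T> void initialize(Map<Integer, ? extends Iterable<T>> input) {
    for (Map.Entry<Integer, ? extends Iterable<T>> entry : input.entrySet()) {
      for (T message : entry.getValue()) {
        produce(entry.getKey(), message);
      }
      // Bounded streams: each partition ends with an end-of-stream marker.
      produce(entry.getKey(), END_OF_STREAM);
    }
  }

  /** Read-only view of everything written to a partition so far. */
  List<Object> read(int partitionId) {
    return Collections.unmodifiableList(
        partitions.getOrDefault(partitionId, Collections.emptyList()));
  }
}
```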

Data Transformation:

This is the Samza job you write, using either the Low Level API or the fluent High Level API. The test framework provides APIs to set up tests for both cases, in single-container and multi-container mode. Users implement StreamTask, AsyncStreamTask or StreamApplication exactly as they do for their Samza job. For the High Level API, they pass an instance of their StreamApplication to the test framework.
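Conceptually, the framework drives the user's task over each bounded partition until it reaches the end-of-stream marker. The sketch below models that loop in plain Java. `MessageHandler` is a hypothetical stand-in for `StreamTask#process`, and a `Consumer` stands in for the `MessageCollector`. None of these types are Samza's.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

/** Hypothetical stand-in for StreamTask#process: handle one message, emit zero or more. */
interface MessageHandler<I, O> {
  void process(I message, Consumer<O> collector);
}

/** Toy driver: feeds each bounded partition to the handler until the end-of-stream marker. */
final class BoundedRunnerSketch {
  static final Object END_OF_STREAM = new Object();

  private BoundedRunnerSketch() {}

  @SuppressWarnings("unchecked")
  static <I, O> Map<Integer, List<O>> run(Map<Integer, List<Object>> input, MessageHandler<I, O> handler) {
    Map<Integer, List<O>> output = new TreeMap<>();
    for (Map.Entry<Integer, List<Object>> partition : input.entrySet()) {
      List<O> collected = new ArrayList<>();
      for (Object message : partition.getValue()) {
        if (message == END_OF_STREAM) {
          break; // bounded stream: stop at the marker
        }
        handler.process((I) message, collected::add);
      }
      output.put(partition.getKey(), collected);
    }
    return output;
  }
}
```

With a handler that multiplies by 10, partition 0 holding `1, 2, 3, 4, END_OF_STREAM` yields `10, 20, 30, 40`. This mirrors the StreamTask example later in this page.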

Data Validation: 

For both the Low Level and the High Level API, once the job has run, users can assert on the data in any stream the job uses. Because the fluent API chains operators inside the job, only the final output can be compared in that case. StreamAssert spins up a consumer for the system under test, reads the messages in the stream and compares them with the expected values. The assertion functions provided are:

 

| StreamAssert       | StateAssert |
|--------------------|-------------|
| contains           | contains    |
| containsInAnyOrder | satisfies   |
| inWindowPane       |             |
| satisfies          |             |
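The difference between `contains` and `containsInAnyOrder` can be pinned down with a small sketch. `AssertSketch` is hypothetical and only illustrates the intended semantics: an exact sequence versus the same multiset of messages. The real StreamAssert would first read the messages from the system under test.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical illustration of the two main StreamAssert comparisons. */
final class AssertSketch {
  private AssertSketch() {}

  /** contains: the same messages in exactly the same order. */
  static <T> boolean contains(List<T> actual, List<T> expected) {
    return Objects.equals(actual, expected);
  }

  /** containsInAnyOrder: the same messages with the same multiplicities, order ignored. */
  static <T> boolean containsInAnyOrder(List<T> actual, List<T> expected) {
    return actual.size() == expected.size() && counts(actual).equals(counts(expected));
  }

  /** Counts how often each message occurs. */
  private static <T> Map<T, Integer> counts(List<T> messages) {
    Map<T, Integer> counts = new HashMap<>();
    for (T message : messages) {
      counts.merge(message, 1, Integer::sum);
    }
    return counts;
  }
}
```

Comparing counts rather than sets keeps duplicates significant. For example, `[1, 1, 2]` does not match `[1, 2, 2]`.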

 

Data Types & Partitions:

The framework will provide full flexibility in the use of primitive and derived data types. Serdes are required for local Kafka streams and file streams, but in-memory streams don't require any serde configuration. Serdes are also required if users want to maintain state. The test framework will provide APIs to initialize input streams (single-partition and multi-partition) and to validate data on single-partition and multi-partition bounded streams.
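With multi-partition streams, message order is typically only meaningful within a single partition. The following is a hedged sketch of two ways a test might validate data shaped as `Map<Integer, List<T>>` (partition id to messages, the shape that the proposed `consumeStream` returns). One checks order partition by partition. The other flattens all partitions for an order-insensitive comparison. `PartitionedResultSketch` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helpers for validating messages grouped by partition id. */
final class PartitionedResultSketch {
  private PartitionedResultSketch() {}

  /** Per-partition check: same partitions, each holding exactly its expected messages in order. */
  static <T> boolean inOrderPerPartition(Map<Integer, List<T>> actual, Map<Integer, List<T>> expected) {
    if (!actual.keySet().equals(expected.keySet())) {
      return false;
    }
    for (Map.Entry<Integer, List<T>> partition : expected.entrySet()) {
      if (!partition.getValue().equals(actual.get(partition.getKey()))) {
        return false;
      }
    }
    return true;
  }

  /** Concatenates partitions in ascending partition-id order, e.g. before an any-order check. */
  static <T> List<T> flatten(Map<Integer, List<T>> byPartition) {
    List<T> all = new ArrayList<>();
    for (List<T> messages : new TreeMap<>(byPartition).values()) {
      all.addAll(messages);
    }
    return all;
  }
}
```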

...

The test framework will support stateful and stateless testing. Stateful testing can use either RocksDB or state maintained in memory.
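For stateful tests that keep state in memory instead of RocksDB, the store can be as simple as a sorted map. A sorted map also makes StateAssert-style checks straightforward. `InMemoryStoreSketch` is a hypothetical illustration and does not implement Samza's `KeyValueStore` interface.

```java
import java.util.Collections;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical in-memory key-value store for stateful tests, with a snapshot for assertions. */
final class InMemoryStoreSketch<K extends Comparable<K>, V> {
  private final TreeMap<K, V> data = new TreeMap<>();

  V get(K key) {
    return data.get(key);
  }

  void put(K key, V value) {
    data.put(key, value);
  }

  void delete(K key) {
    data.remove(key);
  }

  /** Read-only copy of the current state, e.g. for asserting on the whole store. */
  SortedMap<K, V> snapshot() {
    return Collections.unmodifiableSortedMap(new TreeMap<>(data));
  }

  /** StateAssert-style "contains": the key is present with the expected value. */
  boolean contains(K key, V expectedValue) {
    return data.containsKey(key) && Objects.equals(data.get(key), expectedValue);
  }
}
```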

Future Changes with Stream Descriptors:

The test framework is designed to require no or minimal Samza config from users. In the future, we intend to use StreamDescriptors in the test framework to generate the Samza config. This would slightly change the way users pass their custom configs (if any) to the test framework.

Public Interfaces

The two APIs for writing tests are the Low Level test API (TestTask) and the High Level test API (TestApplication).

With StreamDescriptors and the High Level API, another wrapper could be provided over the job to run its configuration in a test environment.

Test Matrix (Plan)

The matrix below is the planned set of tests for various Samza components in p2, where:

n = number of threads

x = number of partitions

m = number of containers

| Samza API | Concurrency | Partitions | Containers | Expected Result |
|-----------|-------------|------------|------------|-----------------|
| StreamTask | task.max.concurrency = 1, job.container.thread.pool.size = n | 1 <= p <= x | 1 / m | in-order processing |
| StreamTask | task.max.concurrency > 1, job.container.thread.pool.size = n | 1 <= p <= x | 1 / m | out-of-order processing |
| AsyncStreamTask | task.max.concurrency = 1 | 1 <= p <= x | 1 / m | in-order processing |
| AsyncStreamTask | task.max.concurrency = n | 1 <= p <= x | 1 / m | out-of-order processing |
| WindowableTask | N/A | 1 <= p <= x | 1 / m | expected processing of n messages for a message window of time t |
| InitableTask | N/A | 1 <= p <= x | 1 / m | stateful testing (assertions on the KV store) |
| ClosableTask | N/A | 1 <= p <= x | 1 / m | verify that the client is closed |
| Map / FlatMap / Filter / PartitionBy / Merge / SendTo / Sink / Join / Window | task.max.concurrency = 1, job.container.thread.pool.size = n | 1 <= p <= x | 1 / m | in-order processing |
| Map / FlatMap / Filter / PartitionBy / Merge / SendTo / Sink / Join / Window | task.max.concurrency > 1, job.container.thread.pool.size = n | | | out-of-order processing |
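The StreamTask rows of the matrix expand into concrete test cases. The sketch below enumerates them for given n, x and m. `MatrixCase` and the `outOfOrderConcurrency` parameter are hypothetical, because the table only says "task.max.concurrency > 1". Only the two config keys come from the table. Partition and container counts are kept as plain fields, because this SEP does not specify how the framework configures them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical enumeration of the StreamTask rows of the test matrix. */
final class MatrixCase {
  final int maxConcurrency;
  final int threadPoolSize;
  final int partitions;
  final int containers;

  MatrixCase(int maxConcurrency, int threadPoolSize, int partitions, int containers) {
    this.maxConcurrency = maxConcurrency;
    this.threadPoolSize = threadPoolSize;
    this.partitions = partitions;
    this.containers = containers;
  }

  /** Only the two concurrency keys from the matrix become config entries. */
  Map<String, String> toConfig() {
    Map<String, String> config = new HashMap<>();
    config.put("task.max.concurrency", String.valueOf(maxConcurrency));
    config.put("job.container.thread.pool.size", String.valueOf(threadPoolSize));
    return config;
  }

  /** In-order rows use concurrency 1; out-of-order rows use a value greater than 1. */
  boolean expectsInOrder() {
    return maxConcurrency == 1;
  }

  /** All combinations of concurrency {1, c}, partitions 1..x and containers {1, m}. */
  static List<MatrixCase> streamTaskCases(int n, int x, int m, int outOfOrderConcurrency) {
    if (outOfOrderConcurrency <= 1) {
      throw new IllegalArgumentException("out-of-order rows need task.max.concurrency > 1");
    }
    int[] concurrencies = {1, outOfOrderConcurrency};
    int[] containerCounts = m == 1 ? new int[] {1} : new int[] {1, m};
    List<MatrixCase> cases = new ArrayList<>();
    for (int concurrency : concurrencies) {
      for (int p = 1; p <= x; p++) {
        for (int containers : containerCounts) {
          cases.add(new MatrixCase(concurrency, n, p, containers));
        }
      }
    }
    return cases;
  }
}
```

For n = 4, x = 3 and m = 2 this gives 2 × 3 × 2 = 12 cases. The first 6 expect in-order processing.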


| Metadata Stream | Tests |
|-----------------|-------|
| Changelog Stream | Assertions on the changelog stream in stateful tests |
| Checkpoint Stream | Assertions on the checkpoint stream (Kafka or InMemory) |
| Coordinator Stream | Assertions on the coordinator stream (Kafka or InMemory) |


Public Interfaces

 

TestRunner

```java
/**
 * TestRunner provides a static factory for quickly setting up a Samza environment to test the Low Level API
 * and the High Level API. Users configure the input streams their job consumes from and the output streams it
 * produces to, and pass in their Samza job logic as a StreamTask/AsyncStreamTask class or a StreamApplication.
 */
public class TestRunner {

  // Static factory to configure & create a runner for the Low Level API
  public static TestRunner of(Class taskClass) {...}

  // Static factory to configure & create a runner for the High Level API
  public static TestRunner of(StreamApplication app) {...}

  // Add/override any custom configs
  public TestRunner addConfigs(Config configs) {...}
  public TestRunner addOverrideConfig(String key, String val) {...}

  // Configure state for the application
  public TestRunner addState(String storeName) {...}

  // Configure an input stream for the Samza system that the job can consume from
  public TestRunner addInputStream(CollectionStream stream) {...}

  // Configure an output stream for the Samza system that the job can produce to
  public TestRunner addOutputStream(CollectionStream stream) {...}

  // Consume messages from a stream, grouped by partition id
  public static <T> Map<Integer, List<T>> consumeStream(CollectionStream stream, Integer timeout) {...}

  // Run the TestRunner
  public void run() {...}
}
```
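`consumeStream(stream, timeout)` returns messages grouped by partition id. One plausible reading of the timeout is a per-poll wait: keep draining a partition until the end-of-stream marker arrives, or until no message shows up within the timeout. The sketch below implements that reading with standard `java.util.concurrent` queues. The queue-per-partition model and `ConsumeSketch` are assumptions. The timeout is taken to be in milliseconds, which this SEP does not state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of consumeStream: drain each partition until end-of-stream or timeout. */
final class ConsumeSketch {
  static final Object END_OF_STREAM = new Object();

  private ConsumeSketch() {}

  @SuppressWarnings("unchecked")
  static <T> Map<Integer, List<T>> consumeStream(Map<Integer, BlockingQueue<Object>> partitions, int timeoutMs) {
    Map<Integer, List<T>> result = new TreeMap<>();
    for (Map.Entry<Integer, BlockingQueue<Object>> partition : partitions.entrySet()) {
      List<T> messages = new ArrayList<>();
      while (true) {
        Object next;
        try {
          // poll returns null if nothing arrives within the timeout
          next = partition.getValue().poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // preserve the interrupt and stop consuming
          next = null;
        }
        if (next == null || next == END_OF_STREAM) {
          break;
        }
        messages.add((T) next);
      }
      result.put(partition.getKey(), messages);
    }
    return result;
  }
}
```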



 

 

Types of Input Streams

 

CollectionStream:

```java
/**
 * CollectionStream acts as a descriptor that can be used to build an in-memory input stream
 * (single or multiple partitions) of Java collections
 */
public class CollectionStream<T> {
  // Create an empty stream with a single partition that a Samza job can produce to
  public static <T> CollectionStream<T> empty(String systemName, String streamName) {...}

  // Create an empty stream with multiple partitions that a Samza job can produce to
  public static <T> CollectionStream<T> empty(String systemName, String streamName, Integer partitionCount) {...}

  // Create a stream of messages from an input list, with a single partition
  public static <T> CollectionStream<T> of(String systemName, String streamName, Iterable<T> collection) {...}

  // Create a stream of messages from input lists, with multiple partitions; the keys of the partitions map are partition ids
  public static <T> CollectionStream<T> of(String systemName, String streamName, Map<Integer, ? extends Iterable<T>> partitions) {...}
}
```
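The descriptor methods above are declared with elided bodies. The sketch below shows the data a CollectionStream could hold, assuming it simply records the system name, the stream name and a map from partition id to messages. `CollectionStreamSketch` is a hypothetical name. Its factory signatures mirror the declared ones, and the step that writes these partitions into the in-memory system is left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical minimal descriptor mirroring the CollectionStream factories. */
final class CollectionStreamSketch<T> {
  final String systemName;
  final String streamName;
  final Map<Integer, List<T>> partitions;

  private CollectionStreamSketch(String systemName, String streamName, Map<Integer, List<T>> partitions) {
    this.systemName = systemName;
    this.streamName = streamName;
    this.partitions = Collections.unmodifiableMap(partitions);
  }

  /** Empty single-partition stream that a job can produce to. */
  static <T> CollectionStreamSketch<T> empty(String systemName, String streamName) {
    return empty(systemName, streamName, 1);
  }

  /** Empty stream with partitionCount partitions, numbered from 0. */
  static <T> CollectionStreamSketch<T> empty(String systemName, String streamName, Integer partitionCount) {
    Map<Integer, List<T>> partitions = new TreeMap<>();
    for (int id = 0; id < partitionCount; id++) {
      partitions.put(id, new ArrayList<>());
    }
    return new CollectionStreamSketch<>(systemName, streamName, partitions);
  }

  /** Single-partition stream (partition 0) holding a copy of the given collection. */
  static <T> CollectionStreamSketch<T> of(String systemName, String streamName, Iterable<T> collection) {
    Map<Integer, List<T>> partitions = new TreeMap<>();
    partitions.put(0, copy(collection));
    return new CollectionStreamSketch<>(systemName, streamName, partitions);
  }

  /** Multi-partition stream; map keys are partition ids. */
  static <T> CollectionStreamSketch<T> of(String systemName, String streamName,
      Map<Integer, ? extends Iterable<T>> input) {
    Map<Integer, List<T>> partitions = new TreeMap<>();
    input.forEach((id, messages) -> partitions.put(id, copy(messages)));
    return new CollectionStreamSketch<>(systemName, streamName, partitions);
  }

  private static <T> List<T> copy(Iterable<T> messages) {
    List<T> list = new ArrayList<>();
    messages.forEach(list::add);
    return list;
  }
}
```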


CollectionStreamSystem

```java
/**
 * CollectionStreamSystem provides utilities to create and initialize an in-memory input stream of collections (list, map).
 * It also supports initialization of multiple partitions for an input stream
 */
public class CollectionStreamSystem {
  // Create a CollectionStreamSystem
  public static CollectionStreamSystem create(String name) {...}
}
```

 

FileStream:

```java
/**
 * FileStream provides utilities to build a stream of messages from a file on disk
 */
public class FileStream<T> {
  public static <T> FileStream<T> of(String fileUri) {...}
}

public class FileStreamSystem {
  public static FileStreamSystem create(String name) {...}
}
```
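File streams are out of scope for this SEP (Assumption 6). In the simplest case, the suggested `FileStream.of(fileUri)` could read one message per line. The sketch below assumes exactly that line-per-message, UTF-8 format. `FileStreamSketch` is hypothetical and uses only `java.nio.file`. It accepts either a plain path or a `file:` URI.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;

/** Hypothetical file-backed input: each line of the file becomes one message. */
final class FileStreamSketch {
  private FileStreamSketch() {}

  static List<String> readMessages(String fileUri) {
    // Accept both "file:///..." URIs and plain filesystem paths
    Path path = fileUri.startsWith("file:") ? Paths.get(URI.create(fileUri)) : Paths.get(fileUri);
    try {
      return Collections.unmodifiableList(Files.readAllLines(path, StandardCharsets.UTF_8));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

A non-text format would need a serde, which matches the earlier note that file streams require serde configuration.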


Example Usages of the Test API:


Simple StreamTask Test

```java
import java.util.Arrays;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;
import org.hamcrest.collection.IsIterableContainingInOrder;
import org.junit.Assert;
import org.junit.Test;

/**
 * Simple test case using a collection as an input for a low level application. It demonstrates setting up and
 * comparing a test with minimal (here, no) configs: it reads an input stream of integers and multiplies each
 * integer by 10. TestRunner and CollectionStream are the APIs proposed in this SEP.
 */
public class MyStreamTestTaskTest {

  public static class MyStreamTestTask implements StreamTask {
    @Override
    public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)
        throws Exception {
      Integer obj = (Integer) envelope.getMessage();
      collector.send(new OutgoingMessageEnvelope(new SystemStream("test", "output"), obj * 10));
    }
  }

  @Test
  public void testStreamTask() {
    CollectionStream<Integer> input = CollectionStream.of("test", "input", Arrays.asList(1, 2, 3, 4));
    CollectionStream<Integer> output = CollectionStream.empty("test", "output");

    TestRunner
        .of(MyStreamTestTask.class)
        .addInputStream(input)
        .addOutputStream(output)
        .run();

    // Assertions on the outputs of partition 0; the timeout value (and its unit) is illustrative
    Assert.assertThat(TestRunner.<Integer>consumeStream(output, 1000).get(0),
        IsIterableContainingInOrder.contains(10, 20, 30, 40));
  }
}
```

Simple AsyncStreamTask Test

```java
import java.util.Arrays;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;
import org.apache.samza.config.Config;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.AsyncStreamTask;
import org.apache.samza.task.ClosableTask;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCallback;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.junit.Test;

/**
 * Simple test case using a collection as an input for a low level application in async mode.
 * TestRunner, CollectionStream and StreamAssert are the APIs proposed in this SEP.
 */
public class MyAsyncStreamTaskTest {

  public static class MyAsyncStreamTask implements AsyncStreamTask, InitableTask, ClosableTask {
    private Client client;
    private WebTarget target;

    @Override
    public void init(Config config, TaskContext taskContext) throws Exception {
      // Your initialization of web client code goes here
      client = ClientBuilder.newClient();
      target = client.target("http://example.com/resource/").path("hello");
    }

    @Override
    public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector,
        TaskCoordinator coordinator, final TaskCallback callback) {
      target.request().async().get(new InvocationCallback<Response>() {
        @Override
        public void completed(Response response) {
          Integer obj = (Integer) envelope.getMessage();
          collector.send(new OutgoingMessageEnvelope(new SystemStream("test", "output"), obj * 10));
          callback.complete();
        }

        @Override
        public void failed(Throwable throwable) {
          System.out.println("Invocation failed.");
          callback.failure(throwable);
        }
      });
    }

    @Override
    public void close() throws Exception {
      client.close();
    }
  }

  @Test
  public void testAsyncStreamTask() {
    CollectionStream<Integer> input = CollectionStream.of("test", "input", Arrays.asList(1, 2, 3, 4));
    CollectionStream<Integer> output = CollectionStream.empty("test", "output");

    TestRunner
        .of(MyAsyncStreamTask.class)
        .addInputStream(input)
        .addOutputStream(output)
        .run();

    // Async callbacks may complete in any order, so assert without relying on order
    StreamAssert.that(output).containsInAnyOrder(Arrays.asList(10, 20, 30, 40));
  }
}
```


Simple High Level Api Test

```java
import java.util.Arrays;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.StreamGraph;
import org.junit.Test;

/**
 * Simple test case using a collection as an input for a high level application.
 * TestRunner, CollectionStream and StreamAssert are the APIs proposed in this SEP.
 */
public class MyStreamApplicationTest {

  public static class MyStreamApplication implements StreamApplication {
    @Override
    public void init(StreamGraph graph, Config config) {
      // Stream ids "input" and "output" resolve to the "test" system via job.default.system (set below)
      MessageStream<Integer> input = graph.getInputStream("input");
      input.map(s -> "processed " + s)
          .sendTo(graph.getOutputStream("output"));
    }
  }

  @Test
  public void testStreamApplication() {
    CollectionStream<Integer> input = CollectionStream.of("test", "input", Arrays.asList(1, 2, 3, 4));
    CollectionStream<String> output = CollectionStream.empty("test", "output");

    TestRunner
        .of(new MyStreamApplication())
        .addInputStream(input)
        .addOutputStream(output)
        .addOverrideConfig("job.default.system", "test")
        .run();

    // Assertions on the outputs
    StreamAssert.that(output).contains(Arrays.asList("processed 1", "processed 2", "processed 3", "processed 4"));
  }
}
```

Implementation and Test Plan

  • Introduce the new interface and API for the test framework
  • Use the same API to write comprehensive integration tests for:
    • Samza Low Level Async API (single-container and multi-container mode)
    • Samza Low Level Synchronous API (single-container and multi-container mode)
    • Samza High Level API (single-container and multi-container mode)

Compatibility, Deprecation, and Migration Plan

As this is a new feature, no plans are required for compatibility, deprecation and migration.

Rejected Alternatives