...

  • In Memory Data Stream
    Since SEP-8, users can produce to and consume from in-memory system partitions. Because the in-memory data system does not persist data, it removes the need to serialize and deserialize messages.
    We take advantage of this and provide succinct stream classes that serve as input data sources:
    • Collection Stream
      Users can plug in a collection (either a List or a Map) to create an in-memory input stream
      e.g.:  CollectionStream.of(...,{1,2,3,4})
    • File Stream
      Users can create an in-memory input stream that reads from a local file
      e.g.:  FileStream.of("/path/to/file")
    • Event Builder Stream
      The event builder helps users mimic the Samza runtime processing environment in their tests, for example by injecting an exception into the stream or advancing time for window functions
               
  • Local Kafka Stream
    Users can also consume a bounded stream from a Kafka topic that serves as the initial input. Samza already provides APIs to consume from and produce to Kafka. Kafka streams require serde configurations.
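
To make the Collection Stream idea concrete, here is a minimal, self-contained sketch of how a collection might be exposed as a partitioned in-memory stream without any serde step. It is not the proposed CollectionStream class: the name InMemoryCollectionStream, the explicit partitionCount parameter, and the round-robin placement are all assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch, NOT the Samza CollectionStream API.
// Messages stay as plain Java objects, so no serialization is involved.
final class InMemoryCollectionStream<T> {
  private final String streamName;
  private final List<List<T>> partitions;

  private InMemoryCollectionStream(String streamName, List<List<T>> partitions) {
    this.streamName = streamName;
    this.partitions = partitions;
  }

  // Assumption: elements are spread round-robin over partitionCount partitions.
  static <T> InMemoryCollectionStream<T> of(String streamName, List<T> elements, int partitionCount) {
    if (partitionCount < 1) {
      throw new IllegalArgumentException("partitionCount must be >= 1");
    }
    List<List<T>> partitions = new ArrayList<>();
    for (int i = 0; i < partitionCount; i++) {
      partitions.add(new ArrayList<>());
    }
    for (int i = 0; i < elements.size(); i++) {
      partitions.get(i % partitionCount).add(elements.get(i));
    }
    return new InMemoryCollectionStream<>(streamName, partitions);
  }

  // An empty single-partition stream, usable as an output sink.
  static <T> InMemoryCollectionStream<T> empty(String streamName) {
    return of(streamName, Collections.<T>emptyList(), 1);
  }

  String getStreamName() {
    return streamName;
  }

  int getPartitionCount() {
    return partitions.size();
  }

  // Read-only view of one partition's messages, in insertion order.
  List<T> getPartition(int index) {
    return Collections.unmodifiableList(partitions.get(index));
  }

  // Appends a message to the given partition (what a producer would do).
  void append(int partition, T message) {
    partitions.get(partition).add(message);
  }
}

final class InMemoryCollectionStreamDemo {
  public static void main(String[] args) {
    InMemoryCollectionStream<Integer> input =
        InMemoryCollectionStream.of("input", Arrays.asList(1, 2, 3, 4), 2);
    System.out.println(input.getPartition(0)); // prints [1, 3]
    System.out.println(input.getPartition(1)); // prints [2, 4]
  }
}
```

Because the class is generic, the same sketch works for primitive wrappers and for user-defined message types alike.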

...

Samza provides complete flexibility in the data types used for input streams; this framework will likewise support both primitive and derived data types. Th

...


 

Public Interfaces

 

SampleTest (Java)
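import java.util.Arrays;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;
// TestTask, CollectionStream and TaskAssert are the classes proposed by this document;
// their package is not specified here, so no import is shown for them.

/**
 * Sample test case that uses a collection as the input of a low-level application.
 * It demonstrates setting up and verifying a test with minimal (here, no) configs:
 * the task reads an input stream of integers and multiplies each integer by 10.
 */

// Create a StreamTask
StreamTask task = new StreamTask() {
  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
    Integer obj = (Integer) envelope.getMessage();
    collector.send(new OutgoingMessageEnvelope(new SystemStream("test-samza", "output"), obj * 10));
  }
};

// Assumption: an empty config stands in for "no configs"
Config config = new MapConfig();

// Initialize and run the test framework
TestTask
    .create("test-samza", task, config)                                   // systemName, task, config
    .setJobContainerThreadPoolSize(4)
    .addInputStream(CollectionStream.of("input", Arrays.asList(1, 2, 3, 4)))  // streamName, elements
    .addOutputStream(CollectionStream.empty("output"))                    // streamName
    .run();

// Assertions on the outputs
TaskAssert.that("test-samza", "output").containsInAnyOrder(Arrays.asList(10, 30, 20, 40));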
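
The sample asserts on the output without regard to order, which matters once the container runs with more than one thread. The exact semantics of TaskAssert are not defined in this section, so the following is only a sketch of one plausible interpretation: two lists match when they hold the same elements with the same multiplicities. The class name AnyOrderAssertSketch and its helper methods are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of an order-insensitive comparison; NOT the TaskAssert implementation.
final class AnyOrderAssertSketch {

  // Counts how many times each element occurs in the list.
  static <T> Map<T, Integer> counts(List<T> items) {
    Map<T, Integer> result = new HashMap<>();
    for (T item : items) {
      result.merge(item, 1, Integer::sum);
    }
    return result;
  }

  // Assumption: "in any order" means multiset equality, so duplicates must match too.
  static <T> boolean containsInAnyOrder(List<T> actual, List<T> expected) {
    return counts(actual).equals(counts(expected));
  }

  public static void main(String[] args) {
    List<Integer> actual = Arrays.asList(10, 20, 30, 40);
    System.out.println(containsInAnyOrder(actual, Arrays.asList(10, 30, 20, 40))); // prints true
    System.out.println(containsInAnyOrder(actual, Arrays.asList(10, 10, 20, 30))); // prints false
  }
}
```

Comparing occurrence counts rather than sets keeps the check strict about duplicates, which a plain set comparison would silently ignore.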

 


Implementation and Test Plan

...