Overview

This guide describes how to implement a custom data streamer and covers the basic concepts and implementation details.

Basic Concepts

The purpose of a streamer is to ingest data from various sources (e.g. a TCP socket or Kafka) and stream it into an Ignite cache for further processing. Ignite provides the IgniteDataStreamer API, which streams data into a cache, and the convenient StreamAdapter class, which wraps a data streamer instance and serves as the basis for custom streamer implementations.

A data stream consists of messages that can be represented as Java objects of some type. These objects must be converted to cache entries (key-value pairs, or tuples); StreamTupleExtractor is responsible for this conversion.

The following parts of a custom data streamer are not provided and must be implemented by the developer:

  • conversion of stream-specific messages to Java objects (e.g. byte array to String);
  • a stream wrapper that encapsulates the functionality related to a particular data source.
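The first of these parts can be sketched with the standard library alone. The example below assumes a source that delivers UTF-8 bytes containing newline-delimited messages; the class name LineMessageDecoder and this wire format are illustrative assumptions, not part of Ignite.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not an Ignite class): turns raw bytes from a source into String messages. */
final class LineMessageDecoder {
    private LineMessageDecoder() {
        // Static helper, no instances.
    }

    /** Decodes a UTF-8 buffer and splits it into non-empty, newline-delimited messages. */
    static List<String> decode(byte[] raw) {
        String text = new String(raw, StandardCharsets.UTF_8);

        List<String> messages = new ArrayList<>();

        // \R matches any line break sequence, including "\r\n".
        for (String line : text.split("\\R")) {
            String trimmed = line.trim();

            if (!trimmed.isEmpty())
                messages.add(trimmed);
        }

        return messages;
    }
}
```

Each decoded String could then be handed to the streamer as one message. A real source may need a different framing (length prefixes, JSON documents and so on), so the decoding step is usually specific to the data source.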

A data streamer can be implemented as a server or as a client, depending on the data source and the requirements. For example, an HTTP data streamer implemented as a client can request data from external web services. On the other hand, an HTTP data streamer implemented as a server can process a large number of requests from internal microservices.
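The difference between the two roles comes down to who drives the exchange. The sketch below leaves out the transport entirely and models only that difference; StreamerRoles, PullingStreamer and ListeningStreamer are hypothetical names, and the Consumer stands in for whatever forwards messages to the cache.

```java
import java.util.Iterator;
import java.util.function.Consumer;

/** Hypothetical illustration of the two roles; none of these types belong to Ignite. */
final class StreamerRoles {
    private StreamerRoles() {
        // Holder for the nested examples.
    }

    /** Client role: the streamer drives the exchange and pulls messages from the source. */
    static final class PullingStreamer {
        private final Consumer<String> sink;

        PullingStreamer(Consumer<String> sink) {
            this.sink = sink;
        }

        /** Drains everything the source currently offers and returns the number of messages read. */
        int poll(Iterator<String> source) {
            int cnt = 0;

            while (source.hasNext()) {
                sink.accept(source.next());

                cnt++;
            }

            return cnt;
        }
    }

    /** Server role: the streamer waits and the source pushes messages into it. */
    static final class ListeningStreamer {
        private final Consumer<String> sink;

        ListeningStreamer(Consumer<String> sink) {
            this.sink = sink;
        }

        /** Called by the transport layer (e.g. a request handler) for each incoming message. */
        void onMessage(String msg) {
            sink.accept(msg);
        }
    }
}
```

In a client-style streamer the polling loop, its schedule and its error handling belong to the streamer. In a server-style streamer those concerns move to the listener that accepts connections and invokes the callback.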

To implement a custom data streamer, a developer can extend the StreamAdapter class (this is optional) and add the functionality related to the particular data source, plus streamer life cycle logic if needed. As mentioned above, StreamAdapter wraps an IgniteDataStreamer instance. It also needs a StreamTupleExtractor instance, which can be provided via the constructor or the corresponding setters.

The central method of StreamAdapter is addMessage(T msg), which converts a message to a cache entry using the StreamTupleExtractor and streams the entry into the cache. This method does not block the current thread, due to the asynchronous nature of IgniteDataStreamer.
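The convert-then-stream flow described above can be modelled without Ignite on the classpath. The sketch below is a simplified stand-in, not the StreamAdapter source: AdapterFlowSketch is a hypothetical class, a Function plays the role of StreamTupleExtractor, a plain Map plays the role of the cache, and the batch size is an assumed parameter that imitates the buffering a data streamer performs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Simplified stand-in for the adapter flow (hypothetical, not an Ignite class). */
final class AdapterFlowSketch<T, K, V> {
    /** Plays the role of StreamTupleExtractor. */
    private final Function<T, Map.Entry<K, V>> extractor;

    /** Plays the role of the Ignite cache. */
    private final Map<K, V> cache;

    /** Number of buffered entries that triggers a flush (assumed parameter). */
    private final int batchSize;

    /** Entries that were accepted but not yet written to the cache. */
    private final List<Map.Entry<K, V>> buf = new ArrayList<>();

    AdapterFlowSketch(Function<T, Map.Entry<K, V>> extractor, Map<K, V> cache, int batchSize) {
        this.extractor = extractor;
        this.cache = cache;
        this.batchSize = batchSize;
    }

    /** Mirrors the addMessage(T msg) flow: convert the message, buffer the entry, return. */
    synchronized void addMessage(T msg) {
        Map.Entry<K, V> entry = extractor.apply(msg);

        // In this sketch, messages that produce no entry are skipped.
        if (entry == null)
            return;

        buf.add(entry);

        if (buf.size() >= batchSize)
            flush();
    }

    /** Writes all buffered entries to the cache. */
    synchronized void flush() {
        for (Map.Entry<K, V> e : buf)
            cache.put(e.getKey(), e.getValue());

        buf.clear();
    }
}
```

One consequence of buffering is visible even in this toy model: an entry passed to addMessage may not appear in the cache until its batch is flushed. In this sketch the flush runs on the calling thread, which is a simplification made to keep the example short.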

The StreamTupleExtractor interface exposes the extract(T msg) method, which returns a cache entry as a Map.Entry<K, V> instance. For example, if a message is a string of two values separated by the '=' symbol, where the left part is a word and the right part is an integer, then the extractor could be implemented as:

 

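import java.util.Map;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.stream.StreamTupleExtractor;

public class WordCountTupleExtractor implements StreamTupleExtractor<String, String, Integer> {
    /** Converts a "word=count" message into a (word, count) cache entry. */
    @Override public Map.Entry<String, Integer> extract(String msg) {
        // Expected format: "<word>=<integer>", e.g. "ignite=42".
        String[] tokens = msg.split("=");

        return new IgniteBiTuple<>(tokens[0], Integer.valueOf(tokens[1]));
    }
}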
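The extractor above assumes that every message is well formed; a message without '=' or with a non-numeric right part makes it throw. A more defensive parsing step could look like the sketch below. WordCountParser is a hypothetical standalone helper that uses only the standard library, and returning an empty Optional for malformed input is an assumption about the desired behaviour, not something the guide prescribes.

```java
import java.util.AbstractMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper (not an Ignite class): tolerant parsing of "word=count" messages. */
final class WordCountParser {
    private WordCountParser() {
        // Static helper, no instances.
    }

    /** Returns the (word, count) pair, or an empty Optional if the message is malformed. */
    static Optional<Map.Entry<String, Integer>> parse(String msg) {
        if (msg == null)
            return Optional.empty();

        int idx = msg.indexOf('=');

        // No separator, or nothing in front of it.
        if (idx <= 0)
            return Optional.empty();

        String word = msg.substring(0, idx).trim();
        String cnt = msg.substring(idx + 1).trim();

        if (word.isEmpty())
            return Optional.empty();

        try {
            Map.Entry<String, Integer> entry =
                new AbstractMap.SimpleImmutableEntry<>(word, Integer.valueOf(cnt));

            return Optional.of(entry);
        }
        catch (NumberFormatException ignored) {
            // The right part is not an integer.
            return Optional.empty();
        }
    }
}
```

An extractor could delegate to such a helper and decide separately what to do with rejected messages, for example log them or count them.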

 

 

Streamer Lifecycle

 

Implementation Tips

 
