Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

This guide describes how to implement custom data streamer and describes basic concepts and implementation details.

It is useful to take a look at:

  • SocketStreamer - reference implementation of custom data streamer as TCP socket server.
  • WordsSocketStreamerServer and WordsSocketStreamerClient - example of SocketStreamer usage.

Basic Concepts

The purpose of streamers is data ingesting from various sources (e.g. TCP socket or Kafka) and streaming them into Ignite cache for further processing. The Ignite provides IgniteDataStreamer API that allows stream data into cache and convenient StreamAdapter that wraps data streamer instance and provides basis for custom streamer implementation.

...

Code Block
languagejava
public class WordCountTupleExtractor implements StreamTupleExtractor<String, String, Integer> {

	 @Override public Map.Entry<String, Integer> extract(String msg) {
        String[] tokens = msg.split("=");

 		return new IgniteBiTuple<>(tokens[0], Integer.valueOf(tokens[1]));
    }
}

 

 

Streamer Lifecycle

Streamer Life Cycle

Streamer life cycle depends on chosen implementation model (server or client) and requirements. Usually streamer can be in one of the following states: created, initialized, shutdown and may be one of transition states. StreamAdapter doesn't provide any life cycle managements methods. Correct life cycle management implementation is completely responsibility of data streamer developer. 

Implementation Tips