...
This guide describes how to implement a custom data streamer and covers the basic concepts and implementation details.
It is useful to take a look at:
- SocketStreamer - a reference implementation of a custom data streamer as a TCP socket server.
- WordsSocketStreamerServer and WordsSocketStreamerClient - an example of SocketStreamer usage.

The purpose of streamers is to ingest data from various sources (e.g. a TCP socket or Kafka) and stream it into an Ignite cache for further processing. Ignite provides the IgniteDataStreamer API, which streams data into a cache, and a convenient StreamAdapter class, which wraps a data streamer instance and provides the basis for a custom streamer implementation.
...
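```java
import java.util.Map;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.stream.StreamTupleExtractor;

/** Converts a "word=count" message into a cache entry (word -> count). */
public class WordCountTupleExtractor implements StreamTupleExtractor<String, String, Integer> {
    @Override public Map.Entry<String, Integer> extract(String msg) {
        String[] tokens = msg.split("=");

        return new IgniteBiTuple<>(tokens[0], Integer.valueOf(tokens[1]));
    }
}
```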
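The extractor above assumes that every message is well formed. As a minimal sketch of more defensive parsing, the helper below handles the same "word=count" format using only JDK types. The class name WordCountParser and the convention of returning null for a malformed message are assumptions made for illustration; they are not part of the Ignite API, and whether a null result is skipped or rejected depends on the code that calls the extractor.

```java
import java.util.AbstractMap;
import java.util.Map;

/** Hypothetical helper (not part of Ignite): parses "word=count" messages defensively. */
final class WordCountParser {
    private WordCountParser() {
        // Static utility, no instances.
    }

    /** Returns the parsed entry, or null when the message is malformed (assumed convention). */
    static Map.Entry<String, Integer> parse(String msg) {
        if (msg == null)
            return null;

        // Split on the first '=' only; both sides must be non-empty.
        int idx = msg.indexOf('=');

        if (idx <= 0 || idx == msg.length() - 1)
            return null;

        String key = msg.substring(0, idx).trim();

        if (key.isEmpty())
            return null;

        try {
            Integer cnt = Integer.valueOf(msg.substring(idx + 1).trim());

            return new AbstractMap.SimpleImmutableEntry<>(key, cnt);
        }
        catch (NumberFormatException e) {
            // The count part is not a valid integer.
            return null;
        }
    }
}
```

An extract(String msg) implementation could delegate to WordCountParser.parse(msg) and wrap a non-null result in an IgniteBiTuple, so that a single bad message does not fail with an ArrayIndexOutOfBoundsException or NumberFormatException.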
The streamer life cycle depends on the chosen implementation model (server or client) and on the requirements. Usually a streamer is in one of the following states: created, initialized, or shut down, and possibly in one of the transition states between them. StreamAdapter doesn't provide any life cycle management methods. Implementing life cycle management correctly is entirely the responsibility of the data streamer developer.
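One possible way to track these states is a small state holder that the custom streamer delegates to. The sketch below is an illustration only: the class StreamerLifecycle, the transition states INITIALIZING and STOPPING, and the start/stop method names are all hypothetical and not part of Ignite. It uses compare-and-set so that concurrent or repeated calls to start and stop are rejected instead of corrupting the state.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical life cycle helper for a custom streamer (not part of the Ignite API). */
class StreamerLifecycle {
    /** The states named in the text, plus two illustrative transition states. */
    enum State { CREATED, INITIALIZING, INITIALIZED, STOPPING, SHUTDOWN }

    private final AtomicReference<State> state = new AtomicReference<>(State.CREATED);

    /** Returns the current state. */
    State state() {
        return state.get();
    }

    /** Moves CREATED -> INITIALIZING -> INITIALIZED, running the start-up action in between. */
    void start(Runnable init) {
        if (!state.compareAndSet(State.CREATED, State.INITIALIZING))
            throw new IllegalStateException("Cannot start from state " + state.get());

        try {
            init.run();

            state.set(State.INITIALIZED);
        }
        catch (RuntimeException e) {
            // A failed start leaves the streamer unusable.
            state.set(State.SHUTDOWN);

            throw e;
        }
    }

    /** Moves INITIALIZED -> STOPPING -> SHUTDOWN, always ending in SHUTDOWN. */
    void stop(Runnable cleanup) {
        if (!state.compareAndSet(State.INITIALIZED, State.STOPPING))
            throw new IllegalStateException("Cannot stop from state " + state.get());

        try {
            cleanup.run();
        }
        finally {
            state.set(State.SHUTDOWN);
        }
    }
}
```

In a server-style streamer, the start-up action would typically open the listening endpoint and the cleanup action would close it. A client-style streamer might connect and disconnect instead. Both are design choices left to the implementer.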