...
Code Block | ||
---|---|---|
| ||
public class WordCountTupleExtractor implements StreamTupleExtractor<String, String, Integer> { @Override public Map.Entry<String, Integer> extract(String msg) { String[] tokens = msg.split("="); return new IgniteBiTuple<>(tokens[0], Integer.valueOf(tokens[1])); } } |
Streamer life cycle depends on chosen implementation model (server or client) and requirements. Usually streamer can be in one of the following states: created, initialized, shutdown and may be one of transition states. StreamAdapter
doesn't provide any life cycle managements methods. Correct life cycle management implementation is completely responsibility of data streamer developer.
So far as data streamer requires IgniteDataStreamer
instance, Ignite node with a cache should be started and IgniteDataStreamer
should be obtained. Note that StreamAdapter
doesn't manage by IgniteDataStreamer
life cycle.
The following code snippet demonstrates correct
Ignite provides SocketStreamer
- reference custom data streamer implementation. SocketStreamer
is NIO-based TCP socket server that accepts client connections on configured port, receives data represented as bytes array, converts it to user defined type using SocketMessageConverter
and streams it into cache.
The default SocketMessageConverter
implementation uses standard Java serialization/deserialization mechanism assuming that data stream contains serialized Java objects.
...
SocketStreamer.setDelimiter(byte[] delim)
method)....
Streamer life cycle depends on chosen implementation model (server or client) and requirements. Usually streamer can be in one of the following states: created, initialized, shutdown and may be one of transition states. StreamAdapter
doesn't provide any life cycle managements methods. Correct life cycle management implementation is completely responsibility of data streamer developer.
See SocketStreamer
implementation with SocketStreamer
implementation has two life cycle management start
and stop
methods that initialize and shutdown TCP socket server respectively.
The following code snippet demonstrates