...
Code Block | ||
---|---|---|
| ||
public class WordCountTupleExtractor implements StreamTupleExtractor<String, String, Integer> { @Override public Map.Entry<String, Integer> extract(String msg) { String[] tokens = msg.split("="); return new IgniteBiTuple<>(tokens[0], Integer.valueOf(tokens[1])); } } |
Ignite provides SocketStreamer
- reference custom data streamer implementation. SocketStreamer
is TCP socket server that accepts client connections on configured port, receives data represented as bytes array, converts it to user defined type using SocketMessageConverter
and streams it into cache.
The default SocketMessageConverter
implementation uses standard Java serialization/deserialization mechanism assuming that data stream contains serialized Java objects.
SocketStreamer
supports two communication protocols:
SocketStreamer.setDelimiter(byte[] delim)
method).Streamer life cycle depends on chosen implementation model (server or client) and requirements. Usually streamer can be in one of the following states: created, initialized, shutdown and may be one of transition states. StreamAdapter
doesn't provide any life cycle managements methods. Correct life cycle management implementation is completely responsibility of data streamer developer.
...