Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
public class WordCountTupleExtractor implements StreamTupleExtractor<String, String, Integer> {

	 @Override public Map.Entry<String, Integer> extract(String msg) {
        String[] tokens = msg.split("=");

 		return new IgniteBiTuple<>(tokens[0], Integer.valueOf(tokens[1]));
    }
}

Ignite provides SocketStreamer - reference custom data streamer implementation. SocketStreamer is TCP socket server that accepts client connections on configured port, receives data represented as bytes array, converts it to user defined type using SocketMessageConverter and streams it into cache.

The default SocketMessageConverter implementation uses standard Java serialization/deserialization mechanism assuming that data stream contains serialized Java objects.

SocketStreamer supports two communication protocols:

  • message size based protocol (default) where each message in the stream is prepended by 4-byte integer header containing message size;
  • message delimiter based protocol where each message in the stream is appended with user defined delimiter (see SocketStreamer.setDelimiter(byte[] delim) method).

Streamer Life Cycle

Streamer life cycle depends on chosen implementation model (server or client) and requirements. Usually streamer can be in one of the following states: created, initialized, shutdown and may be one of transition states. StreamAdapter doesn't provide any life cycle managements methods. Correct life cycle management implementation is completely responsibility of data streamer developer.

...