...

Code Block (Java)
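import java.util.Map;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.stream.StreamTupleExtractor;

public class WordCountTupleExtractor implements StreamTupleExtractor<String, String, Integer> {
    /** Splits the message on "=" and returns the two parts as a cache key and an integer value. */
    @Override public Map.Entry<String, Integer> extract(String msg) {
        String[] tokens = msg.split("=");

        return new IgniteBiTuple<>(tokens[0], Integer.valueOf(tokens[1]));
    }
}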
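
The extractor above assumes that every message contains a key and an integer separated by "=". As a minimal sketch of the same parsing logic without an Ignite dependency, the hypothetical WordCountParser below uses the JDK's AbstractMap.SimpleImmutableEntry in place of IgniteBiTuple and rejects malformed input explicitly. The class name and the error-handling policy are assumptions made for illustration; they are not part of the Ignite API.

```java
import java.util.AbstractMap;
import java.util.Map;

/** Hypothetical stand-alone helper; not part of Ignite. */
final class WordCountParser {
    private WordCountParser() {
    }

    /**
     * Parses a "word=count" message into a key-value entry.
     * Throws IllegalArgumentException if the message is malformed.
     */
    static Map.Entry<String, Integer> parse(String msg) {
        // A limit of 2 keeps any further '=' characters inside the second token,
        // so "a=b=c" fails number parsing instead of being silently truncated.
        String[] tokens = msg.split("=", 2);

        if (tokens.length != 2 || tokens[0].isEmpty())
            throw new IllegalArgumentException("Expected word=count, got: " + msg);

        try {
            return new AbstractMap.SimpleImmutableEntry<>(tokens[0], Integer.valueOf(tokens[1].trim()));
        }
        catch (NumberFormatException e) {
            throw new IllegalArgumentException("Count is not an integer: " + msg, e);
        }
    }
}
```

Calling WordCountParser.parse("hello=3") yields an entry with key "hello" and value 3, while a message without "=" is rejected with an IllegalArgumentException. Whether to reject, skip, or log malformed messages is a design choice left to the streamer developer.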

Streamer Life Cycle

The streamer life cycle depends on the chosen implementation model (server or client) and on the requirements. A streamer is usually in one of the following states: created, initialized, or shut down, or in a transition state between them. StreamAdapter doesn't provide any life cycle management methods. Implementing life cycle management correctly is entirely the responsibility of the data streamer developer.

Because a data streamer requires an IgniteDataStreamer instance, an Ignite node with a cache must be started and an IgniteDataStreamer obtained first. Note that StreamAdapter doesn't manage the IgniteDataStreamer life cycle.
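
One way to keep the states described above consistent is a small guard object that a custom streamer delegates to. The sketch below is an illustration only: StreamerLifeCycle, its State enum, and the two transition state names are hypothetical and not part of Ignite. In a real streamer, the initialization callback would be the place to start the node and obtain the IgniteDataStreamer, and the cleanup callback the place to close them.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical life cycle guard for a custom streamer; not part of Ignite. */
final class StreamerLifeCycle {
    /** The three states named in the text, plus two assumed transition states. */
    enum State { CREATED, INITIALIZING, INITIALIZED, SHUTTING_DOWN, SHUTDOWN }

    private final AtomicReference<State> state = new AtomicReference<>(State.CREATED);

    State state() {
        return state.get();
    }

    /** Runs the given initialization once; only allowed from CREATED. */
    void start(Runnable init) {
        if (!state.compareAndSet(State.CREATED, State.INITIALIZING))
            throw new IllegalStateException("Cannot start from state " + state.get());

        try {
            init.run();
            state.set(State.INITIALIZED);
        }
        catch (RuntimeException e) {
            // A failed initialization is terminal in this sketch.
            state.set(State.SHUTDOWN);
            throw e;
        }
    }

    /** Runs the given cleanup once; only allowed from INITIALIZED. */
    void stop(Runnable cleanup) {
        if (!state.compareAndSet(State.INITIALIZED, State.SHUTTING_DOWN))
            throw new IllegalStateException("Cannot stop from state " + state.get());

        try {
            cleanup.run();
        }
        finally {
            state.set(State.SHUTDOWN);
        }
    }
}
```

The compare-and-set calls make a second start or an out-of-order stop fail fast instead of initializing resources twice.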

The following code snippet demonstrates correct

Reference Implementation

Ignite provides SocketStreamer, a reference custom data streamer implementation. SocketStreamer is an NIO-based TCP socket server that accepts client connections on a configured port, receives data represented as a byte array, converts it to a user-defined type using SocketMessageConverter, and streams it into a cache.

The default SocketMessageConverter implementation uses the standard Java serialization/deserialization mechanism, assuming that the data stream contains serialized Java objects.
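
To make the conversion step concrete, here is a minimal sketch of a converter based on standard Java serialization. It does not reproduce Ignite's default implementation: MessageConverter is a hypothetical local stand-in for a byte-array-to-object converter, and JavaSerializationConverter and toBytes are names invented for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for a byte[]-to-object converter interface. */
interface MessageConverter<T> {
    T convert(byte[] msg);
}

/** Converter based on standard Java serialization; a sketch, not Ignite's implementation. */
final class JavaSerializationConverter<T> implements MessageConverter<T> {
    @SuppressWarnings("unchecked")
    @Override public T convert(byte[] msg) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(msg))) {
            return (T)in.readObject();
        }
        catch (IOException e) {
            throw new UncheckedIOException("Failed to deserialize message", e);
        }
        catch (ClassNotFoundException e) {
            throw new IllegalStateException("Message class is not on the classpath", e);
        }
    }

    /** Client-side counterpart: serializes an object into the bytes the converter expects. */
    static byte[] toBytes(Object obj) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();

        // Closing the ObjectOutputStream flushes all data into bos.
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(obj);
        }
        catch (IOException e) {
            throw new UncheckedIOException("Failed to serialize message", e);
        }

        return bos.toByteArray();
    }
}
```

Java deserialization of data from untrusted clients is a known security risk, which is one reason to supply a custom converter for a simpler wire format.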

...

  • message size based protocol (default), where each message in the stream is prepended with a 4-byte integer header containing the message size;
  • message delimiter based protocol, where each message in the stream is followed by a user-defined delimiter (see the SocketStreamer.setDelimiter(byte[] delim) method).
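
The two framing schemes listed above can be sketched from the client's point of view as follows. MessageFraming and its methods are hypothetical helpers, not Ignite classes, and the big-endian byte order of the size header is an assumption of this sketch; check the streamer's configured byte order before relying on it.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical client-side framing helpers; not part of Ignite. */
final class MessageFraming {
    private MessageFraming() {
    }

    /** Size-based protocol: 4-byte length header followed by the payload. */
    static byte[] withSizeHeader(byte[] payload) {
        // ByteBuffer is big-endian by default; the byte order is an assumption here.
        return ByteBuffer.allocate(4 + payload.length)
            .putInt(payload.length)
            .put(payload)
            .array();
    }

    /** Delimiter-based protocol: payload followed by the delimiter bytes. */
    static byte[] withDelimiter(byte[] payload, byte[] delim) {
        return ByteBuffer.allocate(payload.length + delim.length)
            .put(payload)
            .put(delim)
            .array();
    }

    /** Splits a buffer of size-prefixed messages back into payloads. */
    static List<byte[]> readSizePrefixed(byte[] stream) {
        List<byte[]> msgs = new ArrayList<>();
        ByteBuffer buf = ByteBuffer.wrap(stream);

        while (buf.remaining() >= 4) {
            int size = buf.getInt();

            if (size < 0 || size > buf.remaining())
                throw new IllegalArgumentException("Truncated or corrupt message, size=" + size);

            byte[] payload = new byte[size];
            buf.get(payload);
            msgs.add(payload);
        }

        if (buf.hasRemaining())
            throw new IllegalArgumentException("Incomplete size header at end of stream");

        return msgs;
    }
}
```

The size header suits binary payloads, because the payload may contain any byte sequence. A delimiter is simpler for text protocols but must never occur inside a message.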

...

The SocketStreamer implementation has two life cycle management methods, start and stop, which initialize and shut down the TCP socket server respectively.
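
The start/stop pattern described above can be illustrated with a deliberately simplified server. This is not the SocketStreamer code: the sketch uses a blocking java.net.ServerSocket instead of NIO, and MiniSocketServer and all of its members are hypothetical names.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical blocking TCP server with start/stop; not the Ignite SocketStreamer. */
final class MiniSocketServer {
    private final int port;
    private final AtomicInteger accepted = new AtomicInteger();
    private volatile ServerSocket srv;
    private Thread acceptor;

    MiniSocketServer(int port) {
        this.port = port;
    }

    /** Binds the server socket and starts the accept loop. Port 0 picks a free port. */
    synchronized void start() throws IOException {
        if (srv != null)
            throw new IllegalStateException("Already started");

        ServerSocket s = new ServerSocket(port);
        srv = s;

        acceptor = new Thread(() -> acceptLoop(s), "mini-socket-acceptor");
        acceptor.setDaemon(true);
        acceptor.start();
    }

    /** Closes the server socket, which unblocks accept(), then waits for the loop to exit. */
    synchronized void stop() throws IOException, InterruptedException {
        ServerSocket s = srv;

        if (s == null || s.isClosed())
            return;

        s.close();
        acceptor.join();
    }

    boolean isRunning() {
        ServerSocket s = srv;
        return s != null && !s.isClosed();
    }

    /** Bound port, or -1 if the server was never started. */
    int localPort() {
        ServerSocket s = srv;
        return s == null ? -1 : s.getLocalPort();
    }

    int acceptedCount() {
        return accepted.get();
    }

    private void acceptLoop(ServerSocket s) {
        while (!s.isClosed()) {
            try (Socket client = s.accept()) {
                // A real streamer would read and convert messages from the client here.
                accepted.incrementAndGet();
            }
            catch (IOException e) {
                // accept() fails once stop() closes the socket; the loop condition then ends the loop.
            }
        }
    }
}
```

In this sketch the server is single-use: after stop, a new instance has to be created. A production streamer would also need to decide what happens to connections that are still open when stop is called.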

The following code snippet demonstrates

 

Implementation Tips