...

Code Block
languagejava
// Start Ignite node with default configuration.
Ignite ignite = Ignition.start();

// Create and start cache with default configuration and name "wordsCache".
IgniteCache<String, Integer> cache = ignite.getOrCreateCache("wordsCache");

// Get data streamer reference for cache with name "wordsCache".
IgniteDataStreamer<String, Integer> stmr = ignite.dataStreamer("wordsCache");

// Create custom data streamer implementation.
CustomStreamer customStmr = new CustomStreamer(stmr, new WordCountTupleExtractor());

// Start custom data streamer if it provides corresponding methods.
customStmr.start();

The following code snippet demonstrates a correct data streamer shutdown, assuming that the CustomStreamer implementation provides a stop method:

Code Block
languagejava
// Stop custom data streamer. NOTE: the IgniteDataStreamer instance and the Ignite node are still running.
customStmr.stop();

// Stop data streamer and release all associated resources.
stmr.close();

// Stop Ignite node.
ignite.close();
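
The snippets above rely on a CustomStreamer class that is not shown. As a rough illustration of the start/stop contract they assume, here is a minimal, library-free sketch. Everything in it is hypothetical: CustomStreamerSketch, its onMessage method, and the "word count" message format are invented for this example, and a plain BiConsumer backed by a HashMap stands in for the IgniteDataStreamer and the cache. It is not how Ignite implements any streamer.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/** Demo entry point showing the start, stream, stop order. */
class LifecycleSketch {
    public static void main(String[] args) {
        // Stand-in for the cache that sits behind the data streamer.
        Map<String, Integer> cache = new HashMap<>();

        // The sink plays the role of the data streamer reference.
        CustomStreamerSketch streamer = new CustomStreamerSketch(cache::put);

        streamer.start();
        streamer.onMessage("ignite 3");
        streamer.onMessage("streamer 5");

        // Stop the custom streamer first; only then release the underlying sink.
        streamer.stop();

        System.out.println(cache);
    }
}

/** Hypothetical custom streamer: parses "word count" lines and forwards them to a sink. */
class CustomStreamerSketch {
    private final BiConsumer<String, Integer> sink;
    private volatile boolean running;

    CustomStreamerSketch(BiConsumer<String, Integer> sink) {
        this.sink = sink;
    }

    void start() {
        running = true;
    }

    void stop() {
        running = false;
    }

    /** Extracts a (word, count) tuple from the message and passes it to the sink. */
    void onMessage(String line) {
        if (!running)
            throw new IllegalStateException("Streamer is not running");

        String[] parts = line.trim().split("\\s+");

        if (parts.length != 2)
            throw new IllegalArgumentException("Expected 'word count' but got: " + line);

        sink.accept(parts[0], Integer.parseInt(parts[1]));
    }
}
```

Rejecting messages after stop() is one possible design choice: it ensures nothing is pushed to the sink once shutdown has begun, which is why the custom streamer is stopped before the data streamer is closed.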

 

Reference Implementation

Ignite provides SocketStreamer, a reference implementation of a custom data streamer. SocketStreamer is an NIO-based TCP socket server that accepts client connections on a configured port, receives data represented as a byte array, converts it to a user-defined type using SocketMessageConverter, and streams it into the cache.
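
To make the conversion step more concrete, the following library-free sketch shows one way a byte array could be turned into a user-defined type before being streamed. The MessageConverter interface here is a hypothetical stand-in defined locally for illustration, not Ignite's SocketMessageConverter, and the UTF-8 "word=count" message format and the HashMap standing in for the cache are assumptions made for this example.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.Map;

/** Demo entry point: converts raw byte-array messages and puts the results into a map. */
class ConverterSketch {
    public static void main(String[] args) {
        // Stand-in for the target cache.
        Map<String, Integer> cache = new HashMap<>();

        WordCountConverter conv = new WordCountConverter();

        // Messages as they might arrive from a socket: raw bytes.
        byte[][] incoming = {
            "ignite=3".getBytes(StandardCharsets.UTF_8),
            "streamer=5".getBytes(StandardCharsets.UTF_8)
        };

        for (byte[] msg : incoming) {
            Map.Entry<String, Integer> e = conv.convert(msg);

            cache.put(e.getKey(), e.getValue());
        }

        System.out.println(cache);
    }
}

/** Hypothetical stand-in for a converter from a byte array to a user-defined type. */
interface MessageConverter<T> {
    T convert(byte[] msg);
}

/** Converts UTF-8 encoded "word=count" messages into (word, count) entries. */
class WordCountConverter implements MessageConverter<Map.Entry<String, Integer>> {
    @Override
    public Map.Entry<String, Integer> convert(byte[] msg) {
        String text = new String(msg, StandardCharsets.UTF_8).trim();

        int sep = text.lastIndexOf('=');

        // Require a non-empty word before '=' and a non-empty count after it.
        if (sep <= 0 || sep == text.length() - 1)
            throw new IllegalArgumentException("Expected 'word=count' but got: " + text);

        String word = text.substring(0, sep);
        int cnt = Integer.parseInt(text.substring(sep + 1));

        return new SimpleImmutableEntry<>(word, cnt);
    }
}
```

Keeping the conversion behind a small interface means the transport code never needs to know the message format, so the same server loop can serve different payload types by swapping the converter.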

...