Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
language: java
// Example: configure and start a socket streamer that feeds the "wordsCache" cache.

// Start Ignite node with default configuration.
Ignite ignite = Ignition.start();

// Create cache with default configuration and name "wordsCache".
// The data streamer below refers to this cache by the same name.
IgniteCache<String, Integer> cache = ignite.getOrCreateCache("wordsCache");

// Get data streamer reference for cache "wordsCache".
IgniteDataStreamer<String, Integer> stmr = ignite.dataStreamer("wordsCache");

// Configure socket streamer.
SocketStreamer<String, String, Integer> sockStmr = new SocketStreamer<>();

// Set socket server address.
sockStmr.setAddr(InetAddress.getLocalHost());

// Set socket server port.
sockStmr.setPort(5555);

// Use message delimiter based protocol where delimiter is zero byte.
sockStmr.setDelimiter(new byte[] {0});

// Set Ignite instance.
sockStmr.setIgnite(ignite);

// Set data streamer instance.
sockStmr.setStreamer(stmr);

// Converter from zero-terminated string to Java string.
// The charset is given explicitly so that decoding does not depend on the
// platform default. NOTE(review): assumes senders encode messages as UTF-8;
// change the charset here if the wire protocol uses a different encoding.
sockStmr.setConverter(new SocketMessageConverter<String>() {
    @Override public String convert(byte[] msg) {
        return new String(msg, java.nio.charset.StandardCharsets.UTF_8);
    }
});

// Set tuple extractor (see above).
sockStmr.setTupleExtractor(new WordCountTupleExtractor());

// Start socket streamer.
sockStmr.start();

...