THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||
---|---|---|
| ||
// Start Ignite node with default configuration. Ignite ignite = Ignition.start(); // Create cache with default configuration and name "wordsCache". IgniteCache<String, Integer> cache = ignite.getOrCreateCache("wordsCache"); // Get data streamer reference for cache "wordsCache". IgniteDataStreamer<String, Integer> stmr = ignite.dataStreamer("wordsCache"); // Configure socket streamer. SocketStreamer<String, String, Integer> sockStmr = new SocketStreamer<>(); // Set socket server address. sockStmr.setAddr(InetAddress.getLocalHost()); // Set socket server port. sockStmr.setPort(5555); // Use message delimiter based protocol where delimiter is zero byte. sockStmr.setDelimiter(new byte[] {0}); // Set ignite instance sockStmr.setIgnite(ignite); // Set data streamer instance sockStmr.setStreamer(stmr); // Converter from zero-terminated string to Java string. sockStmr.setConverter(new SocketMessageConverter<String>() { @Override public String convert(byte[] msg) { return new String(msg); } }); // Set tuple extractor (see above). sockStmr.setTupleExtractor(new WordsCountTupleExtractor()); // Start socket streamer. sockStmr.start(); |
...
Now socket streamer is ready to receive data. Client for this socket streamer can be implemented as follows:
Code Block | ||
---|---|---|
| ||
byte[] delim = new byte[] {0};
try (
// Connect to server
Socket sock = new Socket(InetAddress.getLocalHost(), 5555);
OutputStream oos = new BufferedOutputStream(sock.getOutputStream())
) {
for (String word : words) {
String item = word + "=1";
byte[] arr = item.getBytes();
// Write message.
oos.write(arr);
// Write delimiter.
oos.write(delim);
}
} |