...
This guide describes how to implement custom data streamer and describes , basic concepts and implementation details.
...
Code Block | ||
---|---|---|
| ||
// Stop custom data streamer. NOTE: IgniteDataStreamer instance and Ignite node are still in running state. customStmr.stop(); // Stop data streamer and release all associatesassociated resources. stmr.close(); // Stop Ignite node. ignite.close(); |
...
Ignite provides SocketStreamer
- reference custom data streamer implementation. SocketStreamer
is NIO-based TCP socket server that accepts client connections on configured port, receives data represented as bytes array, converts it to user defined type using SocketMessageConverter
and streams it into cache.
...
SocketStreamer
implementation has two life cycle management start
and stop
methods that initialize and shutdown TCP socket server respectively.
The following code snippet demonstrates
...
example demonstrates how SocketStreamer
can be used:
Code Block | ||
---|---|---|
| ||
// Start Ignite node with default configuration.
Ignite ignite = Ignition.start();
// Create cache with default configuration and name "wordsCache".
IgniteCache<String, Integer> cache = ignite.getOrCreateCache("wordsCache");
// Get data streamer reference for cache "wordsCache".
IgniteDataStreamer<String, Integer> stmr = ignite.dataStreamer("wordsCache");
// Configure socket streamer.
SocketStreamer<String, String, Integer> sockStmr = new SocketStreamer<>();
// Set socket server address.
sockStmr.setAddr(InetAddress.getLocalHost());
// Set socket server port.
sockStmr.setPort(5555);
// Use message delimiter based protocol where delimiter is zero byte.
sockStmr.setDelimiter(new byte[] {0});
// Set ignite instance
sockStmr.setIgnite(ignite);
// Set data streamer instance
sockStmr.setStreamer(stmr);
// Converter from zero-terminated string to Java string.
sockStmr.setConverter(new SocketMessageConverter<String>() {
@Override public String convert(byte[] msg) {
return new String(msg);
}
});
// Set tuple extractor (see above).
sockStmr.setTupleExtractor(new WordCountTupleExtractor());
// Start socket streamer.
sockStmr.start(); |
Now socket streamer is ready to receive data. Client for this socket streamer can be implemented as follows:
Code Block | ||
---|---|---|
| ||
byte[] delim = new byte[] {0};
try (
// Connect to server
Socket sock = new Socket(InetAddress.getLocalHost(), 5555);
OutputStream oos = new BufferedOutputStream(sock.getOutputStream())
) {
for (String word : words) {
String item = word + "=1";
byte[] arr = item.getBytes();
// Write message.
oos.write(arr);
// Write delimiter.
oos.write(delim);
}
} |
The following steps should be performed in order to implement custom data streamer:
StreamAdapter
.