This guide describes how to implement a custom data streamer and covers the basic concepts and implementation details.
It is useful to take a look at:

- SocketStreamer - reference implementation of a custom data streamer as a TCP socket server.
- WordsSocketStreamerServer and WordsSocketStreamerClient - example of SocketStreamer usage.

The purpose of streamers is to ingest data from various sources (e.g. a TCP socket or Kafka) and stream it into an Ignite cache for further processing. Ignite provides the IgniteDataStreamer API, which allows streaming data into a cache, and a convenient StreamAdapter, which wraps a data streamer instance and provides a basis for custom streamer implementations.
...
From the connection-initiation standpoint, a data streamer can be implemented either as a server or as a client, depending on the data source and the requirements. For example, an HTTP data streamer implemented as a client can request data from external web services. On the other hand, an HTTP data streamer implemented as a server can process a large number of requests from external HTTP clients.
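As a rough illustration of the client model, the sketch below initiates each exchange itself: it pulls a batch of messages from a source and hands every message to a consumer. In a real streamer the source would be, for example, an HTTP request to an external service and the consumer would feed the Ignite data streamer. PollingClientStreamer and the Supplier-based source are hypothetical stand-ins, not Ignite classes; a server-model streamer inverts this control flow, with the data source opening connections and pushing messages.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical client-model streamer: it pulls data from the source
// (e.g. a web service) instead of waiting for the source to connect.
class PollingClientStreamer<T> {
    private final Supplier<List<T>> source; // stand-in for "request data from external service"
    private final Consumer<T> sink;         // stand-in for feeding the Ignite data streamer

    PollingClientStreamer(Supplier<List<T>> source, Consumer<T> sink) {
        this.source = source;
        this.sink = sink;
    }

    // Performs one request/response cycle and returns the number of forwarded messages.
    int pollOnce() {
        List<T> batch = source.get();
        for (T msg : batch)
            sink.accept(msg);
        return batch.size();
    }
}

class PollingClientDemo {
    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        PollingClientStreamer<String> streamer = new PollingClientStreamer<String>(
            () -> Arrays.asList("hello=1", "world=1"), received::add);
        int n = streamer.pollOnce();
        System.out.println(n + " " + received); // prints "2 [hello=1, world=1]"
    }
}
```

A real client streamer would typically call pollOnce() periodically (for example from a scheduled executor) between its start and stop methods.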
In order to implement a custom data streamer, a developer can optionally extend the StreamAdapter class and add functionality related to the particular data source, plus streamer life cycle logic if needed. As mentioned above, StreamAdapter wraps an IgniteDataStreamer instance; it also needs a StreamTupleExtractor instance, which can be provided via the constructor or the corresponding setters.
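To make this division of responsibilities concrete, here is a minimal, JDK-only model of the adapter pattern described above. SimpleStreamAdapter is a hypothetical stand-in for StreamAdapter, loosely modeled on its protected addMessage method: it holds a sink (playing the role of IgniteDataStreamer, here any BiConsumer) and a tuple extractor (here a Function returning a Map.Entry), accepts both through either a constructor or setters, and turns each incoming message into a key-value pair. It is not Ignite's API; LineStreamer shows where data-source-specific logic would go.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Function;

// Hypothetical, simplified model of an adapter-based custom streamer.
class SimpleStreamAdapter<T, K, V> {
    private BiConsumer<K, V> sink;                  // role of IgniteDataStreamer
    private Function<T, Map.Entry<K, V>> extractor; // role of StreamTupleExtractor

    // Dependencies supplied later via setters.
    SimpleStreamAdapter() {
    }

    // Dependencies supplied via the constructor.
    SimpleStreamAdapter(BiConsumer<K, V> sink, Function<T, Map.Entry<K, V>> extractor) {
        this.sink = sink;
        this.extractor = extractor;
    }

    void setSink(BiConsumer<K, V> sink) { this.sink = sink; }

    void setExtractor(Function<T, Map.Entry<K, V>> extractor) { this.extractor = extractor; }

    // Converts one raw message into a key-value pair and forwards it to the sink.
    protected void addMessage(T msg) {
        Objects.requireNonNull(sink, "sink is not set");
        Objects.requireNonNull(extractor, "extractor is not set");
        Map.Entry<K, V> e = extractor.apply(msg);
        sink.accept(e.getKey(), e.getValue());
    }
}

// Source-specific streamer: adds its own ingestion entry point on top of the adapter.
class LineStreamer extends SimpleStreamAdapter<String, String, Integer> {
    // Hypothetical callback a real streamer would invoke for every received line.
    void onLine(String line) {
        addMessage(line);
    }
}

class AdapterDemo {
    public static void main(String[] args) {
        Map<String, Integer> cache = new HashMap<>(); // stand-in for the Ignite cache
        LineStreamer streamer = new LineStreamer();
        streamer.setSink(cache::put);
        streamer.setExtractor(msg -> {
            String[] tokens = msg.split("=");
            return new AbstractMap.SimpleImmutableEntry<String, Integer>(tokens[0], Integer.valueOf(tokens[1]));
        });
        streamer.onLine("ignite=1");
        System.out.println(cache); // prints "{ignite=1}"
    }
}
```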
...
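```java
import java.util.Map;

import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.stream.StreamTupleExtractor;

// Converts a "word=count" message into a (word, count) cache entry.
public class WordCountTupleExtractor implements StreamTupleExtractor<String, String, Integer> {
    @Override public Map.Entry<String, Integer> extract(String msg) {
        String[] tokens = msg.split("=");

        return new IgniteBiTuple<>(tokens[0], Integer.valueOf(tokens[1]));
    }
}
```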
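The extractor above assumes every message is well formed: a message without "=" makes tokens[1] throw ArrayIndexOutOfBoundsException, and a non-numeric count throws NumberFormatException inside the streamer. The following JDK-only sketch shows one possible way to make that parsing stricter and testable in isolation. WordCountParser is a made-up name, and it returns AbstractMap.SimpleImmutableEntry instead of IgniteBiTuple only so that it runs without Ignite on the classpath.

```java
import java.util.AbstractMap;
import java.util.Map;

// Hypothetical stand-alone version of the "word=count" parsing done by WordCountTupleExtractor.
final class WordCountParser {
    private WordCountParser() {
    }

    // Parses "word=count" into an entry; malformed messages fail with a descriptive error.
    static Map.Entry<String, Integer> parse(String msg) {
        int idx = msg.indexOf('=');
        if (idx < 0)
            throw new IllegalArgumentException("Expected 'word=count' but got: " + msg);
        String word = msg.substring(0, idx).trim();
        if (word.isEmpty())
            throw new IllegalArgumentException("Empty word in: " + msg);
        int count;
        try {
            count = Integer.parseInt(msg.substring(idx + 1).trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Count is not an integer in: " + msg, e);
        }
        return new AbstractMap.SimpleImmutableEntry<String, Integer>(word, count);
    }

    public static void main(String[] args) {
        Map.Entry<String, Integer> e = parse("ignite=1");
        System.out.println(e.getKey() + " -> " + e.getValue()); // prints "ignite -> 1"
    }
}
```

Inside a real StreamTupleExtractor, extract(msg) could delegate to such a method and wrap its result in an IgniteBiTuple.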
The streamer life cycle depends on the chosen implementation model (server or client) and on the requirements. Usually a streamer can be in one of the following states: created, initialized, or shut down, and possibly in one of the transition states between them. StreamAdapter doesn't provide any life cycle management methods; correct life cycle management is entirely the responsibility of the data streamer developer.
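Because StreamAdapter leaves life cycle management to the developer, a custom streamer usually has to guard its own state transitions. The following is a minimal sketch, assuming the states listed above plus two transition states (STARTING and STOPPING, names chosen here for illustration). It uses an AtomicReference with compare-and-set so that concurrent start and stop calls cannot both succeed; doStart and doStop are hypothetical hooks for source-specific work such as binding a server socket.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical life cycle states of a custom streamer.
enum StreamerState { CREATED, STARTING, INITIALIZED, STOPPING, SHUTDOWN }

// Minimal life cycle guard; subclasses put source-specific work into doStart/doStop.
abstract class LifecycleStreamer {
    private final AtomicReference<StreamerState> state =
        new AtomicReference<>(StreamerState.CREATED);

    StreamerState state() {
        return state.get();
    }

    // Allowed exactly once, from CREATED.
    final void start() {
        if (!state.compareAndSet(StreamerState.CREATED, StreamerState.STARTING))
            throw new IllegalStateException("Cannot start streamer in state " + state.get());
        try {
            doStart();
            state.set(StreamerState.INITIALIZED);
        } catch (RuntimeException e) {
            state.set(StreamerState.SHUTDOWN); // a failed start leaves the streamer unusable
            throw e;
        }
    }

    // No-op unless the streamer is INITIALIZED (e.g. a repeated stop() call).
    final void stop() {
        if (!state.compareAndSet(StreamerState.INITIALIZED, StreamerState.STOPPING))
            return;
        try {
            doStop();
        } finally {
            state.set(StreamerState.SHUTDOWN);
        }
    }

    protected abstract void doStart();

    protected abstract void doStop();
}

class LifecycleDemo {
    public static void main(String[] args) {
        LifecycleStreamer s = new LifecycleStreamer() {
            @Override protected void doStart() { System.out.println("source opened"); }
            @Override protected void doStop() { System.out.println("source closed"); }
        };
        s.start();
        s.stop();
        s.stop(); // ignored: already shut down
        System.out.println(s.state()); // prints "SHUTDOWN"
    }
}
```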
Since a data streamer requires an IgniteDataStreamer instance, an Ignite node with a cache must be started and an IgniteDataStreamer obtained before the custom streamer is created. Note that StreamAdapter doesn't manage the IgniteDataStreamer life cycle.
The following code snippet demonstrates correct data streamer initialization, assuming there is a CustomStreamer implementation with a start method:
```java
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;

// Start Ignite node with default configuration.
Ignite ignite = Ignition.start();
// Create and start cache with default configuration and name "wordsCache".
IgniteCache<String, Integer> cache = ignite.getOrCreateCache("wordsCache");
// Get data streamer reference for cache with name "wordsCache".
IgniteDataStreamer<String, Integer> stmr = ignite.dataStreamer("wordsCache");
// Create custom data streamer implementation.
CustomStreamer customStmr = new CustomStreamer(stmr, new WordCountTupleExtractor());
// Start custom data streamer if it provides corresponding methods.
customStmr.start();
```
The following code snippet demonstrates correct data streamer shutdown, assuming there is a CustomStreamer implementation with a stop method:
```java
// Stop custom data streamer. NOTE: the IgniteDataStreamer instance and the Ignite node are still running.
customStmr.stop();
// Stream any remaining buffered data, close the data streamer and release all associated resources.
stmr.close();
// Stop Ignite node.
ignite.close();
```
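Since both Ignite and IgniteDataStreamer implement AutoCloseable, the same shutdown order can also be expressed with try-with-resources, which closes resources in the reverse order of their declaration. The sketch below demonstrates that ordering with plain JDK stand-ins that only record their names; it assumes, hypothetically, that the custom streamer is also made AutoCloseable (for example by calling stop() from close()), which StreamAdapter itself does not provide.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-in resource that records when it is closed.
class RecordingResource implements AutoCloseable {
    private final String name;
    private final List<String> log;

    RecordingResource(String name, List<String> log) {
        this.name = name;
        this.log = log;
    }

    @Override public void close() {
        log.add(name);
    }
}

class ShutdownOrderDemo {
    // Resources are declared in start-up order; try-with-resources closes them in reverse.
    static List<String> runAndClose() {
        List<String> log = new ArrayList<>();
        try (RecordingResource ignite = new RecordingResource("ignite", log);
             RecordingResource stmr = new RecordingResource("dataStreamer", log);
             RecordingResource customStmr = new RecordingResource("customStreamer", log)) {
            // Streaming would happen here.
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runAndClose()); // prints "[customStreamer, dataStreamer, ignite]"
    }
}
```

Declaring the node first, then the data streamer, then the custom streamer yields exactly the stop sequence shown above, and the resources are closed even if streaming throws.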
Ignite provides SocketStreamer, a reference custom data streamer implementation.
...
SocketStreamer is an NIO-based TCP socket server that accepts client connections on a configured port, receives data represented as a byte array, converts it to a user-defined type using a SocketMessageConverter, and streams it into the cache.
The default SocketMessageConverter implementation uses the standard Java serialization/deserialization mechanism, assuming that the data stream contains serialized Java objects.
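The idea behind the default converter can be approximated with the JDK alone: the client serializes each object with ObjectOutputStream, and the server turns each received byte array back into an object with ObjectInputStream. The sketch below assumes a local MessageConverter&lt;T&gt; interface with a single T convert(byte[] msg) method, mirroring the shape of SocketMessageConverter as used in the SocketStreamer example; it is not Ignite's default converter, whose details (for example class loading) may differ.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Local stand-in with the same shape as SocketMessageConverter: raw bytes in, object out.
interface MessageConverter<T> {
    T convert(byte[] msg);
}

// Hypothetical converter based on standard Java deserialization.
class JavaSerializationConverter<T> implements MessageConverter<T> {
    @SuppressWarnings("unchecked")
    @Override public T convert(byte[] msg) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(msg))) {
            return (T) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Unknown class in message", e);
        }
    }

    // Client-side counterpart: serializes one object into the bytes of a single message.
    static byte[] toBytes(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        byte[] msg = toBytes("ignite=1");
        String restored = new JavaSerializationConverter<String>().convert(msg);
        System.out.println(restored); // prints "ignite=1"
    }
}
```

Deserializing data received from untrusted clients with Java serialization is a well-known security risk, which is one reason to supply a custom converter, such as the string converter in the SocketStreamer example.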
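SocketStreamer supports two communication protocols:

- message size based protocol (default), where each message in the stream is prepended with a 4-byte integer header containing the message size;
- message delimiter based protocol, where each message in the stream is appended with a user-defined delimiter (see the SocketStreamer.setDelimiter(byte[] delim) method).

The SocketStreamer implementation has two life cycle management methods, start and stop, which initialize and shut down the TCP socket server respectively.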
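To make the two wire formats concrete, the sketch below frames a single message both ways: with a 4-byte length header written by DataOutputStream.writeInt (big-endian; that this matches SocketStreamer's default header byte order is an assumption here) and with a trailing delimiter. It also includes a helper that splits a delimiter-framed stream back into messages. The Framing class and its methods are hypothetical helpers for illustration, not part of Ignite.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helpers illustrating the two framing schemes.
final class Framing {
    private Framing() {
    }

    // Size-based framing: 4-byte big-endian length header followed by the payload.
    static byte[] sizePrefixed(byte[] payload) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(payload.length);
            out.write(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Delimiter-based framing: payload followed by the delimiter bytes.
    static byte[] delimited(byte[] payload, byte[] delim) {
        byte[] framed = Arrays.copyOf(payload, payload.length + delim.length);
        System.arraycopy(delim, 0, framed, payload.length, delim.length);
        return framed;
    }

    // Splits a stream on a single-byte delimiter; a trailing incomplete message is ignored.
    static List<byte[]> splitOn(byte[] stream, byte delim) {
        List<byte[]> msgs = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < stream.length; i++) {
            if (stream[i] == delim) {
                msgs.add(Arrays.copyOfRange(stream, start, i));
                start = i + 1;
            }
        }
        return msgs;
    }

    public static void main(String[] args) {
        byte[] payload = "ignite=1".getBytes(StandardCharsets.UTF_8);
        System.out.println(Arrays.toString(sizePrefixed(payload)));
        // prints "[0, 0, 0, 8, 105, 103, 110, 105, 116, 101, 61, 49]"
        List<byte[]> msgs = splitOn(delimited(payload, new byte[] {0}), (byte) 0);
        System.out.println(new String(msgs.get(0), StandardCharsets.UTF_8)); // prints "ignite=1"
    }
}
```

The delimiter scheme is simpler to produce from any language, but the delimiter must never occur inside a payload; the size-based scheme has no such restriction.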
The following example demonstrates how SocketStreamer can be used:
```java
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;
import org.apache.ignite.stream.socket.SocketMessageConverter;
import org.apache.ignite.stream.socket.SocketStreamer;

// Start Ignite node with default configuration.
Ignite ignite = Ignition.start();
// Create cache with default configuration and name "wordsCache".
IgniteCache<String, Integer> cache = ignite.getOrCreateCache("wordsCache");
// Get data streamer reference for cache "wordsCache".
IgniteDataStreamer<String, Integer> stmr = ignite.dataStreamer("wordsCache");
// Configure socket streamer.
SocketStreamer<String, String, Integer> sockStmr = new SocketStreamer<>();
// Set socket server address.
sockStmr.setAddr(InetAddress.getLocalHost());
// Set socket server port.
sockStmr.setPort(5555);
// Use message delimiter based protocol where delimiter is zero byte.
sockStmr.setDelimiter(new byte[] {0});
// Set Ignite instance.
sockStmr.setIgnite(ignite);
// Set data streamer instance.
sockStmr.setStreamer(stmr);
// Converter from zero-terminated string to Java string.
sockStmr.setConverter(new SocketMessageConverter<String>() {
    @Override public String convert(byte[] msg) {
        // Decode explicitly as UTF-8 instead of relying on the platform default charset.
        return new String(msg, StandardCharsets.UTF_8);
    }
});
// Set tuple extractor (see above).
sockStmr.setTupleExtractor(new WordCountTupleExtractor());
// Start socket streamer.
sockStmr.start();
```
Now the socket streamer is ready to receive data. A client for this socket streamer can be implemented as follows:
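```java
import java.io.BufferedOutputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Assumption: 'words' is an Iterable<String> defined elsewhere.
byte[] delim = new byte[] {0};
try (
    // Connect to server.
    Socket sock = new Socket(InetAddress.getLocalHost(), 5555);
    OutputStream out = new BufferedOutputStream(sock.getOutputStream())
) {
    for (String word : words) {
        String item = word + "=1";
        // Encode explicitly as UTF-8 instead of relying on the platform default charset.
        byte[] arr = item.getBytes(StandardCharsets.UTF_8);
        // Write message.
        out.write(arr);
        // Write delimiter.
        out.write(delim);
    }
}
```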
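When developing a client like this, it can help to check the framing without an Ignite node. The following sketch is a hypothetical test harness, not part of SocketStreamer: it opens a plain ServerSocket on an ephemeral loopback port (rather than port 5555 on InetAddress.getLocalHost()), reads everything the client sends on a background thread, and splits it on the zero byte, while the client side writes word=1 messages followed by a zero delimiter, as in the client example.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical loopback check for the zero-delimited client protocol.
class DelimiterLoopback {
    // Sends each word as "word=1" plus a zero byte and returns the messages the server decoded.
    static List<String> roundTrip(List<String> words) {
        try (ServerSocket srv = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            // Server side: read until the client closes the connection, then split on 0.
            CompletableFuture<List<String>> received = CompletableFuture.supplyAsync(() -> {
                try (Socket conn = srv.accept(); InputStream in = conn.getInputStream()) {
                    ByteArrayOutputStream buf = new ByteArrayOutputStream();
                    byte[] chunk = new byte[1024];
                    for (int n; (n = in.read(chunk)) != -1; )
                        buf.write(chunk, 0, n);
                    byte[] all = buf.toByteArray();
                    List<String> msgs = new ArrayList<>();
                    int start = 0;
                    for (int i = 0; i < all.length; i++) {
                        if (all[i] == 0) {
                            msgs.add(new String(all, start, i - start, StandardCharsets.UTF_8));
                            start = i + 1;
                        }
                    }
                    return msgs;
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            // Client side: message bytes followed by a zero delimiter.
            try (Socket sock = new Socket(srv.getInetAddress(), srv.getLocalPort());
                 OutputStream out = new BufferedOutputStream(sock.getOutputStream())) {
                for (String word : words) {
                    out.write((word + "=1").getBytes(StandardCharsets.UTF_8));
                    out.write(0);
                }
            }
            return received.join();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(Arrays.asList("hello", "world"))); // prints "[hello=1, world=1]"
    }
}
```

Closing the client socket is what signals end of stream to the reader here; a real SocketStreamer processes each message as soon as its delimiter arrives instead of waiting for the connection to close.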
The following steps should be performed in order to implement a custom data streamer:

- Optionally extend StreamAdapter.