
...

This guide describes how to implement a custom data streamer, covering basic concepts and implementation details.

...

Code Block (language: java)
// Start Ignite node with default configuration.
Ignite ignite = Ignition.start();

// Create cache with default configuration and name "wordsCache".
IgniteCache<String, Integer> cache = ignite.getOrCreateCache("wordsCache");

// Get data streamer reference for cache "wordsCache".
IgniteDataStreamer<String, Integer> stmr = ignite.dataStreamer("wordsCache");

// Configure socket streamer.
SocketStreamer<String, String, Integer> sockStmr = new SocketStreamer<>();

// Set socket server address.
sockStmr.setAddr(InetAddress.getLocalHost());

// Set socket server port.
sockStmr.setPort(5555);

// Use message delimiter based protocol where delimiter is zero byte.
sockStmr.setDelimiter(new byte[] {0});

// Set Ignite instance.
sockStmr.setIgnite(ignite);

// Set data streamer instance.
sockStmr.setStreamer(stmr);

// Converter from zero-terminated string to Java string.
sockStmr.setConverter(new SocketMessageConverter<String>() {
    @Override public String convert(byte[] msg) {
        return new String(msg);
    }
});

// Set tuple extractor (see above).
sockStmr.setTupleExtractor(new WordCountTupleExtractor());

// Start socket streamer.
sockStmr.start();
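The example above references a WordCountTupleExtractor defined earlier on this page. Its core job is to map each word received from the socket to a cache entry. A minimal sketch of that mapping is shown below as a plain helper class so it stands alone; in the actual streamer this logic would live in an implementation of Ignite's StreamTupleExtractor<String, String, Integer>, and the normalization shown here is an illustrative assumption, not taken from the page.

```java
import java.util.AbstractMap;
import java.util.Map;

// Sketch of the extraction logic behind WordCountTupleExtractor: each word
// received from the socket becomes a (word, 1) cache entry. In the real
// streamer this method would implement StreamTupleExtractor.extract(msg).
public class WordCountTuples {
    public static Map.Entry<String, Integer> extract(String word) {
        // Normalize the word so "Hello" and "hello" count under one key
        // (an assumption for illustration; the original extractor may differ).
        return new AbstractMap.SimpleEntry<>(word.trim().toLowerCase(), 1);
    }
}
```

Summing the Integer values per key in the cache then yields the word counts.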

...

The following steps should be performed in order to implement a custom data streamer:

  1. Choose the model that the data streamer will implement (server or client).
  2. Define a new data streamer class that extends StreamAdapter.
  3. Provide an API for data streamer configuration (hosts, ports, and other data source specific parameters) and implement parameter validation if needed.
  4. Implement life cycle management methods if needed. Usually the developer will want control over data source resources such as JMS sessions, HTTP/socket connections, Kafka consumers, and so on.
  5. Implement the logic for consuming data from the data source. For example: processing JMS messages in the corresponding listener, accepting client connections and reading data from them, consuming messages from Kafka topics, and so on.
  6. Implement data conversion from the data source specific format to a Java object. For example: conversion of JMS Message instances, conversion of bytes received from a socket, conversion of bytes consumed from Kafka topics, and so on.
  7. Provide an abstraction and API for user-defined data converters. For example: JMS defines several message types (TextMessage, MapMessage), socket servers can use different protocols, and so on.
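The steps above might be sketched as follows for a server-model streamer that reads newline-delimited text. The class name LineSocketStreamer, its port parameter, and the threading details are illustrative assumptions, not part of the Ignite API; only StreamAdapter and the setters inherited from it (setIgnite, setStreamer, setTupleExtractor) come from Ignite. Running it requires a started Ignite node, so this is a sketch rather than a standalone program.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;

import org.apache.ignite.stream.StreamAdapter;

// Hypothetical custom streamer following the steps above (server model).
public class LineSocketStreamer<K, V> extends StreamAdapter<String, K, V> {
    private int port;                       // Step 3: configuration parameter.
    private volatile ServerSocket srvSock;  // Step 4: data source resource.
    private Thread acceptThread;

    public void setPort(int port) {
        this.port = port;
    }

    // Step 4: life cycle management - start accepting connections.
    public void start() throws Exception {
        if (port <= 0)
            throw new IllegalArgumentException("Port must be set."); // Step 3: validation.

        srvSock = new ServerSocket(port);

        acceptThread = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    // Step 5: accept client connections and read data.
                    try (Socket sock = srvSock.accept();
                         BufferedReader rdr = new BufferedReader(
                             new InputStreamReader(sock.getInputStream()))) {
                        String line;

                        while ((line = rdr.readLine()) != null)
                            // Steps 6-7: addMessage() (inherited from
                            // StreamAdapter) applies the configured tuple
                            // extractor and feeds the underlying data streamer.
                            addMessage(line);
                    }
                }
            }
            catch (Exception ignored) {
                // Server socket closed by stop().
            }
        });

        acceptThread.start();
    }

    // Step 4: life cycle management - release resources.
    public void stop() throws Exception {
        acceptThread.interrupt();
        srvSock.close();
    }
}
```

A streamer like this would be wired up the same way as the SocketStreamer example above: set the Ignite instance, the data streamer, and a tuple extractor, then call start().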

...