
...

This guide describes how to implement a custom data streamer and covers basic concepts and implementation details.

...

Code Block
languagejava
// Start Ignite node with default configuration.
Ignite ignite = Ignition.start();

// Create and start cache with default configuration and name "wordsCache".
IgniteCache<String, Integer> cache = ignite.getOrCreateCache("wordsCache");

// Get data streamer reference for cache with name "wordsCache".
IgniteDataStreamer<String, Integer> stmr = ignite.dataStreamer("wordsCache");

// Create custom data streamer implementation.
CustomStreamer customStmr = new CustomStreamer(stmr, new WordCountTupleExtractor());

// Start custom data streamer if it provides corresponding methods.
customStmr.start();
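The WordCountTupleExtractor used above is not part of Ignite; it is assumed to turn a message such as "hello=1" into a cache entry. Below is a minimal sketch of the parsing logic such an extractor might implement. In a real implementation the class would implement Ignite's StreamTupleExtractor interface (package org.apache.ignite.stream); the Ignite dependency is omitted here so the sketch stands on its own.

```java
import java.util.AbstractMap;
import java.util.Map;

// Hypothetical extractor logic: parses messages of the form "word=count"
// into a cache entry. A real implementation would implement Ignite's
// StreamTupleExtractor<String, String, Integer> interface.
class WordCountTupleExtractor {
    public Map.Entry<String, Integer> extract(String msg) {
        int idx = msg.lastIndexOf('=');

        // Key is the word, value is the parsed count.
        String word = msg.substring(0, idx);
        int cnt = Integer.parseInt(msg.substring(idx + 1));

        return new AbstractMap.SimpleEntry<>(word, cnt);
    }
}
```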

The following code snippet demonstrates correct data streamer shutdown, assuming that the CustomStreamer implementation provides a stop method:

Code Block
languagejava
// Stop custom data streamer. NOTE: IgniteDataStreamer instance and Ignite node are still in running state.
customStmr.stop();

// Stop data streamer and release all associated resources.
stmr.close();

// Stop Ignite node.
ignite.close();

Reference Implementation

Ignite provides SocketStreamer, a reference custom data streamer implementation. SocketStreamer is an NIO-based TCP socket server that accepts client connections on a configured port, receives data represented as a byte array, converts it to a user-defined type using SocketMessageConverter, and streams it into the cache.

...

The SocketStreamer implementation has two life cycle management methods, start and stop, which initialize and shut down the TCP socket server respectively.

The following example demonstrates how SocketStreamer can be used:

Code Block
languagejava
// Start Ignite node with default configuration.
Ignite ignite = Ignition.start();

// Create cache with default configuration and name "wordsCache".
IgniteCache<String, Integer> cache = ignite.getOrCreateCache("wordsCache");

// Get data streamer reference for cache "wordsCache".
IgniteDataStreamer<String, Integer> stmr = ignite.dataStreamer("wordsCache");

// Configure socket streamer.
SocketStreamer<String, String, Integer> sockStmr = new SocketStreamer<>();

// Set socket server address.
sockStmr.setAddr(InetAddress.getLocalHost());

// Set socket server port.
sockStmr.setPort(5555);

// Use message delimiter based protocol where delimiter is zero byte.
sockStmr.setDelimiter(new byte[] {0});

// Set ignite instance
sockStmr.setIgnite(ignite);

// Set data streamer instance
sockStmr.setStreamer(stmr);

// Converter from zero-terminated string to Java string.
sockStmr.setConverter(new SocketMessageConverter<String>() {
    @Override public String convert(byte[] msg) {
        return new String(msg);
    }
});

// Set tuple extractor (see above).
sockStmr.setTupleExtractor(new WordCountTupleExtractor());

// Start socket streamer.
sockStmr.start();

Now socket streamer is ready to receive data. Client for this socket streamer can be implemented as follows:

Code Block
languagejava
byte[] delim = new byte[] {0};

try (
    // Connect to server
    Socket sock = new Socket(InetAddress.getLocalHost(), 5555);
    OutputStream oos = new BufferedOutputStream(sock.getOutputStream())
) {
    for (String word : words) {
        String item = word + "=1";

        byte[] arr = item.getBytes();

        // Write message.
        oos.write(arr);

        // Write delimiter.
        oos.write(delim);
    }
}
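On the server side, the configured delimiter is used to split the incoming byte stream back into individual messages before conversion. The snippet below is a simplified illustration of that framing logic, not the actual SocketStreamer internals (which are NIO-based); it only shows how a zero-byte delimiter separates messages in a received buffer.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of delimiter-based framing: splits a received byte buffer into
// messages separated by the delimiter byte, mirroring what the socket
// server does with the configured delimiter before invoking the converter.
class DelimiterFraming {
    static List<String> split(byte[] data, byte delim) {
        List<String> msgs = new ArrayList<>();

        int start = 0;

        for (int i = 0; i < data.length; i++) {
            if (data[i] == delim) {
                // Everything since the previous delimiter is one message.
                msgs.add(new String(data, start, i - start));

                start = i + 1;
            }
        }

        return msgs;
    }
}
```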

Implementation Steps

The following steps should be performed in order to implement a custom data streamer:

  1. Choose the model that the data streamer will implement (server or client).
  2. Define a new data streamer class that extends StreamAdapter.
  3. Provide an API for data streamer configuration and implement parameter validation if needed: hosts, ports, and other data source specific parameters.
  4. Implement life cycle management methods if needed. Usually the developer will want to have control over data source resources such as JMS sessions, HTTP/socket connections, Kafka consumers, and so on.
  5. Implement the logic for consuming data from the data source. For example: JMS message processing in the corresponding listener, accepting client connections and reading data from them, or consuming messages from Kafka topics.
  6. Implement data conversion from the data source specific format to a Java object. For example: conversion of JMS Message instances, of bytes received from a socket, or of bytes consumed from Kafka topics.
  7. Provide an abstraction and API for user defined data converters. For example: JMS defines several message types (TextMessage, MapMessage), socket servers can use different protocols, and so on.
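The steps above can be sketched as a skeleton class. This is a hedged sketch, not a complete implementation: a real streamer would extend org.apache.ignite.stream.StreamAdapter and call its addMessage method for each message received from the data source; the Ignite dependency is omitted here so the structure stands on its own, and the class and method names are illustrative only.

```java
// Skeleton of a custom data streamer following the implementation steps.
// A real implementation would extend StreamAdapter<String, String, Integer>
// and feed received messages to it via addMessage(msg).
class MyCustomStreamer {
    // Step 3: data source specific configuration.
    private String host;
    private int port;

    private volatile boolean started;

    void setHost(String host) { this.host = host; }

    void setPort(int port) { this.port = port; }

    // Step 4: life cycle management with parameter validation.
    void start() {
        if (host == null || port <= 0)
            throw new IllegalStateException("Host and port must be set.");

        started = true;

        // Steps 5-7: a real implementation would connect to the data
        // source here, read messages, convert each one with a user
        // defined converter, and pass it to StreamAdapter.addMessage().
    }

    void stop() {
        started = false;

        // A real implementation would close connections and release
        // data source resources here.
    }

    boolean isStarted() { return started; }
}
```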