IDIEP-102
Author
Sponsor


Created

 

Status

IN PROGRESS


Motivation

Ignite needs an API to process an incoming stream of external data in a distributed fashion. The processing may or may not involve updating one or more Ignite tables.

Requirements

Functional

Data streamer API accepts a stream of data on the client side, distributes data entries in some way across server nodes, where the processing takes place.

The following two use cases should be supported:

  1. Upsert a stream of data into a specific table
    1. User provides the data and a table name.
    2. Ignite distributes batches of data to server nodes in a partition-aware way and performs upsert.
  2. Modify one or more tables (potentially colocated) with a custom logic from a stream of data
    1. User provides the data, table name, key function, and a receiver function.
    2. Ignite uses key function and table name on the client side to batch the data in a partition-aware way, then sends it to the server nodes according to partition distribution.
    3. Ignite invokes the receiver function on the server nodes with individual items of the source data. The user function is responsible for modifying table data using the table API. The function can return arbitrary results back to the client. In particular, returned results can indicate a failure to process a given item.


Exception Handling

Uncaught exception in receiver aborts the streamer. The exception is sent back to the client and passed to the CompletableFuture returned from streamData API.

To handle non-critical errors while processing individual items, user code can return arbitrary data from the receiver (see p2-c above).

Non Functional

  • Transactional guarantees: the streamer should not bypass any transaction mechanisms. Transaction per batch or transaction per entry could be used.
  • Asynchronous API with backpressure control. Avoid blocking threads and/or excessive resource usage on clients and servers.
  • Ordering guarantees: the streamer must guarantee that two objects added with the same key will be processed in the same order they were added by the client. In other words, the streamer can’t reorder items within a given partition. Receiver should not be invoked for item B if preceding item A is not yet processed.
  • At Least Once delivery guarantee:
    • Automatic retry of failed batches.
    • If a streamer completes without error, all items are guaranteed to be processed.
  • Partition awareness (best effort): receiver should run on a node where the given row is located. However, Ignite 3 does not provide partition pinning, so during topology changes this behavior is not guaranteed.

Description

Receiver Deployment

Receiver class should be deployed on the cluster prior to data streamer usage - see IEP-103: Code Deployment.

Reactive Java API

Java 9 Reactive Streams is a standard API for asynchronous stream processing (initially inspired by Akka Streams). Our API will accept Flow.Publisher from the user and pull the data from there as needed, at the speed of server-side ingestion and processing.


Java API
public interface StreamerTarget<T> {
   /**
    * Streams data into the table.
    *
    * @param publisher Producer.
    * @return Future that will be completed when the stream is finished.
    */
   CompletableFuture<Void> streamData(
           Flow.Publisher<T> publisher,
           @Nullable DataStreamerOptions options);


   /**
    * Streams data into the cluster with a receiver.
    *
    * @param publisher Producer.
    * @param keyAccessor Key accessor. Required to determine target node from the entry key.
    * @param receiver Stream receiver. Will be invoked on the target node.
    * @param resultSubscriber Optional stream result subscriber. Will be invoked on the current client
    * for every non-null result returned by the receiver.
    * @return Future that will be completed when the stream is finished.
    * @param <S> Source item type.
    * @param <R> Result item type.
    */
   <S, R> CompletableFuture<Void> streamData(
           Flow.Publisher<S> publisher,
           Function<S, T> keyAccessor,
           StreamReceiver<S, R> receiver,
           @Nullable Flow.Subscriber<R> resultSubscriber,
           @Nullable DataStreamerOptions options);
}


Non-Java Clients

Similar to Compute, server-side code (receiver) requires a pre-deployed Java class, which is addressed by a fully qualified name.

.NET API

Asynchronous stream with consumer-controlled flow (pull-based) is represented in .NET by IAsyncEnumerable interface. Both producer and consumer are async (non-blocking).


.NET API
public interface IStreamerTarget<in T>
{
   Task StreamDataAsync(IAsyncEnumerable<T> stream, StreamerOptions? options = null);


   Task StreamDataAsync<TItem, TResult>(
       IAsyncEnumerable<TItem> stream,
       Func<TItem, T> keySelector,
       string receiverClassName,
       IStreamerResultListener<TResult>? resultListener,
       StreamerOptions? options = null);
}


Partition Awareness

  • Use case 1 (stream directly to a table): same partition awareness logic as in Table API, based on the known schema and partition distribution. Batching should be performed on a per-node basis.
  • Use case 2 (stream of custom objects): keyFunction is used to map custom objects to keys, then existing partition awareness logic from Table API is used.


The client can be connected to a subset of nodes from the cluster (or even just 1 of many). Batches will include the target node id, and the receiving server node should reroute them to the corresponding node if necessary.

Some of the per-node batches may be less “popular” than others, filling up slowly or not filling at all, containing only a few entries. Use a configurable timeout to flush those batches.

Guarantees

  • At least once delivery is guaranteed thanks to the already existing retry mechanism on the client side.
    “At least once” means that the same batch can be processed multiple times (by one or more server nodes), because retry can be triggered due to a connection failure when the batch is already received by the server, but acknowledgement is not received by the client.


Transactional Behavior

  • Without receiver: upsertAll is called in a separate transaction for every batch.
  • With receiver: User function is provided with an instance of the public Ignite API, which maintains all transactional guarantees. It is up to the user to use zero or more transactions to update zero or more tables.


Data Serialization

  • Without receiver: table data is serialized as BinaryTuple (IEP-92) according to the known schema (the same way we do it for regular table operations).
  • With receiver: data entries are serialized according to User Objects Serialization specification (TBD - IEP is not ready yet).

Discussion Links

Reference Links

Tickets

key summary type created updated due assignee reporter customfield_12311032 customfield_12311037 customfield_12311022 customfield_12311027 priority status resolution

JQL and issue key arguments for this macro require at least one Jira application link to be configured

  • No labels