
IDIEP-102

Author:
Sponsor:
Created:
Status: IN PROGRESS

Motivation

Ignite needs an API to process an incoming stream of external data in a distributed fashion. The processing may or may not involve updating one or more Ignite tables.

Requirements

Functional

The data streamer API accepts a stream of data on the client side and distributes the entries across server nodes, where the processing takes place.

The following two use cases should be supported:

  1. Upsert a stream of data into a specific table
    1. User provides the data and a table name.
    2. Ignite distributes batches of data to server nodes in a partition-aware way and performs upsert.
  2. Modify one or more tables (potentially colocated) with custom logic from a stream of data
    1. User provides the data, a table name, a key function, and a receiver function.
    2. Ignite uses the key function and table name on the client side to batch the data in a partition-aware way, then sends the batches to the server nodes according to the partition distribution.
    3. Ignite invokes the receiver function on the server nodes with individual items of the source data. The user function is responsible for modifying table data using the table API. The function can return arbitrary results to the client; in particular, a returned result can indicate a failure to process a given item.


Exception Handling

An uncaught exception in the receiver aborts the streamer. The exception is sent back to the client and passed to the CompletableFuture returned from the streamData API.

To handle non-critical errors while processing individual items, user code can return arbitrary data from the receiver (see use case 2, step 3 above).
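As a sketch of this pattern (the StreamReceiver shape below is an assumption for illustration, not the final API), a receiver can encode per-item failures in its return value instead of throwing:

```java
import java.util.List;

// Sketch: the receiver reports non-critical per-item errors through its
// return value instead of throwing. The StreamReceiver interface shown here
// is an assumption for illustration, not the final API.
public class ReceiverErrorDemo {
    interface StreamReceiver<S, R> {
        R receive(S item);
    }

    // Per-item result: a failed item is reported to the client without
    // aborting the whole stream.
    record ItemResult(String item, boolean ok, String error) {}

    static class ValidatingReceiver implements StreamReceiver<String, ItemResult> {
        @Override public ItemResult receive(String item) {
            if (item.isBlank()) {
                // Non-critical error: encode it in the result, do not throw.
                // Throwing here instead would abort the streamer and complete
                // the client-side future exceptionally.
                return new ItemResult(item, false, "blank item");
            }
            return new ItemResult(item, true, null);
        }
    }

    public static void main(String[] args) {
        var receiver = new ValidatingReceiver();
        for (String item : List.of("a", "", "b")) {
            System.out.println(receiver.receive(item));
        }
    }
}
```

The client-side result subscriber can then inspect each ItemResult and decide how to react to failed items, while the stream as a whole keeps running.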

Non Functional

  • Transactional guarantees: the streamer should not bypass any transaction mechanisms. Transaction per batch or transaction per entry could be used.
  • Asynchronous API with backpressure control. Avoid blocking threads and/or excessive resource usage on clients and servers.
  • Ordering guarantees: the streamer must guarantee that two objects added with the same key are processed in the order they were added by the client. In other words, the streamer can’t reorder items within a given partition: the receiver must not be invoked for item B if the preceding item A has not yet been processed.
  • At-least-once delivery is a must. Exactly-once is nice to have if technically feasible (probably not).
  • Partition pinning: because the receiver usually expects the data to be colocated, we should prevent partitions from changing ownership while the receiver is running.
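The per-partition ordering requirement can be sketched as chaining each partition's work onto the previous item's future; the names and structure here are illustrative, not the actual implementation:

```java
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch of the per-partition ordering guarantee: each partition keeps a
// "tail" future, and every new item is chained after it. Items of different
// partitions may run concurrently; items within one partition never reorder.
public class PartitionOrderingDemo {
    private final ConcurrentHashMap<Integer, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();
    private final ExecutorService pool = Executors.newFixedThreadPool(4);

    public CompletableFuture<Void> submit(int partition, Runnable work) {
        return tails.compute(partition, (p, tail) -> {
            CompletableFuture<Void> prev = tail != null ? tail : CompletableFuture.completedFuture(null);
            // Item B for this partition starts only after item A has finished.
            return prev.thenRunAsync(work, pool);
        });
    }

    public void shutdown() {
        pool.shutdown();
    }

    public static void main(String[] args) {
        var demo = new PartitionOrderingDemo();
        var processed = new CopyOnWriteArrayList<Integer>();
        var futures = new ArrayList<CompletableFuture<Void>>();
        for (int i = 0; i < 5; i++) {
            final int n = i;
            futures.add(demo.submit(0, () -> processed.add(n)));
        }
        CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
        demo.shutdown();
        System.out.println(processed); // always in submission order
    }
}
```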

Description

Receiver Deployment

The receiver class should be deployed on all server nodes via the Code Deployment mechanism before the data streamer is used.

Reactive Java API

Java 9 Reactive Streams (java.util.concurrent.Flow) is a standard API for asynchronous stream processing (initially inspired by Akka Streams). Our API will accept a Flow.Publisher from the user and pull the data from it as needed, at the speed of server-side ingestion and processing.
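A minimal self-contained illustration of this pull model, using only the JDK's Flow API: the subscriber controls the pace by requesting one item at a time, so a slow consumer is never flooded.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Demonstrates the pull model: the subscriber calls request(1) after each
// item, so the publisher can never outrun the consumer.
public class BackpressureDemo {
    static List<Integer> consume(List<Integer> items) throws InterruptedException {
        List<Integer> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<>() {
            private Flow.Subscription subscription;

            @Override public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1); // pull the first item
            }
            @Override public void onNext(Integer item) {
                received.add(item);
                subscription.request(1); // pull the next item only when ready
            }
            @Override public void onError(Throwable t) { done.countDown(); }
            @Override public void onComplete() { done.countDown(); }
        };

        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit); // submit() blocks when the subscriber lags
        } // close() eventually triggers onComplete
        done.await();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(consume(List.of(1, 2, 3, 4, 5)));
    }
}
```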


Java API
public interface StreamerTarget<T> {
   /**
    * Streams data into the table.
    *
    * @param publisher Producer.
    * @param options Streamer options.
    * @return Future that will be completed when the stream is finished.
    */
   CompletableFuture<Void> streamData(
           Flow.Publisher<T> publisher,
           @Nullable DataStreamerOptions options);


   /**
    * Streams data into the cluster with a receiver.
    *
    * @param publisher Producer.
    * @param keyAccessor Key accessor. Required to determine the target node from the entry key.
    * @param receiver Stream receiver. Will be invoked on the target node.
    * @param resultSubscriber Optional stream result subscriber. Will be invoked on the current client
    * for every non-null result returned by the receiver.
    * @param options Streamer options.
    * @param <S> Source item type.
    * @param <R> Result item type.
    * @return Future that will be completed when the stream is finished.
   <S, R> CompletableFuture<Void> streamData(
           Flow.Publisher<S> publisher,
           Function<S, T> keyAccessor,
           StreamReceiver<S, R> receiver,
           @Nullable Flow.Subscriber<R> resultSubscriber,
           @Nullable DataStreamerOptions options);
}
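A hedged usage sketch of the proposed streamData method follows. The in-memory target is a stand-in so the snippet is self-contained (a real StreamerTarget would batch entries and ship them to server nodes), and the options parameter is omitted for brevity:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Usage sketch for the proposed API. The in-memory target below is only a
// stand-in to make the example runnable; a real implementation would batch
// entries and send them to server nodes.
public class StreamDataDemo {
    interface StreamerTarget<T> {
        CompletableFuture<Void> streamData(Flow.Publisher<T> publisher);
    }

    // Trivial target: subscribes to the publisher and collects entries,
    // pulling one item at a time.
    static <T> StreamerTarget<T> inMemoryTarget(List<T> sink) {
        return publisher -> {
            CompletableFuture<Void> result = new CompletableFuture<>();
            publisher.subscribe(new Flow.Subscriber<T>() {
                private Flow.Subscription sub;
                @Override public void onSubscribe(Flow.Subscription s) { sub = s; s.request(1); }
                @Override public void onNext(T item) { sink.add(item); sub.request(1); }
                @Override public void onError(Throwable t) { result.completeExceptionally(t); }
                @Override public void onComplete() { result.complete(null); }
            });
            return result;
        };
    }

    public static void main(String[] args) {
        List<String> upserted = new CopyOnWriteArrayList<>();
        StreamerTarget<String> table = inMemoryTarget(upserted);

        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            CompletableFuture<Void> done = table.streamData(publisher);
            publisher.submit("row-1");
            publisher.submit("row-2");
            publisher.close(); // no more data; completes the stream
            done.join();       // resolves once all submitted items are processed
        }
        System.out.println(upserted);
    }
}
```

Note that the user code never blocks on individual items: it submits into the publisher and waits on the single future returned by streamData.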


Non-Java Clients

Similar to Compute, server-side code (the receiver) requires a pre-deployed Java class, which is addressed by its fully qualified name.

.NET API

.NET represents an asynchronous stream with consumer-controlled (pull-based) flow via the IAsyncEnumerable<T> interface. Both producer and consumer are async (non-blocking).



public interface IStreamerTarget<in T>
{
    Task StreamDataAsync(IAsyncEnumerable<T> stream, StreamerOptions? options = null);

    Task StreamDataAsync<TItem, TResult>(
        IAsyncEnumerable<TItem> stream,
        Func<TItem, T> keySelector,
        string receiverClassName,
        IStreamerResultListener<TResult>? resultListener,
        StreamerOptions? options = null);
}

Partition Awareness

  • Use case 1 (stream directly to a table): same partition awareness logic as in the Table API, based on the known schema and partition distribution. Batching should be performed on a per-node basis.
  • Use case 2 (stream of custom objects): keyFunction is used to map custom objects to keys, then the existing partition awareness logic from the Table API is applied.
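The routing described above might be sketched as follows. Hash-modulo partitioning and the inline assignment table are assumptions for illustration only; the real streamer reuses whatever distribution function the Table API already uses:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Sketch of client-side routing for use case 2: the key function extracts a
// key from a custom object, the key maps to a partition, and the partition
// assignment table maps partitions to nodes.
public class PartitionRoutingDemo {
    // Assumption: hash-modulo partitioning, for illustration only.
    static int partitionFor(Object key, int partitions) {
        return Math.floorMod(key.hashCode(), partitions);
    }

    record Event(String userId, String payload) {}

    public static void main(String[] args) {
        int partitions = 4;
        // partition -> node id (normally obtained from the cluster).
        List<String> assignment = List.of("node-A", "node-B", "node-A", "node-B");

        Function<Event, String> keyFunction = Event::userId; // user-provided

        Map<String, List<Event>> perNodeBatches = new HashMap<>();
        for (Event e : List.of(new Event("u1", "x"), new Event("u2", "y"), new Event("u1", "z"))) {
            String node = assignment.get(partitionFor(keyFunction.apply(e), partitions));
            perNodeBatches.computeIfAbsent(node, n -> new ArrayList<>()).add(e);
        }
        // Entries with the same key always land in the same per-node batch.
        System.out.println(perNodeBatches);
    }
}
```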


The client can be connected to a subset of nodes from the cluster (or even just 1 of many). Batches will include the target node id, and the receiving server node should reroute them to the corresponding node if necessary.

Some per-node batches may be less “popular” than others, filling slowly or not at all and containing only a few entries. A configurable timeout is used to flush such batches.
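A sketch of such a per-node buffer, flushing either when the batch is full or when the configurable timeout fires (class and method names are illustrative, not the actual implementation):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Sketch: a per-node batch buffer that flushes either when it reaches
// batchSize or when the periodic flush timeout fires, so rarely-hit nodes
// still receive their few entries promptly.
public class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> sender; // ships one batch to its node
    private final List<T> buffer = new ArrayList<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public BatchBuffer(int batchSize, long flushTimeoutMs, Consumer<List<T>> sender) {
        this.batchSize = batchSize;
        this.sender = sender;
        // Periodic flush covers "unpopular" batches that fill slowly.
        scheduler.scheduleAtFixedRate(this::flush, flushTimeoutMs, flushTimeoutMs, TimeUnit.MILLISECONDS);
    }

    public synchronized void add(T item) {
        buffer.add(item);
        if (buffer.size() >= batchSize) {
            flush(); // size-based flush
        }
    }

    public synchronized void flush() {
        if (!buffer.isEmpty()) {
            sender.accept(List.copyOf(buffer));
            buffer.clear();
        }
    }

    public void close() {
        scheduler.shutdown();
        flush(); // do not lose the tail of the stream
    }

    public static void main(String[] args) {
        List<List<Integer>> sent = new CopyOnWriteArrayList<>();
        BatchBuffer<Integer> buf = new BatchBuffer<>(3, 1000, sent::add);
        for (int i = 0; i < 7; i++) buf.add(i); // two full batches flush immediately
        buf.close();                            // the remaining item flushes on close
        System.out.println(sent);
    }
}
```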

Guarantees

  • At-least-once delivery is guaranteed thanks to the existing retry mechanism on the client side.
    “At least once” means that the same batch can be processed multiple times (by one or more server nodes): a retry can be triggered by a connection failure after the batch has been received by the server but before the acknowledgement reaches the client.
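The duplicate-on-lost-ack scenario can be sketched as a simple retry loop; the Server interface here is a stub for illustration, not an actual Ignite component:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the at-least-once retry loop: the client resends a batch until it
// sees an acknowledgement. If the server applied the batch but the ack was
// lost, the resend causes a duplicate delivery -- hence "at least once".
public class RetryDemo {
    interface Server {
        boolean apply(List<String> batch); // false simulates a lost ack
    }

    static int sendWithRetry(Server server, List<String> batch, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (server.apply(batch)) {
                return attempt; // ack received
            }
            // Ack missing: the batch may or may not have been applied; resend.
        }
        throw new IllegalStateException("retries exhausted");
    }

    public static void main(String[] args) {
        List<List<String>> applied = new ArrayList<>();
        // Server that applies every batch but loses the first ack.
        Server flaky = new Server() {
            private int calls = 0;
            @Override public boolean apply(List<String> batch) {
                applied.add(batch);  // the batch IS applied either way
                return ++calls > 1;  // first ack is "lost"
            }
        };
        int attempts = sendWithRetry(flaky, List.of("a", "b"), 5);
        System.out.println("attempts=" + attempts + ", applied " + applied.size() + " times");
    }
}
```

The batch ends up applied twice even though the client sees a single success, which is exactly why the receiver (or the downstream table logic) must tolerate duplicates.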


Transactional Behavior

  • Without receiver: upsertAll is called in a separate transaction for every batch.
  • With receiver: the user function is provided with an instance of the public Ignite API, which maintains all transactional guarantees. It is up to the user to use zero or more transactions to update zero or more tables.
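The “transaction per batch” mode might look like the following sketch; the Transaction/Transactions interfaces are stubs standing in for the real Ignite transaction API, which this sketch does not assume:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Sketch of "transaction per batch": all entries of one batch share a single
// transaction, so either the whole batch commits or the whole batch rolls back.
public class TxPerBatchDemo {
    interface Transaction {
        void commit();
        void rollback();
    }

    interface Transactions {
        Transaction begin();
    }

    static <T> void upsertBatchInTx(Transactions txs, List<T> batch, BiConsumer<Transaction, T> upsert) {
        Transaction tx = txs.begin();
        try {
            for (T entry : batch) {
                upsert.accept(tx, entry); // every entry runs in the same tx
            }
            tx.commit();
        } catch (RuntimeException e) {
            tx.rollback(); // the whole batch is rolled back together
            throw e;
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Transactions txs = () -> new Transaction() {
            @Override public void commit() { log.add("commit"); }
            @Override public void rollback() { log.add("rollback"); }
        };
        upsertBatchInTx(txs, List.of("k1", "k2"), (tx, e) -> log.add("upsert " + e));
        System.out.println(log);
    }
}
```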


Data Serialization

  • Without receiver: table data is serialized as BinaryTuple (TODO - link) according to the known schema (the same way we do it for regular table operations).
  • With receiver: data entries are serialized according to User Objects Serialization specification (TODO - link).

Risks and Assumptions

TBD

Discussion Links

// Links to discussions on the devlist, if applicable.

Reference Links

TBD: Link to 2.x proposal

Tickets
