ID: IEP-100
Author:
Sponsor:
Created:
Status: IN PROGRESS

Motivation

Apache Ignite 3.x must provide distributed computing functionality, not least because Apache Ignite 2 already does (the so-called Compute Grid). However, several issues prompt us to reconsider the approach to distributed computing in Apache Ignite 3:

  • Many entities designate the same thing (IgniteRunnable, IgniteCallable, IgniteClosure).
  • The decoupling of a computing job from its environment settings is not transparent. A user has to obtain a new compute facade instance in order to change the set of target nodes or the job timeout.
  • The ClusterGroup facade functionality should be limited to server nodes only. In addition, there is no distinction between data nodes and cache nodes.
  • An ExecutorService implementation for the Ignite Compute Grid is not needed in the general case.
  • Balancing and priority settings are complex (the so-called CollisionSPI and LoadBalancingSPI). For example, from the user's point of view a FIFO queue is simply a priority queue in which every element has the same priority.


So we need to do the following:

  1. Copy the Compute Grid functionality from Apache Ignite 2, with modifications that make distributed computing easier to use and improve the user experience.
  2. Reconsider the approach to distributed computing in Apache Ignite and propose a new solution.


The feature is split into phases to allow iterative design and development. This document describes phase 1: simple remote job execution.

The purpose of this document is to introduce the basic building blocks (entities and interfaces) for simple remote job execution.

The API in this phase is not final and may change in later phases.

Definitions

Compute job - a unit of work that can be run on the cluster.

Ignite compute facade - the main interface that exposes the computing framework functionality.

Target nodes - a set of nodes (can be empty, will be defined in the next phase) on which a computation will be or can be performed.

Design

Ignite compute facade

Distributed computing functionality is accessible via the Ignite compute facade (represented by the IgniteCompute interface), which is obtained from the Ignite interface by calling the Ignite.compute() method.

The IgniteCompute interface provides the following set of operations:

  • execute - executes a job on an arbitrary node from the set of target nodes;
  • executeColocated - executes a job on the node where the target data is located;
  • broadcast - executes a job on each node from the set of target nodes.


A compute job is an implementation of the ComputeJob interface, which is defined as follows:

ComputeJob interface
public interface ComputeJob<R> {
    R execute(JobExecutionContext ctx, Object... args);
}
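As an illustration, a job that sums the lengths of its string arguments might look like the minimal sketch below. The nested Ignite, JobExecutionContext and ComputeJob declarations are local stand-ins that mirror the interfaces in this document so that the snippet compiles on its own. They are not the real Ignite API, and the WordLengthJob name and its logic are hypothetical.

```java
/** Minimal sketch; the nested interfaces are local stand-ins, not the real Ignite API. */
final class ComputeJobSketch {
    /** Stand-in for the Ignite entry point (no methods are assumed here). */
    interface Ignite { }

    /** Stand-in mirroring the JobExecutionContext described in this document. */
    interface JobExecutionContext {
        Ignite ignite();
    }

    /** Stand-in mirroring the ComputeJob interface described in this document. */
    interface ComputeJob<R> {
        R execute(JobExecutionContext ctx, Object... args);
    }

    /** Hypothetical job: returns the total length of all String arguments. */
    static final class WordLengthJob implements ComputeJob<Integer> {
        @Override
        public Integer execute(JobExecutionContext ctx, Object... args) {
            int total = 0;
            for (Object arg : args) {
                // Non-String arguments are ignored by this example job.
                if (arg instanceof String) {
                    total += ((String) arg).length();
                }
            }
            return total;
        }
    }

    /** Runs the job in-process with a null context, which this job never touches. */
    static int runLocally(Object... args) {
        return new WordLengthJob().execute(null, args);
    }
}
```

Because the job receives everything it needs through its arguments, it holds no state of its own, which fits the phase 1 model where only a class name and arguments are sent to the target node.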


The IgniteCompute interface is defined as follows:

IgniteCompute
public interface IgniteCompute {
    // Executes a job represented by the given class on one node from the given set of nodes.
    <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, Class<? extends ComputeJob<R>> cls, Object... args);

    // Executes a job represented by the given class on the node where the given key is located.
    <K, R> CompletableFuture<R> executeColocated(String table, K key, Class<? extends ComputeJob<R>> cls, Object... args);

    // Executes a job represented by the given class on all nodes from the given set of nodes.
    <R> Map<ClusterNode, CompletableFuture<R>> broadcast(Set<ClusterNode> nodes, Class<? extends ComputeJob<R>> cls, Object... args);
}
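Since broadcast returns one future per node, callers will often want to wait for all of them. The helper below is one possible way to do this using only the JDK. It is a sketch and not part of the proposed API: the BroadcastResults class and its collect method are hypothetical, and the node type is left generic (N) as a stand-in for ClusterNode.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical helper; not part of the proposed IgniteCompute API. */
final class BroadcastResults {
    /**
     * Turns a map of per-node futures (the shape returned by broadcast) into a
     * single future that completes with a map of per-node results.
     */
    static <N, R> CompletableFuture<Map<N, R>> collect(Map<N, CompletableFuture<R>> futures) {
        CompletableFuture<?>[] all = futures.values().toArray(new CompletableFuture<?>[0]);

        // allOf completes when every per-node future has completed.
        return CompletableFuture.allOf(all).thenApply(ignored -> {
            Map<N, R> results = new HashMap<>();
            // join() does not block here: every future is already complete.
            futures.forEach((node, future) -> results.put(node, future.join()));
            return results;
        });
    }
}
```

If any per-node future fails, the combined future completes exceptionally and the successful results are not reported. A caller that needs partial results would inspect the original map instead.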

Access to Ignite

A job may require access to the Ignite instance in order to perform its operations. The Ignite instance can be obtained through JobExecutionContext, which is passed as a parameter to the execute() method.

JobExecutionContext interface
public interface JobExecutionContext {
    Ignite ignite();
}
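The following sketch shows how a job might reach the Ignite instance through its context. All nested interfaces are local stand-ins so that the snippet compiles on its own. In particular, the name() method on the Ignite stand-in is an assumption made for this example only, and NodeNameJob is a hypothetical job.

```java
/** Minimal sketch; the nested interfaces are local stand-ins, not the real Ignite API. */
final class ContextAccessSketch {
    /** Stand-in for Ignite; name() is assumed here purely for illustration. */
    interface Ignite {
        String name();
    }

    /** Stand-in mirroring the JobExecutionContext described in this document. */
    interface JobExecutionContext {
        Ignite ignite();
    }

    /** Stand-in mirroring the ComputeJob interface described in this document. */
    interface ComputeJob<R> {
        R execute(JobExecutionContext ctx, Object... args);
    }

    /** Hypothetical job that reports which Ignite instance executed it. */
    static final class NodeNameJob implements ComputeJob<String> {
        @Override
        public String execute(JobExecutionContext ctx, Object... args) {
            // The context is the job's only entry point to the local Ignite instance.
            Ignite ignite = ctx.ignite();
            return "executed on " + ignite.name();
        }
    }

    /** Runs the job in-process against a fake context that wraps a fake Ignite. */
    static String runWithFakeContext(String nodeName) {
        Ignite ignite = () -> nodeName;
        JobExecutionContext ctx = () -> ignite;
        return new NodeNameJob().execute(ctx);
    }
}
```

Passing the instance through the context, rather than through a static accessor or field injection, keeps the job easy to exercise in isolation, as runWithFakeContext shows.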

Job execution

In the current phase, job execution does not support failover or load balancing.

Job execution consists of the following steps:

  • Randomly choose a target node from the given set of nodes, if applicable (e.g. for the IgniteCompute.execute() method).
  • Transfer the job class name and arguments to the target node or nodes.
  • Instantiate the job on the target node and deserialize the arguments.
  • Execute the job.
  • Return the result (which may be an error) to the node that initiated the job execution.
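The steps above can be sketched, in order, as a single-process simulation. This is an illustration under stated assumptions and not the proposed implementation: nodes are represented by plain strings, no network transfer or serialization takes place, the nested interfaces are local stand-ins, and JobExecutionSketch, ConcatJob, chooseNode and runOnNode are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;

/** In-process simulation of the execution steps; not the real Ignite implementation. */
final class JobExecutionSketch {
    interface Ignite { }
    interface JobExecutionContext { Ignite ignite(); }
    interface ComputeJob<R> { R execute(JobExecutionContext ctx, Object... args); }

    /** Hypothetical job used by the example: concatenates its arguments. */
    public static final class ConcatJob implements ComputeJob<String> {
        public ConcatJob() { }

        @Override
        public String execute(JobExecutionContext ctx, Object... args) {
            StringBuilder sb = new StringBuilder();
            for (Object arg : args) {
                sb.append(arg);
            }
            return sb.toString();
        }
    }

    /** Step 1: randomly choose one target node from a non-empty set. */
    static String chooseNode(Set<String> nodes) {
        List<String> list = new ArrayList<>(nodes);
        return list.get(ThreadLocalRandom.current().nextInt(list.size()));
    }

    /** Steps 3 to 5, as they would run on the target node (the node is unused in this simulation). */
    static <R> CompletableFuture<R> runOnNode(String node, String className, Object... args) {
        CompletableFuture<R> result = new CompletableFuture<>();
        try {
            // Step 3: instantiate the job from its class name via a no-arg constructor.
            Class<?> cls = Class.forName(className);
            @SuppressWarnings("unchecked")
            ComputeJob<R> job = (ComputeJob<R>) cls.getDeclaredConstructor().newInstance();
            JobExecutionContext ctx = () -> new Ignite() { };
            // Step 4: execute the job. Step 5: report the result to the initiator.
            result.complete(job.execute(ctx, args));
        } catch (Exception e) {
            // Step 5 (error case): the failure is returned instead of a result.
            result.completeExceptionally(e);
        }
        return result;
    }

    /** Simulated counterpart of execute(nodes, className, args). */
    static <R> CompletableFuture<R> execute(Set<String> nodes, String className, Object... args) {
        if (nodes.isEmpty()) {
            CompletableFuture<R> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalArgumentException("No target nodes"));
            return failed;
        }
        String target = chooseNode(nodes);
        // Step 2: a real cluster would send className and the serialized args to the target here.
        return runOnNode(target, className, args);
    }
}
```

The sketch relies on the job class being loadable by name on the executing side, which corresponds to the assumption that all jobs are deployed on all nodes.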

Code deployment

In this phase, the design assumes that all jobs are deployed on all nodes (clients and servers).

Interoperability

To make it possible to execute jobs from clients written in other languages, in this phase the IgniteCompute interface introduces overloaded methods that accept a job class name instead of a class:

IgniteCompute interface and overloaded methods
    // Executes a job represented by the given class name on one node from the given set of nodes.
    <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, String className, Object... args);

    // Executes a job represented by the given class name on the node where the given key is located.
    <K, R> CompletableFuture<R> executeColocated(String table, K key, String className, Object... args); // Q: should we provide methods which get views as a parameter?

    // Executes a job represented by the given class name on all nodes from the given set of nodes.
    <R> Map<ClusterNode, CompletableFuture<R>> broadcast(Set<ClusterNode> nodes, String className, Object... args);

In phase 1, the types of job arguments and of the job result are limited to:

  • Java wrappers for primitive types: Boolean, Byte, Short, Integer, Long, Float, Double
  • java.lang.String
  • byte[]
  • java.util.UUID
  • Arrays of all the types listed above (excluding byte[])
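A check against this list could look like the sketch below. Phase1Types and isSupported are hypothetical names and not part of the proposed API. Two points are assumptions: null values are rejected because the list does not mention them, and "arrays of all listed types" is read as one-dimensional arrays whose component type is one of the listed types other than byte[], so primitive arrays such as int[] are not accepted.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

/** Hypothetical validator for the phase 1 argument and result types. */
final class Phase1Types {
    /** The non-array types listed above, plus byte[]. */
    private static final Set<Class<?>> LISTED = new HashSet<>(Arrays.asList(
        Boolean.class, Byte.class, Short.class, Integer.class, Long.class,
        Float.class, Double.class, String.class, byte[].class, UUID.class));

    /** Returns true if the value may be used as a job argument or result in phase 1. */
    static boolean isSupported(Object value) {
        if (value == null) {
            return false; // Assumption: null handling is not specified, so it is rejected here.
        }

        Class<?> cls = value.getClass();
        if (LISTED.contains(cls)) {
            return true;
        }

        // Arrays of the listed types are allowed, except arrays of byte[].
        return cls.isArray()
            && cls.getComponentType() != byte[].class
            && LISTED.contains(cls.getComponentType());
    }
}
```

Validating arguments on the initiating node lets an unsupported type fail fast, before anything is sent to a target node.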

Open questions

  • Serialization format for job arguments and job state in order to provide better interoperability and API clarity.
