...

IDIEP-67
Author
Sponsor
Created

  

Status: ACTIVE

Motivation

According to the changes proposed in IEP-61, we need a group membership/discovery service with less strict consistency guarantees than the current Discovery SPI provides. It is convenient to extract such features into a separate low-level module and provide an API for interaction with the network. Having different interfaces for network communication (Discovery SPI, Communication SPI) causes several problems, including undefined behavior when one port works perfectly while the other does not, duplicated interfaces for sending messages, and duplicated code for network interaction. To resolve these problems, the idea is to merge the discovery and communication modules into a single networking module with one port, one API, and one implementation.

Description

Terminology

  • endpoint - the host (IP) and port configured in the discovery configuration. This is an analog of a 'node', but 'node' is not an appropriate word at such a low level: an endpoint represents only a host/port pair rather than the whole node.
  • instanceId/setupId - a unique identifier of every endpoint, bound to the local process (it is regenerated on every restart). Similar to the current nodeId, but it belongs to the network endpoint rather than the whole node.
  • network member/member - a connection plus an instanceId plus message exchange. It can be used for sending messages and represents a cluster participant that is able to send and receive messages.

Responsibility

  • Discover and automatically gather all found network endpoints into the group membership.
  • Detect network events: a new endpoint appears, a connection fails, etc.
  • Provide cluster membership change events to subsystems that do not require strict consistency guarantees.
  • Retry establishing a connection if it is lost.
  • Send and receive messages (including custom messages) to/from any member of the group.
  • Serialize/deserialize messages.
  • Provide an API for sending messages to a specific member with or without a delivery guarantee.

Lifecycle

  • Start the process.
  • Bind to the configured local port.
  • Iterate over the configured IP/port pairs (an IP finder?) and connect to each endpoint if possible.
  • Start a handshake with every successfully connected endpoint to exchange the initial information (instanceId, recoveryMessageId).
  • Notify subscribers about establishing a connection to the cluster (generate a "network started" event).
  • Reconnect to an endpoint if the connection unexpectedly fails (without any events to subscribers).
  • Notify subscribers about the appearance of a new network member when one is found.
  • Notify subscribers about the disappearance of a network member if its connection unrecoverably fails.
  • Stop the process.
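The handshake step above can be sketched as follows. HandshakeMessage, LocalEndpoint, and the field names are illustrative stand-ins, not actual Ignite classes; the key point is that instanceId is regenerated on every restart (it is not persisted) and is exchanged together with recoveryMessageId right after the connection is established.

```java
import java.util.UUID;

/** Hypothetical handshake payload exchanged right after a TCP connection is established. */
final class HandshakeMessage {
    final UUID instanceId;        // regenerated on every process restart
    final long recoveryMessageId; // last message id seen from the peer, used for recovery

    HandshakeMessage(UUID instanceId, long recoveryMessageId) {
        this.instanceId = instanceId;
        this.recoveryMessageId = recoveryMessageId;
    }
}

/** Tracks the local endpoint identity; a new instanceId is generated per restart. */
final class LocalEndpoint {
    private UUID instanceId = UUID.randomUUID();

    UUID instanceId() {
        return instanceId;
    }

    /** Simulates a process restart: the identity is not persisted, so it changes. */
    void restart() {
        instanceId = UUID.randomUUID();
    }

    /** Builds the handshake message sent to a newly connected endpoint. */
    HandshakeMessage handshake(long recoveryMessageId) {
        return new HandshakeMessage(instanceId, recoveryMessageId);
    }
}
```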

API

Relaxing the consistency guarantees of the membership/discovery protocols allows us to lower the requirements placed on them. Current Discovery SPI implementations are not scalable enough or rely on external distributed systems (ZooKeeper), so relaxing consistency guarantees and improving scalability is a beneficial goal.

Other goals are:

  • Ports unification. Instead of two separate ports for the Discovery and Communication subsystems, a single port should be used for all network interactions. This simplifies the overall configuration and eliminates the undefined behavior that arises when only one of the ports (discovery or communication) is available and the Ignite node cannot work properly.
  • Code base unification. Instead of relying on custom protocols and network code, well-known protocols and libraries will be used: a SWIM- or Rapid-based discovery and membership module, and a communication part developed on top of the Netty library.
  • API simplification. Bringing all network-related APIs into a single module makes it easier for developers to navigate the network capabilities and find the necessary API methods.

Description

Configuration

The entry point for all configuration of the network module is the NetworkConfiguration class in the org.apache.ignite.configuration.schemas.network package. It is an auto-generated class, so the project must be built for it to appear (the configuration framework is described in greater detail in IEP-55).

Right now we have to provide configuration for two main sub-components:

  • Discovery and membership subsystem; specific configuration classes for them are ClusterMembershipConfiguration and NodeFinderConfiguration.
  • P2P communication subsystem; its configuration is represented by InboundConfiguration and OutboundConfiguration classes.
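For illustration, the configuration for these two sub-components might look like the JSON fragment below. The exact key names and structure are assumptions and must be checked against the generated NetworkConfiguration schema; this is only a sketch of the shape.

```json
{
  "network": {
    "port": 3344,
    "membership": {
      "failureDetectionTimeout": 1000
    },
    "nodeFinder": {
      "netClusterNodes": ["localhost:3344", "localhost:3345"]
    }
  }
}
```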

API

The main interface of the network module is ClusterService. It provides access to the two other aspects of the network: discovery and P2P communication.

Code Block
languagejava
titleClusterService interface
/**
 * Class that represents the network-related resources of a node and provides entry points for working with the
 * network members of a cluster.
 */
public interface ClusterService extends IgniteComponent {
    /**
     * Returns the {@link TopologyService} for working with the cluster topology.
     *
     * @return Topology Service.
     */
    TopologyService topologyService();

    /**
     * Returns the {@link MessagingService} for sending messages to the cluster members.
     *
     * @return Messaging Service.
     */
    MessagingService messagingService();

    /**
     * Returns the local configuration of this node.
     *
     * @return Configuration of the current node.
     */
    ClusterLocalConfiguration localConfiguration();
}

Implementation

  • Implement the group membership over Netty.
  • Integrate direct marshalling into Netty (via Netty handlers).
  • Implement sending idempotent messages with very weak guarantees:
    • no delivery guarantees are required;
    • multiple copies of the same message might be sent;
    • no acknowledgement of any kind is needed.
  • Implement sending messages with delivery guarantees:
    • a message must be sent exactly once, with an acknowledgement that it has actually been received (not necessarily processed);
    • messages must be received in the same order they were sent;
    • these messages might utilize the current recovery protocol with acknowledgements every 32 (or so) messages. This setting must be flexible enough that we do not get an OOM in big topologies.
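The recovery protocol mentioned above can be sketched as a send window: the sender buffers every message until it is acknowledged, and the receiver acknowledges every N-th message so the sender can free its buffer. The class name, the window size constant, and the string payloads below are illustrative, not the actual Ignite implementation.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/**
 * Sketch of an ack-window sender: unacknowledged messages stay buffered for resend,
 * and the receiver acknowledges every ACK_EVERY messages (32, per the text above).
 */
final class RecoverySendWindow {
    static final int ACK_EVERY = 32;

    private final Queue<String> unacked = new ArrayDeque<>();
    private long lastSentId;

    /** Buffers the message until it is acknowledged; returns its sequence id. */
    long send(String msg) {
        unacked.add(msg);
        return ++lastSentId;
    }

    /** The receiver acknowledged everything up to {@code ackedId}; drop buffered copies. */
    void onAck(long ackedId) {
        long firstBuffered = lastSentId - unacked.size() + 1;
        while (firstBuffered <= ackedId && !unacked.isEmpty()) {
            unacked.poll();
            firstBuffered++;
        }
    }

    /** Number of messages still held for potential resend. */
    int pending() {
        return unacked.size();
    }

    /** Receiver side: decide whether an acknowledgement is due for this message id. */
    static boolean shouldAck(long receivedId) {
        return receivedId % ACK_EVERY == 0;
    }
}
```

Capping the buffer (and applying backpressure when it fills) is what keeps this scheme from exhausting memory in large topologies.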

...


Discovery information is available via the methods of the TopologyService interface. It provides information about the local node and the current set of nodes present in the cluster; it is also possible to subscribe to topology change events, such as new nodes joining or existing nodes leaving.

The P2P communication subsystem can be accessed through the MessagingService interface. It offers various methods for sending messages to other nodes with different levels of delivery guarantees.
Handlers for different types of messages are also registered in MessagingService.
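The calling pattern can be sketched with minimal stand-in interfaces. The method names below (allMembers, send, addMessageHandler) and the loopback implementation are assumptions for illustration only, not the real Ignite signatures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Illustrative stand-in for the topology aspect: enumerate known members. */
interface TopologyService {
    List<String> allMembers();
}

/** Illustrative stand-in for the messaging aspect: send messages, register handlers. */
interface MessagingService {
    void send(String member, String message);

    void addMessageHandler(Consumer<String> handler);
}

/** Trivial in-memory implementation used only to demonstrate the calling pattern. */
final class LoopbackMessaging implements MessagingService {
    private final List<Consumer<String>> handlers = new ArrayList<>();

    /** Wire log, visible for the demonstration. */
    final List<String> log = new ArrayList<>();

    @Override public void send(String member, String message) {
        log.add(member + " <- " + message);
        for (Consumer<String> h : handlers) {
            h.accept(message); // loop the message straight back to local handlers
        }
    }

    @Override public void addMessageHandler(Consumer<String> handler) {
        handlers.add(handler);
    }
}
```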

User Object Serialization

Overview

This section describes the arbitrary object serialization mechanism that allows the network module to handle custom user objects. Unlike NetworkMessage descendants, user objects are not known to the system in advance, and the serialization layout must be resolved at runtime.

User objects serialization protocol aims to implement the following properties and operates under the following assumptions:

  • User objects may have different schemas on different Ignite nodes because of different versions of classes loaded on the nodes. Ignite guarantees a well-defined cross-version serialization (potentially resulting in a deserialization error if a breaking change to the object class is introduced). This feature allows users to change class versions and serialize arbitrary object graphs seamlessly without the need to explicitly define network messages.
  • User object layout must be fully determined by the object class (i.e. class name and class loader). The layout is encoded in a class descriptor that can be transferred over network via an alternative mechanism (e.g. the class descriptor can be a NetworkMessage or serialized via JDK marshaller).
  • User object serialization must follow Java Serialization API contracts: e.g. if an object implements Externalizable interface, corresponding methods must be used for serialization. Many of these methods, however, may cause a breaking change.
  • Assuming the class descriptor format can be implemented by a different platform, the protocol can be used for implementing custom user logic on a variety of platforms.

Class Descriptor Format

A class descriptor represents a sequence of field names and field types that is used during class instances serialization and deserialization. Since the class descriptor is fully determined by the class itself, the descriptor is an immutable structure and can be identified by a unique descriptor ID.

Upon creation, each descriptor is assigned a unique identifier by the local node. Descriptors for the same class name may have different identifiers on different nodes.

Additionally, the descriptor contains a set of flags that specify which JDK serialization methods are used (readExternal/writeExternal, readResolve/writeReplace, etc.), and a flag marking final classes.

Internally, Ignite will have a set of built-in descriptors that correspond to objects with a fixed serialization format, including but not limited to:

  • Primitives
  • Boxed primitives
  • java.lang.String
  • java.util.UUID
  • Standard collections

Predefined descriptors allow for a more efficient serialization without compromising cross-version compatibility. Note that primitives and boxed primitives descriptors are effectively final.

Plain and Serializable classes

Fields of the class are enumerated in a strict order from the parent class to the child class, and sorted lexicographically within each level of the class hierarchy. If an Externalizable class is encountered (no matter at which level of the hierarchy), it is serialized via a writeExternal() call.
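The field enumeration rule can be demonstrated with plain reflection; this is an illustration of the ordering only, not the actual Ignite descriptor builder.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;

/** Enumerates instance fields parent-first, lexicographically within each level. */
final class FieldOrder {
    static List<String> orderedFields(Class<?> cls) {
        // Collect the hierarchy from the topmost parent down to the class itself.
        Deque<Class<?>> hierarchy = new ArrayDeque<>();
        for (Class<?> c = cls; c != null && c != Object.class; c = c.getSuperclass()) {
            hierarchy.addFirst(c);
        }

        List<String> result = new ArrayList<>();
        for (Class<?> level : hierarchy) {
            List<String> names = new ArrayList<>();
            for (Field f : level.getDeclaredFields()) {
                // Static and compiler-generated fields are not part of the layout.
                if (!Modifier.isStatic(f.getModifiers()) && !f.isSynthetic()) {
                    names.add(f.getName());
                }
            }
            names.sort(Comparator.naturalOrder()); // lexicographic within the level
            result.addAll(names);
        }
        return result;
    }
}

class Parent { int zulu; int alpha; }

class Child extends Parent { int bravo; int yankee; }
```

For Child above the layout is alpha, zulu (parent level, sorted), then bravo, yankee (child level, sorted).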

During the class instance (de)serialization, the (de)serializer traverses the fields from the descriptor and writes:

  • Backreference if a cycle is encountered and the object at hand has already been written.
  • Field value without field value class descriptor if declared field's class is final. In this case, the actual field's value type is guaranteed to be equal to the declared field type.
  • Field's value class descriptor ID and field data if the declared field's class is non-final. This allows polymorphic fields.

If a field value is not a primitive its Object ID is also serialized to handle cycles of object references and support polymorphic fields.
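The backreference rule can be sketched as follows: every non-primitive value gets an object ID on first encounter, and any repeated reference is written as a backreference to that ID, which terminates cycles. The token strings and the Node class are illustrative; real serialization writes binary, not readable tokens.

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

/** Writes a tiny object graph as tokens, replacing repeats with backreferences. */
final class GraphWriter {
    // Identity map: two distinct-but-equal objects must get distinct ids.
    private final Map<Object, Integer> objectIds = new IdentityHashMap<>();
    private final List<String> out = new ArrayList<>();

    void write(Object obj) {
        Integer id = objectIds.get(obj);
        if (id != null) {
            out.add("backref:" + id); // cycle or repeated reference: stop recursing
            return;
        }
        int newId = objectIds.size();
        objectIds.put(obj, newId);
        out.add("obj:" + newId);
        if (obj instanceof Node) {
            Node n = (Node) obj;
            if (n.next != null) {
                write(n.next); // recurse into the single reference field
            }
        }
    }

    List<String> output() {
        return out;
    }
}

/** A tiny self-referencing structure used to demonstrate a cycle. */
final class Node {
    Node next;
}
```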

Externalizable classes

Externalizable classes delegate the (de)serialization logic to the readExternal/writeExternal methods. The size of the serialized output should be made available in the general layout so that the Externalizable data can be skipped entirely when the class on the deserializing side does not implement the Externalizable interface.

Cross-node interoperability

In order to support arbitrary class structure changes, the object serialization must be performed according to the local class descriptor, but the object deserialization must be performed according to the remote class descriptor from the node that actually serialized the object. Therefore, the descriptors must be shared between Ignite nodes to support the protocol.

Descriptor availability is tracked on a per-session p2p level. If there is a notion of a session between nodes, the sending side can track which descriptors were already sent to the remote side before actually sending the serialized object. If there are unsent descriptors, they are sent to the remote side prior to sending the serialized object. The receiving side must use the descriptor from the particular session to deserialize the object. In this case, even if two different nodes send the class with the same name and different structure, the receiving side will be able to properly deserialize the object.
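The per-session tracking described above can be sketched as a set of already-sent descriptor IDs: before an object goes on the wire, any descriptor the session has not seen yet is sent first. The class name, integer IDs, and "frame" strings are illustrative.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Per-session tracker: each descriptor is shipped at most once per session. */
final class SessionDescriptorTracker {
    private final Set<Integer> sentDescriptors = new HashSet<>();

    /**
     * Returns the frames to put on the wire for an object that depends on the given
     * descriptor ids: unsent descriptors first, then the object payload itself.
     */
    List<String> framesFor(String objectPayload, int... descriptorIds) {
        List<String> frames = new ArrayList<>();
        for (int id : descriptorIds) {
            if (sentDescriptors.add(id)) { // true only on the first encounter per session
                frames.add("descriptor:" + id);
            }
        }
        frames.add("object:" + objectPayload);
        return frames;
    }
}
```

Because the tracker is scoped to a session, two nodes sending same-named classes with different structures cannot confuse the receiver: each session carries its own descriptors.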

Class-structure changes handling

When reading a serialized instance using a remote class descriptor, the read values will not necessarily be present in the local class descriptor (this can happen, for example, if the local node removed/renamed a field from the class, or the remote node added/renamed a field). Since the layout of the read object precisely matches the remote class descriptor, the unexpected value can always be read and either skipped or passed to an optional handler.

If a field was added in the local class descriptor but was not present in the remote class descriptor, it will be skipped during the deserialization. Such skipped fields can be additionally handled in the optional handler.
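Comparing the two descriptors' field lists yields exactly the two cases above: fields the remote side sent that are unknown locally (read, then skipped or handed to the optional handler), and fields known locally that the remote side never sent (left at defaults). The class and method names below are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of a remote-vs-local descriptor field comparison. */
final class DescriptorDiff {
    /** Fields present in the remote descriptor but absent locally: read and then skipped. */
    static List<String> unexpectedFields(List<String> remote, List<String> local) {
        List<String> res = new ArrayList<>(remote);
        res.removeAll(local);
        return res;
    }

    /** Fields present locally but never sent by the remote side: skipped, defaults apply. */
    static List<String> missingFields(List<String> remote, List<String> local) {
        List<String> res = new ArrayList<>(local);
        res.removeAll(remote);
        return res;
    }
}
```

A rename shows up as one entry in each list, since neither side can tell a rename apart from an unrelated remove-plus-add.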

...


Dependencies

Netty is an asynchronous event-driven network application framework for rapid development of maintainable, high-performance protocol servers and clients. It will be used for interaction with the network (read/write, serialize/deserialize).

...

Tickets

Jira filter (ASF JIRA): project = IGNITE AND labels IN (iep-66)