Data exchange in Flink is built around the following design principles:

  1. The control flow for data exchange (i.e., the message passing in order to initiate the exchange) is receiver-initiated, much like the original MapReduce.

  2. The data flow for data exchange, i.e., the actual transfer of data over the wire, is abstracted by the notion of an IntermediateResult and is pluggable. This means that the system can support both streaming data transfer and batch data transfer with the same implementation (see the sketch below).
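
The following minimal Java sketch illustrates how the two principles fit together. All class and method names here (Producer, SubpartitionReader, requestSubpartition) are simplified stand-ins for illustration, not Flink's actual API: the receiver drives the exchange by explicitly requesting a subpartition (principle 1), and the reader object it gets back hides the concrete transfer logic behind a pluggable interface (principle 2).

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical, simplified illustration of the two principles; not Flink's real API.
public class ReceiverInitiatedExchange {

    /** Pluggable data-flow abstraction (principle 2): how data is handed over is hidden here. */
    interface SubpartitionReader {
        byte[] pollNext(); // returns null if nothing is available yet
    }

    /** Producer side: data accumulates locally until a receiver asks for it. */
    static class Producer {
        private final List<byte[]> producedBuffers = new CopyOnWriteArrayList<>();

        void add(byte[] buffer) {
            producedBuffers.add(buffer);
        }

        /** Receiver-initiated control flow (principle 1): nothing is sent until this request. */
        SubpartitionReader requestSubpartition() {
            return new SubpartitionReader() {
                private int next = 0;

                @Override
                public byte[] pollNext() {
                    return next < producedBuffers.size() ? producedBuffers.get(next++) : null;
                }
            };
        }
    }

    public static void main(String[] args) {
        Producer producer = new Producer();
        producer.add("record-1".getBytes());

        // The receiver drives the exchange by requesting the data it wants to consume.
        SubpartitionReader reader = producer.requestSubpartition();
        System.out.println(new String(reader.pollNext()));
    }
}
```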

Data exchange involves several objects, including:

JobManager, the master node, is responsible for scheduling tasks, recovery, and coordination, and holds the big picture of a job via the ExecutionGraph data structure.

TaskManagers, the worker nodes. A TaskManager (TM) executes many tasks concurrently in threads. Each TM also contains one CommunicationManager (CM - shared between tasks) and one MemoryManager (MM - also shared between tasks). TMs can exchange data with each other via standing TCP connections, which are created when needed.

Note that in Flink, it is TaskManagers, not tasks, that exchange data over the network, i.e., data exchanges between tasks that live on the same pair of TMs are multiplexed over the same network connection.
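
As a rough sketch of that multiplexing (hypothetical types only; Flink's actual network stack is more involved), one can picture a per-TaskManager connection manager that keeps a single connection per remote TM and tags every outgoing buffer with a channel id so the receiving side can demultiplex it:

```java
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of per-TaskManager connection sharing; not Flink's real network classes.
public class ConnectionMultiplexing {

    /** Stand-in for one standing TCP connection between two TaskManagers. */
    static class TcpConnection {
        private final InetSocketAddress remote;

        TcpConnection(InetSocketAddress remote) {
            this.remote = remote;
        }

        /** Every buffer carries the id of the logical channel it belongs to. */
        void send(int channelId, byte[] payload) {
            System.out.printf("to %s: channel %d, %d bytes%n", remote, channelId, payload.length);
        }
    }

    /** Shared per-TM component: one connection per remote TM, reused by all local tasks. */
    static class ConnectionManager {
        private final Map<InetSocketAddress, TcpConnection> connections = new HashMap<>();

        void sendToChannel(InetSocketAddress remoteTaskManager, int channelId, byte[] payload) {
            TcpConnection connection =
                    connections.computeIfAbsent(remoteTaskManager, TcpConnection::new);
            connection.send(channelId, payload);
        }
    }

    public static void main(String[] args) {
        ConnectionManager cm = new ConnectionManager();
        InetSocketAddress tm2 = new InetSocketAddress("tm-2", 6121);

        // Two different sender/receiver pairs reuse the same connection to tm-2.
        cm.sendToChannel(tm2, 0, "from M1 to R1".getBytes());
        cm.sendToChannel(tm2, 1, "from M2 to R1".getBytes());
    }
}
```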


ExecutionGraph: The execution graph is a data structure that contains the “ground truth” about the job computation. It consists of vertices (ExecutionVertex) that represent computation tasks, and intermediate results (IntermediateResult) that represent data produced by tasks. IntermediateResults are composed of IntermediateResultPartitions, each of which represents the data to be consumed by a specific receiving task.
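
A heavily simplified sketch of these logical structures follows; the class names mirror the text, but the fields and wiring are illustrative rather than Flink's actual code:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative skeleton of the logical graph structures described above; not Flink's real code.
public class LogicalGraphSketch {

    /** A computation task in the ExecutionGraph. */
    static class ExecutionVertex {
        final String name;
        final List<IntermediateResult> producedResults = new ArrayList<>();
        ExecutionVertex(String name) { this.name = name; }
    }

    /** Data produced by a task, split into one partition per consuming task. */
    static class IntermediateResult {
        final ExecutionVertex producer;
        final List<IntermediateResultPartition> partitions = new ArrayList<>();
        IntermediateResult(ExecutionVertex producer) { this.producer = producer; }
    }

    /** The slice of an IntermediateResult destined for one specific receiving task. */
    static class IntermediateResultPartition {
        final IntermediateResult result;
        final ExecutionVertex consumer;
        IntermediateResultPartition(IntermediateResult result, ExecutionVertex consumer) {
            this.result = result;
            this.consumer = consumer;
        }
    }

    public static void main(String[] args) {
        ExecutionVertex map = new ExecutionVertex("M1");
        ExecutionVertex reduce1 = new ExecutionVertex("R1");
        ExecutionVertex reduce2 = new ExecutionVertex("R2");

        IntermediateResult result = new IntermediateResult(map);
        result.partitions.add(new IntermediateResultPartition(result, reduce1));
        result.partitions.add(new IntermediateResultPartition(result, reduce2));
        map.producedResults.add(result);

        System.out.println(map.name + " produces " + result.partitions.size() + " partitions");
    }
}
```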


These are logical data structures that live in the JobManager. They have runtime equivalents, living at the TaskManagers, that are responsible for the actual data processing. The runtime equivalent of the IntermediateResult is called ResultPartition, and the runtime equivalent of the IntermediateResultPartition is called ResultSubpartition.


ResultPartition (RP) represents a chunk of data that a BufferWriter writes to, i.e., a chunk of data produced by a single task. An RP is a collection of ResultSubpartitions (RSs). This is to distinguish between data destined for different receivers, e.g., in the case of a partitioning shuffle for a reduce or a join.
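
As an illustration (simplified, hypothetical types), an RP can be pictured as an array of subpartitions, with the producing task routing each record to the subpartition of its intended receiver, for example by hashing the key in a partitioning shuffle:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of a ResultPartition routing records to per-receiver subpartitions.
public class PartitionRoutingSketch {

    /** Stand-in for a ResultSubpartition: the data destined for one receiving task. */
    static class Subpartition {
        final List<String> records = new ArrayList<>();
    }

    /** Stand-in for a ResultPartition: all data produced by one task, split by receiver. */
    static class Partition {
        final Subpartition[] subpartitions;

        Partition(int numberOfReceivers) {
            subpartitions = new Subpartition[numberOfReceivers];
            for (int i = 0; i < numberOfReceivers; i++) {
                subpartitions[i] = new Subpartition();
            }
        }

        /** Route a keyed record to the receiver selected by a hash shuffle. */
        void emit(String key, String record) {
            int target = Math.floorMod(key.hashCode(), subpartitions.length);
            subpartitions[target].records.add(record);
        }
    }

    public static void main(String[] args) {
        Partition rp = new Partition(2); // two receiving reduce tasks, R1 and R2
        rp.emit("apple", "apple,1");
        rp.emit("banana", "banana,1");
        for (int i = 0; i < rp.subpartitions.length; i++) {
            System.out.println("subpartition " + i + ": " + rp.subpartitions[i].records);
        }
    }
}
```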


 

ResultSubpartition (RS) represents one partition of the data that is created by an operator, together with the logic for forwarding this data to the receiving operator. The specific implementation of an RS determines the actual data transfer logic, and this is the pluggable mechanism that allows the system to support a variety of data transfers. For example, the PipelinedSubpartition is a pipelined implementation to support streaming data exchange, while the SpillableSubpartition is a blocking implementation to support batch data exchange.
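
A rough sketch of that pluggability, with in-memory lists standing in for Flink's buffer and spilling machinery (the interface and classes below are deliberately simplified and not the real implementations): both variants expose the same writer-facing interface, but the pipelined one makes data readable as soon as it is added, while the blocking one makes it readable only once the producer has finished.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the pluggable subpartition idea; not Flink's real implementations.
public class SubpartitionVariantsSketch {

    interface Subpartition {
        void add(String buffer);        // called by the producing task
        void finish();                  // producer signals that no more data will come
        List<String> readableBuffers(); // what a consumer could read right now
    }

    /** Pipelined: every buffer is readable as soon as it is added (streaming exchange). */
    static class PipelinedSubpartition implements Subpartition {
        private final List<String> buffers = new ArrayList<>();
        public void add(String buffer) { buffers.add(buffer); }
        public void finish() { /* nothing special: data was already flowing */ }
        public List<String> readableBuffers() { return buffers; }
    }

    /** Blocking: buffers only become readable after the producer finishes (batch exchange). */
    static class BlockingSubpartition implements Subpartition {
        private final List<String> buffers = new ArrayList<>();
        private boolean finished = false;
        public void add(String buffer) { buffers.add(buffer); }
        public void finish() { finished = true; }
        public List<String> readableBuffers() { return finished ? buffers : List.of(); }
    }

    public static void main(String[] args) {
        Subpartition pipelined = new PipelinedSubpartition();
        Subpartition blocking = new BlockingSubpartition();

        pipelined.add("buffer-1");
        blocking.add("buffer-1");

        System.out.println("pipelined, before finish: " + pipelined.readableBuffers());
        System.out.println("blocking,  before finish: " + blocking.readableBuffers());

        blocking.finish();
        System.out.println("blocking,  after finish:  " + blocking.readableBuffers());
    }
}
```

In this framing, choosing between streaming and batch exchange is just a matter of which subpartition implementation is plugged in, which is the pluggability the text describes.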



InputGate: The logical equivalent of the RP at the receiving side. It is responsible for collecting buffers of data and handing them to the consuming task.


InputChannel: The logical equivalent of the RS at the receiving side. It is responsible for collecting buffers of data for a specific partition.
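
The receiving-side relationship can be sketched roughly as follows (again with hypothetical, simplified types): one gate per consumed result, holding one channel per producing subpartition, with the consuming task polling the gate for whichever channel currently has data.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

// Simplified sketch of the receiving side; not Flink's actual InputGate/InputChannel code.
public class ReceivingSideSketch {

    /** Stand-in for an InputChannel: buffers arriving from one specific subpartition. */
    static class Channel {
        final Queue<String> buffers = new ArrayDeque<>();
        void onBufferReceived(String buffer) { buffers.add(buffer); }
    }

    /** Stand-in for an InputGate: collects buffers from all channels of one consumed result. */
    static class Gate {
        final List<Channel> channels;
        Gate(List<Channel> channels) { this.channels = channels; }

        /** Hand the next available buffer to the consuming task, from whichever channel has one. */
        String pollNext() {
            for (Channel channel : channels) {
                String buffer = channel.buffers.poll();
                if (buffer != null) {
                    return buffer;
                }
            }
            return null; // nothing available right now
        }
    }

    public static void main(String[] args) {
        Channel fromM1 = new Channel();
        Channel fromM2 = new Channel();
        Gate gate = new Gate(List.of(fromM1, fromM2));

        fromM2.onBufferReceived("buffer from M2");
        System.out.println(gate.pollNext()); // prints the buffer from M2
        System.out.println(gate.pollNext()); // prints null: nothing else available yet
    }
}
```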


Buffer: See https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53741525


Serializers and deserializers reliably convert typed records into raw byte buffers and vice versa, handling records that span multiple buffers, etc.
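
To make “records that span multiple buffers” concrete, here is an illustrative sketch (not Flink's actual serializer classes) that chops a serialized record into fixed-size buffers and reassembles it on the receiving side:

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch of records spanning fixed-size buffers; not Flink's real serializer.
public class SpanningRecordSketch {

    static final int BUFFER_SIZE = 8; // deliberately tiny so records span several buffers

    /** Serialize one record into as many fixed-size buffers as it needs. */
    static List<byte[]> serialize(String record) {
        byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
        List<byte[]> buffers = new ArrayList<>();
        for (int offset = 0; offset < bytes.length; offset += BUFFER_SIZE) {
            int end = Math.min(offset + BUFFER_SIZE, bytes.length);
            buffers.add(Arrays.copyOfRange(bytes, offset, end));
        }
        return buffers;
    }

    /** Reassemble the record from its buffers on the receiving side. */
    static String deserialize(List<byte[]> buffers) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] buffer : buffers) {
            out.write(buffer);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        List<byte[]> buffers = serialize("a record that is longer than one buffer");
        System.out.println("spans " + buffers.size() + " buffers");
        System.out.println(deserialize(buffers));
    }
}
```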

 

Control flow for data exchange


This picture represents a simple map-reduce job with two parallel tasks. We have two TaskManagers, with two tasks each (one map task and one reduce task), running in two different nodes, and one JobManager running in a third node. We focus on the initiation of the transfer between tasks M1 and R2. Data transfers are shown as thick arrows, and messages as thin arrows. First, M1 produces a ResultPartition (RP1) (arrow 1). When the RP becomes available for consumption (we discuss when this happens later), it informs the JobManager (arrow 2). The JobManager notifies the intended receivers of this partition (tasks R1 and R2) that the partition is ready. If the receivers have not been scheduled yet, this actually triggers the deployment of the tasks (arrows 3a, 3b). Then, the receivers request data from the RP (arrows 4a and 4b). This initiates the data transfer between the tasks (arrows 5a and 5b), either locally (case 5a) or passing through the network stack of the TaskManagers (case 5b).

This process leaves as a degree of freedom when an RP decides to inform the JobManager of its availability. For example, if RP1 is fully produced (and perhaps written to a file) before informing the JM, the data exchange corresponds roughly to a batch exchange as implemented in Hadoop. If the RP informs the JM as soon as its first record is produced, we have a streaming data exchange.
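
That degree of freedom can be sketched as a single switch on the producer side (a hypothetical simplification, not Flink's actual code): notify the JobManager either as soon as the first data is written (streaming) or only once the partition has been fully produced (batch).

```java
// Hypothetical sketch of when a ResultPartition announces itself to the JobManager.
public class NotificationTimingSketch {

    enum ExchangeMode { STREAMING, BATCH }

    static class ProducerPartition {
        private final ExchangeMode mode;
        private boolean announced = false;

        ProducerPartition(ExchangeMode mode) { this.mode = mode; }

        void addBuffer(String buffer) {
            // Streaming: announce availability as soon as the first data exists,
            // so receivers can be deployed and start consuming immediately.
            if (mode == ExchangeMode.STREAMING && !announced) {
                notifyJobManager();
            }
            System.out.println("wrote " + buffer);
        }

        void finish() {
            // Batch: only announce once the partition has been fully produced.
            if (mode == ExchangeMode.BATCH && !announced) {
                notifyJobManager();
            }
        }

        private void notifyJobManager() {
            announced = true;
            System.out.println("-> JobManager: partition available (" + mode + ")");
        }
    }

    public static void main(String[] args) {
        ProducerPartition streaming = new ProducerPartition(ExchangeMode.STREAMING);
        streaming.addBuffer("b1"); // announces right away
        streaming.finish();

        ProducerPartition batch = new ProducerPartition(ExchangeMode.BATCH);
        batch.addBuffer("b1");     // stays quiet
        batch.finish();            // announces only now
    }
}
```

Everything else in the exchange (the receiver-initiated requests, the transfer over the subpartitions) stays the same in both cases; only the timing of the notification differs.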