...

TaskManagers, the worker nodes. A TaskManager (TM) executes many tasks concurrently in threads. Each TM also contains one CommunicationManager (CM - shared between tasks) and one MemoryManager (MM - also shared between tasks). TMs can exchange data with each other via standing TCP connections, which are created when needed.

Note that in Flink, it is TaskManagers, not tasks, that exchange data over the network, i.e., data exchange between tasks that live in the same TM is multiplexed over one network connection.
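
To make the multiplexing idea concrete, here is a minimal, hypothetical sketch (not Flink's actual network stack): every buffer travelling over the shared connection carries a channel ID attached by the sender, and the receiving side demultiplexes buffers into per-channel queues, one per logical task-to-task channel.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Illustrative only: demultiplexes buffers arriving on one shared
// TCP connection into per-channel queues, keyed by the channel ID
// the sender attaches to every buffer.
public class ChannelDemultiplexer {
    private final Map<Integer, Queue<byte[]>> channels = new HashMap<>();

    // Called for every buffer read off the shared connection.
    public void onBuffer(int channelId, byte[] buffer) {
        channels.computeIfAbsent(channelId, id -> new ArrayDeque<>())
                .add(buffer);
    }

    // Each receiving task polls only its own logical channel.
    public byte[] poll(int channelId) {
        Queue<byte[]> queue = channels.get(channelId);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        ChannelDemultiplexer demux = new ChannelDemultiplexer();
        demux.onBuffer(0, "for task A".getBytes());
        demux.onBuffer(1, "for task B".getBytes());
        System.out.println(new String(demux.poll(1))); // prints: for task B
    }
}
```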

ExecutionGraph: The execution graph is a data structure that contains the “ground truth” about the job computation. It consists of vertices (ExecutionVertex) that represent computation tasks, and intermediate results (IntermediateResultPartition) that represent data produced by tasks. Vertices are linked to the intermediate results they consume via ExecutionEdges (EE):

[Figure: ExecutionVertices linked to the IntermediateResultPartitions they consume via ExecutionEdges]

These are logical data structures that live in the JobManager. They have runtime equivalents, which live at the TaskManagers and are responsible for the actual data processing. The runtime equivalent of the IntermediateResultPartition is called ResultPartition.
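
As an illustration of the shape of these structures, here is a simplified, hypothetical sketch; the names mirror the classes above, but the fields are invented for the example, and the real JobManager classes carry far more state.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, illustrative model of the graph structures described above.
class ExecutionVertex {
    final String taskName;
    final List<ExecutionEdge> inputs = new ArrayList<>();  // results this task consumes
    final List<IntermediateResultPartition> produced = new ArrayList<>();
    ExecutionVertex(String taskName) { this.taskName = taskName; }
}

class IntermediateResultPartition {
    final ExecutionVertex producer;  // the task that produces this data
    IntermediateResultPartition(ExecutionVertex producer) { this.producer = producer; }
}

// An ExecutionEdge links a consuming vertex to one partition it reads.
class ExecutionEdge {
    final IntermediateResultPartition source;
    final ExecutionVertex target;
    ExecutionEdge(IntermediateResultPartition source, ExecutionVertex target) {
        this.source = source;
        this.target = target;
        target.inputs.add(this);
    }
}

public class GraphDemo {
    public static void main(String[] args) {
        ExecutionVertex map = new ExecutionVertex("map");
        IntermediateResultPartition irp = new IntermediateResultPartition(map);
        map.produced.add(irp);
        ExecutionVertex reduce = new ExecutionVertex("reduce");
        new ExecutionEdge(irp, reduce);
        System.out.println(reduce.taskName + " reads from "
                + reduce.inputs.get(0).source.producer.taskName); // reduce reads from map
    }
}
```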

...

Serializers and deserializers reliably convert typed records into raw byte buffers and vice versa, handling records that span multiple buffers, etc.
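
To illustrate what "records that span multiple buffers" means, here is a deliberately tiny, hypothetical sketch (Flink's real serializers work on its own memory abstraction, not plain byte buffers): each record is written with a length prefix into fixed-size buffers, and a record simply continues in the next buffer when the current one fills up.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: writes length-prefixed records into fixed-size
// buffers, letting a record span the boundary between two buffers.
public class SpanningSerializer {
    static final int BUFFER_SIZE = 8; // tiny on purpose, to force spanning

    static List<byte[]> serialize(List<byte[]> records) {
        List<byte[]> buffers = new ArrayList<>();
        ByteBuffer current = ByteBuffer.allocate(BUFFER_SIZE);
        for (byte[] record : records) {
            // Frame the record: 4-byte length prefix followed by the payload.
            ByteBuffer framed = ByteBuffer.allocate(4 + record.length);
            framed.putInt(record.length).put(record);
            framed.flip();
            while (framed.hasRemaining()) {
                if (!current.hasRemaining()) {    // buffer full: hand it off
                    buffers.add(current.array());
                    current = ByteBuffer.allocate(BUFFER_SIZE);
                }
                current.put(framed.get());        // byte-wise copy for clarity
            }
        }
        if (current.position() > 0) {             // flush the partial last buffer
            buffers.add(java.util.Arrays.copyOf(current.array(), current.position()));
        }
        return buffers;
    }

    public static void main(String[] args) {
        // 11 payload bytes + 4 length bytes do not fit in one 8-byte buffer.
        List<byte[]> buffers = serialize(List.of("hello world".getBytes()));
        System.out.println(buffers.size() + " buffers"); // prints: 2 buffers
    }
}
```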

Control flow for data exchange

[Figure: control flow for a data exchange between map tasks (M1, M2) and reduce tasks (R1, R2) across two TaskManagers, coordinated by the JobManager]

The picture represents a simple map-reduce job with two parallel tasks. We have two TaskManagers, with two tasks each (one map task and one reduce task), running in two different nodes, and one JobManager running in a third node. We focus on the initiation of the transfer between tasks M1 and R2. Data transfers are represented using thick arrows, and messages are represented using thin arrows. First, M1 produces a ResultPartition (RP1) (arrow 1). When the RP becomes available for consumption (we discuss when this happens below), it informs the JobManager (arrow 2). The JobManager notifies the intended receivers of this partition (tasks R1 and R2) that the partition is ready. If the receivers have not been scheduled yet, this actually triggers the deployment of the tasks (arrows 3a, 3b). Then, the receivers request data from the RP (arrows 4a and 4b). This initiates the data transfer between the tasks (arrows 5a and 5b), either locally (case 5a) or through the network stack of the TaskManagers (case 5b). The process leaves as a degree of freedom the point at which an RP decides to inform the JobManager of its availability. For example, if RP1 is fully produced (and perhaps written to a file) before informing the JM, the data exchange corresponds roughly to a batch exchange as implemented in Hadoop. If RP1 informs the JM as soon as its first record is produced, we have a streaming data exchange.
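
This degree of freedom can be thought of as a notification policy on the result partition. The following hypothetical sketch (not Flink's actual classes) contrasts the two extremes: notify the JobManager on the first record (streaming) or only once the partition is complete (batch).

```java
// Illustrative sketch of the "when to tell the JobManager" policy
// discussed above; all names are invented for the example.
public class ResultPartitionSketch {
    enum NotifyPolicy { ON_FIRST_RECORD /* streaming */, ON_FINISH /* batch */ }

    interface JobManagerGateway { void partitionAvailable(String partitionId); }

    private final String id;
    private final NotifyPolicy policy;
    private final JobManagerGateway jobManager;
    private boolean notified = false;

    ResultPartitionSketch(String id, NotifyPolicy policy, JobManagerGateway jm) {
        this.id = id; this.policy = policy; this.jobManager = jm;
    }

    void addRecord(byte[] record) {
        // ... append the record to the partition's buffers ...
        if (policy == NotifyPolicy.ON_FIRST_RECORD && !notified) {
            jobManager.partitionAvailable(id);   // streaming exchange: notify immediately
            notified = true;
        }
    }

    void finish() {
        if (policy == NotifyPolicy.ON_FINISH && !notified) {
            jobManager.partitionAvailable(id);   // batch exchange: notify when complete
            notified = true;
        }
    }

    public static void main(String[] args) {
        JobManagerGateway jm = pid -> System.out.println(pid + " is consumable");
        ResultPartitionSketch rp =
                new ResultPartitionSketch("RP1", NotifyPolicy.ON_FIRST_RECORD, jm);
        rp.addRecord(new byte[0]); // prints immediately: streaming behavior
        rp.finish();               // nothing more to announce
    }
}
```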

Transfer of a byte buffer between two tasks

[Figure: transfer of a byte buffer from a map task in TaskManager 1 to a reduce task in TaskManager 2]

This picture presents in more detail the lifetime of data records as they are shipped from a producer to a consumer. Initially, the MapDriver produces records (collected by a Collector) that are passed to a RecordWriter object. RecordWriters contain a number of serializers (RecordSerializer objects), one per consumer task that may consume these records. For example, in a shuffle or broadcast, there will be as many serializers as there are consumer tasks. A ChannelSelector selects one or more serializers into which to place the record. For example, if records are broadcast, they will be placed in every serializer. If records are hash-partitioned, the ChannelSelector evaluates the hash value of the record and selects the appropriate serializer.
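
The two selection strategies mentioned above could be expressed with an interface like the following simplified, hypothetical sketch (not Flink's real ChannelSelector interface): broadcasting returns every channel index, while hash partitioning returns a single index derived from the record's hash value.

```java
import java.util.stream.IntStream;

// Illustrative channel-selection strategies mirroring the behavior
// described above; the interface is invented for the example.
interface ChannelSelectorSketch<T> {
    int[] selectChannels(T record, int numChannels);
}

// Broadcast: the record goes to every serializer/channel.
class BroadcastSelector<T> implements ChannelSelectorSketch<T> {
    public int[] selectChannels(T record, int numChannels) {
        return IntStream.range(0, numChannels).toArray();
    }
}

// Hash partitioning: one channel, chosen by the record's hash value.
class HashPartitionSelector<T> implements ChannelSelectorSketch<T> {
    public int[] selectChannels(T record, int numChannels) {
        return new int[] { Math.floorMod(record.hashCode(), numChannels) };
    }
}

public class SelectorDemo {
    public static void main(String[] args) {
        int[] all = new BroadcastSelector<String>().selectChannels("x", 4);
        System.out.println(all.length);  // 4: every channel
        int one = new HashPartitionSelector<String>().selectChannels("some-key", 4)[0];
        System.out.println(one);         // deterministic channel in [0, 4)
    }
}
```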

The serializers serialize the records into their binary representation and place them in fixed-size buffers (records can span multiple buffers). These buffers are handed over to a BufferWriter and written out to a ResultPartition (RP). The RP consists of several subpartitions (ResultSubpartitions - RSs) that collect buffers for specific consumers. In the picture, the buffer is destined for the second reducer (in TaskManager 2), and it is placed in RS2. Since this is the first buffer, RS2 becomes available for consumption (note that this behavior implements a streaming shuffle), and it notifies the JobManager of the fact.
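
A minimal, hypothetical sketch of this last step (simplified names, not Flink's actual classes): the ResultPartition holds one queue of buffers per consumer, and a finished buffer is appended to the subpartition of its target consumer.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative ResultPartition holding one subpartition per consumer.
public class ResultPartitionDemo {
    private final Queue<byte[]>[] subpartitions;

    @SuppressWarnings("unchecked")
    ResultPartitionDemo(int numConsumers) {
        subpartitions = new Queue[numConsumers];
        for (int i = 0; i < numConsumers; i++) {
            subpartitions[i] = new ArrayDeque<>();
        }
    }

    // A finished buffer is routed to the subpartition of its target consumer.
    void addBuffer(byte[] buffer, int targetSubpartition) {
        subpartitions[targetSubpartition].add(buffer);
    }

    public static void main(String[] args) {
        ResultPartitionDemo rp = new ResultPartitionDemo(2);
        rp.addBuffer(new byte[32], 1); // destined for the second reducer: RS2
        System.out.println("RS2 holds " + rp.subpartitions[1].size() + " buffer");
    }
}
```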

...