...

ResultPartition (RP) represents a chunk of data that a BufferWriter writes to, i.e., a chunk of data produced by a single task. An RP is a collection of ResultSubpartitions (RSs). This distinguishes data destined for different receivers, e.g., in the case of a partitioning shuffle for a reduce or a join.
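As a rough mental model (class and method names below are illustrative, not Flink's actual API), a result partition can be pictured as a fixed set of subpartitions, one per receiving task, that serialized buffers are appended to:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: one result partition holds one subpartition per consuming task. */
class ResultPartitionSketch {
    private final List<List<byte[]>> subpartitions = new ArrayList<>();

    ResultPartitionSketch(int numberOfConsumers) {
        for (int i = 0; i < numberOfConsumers; i++) {
            subpartitions.add(new ArrayList<>());   // one subpartition per receiving task
        }
    }

    /** The producing task appends a serialized buffer destined for exactly one receiver. */
    void add(int targetSubpartition, byte[] buffer) {
        subpartitions.get(targetSubpartition).add(buffer);
    }
}
```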


ResultSubpartition (RS) represents one partition of the data that is created by an operator, together with the logic for forwarding this data to the receiving operator. The specific implementation of an RS determines the actual data transfer logic, and this is the pluggable mechanism that allows the system to support a variety of data transfers. For example, the PipelinedSubpartition is a pipelined implementation supporting streaming data exchange, while the SpillableSubpartition is a blocking implementation supporting batch data exchange.
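The following is a minimal sketch of that pluggability (interface and class names are invented for illustration; they are not Flink's real classes): the same writer-facing contract, with a pipelined variant whose buffers are readable as soon as they are added, and a blocking variant that releases buffers only after the producer has finished.

```java
import java.util.ArrayDeque;
import java.util.Queue;

interface SubpartitionSketch {
    void add(byte[] buffer);   // called by the producing task
    byte[] poll();             // called by the consuming side; null if nothing readable yet
    void finish();             // producer signals it is done
}

/** Pipelined: every buffer is readable as soon as it is added (streaming exchange). */
class PipelinedSubpartitionSketch implements SubpartitionSketch {
    private final Queue<byte[]> buffers = new ArrayDeque<>();
    public void add(byte[] buffer) { buffers.add(buffer); }
    public byte[] poll() { return buffers.poll(); }
    public void finish() { /* nothing extra to do */ }
}

/** Blocking: buffers become readable only after finish() (batch exchange). */
class BlockingSubpartitionSketch implements SubpartitionSketch {
    private final Queue<byte[]> buffers = new ArrayDeque<>();
    private boolean finished = false;
    public void add(byte[] buffer) { buffers.add(buffer); }
    public byte[] poll() { return finished ? buffers.poll() : null; }
    public void finish() { finished = true; }
}
```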

...

Serializers and deserializers reliably convert typed records into raw byte buffers and vice versa, handling records that span multiple buffers, etc.
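To make the "records can span multiple buffers" point concrete, here is a small self-contained sketch (not Flink code) that length-prefixes each record and packs the resulting bytes into fixed-size buffers, so that a record can continue into the next buffer:

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class SpanningSerializerSketch {
    private static final int BUFFER_SIZE = 8;   // tiny on purpose, to force records to span buffers

    /** Serializes each record as [int length][payload] and packs the bytes into fixed-size buffers. */
    static List<byte[]> serialize(List<String> records) {
        ByteArrayOutputStream all = new ByteArrayOutputStream();
        for (String record : records) {
            byte[] payload = record.getBytes(StandardCharsets.UTF_8);
            all.writeBytes(ByteBuffer.allocate(4).putInt(payload.length).array());
            all.writeBytes(payload);
        }
        byte[] bytes = all.toByteArray();

        List<byte[]> buffers = new ArrayList<>();
        for (int offset = 0; offset < bytes.length; offset += BUFFER_SIZE) {
            int end = Math.min(offset + BUFFER_SIZE, bytes.length);
            byte[] buffer = new byte[end - offset];
            System.arraycopy(bytes, offset, buffer, 0, buffer.length);
            buffers.add(buffer);   // a single record may end up spread over several buffers
        }
        return buffers;
    }
}
```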

...

Control flow for data exchange

This picture represents a simple map-reduce job with a parallelism of two. We have two TaskManagers, each running two tasks (one map task and one reduce task) on two different nodes, and one JobManager running on a third node. We focus on the initiation of the transfer between tasks M1 and R2. Data transfers are shown as fat arrows, and messages as thin arrows. First, M1 produces a ResultPartition (RP1) (arrow 1). When the RP becomes available for consumption (we discuss when that is later), it informs the JobManager (arrow 2). The JobManager notifies the intended receivers of this partition (tasks R1 and R2) that the partition is ready. If the receivers have not been scheduled yet, this actually triggers the deployment of the tasks (arrows 3a, 3b). Then, the receivers request data from the RP (arrows 4a and 4b). This initiates the data transfer between the tasks (arrows 5a and 5b), either locally (case 5a) or through the network stack of the TaskManagers (5b). This process leaves as a degree of freedom the point at which an RP decides to inform the JobManager of its availability. For example, if RP1 is fully produced (and perhaps written to a file) before informing the JM, the data exchange corresponds roughly to a batch exchange as implemented in Hadoop. If the RP informs the JM as soon as its first record is produced, we have a streaming data exchange.
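That last degree of freedom can be made explicit with a small sketch of the two extremes (the notifier interface and method names are hypothetical, only meant to show the timing difference, not an actual Flink interface):

```java
class PartitionAvailabilitySketch {

    /** Hypothetical callback standing in for "tell the JobManager the partition is consumable". */
    interface JobManagerNotifier {
        void partitionAvailable(String partitionId);
    }

    static void produce(String partitionId,
                        int buffersToWrite,
                        boolean pipelined,
                        JobManagerNotifier jobManager) {
        for (int i = 0; i < buffersToWrite; i++) {
            // ... write buffer i to the result partition ...
            if (pipelined && i == 0) {
                jobManager.partitionAvailable(partitionId);   // streaming-style exchange: notify on first buffer
            }
        }
        if (!pipelined) {
            jobManager.partitionAvailable(partitionId);       // batch-style exchange: notify only when fully produced
        }
    }
}
```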

Transfer of a byte buffer between two tasks


This picture presents in more detail the lifetime of data records as they are shipped from a producer to a consumer. Initially, the MapDriver produces records (collected by a Collector) that are passed to a RecordWriter object. RecordWriters contain a number of serializers (RecordSerializer objects), one per consumer task that may consume these records. For example, in a shuffle or broadcast, there will be as many serializers as there are consumer tasks. A ChannelSelector selects one or more serializers into which to place the record. For example, if records are broadcast, they will be placed in every serializer. If records are hash-partitioned, the ChannelSelector will compute the hash of the record and select the appropriate serializer.
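A sketch of that channel-selection step under the two policies might look as follows (interface and class names are invented for illustration; Flink's real ChannelSelector differs in its details):

```java
import java.util.stream.IntStream;

interface ChannelSelectorSketch<T> {
    /** Given a record and the number of downstream consumers, pick the target channel(s). */
    int[] selectChannels(T record, int numberOfChannels);
}

/** Broadcast: every consumer receives the record. */
class BroadcastSelectorSketch<T> implements ChannelSelectorSketch<T> {
    public int[] selectChannels(T record, int numberOfChannels) {
        return IntStream.range(0, numberOfChannels).toArray();
    }
}

/** Hash partitioning: exactly one consumer receives the record, chosen by its hash. */
class HashPartitionSelectorSketch<T> implements ChannelSelectorSketch<T> {
    public int[] selectChannels(T record, int numberOfChannels) {
        int channel = Math.floorMod(record.hashCode(), numberOfChannels);
        return new int[] { channel };
    }
}
```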

The serializers serialize the records into their binary representation and place them in fixed-size buffers (records can span multiple buffers). These buffers are handed over to a BufferWriter and written out to a ResultPartition (RP). The RP consists of several subpartitions (ResultSubpartitions, RSs) that collect buffers for specific consumers. In the picture, the buffer is destined for the second reducer (in TaskManager 2), so it is placed in RS2. Since this is the first buffer, RS2 becomes available for consumption (note that this behavior implements a streaming shuffle) and notifies the JobManager of that fact.

The JobManager looks up the consumers of RS2 and notifies TaskManager 2 that a chunk of data is available. The message to TM2 is propagated down to the InputChannel that is supposed to receive this buffer, which in turn notifies RS2 that a network transfer can be initiated. Then, RS2 hands over the buffer to the network stack of TM1, which in turn hands it over to Netty for shipping. Network connections are long-running and exist between TaskManagers, not individual tasks.

Once the buffer is received by TM2, it passes through a similar object hierarchy, starting at the InputChannel (the receiver-side equivalent of the ResultSubpartition), going to the InputGate (which contains several ICs), and finally ending up in a RecordDeserializer that produces typed records from buffers and hands them over to the receiving task, in this case a ReduceDriver.
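As a counterpart to the serialization sketch above, the following illustrative (non-Flink) snippet shows the receiver-side reassembly: incoming buffers are appended to a pending byte stream, complete length-prefixed records are emitted as typed values, and a record whose tail has not arrived yet is kept until the next buffer:

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class SpanningDeserializerSketch {
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    /** Adds one received buffer and returns every record that is now complete. */
    List<String> addBuffer(byte[] buffer) {
        pending.writeBytes(buffer);
        List<String> records = new ArrayList<>();

        ByteBuffer view = ByteBuffer.wrap(pending.toByteArray());
        while (view.remaining() >= 4) {
            view.mark();
            int length = view.getInt();
            if (view.remaining() < length) {   // record still spans a buffer that has not arrived
                view.reset();
                break;
            }
            byte[] payload = new byte[length];
            view.get(payload);
            records.add(new String(payload, StandardCharsets.UTF_8));
        }

        // keep only the not-yet-complete tail for the next buffer
        byte[] rest = new byte[view.remaining()];
        view.get(rest);
        pending.reset();
        pending.writeBytes(rest);
        return records;
    }
}
```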