...

RDD transformations in Python are mapped to transformations on PythonRDD objects in Java. On remote worker machines, PythonRDD objects launch Python subprocesses and communicate with them using pipes, sending the user's code and the data to be processed.

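The wire protocol between the JVM and these Python workers is beyond the scope of this page, but the basic subprocess-and-pipes pattern can be sketched in plain Python. The snippet below is only an illustration of that idea (a hypothetical run_in_worker helper using plain pickle), not PySpark's actual protocol:

    # Rough illustration of the worker-subprocess pattern described above.
    # NOT PySpark's real wire format: the parent pipes a pickled function and
    # its input data to a Python worker and reads the pickled results back.
    import pickle
    import subprocess
    import sys

    WORKER_CODE = (
        "import pickle, sys; "
        "func, data = pickle.load(sys.stdin.buffer); "
        "pickle.dump([func(x) for x in data], sys.stdout.buffer)"
    )

    def run_in_worker(func, data):
        proc = subprocess.Popen([sys.executable, "-c", WORKER_CODE],
                                stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        out, _ = proc.communicate(pickle.dumps((func, data)))
        return pickle.loads(out)

    # Plain pickle only handles simple functions such as builtins; shipping
    # arbitrary user functions is what cloudpickle is for (see below).
    print(run_in_worker(len, ["a", "bb", "ccc"]))   # [1, 2, 3]
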
...

Shipping code across the cluster

User-defined functions (e.g. lambdas or functions passed to map, flatMap) are serialized using PiCloud's cloudpickle library and shipped to remote Python workers. Serializing arbitrary functions / closures is tricky, but this library handles most common cases (including referencing objects in enclosing scopes).

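As a quick illustration, here is a minimal sketch using the standalone cloudpickle package (the same code that PySpark bundles): plain pickle rejects a closure, while cloudpickle serializes it along with the values captured from its enclosing scope.

    # Plain pickle cannot serialize a locally-defined closure, but cloudpickle
    # can, including the variables it captures from enclosing scopes.
    import pickle
    import cloudpickle   # standalone release of the library PySpark bundles

    def make_adder(n):
        return lambda x: x + n          # closure capturing n

    add_five = make_adder(5)

    try:
        pickle.dumps(add_five)
    except (pickle.PicklingError, AttributeError) as e:
        print("plain pickle failed:", e)

    payload = cloudpickle.dumps(add_five)   # bytes that can be shipped to a worker
    restored = pickle.loads(payload)        # ordinary pickle can load them
    print(restored(10))                     # 15
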
In a few cases, PySpark's internal code needs to take care to avoid capturing unserializable objects in function closures. For example, consider this excerpt from rdd.py, which wraps a user-defined function so that its output is batched:

            oldfunc = self.func
            batchSize = self.ctx.batchSize
            def batched_func(split, iterator):
                return batched(oldfunc(split, iterator), batchSize)
            func = batched_func

In this example, batched_func refers to oldfunc instead of self.func in order to avoid including self in the function closure.

To ship libraries, PySpark uses Spark's SparkContext.addFile() method to distribute the libraries to the workers, where they're added to the workers' PYTHONPATHs.

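From the user's side this surfaces as SparkContext.addPyFile(). The sketch below (the module my_helpers.py and the file input.txt are hypothetical) shows a dependency being shipped to the workers and imported inside a worker-side function:

    # Illustrative usage only: my_helpers.py is a hypothetical module that
    # defines a tokenize() function needed by the workers.
    from pyspark import SparkContext

    sc = SparkContext("local", "shipping-example")
    sc.addPyFile("my_helpers.py")        # distributed to every worker and
                                         # placed on its PYTHONPATH

    def split_words(line):
        from my_helpers import tokenize  # import resolves on the worker
        return tokenize(line)

    print(sc.textFile("input.txt").flatMap(split_words).count())
    sc.stop()
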
Serializing data

Data is currently serialized using the Python cPickle serializer.

Why cPickle instead of another serializer?

PySpark uses cPickle for serializing data because it's reasonably fast and supports nearly any Python data structure.

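For example, a single call round-trips an arbitrarily nested record (a trivial sketch; in practice PySpark writes pickles in batches, per the batchSize logic shown earlier):

    # cPickle (just `pickle` on Python 3) round-trips arbitrary nested Python
    # structures, which is what makes it a convenient default for RDD elements.
    try:
        import cPickle as pickle        # Python 2
    except ImportError:
        import pickle                   # Python 3

    record = {"user": u"alice", "visits": [(1, 0.5), (2, None)], "tags": {"a", "b"}}
    blob = pickle.dumps(record, protocol=2)
    assert pickle.loads(blob) == record
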
...

Interestingly, there are some cases where a set of pickles can be combined so that they decode faster, even though this requires manipulating the pickle bytecode. Although this is a fun result, this bulk de-pickling technique isn't used in PySpark.

Custom serializers

Custom serializers could improve performance for many workloads. This functionality hasn't been implemented yet, but pull requests are welcome!

...

This doesn't efficiently support unioning RDDs that store different types of serialized data, although that's probably an uncommon case.

Execution and pipelining

PySpark pipelines transformations by composing their functions. When using PySpark, there's a one-to-one correspondence between PySpark stages and Spark scheduler stages. Each PySpark stage corresponds to a PipelinedRDD instance.

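Conceptually, the composition looks something like the sketch below. This is a simplified illustration of the idea, not PipelinedRDD's actual code; the (split, iterator) signature mirrors the rdd.py excerpt above.

    # Simplified sketch of pipelining: instead of materializing an intermediate
    # result for each transformation, the per-partition functions are composed
    # so that a single Python worker invocation runs the whole chain.
    def pipeline(prev_func, new_func):
        def composed(split, iterator):
            return new_func(split, prev_func(split, iterator))
        return composed

    # Two per-partition functions in the (split, iterator) style used by rdd.py:
    map_func = lambda split, it: (x * 2 for x in it)
    filter_func = lambda split, it: (x for x in it if x > 4)

    composed = pipeline(map_func, filter_func)
    print(list(composed(0, iter([1, 2, 3, 4]))))   # [6, 8]
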
TODO: describe PythonRDD

TODO: this section could benefit from an end-to-end example tracing the execution of an operation like reduceByKey()

Daemon for launching worker processes

...