...
RDD transformations in Python are mapped to transformations on PythonRDD objects in Java. On remote worker machines, PythonRDD objects launch Python subprocesses and communicate with them using pipes, sending the user's code and the data to be processed.
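The worker model can be sketched with a plain subprocess and pipes. This is not PySpark's actual wire protocol (which frames serialized code and data); it's a minimal illustration, with a hypothetical "double each line" worker, of a parent process streaming data to a Python subprocess over stdin/stdout:

```python
import subprocess
import sys

# Hypothetical one-line-per-record protocol: the "worker" reads integers
# on stdin and writes doubled values to stdout. PySpark's real protocol
# instead frames pickled code and data over the same kind of pipes.
worker_code = (
    "import sys\n"
    "for line in sys.stdin:\n"
    "    sys.stdout.write(str(int(line) * 2) + '\\n')\n"
)
proc = subprocess.Popen(
    [sys.executable, "-c", worker_code],
    stdin=subprocess.PIPE, stdout=subprocess.PIPE,
)
out, _ = proc.communicate(b"1\n2\n3\n")
print(out.decode().split())  # ['2', '4', '6']
```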
...
Shipping code across the cluster
User-defined functions (e.g. lambdas or functions passed to map, flatMap) are serialized using PiCloud's cloudpickle library and shipped to remote Python workers. Serializing arbitrary functions / closures is tricky, but this library handles most common cases (including referencing objects in enclosing scopes).
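The reason a special library is needed at all: the standard pickle module serializes functions by reference (module name plus attribute name), so lambdas and nested closures cannot be pickled with it. A quick sketch of the failure mode (cloudpickle, by contrast, serializes the function's code and closure by value):

```python
import pickle

# Standard pickle stores functions as a module + name reference,
# so a lambda (whose name is just '<lambda>') cannot be pickled.
add_one = lambda x: x + 1

try:
    pickle.dumps(add_one)
    pickled_ok = True
except Exception:  # PicklingError, or AttributeError for nested functions
    pickled_ok = False

print(pickled_ok)  # False; cloudpickle.dumps(add_one) would succeed
```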
In a few cases, PySpark's internal code needs to take care to avoid including unserializable objects in function closures. For example, consider this excerpt from rdd.py that wraps a user-defined function to batch its output:
```python
oldfunc = self.func
batchSize = self.ctx.batchSize
def batched_func(split, iterator):
    return batched(oldfunc(split, iterator), batchSize)
func = batched_func
```
In this example, batched_func refers to oldfunc instead of self.func in order to avoid including self in the function closure.
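The effect of this pattern can be demonstrated with a hypothetical minimal stand-in for the RDD class (FakeRDD and the batched helper below are illustrative, not PySpark's actual code): inspecting the closures shows that the careless version captures self, which would drag the whole RDD (and its SparkContext) into anything that serializes the function, while the rdd.py pattern captures only the two values it needs.

```python
import inspect

def batched(iterator, batch_size):
    # Hypothetical stand-in for PySpark's batching helper: yields
    # fixed-size lists of items drawn from the iterator.
    batch = []
    for item in iterator:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch

class FakeRDD:
    # Hypothetical minimal stand-in for the real RDD class.
    def __init__(self, func, batch_size):
        self.func = func
        self.batchSize = batch_size

    def wrap_naively(self):
        # Anti-pattern: the inner function closes over self.
        def batched_func(split, iterator):
            return batched(self.func(split, iterator), self.batchSize)
        return batched_func

    def wrap_carefully(self):
        # The rdd.py pattern: copy attributes into locals first.
        oldfunc = self.func
        batchSize = self.batchSize
        def batched_func(split, iterator):
            return batched(oldfunc(split, iterator), batchSize)
        return batched_func

rdd = FakeRDD(lambda split, it: (x * 2 for x in it), 2)
naive = inspect.getclosurevars(rdd.wrap_naively()).nonlocals
careful = inspect.getclosurevars(rdd.wrap_carefully()).nonlocals
print(sorted(naive))    # ['self']
print(sorted(careful))  # ['batchSize', 'oldfunc']
```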
To ship libraries, PySpark uses Spark's SparkContext.addFile()
method to distribute the libraries to the workers, where they're added to the workers' PYTHONPATHs.
Serializing data
Data is currently serialized using the Python cPickle serializer.
Why cPickle instead of x?
PySpark uses cPickle for serializing data because it's reasonably fast and supports nearly any Python data structure.
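The generality claim is easy to check: pickle round-trips nearly any mix of built-in container and scalar types without any schema. (On Python 2 this was the cPickle C module; on Python 3 the pickle module is backed by the same C accelerator.)

```python
import pickle  # Python 2 PySpark imported cPickle; Python 3's pickle is C-accelerated

# pickle round-trips nested, heterogeneous structures with no schema.
record = {"key": (1, 2.5), "values": [b"bytes", u"text", None, {3, 4}]}
restored = pickle.loads(pickle.dumps(record, protocol=2))
print(restored == record)  # True
```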
...
Interestingly, there are some cases where a set of pickles can be combined to be decoded faster, even though this requires manipulation of the pickle bytecode. Although this is a fun result, this bulk de-pickling technique isn't used in PySpark.
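The bulk technique works by rewriting pickle opcodes so many small pickles decode as one object; that surgery is protocol-specific, so as a simpler, opcode-free illustration of related behavior, complete pickles can be concatenated into a single stream and decoded sequentially, because each pickle ends with a STOP opcode:

```python
import io
import pickle

# Each call to pickle.dumps() produces a self-terminating pickle
# (ending in a STOP opcode), so complete pickles can be concatenated
# and read back one at a time from the same stream.
batches = [[1, 2], [3, 4], [5, 6]]
stream = b"".join(pickle.dumps(b, protocol=2) for b in batches)

buf = io.BytesIO(stream)
decoded = []
while buf.tell() < len(stream):
    decoded.append(pickle.load(buf))
print(decoded)  # [[1, 2], [3, 4], [5, 6]]
```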
Custom serializers
Custom serializers could improve performance for many workloads. This functionality hasn't been implemented yet, but pull requests are welcome!
...
This wouldn't efficiently support the union of RDDs storing different types of serialized data, although that's probably an uncommon case.
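One hedged sketch of what a pluggable serializer interface might look like (the class names and dumps/loads shape below are assumptions, not an implemented PySpark API): the marshal module is faster than pickle for primitive types but handles far fewer types, which is exactly the kind of workload-dependent trade-off custom serializers would expose.

```python
import marshal
import pickle

class PickleSerializer:
    # General-purpose but relatively heavyweight (the current default).
    def dumps(self, obj):
        return pickle.dumps(obj, protocol=2)
    def loads(self, data):
        return pickle.loads(data)

class MarshalSerializer:
    # Hypothetical alternative: marshal is faster but only supports
    # primitive types (ints, floats, strings, lists, dicts, ...).
    def dumps(self, obj):
        return marshal.dumps(obj)
    def loads(self, data):
        return marshal.loads(data)

data = [1, 2.0, "three", [4, 5]]
results = [s.loads(s.dumps(data)) for s in (PickleSerializer(), MarshalSerializer())]
print(results[0] == results[1] == data)  # True
```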
Execution and pipelining
PySpark pipelines transformations by composing their functions. When using PySpark, there's a one-to-one correspondence between PySpark stages and Spark scheduler stages. Each PySpark stage corresponds to a PipelinedRDD instance.
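The composition step can be sketched as follows (a simplified illustration of the idea, not PipelinedRDD's actual code): each transformation is a function from (split, iterator) to an iterator, and adding a transformation wraps the previous stage function, so an entire stage executes as one Python function in a single pass over the data.

```python
# Each transformation has the shape (split, iterator) -> iterator.
# Chaining a new transformation onto a pipelined stage just composes
# the two functions, so no intermediate collection is materialized.
def compose(prev_func, new_func):
    def pipelined(split, iterator):
        return new_func(split, prev_func(split, iterator))
    return pipelined

# Two transformations, e.g. a map followed by a filter:
map_step = lambda split, it: (x * 10 for x in it)
filter_step = lambda split, it: (x for x in it if x > 20)

stage_func = compose(map_step, filter_step)
print(list(stage_func(0, iter(range(5)))))  # [30, 40]
```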
TODO: describe PythonRDD
TODO: this section could benefit from an end-to-end example tracing the execution of an operation like reduceByKey()
Daemon for launching worker processes
...