
NOTE: This Wiki is obsolete as of November 2016 and is retained for reference only.


Overview

PySpark is built on top of Spark's Java API. Data is processed in Python and cached / shuffled in the JVM:

...

RDD transformations in Python are mapped to transformations on PythonRDD objects in Java. On remote worker machines, PythonRDD objects launch Python subprocesses and communicate with them using pipes, sending the user's code and the data to be processed.
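One way to see this split from a PySpark shell is to peek at an RDD's JVM-side handle. This is a minimal sketch, assuming a shell where sc is already defined; class and attribute names may differ slightly between versions:

Code Block
python

rdd = sc.parallelize(range(10)).map(lambda x: x * 2)   # composed on the Python side
print(type(rdd).__name__)   # PipelinedRDD: the Python-side wrapper object
print(rdd._jrdd)            # py4j handle to the JVM-side RDD that caches/shuffles the data
print(rdd.collect())        # [0, 2, 4, ..., 18] -- computed by Python worker subprocesses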

 

Tips for contributing to PySpark

Unit Testing

PySpark's tests are a mixture of doctests and unittests.  The doctests serve as simple usage examples and are a lightweight way to test new RDD transformations and actions.  The unittests are used for more involved testing, such as testing job cancellation.
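For illustration, a doctest of this kind might look like the following. The helper function is hypothetical, and the embedded example relies on the global sc that a suite's __main__ sets up (see the notes on running individual suites below):

Code Block
python

def double_values(rdd):
    """Return an RDD with every element doubled.

    Doctests like this one double as usage examples:

    >>> double_values(sc.parallelize([1, 2, 3])).collect()
    [2, 4, 6]
    """
    return rdd.map(lambda x: x * 2)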

To run the entire PySpark test suite, run ./python/run-tests.  When adding a new file that contains doctests or unittests, make sure to update run-tests so that the new tests are automatically executed by Jenkins.

To run individual test suites:

  • For unittest suites run SPARK_TESTING=1 ./bin/pyspark python/pyspark/my_file.py.  Some of our doctest suites (such as the tests in rdd.py) have a custom __main__ method that sets up a global SparkContext that's shared among the tests.  These suites should also be run with this command.
  • For pure-doctest suites (without special __main__ setup code), run SPARK_TESTING=1 ./bin/pyspark python/pyspark/my_file.py.

Shipping code across the cluster

...

In a few cases, PySpark's internal code needs to take care to avoid including unserializable objects in function closures. For example, consider this excerpt from rdd.py that wraps a user-defined function to batch its output:

Code Block
python

            oldfunc = self.func
            batchSize = self.ctx.batchSize
            def batched_func(split, iterator):
                return batched(oldfunc(split, iterator), batchSize)
            func = batched_func
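
The same pitfall can be demonstrated outside of PySpark. The sketch below assumes the third-party cloudpickle library (PySpark ships a variant of it for serializing functions); the Wrapper and Unpicklable classes are made up for illustration:

Code Block
python

import cloudpickle

class Unpicklable(object):
    """Stands in for an object that must never be shipped to workers (e.g. a context)."""
    def __reduce__(self):
        raise TypeError("do not serialize this object")

class Wrapper(object):
    def __init__(self):
        self.ctx = Unpicklable()
        self.func = lambda split, it: it
        self.batchSize = 10

    def bad(self):
        # BAD: the closure references self, so serializing the function
        # drags the unserializable context along with it.
        def batched_func(split, iterator):
            return (self.func(split, iterator), self.batchSize)
        return batched_func

    def good(self):
        # GOOD: copy the needed attributes into locals first, so the
        # closure captures only serializable values.
        oldfunc, batchSize = self.func, self.batchSize
        def batched_func(split, iterator):
            return (oldfunc(split, iterator), batchSize)
        return batched_func

w = Wrapper()
cloudpickle.dumps(w.good())      # works
try:
    cloudpickle.dumps(w.bad())   # fails: self.ctx cannot be serialized
except Exception as e:
    print("failed as expected: %s" % e)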

...

Custom serializers

Custom serializers could improve performance for many workloads. Pluggable serialization of Python objects was added in spark/146, which should be included in a future Spark 0.9 release.

The first prototype of custom serializers allowed serializers to be chosen on a per-RDD basis. The current implementation only allows one serializer to be used for all data serialization; this serializer is configured when constructing SparkContext.
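For example, the serializer can be passed when constructing the context. This minimal sketch follows the constructor signature of released PySpark versions, which may differ slightly across releases:

Code Block
python

from pyspark.context import SparkContext
from pyspark.serializers import MarshalSerializer

# Use marshal instead of the default pickle-based serializer for all data in this context.
sc = SparkContext("local", "serializer-example", serializer=MarshalSerializer())
print(sc.parallelize(range(5)).map(lambda x: x * x).collect())   # [0, 1, 4, 9, 16]
sc.stop()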

Even with only one serializer, there are still some subtleties here due to how PySpark handles text files. PySpark implements SparkContext.textFile() by directly calling its Java equivalent. This produces a JavaRDD[String] instead of a JavaRDD[byte[]]. JavaRDD transfers these strings to Python workers using UTF-8 encoding (for 0.9.0 only, this used MUTF-8 encoding, but that was fixed for 0.9.1; see SPARK-1043).

Prior to the custom serializers pull request, JavaRDD would send strings to Python as pickled UTF-8 strings by prepending the appropriate pickle opcodes. From the worker's point of view, all of its incoming data was in the same pickled format. The pull request removed all Python-pickle-specific code from JavaRDD.
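That old framing trick is easy to illustrate in plain Python: wrap raw UTF-8 bytes in the right pickle opcodes and the result can be read back with an ordinary pickle.loads() call. The helper below is illustrative only; the real code did this on the JVM side:

Code Block
python

import pickle
import struct

def utf8_as_pickle(s):
    """Frame a string's UTF-8 bytes as a pickle by adding the appropriate opcodes."""
    data = s.encode("utf-8")
    return (b"\x80\x02"                                   # PROTO 2
            + b"X" + struct.pack("<I", len(data)) + data  # BINUNICODE + length + bytes
            + b".")                                       # STOP

assert pickle.loads(utf8_as_pickle("hello")) == u"hello"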

To handle these cases, PySpark allows a stage's input deserialization and output serialization functions to come from different serializers. For example, in sc.textFile(..).map(lambda x: ...).groupByKey() the first pipeline stage would use a MUTF8Deserializer and PickleSerializer, and subsequent stages would use PickleSerializers for their inputs and outputs. PySpark uses the lineage graph to perform the bookkeeping to select the appropriate deserializers.
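Conceptually, each pipeline stage pairs a deserializer for the stream it reads with a serializer for the stream it writes. A rough sketch of that pairing (illustrative names, not PySpark's actual classes):

Code Block
python

class Stage(object):
    """Illustrative only: a stage reads with one serializer and writes with another."""
    def __init__(self, input_deserializer, output_serializer, func):
        self.input_deserializer = input_deserializer  # e.g. a UTF-8 deserializer for textFile() input
        self.output_serializer = output_serializer    # e.g. a pickle serializer for later stages
        self.func = func

    def run(self, in_stream, out_stream):
        records = self.input_deserializer.load_stream(in_stream)
        self.output_serializer.dump_stream(self.func(records), out_stream)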

At the moment, union() requires that its inputs were serialized with the same serializer. When unioning an untransformed RDD created with sc.textFile() with a transformed RDD, cartesian() product, or RDD created with parallelize(), PySpark will force some of the RDDs to be re-serialized using the default serializer. We might be able to add code to avoid this re-serialization, but it would add extra complexity and these particular union() usages seem uncommon.

In the long run, it would be nice to refactor the Java-side serialization logic so that it can apply different interpretations to the bytes that it receives from Python (e.g. unpack them into UTF-8 strings or MsgPack objects). We could also try to remove the assumption that Python sends framed input back to Java, but this might require a separate socket for sending control messages and exceptions. In the very long term, we might be able to generalize PythonRDD's protocol to a point where we can use the same code to support backends written in other languages (this would effectively be like pipe(), but with a more complicated binary protocol).

Execution and pipelining

PySpark pipelines transformations by composing their functions. When using PySpark, there's a one-to-one correspondence between PySpark stages and Spark scheduler stages. Each PySpark stage corresponds to a PipelinedRDD instance.
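For example, in a PySpark shell (sc already defined), chaining narrow transformations does not build up a chain of Python wrapper objects; the composed functions live in a single PipelinedRDD, which becomes a single scheduler stage:

Code Block
python

rdd = sc.parallelize(range(8))
stage = rdd.map(lambda x: x + 1).filter(lambda x: x % 2 == 0).map(lambda x: x * 10)
print(type(stage).__name__)   # PipelinedRDD: one Python-side stage, one scheduler stage
print(stage.collect())        # [20, 40, 60, 80]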

TODO: describe PythonRDD

TODO: this section could benefit from an end-to-end example tracing the execution of an operation like reduceByKey()

Daemon for launching worker processes

Many JVMs use fork/exec when launching child processes through ProcessBuilder or Runtime.exec. These child processes initially have the same memory footprint as their parent. When the Spark worker JVM has a large heap and spawns many child processes, this can cause memory exhaustion, leading to "Cannot allocate memory" errors. In Spark, this affects both pipe() and PySpark.

Other developers have run into the same problem and discovered a variety of workarounds, including adding extra swap space or telling the kernel to overcommit memory. We can't use the java_posix_spawn library to solve this problem because it's too difficult to package due to its use of platform-specific native binaries.
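PySpark's workaround, which gives this section its name, is to have the JVM ask a small, long-lived Python daemon to fork the actual worker processes: forking from a tiny Python process stays cheap even when the JVM heap is huge. A rough, Unix-only sketch of the idea (illustrative only, not PySpark's actual daemon.py):

Code Block
python

import os
import socket

def run_daemon(handle_worker):
    # The daemon itself stays small, so each fork() only copies a tiny footprint.
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(("127.0.0.1", 0))
    listener.listen(16)
    print("daemon listening on port %d" % listener.getsockname()[1])
    while True:
        conn, _ = listener.accept()
        if os.fork() == 0:          # child: become a worker for this connection
            listener.close()
            handle_worker(conn)
            os._exit(0)
        conn.close()                # parent: go back to accepting connections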

...

Why not perform more of the processing in Java?

The first (unpublished) PySpark prototype was implemented in terms of PipedRDD. This prototype represented each Python RDD as a JavaRDD[String] of base64-encoded, pickled Python objects. The goal of this approach was to allow functions like join(), distinct(), union(), cogroup(), and groupByKey() to be implemented by directly calling the Java versions.

This approach required some complicated tricks in order to convert the results of Java operations back into pickled data. For example, a leftOuterJoin might produce a JavaRDD[(String, (String, Option[String]))]:

Code Block
python

>>> x = sc.parallelizePairs([("a", 1), ("b", 4)])
>>> y = sc.parallelizePairs([("a", 2)])
>>> print x.leftOuterJoin(y)._jrdd.collect().toString()
[(UydiJwou,(STQKLg==,None)), (UydhJwou,(STEKLg==,Some(STIKLg==)))]

...

This approach's correctness relied on serialized data equality being equivalent to Python object equality. However, there are some cases where logically identical objects can produce different pickles. This doesn't happen in most cases, but it can easily occur with dictionaries:

Code Block
python

>>> import pickle
>>> a = {1: 0, 9: 0}
>>> b = {9: 0, 1: 0}
>>> a == b
True
>>> pickle.dumps(a) == pickle.dumps(b)
False

...