Overview

PySpark is built on top of Spark's Java API. Data is processed in Python and cached / shuffled in the JVM:

In the Python driver program, SparkContext uses Py4J to launch a JVM and create a JavaSparkContext. Py4J is only used on the driver for local communication between the Python and Java SparkContext objects; large data transfers are performed through a different mechanism.

RDD transformations in Python are mapped to transformations on PythonRDD objects in Java. On remote worker machines, PythonRDD objects launch Python subprocesses and communicate with them using pipes, sending the user's code and the data to be processed.
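
The pipe-based exchange described above can be sketched with the standard library alone. This is an illustration under stated assumptions, not PySpark's actual wire protocol: the length-prefixed framing, the names WORKER_SOURCE, frame, and run_worker, and the hard-coded squaring function are all hypothetical, and a Python parent process stands in for the JVM's PythonRDD.

```python
import pickle
import struct
import subprocess
import sys

# Source of the child process (hypothetical). It reads length-prefixed pickled
# batches from stdin, squares every element, and writes the results back using
# the same framing.
WORKER_SOURCE = r'''
import pickle, struct, sys
stdin, stdout = sys.stdin.buffer, sys.stdout.buffer
while True:
    header = stdin.read(4)
    if len(header) < 4:          # parent closed the pipe
        break
    (length,) = struct.unpack(">I", header)
    batch = pickle.loads(stdin.read(length))
    payload = pickle.dumps([x * x for x in batch], 2)
    stdout.write(struct.pack(">I", len(payload)) + payload)
stdout.flush()
'''


def frame(obj):
    """Pickle obj and prefix it with a 4-byte big-endian length."""
    payload = pickle.dumps(obj, 2)
    return struct.pack(">I", len(payload)) + payload


def run_worker(batches):
    """Send batches to a Python subprocess over pipes and collect its replies."""
    proc = subprocess.Popen(
        [sys.executable, "-c", WORKER_SOURCE],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
    )
    # communicate() writes all input, closes stdin, and reads stdout to EOF,
    # which avoids deadlocks on full pipe buffers.
    out, _ = proc.communicate(b"".join(frame(b) for b in batches))
    results, offset = [], 0
    while offset < len(out):
        (length,) = struct.unpack_from(">I", out, offset)
        offset += 4
        results.append(pickle.loads(out[offset:offset + length]))
        offset += length
    return results
```

Calling run_worker([[1, 2, 3], [4]]) sends two batches through the pipe and returns one squared batch per input batch, in order. In the real system the user's serialized function would be sent over the pipe as well, rather than being hard-coded in the worker.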

Shipping code across the cluster

User-defined functions (e.g. lambdas or functions passed to map, flatMap) are serialized using PiCloud's cloudpickle library and shipped to remote Python workers. Serializing arbitrary functions / closures is tricky, but this library handles most common cases (including referencing objects in enclosing scopes).
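
To see why a dedicated library is needed, note that the standard pickle module serializes functions by reference (module and name), so it cannot serialize a lambda or a closure. The following sketch uses only the standard library; the helper names can_pickle and make_adder are hypothetical.

```python
import pickle


def can_pickle(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


def make_adder(n):
    # The returned lambda closes over n from the enclosing scope.
    return lambda x: x + n


add_three = make_adder(3)

picklable_builtin = can_pickle(len)        # pickled by reference to builtins.len
picklable_closure = can_pickle(add_three)  # no importable name to refer to
```

Here picklable_builtin is True and picklable_closure is False. A library such as cloudpickle handles the second case by serializing the function's code and captured variables by value.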

In a few cases, PySpark's internal code needs to take care to avoid including unserializable objects in function closures. For example, consider this excerpt from rdd.py that wraps a user-defined function to batch its output:

            oldfunc = self.func
            batchSize = self.ctx.batchSize
            def batched_func(split, iterator):
                return batched(oldfunc(split, iterator), batchSize)
            func = batched_func

In this example, batched_func refers to oldfunc instead of self.func in order to avoid including self in the function closure.
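
The difference is visible in the names each closure captures. The sketch below is not PySpark code: the Stage class and the captured_names helper are hypothetical, and it only inspects the closure's free variables rather than serializing anything.

```python
class Stage:
    """Hypothetical stand-in for an object holding a user function."""

    def __init__(self, func):
        self.func = func

    def closure_over_self(self):
        def wrapped(split, iterator):
            # Referring to self.func pulls the whole object into the closure.
            return self.func(split, iterator)
        return wrapped

    def closure_over_local(self):
        oldfunc = self.func  # copy the attribute into a local first
        def wrapped(split, iterator):
            # Only oldfunc is captured; self stays out of the closure.
            return oldfunc(split, iterator)
        return wrapped


def captured_names(func):
    """Return the names of the variables that func closes over."""
    return set(func.__code__.co_freevars)


stage = Stage(lambda split, iterator: iterator)
names_with_self = captured_names(stage.closure_over_self())
names_with_local = captured_names(stage.closure_over_local())
```

names_with_self contains only self, so serializing that closure would drag along every attribute of the object. names_with_local contains only oldfunc.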

To ship libraries, PySpark uses Spark's SparkContext.addFile() method to distribute the libraries to the workers, where they're added to the workers' PYTHONPATHs.

Serializing data

Data is currently serialized using Python's cPickle serializer, which is reasonably fast and supports nearly any Python data structure.

TODO: discuss why you didn't use JSON, BSON, ProtoBuf, MsgPack, etc.

Bulk pickling optimizations

As an optimization, we store and serialize objects in small batches. Batching amortizes fixed serialization costs and allows pickle to employ compression techniques (like run-length encoding or storing pointers to repeated objects).
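
A minimal sketch of the size effect, using the standard pickle module (cPickle's Python 3 successor). The batched helper and the batch size of 100 are assumptions for illustration and do not reproduce PySpark's own batching code.

```python
import pickle
from itertools import islice


def batched(iterator, batch_size):
    """Yield lists of up to batch_size consecutive items from iterator."""
    iterator = iter(iterator)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


records = [("key", i) for i in range(1000)]

# One pickle per record: every pickle repeats the protocol header, the STOP
# opcode, and the full "key" string.
one_by_one = sum(len(pickle.dumps(r, 2)) for r in records)

# One pickle per batch: the header is paid once per batch, and repeated
# references to the same string object become short memo lookups.
in_batches = sum(len(pickle.dumps(b, 2)) for b in batched(records, 100))
```

With this data in_batches is smaller than one_by_one, because the fixed per-pickle costs are shared and the repeated string is stored once per batch.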

Interestingly, there are some cases where a set of pickles can be combined to be decoded faster, even though this requires manipulation of the pickle bytecode. Although this is a fun result, this bulk de-pickling technique isn't used in PySpark.

Custom serializers

Custom serializers could improve performance for many workloads. This functionality hasn't been implemented yet, but pull requests are welcome!

The first prototype of custom serializers allowed serializers to be chosen on a per-RDD basis. A later prototype of custom serializers only allowed one serializer to be used for all RDDs, set when constructing SparkContext.

Even with only one serializer, there are still some subtleties here due to how PySpark handles text files. PySpark implements SparkContext.textFile() by directly calling its Java equivalent. This produces a JavaRDD[String] instead of a JavaRDD[byte[]]. When sending data to the worker processes, PySpark currently sends these strings as pickled UTF-8 strings by prepending the appropriate opcodes. From the worker's point of view, all of its incoming data is in the same pickled format, which reduces complexity. If we support custom serializers, we will need extra bookkeeping to handle RDDs of strings.
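
The opcode trick can be illustrated in Python with the opcode constants exported by the pickle module. This is a sketch of the idea only: the function name utf8_to_pickle is hypothetical, the choice of protocol 2 with the BINUNICODE opcode is an assumption, and in PySpark the wrapping happens on the JVM side.

```python
import pickle
import struct


def utf8_to_pickle(utf8_bytes):
    """Wrap raw UTF-8 bytes so that they form a valid pickle of a string."""
    return (
        pickle.PROTO + b"\x02"                    # protocol 2 header
        + pickle.BINUNICODE                       # "counted UTF-8 string" opcode
        + struct.pack("<I", len(utf8_bytes))      # 4-byte little-endian length
        + utf8_bytes                              # the payload, unchanged
        + pickle.STOP                             # end of pickle
    )


line = "héllo wörld"
wrapped = utf8_to_pickle(line.encode("utf-8"))
decoded = pickle.loads(wrapped)
```

The worker can call pickle.loads on wrapped exactly as it would on any other pickled record, and decoded equals the original string, even though no pickler ever ran on the sending side.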

For simple cases like sc.textFile(..).map(lambda x: ...).., we can allow a stage's input and output serialization/deserialization functions to come from different serializers, so the first stage of this pipeline would use an InputOutputSerializer(UTF8Deserializer, CustomSerializer). For more complicated cases involving joins, we can use the lineage graph to generate appropriate composed serializers for each stage.
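
One way such a composed serializer could look is sketched below. Since this feature is described as unimplemented, everything here is an assumption: the loads/dumps interface, the class bodies, and PickleSerializer, which stands in for a user-supplied CustomSerializer.

```python
import pickle


class UTF8Deserializer:
    """Reads records that arrive as raw UTF-8 bytes (hypothetical)."""

    def loads(self, data):
        return data.decode("utf-8")


class PickleSerializer:
    """Stand-in for a user-chosen CustomSerializer (hypothetical)."""

    def dumps(self, obj):
        return pickle.dumps(obj, 2)

    def loads(self, data):
        return pickle.loads(data)


class InputOutputSerializer:
    """Uses one serializer to read a stage's input and another to write its output."""

    def __init__(self, deserializer, serializer):
        self.deserializer = deserializer
        self.serializer = serializer

    def loads(self, data):
        return self.deserializer.loads(data)

    def dumps(self, obj):
        return self.serializer.dumps(obj)


# First stage of a textFile(..).map(..) pipeline: strings in, custom format out.
first_stage = InputOutputSerializer(UTF8Deserializer(), PickleSerializer())
record = first_stage.loads("a,1".encode("utf-8"))
output = first_stage.dumps(record.split(","))
```

A downstream stage would then read output with the custom serializer on both sides, so only the first stage needs the mixed pairing.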

This doesn't efficiently support the union of RDDs storing different types of serialized data, although that's probably an uncommon case.

Execution and pipelining

PySpark pipelines transformations by composing their functions. When using PySpark, there's a one-to-one correspondence between PySpark stages and Spark scheduler stages. Each PySpark stage corresponds to a PipelinedRDD instance.
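
Composition of per-partition functions can be sketched as follows. The (split, iterator) signature follows the rdd.py excerpt earlier on this page. The helper names pipeline, map_stage, and filter_stage are hypothetical and are not PipelinedRDD's actual code.

```python
def pipeline(prev_func, next_func):
    """Compose two per-partition functions into a single one."""
    def pipelined_func(split, iterator):
        # Feed the first function's output iterator straight into the second,
        # so no intermediate collection is materialized.
        return next_func(split, prev_func(split, iterator))
    return pipelined_func


def map_stage(f):
    def func(split, iterator):
        return (f(x) for x in iterator)
    return func


def filter_stage(predicate):
    def func(split, iterator):
        return (x for x in iterator if predicate(x))
    return func


# A map followed by a filter becomes one function that runs in a single pass.
stage = pipeline(map_stage(lambda x: x * 2), filter_stage(lambda x: x > 4))
result = list(stage(0, iter([1, 2, 3, 4])))
```

Because the composed function is a single callable, only one function has to be shipped to the worker for the whole chain of transformations. Here result is [6, 8].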

TODO: describe PythonRDD

TODO: this section could benefit from an end-to-end example tracing the execution of an operation like reduceByKey()

Daemon for launching worker processes

Many JVMs use fork/exec to launch child processes in ProcessBuilder or Runtime.exec. These child processes initially have the same memory footprint as their parent. When the Spark worker JVM has a large heap and spawns many child processes, this can cause memory exhaustion, leading to "Cannot allocate memory" errors. In Spark, this affects both pipe() and PySpark.

Other developers have run into the same problem and discovered a variety of workarounds, including adding extra swap space or telling the kernel to overcommit memory. We can't use the java_posix_spawn library to solve this problem because it's too difficult to package due to its use of platform-specific native binaries.

For PySpark, we resolved this problem by forking a daemon while the JVM heap is still small and using that daemon to launch and manage a pool of Python worker processes. Since the daemon uses very little memory, forking it won't exhaust memory.

Misc. design notes

Why not perform more of the processing in Java?

The first (unpublished) PySpark prototype was implemented in terms of PipedRDD. This prototype represented each Python RDD as a JavaRDD[String] of base64-encoded, pickled Python objects. The goal of this approach was to allow functions like join(), distinct(), union(), cogroup(), and groupByKey() to be implemented by directly calling the Java versions.

This approach required some complicated tricks in order to convert the results of Java operations back into pickled data. For example, a leftOuterJoin might produce a JavaRDD[(String, (String, Option[String]))]:

>>> x = sc.parallelizePairs([("a", 1), ("b", 4)])
>>> y = sc.parallelizePairs([("a", 2)])
>>> print x.leftOuterJoin(y)._jrdd.collect().toString()
[(UydiJwou,(STQKLg==,None)), (UydhJwou,(STEKLg==,Some(STIKLg==)))]

To handle these cases, JavaRDD contained routines to flatten nested data structures containing pickled objects.

This approach's correctness relied on serialized data equality being equivalent to Python object equality. However, there are some cases where logically identical objects can produce different pickles. This doesn't happen in most cases, but it can easily occur with dictionaries:

>>> import pickle
>>> a = {1: 0, 9: 0}
>>> b = {9: 0, 1: 0}
>>> a == b
True
>>> pickle.dumps(a) == pickle.dumps(b)
False

There are Python serializers that can guarantee the equality properties needed by this approach, but they were either slower than cPickle or supported fewer data types.
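
As an illustration of what such a guarantee involves, the sketch below pickles a dictionary's items in sorted key order, so that equal dictionaries yield identical bytes. The helper canonical_dumps is hypothetical and deliberately limited: it assumes a flat dictionary with mutually sortable keys, and it is not one of the serializers evaluated for PySpark. The comparison of plain pickles assumes Python 3.7 or later, where dictionaries iterate in insertion order.

```python
import pickle


def canonical_dumps(d):
    """Pickle a flat dict as a key-sorted list of items (hypothetical helper)."""
    return pickle.dumps(sorted(d.items()), 2)


a = {1: 0, 9: 0}
b = {9: 0, 1: 0}

equal_objects = a == b
# Plain pickles follow each dict's iteration order, which differs here.
equal_plain_pickles = pickle.dumps(a, 2) == pickle.dumps(b, 2)
# Sorting first removes the dependence on iteration order.
equal_canonical_pickles = canonical_dumps(a) == canonical_dumps(b)
```

The extra sort, and the restriction on supported types, hint at the speed and generality costs mentioned above.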
