...
Custom serializers
Custom serializers can improve performance for many workloads. Pluggable serialization of Python objects was added in spark/146, which should be included in a future Spark 0.9 release.
The first prototype of custom serializers allowed serializers to be chosen on a per-RDD basis. The current implementation only allows one serializer to be used for all data serialization; this serializer is configured when constructing SparkContext.
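As a rough illustration (hypothetical class names, not PySpark's actual API), a serializer in this design only needs to expose a dumps/loads pair, and the one chosen at SparkContext construction time is used for all RDD data:

```python
import pickle


class Serializer:
    """Hypothetical sketch of the serializer interface: objects to bytes and back."""

    def dumps(self, obj):
        raise NotImplementedError

    def loads(self, data):
        raise NotImplementedError


class PickleSerializer(Serializer):
    """Default choice: serialize arbitrary Python objects with pickle."""

    def dumps(self, obj):
        return pickle.dumps(obj, protocol=2)

    def loads(self, data):
        return pickle.loads(data)


# The context would hold a single serializer shared by all RDDs, e.g.:
# sc = SparkContext(master, app_name, serializer=PickleSerializer())
serializer = PickleSerializer()
roundtrip = serializer.loads(serializer.dumps({"key": [1, 2, 3]}))
```

Because one serializer applies everywhere, swapping in a faster but less general serializer (e.g. marshal-based) is a single constructor argument rather than a per-RDD decision.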
Even with only one serializer, there are still some subtleties here due to how PySpark handles text files. PySpark implements {{SparkContext.textFile()}} by directly calling its Java equivalent. This produces a {{JavaRDD[String]}} instead of a {{JavaRDD[byte[]]}}. JavaRDD transfers these strings to Python workers using Java's MUTF-8 encoding. (Prior to this pull request, JavaRDD would send strings to Python as pickled UTF-8 strings by prepending the appropriate pickle opcodes. From the worker's point of view, all of its incoming data was in the same pickled format, reducing complexity. The pull request removed all Python-pickle-specific code from JavaRDD.) To handle these cases, PySpark allows a stage's input deserialization and output serialization functions to come from different serializers. For example, in {{sc.textFile(..).map(lambda x: ...).groupByKey()}} the first pipeline stage would use a MUTF8Deserializer and PickleSerializer, and subsequent stages would use PickleSerializers for their inputs and outputs. PySpark uses the lineage graph to perform the bookkeeping to select the appropriate deserializers.
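To make the stage-boundary bookkeeping concrete, here is a minimal pure-Python sketch (stand-in functions, not PySpark's actual classes) of a first pipeline stage whose input deserializer differs from its output serializer:

```python
import pickle


def utf8_deserialize(stream):
    """Stand-in for MUTF8Deserializer: decode raw text-file records to str."""
    for record in stream:
        yield record.decode("utf-8")


def pickle_serialize(iterator):
    """Stand-in for PickleSerializer: pickle each output record."""
    for obj in iterator:
        yield pickle.dumps(obj, protocol=2)


# First stage of something like sc.textFile(..).map(lambda x: (x, len(x))):
# input arrives as MUTF-8/UTF-8 strings, output leaves as pickled pairs.
raw_records = [b"apple", b"fig"]
mapped = ((word, len(word)) for word in utf8_deserialize(raw_records))
stage_output = list(pickle_serialize(mapped))

# A subsequent stage would use pickle for both its input and its output.
decoded = [pickle.loads(rec) for rec in stage_output]
```

The point of the sketch is only that the deserializer on the input side and the serializer on the output side are chosen independently per stage, which is exactly the bookkeeping PySpark derives from the lineage graph.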
At the moment, {{union()}} requires that its inputs were serialized with the same serializer. When unioning an untransformed RDD created with {{sc.textFile()}} against a transformed RDD, a {{cartesian()}} product, or an RDD created with {{parallelize()}}, PySpark will force some of the RDDs to be re-serialized using the default serializer. We could add support for mixed serializers later, but it would add extra complexity and these particular {{union()}} usages seem uncommon.
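The rule above can be sketched as a small helper (hypothetical; PySpark's real logic lives in its RDD implementation): if both inputs of a union used the same serializer it is kept, otherwise the mismatched side must be re-serialized with the default:

```python
def serializer_for_union(left_serializer, right_serializer, default_serializer):
    """Pick the serializer for union(): keep a shared one, else fall back.

    Falling back means some partitions must be re-serialized with the
    default serializer before the union can proceed.
    """
    if left_serializer == right_serializer:
        return left_serializer
    return default_serializer


# A text-file RDD (MUTF-8 strings) unioned with a pickled RDD falls back:
chosen = serializer_for_union("MUTF8", "Pickle", default_serializer="Pickle")
```

This keeps the common case (both sides already pickled) free of any extra serialization pass.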
In the long run, it would be nice to refactor the Java-side serialization logic so that it can apply different interpretations to the bytes that it receives from Python (e.g. unpack them into UTF-8 strings or MsgPack objects). We could also try to remove the assumption that Python sends framed input back to Java, but this might require a separate socket for sending control messages and exceptions. In the very long term, we might be able to generalize PythonRDD's protocol to a point where we can use the same code to support backends written in other languages (this would effectively be like {{pipe()}}, but with a more complicated binary protocol).
Footnotes:

1. {{InputOutputSerializer(UTF8Deserializer, CustomSerializer)}}. For more complicated cases involving joins, we can use the lineage graph to generate appropriate composed serializers for each stage.
2. This doesn't support efficient union of RDDs storing different types of serialized data, although that's probably an uncommon case.

Execution and pipelining
PySpark pipelines transformations by composing their functions. When using PySpark, there's a one-to-one correspondence between PySpark stages and Spark scheduler stages. Each PySpark stage corresponds to a PipelinedRDD instance.
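The composition idea can be sketched in a few lines of plain Python (simplified; PipelinedRDD's real composition also threads split indices and iterators through the Java side):

```python
def pipeline_func(func):
    """Wrap a per-record function as an iterator-to-iterator stage."""
    def process(iterator):
        return (func(x) for x in iterator)
    return process


def compose(first, second):
    """Fuse two stages so they run as one pass over the data."""
    return lambda iterator: second(first(iterator))


# rdd.map(f).map(g) becomes a single pipelined stage: one Python worker
# invocation applies both functions record by record.
fused = compose(pipeline_func(lambda x: x + 1), pipeline_func(lambda x: x * 2))
result = list(fused(iter([1, 2, 3])))
```

Because the fused stage is still one iterator-to-iterator function, the scheduler sees a single task per partition, which is why PySpark stages line up one-to-one with Spark scheduler stages.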
...
TODO: this section could benefit from an end-to-end example tracing the execution of an operation like reduceByKey()
Daemon for launching worker processes
...