
...

  • The input PCollection that is fed into the cross-language transform on the Python side uses a Python-specific type. In this case, the PTransform that produced the input PCollection has to be updated to produce outputs that use Beam's Standard Coders, so that the Java side can interpret them.
  • The input PCollection uses standard coders, but Python type inference results in picking the PickleCoder. This can usually be resolved by annotating the predecessor Python transform with the correct type annotation using with_output_types, as in the sketch below. See this Jira for an example.
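
For example, a minimal sketch of the second case (the transform names are illustrative): annotating the predecessor Python transform with with_output_types makes type inference pick a standard string coder instead of PickleCoder.

Code Block
languagepy
import apache_beam as beam

with beam.Pipeline() as p:
  formatted = (
      p
      | 'Create' >> beam.Create(['a', 'b', 'c'])
      # Without the annotation below, type inference may fall back to
      # PickleCoder; with it, the str elements map to Beam's standard
      # UTF-8 string coder, which the Java side can interpret.
      | 'Format' >> beam.Map(lambda x: x.upper()).with_output_types(str))
  # 'formatted' can now be fed into a cross-language transform.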


"Unknown Coder URN beam:coder:pickled_python:v1" when running a Java pipeline that uses Python cross-language transforms

This usually means that the Python SDK was not able to determine a portable output PCollection type when expanding the cross-language transform, so it fell back to the default PickleCoder.

The Java SDK is unable to interpret this coder, so it fails when trying to parse the expansion response from the Python SDK.

The solution is to provide a hint to the Python SDK regarding the element type(s) of the output PCollection(s) of the cross-language transform. This can be provided using the withOutputCoder or withOutputCoders methods of the PythonExternalTransform API.


How to set Java log level from a Python pipeline that uses Java transforms

For supported runners (e.g. portable runners and the Dataflow runner), you can set the log level of Java transforms in the same way as setting Python module log level overrides, specifically by using the --sdk_harness_log_level_overrides pipeline option. The Python underscore_style option names are automatically translated to the Java camelCase style and recognized by the Java SDK harness.
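
For example, a minimal sketch, assuming a runner that supports this option (the package and level are illustrative):

Code Block
languagepy
from apache_beam.options.pipeline_options import PipelineOptions

# Suppress all logs below ERROR from the org.apache.kafka Java package.
# The underscore-style option name is translated to
# sdkHarnessLogLevelOverrides for the Java SDK harness.
options = PipelineOptions([
    '--sdk_harness_log_level_overrides={"org.apache.kafka": "ERROR"}',
])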

If the runner does not support the automatic mapping of options, one can try adding the corresponding pipeline option explicitly as a local pipeline option on the Python side. For example, to suppress all logs from the Java package org.apache.kafka, you can do the following.

  1. Add a Python PipelineOption that represents the corresponding Java PipelineOption, available here. This can simply be added to the Python program that starts up the Beam job.


    Code Block
    languagepy
    import json

    from apache_beam.options.pipeline_options import PipelineOptions

    class JavaLoggingOptions(PipelineOptions):
      @classmethod
      def _add_argparse_args(cls, parser):
        parser.add_argument(
            '--sdkHarnessLogLevelOverrides',
            default={},
            type=json.loads,
            help='Java log level overrides')


  2. Specify the additional PipelineOption as a parameter when running the Beam pipeline.

    Code Block
    languagebash
    --sdkHarnessLogLevelOverrides '{"org.apache.kafka": "ERROR"}'
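
     Putting the two steps together, a hypothetical sketch (JavaLoggingOptions is the class from step 1; the package and level are illustrative):

    Code Block
    languagepy
    from apache_beam.options.pipeline_options import PipelineOptions

    # Parse the Java-style option with the JavaLoggingOptions class from
    # step 1; the JSON value is forwarded to the Java SDK harness as-is.
    options = PipelineOptions([
        '--sdkHarnessLogLevelOverrides={"org.apache.kafka": "ERROR"}',
    ])
    java_logging_options = options.view_as(JavaLoggingOptions)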


     

Debugging a Python Test that calls a Java transform

...

In Additional Arguments, add the arguments. For this example, we add

Code Block
--test-pipeline-options="--runner=FlinkRunner --beam_services='{\":sdks:java:extensions:sql:expansion-service:shadowJar\": \"localhost:8097\"}'"

...

You can choose to run only the Python code in debug mode, only the Java expansion service in debug mode, both in debug mode, or neither.

Running a cross-language transform that uses a different version of an external transform

By default, cross-language transforms released with Beam automatically start up an expansion service that includes the external transforms. Usually these transforms are from the same Beam release as the pipeline SDK. For example, when using cross-language Kafka transforms from the Python SDK, the underlying Java KafkaIO transforms will be from the same released SDK version. If you need to use a different external SDK version, you can do the following.

  • Start up an expansion service that includes external transforms from a compatible SDK version. See here for more details.
  • Specify that expansion service when defining the cross-language transform in your pipeline. For example, the expansion service used by the Python ReadFromKafka transform can be overridden here, as in the sketch below.
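
For example, a minimal sketch, assuming an expansion service from the desired SDK version is already listening on localhost:8097 (the broker address and topic are illustrative):

Code Block
languagepy
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

with beam.Pipeline() as p:
  messages = (
      p
      | ReadFromKafka(
          consumer_config={'bootstrap.servers': 'localhost:9092'},
          topics=['my_topic'],
          # Use the manually started expansion service instead of the
          # default one started automatically by the SDK.
          expansion_service='localhost:8097'))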