...

Spark URI format:

```
spark:{rdd|dataframe|hive}
```
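To make the three endpoint kinds concrete, the sketch below reads the job type out of a URI in this format. The SparkUriType class and SparkEndpointType enum are hypothetical illustrations, not part of the Camel Spark component. The parsing assumes the plain spark:kind?options form shown above.

```java
import java.util.Locale;

public class SparkUriType {

    // Hypothetical enum mirroring the endpoint kinds in spark:{rdd|dataframe|hive}
    enum SparkEndpointType { RDD, DATAFRAME, HIVE }

    // Hypothetical helper: extracts the endpoint kind from a URI such as "spark:rdd?rdd=#myRdd"
    static SparkEndpointType typeOf(String uri) {
        if (!uri.startsWith("spark:")) {
            throw new IllegalArgumentException("Not a Spark URI: " + uri);
        }
        String rest = uri.substring("spark:".length());
        int query = rest.indexOf('?');
        String kind = query >= 0 ? rest.substring(0, query) : rest;
        // valueOf throws IllegalArgumentException for anything outside rdd|dataframe|hive
        return SparkEndpointType.valueOf(kind.toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(typeOf("spark:rdd?rdd=#testFileRdd&rddCallback=#transformation")); // RDD
        System.out.println(typeOf("spark:hive")); // HIVE
    }
}
```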

RDD jobs

...

To invoke an RDD job, use the following URI:

Spark RDD producer:

```
spark:rdd?rdd=#testFileRdd&rddCallback=#transformation
```

Here the rdd option refers to the name of an RDD instance (a subclass of org.apache.spark.api.java.AbstractJavaRDDLike) in the Camel registry, while the rddCallback option refers to an implementation of the org.apache.camel.component.spark.RddCallback interface (also looked up in the registry). The RDD callback provides a single method that applies incoming messages to the given RDD. The result of the callback computation is saved as the body of the exchange.
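Because both options are registry references written with the # prefix, a producer URI can be assembled from two bean names. The sketch below assumes plain bean names that need no URI escaping; rddUri is a hypothetical helper, not a Camel API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class SparkRddUri {

    // Hypothetical helper: builds a spark:rdd producer URI whose options
    // reference registry beans using the "#beanName" syntax
    static String rddUri(String rddBean, String callbackBean) {
        // LinkedHashMap keeps the options in insertion order
        Map<String, String> options = new LinkedHashMap<>();
        options.put("rdd", "#" + rddBean);
        options.put("rddCallback", "#" + callbackBean);
        StringJoiner query = new StringJoiner("&", "spark:rdd?", "");
        options.forEach((key, value) -> query.add(key + "=" + value));
        return query.toString();
    }

    public static void main(String[] args) {
        // Prints spark:rdd?rdd=#testFileRdd&rddCallback=#transformation
        System.out.println(rddUri("testFileRdd", "transformation"));
    }
}
```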

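Spark RDD callback:

```java
import org.apache.spark.api.java.AbstractJavaRDDLike;

// Implemented by the bean referenced through the rddCallback option
public interface RddCallback<T> {
    T onRdd(AbstractJavaRDDLike rdd, Object... payloads);
}
```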
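The contract can be illustrated without Spark or Camel. This local sketch only mimics the behaviour described above: a List<String> stands in for the RDD, the hypothetical LocalCallback interface plays the role of RddCallback, and a map entry named "body" stands in for the exchange body. All of these names are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CallbackContractSketch {

    // Hypothetical local analogue of RddCallback: a List stands in for the RDD
    interface LocalCallback<T> {
        T onData(List<String> data, Object... payloads);
    }

    // Hypothetical stand-in for the producer: applies the incoming message body to the
    // data set and stores the callback result as the "body" of a map-based exchange
    static Map<String, Object> invoke(List<String> data, LocalCallback<?> callback, Object body) {
        Map<String, Object> exchange = new HashMap<>();
        // body has static type Object, so it arrives as payloads[0]
        exchange.put("body", callback.onData(data, body));
        return exchange;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("job input one", "other line", "job input two");
        LocalCallback<Long> countContaining = (data, payloads) -> {
            String pattern = (String) payloads[0];
            return data.stream().filter(line -> line.contains(pattern)).count();
        };
        Map<String, Object> exchange = invoke(lines, countContaining, "job input");
        System.out.println(exchange.get("body")); // 2
    }
}
```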

...

Calling a Spark job:

```java
// producerTemplate is an existing Camel ProducerTemplate; the body is passed to the RDD callback
String pattern = "job input";
long linesCount = producerTemplate.requestBody("spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining", pattern, long.class);
```

Registered as a Spring bean, the RDD callback for the snippet above could look as follows:

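Spark RDD callback:

```java
import org.apache.camel.component.spark.RddCallback;
import org.apache.spark.api.java.AbstractJavaRDDLike;
import org.apache.spark.api.java.JavaRDD;
import org.springframework.context.annotation.Bean;

@Bean
RddCallback<Long> countLinesContaining() {
    return new RddCallback<Long>() {
        @Override
        public Long onRdd(AbstractJavaRDDLike rdd, Object... payloads) {
            String pattern = (String) payloads[0];
            // filter() is defined on JavaRDD, so narrow the raw RDD type first
            @SuppressWarnings("unchecked")
            JavaRDD<String> lines = (JavaRDD<String>) rdd;
            return lines.filter(line -> line.contains(pattern)).count();
        }
    };
}
```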

The RDD definition in Spring could look as follows:

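Spark RDD definition:

```java
import org.apache.spark.api.java.AbstractJavaRDDLike;
import org.apache.spark.api.java.JavaSparkContext;
import org.springframework.context.annotation.Bean;

@Bean
AbstractJavaRDDLike myRdd(JavaSparkContext sparkContext) {
    return sparkContext.textFile("testrdd.txt");
}
```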

Deploying KuraRouter

The bundle containing your Kura router class should import the following packages in its OSGi manifest:

...