...
Spark URI format:

spark:{rdd|dataframe|hive}
RDD jobs
...
jobs
To invoke an RDD job, use the following URI:
Spark RDD producer:

spark:rdd?rdd=#testFileRdd&rddCallback=#transformation
The rdd option refers to the name of an RDD instance (a subclass of org.apache.spark.api.java.AbstractJavaRDDLike) in the Camel registry, while rddCallback refers to an implementation of the org.apache.camel.component.spark.RddCallback interface (also looked up from the registry). The RDD callback provides a single method used to apply incoming messages to the given RDD. The result of the callback computation is saved as the body of the exchange.
Spark RDD callback:

public interface RddCallback<T> {
    T onRdd(AbstractJavaRDDLike rdd, Object... payloads);
}
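To make the callback contract concrete, here is a minimal, plain-Java sketch in which a List<String> stands in for the RDD, so it runs without a Spark cluster. The class name RddCallbackSketch and the List-based "RDD" are illustrative assumptions, not part of Camel or Spark; the real interface receives an AbstractJavaRDDLike.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only: a plain-Java stand-in for the Spark types, showing
// how a callback receives the message body through the payloads varargs and
// returns a result. RddCallbackSketch is a hypothetical name.
public class RddCallbackSketch {

    // Mirrors the shape of org.apache.camel.component.spark.RddCallback,
    // with a List<String> standing in for the RDD.
    interface RddCallback<T> {
        T onRdd(List<String> rdd, Object... payloads);
    }

    // Counts lines containing the pattern passed as the first payload,
    // analogous to the countLinesContaining bean shown later on this page.
    static final RddCallback<Long> COUNT_LINES_CONTAINING = (rdd, payloads) -> {
        String pattern = (String) payloads[0];
        return rdd.stream().filter(line -> line.contains(pattern)).count();
    };

    public static void main(String[] args) {
        List<String> fakeRdd = Arrays.asList("job input line", "other line", "job input again");
        long linesCount = COUNT_LINES_CONTAINING.onRdd(fakeRdd, "job input");
        System.out.println(linesCount); // prints 2
    }
}
```

The single payloads varargs parameter is how the component hands the exchange body to the callback, which is why the real callbacks below cast payloads[0] to the expected type.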
...
Calling a Spark job:

String pattern = "job input";
long linesCount = producerTemplate.requestBody("spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining", pattern, long.class);
The RDD callback for the snippet above, registered as a Spring bean, could look as follows:
Spark RDD callback:

@Bean
RddCallback<Long> countLinesContaining() {
    return new RddCallback<Long>() {
        @Override
        public Long onRdd(AbstractJavaRDDLike rdd, Object... payloads) {
            String pattern = (String) payloads[0];
            // filter() is defined on JavaRDD, so narrow the generic RDD type first
            return ((JavaRDD<String>) rdd).filter(line -> line.contains(pattern)).count();
        }
    };
}
The RDD definition in Spring could look as follows:
Spark RDD definition:

@Bean
AbstractJavaRDDLike myRdd(JavaSparkContext sparkContext) {
    return sparkContext.textFile("testrdd.txt");
}
Deploying KuraRouter
The bundle containing your Kura router class should import the following packages in its OSGi manifest:
...