...
Where the rdd option refers to the name of an RDD instance (a subclass of org.apache.spark.api.java.JavaRDDLike) from the Camel registry, while rddCallback refers to an implementation of the org.apache.camel.component.spark.RddCallback interface (also from the registry). The RDD callback provides a single method used to apply incoming messages against the given RDD. The result of the callback computation is saved as the body of the exchange.
Spark RDD callback:

```java
import org.apache.spark.api.java.JavaRDDLike;

public interface RddCallback<T> {
    // Applies the incoming message payloads to the given RDD and returns the result.
    T onRdd(JavaRDDLike rdd, Object... payloads);
}
```
...
Spark RDD callback:

```java
@Bean
RddCallback<Long> countLinesContaining() {
    return new RddCallback<Long>() {
        @Override
        public Long onRdd(JavaRDDLike rdd, Object... payloads) {
            // The first payload is the pattern to look for.
            String pattern = (String) payloads[0];
            // filter() is defined on JavaRDD rather than JavaRDDLike, so cast first.
            JavaRDD<String> lines = (JavaRDD<String>) rdd;
            return lines.filter(line -> line.contains(pattern)).count();
        }
    };
}
```
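The callback contract can be tried out without Spark or Camel on the classpath. The following is a minimal sketch under that assumption: `CountLinesSketch` and `LineCallback` are hypothetical stand-ins (not part of the component), and a plain `List<String>` plays the role of the RDD. It only illustrates how the payloads reach the callback and how a result comes back.

```java
import java.util.Arrays;
import java.util.List;

class CountLinesSketch {

    // Hypothetical stand-in for RddCallback; a List<String> plays the role of the RDD.
    interface LineCallback<T> {
        T onLines(List<String> lines, Object... payloads);
    }

    // Mirrors countLinesContaining(): the first payload is the pattern to search for.
    static final LineCallback<Long> COUNT_LINES_CONTAINING = (lines, payloads) -> {
        String pattern = (String) payloads[0];
        return lines.stream().filter(line -> line.contains(pattern)).count();
    };

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("foo bar", "bar baz", "qux");
        // The message body is passed as the payload; the returned value is the result.
        Long result = COUNT_LINES_CONTAINING.onLines(lines, "bar");
        System.out.println(result); // prints 2
    }
}
```

In the real component the returned value is what ends up in the exchange body, as described above.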
...
Spark RDD definition:

```java
@Bean
JavaRDDLike myRdd(JavaSparkContext sparkContext) {
    // textFile() returns a JavaRDD<String>, which is a JavaRDDLike.
    return sparkContext.textFile("testrdd.txt");
}
```
...
Option | Description | Default value |
---|---|---|
rdd | RDD instance (subclass of org.apache.spark.api.java.JavaRDDLike). | null |
rddCallback | Instance of the org.apache.camel.component.spark.RddCallback interface. | null |
...
Void RDD callback:

```java
@Bean
RddCallback<Void> rddCallback() {
    return new VoidRddCallback() {
        @Override
        public void doOnRdd(JavaRDDLike rdd, Object... payloads) {
            // `output` is assumed to be a java.io.File defined elsewhere.
            rdd.saveAsTextFile(output.getAbsolutePath());
        }
    };
}
```
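A void callback of this kind can be read as an adapter from a method that returns nothing to the generic callback contract. The sketch below illustrates that pattern in plain Java. `VoidCallbackSketch`, `ListCallback` and `VoidListCallback` are hypothetical names, a `List<String>` stands in for the RDD, and the actual VoidRddCallback class may differ in its details.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class VoidCallbackSketch {

    // Hypothetical stand-in for RddCallback; a List<String> plays the role of the RDD.
    interface ListCallback<T> {
        T onList(List<String> data, Object... payloads);
    }

    // Hypothetical stand-in for VoidRddCallback: adapts a void method to the generic contract.
    abstract static class VoidListCallback implements ListCallback<Void> {

        abstract void doOnList(List<String> data, Object... payloads);

        @Override
        public Void onList(List<String> data, Object... payloads) {
            doOnList(data, payloads);
            return null; // nothing to hand back to the caller
        }
    }

    // Copies the data into a sink, as a stand-in for writing it to a file.
    static List<String> saveAll(List<String> data) {
        List<String> sink = new ArrayList<>();
        ListCallback<Void> callback = new VoidListCallback() {
            @Override
            void doOnList(List<String> d, Object... payloads) {
                sink.addAll(d);
            }
        };
        callback.onList(data);
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(saveAll(Arrays.asList("a", "b"))); // prints [a, b]
    }
}
```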
...
Converting RDD callback:

```java
@Bean
RddCallback<Long> rddCallback(CamelContext context) {
    return new ConvertingRddCallback<Long>(context, int.class, int.class) {
        @Override
        public Long doOnRdd(JavaRDDLike rdd, Object... payloads) {
            // Payloads are converted to the declared types (int, int) before this method is called.
            return rdd.count() * (int) payloads[0] * (int) payloads[1];
        }
    };
}
```
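The idea behind a converting callback is that each payload is converted to its declared type before the user code runs. The sketch below illustrates this in plain Java under several assumptions: `ConvertingCallbackSketch`, `ListCallback` and `ConvertingListCallback` are hypothetical names, a `List<String>` stands in for the RDD, and a toy converter that only understands `int` replaces the type conversion that the `CamelContext` would provide.

```java
import java.util.Arrays;
import java.util.List;

class ConvertingCallbackSketch {

    // Hypothetical stand-in for RddCallback; a List<String> plays the role of the RDD.
    interface ListCallback<T> {
        T onList(List<String> data, Object... payloads);
    }

    // Hypothetical stand-in for ConvertingRddCallback.
    abstract static class ConvertingListCallback<T> implements ListCallback<T> {
        private final Class<?>[] payloadTypes;

        ConvertingListCallback(Class<?>... payloadTypes) {
            this.payloadTypes = payloadTypes;
        }

        abstract T doOnList(List<String> data, Object... payloads);

        @Override
        public T onList(List<String> data, Object... payloads) {
            // Convert every payload to its declared type, then delegate.
            // Assumes at least as many declared types as payloads.
            Object[] converted = new Object[payloads.length];
            for (int i = 0; i < payloads.length; i++) {
                converted[i] = convert(payloadTypes[i], payloads[i]);
            }
            return doOnList(data, converted);
        }

        // Toy converter that handles only int targets; other values pass through unchanged.
        private static Object convert(Class<?> type, Object value) {
            if (type == int.class || type == Integer.class) {
                return value instanceof Number
                        ? ((Number) value).intValue()
                        : Integer.parseInt(value.toString());
            }
            return value;
        }
    }

    // Multiplies the element count by two int payloads, whatever form they arrive in.
    static Long countTimes(List<String> data, Object... payloads) {
        ListCallback<Long> callback = new ConvertingListCallback<Long>(int.class, int.class) {
            @Override
            Long doOnList(List<String> d, Object... p) {
                return (long) d.size() * (int) p[0] * (int) p[1];
            }
        };
        return callback.onList(data, payloads);
    }

    public static void main(String[] args) {
        // String payloads are converted to int before the multiplication.
        System.out.println(countTimes(Arrays.asList("a", "b", "c"), "10", "10")); // prints 300
    }
}
```

Without the conversion step, the `(int)` casts would fail with a ClassCastException whenever a payload arrives as a String.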
...