...
Where the rdd option refers to the name of an RDD instance (subclass of org.apache.spark.api.java.AbstractJavaRDDLike) from a Camel registry, while rddCallback refers to an implementation of the org.apache.camel.component.spark.RddCallback interface (also from the registry). The RDD callback provides a single method used to apply incoming messages against the given RDD. Results of callback computations are saved as the body of an exchange.
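To illustrate the callback contract, the sketch below uses a hypothetical, Spark-free stand-in for org.apache.camel.component.spark.RddCallback, with a plain List playing the role of the RDD (the real interface operates on Spark's RDD types, so this is only a sketch of the pattern, not the component's actual code):

```java
import java.util.Arrays;
import java.util.List;

// Spark-free sketch of the callback pattern: the component hands the
// registered data set plus the exchange body (payloads) to the callback,
// and the callback's return value becomes the message body.
public class RddCallbackSketch {

    // Hypothetical stand-in for org.apache.camel.component.spark.RddCallback;
    // a plain List plays the role of the RDD here.
    interface RddCallback<T> {
        T onRdd(List<String> rdd, Object... payloads);
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("job input one", "unrelated", "job input two");
        // Counts lines containing the pattern passed in as the first payload.
        RddCallback<Long> countLinesContaining = (rdd, payloads) -> {
            String pattern = (String) payloads[0];
            return rdd.stream().filter(line -> line.contains(pattern)).count();
        };
        System.out.println(countLinesContaining.onRdd(lines, "job input")); // prints 2
    }
}
```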
...
Code Block (java): Calling spark job
String pattern = "job input";
long linesCount = producerTemplate.requestBody("spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining", pattern, long.class);
...
Code Block (java): Body conversions for annotated RDD callbacks
import static org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback;

@Bean
RddCallback<Long> rddCallback(CamelContext camelContext) {
    return annotatedRddCallback(new MyTransformation(), camelContext);
}
...
import org.apache.camel.component.spark.annotations.RddCallback;

public class MyTransformation {

    @RddCallback
    long countLines(JavaRDD<String> textFile, int first, int second) {
        return textFile.count() * first * second;
    }

}
...
// Convert String "10" to integer
long result = producerTemplate.requestBody("spark:rdd?rdd=#rdd&rddCallback=#rddCallback", Arrays.asList(10, "10"), long.class);
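The annotated-callback mechanism can be pictured as reflection-based dispatch: the extra payloads from the exchange body are converted to the parameter types of the @RddCallback-annotated method and passed after the data set. The sketch below is a hypothetical, Spark-free illustration of that binding (the class names and helper methods here are invented for the example and are not the component's actual implementation):

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of annotated-callback dispatch: the first argument is
// the data set (a List stands in for JavaRDD), and the remaining payloads are
// converted to the method's parameter types (e.g. String "10" -> int 10).
public class AnnotatedDispatchSketch {

    public static class MyTransformation {
        // Stand-in for a @RddCallback-annotated method.
        public long countLines(List<String> textFile, int first, int second) {
            return (long) textFile.size() * first * second;
        }
    }

    // Convert a payload to the expected parameter type, mimicking the
    // body-conversion step described above.
    static Object convert(Object payload, Class<?> target) {
        if ((target == int.class || target == Integer.class) && payload instanceof String) {
            return Integer.parseInt((String) payload);
        }
        return payload;
    }

    static Object dispatch(Object bean, String methodName, List<String> data, Object... payloads) throws Exception {
        for (Method m : bean.getClass().getMethods()) {
            if (m.getName().equals(methodName)) {
                Object[] args = new Object[payloads.length + 1];
                args[0] = data;
                Class<?>[] types = m.getParameterTypes();
                for (int i = 0; i < payloads.length; i++) {
                    args[i + 1] = convert(payloads[i], types[i + 1]);
                }
                return m.invoke(bean, args);
            }
        }
        throw new NoSuchMethodException(methodName);
    }

    public static void main(String[] args) throws Exception {
        List<String> lines = Arrays.asList("a", "b", "c");
        // Mixed payloads: an Integer and a String that must become an int.
        Object result = dispatch(new MyTransformation(), "countLines", lines, 10, "10");
        System.out.println(result); // prints 300 (3 lines * 10 * 10)
    }
}
```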
DataFrame jobs
Instead of working with RDDs, the Spark component can also work with DataFrames.
To invoke a DataFrame job, use the following URI:
Code Block: Spark DataFrames producer
spark:dataframe?dataFrame=#testDataFrame&dataFrameCallback=#transformation
Where the dataFrame option refers to the name of a DataFrame instance (an instance of org.apache.spark.sql.DataFrame) from a Camel registry, while dataFrameCallback refers to an implementation of the org.apache.camel.component.spark.DataFrameCallback interface (also from the registry). The DataFrame callback provides a single method used to apply incoming messages against the given DataFrame. Results of callback computations are saved as the body of an exchange.
Code Block (java): Spark DataFrames callback
public interface DataFrameCallback<T> {
    T onDataFrame(DataFrame dataFrame, Object... payloads);
}
The following snippet demonstrates how to send a message as an input to a job and return results:
Code Block (java): Calling spark job
String model = "Micra";
long linesCount = producerTemplate.requestBody("spark:dataframe?dataFrame=#cars&dataFrameCallback=#findCarWithModel", model, long.class);
The DataFrame callback for the snippet above, registered as a Spring bean, could look as follows:
Code Block (java): Spark DataFrames callback
@Bean
DataFrameCallback<Long> findCarWithModel() {
    return new DataFrameCallback<Long>() {
        @Override
        public Long onDataFrame(DataFrame dataFrame, Object... payloads) {
            String model = (String) payloads[0];
            return dataFrame.where(dataFrame.col("model").eqNullSafe(model)).count();
        }
    };
}
The DataFrame definition in Spring could look as follows:
Code Block (java): Spark DataFrames definition
@Bean
DataFrame cars(HiveContext hiveContext) {
    DataFrame jsonCars = hiveContext.read().json("/var/data/cars.json");
    jsonCars.registerTempTable("cars");
    return jsonCars;
}
DataFrame jobs options
Option | Description | Default value
---|---|---
dataFrame | DataFrame instance (subclass of org.apache.spark.sql.DataFrame). | null
dataFrameCallback | Instance of the org.apache.camel.component.spark.DataFrameCallback interface. | null