...
Where the rdd option refers to the name of an RDD instance (a subclass of org.apache.spark.api.java.AbstractJavaRDDLike) from the Camel registry, while rddCallback refers to an implementation of the org.apache.camel.component.spark.RddCallback interface (also looked up from the registry). The RDD callback provides a single method used to apply incoming messages against the given RDD. The result of the callback computation is saved as the body of the exchange.
Spark RDD callback:

```java
public interface RddCallback<T> {
    T onRdd(AbstractJavaRDDLike rdd, Object... payloads);
}
```
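For example, a route could pass the incoming message body to such a job as follows. This is a minimal sketch: the bean names #myRdd and #countLinesContaining are assumptions matching the beans defined further below.

```java
import org.apache.camel.builder.RouteBuilder;

public class SparkRddRoute extends RouteBuilder {
    @Override
    public void configure() {
        // Both endpoint options point at beans in the Camel registry
        // (the names are illustrative, matching the examples below).
        from("direct:countLines")
            .to("spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining");
    }
}
```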
...
Spark RDD callback:

```java
@Bean
RddCallback<Long> countLinesContaining() {
    return new RddCallback<Long>() {
        @Override
        public Long onRdd(AbstractJavaRDDLike rdd, Object... payloads) {
            String pattern = (String) payloads[0];
            // The RDD passed in here is the text-file RDD defined below,
            // so its elements are lines of text.
            JavaRDD<String> lines = (JavaRDD<String>) rdd;
            return lines.filter(line -> line.contains(pattern)).count();
        }
    };
}
```
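A hedged sketch of invoking this job with a ProducerTemplate (the template instance and the bean names are assumptions consistent with the examples above):

```java
// The message body becomes payloads[0] in the callback;
// the callback's return value becomes the reply body.
long linesCount = template.requestBody(
        "spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining",
        "pattern to look for", long.class);
```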
...
Spark RDD definition:

```java
@Bean
JavaRDD<String> myRdd(JavaSparkContext sparkContext) {
    return sparkContext.textFile("testrdd.txt");
}
```
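The definition above assumes a JavaSparkContext bean is available for injection. A minimal local-mode sketch (the application name and master URL are illustrative values, not something the component prescribes):

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

@Bean
JavaSparkContext sparkContext() {
    // Example configuration only: a local master using all available cores.
    SparkConf conf = new SparkConf().setAppName("camel-spark-example").setMaster("local[*]");
    return new JavaSparkContext(conf);
}
```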
...
Option | Description | Default value
---|---|---
rdd | RDD instance (subclass of org.apache.spark.api.java.AbstractJavaRDDLike). | null
rddCallback | Instance of the org.apache.camel.component.spark.RddCallback interface. | null
...
Spark RDD callback:

```java
@Bean
RddCallback<Void> rddCallback() {
    return new VoidRddCallback() {
        @Override
        public void doOnRdd(AbstractJavaRDDLike rdd, Object... payloads) {
            // 'output' is a File defined in the surrounding context of this example.
            rdd.saveAsTextFile(output.getAbsolutePath());
        }
    };
}
```
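As VoidRddCallback produces no result, such a job is typically triggered with a one-way send; a sketch, with bean names assumed as above:

```java
// No reply is expected; the callback only performs the side effect
// of writing the RDD contents to the output directory.
template.sendBody("spark:rdd?rdd=#myRdd&rddCallback=#rddCallback", null);
```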
...
Spark RDD callback:

```java
@Bean
RddCallback<Long> rddCallback(CamelContext context) {
    return new ConvertingRddCallback<Long>(context, int.class, int.class) {
        @Override
        public Long doOnRdd(AbstractJavaRDDLike rdd, Object... payloads) {
            // Both payloads have already been converted to int
            // before reaching this method.
            return rdd.count() * (int) payloads[0] * (int) payloads[1];
        }
    };
}
```
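A sketch of calling such a job: the elements of the list body are converted to the declared int type before being passed to the callback (bean names are assumed as above; Arrays is java.util.Arrays):

```java
// Each list element is converted to int.class, as declared
// in the ConvertingRddCallback constructor above.
long result = template.requestBody(
        "spark:rdd?rdd=#myRdd&rddCallback=#rddCallback",
        Arrays.asList(10, 10), long.class);
```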
...
Option | Description | Default value
---|---|---
dataFrame | DataFrame instance (org.apache.spark.sql.DataFrame). | null
dataFrameCallback | Instance of the org.apache.camel.component.spark.DataFrameCallback interface. | null
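For symmetry with the RDD examples, a hedged sketch of a DataFrame callback bean, assuming the callback's single method is onDataFrame (analogous to onRdd above); the cars data set with a model column is an assumption borrowed from the Hive example below:

```java
@Bean
DataFrameCallback<Long> dataFrameCallback() {
    return new DataFrameCallback<Long>() {
        @Override
        public Long onDataFrame(DataFrame dataFrame, Object... payloads) {
            // The message body arrives as payloads[0].
            String model = (String) payloads[0];
            return dataFrame.where(dataFrame.col("model").eqNullSafe(model)).count();
        }
    };
}
```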
Hive jobs
Instead of working with RDDs or DataFrames, the Spark component can also receive Hive SQL queries as payloads. To send a Hive query to the Spark component, use the following URI:
Spark Hive producer:

```
spark:hive
```
The following snippet demonstrates how to send a message as input to a job and return the results:
Calling spark job:

```java
long carsCount = template.requestBody("spark:hive?collect=false", "SELECT * FROM cars", Long.class);
List<Row> cars = template.requestBody("spark:hive", "SELECT * FROM cars", List.class);
```
The table we want to execute the query against should be registered in a HiveContext before we query it. For example, in Spring such a registration could look as follows:
Spark DataFrame definition:

```java
@Bean
DataFrame cars(HiveContext hiveContext) {
    DataFrame jsonCars = hiveContext.read().json("/var/data/cars.json");
    jsonCars.registerTempTable("cars");
    return jsonCars;
}
```
Hive jobs options
Option | Description | Default value
---|---|---
collect | Indicates if results should be collected (as a list of org.apache.spark.sql.Row instances) or if count() should be called on the result instead. | true