...
Spark URI format:

```
spark:{rdd|dataframe|hive}
```
RDD jobs
To invoke an RDD job, use the following URI:
```
spark:rdd?rdd=#testFileRdd&rddCallback=#countLinesContaining
```
Where the rdd option refers to the name of an RDD instance (subclass of org.apache.spark.api.java.JavaRDDLike) from a Camel registry, while rddCallback refers to an implementation of the org.apache.camel.component.spark.RddCallback interface (also from a registry). The RDD callback provides a single method used to apply incoming messages against the given RDD. The result of the callback computation is saved as the body of the exchange.
Spark RDD callback:

```java
public interface RddCallback<T> {
    T onRdd(JavaRDDLike rdd, Object... payloads);
}
```
...
Calling spark job:

```java
String pattern = "job input";
long linesCount = producerTemplate.requestBody("spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining", pattern, long.class);
```
...
Spark RDD callback:

```java
@Bean
RddCallback<Long> countLinesContaining() {
    return new RddCallback<Long>() {
        @Override
        public Long onRdd(JavaRDDLike rdd, Object... payloads) {
            String pattern = (String) payloads[0];
            // Narrow to JavaRDD<String>, as JavaRDDLike does not expose filter()
            @SuppressWarnings("unchecked")
            JavaRDD<String> lines = (JavaRDD<String>) rdd;
            return lines.filter(line -> line.contains(pattern)).count();
        }
    };
}
```
...
Spark RDD definition:

```java
@Bean
JavaRDD<String> myRdd(JavaSparkContext sparkContext) {
    return sparkContext.textFile("testrdd.txt");
}
```
...
RDD jobs options

| Option | Description | Default value |
|---|---|---|
| rdd | RDD instance (subclass of org.apache.spark.api.java.JavaRDDLike). | null |
| rddCallback | Instance of the org.apache.camel.component.spark.RddCallback interface. | null |
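The beans do not have to come from Spring. The following is a minimal non-Spring sketch, assuming Camel 2.x's SimpleRegistry and the testrdd.txt file used above; the bean names and the local Spark master are illustrative:

```java
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.component.spark.RddCallback;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.SimpleRegistry;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaRDDLike;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkRddExample {

    public static void main(String[] args) throws Exception {
        // Bind the RDD and the callback into a registry so the spark:
        // endpoint can resolve them with the #beanName syntax.
        SimpleRegistry registry = new SimpleRegistry();

        JavaSparkContext sparkContext = new JavaSparkContext("local[*]", "camel-spark-example");
        registry.put("myRdd", sparkContext.textFile("testrdd.txt"));

        registry.put("countLinesContaining", new RddCallback<Long>() {
            @Override
            public Long onRdd(JavaRDDLike rdd, Object... payloads) {
                String pattern = (String) payloads[0];
                @SuppressWarnings("unchecked")
                JavaRDD<String> lines = (JavaRDD<String>) rdd;
                return lines.filter(line -> line.contains(pattern)).count();
            }
        });

        CamelContext camelContext = new DefaultCamelContext(registry);
        camelContext.start();

        ProducerTemplate template = camelContext.createProducerTemplate();
        long linesCount = template.requestBody(
                "spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining",
                "job input", long.class);
        System.out.println("Matching lines: " + linesCount);

        camelContext.stop();
        sparkContext.stop();
    }
}
```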
...
If the RDD callback does not return any value back to a Camel pipeline, you can either return null or extend the VoidRddCallback base class:

Spark RDD callback:

```java
@Bean
RddCallback<Void> rddCallback() {
    return new VoidRddCallback() {
        @Override
        public void doOnRdd(JavaRDDLike rdd, Object... payloads) {
            // 'output' is a java.io.File defined elsewhere in the configuration
            rdd.saveAsTextFile(output.getAbsolutePath());
        }
    };
}
```
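Since no result comes back from such a job, it can be triggered with a one-way send. A short usage sketch, assuming the beans defined above:

```java
// No result is expected, so a one-way send is enough.
producerTemplate.sendBody("spark:rdd?rdd=#myRdd&rddCallback=#rddCallback", null);
```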
...
If you know the type of the input data sent to the RDD callback, you can use the ConvertingRddCallback base class and let Camel automatically convert incoming messages before passing them to the callback:

Spark RDD callback:

```java
@Bean
RddCallback<Long> rddCallback(CamelContext context) {
    return new ConvertingRddCallback<Long>(context, int.class, int.class) {
        @Override
        public Long doOnRdd(JavaRDDLike rdd, Object... payloads) {
            return rdd.count() * (int) payloads[0] * (int) payloads[1];
        }
    };
}
```
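A hedged usage sketch for the converting callback above: the String payloads are converted to int by Camel's type converters before doOnRdd is invoked (bean names assumed from the earlier snippets):

```java
import java.util.Arrays;

// Both String payloads are converted to int before doOnRdd() runs.
long result = producerTemplate.requestBody(
        "spark:rdd?rdd=#myRdd&rddCallback=#rddCallback",
        Arrays.asList("10", "10"), long.class);
```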
Annotated RDD callbacks
Probably the easiest way to work with the RDD callbacks is to provide a class with a method marked with the @RddCallback annotation:
Annotated RDD callback definition:

```java
import static org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback;

@Bean
RddCallback<Long> rddCallback() {
    return annotatedRddCallback(new MyTransformation());
}

...

import org.apache.camel.component.spark.annotations.RddCallback;

public class MyTransformation {

    @RddCallback
    long countLines(JavaRDD<String> textFile, int first, int second) {
        return textFile.count() * first * second;
    }

}
```
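A short usage sketch for the annotated callback above (bean names are assumed): without a CamelContext the payloads must already match the annotated method's parameter types:

```java
import java.util.Arrays;

// Payloads must already be ints, as no conversion happens here.
long result = producerTemplate.requestBody(
        "spark:rdd?rdd=#myRdd&rddCallback=#rddCallback",
        Arrays.asList(10, 10), long.class);
```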
...
If you pass a CamelContext to the annotated RDD callback factory method, the created callback will be able to convert incoming payloads to match the parameters of the annotated method:
Body conversions for annotated RDD callbacks:

```java
import static org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback;

@Bean
RddCallback<Long> rddCallback(CamelContext camelContext) {
    return annotatedRddCallback(new MyTransformation(), camelContext);
}

...

import org.apache.camel.component.spark.annotations.RddCallback;

public class MyTransformation {

    @RddCallback
    long countLines(JavaRDD<String> textFile, int first, int second) {
        return textFile.count() * first * second;
    }

}

...

// Convert the String "10" to an integer before the callback is invoked
long result = producerTemplate.requestBody(
        "spark:rdd?rdd=#rdd&rddCallback=#rddCallback",
        Arrays.asList(10, "10"), long.class);
```
DataFrame jobs
Instead of working with RDDs, the Spark component can work with DataFrames as well.
To invoke a DataFrame job, use the following URI:
Spark DataFrame producer:

```
spark:dataframe?dataFrame=#testDataFrame&dataFrameCallback=#transformation
```
Where the dataFrame option refers to the name of a DataFrame instance (instance of org.apache.spark.sql.DataFrame) from a Camel registry, while dataFrameCallback refers to an implementation of the org.apache.camel.component.spark.DataFrameCallback interface (also from a registry). The DataFrame callback provides a single method used to apply incoming messages against the given DataFrame. The result of the callback computation is saved as the body of the exchange.
Spark DataFrame callback:

```java
public interface DataFrameCallback<T> {
    T onDataFrame(DataFrame dataFrame, Object... payloads);
}
```
The following snippet demonstrates how to send a message as an input to the job and return results:
Calling spark job:

```java
String model = "Micra";
long linesCount = producerTemplate.requestBody("spark:dataFrame?dataFrame=#cars&dataFrameCallback=#findCarWithModel", model, long.class);
```
The DataFrame callback for the snippet above, registered as a Spring bean, could look as follows:
Spark DataFrame callback:

```java
@Bean
DataFrameCallback<Long> findCarWithModel() {
    return new DataFrameCallback<Long>() {
        @Override
        public Long onDataFrame(DataFrame dataFrame, Object... payloads) {
            String model = (String) payloads[0];
            return dataFrame.where(dataFrame.col("model").eqNullSafe(model)).count();
        }
    };
}
```
The DataFrame definition in Spring could look as follows:
Spark DataFrame definition:

```java
@Bean
DataFrame cars(HiveContext hiveContext) {
    DataFrame jsonCars = hiveContext.read().json("/var/data/cars.json");
    jsonCars.registerTempTable("cars");
    return jsonCars;
}
```
DataFrame jobs options

| Option | Description | Default value |
|---|---|---|
| dataFrame | DataFrame instance (instance of org.apache.spark.sql.DataFrame). | null |
| dataFrameCallback | Instance of the org.apache.camel.component.spark.DataFrameCallback interface. | null |
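The endpoint can also be used from a route rather than a ProducerTemplate. A minimal sketch, assuming the cars and findCarWithModel beans defined above (the route endpoints are illustrative):

```java
import org.apache.camel.builder.RouteBuilder;

// Illustrative route: each incoming model name is counted against the
// 'cars' DataFrame by the 'findCarWithModel' callback defined above.
public class CarCountRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("direct:findCar")
            .to("spark:dataframe?dataFrame=#cars&dataFrameCallback=#findCarWithModel");
    }

}
```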
Hive jobs
Instead of working with RDDs or DataFrames, the Spark component can also receive Hive SQL queries as payloads. To send a Hive query to the Spark component, use the following URI:
Spark Hive producer:

```
spark:hive
```
The following snippet demonstrates how to send a message as an input to the job and return results:
Calling spark job:

```java
long carsCount = template.requestBody("spark:hive?collect=false", "SELECT * FROM cars", Long.class);
List<Row> cars = template.requestBody("spark:hive", "SELECT * FROM cars", List.class);
```
The table we want to execute the query against should be registered in a HiveContext before we query it. For example, in Spring such registration could look as follows:
Spark DataFrame definition:

```java
@Bean
DataFrame cars(HiveContext hiveContext) {
    DataFrame jsonCars = hiveContext.read().json("/var/data/cars.json");
    jsonCars.registerTempTable("cars");
    return jsonCars;
}
```
Hive jobs options

| Option | Description | Default value |
|---|---|---|
| collect | Indicates if results should be collected (as a list of org.apache.spark.sql.Row instances) or if count() should be called against those. | true |
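With the default collect=true, the body coming back from the endpoint is a list of rows that can be inspected directly. A hedged sketch, assuming the cars table registered above and the template instance from the earlier snippet:

```java
import java.util.List;
import org.apache.spark.sql.Row;

// With collect=true (the default) the result body is a List<Row>.
List<Row> cars = template.requestBody("spark:hive", "SELECT model FROM cars", List.class);
for (Row car : cars) {
    // Column 0 is the 'model' field selected by the query.
    System.out.println(car.getString(0));
}
```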