Apache Spark component
The Apache Spark component is available starting from Camel 2.17.
This documentation page covers the Apache Spark component for Apache Camel. The main purpose of the Spark integration with Camel is to provide a bridge between Camel connectors and Spark tasks. In particular, the Camel connector provides a way to route messages from various transports, dynamically choose a task to execute, use the incoming message as input data for that task, and finally deliver the results of the execution back to the Camel pipeline.
Supported architectural styles
The Spark component can be used as a driver application deployed into an application server (or executed as a fat jar).
The Spark component can also be submitted as a job directly to the Spark cluster.
While the Spark component is primarily designed to work as a long-running job serving as a bridge between the Spark cluster and other endpoints, you can also use it as a fire-once short job.
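For example, a long-running driver could be a standalone Java application embedding a Camel context. The following sketch assumes camel-spark and a JMS component on the classpath, and that the testFileRdd and countLinesContaining beans (hypothetical names, defined later on this page) are available in the registry:

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.main.Main;

public class SparkDriverApplication {

    public static void main(String[] args) throws Exception {
        Main main = new Main();
        main.addRouteBuilder(new RouteBuilder() {
            @Override
            public void configure() {
                // Bridge messages from a JMS queue into a Spark RDD job.
                from("jms:queue:lines")
                    .to("spark:rdd?rdd=#testFileRdd&rddCallback=#countLinesContaining");
            }
        });
        main.run(); // blocks, keeping the driver alive as a long-running job
    }

}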
Running Spark in OSGi servers
Currently the Spark component doesn't support execution in an OSGi container. Spark has been designed to be executed as a fat jar, usually submitted as a job to a cluster. For those reasons running Spark in an OSGi server is at least challenging and is not supported by Camel either.
URI format
Currently the Spark component supports only producers - it is intended to invoke a Spark job and return results. You can call an RDD, DataFrame, or Hive SQL job.
spark:{rdd|dataframe|hive}
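For example, a DataFrame job or a Hive job could be addressed as follows (the dataFrame and dataFrameCallback options mirror the rdd and rddCallback options described below; the bean names after # are hypothetical placeholders for instances registered in the Camel registry):

spark:dataframe?dataFrame=#testDataFrame&dataFrameCallback=#transformation
spark:hive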
RDD jobs
spark:rdd?rdd=#testFileRdd&rddCallback=#transformation
Where the rdd option refers to the name of an RDD instance (subclass of org.apache.spark.api.java.AbstractJavaRDDLike) from the Camel registry, while rddCallback refers to an implementation of the org.apache.camel.component.spark.RddCallback interface (also from the registry). The RDD callback provides a single method used to apply incoming messages against the given RDD. The results of the callback computations are saved as the body of the exchange.
public interface RddCallback<T> {

    T onRdd(AbstractJavaRDDLike rdd, Object... payloads);

}
The following snippet demonstrates how to send a message as an input to the job and return the results:
String pattern = "job input";
long linesCount = producerTemplate.requestBody("spark:rdd?rdd=#testFileRdd&rddCallback=#countLinesContaining", pattern, long.class);
The RDD callback for the snippet above, registered as a Spring bean, could look as follows:
@Bean
RddCallback<Long> countLinesContaining() {
    return new RddCallback<Long>() {
        @Override
        public Long onRdd(AbstractJavaRDDLike rdd, Object... payloads) {
            String pattern = (String) payloads[0];
            JavaRDD<String> lines = (JavaRDD<String>) rdd;
            return lines.filter(line -> line.contains(pattern)).count();
        }
    };
}
The RDD definition in Spring could look as follows:
@Bean
AbstractJavaRDDLike myRdd(JavaSparkContext sparkContext) {
    return sparkContext.textFile("testrdd.txt");
}
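If you don't use Spring, the same beans can be registered programmatically. Below is a minimal sketch, assuming a local Spark master and the Camel 2.x SimpleRegistry (all bean names hypothetical):

import org.apache.camel.CamelContext;
import org.apache.camel.component.spark.RddCallback;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.SimpleRegistry;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkRegistryExample {

    public static void main(String[] args) throws Exception {
        JavaSparkContext sparkContext = new JavaSparkContext("local[*]", "camel-spark");

        SimpleRegistry registry = new SimpleRegistry();
        // The RDD referenced by the rdd=#testFileRdd endpoint option.
        registry.put("testFileRdd", sparkContext.textFile("testrdd.txt"));
        // A trivial callback that just counts all lines of the RDD.
        registry.put("countTotalLines", (RddCallback<Long>) (rdd, payloads) -> rdd.count());

        CamelContext camelContext = new DefaultCamelContext(registry);
        camelContext.start();
    }

}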
RDD job options
| Option | Description | Default value |
|---|---|---|
| rdd | RDD instance (subclass of org.apache.spark.api.java.AbstractJavaRDDLike). | null |
| rddCallback | Instance of the org.apache.camel.component.spark.RddCallback interface. | null |
Void RDD callbacks
If your RDD callback doesn't return any value back to the Camel pipeline, you can either return a null value or use the VoidRddCallback base class:
@Bean
RddCallback<Void> rddCallback() {
    return new VoidRddCallback() {
        @Override
        public void doOnRdd(AbstractJavaRDDLike rdd, Object... payloads) {
            // 'output' is a file defined elsewhere in the surrounding configuration
            rdd.saveAsTextFile(output.getAbsolutePath());
        }
    };
}
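With a void callback the exchange body returned by the producer is simply null. A hypothetical invocation could look as follows:

// No result is expected back; the job only writes the RDD to a text file.
producerTemplate.sendBody("spark:rdd?rdd=#testFileRdd&rddCallback=#rddCallback", null);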
Converting RDD callbacks
If you know what type of input data will be sent to the RDD callback, you can use the ConvertingRddCallback base class and let Camel automatically convert incoming messages before passing them to the callback:
@Bean
RddCallback<Long> rddCallback(CamelContext context) {
    return new ConvertingRddCallback<Long>(context, int.class, int.class) {
        @Override
        public Long doOnRdd(AbstractJavaRDDLike rdd, Object... payloads) {
            return rdd.count() * (int) payloads[0] * (int) payloads[1];
        }
    };
}
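A hypothetical call to that callback could send the two payloads as a list (assuming the Spark component treats a list body as multiple payloads); the String values would then be converted to int before reaching doOnRdd:

// Both list elements end up in the payloads array of the callback.
long result = producerTemplate.requestBody(
        "spark:rdd?rdd=#testFileRdd&rddCallback=#rddCallback",
        java.util.Arrays.asList("10", "10"), long.class);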
ProducerTemplate
The protected member producerTemplate is the ProducerTemplate instance associated with the given Camel context.
public class MyKuraRouter extends KuraRouter {

    @Override
    public void configure() throws Exception {
        producerTemplate.sendBody("jms:temperature", 22.0);
        ...
    }

}
KuraRouter activator callbacks
The Kura router comes with lifecycle callbacks that can be used to customize the way the Camel router works. For example, to configure the CamelContext instance associated with the router just before the former is started, override the beforeStart method of the KuraRouter class:
public class MyKuraRouter extends KuraRouter {

    ...

    @Override
    protected void beforeStart(CamelContext camelContext) {
        OsgiDefaultCamelContext osgiContext = (OsgiDefaultCamelContext) camelContext;
        osgiContext.setName("NameOfTheRouter");
    }

}
Loading XML routes from ConfigurationAdmin
Sometimes it is desirable to read the XML definition of the routes from the server configuration. This is a common scenario for IoT gateways, where the cost of over-the-air redeployment may be significant. To address this requirement, each KuraRouter looks for the kura.camel.BUNDLE-SYMBOLIC-NAME.route property from the kura.camel PID, using the OSGi ConfigurationAdmin service. This approach allows you to define a Camel XML routes file per deployed KuraRouter. In order to update a route, just edit the appropriate configuration property and restart the bundle associated with it. The content of the kura.camel.BUNDLE-SYMBOLIC-NAME.route property is expected to be a Camel XML route file, for example:
<routes xmlns="http://camel.apache.org/schema/spring">
    <route id="loaded">
        <from uri="direct:bar"/>
        <to uri="mock:bar"/>
    </route>
</routes>
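For illustration, the property could also be set programmatically through the standard OSGi ConfigurationAdmin API. Below is a minimal sketch, assuming an already injected configurationAdmin reference, a routesXml string holding the XML above, and a bundle symbolic name of com.example.gateway (all hypothetical):

import java.util.Dictionary;
import java.util.Hashtable;
import org.osgi.service.cm.Configuration;

// Update the kura.camel PID with the route XML for the given bundle.
Configuration config = configurationAdmin.getConfiguration("kura.camel");
Dictionary<String, Object> properties = config.getProperties();
if (properties == null) {
    properties = new Hashtable<>();
}
properties.put("kura.camel.com.example.gateway.route", routesXml);
config.update(properties);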
Deploying Kura router as a declarative OSGi service
If you would like to deploy your Kura router as a declarative OSGi service, you can use the activate and deactivate methods provided by KuraRouter.
<scr:component name="org.eclipse.kura.example.camel.MyKuraRouter"
               activate="activate" deactivate="deactivate"
               enabled="true" immediate="true">
    <implementation class="org.eclipse.kura.example.camel.MyKuraRouter"/>
</scr:component>