...
While the Spark component is primarily designed to work as a long-running job serving as a bridge between a Spark cluster and other endpoints, you can also use it as a fire-once short job.
Running Spark in OSGi servers
Currently the Spark component doesn't support execution in an OSGi container. Spark has been designed to be executed as a fat jar, usually submitted as a job to a cluster. For those reasons, running Spark in an OSGi server is challenging at best and is not supported by Camel.
KuraRouter activator
URI format
Currently the Spark component supports only producers: it is intended to invoke a Spark job and return the results. You can call an RDD, data frame, or Hive SQL job.
Code Block | ||||
---|---|---|---|---|
| ||||
spark:{rdd|dataframe|hive} |
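As a rough illustration of the URI format, the following plain-Java sketch assembles endpoint URIs for the three job types. The class `SparkUriSketch`, the enum `JobType`, and the helper `sparkUri` are hypothetical names introduced here for illustration; they are not part of Camel. The option names `rdd` and `rddCallback` are the ones used on this page.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of Camel: only builds the URI string.
class SparkUriSketch {
    /** The three job types named in the URI format. */
    enum JobType { rdd, dataframe, hive }

    /** Builds "spark:<type>?key=value&..." keeping the options in insertion order. */
    static String sparkUri(JobType type, Map<String, String> options) {
        StringBuilder uri = new StringBuilder("spark:").append(type.name());
        if (!options.isEmpty()) {
            StringJoiner query = new StringJoiner("&", "?", "");
            options.forEach((key, value) -> query.add(key + "=" + value));
            uri.append(query);
        }
        return uri.toString();
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("rdd", "#testFileRdd");           // registry reference to an RDD
        options.put("rddCallback", "#transformation"); // registry reference to a callback
        // prints spark:rdd?rdd=#testFileRdd&rddCallback=#transformation
        System.out.println(sparkUri(JobType.rdd, options));
    }
}
```

The `#name` values are passed through untouched, since resolving them against the registry is the job of Camel itself.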
RDD jobs
Code Block | ||||
---|---|---|---|---|
| ||||
spark:rdd?rdd=#testFileRdd&rddCallback=#transformation |
Where the rdd option refers to the name of an RDD instance (a subclass of org.apache.spark.api.java.AbstractJavaRDDLike) from a Camel registry, while rddCallback refers to an implementation of the org.apache.camel.component.spark.RddCallback class (also from a registry). The RDD callback provides a single method used to apply incoming messages against the given RDD. The result of the callback computation is saved as the body of the exchange.
Bundles deployed to Eclipse Kura are usually developed as bundle activators. So the easiest way to deploy Apache Camel routes into Kura is to create an OSGi bundle containing a class that extends the org.apache.camel.kura.KuraRouter class.
Code Block | ||||
---|---|---|---|---|
| ||||
public interface RddCallback<T> {
T onRdd(AbstractJavaRDDLike rdd, Object... payloads);
} |
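To make the callback contract easier to picture without a Spark cluster, here is a minimal plain-Java sketch. It is a stand-in, not the component's implementation: `ListCallback`, `request`, and `COUNT_LINES_CONTAINING` are hypothetical names, and a `List<String>` takes the place of the RDD. It assumes that the message body is handed to the callback as the first payload.

```java
import java.util.Arrays;
import java.util.List;

// Stand-in sketch: no Spark or Camel classes are used here.
class RddCallbackSketch {
    /** Hypothetical analogue of RddCallback; a List<String> replaces the RDD. */
    interface ListCallback<T> {
        T onData(List<String> data, Object... payloads);
    }

    /** Counts the lines containing the pattern passed as the first payload. */
    static final ListCallback<Long> COUNT_LINES_CONTAINING = (data, payloads) -> {
        String pattern = (String) payloads[0];
        return data.stream().filter(line -> line.contains(pattern)).count();
    };

    /** Mimics a producer call: applies the body to the data and returns the result. */
    static <T> T request(List<String> data, ListCallback<T> callback, Object body) {
        return callback.onData(data, body);
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("foo bar", "bar baz", "qux");
        long count = request(lines, COUNT_LINES_CONTAINING, "bar");
        System.out.println(count); // prints 2
    }
}
```

In the real component the data would be a distributed RDD and the returned value would become the body of the exchange.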
The following snippet demonstrates how to send a message as an input to the job and return the results:
Code Block | ||||
---|---|---|---|---|
| ||||
String someInput = "job input";
long linesCount = producerTemplate.requestBody("spark:rdd?rdd=#testFileRdd&rddCallback=#countLinesContaining", someInput, long.class); |
The RDD callback for the snippet above could look as follows:
Code Block | ||||
---|---|---|---|---|
| ||||
@Bean
RddCallback<Long> countLinesContaining() {
    return new RddCallback<Long>() {
        @Override
        public Long onRdd(AbstractJavaRDDLike rdd, Object... payloads) {
            // The message body arrives as the first payload.
            String pattern = (String) payloads[0];
            // filter() is defined on JavaRDD; this assumes testFileRdd is a JavaRDD<String>.
            return ((JavaRDD<String>) rdd).filter(line -> line.contains(pattern)).count();
        }
    };
} |
Code Block | ||||
---|---|---|---|---|
| ||||
// Example Kura router: extends KuraRouter and defines its routes in configure().
public class MyKuraRouter extends KuraRouter {
    @Override
    public void configure() throws Exception {
        from("timer:trigger").
            to("netty-http:http://app.mydatacenter.com/api");
    }
} |
Keep in mind that KuraRouter implements the org.osgi.framework.BundleActivator interface, so you need to register its start and stop lifecycle methods when creating the Kura bundle component class.
...
Deploying KuraRouter
The bundle containing your Kura router class should import the following packages in the OSGi manifest:
...