You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 6 Next »

Apache Spark component

 Apache Spark component is available starting from Camel 2.17.

 

This documentation page covers the Apache Spark component for the Apache Camel. The main purpose of the Spark integration with Camel is to provide a bridge between Camel connectors and Spark tasks. In particular Camel connector provides a way to route message from various transports, dynamically choose a task to execute, use incoming message as input data for that task and finally deliver the results of the execution back to the Camel pipeline.

Supported architectural styles

Spark component can be used as a driver application deployed into an application server (or executed as a fat jar).


Spark component can also be submitted as a job directly into the Spark cluster.


While Spark component is primary designed to work as a long running job serving as an bridge between Spark cluster and the other endpoints, you can also use it as a fire-once short job.  

 

Running Spark in OSGi servers

Currently the Spark component doesn't support execution in the OSGi container. Spark has been designed to be executed as a fat jar, usually submitted as a job to a cluster. For those reasons running Spark in an OSGi server is at least challenging and is not support by Camel as well.

 

URI format

 

Currently the Spark component supports only producers - it it intended to invoke a Spark job and return results. You can call RDD, data frame or Hive SQL job.

 

 

Spark URI format
spark:{rdd|dataframe|hive}

 

RDD jobs 

To invoke an RDD job, use the following URI:
Spark RDD producer
spark:rdd?rdd=#testFileRdd&rddCallback=#transformation

 Where rdd option refers to the name of an RDD instance (subclass of org.apache.spark.api.java.AbstractJavaRDDLike) from a Camel registry, while rddCallback refers to the implementation of org.apache.camel.component.spark.RddCallback class (also from a registry). RDD callback provides a single method used to apply incoming messages against the given RDD. Results of callback computations are saved as a body to an exchange.

Spark RDD callback
public interface RddCallback<T> {
    T onRdd(AbstractJavaRDDLike rdd, Object... payloads);
}

The following snippet demonstrates how to send message as an input to the job and return results:

Calling spark job
String pattern = "job input";
long linesCount = producerTemplate.requestBody("spark:rdd?myRdd=#testFileRdd&rddCallback=#countLinesContaining", pattern, long.class);

The RDD callback for the snippet above registered as Spring bean could look as follows:

Spark RDD callback
@Bean
RddCallback<Long> countLinesContaining() {
    return new RddCallback<Long>() {
        Long onRdd(AbstractJavaRDDLike rdd, Object... payloads) {
            String pattern = (String) payloads[0];
            return rdd.filter({line -> line.contains(pattern)}).count();
        }
    }
}

The RDD definition in Spring could looks as follows:

Spark RDD definition
@Bean
AbstractJavaRDDLike myRdd(JavaSparkContext sparkContext) {
  return sparkContext.textFile("testrdd.txt");
}

Deploying KuraRouter

Bundle containing your Kura router class should import the following packages in the OSGi manifest:

Import-Package: org.osgi.framework;version="1.3.0",
  org.slf4j;version="1.6.4",
  org.apache.camel,org.apache.camel.impl,org.apache.camel.core.osgi,org.apache.camel.builder,org.apache.camel.model,
  org.apache.camel.component.kura

Keep in mind that you don't have to import every Camel component bundle you plan to use in your routes, as Camel components are resolved as the services on the runtime level.

Before you deploy your router bundle, be sure that you have deployed (and started) the following Camel core bundles (using Kura GoGo shell)...

install file:///home/user/.m2/repository/org/apache/camel/camel-core/2.15.0/camel-core-2.15.0.jar
start <camel-core-bundle-id>
install file:///home/user/.m2/repository/org/apache/camel/camel-core-osgi/2.15.0/camel-core-osgi-2.15.0.jar
start <camel-core-osgi-bundle-id>
install file:///home/user/.m2/repository/org/apache/camel/camel-kura/2.15.0/camel-kura-2.15.0.jar 
start <camel-kura-bundle-id>

...and all the components you plan to use in your routes:

install file:///home/user/.m2/repository/org/apache/camel/camel-stream/2.15.0/camel-stream-2.15.0.jar
start <camel-stream-bundle-id>

Then finally deploy your router bundle:

install file:///home/user/.m2/repository/com/example/myrouter/1.0/myrouter-1.0.jar
start <your-bundle-id>

KuraRouter utilities 

 Kura router base class provides many useful utilities. This section explores each of them.

SLF4J logger

Kura uses SLF4J facade for logging purposes. Protected member log returns SLF4J logger instance associated with the given Kura router.

public class MyKuraRouter extends KuraRouter {

    @Override
    public void configure() throws Exception {
		log.info("Configuring Camel routes!");
        ...
    }

}

CamelContext

Protected member camelContext is the CamelContext associated with the given Kura router.

public class MyKuraRouter extends KuraRouter {

    @Override
    public void configure() throws Exception {
		camelContext.getStatus();
        ...
    }

}

ProducerTemplate

Protected member producerTemplate is the ProducerTemplate instance associated with the given Camel context.

public class MyKuraRouter extends KuraRouter {

    @Override
    public void configure() throws Exception {
		producerTemplate.sendBody("jms:temperature", 22.0);
   		...
    }

}


KuraRouter activator callbacks

Kura router comes with the lifecycle callbacks that can be used to customize the way the Camel router works. For example to configure the CamelContext instance associated with the router just before the former is started, override beforeStart method of the KuraRouter class:

public class MyKuraRouter extends KuraRouter {
 
  ...

  protected void beforeStart(CamelContext camelContext) {
    OsgiDefaultCamelContext osgiContext = (OsgiCamelContext) camelContext;
    osgiContext.setName("NameOfTheRouter");
  }

}

Loading XML routes from ConfigurationAdmin

Sometimes it is desired to read the XML definition of the routes from the server configuration. This a common scenario for IoT gateways where over-the-air redeployment cost may be significant. To address this requirement each KuraRouter looks for the kura.camel.BUNDLE-SYMBOLIC-NAME.route property from the kura.camel PID using the OSGi ConfigurationAdmin. This approach allows you to define Camel XML routes file per deployed KuraRouter. In order to update a route, just edit an appropriate configuration property and restart a bundle associated with it. The content of the kura.camel.BUNDLE-SYMBOLIC-NAME.route property is expected to be Camel XML route file, for example:

<routes xmlns="http://camel.apache.org/schema/spring">
    <route id="loaded">
        <from uri="direct:bar"/>
        <to uri="mock:bar"/>
    </route>
</routes>

 

Deploying Kura router as a declarative OSGi service

If you would like to deploy your Kura router as a declarative OSGi service, you can use activate and deactivate methods provided by KuraRouter.

<scr:component name="org.eclipse.kura.example.camel.MyKuraRouter" activate="activate" deactivate="deactivate" enabled="true" immediate="true">
  <implementation class="org.eclipse.kura.example.camel.MyKuraRouter"/>
</scr:component>

  • No labels