...

While the Spark component is primarily designed to work as a long-running job serving as a bridge between a Spark cluster and the other endpoints, you can also use it as a fire-once short job.
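
For example, a fire-once invocation can be as simple as starting a Camel context, sending a single message to the Spark endpoint and stopping the context. The sketch below is illustrative: the myRdd and countLinesContaining beans (described later on this page) and the "input" body are assumptions, not part of the component API.

Code Block
languagejava
titleFire-once Spark job (sketch)
// minimal sketch; bean names and the job input are illustrative
SimpleRegistry registry = new SimpleRegistry();
registry.put("myRdd", myRdd);                               // JavaRDDLike instance created elsewhere
registry.put("countLinesContaining", countLinesContaining); // RddCallback instance created elsewhere

CamelContext camelContext = new DefaultCamelContext(registry);
camelContext.start();
try {
    ProducerTemplate template = camelContext.createProducerTemplate();
    long matches = template.requestBody(
            "spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining", "input", long.class);
    System.out.println("Lines matching the pattern: " + matches);
} finally {
    camelContext.stop();
}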

Running Spark in OSGi servers

Currently the Spark component doesn't support execution in an OSGi container. Spark has been designed to be executed as a fat jar, usually submitted as a job to a cluster. For those reasons running Spark in an OSGi server is at least challenging and is not supported by Camel either.

 

URI format

...

Currently the Spark component supports only producers - it is intended to invoke a Spark job and return results. You can call RDD, DataFrame or Hive SQL jobs.

 

Code Block
languagejava
titleSpark URI format
spark:{rdd|dataframe|hive}
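
For example, a minimal route calling an RDD job through the Spark producer could look like the following sketch (the #myRdd and #countLinesContaining bean names are illustrative and are described in the sections below):

Code Block
languagejava
titleSample route with the Spark producer
// every message sent to direct:analyze triggers the RDD callback and returns its result
from("direct:analyze")
    .to("spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining");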

 

 

RDD jobs

 

To invoke an RDD job, use the following URI:

 

Code Block
languagejava
titleSpark RDD producer
spark:rdd?rdd=#testFileRdd&rddCallback=#transformation

Where the rdd option refers to the name of an RDD instance (subclass of org.apache.spark.api.java.JavaRDDLike) from a Camel registry, while rddCallback refers to the implementation of the org.apache.camel.component.spark.RddCallback interface (also from a registry). The RDD callback provides a single method used to apply incoming messages against the given RDD. Results of callback computations are saved as a body to an exchange.

...

Code Block
languagejava
titleSpark RDD callback
public interface RddCallback<T> {
    T onRdd(JavaRDDLike rdd, Object... payloads);
}

...

Code Block
languagejava
titleCalling spark job
String pattern = "job input";
long linesCount = producerTemplate.requestBody("spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining", pattern, long.class);

The RDD callback for the snippet above, registered as a Spring bean, could look as follows:

Code Block
languagejava
titleSpark RDD callback
@Bean
RddCallback<Long> countLinesContaining() {
    return new RddCallback<Long>() {
        @Override
        public Long onRdd(JavaRDDLike rdd, Object... payloads) {
            String pattern = (String) payloads[0];
            // the RDD registered in this example is a JavaRDD<String> (a text file)
            return ((JavaRDD<String>) rdd).filter(line -> line.contains(pattern)).count();
        }
    };
}

The RDD definition in Spring could look as follows:

Code Block
languagejava
titleSpark RDD definition
@Bean
JavaRDDLike myRdd(JavaSparkContext sparkContext) {
  return sparkContext.textFile("testrdd.txt");
}
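
The JavaSparkContext injected above is assumed to be available as a bean in the registry. A minimal sketch of such a definition (the application name and the local master URL are illustrative) could look as follows:

Code Block
languagejava
titleSpark context definition (sketch)
@Bean
JavaSparkContext sparkContext() {
    // illustrative application name and a local master; adjust for a real cluster
    SparkConf conf = new SparkConf()
            .setAppName("CamelSparkExample")
            .setMaster("local[*]");
    return new JavaSparkContext(conf);
}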

 

RDD jobs options

|| Option || Description || Default value ||
| rdd | RDD instance (subclass of org.apache.spark.api.java.JavaRDDLike). | null |
| rddCallback | Instance of org.apache.camel.component.spark.RddCallback interface. | null |


Void RDD callbacks

If your RDD callback doesn't return any value back to a Camel pipeline, you can either return a null value or use the VoidRddCallback base class:

Code Block
languagejava
titleSpark RDD callback
@Bean
RddCallback<Void> rddCallback() {
    return new VoidRddCallback() {
        @Override
        public void doOnRdd(JavaRDDLike rdd, Object... payloads) {
            // 'output' is a file defined elsewhere in the configuration
            rdd.saveAsTextFile(output.getAbsolutePath());
        }
    };
}
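
A usage sketch for the callback above (the bean names are illustrative); as the callback returns no value, the message body coming back from the endpoint is null:

Code Block
languagejava
titleCalling a void RDD callback (sketch)
// runs the callback for its side effect only; no result is returned to the route
producerTemplate.sendBody("spark:rdd?rdd=#myRdd&rddCallback=#rddCallback", "input");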

Converting RDD callbacks

If you know what type of the input data will be sent to the RDD callback, you can use ConvertingRddCallback and let Camel automatically convert incoming messages before inserting them into the callback:

Code Block
languagejava
titleSpark RDD callback
@Bean
RddCallback<Long> rddCallback(CamelContext context) {
    return new ConvertingRddCallback<Long>(context, int.class, int.class) {
        @Override
        public Long doOnRdd(JavaRDDLike rdd, Object... payloads) {
            return rdd.count() * (int) payloads[0] * (int) payloads[1];
        }
    };
}
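
A usage sketch for the converting callback above (bean names are illustrative): because the callback declares int.class for both payloads, string payloads such as "10" are converted to integers before the callback is invoked:

Code Block
languagejava
titleCalling a converting RDD callback (sketch)
// both payloads are converted to int by Camel before reaching the callback
long result = producerTemplate.requestBody("spark:rdd?rdd=#myRdd&rddCallback=#rddCallback",
        Arrays.asList("10", 10), long.class);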

Annotated RDD callbacks

Probably the easiest way to work with the RDD callbacks is to provide a class with a method marked with the @RddCallback annotation:

Code Block
languagejava
titleAnnotated RDD callback definition
import static org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback;
 
@Bean
RddCallback<Long> rddCallback() {
    return annotatedRddCallback(new MyTransformation());
}
 
...
 
import org.apache.camel.component.spark.annotations.RddCallback;
 
public class MyTransformation {
 
    @RddCallback
    long countLines(JavaRDD<String> textFile, int first, int second) {
        return textFile.count() * first * second;
    }
 
}

...

If you pass a CamelContext to the annotated RDD callback factory method, the created callback will be able to convert incoming payloads to match the parameters of the annotated method:

Code Block
languagejava
titleBody conversions for annotated RDD callbacks
import static org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback;
 
@Bean
RddCallback<Long> rddCallback(CamelContext camelContext) {
    return annotatedRddCallback(new MyTransformation(), camelContext);
}
 
...

 
import org.apache.camel.component.spark.annotations.RddCallback;
 
public class MyTransformation {
 
    @RddCallback
    long countLines(JavaRDD<String> textFile, int first, int second) {
        return textFile.count() * first * second;
    }
 
}
 
...
 
// Convert String "10" to integer
long result = producerTemplate.requestBody("spark:rdd?rdd=#rdd&rddCallback=#rddCallback", Arrays.asList(10, "10"), long.class);

 

DataFrame jobs

 

Instead of working with RDDs, the Spark component can work with DataFrames as well.

To invoke a DataFrame job, use the following URI:
Code Block
languagejava
titleSpark DataFrame producer
spark:dataframe?dataFrame=#testDataFrame&dataFrameCallback=#transformation

Where the dataFrame option refers to the name of a DataFrame instance (an instance of org.apache.spark.sql.DataFrame) from a Camel registry, while dataFrameCallback refers to the implementation of the org.apache.camel.component.spark.DataFrameCallback interface (also from a registry). The DataFrame callback provides a single method used to apply incoming messages against the given DataFrame. Results of callback computations are saved as a body to an exchange.

Code Block
languagejava
titleSpark DataFrame callback
public interface DataFrameCallback<T> {
    T onDataFrame(DataFrame dataFrame, Object... payloads);
}

The following snippet demonstrates how to send a message as an input to the job and return results:

Code Block
languagejava
titleCalling spark job
String model = "Micra";
long linesCount = producerTemplate.requestBody("spark:dataframe?dataFrame=#cars&dataFrameCallback=#findCarWithModel", model, long.class);

The DataFrame callback for the snippet above, registered as a Spring bean, could look as follows:

Code Block
languagejava
titleSpark DataFrame callback
@Bean
DataFrameCallback<Long> findCarWithModel() {
    return new DataFrameCallback<Long>() {
        @Override
        public Long onDataFrame(DataFrame dataFrame, Object... payloads) {
            String model = (String) payloads[0];
            return dataFrame.where(dataFrame.col("model").eqNullSafe(model)).count();
        }
    };
}

The DataFrame definition in Spring could look as follows:

Code Block
languagejava
titleSpark DataFrame definition
@Bean
DataFrame cars(HiveContext hiveContext) {
  	DataFrame jsonCars = hiveContext.read().json("/var/data/cars.json");
 	jsonCars.registerTempTable("cars");
	return jsonCars;
}
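
The HiveContext injected above is assumed to be defined elsewhere in the configuration. A minimal sketch of such a bean (assuming Spark 1.x, where HiveContext is built on top of a SparkContext) could look as follows:

Code Block
languagejava
titleHiveContext definition (sketch)
@Bean
HiveContext hiveContext(JavaSparkContext sparkContext) {
    // creates the Hive-enabled SQL context on top of the existing Spark context
    return new HiveContext(sparkContext.sc());
}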

 

DataFrame jobs options

|| Option || Description || Default value ||
| dataFrame | DataFrame instance (subclass of org.apache.spark.sql.DataFrame). | null |
| dataFrameCallback | Instance of org.apache.camel.component.spark.DataFrameCallback interface. | null |

 

Hive jobs

Instead of working with RDDs or DataFrames, the Spark component can also receive Hive SQL queries as payloads. To send a Hive query to the Spark component, use the following URI:

Code Block
languagejava
titleSpark Hive producer
spark:hive

The following snippet demonstrates how to send a message as an input to the job and return results:

Code Block
languagejava
titleCalling spark job
long carsCount = template.requestBody("spark:hive?collect=false", "SELECT * FROM cars", Long.class);
List<Row> cars = template.requestBody("spark:hive", "SELECT * FROM cars", List.class);

The table we want to execute the query against should be registered in a HiveContext before we query it. For example, in Spring such a registration could look as follows:

Code Block
languagejava
titleSpark DataFrame definition
@Bean
DataFrame cars(HiveContext hiveContext) {
  	DataFrame jsonCars = hiveContext.read().json("/var/data/cars.json");
 	jsonCars.registerTempTable("cars");
	return jsonCars;
}

 

Hive jobs options

|| Option || Description || Default value ||
| collect | Indicates if results should be collected (as a list of org.apache.spark.sql.Row instances) or if count() should be called against those. | true |

 

Include Page
Endpoint See Also
Endpoint See Also