...

While the Spark component is primarily designed to work as a long-running job serving as a bridge between the Spark cluster and other endpoints, you can also use it as a fire-once short job.
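
For example, the long-running mode boils down to an ordinary Camel route that bridges another endpoint to a Spark job. A minimal sketch (the netty-http address and the testFileRdd and countLinesContaining bean names are illustrative assumptions, matching the RDD example later on this page):

Code Block
languagejava
titleSpark component as a bridge
// Answer each incoming HTTP request with the result of a Spark RDD job.
from("netty-http:http://0.0.0.0:18080/spark")
    .to("spark:rdd?rdd=#testFileRdd&rddCallback=#countLinesContaining");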


Running Spark in OSGi servers


Currently the Spark component doesn't support execution in an OSGi container. Spark has been designed to be executed as a fat jar, usually submitted as a job to a cluster. For those reasons running Spark in an OSGi server is at least challenging and is not supported by Camel either.

URI format

Currently the Spark component supports only producers - it is intended to invoke a Spark job and return results. You can call RDD, data frame or Hive SQL jobs.


Code Block
languagejava
titleSpark URI format
spark:{rdd|dataframe|hive}
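
For example (a sketch: testFileRdd, testDataFrame and transformation are illustrative bean names; the dataFrame and dataFrameCallback option names are assumed to mirror the rdd options described below, and Hive jobs are assumed to take the SQL query from the message body):

Code Block
languagejava
titleSpark endpoint URIs
spark:rdd?rdd=#testFileRdd&rddCallback=#transformation
spark:dataframe?dataFrame=#testDataFrame&dataFrameCallback=#transformation
spark:hive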


RDD jobs


To invoke an RDD job, use the following URI:


Code Block
languagejava
titleSpark RDD producer
spark:rdd?rdd=#testFileRdd&rddCallback=#transformation

Where the rdd option refers to the name of an RDD instance (a subclass of org.apache.spark.api.java.AbstractJavaRDDLike) from a Camel registry, while rddCallback refers to an implementation of the org.apache.camel.component.spark.RddCallback interface (also from a registry). The RDD callback provides a single method used to apply incoming messages against the given RDD. Results of the callback computations are saved as a body to an exchange.
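
The referenced beans just need to be present in a Camel registry under matching names. A minimal sketch, assuming a Spring registry and a text file loaded through the Spark context (the file name is illustrative):

Code Block
languagejava
titleRegistering an RDD in the registry
@Bean
JavaRDD<String> testFileRdd(JavaSparkContext sparkContext) {
    // The bean name matches the #testFileRdd reference in the endpoint URI.
    return sparkContext.textFile("testrdd.txt");
}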


Code Block
languagejava
titleSpark RDD callback
public interface RddCallback<T> {
    T onRdd(AbstractJavaRDDLike rdd, Object... payloads);
}

The following snippet demonstrates how to send a message as input to the job and return results:

Code Block
languagejava
titleCalling spark job
String someInput = "job input";
long linesCount = producerTemplate.requestBody("spark:rdd?rdd=#testFileRdd&rddCallback=#countLinesContaining", someInput, long.class);

The RDD callback for the snippet above could look as follows:

Code Block
languagejava
titleSpark RDD callback
@Bean
RddCallback<Long> countLinesContaining() {
    return new RddCallback<Long>() {
        @Override
        public Long onRdd(AbstractJavaRDDLike rdd, Object... payloads) {
            // The first payload carries the pattern to look for.
            String pattern = (String) payloads[0];
            // Cast to the concrete RDD type to use filter().
            return ((JavaRDD<String>) rdd).filter(line -> line.contains(pattern)).count();
        }
    };
}

KuraRouter activator

Bundles deployed to Eclipse Kura are usually developed as bundle activators, so the easiest way to deploy Apache Camel routes into Kura is to create an OSGi bundle containing a class extending the org.apache.camel.kura.KuraRouter class:

Code Block
languagejava
titleKuraRouter activator
public class MyKuraRouter extends KuraRouter {

    @Override
    public void configure() throws Exception {
        from("timer:trigger").
            to("netty-http:http://app.mydatacenter.com/api");
    }

}

Keep in mind that KuraRouter implements the org.osgi.framework.BundleActivator interface, so you need to register its start and stop lifecycle methods while creating the Kura bundle component class.
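
For example, if the bundle is wired as a component rather than a plain activator, the component class can delegate to those methods. A minimal sketch (the component class and its activate/deactivate hooks are illustrative assumptions):

Code Block
languagejava
titleDelegating the component lifecycle to KuraRouter
public class MyKuraRouterComponent {

    private final MyKuraRouter router = new MyKuraRouter();

    protected void activate(ComponentContext componentContext) throws Exception {
        // KuraRouter implements BundleActivator, so start it with the bundle context.
        router.start(componentContext.getBundleContext());
    }

    protected void deactivate(ComponentContext componentContext) throws Exception {
        router.stop(componentContext.getBundleContext());
    }

}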

...


Deploying KuraRouter

A bundle containing your Kura router class should import the following packages in the OSGi manifest:

...