...

 Where the rdd option refers to the name of an RDD instance (subclass of org.apache.spark.api.java.AbstractJavaRDDLike) from a Camel registry, while rddCallback refers to an implementation of the org.apache.camel.component.spark.RddCallback interface (also from the registry). The RDD callback provides a single method used to apply incoming messages against the given RDD. The result of the callback computation is saved as the body of the exchange.
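For reference, the callback contract mirrors the DataFrameCallback interface shown later on this page. Based on the Camel Spark component sources, the RddCallback interface has roughly the following shape (verify the exact signature against the Camel version in use):

```java
public interface RddCallback<T> {
    // Invoked with the registered RDD and the incoming message body (or bodies) as payloads;
    // the returned value becomes the body of the out message.
    T onRdd(AbstractJavaRDDLike rdd, Object... payloads);
}
```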

...

Code Block
languagejava
titleCalling spark job
String pattern = "job input";
long linesCount = producerTemplate.requestBody("spark:rdd?rdd=#testFileRdd&rddCallback=#countLinesContaining", pattern, long.class);
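The countLinesContaining callback referenced in the URI above could be registered as a bean along these lines. This is a sketch, not the page's original listing: the cast to JavaRDD&lt;String&gt; assumes the registered testFileRdd is a text-file RDD of strings.

```java
import org.apache.camel.component.spark.RddCallback;
import org.apache.spark.api.java.AbstractJavaRDDLike;
import org.apache.spark.api.java.JavaRDD;
import org.springframework.context.annotation.Bean;

@Bean
RddCallback<Long> countLinesContaining() {
    return new RddCallback<Long>() {
        @Override
        @SuppressWarnings("unchecked")
        public Long onRdd(AbstractJavaRDDLike rdd, Object... payloads) {
            // The first payload is the message body sent to the endpoint ("job input" above).
            String pattern = (String) payloads[0];
            // Keep only lines containing the pattern, then count them.
            return ((JavaRDD<String>) rdd).filter(line -> line.contains(pattern)).count();
        }
    };
}
```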

...

Code Block
languagejava
titleBody conversions for annotated RDD callbacks
import static org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback;
 
@Bean
RddCallback<Long> rddCallback(CamelContext camelContext) {
    return annotatedRddCallback(new MyTransformation(), camelContext);
}
 
...

 
import org.apache.camel.component.spark.annotations.RddCallback;
 
public class MyTransformation {
 
    @RddCallback
    long countLines(JavaRDD<String> textFile, int first, int second) {
        return textFile.count() * first * second;
    }
 
}
 
...
 
// Convert String "10" to integer
long result = producerTemplate.requestBody("spark:rdd?rdd=#rdd&rddCallback=#rddCallback", Arrays.asList(10, "10"), long.class);

 

DataFrame jobs

 

Instead of working with RDDs, the Spark component can also work with DataFrames.

To invoke a DataFrame job, use the following URI:
Code Block
languagejava
titleSpark DataFrame producer
spark:dataframe?dataFrame=#testDataFrame&dataFrameCallback=#transformation

 Where the dataFrame option refers to the name of a DataFrame instance (an instance of org.apache.spark.sql.DataFrame) from a Camel registry, while dataFrameCallback refers to an implementation of the org.apache.camel.component.spark.DataFrameCallback interface (also from the registry). The DataFrame callback provides a single method used to apply incoming messages against the given DataFrame. The result of the callback computation is saved as the body of the exchange.

Code Block
languagejava
titleSpark DataFrame callback
public interface DataFrameCallback<T> {
    T onDataFrame(DataFrame dataFrame, Object... payloads);
}

The following snippet demonstrates how to send a message as an input to a job and return the results:

Code Block
languagejava
titleCalling spark job
String model = "Micra";
long linesCount = producerTemplate.requestBody("spark:dataframe?dataFrame=#cars&dataFrameCallback=#findCarWithModel", model, long.class);

The DataFrame callback for the snippet above registered as Spring bean could look as follows:

Code Block
languagejava
titleSpark DataFrame callback
@Bean
DataFrameCallback<Long> findCarWithModel() {
    return new DataFrameCallback<Long>() {
        @Override
        public Long onDataFrame(DataFrame dataFrame, Object... payloads) {
            String model = (String) payloads[0];
            return dataFrame.where(dataFrame.col("model").eqNullSafe(model)).count();
        }
    };
}

The DataFrame definition in Spring could look as follows:

Code Block
languagejava
titleSpark DataFrame definition
@Bean
DataFrame cars(HiveContext hiveContext) {
    DataFrame jsonCars = hiveContext.read().json("/var/data/cars.json");
    jsonCars.registerTempTable("cars");
    return jsonCars;
}

 

DataFrame jobs options

Option            | Description                                                                   | Default value
dataFrame         | DataFrame instance (subclass of org.apache.spark.sql.DataFrame).              | null
dataFrameCallback | Instance of the org.apache.camel.component.spark.DataFrameCallback interface. | null
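The DataFrame endpoint can also be used as a producer inside a Camel route, not only via ProducerTemplate. A minimal sketch, reusing the cars and findCarWithModel beans from the examples above (the direct:findCars endpoint name is an assumption for illustration):

```java
import org.apache.camel.builder.RouteBuilder;

public class CarsRoute extends RouteBuilder {
    @Override
    public void configure() {
        // The incoming message body is passed to the DataFrame callback as a payload;
        // the value returned by the callback replaces the message body.
        from("direct:findCars")
            .to("spark:dataframe?dataFrame=#cars&dataFrameCallback=#findCarWithModel");
    }
}
```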

 

Include Page
Endpoint See Also