...

 Where the rdd option refers to the name of an RDD instance (subclass of org.apache.spark.api.java.JavaRDDLike) from a Camel registry, while rddCallback refers to an implementation of the org.apache.camel.component.spark.RddCallback interface (also from the registry). The RDD callback provides a single method used to apply incoming messages against the given RDD. The result of the callback computation is saved as the body of the exchange.

Code Block
languagejava
titleSpark RDD callback
public interface RddCallback<T> {
    T onRdd(JavaRDDLike rdd, Object... payloads);
}
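
If Spring is not used, the RDD and the callback can also be bound to the Camel registry programmatically. The sketch below is a minimal, hypothetical setup using Camel 2.x's SimpleRegistry; the bean names ("myRdd", "countLinesContaining") are assumptions that must match the #references used in the endpoint URI:

Code Block
languagejava
titleRegistering RDD and callback beans (sketch)
import org.apache.camel.CamelContext;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.SimpleRegistry;

SimpleRegistry registry = new SimpleRegistry();
// sparkContext (a JavaSparkContext) and the callback instance are assumed to exist already
registry.put("myRdd", sparkContext.textFile("testrdd.txt"));
registry.put("countLinesContaining", countLinesContainingCallback);
CamelContext camelContext = new DefaultCamelContext(registry);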

...

Code Block
languagejava
titleCalling spark job
String pattern = "job input";
long linesCount = producerTemplate.requestBody("spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining", pattern, long.class);
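
The same endpoint can of course also be used as a regular producer inside a route. A minimal sketch (the direct: endpoint name is an assumption):

Code Block
languagejava
titleCalling spark job from a route (sketch)
// Inside a RouteBuilder.configure(): bodies sent to direct:countLines become job payloads
from("direct:countLines")
    .to("spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining");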

...

Code Block
languagejava
titleSpark RDD callback
@Bean
RddCallback<Long> countLinesContaining() {
    return new RddCallback<Long>() {
        @Override
        public Long onRdd(JavaRDDLike rdd, Object... payloads) {
            String pattern = (String) payloads[0];
            // The registered RDD is a JavaRDD<String> of text lines
            return ((JavaRDD<String>) rdd).filter(line -> line.contains(pattern)).count();
        }
    };
}

...

Code Block
languagejava
titleSpark RDD definition
@Bean
JavaRDDLike myRdd(JavaSparkContext sparkContext) {
  return sparkContext.textFile("testrdd.txt");
}
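
The JavaSparkContext injected above has to be available as a bean as well. A minimal sketch, assuming a local master (suitable for testing only; the application name is arbitrary):

Code Block
languagejava
titleSpark context definition (sketch)
@Bean
JavaSparkContext sparkContext() {
    // local[*] runs Spark inside the JVM; point setMaster at a real cluster in production
    SparkConf conf = new SparkConf().setAppName("camel-spark").setMaster("local[*]");
    return new JavaSparkContext(conf);
}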

...

Option       Description                                                           Default value
rdd          RDD instance (subclass of org.apache.spark.api.java.JavaRDDLike).    null
rddCallback  Instance of org.apache.camel.component.spark.RddCallback interface.  null

...

Code Block
languagejava
titleSpark RDD callback
@Bean
RddCallback<Void> rddCallback() {
    return new VoidRddCallback() {
        @Override
        public void doOnRdd(JavaRDDLike rdd, Object... payloads) {
            // 'output' is a file handle defined elsewhere in the surrounding class
            rdd.saveAsTextFile(output.getAbsolutePath());
        }
    };
}
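
As the callback above returns no result, the job can be triggered with a plain one-way send. A hypothetical invocation, assuming an RDD bean named myRdd:

Code Block
languagejava
titleCalling spark job (sketch)
// Nothing is expected back, so sendBody is sufficient; the null body carries no payload
template.sendBody("spark:rdd?rdd=#myRdd&rddCallback=#rddCallback", null);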

...

Code Block
languagejava
titleSpark RDD callback
@Bean
RddCallback<Long> rddCallback(CamelContext context) {
    // Incoming payloads are converted to int before doOnRdd is invoked
    return new ConvertingRddCallback<Long>(context, int.class, int.class) {
        @Override
        public Long doOnRdd(JavaRDDLike rdd, Object... payloads) {
            return rdd.count() * (int) payloads[0] * (int) payloads[1];
        }
    };
}
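
With the converting callback above, payloads that Camel can convert to int (such as Strings) can be passed directly. A hypothetical invocation, assuming an RDD bean named myRdd:

Code Block
languagejava
titleCalling spark job (sketch)
// Both "10" strings are converted to int before doOnRdd is invoked
long result = producerTemplate.requestBody("spark:rdd?rdd=#myRdd&rddCallback=#rddCallback",
        Arrays.asList("10", "10"), long.class);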

...

Code Block
languagejava
titleBody conversions for annotated RDD callbacks
import static org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback;
 
@Bean
RddCallback<Long> rddCallback(CamelContext camelContext) {
    return annotatedRddCallback(new MyTransformation(), camelContext);
}
 
...

 
import org.apache.camel.component.spark.annotations.RddCallback;
 
public class MyTransformation {
 
    @RddCallback
    long countLines(JavaRDD<String> textFile, int first, int second) {
        return textFile.count() * first * second;
    }
 
}
 
...
 
// Convert String "10" to integer
long result = producerTemplate.requestBody("spark:rdd?rdd=#rdd&rddCallback=#rddCallback", Arrays.asList(10, "10"), long.class);

 

DataFrame jobs

 

Instead of working with RDDs, the Spark component can also work with DataFrames.

To invoke a DataFrame job, use the following URI:
Code Block
languagejava
titleSpark DataFrame producer
spark:dataframe?dataFrame=#testDataFrame&dataFrameCallback=#transformation

 Where the dataFrame option refers to the name of a DataFrame instance (an instance of org.apache.spark.sql.DataFrame) from a Camel registry, while dataFrameCallback refers to an implementation of the org.apache.camel.component.spark.DataFrameCallback interface (also from the registry). The DataFrame callback provides a single method used to apply incoming messages against the given DataFrame. The result of the callback computation is saved as the body of the exchange.

Code Block
languagejava
titleSpark DataFrame callback
public interface DataFrameCallback<T> {
    T onDataFrame(DataFrame dataFrame, Object... payloads);
}

The following snippet demonstrates how to send a message as input to a job and return the results:

Code Block
languagejava
titleCalling spark job
String model = "Micra";
long linesCount = producerTemplate.requestBody("spark:dataframe?dataFrame=#cars&dataFrameCallback=#findCarWithModel", model, long.class);

The DataFrame callback for the snippet above, registered as a Spring bean, could look as follows:

Code Block
languagejava
titleSpark DataFrame callback
@Bean
DataFrameCallback<Long> findCarWithModel() {
    return new DataFrameCallback<Long>() {
        @Override
        public Long onDataFrame(DataFrame dataFrame, Object... payloads) {
            String model = (String) payloads[0];
            return dataFrame.where(dataFrame.col("model").eqNullSafe(model)).count();
        }
    };
}

The DataFrame definition in Spring could look as follows:

Code Block
languagejava
titleSpark DataFrame definition
@Bean
DataFrame cars(HiveContext hiveContext) {
    DataFrame jsonCars = hiveContext.read().json("/var/data/cars.json");
    jsonCars.registerTempTable("cars");
    return jsonCars;
}
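
The HiveContext injected above also has to be registered as a bean. A minimal sketch, assuming Spark 1.x, where a HiveContext is created on top of an existing SparkContext:

Code Block
languagejava
titleHive context definition (sketch)
@Bean
HiveContext hiveContext(JavaSparkContext sparkContext) {
    // HiveContext wraps the underlying (Scala) SparkContext of the Java context
    return new HiveContext(sparkContext.sc());
}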

 

DataFrame jobs options

Option             Description                                                                 Default value
dataFrame          DataFrame instance (instance of org.apache.spark.sql.DataFrame).           null
dataFrameCallback  Instance of org.apache.camel.component.spark.DataFrameCallback interface.  null

 

Hive jobs

 Instead of working with RDDs or DataFrames, the Spark component can also receive Hive SQL queries as payloads. To send a Hive query to the Spark component, use the following URI:

Code Block
languagejava
titleSpark Hive producer
spark:hive

The following snippet demonstrates how to send a message as input to a job and return the results:

Code Block
languagejava
titleCalling spark job
long carsCount = template.requestBody("spark:hive?collect=false", "SELECT * FROM cars", Long.class);
List<Row> cars = template.requestBody("spark:hive", "SELECT * FROM cars", List.class);

The table we want to execute the query against should be registered in a HiveContext before the query is executed. For example, in Spring such a registration could look as follows:

Code Block
languagejava
titleSpark DataFrame definition
@Bean
DataFrame cars(HiveContext hiveContext) {
    DataFrame jsonCars = hiveContext.read().json("/var/data/cars.json");
    jsonCars.registerTempTable("cars");
    return jsonCars;
}

 

Hive jobs options

Option   Description                                                                                                                             Default value
collect  Indicates if results should be collected (as a list of org.apache.spark.sql.Row instances) or if count() should be called against them.  true

 
