...

Where the rdd option refers to the name of an RDD instance (an implementation of org.apache.spark.api.java.JavaRDDLike) in the Camel registry, while rddCallback refers to an implementation of the org.apache.camel.component.spark.RddCallback interface (also in the registry). The RDD callback provides a single method that applies the incoming message payloads to the given RDD. The result of the callback computation is saved as the body of the exchange.

Spark RDD callback (Java):

public interface RddCallback<T> {
    T onRdd(JavaRDDLike rdd, Object... payloads);
}
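The contract described above (look up the RDD and the callback by name, apply the message payloads, use the return value as the new body) can be sketched without Spark or Camel on the classpath. Everything in this sketch is a stand-in and an assumption made for illustration: ListCallback mimics RddCallback, a List<String> replaces the RDD, a Map replaces the Camel registry, and process, demo and the bean names are hypothetical. It is not the component's actual implementation.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RddCallbackSketch {

    // Stand-in for org.apache.camel.component.spark.RddCallback.
    // A List<String> replaces the Spark RDD so the sketch runs without Spark.
    interface ListCallback<T> {
        T onRdd(List<String> rdd, Object... payloads);
    }

    // Hypothetical helper mirroring the described behaviour: resolve both
    // beans by name, apply the payloads, and return what would become the
    // exchange body.
    @SuppressWarnings("unchecked")
    static Object process(Map<String, Object> registry, String rddName,
                          String callbackName, Object... payloads) {
        List<String> rdd = (List<String>) registry.get(rddName);
        ListCallback<?> callback = (ListCallback<?>) registry.get(callbackName);
        return callback.onRdd(rdd, payloads);
    }

    static Object demo() {
        // A plain Map stands in for the Camel registry.
        Map<String, Object> registry = new HashMap<>();
        registry.put("myRdd", Arrays.asList("foo bar", "baz", "foo qux"));

        // Counts the lines that contain the pattern passed as the first payload.
        ListCallback<Long> countMatching = (rdd, payloads) -> {
            String pattern = (String) payloads[0];
            return rdd.stream().filter(line -> line.contains(pattern)).count();
        };
        registry.put("countMatching", countMatching);

        // The message body ("foo") plays the role of the single payload.
        return process(registry, "myRdd", "countMatching", "foo");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this stand-in, two of the three lines contain "foo", so demo() returns 2 as a Long. With the real component the same roles are played by a JavaRDDLike bean and an RddCallback bean.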

...

Spark RDD callback (Java):

@Bean
RddCallback<Long> countLinesContaining() {
    return new RddCallback<Long>() {
        @Override
        public Long onRdd(JavaRDDLike rdd, Object... payloads) {
            // The first payload is the pattern to search for.
            String pattern = (String) payloads[0];
            // filter() is defined on JavaRDD rather than JavaRDDLike, hence the cast.
            return ((JavaRDD<String>) rdd).filter(line -> line.contains(pattern)).count();
        }
    };
}

...

Spark RDD definition (Java):

@Bean
JavaRDDLike myRdd(JavaSparkContext sparkContext) {
    return sparkContext.textFile("testrdd.txt");
}

...

| Option | Description | Default value |
|---|---|---|
| rdd | RDD instance (implementation of org.apache.spark.api.java.JavaRDDLike). | null |
| rddCallback | Instance of the org.apache.camel.component.spark.RddCallback interface. | null |

...

Void RDD callback (Java):

@Bean
RddCallback<Void> rddCallback() {
    return new VoidRddCallback() {
        @Override
        public void doOnRdd(JavaRDDLike rdd, Object... payloads) {
            // 'output' is assumed to be a java.io.File defined elsewhere in the configuration.
            rdd.saveAsTextFile(output.getAbsolutePath());
        }
    };
}

...

Converting RDD callback (Java):

@Bean
RddCallback<Long> rddCallback(CamelContext context) {
    return new ConvertingRddCallback<Long>(context, int.class, int.class) {
        @Override
        public Long doOnRdd(JavaRDDLike rdd, Object... payloads) {
            // Both payloads are expected to be converted to int before this method runs.
            return rdd.count() * (int) payloads[0] * (int) payloads[1];
        }
    };
}
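The snippet above suggests that a converting callback coerces each payload to the type declared in the constructor before doOnRdd runs. A minimal plain-Java sketch of that pattern follows. It is an illustration under stated assumptions, not the Camel class: ConvertingCallback, convert and demo are hypothetical names, a long stands in for the result of rdd.count(), and the converter handles only int targets, whereas the real class presumably delegates to the type converters of the CamelContext.

```java
public class ConvertingCallbackSketch {

    // Stand-in for a converting callback base class: converts each payload
    // to the declared type, then delegates to doOnRdd.
    abstract static class ConvertingCallback<T> {
        private final Class<?>[] payloadTypes;

        ConvertingCallback(Class<?>... payloadTypes) {
            this.payloadTypes = payloadTypes;
        }

        // rddCount stands in for the RDD; only its size matters in this sketch.
        final T onRdd(long rddCount, Object... payloads) {
            Object[] converted = new Object[payloads.length];
            for (int i = 0; i < payloads.length; i++) {
                // Payloads beyond the declared types are passed through unchanged.
                converted[i] = i < payloadTypes.length
                        ? convert(payloads[i], payloadTypes[i])
                        : payloads[i];
            }
            return doOnRdd(rddCount, converted);
        }

        // Hypothetical, deliberately tiny converter: only int targets are handled.
        private static Object convert(Object value, Class<?> type) {
            if (type == int.class || type == Integer.class) {
                return Integer.parseInt(value.toString());
            }
            return value;
        }

        abstract T doOnRdd(long rddCount, Object... payloads);
    }

    static long demo() {
        ConvertingCallback<Long> callback =
                new ConvertingCallback<Long>(int.class, int.class) {
                    @Override
                    Long doOnRdd(long rddCount, Object... payloads) {
                        // Payloads arrive here as Integer values, so the casts are safe.
                        return rddCount * (int) payloads[0] * (int) payloads[1];
                    }
                };
        // String payloads are converted to int before doOnRdd is invoked.
        return callback.onRdd(3L, "10", "10");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With a stand-in count of 3 and the payloads "10" and "10", demo() returns 300. The benefit of the pattern is that the callback body can cast payloads directly instead of parsing message content itself.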

...