---
apiVersion: v1
kind: List
items:
- apiVersion: v1
  kind: ConfigMap
  metadata:
    labels:
      funktion.fabric8.io/kind: Connector
      provider: fabric8
      project: connector-spark
      version: 1.1.21
      group: io.fabric8.funktion.connector
    name: spark
  data:
    deployment.yml: |
      ---
      apiVersion: extensions/v1beta1
      kind: Deployment
      metadata:
        labels:
          funktion.fabric8.io/kind: Subscription
          connector: spark
      spec:
        replicas: 1
        template:
          metadata:
            labels:
              funktion.fabric8.io/kind: Subscription
              connector: spark
          spec:
            containers:
            - image: fabric8/connector-spark:1.1.21
              name: connector
    schema.yml: |
      ---
      component:
        kind: component
        scheme: spark
        syntax: spark:endpointType
        title: Apache Spark
        description: The spark component can be used to send RDD or DataFrame jobs to an Apache Spark cluster.
        label: bigdata,iot
        deprecated: false
        async: false
        producerOnly: true
        javaType: org.apache.camel.component.spark.SparkComponent
        groupId: org.apache.camel
        artifactId: camel-spark
        version: 2.18.1
      componentProperties:
        rdd:
          kind: property
          type: object
          javaType: org.apache.spark.api.java.JavaRDDLike
          deprecated: false
          secret: false
          description: RDD to compute against.
        rddCallback:
          kind: property
          type: object
          javaType: org.apache.camel.component.spark.RddCallback
          deprecated: false
          secret: false
          description: Function performing action against an RDD.
      properties:
        endpointType:
          kind: path
          group: producer
          required: true
          type: string
          javaType: org.apache.camel.component.spark.EndpointType
          enum:
          - rdd
          - dataframe
          - hive
          deprecated: false
          secret: false
          description: Type of the endpoint (rdd dataframe hive).
        collect:
          kind: parameter
          group: producer
          type: boolean
          javaType: boolean
          deprecated: false
          secret: false
          defaultValue: true
          description: Indicates if results should be collected or counted.
        dataFrame:
          kind: parameter
          group: producer
          type: object
          javaType: org.apache.spark.sql.DataFrame
          deprecated: false
          secret: false
          description: DataFrame to compute against.
        dataFrameCallback:
          kind: parameter
          group: producer
          type: object
          javaType: org.apache.camel.component.spark.DataFrameCallback
          deprecated: false
          secret: false
          description: Function performing action against a DataFrame.
        rdd:
          kind: parameter
          group: producer
          type: object
          javaType: org.apache.spark.api.java.JavaRDDLike
          deprecated: false
          secret: false
          description: RDD to compute against.
        rddCallback:
          kind: parameter
          group: producer
          type: object
          javaType: org.apache.camel.component.spark.RddCallback
          deprecated: false
          secret: false
          description: Function performing action against an RDD.
        synchronous:
          kind: parameter
          group: advanced
          label: advanced
          type: boolean
          javaType: boolean
          deprecated: false
          secret: false
          defaultValue: false
          description: Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported).
    documentation.adoc: |+
      [[ApacheSpark-ApacheSparkcomponent]]
      Apache Spark component
      ~~~~~~~~~~~~~~~~~~~~~~

      INFO: The Apache Spark component is available starting from Camel *2.17*.

      This documentation page covers the http://spark.apache.org/[Apache Spark] component for Apache Camel. The main purpose of the Spark integration with Camel is to provide a bridge between Camel connectors and Spark tasks. In particular, the Camel connector provides a way to route messages from various transports, dynamically choose a task to execute, use the incoming message as input data for that task and finally deliver the results of the execution back to the Camel pipeline.
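      For example, a Camel route can feed messages into a Spark RDD job and pass the computed value further down the pipeline: the incoming message body becomes the payload handed to the RDD callback, and the value returned by the callback becomes the new message body. The sketch below assumes the `#myRdd` and `#countLinesContaining` beans described in the RDD jobs section; the timer endpoint is only an illustrative trigger.

      *Bridging a Camel route to a Spark job (sketch)*

      [source,java]
      ----------------------------------------------------------------------
      import org.apache.camel.builder.RouteBuilder;

      public class SparkBridgeRoute extends RouteBuilder {

          @Override
          public void configure() {
              // Periodically trigger a Spark RDD job; the body set here is passed
              // to the registered RDD callback as its payload, and the callback
              // result replaces the body for the rest of the route.
              from("timer:sparkJob?period=30000")
                  .setBody(constant("job input"))
                  .to("spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining")
                  .log("Spark job result: ${body}");
          }
      }
      ----------------------------------------------------------------------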
      [[ApacheSpark-Supportedarchitecturalstyles]]
      Supported architectural styles
      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

      The Spark component can be used as a driver application deployed into an application server (or executed as a fat jar).

      image:apache-spark.data/camel_spark_driver.png[image]

      The Spark component can also be submitted as a job directly to the Spark cluster.

      image:apache-spark.data/camel_spark_cluster.png[image]

      While the Spark component is primarily designed to work as a _long running job_ serving as a bridge between the Spark cluster and the other endpoints, you can also use it as a _fire-once_ short job.

      [[ApacheSpark-RunningSparkinOSGiservers]]
      Running Spark in OSGi servers
      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

      Currently the Spark component doesn't support execution in an OSGi container. Spark has been designed to be executed as a fat jar, usually submitted as a job to a cluster. For those reasons running Spark in an OSGi server is at least challenging and is not supported by Camel either.

      [[ApacheSpark-URIformat]]
      URI format
      ^^^^^^^^^^

      Currently the Spark component supports only producers: it is intended to invoke a Spark job and return results. You can call an RDD, DataFrame or Hive SQL job.

      *Spark URI format*

      [source,java]
      --------------------------
      spark:{rdd|dataframe|hive}
      --------------------------

      [[ApacheSpark-options]]
      Spark options
      +++++++++++++

      // component options: START
      The Apache Spark component supports 2 options which are listed below.

      {% raw %}
      [width="100%",cols="2,1m,7",options="header"]
      |=======================================================================
      | Name | Java Type | Description
      | rdd | JavaRDDLike | RDD to compute against.
      | rddCallback | RddCallback | Function performing action against an RDD.
      |=======================================================================
      {% endraw %}
      // component options: END

      // endpoint options: START
      The Apache Spark component supports 7 endpoint options which are listed below:

      {% raw %}
      [width="100%",cols="2,1,1m,1m,5",options="header"]
      |=======================================================================
      | Name | Group | Default | Java Type | Description
      | endpointType | producer |  | EndpointType | *Required* Type of the endpoint (rdd dataframe hive).
      | collect | producer | true | boolean | Indicates if results should be collected or counted.
      | dataFrame | producer |  | DataFrame | DataFrame to compute against.
      | dataFrameCallback | producer |  | DataFrameCallback | Function performing action against a DataFrame.
      | rdd | producer |  | JavaRDDLike | RDD to compute against.
      | rddCallback | producer |  | RddCallback | Function performing action against an RDD.
      | synchronous | advanced | false | boolean | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported).
      |=======================================================================
      {% endraw %}
      // endpoint options: END

      [[ApacheSpark-RDDjobs]]
      RDD jobs
      ^^^^^^^^

      To invoke an RDD job, use the following URI:

      *Spark RDD producer*

      [source,java]
      ------------------------------------------------------
      spark:rdd?rdd=#testFileRdd&rddCallback=#transformation
      ------------------------------------------------------

      Where the `rdd` option refers to the name of an RDD instance (a subclass of `org.apache.spark.api.java.JavaRDDLike`) from a Camel registry, while `rddCallback` refers to the implementation of the `org.apache.camel.component.spark.RddCallback` interface (also from a registry).
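      Both beans referenced in the URI have to be present in the registry before the route starts. The following Spring sketch shows one possible way to set up the Spark context from which such an RDD bean is created; the application name and the local master URL are illustrative assumptions only.

      *Spark context definition (sketch)*

      [source,java]
      ------------------------------------------------------------
      @Bean
      JavaSparkContext sparkContext() {
          // Local Spark context used for illustration; a real deployment would
          // point the master URL at the Spark cluster instead of local[*].
          SparkConf conf = new SparkConf().setAppName("camel-spark").setMaster("local[*]");
          return new JavaSparkContext(conf);
      }
      ------------------------------------------------------------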
      RDD callback provides a single method used to apply incoming messages against the given RDD. Results of callback computations are saved as a body to an exchange.

      *Spark RDD callback*

      [source,java]
      -------------------------------------------------
      public interface RddCallback<T> {
          T onRdd(JavaRDDLike rdd, Object... payloads);
      }
      -------------------------------------------------

      The following snippet demonstrates how to send a message as an input to the job and return results:

      *Calling spark job*

      [source,java]
      ------------------------------------------------------------------------------------------------------------------------------
      String pattern = "job input";
      long linesCount = producerTemplate.requestBody("spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining", pattern, long.class);
      ------------------------------------------------------------------------------------------------------------------------------

      The RDD callback for the snippet above, registered as a Spring bean, could look as follows:

      *Spark RDD callback*

      [source,java]
      ------------------------------------------------------------------------
      @Bean
      RddCallback countLinesContaining() {
          return new RddCallback<Long>() {
              @Override
              public Long onRdd(JavaRDDLike rdd, Object... payloads) {
                  String pattern = (String) payloads[0];
                  return ((JavaRDD<String>) rdd).filter(line -> line.contains(pattern)).count();
              }
          };
      }
      ------------------------------------------------------------------------

      The RDD definition in Spring could look as follows:

      *Spark RDD definition*

      [source,java]
      --------------------------------------------------
      @Bean
      JavaRDDLike myRdd(JavaSparkContext sparkContext) {
          return sparkContext.textFile("testrdd.txt");
      }
      --------------------------------------------------

      [[ApacheSpark-VoidRDDcallbacks]]
      Void RDD callbacks
      ++++++++++++++++++

      If your RDD callback doesn't return any value back to a Camel pipeline, you can either return a `null` value or use the `VoidRddCallback` base class:

      *Spark RDD callback*

      [source,java]
      ------------------------------------------------------------------
      @Bean
      RddCallback rddCallback() {
          return new VoidRddCallback() {
              @Override
              public void doOnRdd(JavaRDDLike rdd, Object... payloads) {
                  rdd.saveAsTextFile(output.getAbsolutePath());
              }
          };
      }
      ------------------------------------------------------------------

      [[ApacheSpark-ConvertingRDDcallbacks]]
      Converting RDD callbacks
      ++++++++++++++++++++++++

      If you know what type of input data will be sent to the RDD callback, you can use `ConvertingRddCallback` and let Camel automatically convert incoming messages before passing them to the callback:

      *Spark RDD callback*

      [source,java]
      ---------------------------------------------------------------------------
      @Bean
      RddCallback rddCallback(CamelContext context) {
          return new ConvertingRddCallback(context, int.class, int.class) {
              @Override
              public Long doOnRdd(JavaRDDLike rdd, Object... payloads) {
                  return rdd.count() * (int) payloads[0] * (int) payloads[1];
              }
          };
      }
      ---------------------------------------------------------------------------

      [[ApacheSpark-AnnotatedRDDcallbacks]]
      Annotated RDD callbacks
      +++++++++++++++++++++++

      Probably the easiest way to work with the RDD callbacks is to provide a class with a method marked with the `@RddCallback` annotation:

      *Annotated RDD callback definition*

      [source,java]
      -----------------------------------------------------------------------------------------------------
      import static org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback;

      @Bean
      RddCallback rddCallback() {
          return annotatedRddCallback(new MyTransformation());
      }

      ...

      import org.apache.camel.component.spark.annotations.RddCallback;

      public class MyTransformation {

          @RddCallback
          long countLines(JavaRDD textFile, int first, int second) {
              return textFile.count() * first * second;
          }

      }
      -----------------------------------------------------------------------------------------------------

      If you pass a CamelContext to the annotated RDD callback factory method, the created callback will be able to convert incoming payloads to match the parameters of the annotated method:

      *Body conversions for annotated RDD callbacks*

      [source,java]
      ------------------------------------------------------------------------------------------------------------------------------
      import static org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback;

      @Bean
      RddCallback rddCallback(CamelContext camelContext) {
          return annotatedRddCallback(new MyTransformation(), camelContext);
      }

      ...

      import org.apache.camel.component.spark.annotations.RddCallback;

      public class MyTransformation {

          @RddCallback
          long countLines(JavaRDD textFile, int first, int second) {
              return textFile.count() * first * second;
          }

      }

      ...

      // Convert String "10" to integer
      long result = producerTemplate.requestBody("spark:rdd?rdd=#rdd&rddCallback=#rddCallback", Arrays.asList(10, "10"), long.class);
      ------------------------------------------------------------------------------------------------------------------------------

      [[ApacheSpark-DataFramejobs]]
      DataFrame jobs
      ^^^^^^^^^^^^^^

      Instead of working with RDDs, the Spark component can work with DataFrames as well. To invoke a DataFrame job, use the following URI:

      *Spark DataFrame producer*

      [source,java]
      --------------------------------------------------------------------------
      spark:dataframe?dataFrame=#testDataFrame&dataFrameCallback=#transformation
      --------------------------------------------------------------------------

      Where the `dataFrame` option refers to the name of a DataFrame instance (an instance of `org.apache.spark.sql.DataFrame`) from a Camel registry, while `dataFrameCallback` refers to the implementation of the `org.apache.camel.component.spark.DataFrameCallback` interface (also from a registry). DataFrame callback provides a single method used to apply incoming messages against the given DataFrame. Results of callback computations are saved as a body to an exchange.

      *Spark DataFrame callback*

      [source,java]
      -----------------------------------------------------------
      public interface DataFrameCallback<T> {
          T onDataFrame(DataFrame dataFrame, Object... payloads);
      }
      -----------------------------------------------------------

      The following snippet demonstrates how to send a message as an input to a job and return results:

      *Calling spark job*

      [source,java]
      -----------------------------------------------------------------------------------------------------------------------------------------
      String model = "Micra";
      long linesCount = producerTemplate.requestBody("spark:dataFrame?dataFrame=#cars&dataFrameCallback=#findCarWithModel", model, long.class);
      -----------------------------------------------------------------------------------------------------------------------------------------

      The DataFrame callback for the snippet above, registered as a Spring bean, could look as follows:

      *Spark DataFrame callback*

      [source,java]
      -------------------------------------------------------------------------------------
      @Bean
      DataFrameCallback findCarWithModel() {
          return new DataFrameCallback<Long>() {
              @Override
              public Long onDataFrame(DataFrame dataFrame, Object... payloads) {
                  String model = (String) payloads[0];
                  return dataFrame.where(dataFrame.col("model").eqNullSafe(model)).count();
              }
          };
      }
      -------------------------------------------------------------------------------------

      The DataFrame definition in Spring could look as follows:

      *Spark DataFrame definition*

      [source,java]
      ------------------------------------------------------------------------
      @Bean
      DataFrame cars(HiveContext hiveContext) {
          DataFrame jsonCars = hiveContext.read().json("/var/data/cars.json");
          jsonCars.registerTempTable("cars");
          return jsonCars;
      }
      ------------------------------------------------------------------------

      [[ApacheSpark-Hivejobs]]
      Hive jobs
      ^^^^^^^^^

      Instead of working with RDDs or DataFrames, the Spark component can also receive Hive SQL queries as payloads. To send a Hive query to the Spark component, use the following URI:

      *Spark Hive producer*

      [source,java]
      ----------
      spark:hive
      ----------

      The following snippet demonstrates how to send a message as an input to a job and return results:

      *Calling spark job*

      [source,java]
      ----------------------------------------------------------------------------------------------------
      long carsCount = template.requestBody("spark:hive?collect=false", "SELECT * FROM cars", Long.class);
      List cars = template.requestBody("spark:hive", "SELECT * FROM cars", List.class);
      ----------------------------------------------------------------------------------------------------

      The table we want to execute the query against should be registered in a HiveContext before we query it. For example, in Spring such a registration could look as follows:

      *Spark DataFrame definition*

      [source,java]
      ------------------------------------------------------------------------
      @Bean
      DataFrame cars(HiveContext hiveContext) {
          DataFrame jsonCars = hiveContext.read().json("/var/data/cars.json");
          jsonCars.registerTempTable("cars");
          return jsonCars;
      }
      ------------------------------------------------------------------------

      [[ApacheSpark-SeeAlso]]
      See Also
      ^^^^^^^^

      * link:configuring-camel.html[Configuring Camel]
      * link:component.html[Component]
      * link:endpoint.html[Endpoint]
      * link:getting-started.html[Getting Started]