---
# Kubernetes List holding a ConfigMap that packages the "flink" connector.
# The ConfigMap's data section carries three entries: deployment.yml,
# schema.yml and documentation.adoc.
apiVersion: v1
kind: List
items:
- apiVersion: v1
kind: ConfigMap
metadata:
# The funktion.fabric8.io/kind label tags this ConfigMap as a Connector
# (presumably how the funktion tooling discovers connectors - confirm).
# The version label carries the same value (1.1.55) as the image tag used
# in deployment.yml.
labels:
funktion.fabric8.io/kind: Connector
provider: fabric8
project: connector-flink
version: 1.1.55
group: io.fabric8.funktion.connector
name: flink
data:
# deployment.yml: an embedded Deployment manifest, stored as a literal string.
# It declares one replica of a pod with a single container named "connector"
# running image funktion/connector-flink:1.1.55. Both the Deployment and its
# pod template are labelled funktion.fabric8.io/kind=Subscription and
# connector=flink.
# NOTE(review): the manifest has no metadata.name - presumably supplied by
# whatever instantiates this template; confirm.
# NOTE(review): extensions/v1beta1 Deployments are no longer served as of
# Kubernetes 1.16. Moving to apps/v1 would also require spec.selector;
# confirm what the consumer of this template expects before changing it.
deployment.yml: |
---
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
labels:
funktion.fabric8.io/kind: Subscription
connector: flink
spec:
replicas: 1
template:
metadata:
labels:
funktion.fabric8.io/kind: Subscription
connector: flink
spec:
containers:
- image: funktion/connector-flink:1.1.55
name: connector
# schema.yml: an embedded description of the Camel "flink" component
# (org.apache.camel:camel-flink 2.18.1, producerOnly), stored as a literal
# string.
# - component: identity of the component (scheme, URI syntax, Java type,
#   Maven coordinates).
# - componentProperties: the 4 component-level options.
# - properties: the 7 endpoint options. endpointType is the required entry
#   with kind "path" (the endpointType part of the flink:endpointType
#   syntax); the remaining six have kind "parameter".
schema.yml: |
---
component:
kind: component
scheme: flink
syntax: flink:endpointType
title: Apache Flink
description: The flink component can be used to send DataSet jobs to Apache Flink cluster.
label: hadoop
deprecated: false
async: false
producerOnly: true
javaType: org.apache.camel.component.flink.FlinkComponent
groupId: org.apache.camel
artifactId: camel-flink
version: 2.18.1
componentProperties:
dataSet:
kind: property
type: object
javaType: org.apache.flink.api.java.DataSet
deprecated: false
secret: false
description: DataSet to compute against.
order: 0
dataStream:
kind: property
type: object
javaType: org.apache.flink.streaming.api.datastream.DataStream
deprecated: false
secret: false
description: DataStream to compute against.
order: 1
dataSetCallback:
kind: property
type: object
javaType: org.apache.camel.component.flink.DataSetCallback
deprecated: false
secret: false
description: Function performing action against a DataSet.
order: 2
dataStreamCallback:
kind: property
type: object
javaType: org.apache.camel.component.flink.DataStreamCallback
deprecated: false
secret: false
description: Function performing action against a DataStream.
order: 3
properties:
endpointType:
kind: path
group: producer
required: true
type: string
javaType: org.apache.camel.component.flink.EndpointType
enum:
- dataset
- datastream
deprecated: false
secret: false
description: Type of the endpoint (dataset datastream).
order: 0
collect:
kind: parameter
group: producer
type: boolean
javaType: boolean
deprecated: false
secret: false
defaultValue: true
description: Indicates if results should be collected or counted.
order: 1
dataSet:
kind: parameter
group: producer
type: object
javaType: org.apache.flink.api.java.DataSet
deprecated: false
secret: false
description: DataSet to compute against.
order: 2
dataSetCallback:
kind: parameter
group: producer
type: object
javaType: org.apache.camel.component.flink.DataSetCallback
deprecated: false
secret: false
description: Function performing action against a DataSet.
order: 3
dataStream:
kind: parameter
group: producer
type: object
javaType: org.apache.flink.streaming.api.datastream.DataStream
deprecated: false
secret: false
description: DataStream to compute against.
order: 4
dataStreamCallback:
kind: parameter
group: producer
type: object
javaType: org.apache.camel.component.flink.DataStreamCallback
deprecated: false
secret: false
description: Function performing action against a DataStream.
order: 5
synchronous:
kind: parameter
group: advanced
label: advanced
type: boolean
javaType: boolean
deprecated: false
secret: false
defaultValue: false
description: Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported).
order: 6
documentation.adoc: |+
[[camel-flink-CamelFlinkComponent]]
Camel Flink Component
~~~~~~~~~~~~~~~~~~~~~
*Available as of Camel 2.18*
This documentation page covers the https://flink.apache.org[Apache Flink]
component for Apache Camel. The *camel-flink* component provides a
bridge between Camel connectors and Flink tasks. +
This Camel Flink connector provides a way to route messages from various
transports, dynamically choose a Flink task to execute, use the incoming
message as input data for the task, and finally deliver the results back to
the Camel pipeline.
Maven users will need to add the following dependency to
their `pom.xml` for this component:
[source,xml]
------------------------------------------------------------
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-flink</artifactId>
<version>x.x.x</version>
</dependency>
------------------------------------------------------------
[[camel-flink-URIFormat]]
URI Format
^^^^^^^^^^
Currently, the Flink Component supports only Producers. One can create DataSet and DataStream jobs.
[source,java]
-------------------------------------------------
flink:dataset?dataSet=#myDataSet&dataSetCallback=#dataSetCallback
flink:datastream?dataStream=#myDataStream&dataStreamCallback=#dataStreamCallback
-------------------------------------------------
[[Flink-FlinkEndpointOptions]]
FlinkEndpoint Options
// endpoint options: START
The Apache Flink component supports 7 endpoint options which are listed below:
{% raw %}
[width="100%",cols="2,1,1m,1m,5",options="header"]
|=======================================================================
| Name | Group | Default | Java Type | Description
| endpointType | producer | | EndpointType | *Required* Type of the endpoint (dataset datastream).
| collect | producer | true | boolean | Indicates if results should be collected or counted.
| dataSet | producer | | DataSet | DataSet to compute against.
| dataSetCallback | producer | | DataSetCallback | Function performing action against a DataSet.
| dataStream | producer | | DataStream | DataStream to compute against.
| dataStreamCallback | producer | | DataStreamCallback | Function performing action against a DataStream.
| synchronous | advanced | false | boolean | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported).
|=======================================================================
{% endraw %}
// endpoint options: END
[[Flink-FlinkComponentOptions]]
FlinkComponent Options
^^^^^^^^^^^^^^^^^^^^^^
// component options: START
The Apache Flink component supports 4 options which are listed below.
{% raw %}
[width="100%",cols="2,1m,7",options="header"]
|=======================================================================
| Name | Java Type | Description
| dataSet | DataSet | DataSet to compute against.
| dataStream | DataStream | DataStream to compute against.
| dataSetCallback | DataSetCallback | Function performing action against a DataSet.
| dataStreamCallback | DataStreamCallback | Function performing action against a DataStream.
|=======================================================================
{% endraw %}
// component options: END
Flink DataSet Callback
^^^^^^^^^^^^^^^^^^^^^^
[source,java]
-----------------------------------
@Bean
public DataSetCallback dataSetCallback() {
return new DataSetCallback() {
public Long onDataSet(DataSet dataSet, Object... objects) {
try {
dataSet.print();
return new Long(0);
} catch (Exception e) {
return new Long(-1);
}
}
};
}
-----------------------------------
Flink DataStream Callback
^^^^^^^^^^^^^^^^^^^^^^^^^
[source,java]
---------------------------
@Bean
public VoidDataStreamCallback dataStreamCallback() {
return new VoidDataStreamCallback() {
@Override
public void doOnDataStream(DataStream dataStream, Object... objects) throws Exception {
dataStream.flatMap(new Splitter()).print();
environment.execute("data stream test");
}
};
}
---------------------------
Camel-Flink Producer call
^^^^^^^^^^^^^^^^^^^^^^^^^
[source,java]
-----------------------------------
CamelContext camelContext = new SpringCamelContext(context);
String pattern = "foo";
try {
ProducerTemplate template = camelContext.createProducerTemplate();
camelContext.start();
Long count = template.requestBody("flink:dataSet?dataSet=#myDataSet&dataSetCallback=#countLinesContaining", pattern, Long.class);
} finally {
camelContext.stop();
}
-----------------------------------
See Also
^^^^^^^^
* link:configuring-camel.html[Configuring Camel]
* link:component.html[Component]
* link:endpoint.html[Endpoint]
* link:getting-started.html[Getting Started]