---
apiVersion: v1
kind: List
items:
- apiVersion: v1
kind: ConfigMap
metadata:
labels:
funktion.fabric8.io/kind: Connector
provider: fabric8
project: connector-zookeeper
version: 1.1.37
group: io.fabric8.funktion.connector
name: zookeeper
data:
deployment.yml: |
---
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
labels:
funktion.fabric8.io/kind: Subscription
connector: zookeeper
spec:
replicas: 1
template:
metadata:
labels:
funktion.fabric8.io/kind: Subscription
connector: zookeeper
spec:
containers:
- image: funktion/connector-zookeeper:1.1.37
name: connector
schema.yml: |
---
component:
kind: component
scheme: zookeeper
syntax: zookeeper:serverUrls/path
title: ZooKeeper
description: The zookeeper component allows interaction with a ZooKeeper cluster.
label: clustering
deprecated: false
async: false
javaType: org.apache.camel.component.zookeeper.ZooKeeperComponent
groupId: org.apache.camel
artifactId: camel-zookeeper
version: 2.18.1
componentProperties:
configuration:
kind: property
type: object
javaType: org.apache.camel.component.zookeeper.ZooKeeperConfiguration
deprecated: false
secret: false
description: To use a shared ZooKeeperConfiguration
order: 0
properties:
serverUrls:
kind: path
group: common
required: true
type: string
javaType: java.lang.String
deprecated: false
secret: false
description: The zookeeper server hosts (multiple servers can be separated by comma)
order: 0
path:
kind: path
group: common
required: true
type: string
javaType: java.lang.String
deprecated: false
secret: false
description: The node in the ZooKeeper server (aka znode)
order: 1
awaitExistence:
kind: parameter
group: common
type: boolean
javaType: boolean
deprecated: true
secret: false
defaultValue: true
description: Not in use
order: 2
listChildren:
kind: parameter
group: common
type: boolean
javaType: boolean
deprecated: false
secret: false
defaultValue: false
description: Whether the children of the node should be listed
order: 3
timeout:
kind: parameter
group: common
type: integer
javaType: int
deprecated: false
secret: false
defaultValue: "5000"
description: The time interval to wait on connection before timing out.
order: 4
backoff:
kind: parameter
group: consumer
label: consumer
type: integer
javaType: long
deprecated: false
secret: false
defaultValue: "5000"
description: The time interval to backoff for after an error before retrying.
order: 5
bridgeErrorHandler:
kind: parameter
group: consumer
label: consumer
type: boolean
javaType: boolean
optionalPrefix: consumer.
deprecated: false
secret: false
defaultValue: false
description: Allows for bridging the consumer to the Camel routing Error Handler, which means any exceptions that occur while the consumer is trying to pick up incoming messages, or the like, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, which will be logged at WARN/ERROR level and ignored.
order: 6
repeat:
kind: parameter
group: consumer
label: consumer
type: boolean
javaType: boolean
deprecated: false
secret: false
defaultValue: false
description: Should changes to the znode be 'watched' and repeatedly processed.
order: 7
sendEmptyMessageOnDelete:
kind: parameter
group: consumer
label: consumer
type: boolean
javaType: boolean
deprecated: false
secret: false
defaultValue: true
description: Whether an empty message should be sent to the consumer when a znode is deleted.
order: 8
exceptionHandler:
kind: parameter
group: consumer (advanced)
label: consumer,advanced
type: object
javaType: org.apache.camel.spi.ExceptionHandler
optionalPrefix: consumer.
deprecated: false
secret: false
description: To let the consumer use a custom ExceptionHandler. Note that if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, which will be logged at WARN/ERROR level and ignored.
order: 9
exchangePattern:
kind: parameter
group: consumer (advanced)
label: consumer,advanced
type: string
javaType: org.apache.camel.ExchangePattern
enum:
- InOnly
- RobustInOnly
- InOut
- InOptionalOut
- OutOnly
- RobustOutOnly
- OutIn
- OutOptionalIn
deprecated: false
secret: false
description: Sets the exchange pattern when the consumer creates an exchange.
order: 10
create:
kind: parameter
group: producer
label: producer
type: boolean
javaType: boolean
deprecated: false
secret: false
defaultValue: false
description: Should the endpoint create the node if it does not currently exist.
order: 11
createMode:
kind: parameter
group: producer
label: producer
type: string
javaType: java.lang.String
enum:
- PERSISTENT
- PERSISTENT_SEQUENTIAL
- EPHEMERAL
- EPHEMERAL_SEQUENTIAL
deprecated: false
secret: false
defaultValue: EPHEMERAL
description: The create mode that should be used for the newly created node
order: 12
synchronous:
kind: parameter
group: advanced
label: advanced
type: boolean
javaType: boolean
deprecated: false
secret: false
defaultValue: false
description: Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported).
order: 13
documentation.adoc: |
ifdef::env-github[]
:caution-caption: :boom:
:important-caption: :exclamation:
:note-caption: :information_source:
:tip-caption: :bulb:
:warning-caption: :warning:
endif::[]
[[Zookeeper-ZooKeeper]]
ZooKeeper
~~~~~~~~~
*Available as of Camel 2.9*
The ZooKeeper component allows interaction with a
http://hadoop.apache.org/zookeeper/[ZooKeeper] cluster and exposes the
following features to Camel:
1. Creation of nodes in any of the ZooKeeper create modes.
2. Get and Set the data contents of arbitrary cluster nodes (data
being set must be convertible to `byte[]`).
3. Create and retrieve the list of child nodes attached to a
particular node.
4. A Distributed link:routepolicy.html[`RoutePolicy`] that leverages a
Leader election coordinated by ZooKeeper to determine if exchanges
should get processed.
Maven users will need to add the following dependency to their `pom.xml`
for this component:
[source,xml]
----
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-zookeeper</artifactId>
    <version>x.x.x</version>
</dependency>
----
[[Zookeeper-URIformat]]
URI format
^^^^^^^^^^
[source]
----
zookeeper://zookeeper-server[:port][/path][?options]
----
The path from the URI specifies the node in the ZooKeeper server (a.k.a.
_znode_) that will be the target of the endpoint.
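As a rough illustration of how the parts of the URI fit together, the
following sketch assembles an endpoint URI from the server list, the
_znode_ path and a map of options. The class `ZooKeeperUriSketch` and its
`endpointUri` method are hypothetical helpers written for this example
and are not part of camel-zookeeper; option values are assumed not to
need URL encoding.
[source,java]
----
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper for illustration only; not part of camel-zookeeper.
final class ZooKeeperUriSketch {
    // Builds zookeeper://serverUrls/path[?name=value&...] without URL-encoding the values.
    static String endpointUri(String serverUrls, String path, Map<String, String> options) {
        // Make sure exactly one '/' separates the server list from the znode path.
        String znode = path.startsWith("/") ? path : "/" + path;
        StringBuilder uri = new StringBuilder("zookeeper://").append(serverUrls).append(znode);
        if (!options.isEmpty()) {
            StringJoiner query = new StringJoiner("&", "?", "");
            options.forEach((name, value) -> query.add(name + "=" + value));
            uri.append(query);
        }
        return uri.toString();
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("create", "true");
        options.put("createMode", "PERSISTENT");
        // prints zookeeper://localhost:39913/somepath/somenode?create=true&createMode=PERSISTENT
        System.out.println(endpointUri("localhost:39913", "somepath/somenode", options));
    }
}
----
A `LinkedHashMap` is used in the sketch so that the options appear in the
URI in the order they were added.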
[[Zookeeper-Options]]
Options
^^^^^^^
// component options: START
The ZooKeeper component supports 1 option, which is listed below.
{% raw %}
[width="100%",cols="2,1m,7",options="header"]
|=======================================================================
| Name | Java Type | Description
| configuration | ZooKeeperConfiguration | To use a shared ZooKeeperConfiguration
|=======================================================================
{% endraw %}
// component options: END
// endpoint options: START
The ZooKeeper component supports 14 endpoint options which are listed below:
{% raw %}
[width="100%",cols="2,1,1m,1m,5",options="header"]
|=======================================================================
| Name | Group | Default | Java Type | Description
| serverUrls | common | | String | *Required* The zookeeper server hosts (multiple servers can be separated by comma)
| path | common | | String | *Required* The node in the ZooKeeper server (aka znode)
| awaitExistence | common | true | boolean | Not in use
| listChildren | common | false | boolean | Whether the children of the node should be listed
| timeout | common | 5000 | int | The time interval to wait on connection before timing out.
| backoff | consumer | 5000 | long | The time interval to backoff for after an error before retrying.
| bridgeErrorHandler | consumer | false | boolean | Allows for bridging the consumer to the Camel routing Error Handler, which means any exceptions that occur while the consumer is trying to pick up incoming messages, or the like, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, which will be logged at WARN/ERROR level and ignored.
| repeat | consumer | false | boolean | Should changes to the znode be 'watched' and repeatedly processed.
| sendEmptyMessageOnDelete | consumer | true | boolean | Whether an empty message should be sent to the consumer when a znode is deleted.
| exceptionHandler | consumer (advanced) | | ExceptionHandler | To let the consumer use a custom ExceptionHandler. Note that if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, which will be logged at WARN/ERROR level and ignored.
| exchangePattern | consumer (advanced) | | ExchangePattern | Sets the exchange pattern when the consumer creates an exchange.
| create | producer | false | boolean | Should the endpoint create the node if it does not currently exist.
| createMode | producer | EPHEMERAL | String | The create mode that should be used for the newly created node
| synchronous | advanced | false | boolean | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported).
|=======================================================================
{% endraw %}
// endpoint options: END
[[Zookeeper-Usecases]]
Use cases
^^^^^^^^^
[[Zookeeper-Readingfromaznode]]
Reading from a _znode_
+++++++++++++++++++++
The following snippet will read the data from the _znode_
`/somepath/somenode/` provided that it already exists. The data
retrieved will be placed into an exchange and passed onto
the rest of the route:
[source,java]
----
from("zookeeper://localhost:39913/somepath/somenode").to("mock:result");
----
If the node does not yet exist, a flag can be supplied to have the
endpoint await its creation. The options table above lists this flag as
`awaitExistence` and marks it as deprecated and not in use:
[source,java]
----
from("zookeeper://localhost:39913/somepath/somenode?awaitExistence=true").to("mock:result");
----
[[Zookeeper-ReadingfromaznodeAdditionalCamel210onwards]]
Reading from a _znode_ (additional Camel 2.10 onwards)
++++++++++++++++++++++++++++++++++++++++++++++++++++++
When data is read due to a `WatchedEvent` received from the ZooKeeper
ensemble, the `CamelZookeeperEventType` header holds ZooKeeper's
http://zookeeper.apache.org/doc/current/api/org/apache/zookeeper/Watcher.Event.EventType.html[`EventType`]
value from that `WatchedEvent`. If the data is read initially (not
triggered by a `WatchedEvent`) the `CamelZookeeperEventType` header will not
be set.
[[Zookeeper-Writingtoaznode]]
Writing to a _znode_
++++++++++++++++++++
The following snippet will write the payload of the exchange into the
znode at `/somepath/somenode/` provided that it already exists:
[source,java]
----
from("direct:write-to-znode")
    .to("zookeeper://localhost:39913/somepath/somenode");
----
For flexibility, the endpoint allows the target _znode_ to be specified
dynamically as a message header. If a header keyed by the string
`CamelZooKeeperNode` is present then the value of the header will be
used as the path to the _znode_ on the server. For instance using the same
route definition above, the following code snippet will write the data
not to `/somepath/somenode` but to the path from the header
`/somepath/someothernode`.
WARNING: the `testPayload` must be convertible
to `byte[]` as the data stored in ZooKeeper is byte based.
[source,java]
----
Object testPayload = ...
template.sendBodyAndHeader("direct:write-to-znode", testPayload, "CamelZooKeeperNode", "/somepath/someothernode");
----
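Because the data stored in a _znode_ is byte based, one way to keep a
text payload unambiguous is to encode it explicitly before sending it.
The sketch below assumes a UTF-8 text payload; `ZnodePayloadSketch` and
its methods are hypothetical names used only for illustration and are
not part of camel-zookeeper.
[source,java]
----
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not part of camel-zookeeper.
final class ZnodePayloadSketch {
    // Encode text with an explicit charset instead of relying on the platform default.
    static byte[] toZnodeData(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Decode bytes read back from a znode, using the same charset as on write.
    static String fromZnodeData(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] data = toZnodeData("hello znode");
        System.out.println(data.length + " bytes, round trip: " + fromZnodeData(data));
    }
}
----
The resulting array could then be sent as the message body, for example
in place of the `testPayload` above.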
To also create the node if it does not exist the `create` option should
be used.
[source,java]
----
from("direct:create-and-write-to-znode")
    .to("zookeeper://localhost:39913/somepath/somenode?create=true");
----
Starting with *version 2.11*, it is also possible to *delete* a node by
setting the header `CamelZookeeperOperation` to `DELETE`:
[source,java]
----
from("direct:delete-znode")
    .setHeader(ZooKeeperMessage.ZOOKEEPER_OPERATION, constant("DELETE"))
    .to("zookeeper://localhost:39913/somepath/somenode");
----
or equivalently:
[source,xml]
----
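<route>
  <from uri="direct:delete-znode"/>
  <setHeader headerName="CamelZookeeperOperation">
    <constant>DELETE</constant>
  </setHeader>
  <to uri="zookeeper://localhost:39913/somepath/somenode"/>
</route>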
----
ZooKeeper nodes can have different types; they can be 'Ephemeral' or
'Persistent' and 'Sequenced' or 'Unsequenced'. For further information
on each type, see
http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html#Ephemeral+Nodes[the ZooKeeper Programmer's Guide].
By default endpoints will create unsequenced, ephemeral nodes, but the
type can be changed via a URI config parameter or via a
special message header. The values expected for the create mode are
simply the names from the `CreateMode` enumeration:
* `PERSISTENT`
* `PERSISTENT_SEQUENTIAL`
* `EPHEMERAL`
* `EPHEMERAL_SEQUENTIAL`
For example, to create a persistent _znode_ via the URI config:
[source,java]
----
from("direct:create-and-write-to-persistent-znode")
    .to("zookeeper://localhost:39913/somepath/somenode?create=true&createMode=PERSISTENT");
----
or using the header `CamelZookeeperCreateMode`.
WARNING: the `testPayload` must be convertible to `byte[]` as the data stored in
ZooKeeper is byte based.
[source,java]
----
Object testPayload = ...
template.sendBodyAndHeader("direct:create-and-write-to-persistent-znode", testPayload, "CamelZookeeperCreateMode", "PERSISTENT");
----
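The four create mode names combine two independent properties, ephemeral
versus persistent and sequenced versus unsequenced. The sketch below
models this with a hypothetical local enum, `CreateModeSketch`, that only
mirrors the names; the real enumeration is
`org.apache.zookeeper.CreateMode`. The fallback to `EPHEMERAL` follows
the documented default of the `createMode` option, and rejecting unknown
names with an exception is an assumption of this sketch, not a statement
about how the component behaves.
[source,java]
----
// Hypothetical local mirror of the create mode names, for illustration only.
enum CreateModeSketch {
    PERSISTENT(false, false),
    PERSISTENT_SEQUENTIAL(false, true),
    EPHEMERAL(true, false),
    EPHEMERAL_SEQUENTIAL(true, true);

    final boolean ephemeral;
    final boolean sequential;

    CreateModeSketch(boolean ephemeral, boolean sequential) {
        this.ephemeral = ephemeral;
        this.sequential = sequential;
    }

    // Resolve an option or header value; an absent value falls back to the documented default.
    static CreateModeSketch fromOption(String value) {
        if (value == null || value.isEmpty()) {
            return EPHEMERAL;
        }
        return valueOf(value); // throws IllegalArgumentException for unknown names
    }

    public static void main(String[] args) {
        for (CreateModeSketch mode : values()) {
            System.out.println(mode + " ephemeral=" + mode.ephemeral + " sequential=" + mode.sequential);
        }
    }
}
----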
[[Zookeeper-ZooKeeperenabledRoutepolicy]]
ZooKeeper enabled Route policy
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ZooKeeper allows for very simple and effective leader election out of
the box. This component exploits this election capability in a
link:routepolicy.html[`RoutePolicy`] to control when and how routes are
enabled. This policy would typically be used in fail-over scenarios, to
control identical instances of a route across a cluster of Camel based
servers. A very common scenario is a simple 'Master-Slave' setup where
there are multiple instances of a route distributed across a cluster but
only one of them, that of the master, should be running at a time. If
the master fails, a new master should be elected from the available
slaves and the route in this new master should be started.
The policy uses a common _znode_ path across all instances of the
`RoutePolicy` that will be involved in the election. Each policy writes
its id into this node and ZooKeeper will order the writes in the order
it received them. The policy then reads the listing of the node to find
the position of its id; this position is used to determine if the route
should be started or not. The policy is configured at startup with the
number of route instances that should be started across the cluster and
if its position in the list is less than this value then its route will
be started. For a Master-Slave scenario, the policy is configured with 1
route instance and only the first entry in the listing will start its
route. All policies watch for updates to the listing and if the listing
changes they recalculate if their route should be started. For more info
on ZooKeeper's leader election capability see
http://zookeeper.apache.org/doc/trunk/recipes.html#sc_leaderElection[this
page].
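The position check described above can be sketched in plain Java. This
is a simplified model and not the `ZooKeeperRoutePolicy` implementation:
the class `ElectionSketch`, the method `shouldStartRoute` and the
instance ids are hypothetical, and the listing is assumed to arrive
already ordered as ZooKeeper recorded the writes.
[source,java]
----
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the start/stop decision; not the ZooKeeperRoutePolicy code.
final class ElectionSketch {
    // listing: ids in the order ZooKeeper recorded the writes (assumed already ordered).
    // enabledCount: number of route instances that should run across the cluster.
    static boolean shouldStartRoute(List<String> listing, String ownId, int enabledCount) {
        int position = listing.indexOf(ownId); // -1 when the id is not in the listing
        return position >= 0 && position < enabledCount;
    }

    public static void main(String[] args) {
        List<String> listing = new ArrayList<>(Arrays.asList("instance-a", "instance-b", "instance-c"));
        System.out.println(shouldStartRoute(listing, "instance-a", 1)); // true
        System.out.println(shouldStartRoute(listing, "instance-b", 1)); // false
        // Simulate a listing change in which the first entry is no longer present.
        listing.remove("instance-a");
        System.out.println(shouldStartRoute(listing, "instance-b", 1)); // true
    }
}
----
With `enabledCount` set to 1 only the first entry starts its route, which
corresponds to the Master-Slave scenario; a larger value would let that
many instances run at once.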
The following example uses the node `/someapp/somepolicy` for
the election and is set up to start only the top entry in the node
listing, i.e. elect a master:
[source,java]
----
ZooKeeperRoutePolicy policy = new ZooKeeperRoutePolicy("zookeeper:localhost:39913/someapp/somepolicy", 1);
from("direct:policy-controlled")
    .routePolicy(policy)
    .to("mock:controlled");
----
[[Zookeeper-SeeAlso]]
See Also
^^^^^^^^
* link:configuring-camel.html[Configuring Camel]
* link:component.html[Component]
* link:endpoint.html[Endpoint]
* link:getting-started.html[Getting Started]