ZooKeeper
Available as of Camel 2.9
The ZooKeeper component allows interaction with a ZooKeeper cluster and exposes the following features to Camel:
- Creation of nodes in any of the ZooKeeper create modes.
- Get and set the data contents of arbitrary cluster nodes.
- Create and retrieve the list of child nodes attached to a particular node.
- A distributed RoutePolicy that leverages a leader election coordinated by ZooKeeper to determine if exchanges should get processed.
Maven users will need to add the following dependency to their pom.xml for this component:
{code:xml}
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-zookeeper</artifactId>
    <version>x.x.x</version>
    <!-- use the same version as your Camel core version -->
</dependency>
{code}

URI format

{code}
zookeeper://zookeeper-server[:port][/path][?options]
{code}
The path from the URI specifies the node in the ZooKeeper server (aka znode) that will be the target of the endpoint.
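To make the parts of the URI format concrete, here is a minimal sketch that decomposes an endpoint URI with the JDK's `java.net.URI`. It only illustrates which part is the server, the znode path and the options; it is not how the component itself parses the URI, and the class and method names (`ZooKeeperUriSketch`, `options`) are hypothetical.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration class, not part of camel-zookeeper.
class ZooKeeperUriSketch {

    /** Splits the query part ("a=1&b=2") into an ordered option map. */
    static Map<String, String> options(URI uri) {
        Map<String, String> result = new LinkedHashMap<>();
        String query = uri.getQuery();
        if (query == null || query.isEmpty()) {
            return result;
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq < 0) {
                result.put(pair, "");            // option given without a value
            } else {
                result.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        URI uri = URI.create(
                "zookeeper://localhost:39913/somepath/somenode?create=true&createMode=PERSISTENT");
        System.out.println("server  = " + uri.getHost() + ":" + uri.getPort());
        System.out.println("znode   = " + uri.getPath());
        System.out.println("options = " + options(uri));
    }
}
```

Everything after the host and port, up to the `?`, is the znode path; the remainder is the option list described in the next section.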
Options
|| Name || Default Value || Description ||
| {{path}} | | The node in the ZooKeeper server (aka znode). |
| {{listChildren}} | {{false}} | Whether the children of the node should be listed. |
| {{repeat}} | {{false}} | Whether changes to the znode should be 'watched' and repeatedly processed. |
| {{backoff}} | {{5000}} | The time interval to back off after an error before retrying. |
| {{timeout}} | {{5000}} | The time interval to wait on connection before timing out. |
| {{create}} | {{false}} | Whether the endpoint should create the node if it does not currently exist. |
| {{createMode}} | {{EPHEMERAL}} | The create mode that should be used for the newly created node (see below). |
| {{listChildren}} | {{false}} | Whether to recursively get the children of the znode as well. |
| {{sendEmptyMessageOnDelete}} | {{true}} | *Camel 2.10:* Whether an empty message should be sent to the consumer when a znode is deleted. |
Use cases

Reading from a znode.

The following snippet will read the data from the znode '/somepath/somenode/' provided that it already exists. The data retrieved will be placed into an exchange and passed on to the rest of the route.
{code:java}
from("zookeeper://localhost:39913/somepath/somenode").to("mock:result");
{code}
If the node does not yet exist, a flag can be supplied to have the endpoint await its creation:
{code:java}
from("zookeeper://localhost:39913/somepath/somenode?awaitCreation=true").to("mock:result");
{code}
Reading from a znode - (additional Camel 2.10 onwards)

When data is read due to a WatchedEvent received from the ZooKeeper ensemble, the CamelZookeeperEventType header holds ZooKeeper's EventType value from that WatchedEvent. If the data is read initially (not triggered by a WatchedEvent), the CamelZookeeperEventType header will not be set.
Writing to a znode.
The following snippet will write the payload of the exchange into the znode at '/somepath/somenode/' provided that it already exists:
{code:java}
from("direct:write-to-znode").to("zookeeper://localhost:39913/somepath/somenode");
{code}
For flexibility, the endpoint allows the target znode to be specified dynamically as a message header. If a header keyed by the string 'CamelZooKeeperNode' is present, the value of the header will be used as the path to the znode on the server. For instance, using the same route definition above, the following code snippet will write the data not to '/somepath/somenode' but to the path from the header, '/somepath/someothernode':
{code:java}
// testPayload is the data to write and is assumed to be defined elsewhere.
// The header overrides the znode path given in the endpoint URI.
template.sendBodyAndHeader("direct:write-to-znode", testPayload, "CamelZooKeeperNode", "/somepath/someothernode");
{code}
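The header rule described above amounts to a simple fallback: use the header value when present, otherwise the path from the URI. The following is a minimal sketch of that rule in plain Java; it is not the component's actual implementation, and `TargetNodeSketch` and `resolveNode` are hypothetical names introduced for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration class, not part of camel-zookeeper.
class TargetNodeSketch {

    // Header name taken from the text above.
    static final String NODE_HEADER = "CamelZooKeeperNode";

    /** Returns the header value if the header is present, else the path from the URI. */
    static String resolveNode(Map<String, Object> headers, String pathFromUri) {
        Object override = headers.get(NODE_HEADER);
        return override != null ? override.toString() : pathFromUri;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        // No header set: the path from the endpoint URI is used.
        System.out.println(resolveNode(headers, "/somepath/somenode"));

        // Header set: its value replaces the path from the URI.
        headers.put(NODE_HEADER, "/somepath/someothernode");
        System.out.println(resolveNode(headers, "/somepath/somenode"));
    }
}
```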
To also create the node if it does not exist, the 'create' option should be used:
{code:java}
from("direct:create-and-write-to-znode").to("zookeeper://localhost:39913/somepath/somenode?create=true");
{code}
Starting with version 2.11, it is also possible to delete a node by setting the header 'CamelZookeeperOperation' to 'DELETE':
{code:java}
from("direct:delete-znode")
    .setHeader(ZooKeeperMessage.ZOOKEEPER_OPERATION, constant("DELETE"))
    .to("zookeeper://localhost:39913/somepath/somenode");
{code}
or equivalently:
{code:xml}
<route>
  <from uri="direct:delete-znode" />
  <setHeader headerName="CamelZookeeperOperation">
    <constant>DELETE</constant>
  </setHeader>
  <to uri="zookeeper://localhost:39913/somepath/somenode" />
</route>
{code}
ZooKeeper nodes can have different types; they can be 'Ephemeral' or 'Persistent' and 'Sequenced' or 'Unsequenced'. For further information on each type, see the ZooKeeper documentation. By default endpoints will create unsequenced, ephemeral nodes, but the type can be changed via a URI config parameter or via a special message header. The values expected for the create mode are simply the names from the CreateMode enumeration:
- PERSISTENT
- PERSISTENT_SEQUENTIAL
- EPHEMERAL
- EPHEMERAL_SEQUENTIAL
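The four names above are the combinations of the two node properties (ephemeral or persistent, sequenced or unsequenced). As a minimal sketch, the mapping can be written out in plain Java; `CreateModeSketch` and `createModeName` are hypothetical names for illustration and are not part of ZooKeeper or Camel.

```java
// Hypothetical illustration class, not part of ZooKeeper or camel-zookeeper.
class CreateModeSketch {

    /** Maps the two node properties onto the matching CreateMode name. */
    static String createModeName(boolean ephemeral, boolean sequential) {
        String base = ephemeral ? "EPHEMERAL" : "PERSISTENT";
        return sequential ? base + "_SEQUENTIAL" : base;
    }

    public static void main(String[] args) {
        // Print all four combinations of the two properties.
        for (boolean ephemeral : new boolean[] {false, true}) {
            for (boolean sequential : new boolean[] {false, true}) {
                System.out.println("ephemeral=" + ephemeral
                        + ", sequential=" + sequential
                        + " -> createMode=" + createModeName(ephemeral, sequential));
            }
        }
    }
}
```

The default described above (unsequenced, ephemeral) corresponds to `createModeName(true, false)`, which yields `EPHEMERAL`.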
For example, to create a persistent znode via the URI config:
{code:java}
from("direct:create-and-write-to-persistent-znode").to("zookeeper://localhost:39913/somepath/somenode?create=true&createMode=PERSISTENT");
{code}
or using the header 'CamelZookeeperCreateMode':
{code:java}
// testPayload is the data to write and is assumed to be defined elsewhere.
// The header name matches the one given in the text above.
template.sendBodyAndHeader("direct:create-and-write-to-persistent-znode", testPayload, "CamelZookeeperCreateMode", "PERSISTENT");
{code}
ZooKeeper enabled Route policy.
ZooKeeper allows for very simple and effective leader election out of the box. This component exploits this election capability in a RoutePolicy to control when and how routes are enabled. This policy would typically be used in fail-over scenarios, to control identical instances of a route across a cluster of Camel-based servers. A very common scenario is a simple 'Master-Slave' setup, where there are multiple instances of a route distributed across a cluster but only one of them, that of the master, should be running at a time. If the master fails, a new master should be elected from the available slaves and the route in this new master should be started.
The policy uses a common znode path across all instances of the RoutePolicy that will be involved in the election. Each policy writes its id into this node, and ZooKeeper orders the writes in the order it received them. The policy then reads the listing of the node to find the position of its own id; this position is used to determine whether the route should be started. The policy is configured at startup with the number of route instances that should be started across the cluster, and if its position in the list is less than this value then its route will be started. For a Master-Slave scenario, the policy is configured with 1 route instance, so only the first entry in the listing will start its route. All policies watch for updates to the listing, and if the listing changes they recalculate whether their route should be started. For more information on ZooKeeper's leader election capability see [this page|http://zookeeper.apache.org/doc/trunk/recipes.html#sc_leaderElection].
The following example uses the node '/someapp/somepolicy' for the election and is set up to start only the top '1' entries in the node listing, i.e. elect a master:
{code:java}
ZooKeeperRoutePolicy policy = new ZooKeeperRoutePolicy("zookeeper:localhost:39913/someapp/somepolicy", 1);
from("direct:policy-controlled").routePolicy(policy).to("mock:controlled");
{code}
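The start/stop decision described for the policy (start the route when the policy's position in the listing is less than the configured number of instances) can be sketched in plain Java. This is only an illustration of the rule, not the code of ZooKeeperRoutePolicy. The names `ElectionSketch` and `shouldStartRoute` are hypothetical, and the sketch assumes that the ids in the listing sort lexicographically in the order ZooKeeper received the writes, as zero-padded sequence suffixes would.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration class, not part of camel-zookeeper.
class ElectionSketch {

    /**
     * Returns true when ownId is among the first enabledCount entries of the
     * ordered listing, i.e. its zero-based position is less than enabledCount.
     */
    static boolean shouldStartRoute(List<String> listing, String ownId, int enabledCount) {
        List<String> ordered = new ArrayList<>(listing);
        // Assumption: sorting the ids reproduces the order of the writes.
        Collections.sort(ordered);
        int position = ordered.indexOf(ownId);
        return position >= 0 && position < enabledCount;
    }

    public static void main(String[] args) {
        // Three policy instances registered under the election node (made-up ids).
        List<String> listing = Arrays.asList(
                "policy-0000000002", "policy-0000000000", "policy-0000000001");

        // Master-Slave setup: only one route instance may run.
        System.out.println(shouldStartRoute(listing, "policy-0000000000", 1)); // first entry
        System.out.println(shouldStartRoute(listing, "policy-0000000001", 1)); // second entry

        // The master disappears; the listing changes and positions are recalculated.
        List<String> afterFailure = Arrays.asList("policy-0000000002", "policy-0000000001");
        System.out.println(shouldStartRoute(afterFailure, "policy-0000000001", 1));
    }
}
```

In the sketch, the second instance is refused while the first entry is present and is selected once that entry is gone, which mirrors the fail-over behaviour described above.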