...
- Creation of nodes in any of the ZooKeeper create modes.
- Get and set the data contents of arbitrary cluster nodes (the data being set must be convertible to byte[]).
- Create and retrieve the list of child nodes attached to a particular node.
- A distributed RoutePolicy that uses a leader election coordinated by ZooKeeper to determine whether exchanges should be processed.
Maven users will need to add the following dependency to their pom.xml
for this component:
Code Block |
---|
|
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-zookeeper</artifactId>
    <version>x.x.x</version>
    <!-- use the same version as your Camel core version -->
</dependency>
|
URI format
Code Block |
---|
zookeeper://zookeeper-server[:port][/path][?options]
|
The path from the URI specifies the node in the ZooKeeper server (aka znode) that will be the target of the endpoint.
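To make the URI layout concrete, the following minimal sketch splits an endpoint URI into its znode path and options using only `java.net.URI`. The class `ZooKeeperUriParts` and its methods are hypothetical helpers written for this illustration; they are not part of the component, which does its own URI parsing.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: a hypothetical helper, not part of camel-zookeeper.
final class ZooKeeperUriParts {

    /** Returns the znode path portion of an endpoint URI. */
    static String znodePath(String endpointUri) {
        return URI.create(endpointUri).getPath();
    }

    /** Returns the options as an ordered name/value map (empty if there are none). */
    static Map<String, String> options(String endpointUri) {
        Map<String, String> result = new LinkedHashMap<>();
        String query = URI.create(endpointUri).getQuery();
        if (query == null || query.isEmpty()) {
            return result;
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq < 0) {
                result.put(pair, "");            // option given without a value
            } else {
                result.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String uri = "zookeeper://localhost:39913/somepath/somenode?create=true&createMode=PERSISTENT";
        System.out.println(znodePath(uri)); // /somepath/somenode
        System.out.println(options(uri));   // {create=true, createMode=PERSISTENT}
    }
}
```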
Options
Name | Default Value | Description |
---|---|---|
path | | The node in the ZooKeeper server (aka znode) |
listChildren | false | Whether the children of the node should be listed |
repeat | false | Should changes to the znode be 'watched' and repeatedly processed. |
backoff | 5000 | The time interval to back off for after an error before retrying. |
timeout | 5000 | The time interval to wait on connection before timing out. |
create | false | Should the endpoint create the node if it does not currently exist. |
createMode | EPHEMERAL | The create mode that should be used for the newly created node (see below). |
sendEmptyMessageOnDelete | true | Camel 2.10: Upon the delete of a znode, should an empty message be sent to the consumer |
Use cases
Reading from a znode.
The following snippet reads the data from the znode '/somepath/somenode', provided that it already exists. The retrieved data is placed into an exchange and passed on to the rest of the route.
Code Block |
---|
|
from("zookeeper://localhost:39913/somepath/somenode").to("mock:result");
|
If the node does not yet exist, a flag can be supplied to have the endpoint await its creation:
Code Block |
---|
|
from("zookeeper://localhost:39913/somepath/somenode?awaitCreation=true").to("mock:result");
|
...
The following snippet writes the payload of the exchange into the znode at '/somepath/somenode', provided that it already exists:
Code Block |
---|
|
from("direct:write-to-znode").to("zookeeper://localhost:39913/somepath/somenode");
|
For flexibility, the endpoint allows the target znode to be specified dynamically as a message header. If a header keyed by the string 'CamelZooKeeperNode' is present, the value of the header is used as the path to the znode on the server. For instance, using the same route definition as above, the following code snippet writes the data not to '/somepath/somenode' but to the path from the header, '/somepath/someothernode'. Notice that the testPayload must be convertible to byte[], as the data stored in ZooKeeper is byte based.
Code Block |
---|
|
Object testPayload = ...
template.sendBodyAndHeader("direct:write-to-znode", testPayload, "CamelZooKeeperNode", "/somepath/someothernode");
|
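The precedence rule described above (a 'CamelZooKeeperNode' header wins over the path in the endpoint URI) and the byte[] requirement can be sketched in plain Java. This is only an illustration of the rule, not the component's code: the class `TargetNodeResolution` and its methods are hypothetical, and UTF-8 is an assumed encoding chosen for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Illustration only: hypothetical helper, not the component's implementation.
final class TargetNodeResolution {

    // Header name taken from the text above.
    static final String NODE_HEADER = "CamelZooKeeperNode";

    /** The header value wins; otherwise fall back to the path from the endpoint URI. */
    static String resolveNode(Map<String, Object> headers, String endpointPath) {
        Object override = headers.get(NODE_HEADER);
        return override != null ? override.toString() : endpointPath;
    }

    /** One way to turn a String payload into bytes (UTF-8 is an assumption here). */
    static byte[] toBytes(String payload) {
        return payload.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        System.out.println(resolveNode(headers, "/somepath/somenode")); // /somepath/somenode

        headers.put(NODE_HEADER, "/somepath/someothernode");
        System.out.println(resolveNode(headers, "/somepath/somenode")); // /somepath/someothernode

        System.out.println(toBytes("hello").length);                    // 5
    }
}
```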
To also create the node if it does not exist, use the 'create' option:
Code Block |
---|
|
from("direct:create-and-write-to-znode").to("zookeeper://localhost:39913/somepath/somenode?create=true");
|
Starting with version 2.11, it is also possible to delete a node by setting the header 'CamelZookeeperOperation' to 'DELETE'.
Code Block |
---|
|
from("direct:delete-znode").setHeader(ZooKeeperMessage.ZOOKEEPER_OPERATION, constant("DELETE")).to("zookeeper://localhost:39913/somepath/somenode");
|
or equivalently
Code Block |
---|
|
<route>
    <from uri="direct:delete-znode" />
    <setHeader headerName="CamelZookeeperOperation">
        <constant>DELETE</constant>
    </setHeader>
    <to uri="zookeeper://localhost:39913/somepath/somenode" />
</route>
|
...
For example, to create a persistent znode via the URI config:
Code Block |
---|
|
from("direct:create-and-write-to-persistent-znode").to("zookeeper://localhost:39913/somepath/somenode?create=true&createMode=PERSISTENT");
|
or using the header 'CamelZookeeperCreateMode'. Notice that the testPayload must be convertible to byte[], as the data stored in ZooKeeper is byte based.
Code Block |
---|
|
Object testPayload = ...
template.sendBodyAndHeader("direct:create-and-write-to-persistent-znode", testPayload, "CamelZookeeperCreateMode", "PERSISTENT");
|
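For orientation, the four classic ZooKeeper create modes combine two properties: whether the node is ephemeral (removed when the creating session ends) and whether it is sequential (a monotonically increasing counter is appended to the name). The sketch below mirrors them in a local enum so that it runs without the ZooKeeper library. `CreateModeSketch` and `parse` are hypothetical names, and the trimming and upper-casing of the value is an assumption of this example rather than documented component behaviour.

```java
import java.util.Locale;

// Illustration only: a local stand-in for ZooKeeper's create modes.
final class CreateModeSketch {

    enum Mode {
        PERSISTENT(false, false),
        PERSISTENT_SEQUENTIAL(false, true),
        EPHEMERAL(true, false),
        EPHEMERAL_SEQUENTIAL(true, true);

        final boolean ephemeral;   // node disappears when the creating session ends
        final boolean sequential;  // a counter suffix is appended to the node name

        Mode(boolean ephemeral, boolean sequential) {
            this.ephemeral = ephemeral;
            this.sequential = sequential;
        }
    }

    /** Parses a createMode value; a missing value falls back to EPHEMERAL, the documented default. */
    static Mode parse(String value) {
        if (value == null || value.trim().isEmpty()) {
            return Mode.EPHEMERAL;
        }
        // Throws IllegalArgumentException for an unknown mode name.
        return Mode.valueOf(value.trim().toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(parse("PERSISTENT").ephemeral);             // false
        System.out.println(parse("EPHEMERAL_SEQUENTIAL").sequential);  // true
        System.out.println(parse(null));                               // EPHEMERAL
    }
}
```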
...
The following example uses the node '/someapp/somepolicy' for the election and is set up to start only the top '1' entries in the node listing, i.e. to elect a single master:
Code Block |
---|
|
ZooKeeperRoutePolicy policy = new ZooKeeperRoutePolicy("zookeeper:localhost:39913/someapp/somepolicy", 1);
from("direct:policy-controlled").routePolicy(policy).to("mock:controlled");
|
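The idea of starting only the top entries in the node listing can be sketched without a ZooKeeper server. The sketch assumes that every candidate registers a child node with a common prefix and a zero-padded sequence suffix, so that lexicographic order equals creation order; the node names, the class `ElectionSketch` and the method `isEnabled` are hypothetical, and the policy's real implementation may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustration only: a simplified model of "start only the top N entries".
final class ElectionSketch {

    /** True if ownNode is among the first enabledCount children in sorted order. */
    static boolean isEnabled(List<String> children, String ownNode, int enabledCount) {
        List<String> sorted = new ArrayList<>(children);
        Collections.sort(sorted);                 // zero-padded suffixes sort by creation order
        int position = sorted.indexOf(ownNode);   // -1 if this candidate is not registered
        return position >= 0 && position < enabledCount;
    }

    public static void main(String[] args) {
        // Hypothetical listing of the election node's children, in arbitrary order.
        List<String> children = Arrays.asList(
                "candidate-0000000002", "candidate-0000000000", "candidate-0000000001");

        System.out.println(isEnabled(children, "candidate-0000000000", 1)); // true
        System.out.println(isEnabled(children, "candidate-0000000002", 1)); // false
        System.out.println(isEnabled(children, "candidate-0000000001", 2)); // true
    }
}
```

With an enabled count of 1, exactly one candidate is enabled at a time. If the first node disappears, the next one in the listing moves to the top.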