Kafka Component
Available as of Camel 2.13
The kafka: component is used for communicating with the Apache Kafka message broker.
Maven users will need to add the following dependency to their pom.xml for this component:
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-kafka</artifactId>
    <version>x.x.x</version>
    <!-- use the same version as your Camel core version -->
</dependency>
Then add the Scala library of your choice. camel-kafka does not include that dependency but assumes it is provided. For example, to use Scala 2.10.4, add:
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.10.4</version>
</dependency>
URI format
kafka:server:port[?options]
Options
Property | Default | Description |
---|---|---|
zookeeperHost | | The ZooKeeper host to use |
zookeeperPort | 2181 | The ZooKeeper port to use |
zookeeperConnect | | Camel 2.13.3/2.14.1: If in use, then zookeeperHost/zookeeperPort is not used. |
topic | | The topic to use |
groupId | | |
partitioner | | |
consumerStreams | 10 | |
clientId | | |
zookeeperSessionTimeoutMs | | |
zookeeperConnectionTimeoutMs | | |
zookeeperSyncTimeMs | | |
consumersCount | 1 | Camel 2.15.0: The number of consumers that connect to the Kafka server |
batchSize | 100 | Camel 2.15.0: The number of exchanges that the BatchingConsumerTask processes at once. |
barrierAwaitTimeoutMs | 10000 | Camel 2.15.0: If the number of exchanges processed by the BatchingConsumerTask exceeds batchSize, it waits for barrierAwaitTimeoutMs. |
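The precedence between zookeeperConnect and zookeeperHost/zookeeperPort described in the table can be pictured with a small sketch. This is only an illustration of the documented rule, not the component's own code: the class ZookeeperAddressSketch and its resolve method are hypothetical names, and the comma-separated value in the example is the usual ZooKeeper connection string form.

```java
final class ZookeeperAddressSketch {

    // Hypothetical helper, not part of camel-kafka: if zookeeperConnect is set,
    // it wins and zookeeperHost/zookeeperPort are ignored.
    static String resolve(String zookeeperConnect, String zookeeperHost, int zookeeperPort) {
        if (zookeeperConnect != null && !zookeeperConnect.isEmpty()) {
            return zookeeperConnect;
        }
        return zookeeperHost + ":" + zookeeperPort;
    }

    public static void main(String[] args) {
        // zookeeperConnect given: host and port are not used
        System.out.println(resolve("zk1:2181,zk2:2181", "localhost", 2181));
        // zookeeperConnect absent: fall back to host and port (default port 2181)
        System.out.println(resolve(null, "localhost", 2181));
    }
}
```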
You can append query options to the URI in the following format, ?option=value&option=value&...
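As a rough illustration of that format, the following sketch assembles an endpoint URI from a map of options. The class KafkaUriSketch and its buildUri method are hypothetical helpers written for this page, not part of camel-kafka, and the sketch assumes option values need no URL escaping.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

final class KafkaUriSketch {

    // Hypothetical helper: produces kafka:server:port[?option=value&option=value...]
    static String buildUri(String server, int port, Map<String, String> options) {
        String base = "kafka:" + server + ":" + port;
        if (options.isEmpty()) {
            return base; // the options part is optional
        }
        // "?" prefixes the query, "&" separates the option=value pairs
        StringJoiner query = new StringJoiner("&", "?", "");
        for (Map.Entry<String, String> option : options.entrySet()) {
            query.add(option.getKey() + "=" + option.getValue());
        }
        return base + query;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in insertion order
        Map<String, String> options = new LinkedHashMap<>();
        options.put("topic", "test");
        options.put("zookeeperHost", "localhost");
        options.put("zookeeperPort", "2181");
        options.put("groupId", "group1");
        System.out.println(buildUri("localhost", 9092, options));
    }
}
```

The resulting string can be passed to from(...) or to(...) in a route in the same way as a hand-written URI.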
Producer Options
Property | Default | Description |
---|---|---|
producerType | | |
compressionCodec | | |
compressedTopics | | |
messageSendMaxRetries | | |
retryBackoffMs | | |
topicMetadataRefreshIntervalMs | | |
sendBufferBytes | | |
requestRequiredAcks | | |
requestTimeoutMs | | |
queueBufferingMaxMs | | |
queueBufferingMaxMessages | | |
queueEnqueueTimeoutMs | | |
batchNumMessages | | |
serializerClass | | |
keySerializerClass | | |
Consumer Options
Property | Default | Description |
---|---|---|
consumerId | | |
socketTimeoutMs | | |
socketReceiveBufferBytes | | |
fetchMessageMaxBytes | | |
autoCommitEnable | | |
autoCommitIntervalMs | | |
queuedMaxMessages | | |
rebalanceMaxRetries | | |
fetchMinBytes | | |
fetchWaitMaxMs | | |
rebalanceBackoffMs | | |
refreshLeaderBackoffMs | | |
autoOffsetReset | | |
consumerTimeoutMs | | |
Samples
Consuming messages:
from("kafka:localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1").to("log:input");
Producing messages:
See the unit tests of camel-kafka for more examples.
Endpoints
Camel supports the Message Endpoint pattern using the Endpoint interface. Endpoints are usually created by a Component and are usually referred to in the DSL via their URIs.
From an Endpoint you can use the following methods:
- createProducer() will create a Producer for sending message exchanges to the endpoint
- createConsumer() implements the Event Driven Consumer pattern for consuming message exchanges from the endpoint via a Processor when creating a Consumer
- createPollingConsumer() implements the Polling Consumer pattern for consuming message exchanges from the endpoint via a PollingConsumer
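The difference between the three methods can be pictured with a toy model. The sketch below deliberately does not use Camel's real Endpoint, Producer, Consumer or PollingConsumer types: ToyEndpoint, Poller and receiveNoWait are simplified stand-ins invented for this illustration, with plain strings in place of message exchanges. It only shows that an event-driven consumer has messages pushed to its processor, while a polling consumer is asked for the next message by the caller.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Toy stand-in for an endpoint; these are NOT Camel's real interfaces.
final class ToyEndpoint {

    // Hypothetical polling handle; returns null when nothing is waiting.
    interface Poller {
        String receiveNoWait();
    }

    private final Queue<String> pending = new ArrayDeque<>();
    private final List<Consumer<String>> processors = new ArrayList<>();

    // Producer side: the returned function sends a message to this endpoint.
    Consumer<String> createProducer() {
        return message -> {
            if (processors.isEmpty()) {
                pending.add(message); // nobody is listening: keep it for polling
            } else {
                for (Consumer<String> processor : processors) {
                    processor.accept(message); // push to event-driven consumers
                }
            }
        };
    }

    // Event Driven Consumer: the endpoint calls the processor for each message.
    void createConsumer(Consumer<String> processor) {
        processors.add(processor);
    }

    // Polling Consumer: the caller decides when to fetch the next message.
    Poller createPollingConsumer() {
        return pending::poll;
    }

    public static void main(String[] args) {
        ToyEndpoint endpoint = new ToyEndpoint();
        Consumer<String> producer = endpoint.createProducer();

        producer.accept("first"); // queued, since no processor is registered yet
        System.out.println("polled: " + endpoint.createPollingConsumer().receiveNoWait());

        List<String> seen = new ArrayList<>();
        endpoint.createConsumer(seen::add);
        producer.accept("second"); // delivered straight to the processor
        System.out.println("pushed: " + seen);
    }
}
```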