Kafka Component
Available as of Camel 2.13
The kafka: component is used for communicating with Apache Kafka message broker.
Maven users will need to add the following dependency to their pom.xml
for this component.
Code Block |
---|
|
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-kafka</artifactId>
<version>x.x.x</version>
<!-- use the same version as your Camel core version -->
</dependency>
|
And then the Scala libraries of choice. camel-kafka does not include that dependency, but assume its provided. For example to use Scala 2.10.4 add:
Code Block |
---|
|
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.10.4</version>
</dependency> |
Code Block |
---|
kafka:server:port[?options]
|
Options
Div |
---|
class | confluenceTableSmall |
---|
|
Property | Default | Description |
---|
zookeeperHost | | The zookeeper host to use | zookeeperPort | 2181 | The zookeeper port to use | zookeeperConnect | | Camel 2.13.3/2.14.1: If in use, then zookeeperHost/zookeeperPort is not used. | topic | | The topic to use | groupId | | | partitioner | | | consumerStreams | 10 | | clientId | | | zookeeperSessionTimeoutMs | | | zookeeperConnectionTimeoutMs | | | zookeeperSyncTimeMs | | | consumersCount | 1 | Camel 2.15.0: The number of consumers that connect to kafka server | batchSize | 100 | Camel 2.15.0: The batchSize that the BatchingConsumerTask processes once.
| barrierAwaitTimeoutMs | 10000 | Camel 2.15.0: If the BatchingConsumerTask processes exchange exceed the batchSize, it will wait for barrierAwaitTimeoutMs.
|
|
You can append query options to the URI in the following format, ?option=value&option=value&...
Producer Options
Div |
---|
class | confluenceTableSmall |
---|
|
Property | Default | Description |
---|
producerType | | | compressionCodec | | | compressedTopics | | | messageSendMaxRetries | | | retryBackoffMs | | | topicMetadataRefreshIntervalMs | | | sendBufferBytes | | | requestRequiredAcks | | | requestTimeoutMs | | | queueBufferingMaxMs | | | queueBufferingMaxMessages | | | queueEnqueueTimeoutMs | | | batchNumMessages | | | serializerClass | | | keySerializerClass | | |
|
Consumer Options
Div |
---|
class | confluenceTableSmall |
---|
|
Property | Default | Description |
---|
consumerId | | | socketTimeoutMs | | | socketReceiveBufferBytes | | | fetchMessageMaxBytes | | | autoCommitEnable | | | autoCommitIntervalMs | | | queuedMaxMessages | | | rebalanceMaxRetries | | | fetchMinBytes | | | fetchWaitMaxMs | | | rebalanceBackoffMs | | | refreshLeaderBackoffMs | | | autoOffsetReset | | | consumerTimeoutMs | | |
|
Samples
Consuming messages:
Code Block |
---|
from("kafka:localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1").to("log:input");
|
Producing messages:
See unit tests of camel-kafka for more examples