You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 10 Next »

Kafka Component

Available as of Camel 2.13

The kafka: component is used for communicating with Apache Kafka message broker.

Maven users will need to add the following dependency to their pom.xml for this component.

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-kafka</artifactId>
    <version>x.x.x</version>
    <!-- use the same version as your Camel core version -->
</dependency>

Camel 2.17 or newer

Scala is not longer used, as we use the kafka java client.

Camel 2.16 or older

And then the Scala libraries of choice. camel-kafka does not include that dependency, but assume its provided. For example to use Scala 2.10.4 add:

    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.10.4</version>
    </dependency>

URI format

kafka:server:port[?options]

 

Options (Camel 2.16 or older)

Property

Default

Description

zookeeperHost

 

The zookeeper host to use

zookeeperPort

2181

The zookeeper port to use

zookeeperConnect Camel 2.13.3/2.14.1: If in use, then zookeeperHost/zookeeperPort is not used.

topic

 

The topic to use

groupId

  

partitioner

  

consumerStreams

10 

clientId

  

zookeeperSessionTimeoutMs

  

zookeeperConnectionTimeoutMs

  

zookeeperSyncTimeMs

  

consumersCount

1

Camel 2.15.0: The number of consumers that connect to kafka server

batchSize

100

Camel 2.15.0: The batchSize that the BatchingConsumerTask processes once.

barrierAwaitTimeoutMs

10000

Camel 2.15.0: If the BatchingConsumerTask processes exchange exceed the batchSize, it will wait for barrierAwaitTimeoutMs.

bridgeEndpointfalseCamel 2.16.0: If the bridgeEndpoint is true, the producer will ignore the topic header setting of the message.

You can append query options to the URI in the following format, ?option=value&option=value&...

Producer Options (Camel 2.16 or older)

Property

Default

Description

producerType

sync (Taken from native KafkaProducer class)

sync - send message/batch immediately, and wait until response is received

async - queue the message/batch to send. There is a thread per broker (Kafka node) which polls from this queue upon queueBufferingMaxMs or batchNumMessages

compressionCodec  
compressedTopics  
messageSendMaxRetries  
retryBackoffMs  
topicMetadataRefreshIntervalMs  
sendBufferBytes  
requestRequiredAcks  
requestTimeoutMs  
queueBufferingMaxMs  
queueBufferingMaxMessages  
queueEnqueueTimeoutMs  
batchNumMessages  
serializerClass  
keySerializerClass  

Consumer Options (Camel 2.16 or older)

Property

Default

Description

consumerId

 

 
socketTimeoutMs  
socketReceiveBufferBytes  
fetchMessageMaxBytes  
autoCommitEnable  
autoCommitIntervalMs  
queuedMaxMessages  
rebalanceMaxRetries  
fetchMinBytes  
fetchWaitMaxMs  
rebalanceBackoffMs  
refreshLeaderBackoffMs  
autoOffsetReset  
consumerTimeoutMs  

Samples

Camel 2.16 or older

Consuming messages:

from("kafka:localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1").to("log:input");

Producing messages:

See unit tests of camel-kafka for more examples

Endpoints

Camel supports the Message Endpoint pattern using the Endpoint interface. Endpoints are usually created by a Component and Endpoints are usually referred to in the DSL via their URIs.

From an Endpoint you can use the following methods

See Also

  • No labels