Kafka Component
Available as of Camel 2.13
The kafka: component is used for communicating with the Apache Kafka message broker.
Maven users will need to add the following dependency to their pom.xml for this component:
```xml
<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-kafka</artifactId>
  <!-- use the same version as your Camel core version -->
  <version>x.x.x</version>
</dependency>
```
Camel 2.17 or newer
Scala is no longer used, as the component is based on the Kafka Java client.
Camel 2.16 or older
You will also need the Scala library of your choice; camel-kafka does not include that dependency but assumes it is provided. For example, to use Scala 2.10.4 add:
```xml
<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.10.4</version>
</dependency>
```
URI format
```
kafka:server:port[?options]
```
Options (Camel 2.16 or older)
Property | Default | Description
---|---|---
zookeeperHost | | The ZooKeeper host to use
zookeeperPort | 2181 | The ZooKeeper port to use
zookeeperConnect | | Camel 2.13.3/2.14.1: If in use, then zookeeperHost/zookeeperPort are not used
topic | | The topic to use
groupId | |
partitioner | |
consumerStreams | 10 |
clientId | |
zookeeperSessionTimeoutMs | |
zookeeperConnectionTimeoutMs | |
zookeeperSyncTimeMs | |
consumersCount | 1 | Camel 2.15.0: The number of consumers that connect to the Kafka server
batchSize | 100 | Camel 2.15.0: The batch size that the BatchingConsumerTask processes at once
barrierAwaitTimeoutMs | 10000 | Camel 2.15.0: If the exchanges processed by the BatchingConsumerTask exceed the batchSize, it will wait up to barrierAwaitTimeoutMs
bridgeEndpoint | false | Camel 2.16.0: If bridgeEndpoint is true, the producer will ignore the topic header of the message
You can append query options to the URI in the following format: ?option=value&option=value&...
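For readers who assemble endpoint URIs programmatically, the format above can be sketched with a small helper. This KafkaUriBuilder class is purely illustrative and is not part of camel-kafka:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class KafkaUriBuilder {

    // Builds a kafka:server:port[?options] endpoint URI.
    // Illustrative only; camel-kafka does not ship such a helper.
    public static String buildUri(String server, int port, Map<String, String> options) {
        String base = "kafka:" + server + ":" + port;
        if (options.isEmpty()) {
            return base;
        }
        // Join the options as ?option=value&option=value&...
        String query = options.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
        return base + "?" + query;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("topic", "test");
        options.put("groupId", "group1");
        System.out.println(buildUri("localhost", 9092, options));
        // prints kafka:localhost:9092?topic=test&groupId=group1
    }
}
```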
Producer Options (Camel 2.16 or older)
Property | Default | Description
---|---|---
producerType | sync (taken from the native KafkaProducer class) | sync: send the message/batch immediately and wait until the response is received. async: queue the message/batch to be sent; there is one thread per broker (Kafka node) which polls from this queue when queueBufferingMaxMs elapses or batchNumMessages is reached
compressionCodec | |
compressedTopics | |
messageSendMaxRetries | |
retryBackoffMs | |
topicMetadataRefreshIntervalMs | |
sendBufferBytes | |
requestRequiredAcks | |
requestTimeoutMs | |
queueBufferingMaxMs | |
queueBufferingMaxMessages | |
queueEnqueueTimeoutMs | |
batchNumMessages | |
serializerClass | |
keySerializerClass | |
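The partitioner option above lets you plug in a custom strategy for mapping a message key to a partition. As a rough sketch of what a partitioner does (this is not Kafka's actual default partitioner, which hashes keys with murmur2; this sketch uses String.hashCode for simplicity):

```java
public class SimplePartitioner {

    // Illustrative only: maps a message key to a partition by hashing,
    // in the spirit of Kafka's default key-hash partitioning. Real
    // Kafka clients use murmur2, not String.hashCode.
    public static int partitionFor(String key, int numPartitions) {
        if (key == null) {
            // Real clients pick a random partition or round-robin here.
            return 0;
        }
        // Mask the sign bit so the result of % is never negative.
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        // The same key always lands on the same partition.
        System.out.println(partitionFor("order-42", 4));
    }
}
```

The important property is determinism: messages with the same key always go to the same partition, which preserves per-key ordering.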
Consumer Options (Camel 2.16 or older)
Property | Default | Description
---|---|---
consumerId | |
socketTimeoutMs | |
socketReceiveBufferBytes | |
fetchMessageMaxBytes | |
autoCommitEnable | |
autoCommitIntervalMs | |
queuedMaxMessages | |
rebalanceMaxRetries | |
fetchMinBytes | |
fetchWaitMaxMs | |
rebalanceBackoffMs | |
refreshLeaderBackoffMs | |
autoOffsetReset | |
consumerTimeoutMs | |
Options (Camel 2.17 or newer)
Property | Default | Description
---|---|---
topic | | The topic to use
groupId | |
consumerStreams | 10 |
clientId | |
consumersCount | 1 | The number of consumers that connect to the Kafka server
batchSize | 100 | Commit size if auto commit is false
bridgeEndpoint | false | Camel 2.16.0: If bridgeEndpoint is true, the producer will ignore the topic header of the message
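The batchSize option above acts as a commit size when auto commit is disabled: offsets are committed once that many records have been processed. The idea can be sketched with a small counter (illustrative only, not camel-kafka's implementation):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class BatchCommitTracker {

    // Illustrative model of the batchSize option: with auto commit
    // disabled, offsets are committed every batchSize records.
    private final int batchSize;
    private final AtomicInteger sinceLastCommit = new AtomicInteger();
    private int commits;

    public BatchCommitTracker(int batchSize) {
        this.batchSize = batchSize;
    }

    // Returns true when this record completes a batch and a commit
    // would happen.
    public boolean recordProcessed() {
        if (sinceLastCommit.incrementAndGet() >= batchSize) {
            sinceLastCommit.set(0);
            commits++;
            return true;
        }
        return false;
    }

    public int getCommits() {
        return commits;
    }

    public static void main(String[] args) {
        BatchCommitTracker tracker = new BatchCommitTracker(100);
        for (int i = 0; i < 250; i++) {
            tracker.recordProcessed();
        }
        // Commits happen after records 100 and 200; the last 50 are
        // not yet committed.
        System.out.println(tracker.getCommits()); // prints 2
    }
}
```

The trade-off is the usual one: a larger commit size means fewer commit round-trips, but more records may be redelivered after a crash.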
Producer Options (Camel 2.17 or newer)
Property | Default & Description Reference |
---|---|
keyDeserializer | http://kafka.apache.org/documentation.html#producerconfigs |
requestRequiredAcks | |
bufferMemorySize | |
compressionCodec | |
retries | |
sslKeyPassword | |
sslKeystoreLocation | |
sslKeystorePassword | |
sslTruststoreLocation | |
sslTruststorePassword | |
producerBatchSize | |
clientId | |
connectionMaxIdleMs | |
lingerMs | |
maxBlockMs | |
maxRequestSize | |
partitioner | |
receiveBufferBytes | |
requestTimeoutMs | |
saslKerberosServiceName | |
securityProtocol | |
sendBufferBytes | |
sslEnabledProtocols | |
sslKeystoreType | |
sslProtocol | |
sslProvider | |
sslTruststoreType | |
timeoutMs | |
blockOnBufferFull | |
maxInFlightRequest | |
metadataFetchTimeoutMs | |
metadataMaxAgeMs | |
metricReporters | |
noOfMetricsSample | |
metricsSampleWindowMs | |
reconnectBackoffMs | |
retryBackoffMs | |
kerberosInitCmd | |
kerberosBeforeReloginMinTime | |
kerberosRenewJitter | |
kerberosRenewWindowFactor | |
sslCipherSuites | |
sslEndpointAlgorithm | |
sslKeymanagerAlgorithm | |
sslTrustmanagerAlgorithm |
Consumer Options (Camel 2.17 or newer)
Property | Default & Description Reference |
---|---|
keyDeserializer | http://kafka.apache.org/documentation.html#newconsumerconfigs |
valueDeserializer | |
fetchMinBytes | |
groupId | |
heartbeatIntervalMs | |
maxPartitionFetchBytes | |
sessionTimeoutMs | |
sslKeyPassword | |
sslKeystoreLocation | |
sslKeystorePassword | |
sslTruststoreLocation | |
sslTruststorePassword | |
autoOffsetReset | |
connectionMaxIdleMs | |
autoCommitEnable | |
partitionAssignor | |
receiveBufferBytes | |
consumerRequestTimeoutMs | |
saslKerberosServiceName | |
securityProtocol | |
sendBufferBytes | |
sslEnabledProtocols | |
sslKeystoreType | |
sslProtocol | |
sslProvider | |
sslTruststoreType | |
autoCommitIntervalMs | |
checkCrcs | |
clientId | |
fetchWaitMaxMs | |
metadataMaxAgeMs | |
metricReporters | |
noOfMetricsSample | |
metricsSampleWindowMs | |
reconnectBackoffMs | |
retryBackoffMs | |
kerberosInitCmd | |
kerberosBeforeReloginMinTime | |
kerberosRenewJitter | |
kerberosRenewWindowFactor | |
sslCipherSuites | |
sslEndpointAlgorithm | |
sslKeymanagerAlgorithm | |
sslTrustmanagerAlgorithm |
Samples
Camel 2.16 or older
Consuming messages:
```java
from("kafka:localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1")
    .to("log:input");
```
Producing messages:
See the unit tests of camel-kafka for more examples.
Camel 2.17 or newer
Consuming messages:
```java
from("kafka:localhost:9092?topic=test&groupId=testing&autoOffsetReset=earliest&consumersCount=1")
    .process(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            String messageKey = "";
            if (exchange.getIn() != null) {
                Message message = exchange.getIn();
                Integer partitionId = (Integer) message.getHeader(KafkaConstants.PARTITION);
                String topicName = (String) message.getHeader(KafkaConstants.TOPIC);
                if (message.getHeader(KafkaConstants.KEY) != null) {
                    messageKey = (String) message.getHeader(KafkaConstants.KEY);
                }
                Object data = message.getBody();
                System.out.println("topicName :: " + topicName
                        + " partitionId :: " + partitionId
                        + " messageKey :: " + messageKey
                        + " message :: " + data + "\n");
            }
        }
    })
    .to("log:input");
```
Producing messages:
```java
from("direct:start")
    .process(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            exchange.getIn().setBody("Test Message from Camel Kafka Component Final", String.class);
            exchange.getIn().setHeader(KafkaConstants.PARTITION_KEY, 0);
            exchange.getIn().setHeader(KafkaConstants.KEY, "1");
        }
    })
    .to("kafka:localhost:9092?topic=test");
```
Endpoints
Camel supports the Message Endpoint pattern using the Endpoint interface. Endpoints are usually created by a Component and are usually referred to in the DSL via their URIs.
From an Endpoint you can use the following methods:
- createProducer() creates a Producer for sending message exchanges to the endpoint
- createConsumer() implements the Event Driven Consumer pattern: it creates a Consumer that processes incoming message exchanges via a supplied Processor
- createPollingConsumer() implements the Polling Consumer pattern: it creates a PollingConsumer from which message exchanges can be pulled on demand
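The roles of these three factory methods can be pictured with a toy model in plain Java. These are not Camel's real interfaces (which carry more lifecycle methods and use Exchange objects rather than strings); the sketch only illustrates how the three methods relate to an endpoint:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class EndpointModel {

    // A toy stand-in for an Endpoint backed by a queue. NOT Camel's
    // real Endpoint interface; purely an illustration of the pattern.
    static class ToyEndpoint {
        final Queue<String> store = new ArrayDeque<>();

        // createProducer(): something that sends exchanges to the endpoint.
        Consumer<String> createProducer() {
            return store::add;
        }

        // createPollingConsumer(): pull one exchange on demand (or null).
        Supplier<String> createPollingConsumer() {
            return store::poll;
        }

        // createConsumer(): push each pending exchange to a Processor.
        // A real Event Driven Consumer is asynchronous; this toy drains
        // synchronously for simplicity.
        void createConsumer(Consumer<String> processor) {
            String body;
            while ((body = store.poll()) != null) {
                processor.accept(body);
            }
        }
    }

    public static void main(String[] args) {
        ToyEndpoint endpoint = new ToyEndpoint();
        endpoint.createProducer().accept("hello");
        System.out.println(endpoint.createPollingConsumer().get()); // prints hello
    }
}
```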