...
Camel 2.17 or newer
Scala is no longer used, as we use the Kafka Java client.
...
Then add the Scala libraries of choice; camel-kafka does not include that dependency, but assumes it is provided. For example, to use Scala 2.10.4 add:
...
...
Property | Default | Description |
---|---|---|
topic | | The topic to use. On the consumer side you can also specify a comma-separated list of topics. |
groupId | | |
consumerStreams | 10 | |
clientId | | |
consumersCount | 1 | The number of consumers that connect to the Kafka broker. |
batchSize | 100 | The commit batch size used when auto commit is false. |
bridgeEndpoint | false | Camel 2.16.0: if bridgeEndpoint is true, the producer ignores the topic header of the message. |
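The properties above map directly onto query parameters of a kafka: endpoint URI. As a minimal sketch (the helper name, topic, and broker address are illustrative, not part of the component), assembling such a URI looks like:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class KafkaUriSketch {
    // Builds a camel-kafka endpoint URI of the form
    // kafka:<brokers>?prop1=val1&prop2=val2 from a property map.
    static String buildKafkaUri(String brokers, Map<String, String> params) {
        StringBuilder uri = new StringBuilder("kafka:").append(brokers);
        String sep = "?";
        for (Map.Entry<String, String> e : params.entrySet()) {
            uri.append(sep).append(e.getKey()).append('=').append(e.getValue());
            sep = "&";
        }
        return uri.toString();
    }

    public static void main(String[] args) {
        // insertion-ordered map so the parameter order is predictable
        Map<String, String> params = new LinkedHashMap<>();
        params.put("topic", "test");
        params.put("groupId", "testing");
        params.put("consumersCount", "1");
        System.out.println(buildKafkaUri("localhost:9092", params));
        // kafka:localhost:9092?topic=test&groupId=testing&consumersCount=1
    }
}
```

The same URI shape is used by the consuming and producing examples later in this page.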
...
The defaults and descriptions for the producer properties below follow the Kafka producer configuration reference (http://kafka.apache.org/documentation.html#producerconfigs), with the following camel-kafka defaults: serializerClass and keySerializerClass default to org.apache.kafka.common.serialization.StringSerializer, and partitioner defaults to org.apache.kafka.clients.producer.internals.DefaultPartitioner.

Property |
---|
serializerClass |
keySerializerClass |
requestRequiredAcks |
bufferMemorySize |
compressionCodec |
retries |
sslKeyPassword |
sslKeystoreLocation |
sslKeystorePassword |
sslTruststoreLocation |
sslTruststorePassword |
producerBatchSize |
clientId |
connectionMaxIdleMs |
lingerMs |
maxBlockMs |
maxRequestSize |
partitioner |
receiveBufferBytes |
requestTimeoutMs |
saslKerberosServiceName |
saslMechanism (from Camel 2.18) |
securityProtocol |
sendBufferBytes |
sslEnabledProtocols |
sslKeystoreType |
sslProtocol |
sslProvider |
sslTruststoreType |
timeoutMs |
blockOnBufferFull |
maxInFlightRequest |
metadataFetchTimeoutMs |
metadataMaxAgeMs |
metricReporters |
noOfMetricsSample |
metricsSampleWindowMs |
reconnectBackoffMs |
retryBackoffMs |
kerberosInitCmd |
kerberosBeforeReloginMinTime |
kerberosRenewJitter |
kerberosRenewWindowFactor |
sslCipherSuites |
sslEndpointAlgorithm |
sslKeymanagerAlgorithm |
sslTrustmanagerAlgorithm |
...
The defaults and descriptions for the consumer properties below follow the Kafka new consumer configuration reference (http://kafka.apache.org/documentation.html#newconsumerconfigs), with the following camel-kafka defaults: keyDeserializer and valueDeserializer default to org.apache.kafka.common.serialization.StringDeserializer, and partitionAssignor defaults to org.apache.kafka.clients.consumer.RangeAssignor.

Property |
---|
bootstrapServers |
keyDeserializer |
valueDeserializer |
fetchMinBytes |
groupId |
heartbeatIntervalMs |
maxPartitionFetchBytes |
sessionTimeoutMs |
sslKeyPassword |
sslKeystoreLocation |
sslKeystorePassword |
sslTruststoreLocation |
sslTruststorePassword |
autoOffsetReset |
connectionMaxIdleMs |
autoCommitEnable |
partitionAssignor |
receiveBufferBytes |
consumerRequestTimeoutMs |
saslKerberosServiceName |
saslMechanism (from Camel 2.18) |
securityProtocol |
sendBufferBytes |
sslEnabledProtocols |
sslKeystoreType |
sslProtocol |
sslProvider |
sslTruststoreType |
autoCommitIntervalMs |
checkCrcs |
clientId |
fetchWaitMaxMs |
metadataMaxAgeMs |
metricReporters |
noOfMetricsSample |
metricsSampleWindowMs |
reconnectBackoffMs |
retryBackoffMs |
kerberosInitCmd |
kerberosBeforeReloginMinTime |
kerberosRenewJitter |
kerberosRenewWindowFactor |
sslCipherSuites |
sslEndpointAlgorithm |
sslKeymanagerAlgorithm |
sslTrustmanagerAlgorithm |
...
See the camel-kafka unit tests for more examples.
Camel 2.17 or newer
Consuming messages:
from("kafka:localhost:9092?topic=test&groupId=testing&autoOffsetReset=earliest&consumersCount=1")
.process(new Processor() {
@Override
public void process(Exchange exchange)
throws Exception {
String messageKey = "";
if (exchange.getIn() != null) {
Message message = exchange.getIn();
Integer partitionId = (Integer) message
.getHeader(KafkaConstants.PARTITION);
String topicName = (String) message
.getHeader(KafkaConstants.TOPIC);
if (message.getHeader(KafkaConstants.KEY) != null) {
    messageKey = (String) message.getHeader(KafkaConstants.KEY);
}
Object data = message.getBody();
System.out.println("topicName :: "
+ topicName + " partitionId :: "
+ partitionId + " messageKey :: "
+ messageKey + " message :: "
+ data + "\n");
}
}
}).to("log:input");
Producing messages:
from("direct:start").process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
exchange.getIn().setBody("Test Message from Camel Kafka Component Final", String.class);
exchange.getIn().setHeader(KafkaConstants.PARTITION_KEY, 0);
exchange.getIn().setHeader(KafkaConstants.KEY, "1");
}
}).to("kafka:localhost:9092?topic=test");
Using the Kafka idempotent repository (Available from Camel 2.19)
The camel-kafka library provides a Kafka topic-based idempotent repository. This repository broadcasts all changes to idempotent state (add/remove) to a Kafka topic, and populates a local in-memory cache on each repository's process instance through event sourcing.
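The event-sourcing idea described above can be sketched as replaying a log of add/remove events into a local set. The Event type and replay method here are illustrative stand-ins; the real repository rebuilds its cache from Kafka records, not an in-memory list.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class IdempotentStateSketch {
    enum Op { ADD, REMOVE }

    // One broadcast change to the idempotent state (illustrative stand-in
    // for a record on the repository's Kafka topic).
    record Event(Op op, String key) {}

    // Rebuilds the local cache by replaying all events in order.
    static Set<String> replay(List<Event> events) {
        Set<String> cache = new HashSet<>();
        for (Event e : events) {
            if (e.op() == Op.ADD) {
                cache.add(e.key());
            } else {
                cache.remove(e.key());
            }
        }
        return cache;
    }

    public static void main(String[] args) {
        List<Event> log = List.of(
            new Event(Op.ADD, "order-1"),
            new Event(Op.ADD, "order-2"),
            new Event(Op.REMOVE, "order-1"));
        System.out.println(replay(log)); // [order-2]
    }
}
```

Because every instance replays the same topic, all peers converge on the same cache contents.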
The topic used must be unique per idempotent repository instance. The mechanism has no requirements on the number of topic partitions, as the repository consumes from all partitions at the same time, nor on the replication factor of the topic.
Each repository instance that uses the topic (typically on different machines running in parallel) controls its own consumer group, so in a cluster of 10 Camel processes using the same topic each controls its own offsets.
On startup, the instance subscribes to the topic and rewinds the offset to the beginning, rebuilding the cache to the latest state. The cache is not considered warmed up until one poll of pollDurationMs in length returns 0 records. Startup will not complete until either the cache has warmed up or 30 seconds elapse; in the latter case the idempotent repository may be in an inconsistent state until its consumer catches up to the end of the topic.
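The warm-up rule above can be sketched as a simple loop: poll until an empty poll is seen, or give up at the deadline. The poll source here is a stand-in supplier of record counts; the real repository polls a Kafka consumer, and each real poll itself blocks for up to pollDurationMs.

```java
import java.util.function.IntSupplier;

public class WarmUpSketch {
    // Returns true if the cache warmed up: one poll returned 0 records
    // before timeoutMs elapsed. pollDurationMs is shown for context only;
    // in this sketch the stand-in poll returns immediately.
    static boolean warmUp(IntSupplier pollRecordCount, long pollDurationMs, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (pollRecordCount.getAsInt() == 0) {
                return true; // empty poll => caught up to the end of the topic
            }
        }
        return false; // timed out; cache may lag until the consumer catches up
    }

    public static void main(String[] args) {
        int[] polls = {5, 3, 0}; // two polls with records, then an empty poll
        int[] i = {0};
        System.out.println(warmUp(() -> polls[i[0]++], 100, 30_000)); // true
    }
}
```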
A KafkaIdempotentRepository has the following properties:
Property | Description |
---|---|
topic | The name of the Kafka topic to use to broadcast changes. (required) |
bootstrapServers | The bootstrap.servers property on the internal Kafka producer and consumer. Use this as shorthand if not setting consumerConfig and producerConfig . If used, this component will apply sensible default configurations for the producer and consumer. |
producerConfig | Sets the properties that will be used by the Kafka producer that broadcasts changes. Overrides bootstrapServers , so must define the Kafka bootstrap.servers property itself |
consumerConfig | Sets the properties that will be used by the Kafka consumer that populates the cache from the topic. Overrides bootstrapServers , so must define the Kafka bootstrap.servers property itself |
maxCacheSize | How many of the most recently used keys should be stored in memory (default 1000). |
pollDurationMs | The poll duration of the Kafka consumer. The local caches are updated immediately; this value affects how far behind other peers in the cluster are, which update their caches from the topic, relative to the idempotent consumer instance that issued the cache action message. The default value is 100 ms. If setting this value explicitly, be aware that there is a tradeoff between remote cache liveness and the volume of network traffic between this repository's consumer and the Kafka brokers. |
The repository can be instantiated by defining the topic and bootstrapServers, or the producerConfig and consumerConfig property sets can be explicitly defined to enable features such as SSL/SASL.
To use it, this repository must be placed in the Camel registry, either manually or by registration as a bean in Spring/Blueprint, as it is CamelContext-aware.
Sample usage is as follows:
KafkaIdempotentRepository kafkaIdempotentRepository = new KafkaIdempotentRepository("idempotent-db-inserts", "localhost:9091");
SimpleRegistry registry = new SimpleRegistry();
registry.put("insertDbIdemRepo", kafkaIdempotentRepository); // must be registered in the registry, to enable access to the CamelContext
CamelContext context = new DefaultCamelContext(registry);
// later in RouteBuilder...
from("direct:performInsert")
.idempotentConsumer(header("id")).messageIdRepositoryRef("insertDbIdemRepo")
// once-only insert into database
    .end()
In XML:
<!-- simple -->
<bean id="insertDbIdemRepo" class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
<property name="topic" value="idempotent-db-inserts"/>
<property name="bootstrapServers" value="localhost:9091"/>
</bean>
<!-- complex -->
<bean id="insertDbIdemRepo" class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
<property name="topic" value="idempotent-db-inserts"/>
<property name="maxCacheSize" value="10000"/>
<property name="consumerConfig">
<props>
<prop key="bootstrap.servers">localhost:9091</prop>
</props>
</property>
<property name="producerConfig">
<props>
<prop key="bootstrap.servers">localhost:9091</prop>
</props>
</property>
</bean>