Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

See unit tests of camel-kafka for more examples

Camel 2.17 or newer

Consuming messages:

Code Block
from("kafka:localhost:9092?topic=test&groupId=testing&autoOffsetReset=earliest&consumersCount=1")
						.process(new Processor() {
							@Override
							public void process(Exchange exchange)
									throws Exception {
								String messageKey = "";
								if (exchange.getIn() != null) {
									Message message = exchange.getIn();
									Integer partitionId = (Integer) message
											.getHeader(KafkaConstants.PARTITION);
									String topicName = (String) message
											.getHeader(KafkaConstants.TOPIC);
									if (message.getHeader(KafkaConstants.KEY) != null)
										messageKey = (String) message
												.getHeader(KafkaConstants.KEY);
									Object data = message.getBody();


									System.out.println("topicName :: "
											+ topicName + " partitionId :: "
											+ partitionId + " messageKey :: "
											+ messageKey + " message :: "
											+ data + "\n");
								}
							}
						}).to("log:input");

 

Producing messages:

Code Block
from("direct:start").process(new Processor() {
					@Override
					public void process(Exchange exchange) throws Exception {
						exchange.getIn().setBody("Test Message from Camel Kafka Component Final",String.class);
						exchange.getIn().setHeader(KafkaConstants.PARTITION_KEY, 0);
						exchange.getIn().setHeader(KafkaConstants.KEY, "1");
					}
				}).to("kafka:localhost:9092?topic=test");

 

Include Page
Endpoint
Endpoint