Kinesis Component
Available as of Camel 2.17
The Kinesis component supports receiving messages from and sending messages to the Amazon Kinesis service.
Info | ||
---|---|---|
| ||
You must have a valid Amazon Web Services developer account, and be signed up to use Amazon Kinesis. More information is available at AWS Kinesis. |
...
Name | Default Value | Context | Description |
---|---|---|---|
amazonKinesisClient | | Shared | Reference to a |
maxMessagesPerPoll | | Consumer | Maximum number of results returned by each poll of the AWS API. Because the shard iterator is unique to the consumer, changing this value shouldn't affect other consumers. |
iteratorType | | Consumer | One of TRIM_HORIZON, LATEST, AFTER_SEQUENCE_NUMBER or AT_SEQUENCE_NUMBER. See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html for descriptions of these iterator types. |
sequenceNumber | null | Consumer | The sequence number to start polling from. Required if iteratorType is set to AFTER_SEQUENCE_NUMBER or AT_SEQUENCE_NUMBER. |
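The options above are passed as query parameters on the endpoint URI. The following is a minimal sketch of how a consumer URI could be assembled, including the rule that sequenceNumber is required for AFTER_SEQUENCE_NUMBER and AT_SEQUENCE_NUMBER. The class KinesisEndpointUri and its build method are hypothetical helpers written for illustration; they are not part of Camel.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not part of Camel): assembles a consumer endpoint URI
// from the options listed in the table above.
final class KinesisEndpointUri {

    // Iterator types named in the options table.
    enum IteratorType { TRIM_HORIZON, LATEST, AFTER_SEQUENCE_NUMBER, AT_SEQUENCE_NUMBER }

    // clientRef is the registry name of the client, without the leading '#'.
    // iteratorType and maxMessagesPerPoll may be null, in which case they are omitted.
    static String build(String streamName, String clientRef, IteratorType iteratorType,
                        String sequenceNumber, Integer maxMessagesPerPoll) {
        boolean needsSequence = iteratorType == IteratorType.AFTER_SEQUENCE_NUMBER
                || iteratorType == IteratorType.AT_SEQUENCE_NUMBER;
        if (needsSequence && (sequenceNumber == null || sequenceNumber.isEmpty())) {
            throw new IllegalArgumentException(
                    "sequenceNumber is required when iteratorType is " + iteratorType);
        }

        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("amazonKinesisClient", "#" + clientRef);
        if (iteratorType != null) {
            options.put("iteratorType", iteratorType.name());
        }
        if (needsSequence) {
            // Only emitted for the two iterator types that use it.
            options.put("sequenceNumber", sequenceNumber);
        }
        if (maxMessagesPerPoll != null) {
            options.put("maxMessagesPerPoll", String.valueOf(maxMessagesPerPoll));
        }

        StringJoiner query = new StringJoiner("&");
        options.forEach((name, value) -> query.add(name + "=" + value));
        return "aws-kinesis://" + streamName + "?" + query;
    }
}
```

The returned string can be used wherever an endpoint URI is expected, for example as the argument to from(...) in a route. Failing early on a missing sequenceNumber keeps the misconfiguration out of the running route.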
Info | ||
---|---|---|
| ||
You have to provide the amazonKinesisClient in the Registry with proxies and relevant credentials configured. |
...
This lets you know, for instance, how many messages exist in the batch, so that the Aggregator can aggregate that number of messages.
Usage
Message headers set by the Kinesis consumer
Header | Type | Description |
---|---|---|
|
| The sequence number of the record. This is represented as a String because its size is not defined by the API. If it is to be used as a numerical type then use |
|
| The time AWS assigned as the arrival time of the record. |
|
| Identifies which shard in the stream the data record is assigned to. |
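Because the sequence number arrives as a String of unspecified size, comparing two values as text can give the wrong order. The sketch below assumes the header value is a string of decimal digits and uses java.math.BigInteger as the numerical type; that choice is an assumption made for illustration, and SequenceNumbers is a hypothetical helper, not part of Camel.

```java
import java.math.BigInteger;

// Hypothetical helper (not part of Camel): treats sequence numbers that
// arrive as Strings of unspecified length as numbers.
final class SequenceNumbers {

    // Parses a decimal sequence number. BigInteger is assumed here because
    // it has no fixed width, so it cannot overflow on long values.
    static BigInteger parse(String sequenceNumber) {
        return new BigInteger(sequenceNumber.trim());
    }

    // Returns true when 'candidate' is numerically greater than 'lastSeen'.
    static boolean isNewer(String candidate, String lastSeen) {
        return parse(candidate).compareTo(parse(lastSeen)) > 0;
    }
}
```

For example, SequenceNumbers.isNewer("100", "99") is true, whereas a plain String comparison would order "100" before "99".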
Message headers set by the Kinesis producer
Header | Type | Description |
---|---|---|
|
| The sequence number of the record. This is represented as a String because its size is not defined by the API. If it is to be used as a numerical type then use |
|
| Indicates where the data was stored. |
AmazonKinesis configuration
You will need to create an instance of AmazonKinesisClient and bind it to the registry.
Code Block | ||
---|---|---|
| ||
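import com.amazonaws.ClientConfiguration;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
ClientConfiguration clientConfiguration = new ClientConfiguration();
clientConfiguration.setProxyHost("http://myProxyHost");
clientConfiguration.setProxyPort(8080);
// regionName is a String holding the name of the AWS region to use
Region region = Region.getRegion(Regions.fromName(regionName));
// the 'null' here is the AWSCredentialsProvider which defaults to an instance of DefaultAWSCredentialsProviderChain
AmazonKinesisClient client = region.createClient(AmazonKinesisClient.class, null, clientConfiguration);
registry.bind("kinesisClient", client); |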
You then have to reference the AmazonKinesisClient in the amazonKinesisClient URI option.
Code Block | ||
---|---|---|
| ||
from("aws-kinesis://mykinesisstream?amazonKinesisClient=#kinesisClient") .to("log:out?showAll=true"); |
Providing AWS Credentials
...
where ${camel-version} must be replaced by the actual version of Camel (2.17 or higher).
...