...
- There is a default message store defined for the Camel Context. This can be overridden by a route-specific message store. This again can be overridden by a specific EIP processor.
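The override order described above could be resolved roughly as follows. This is only a sketch: `MessageStore` and `MessageStoreResolver` are hypothetical names invented for illustration and are not existing Camel API, and a `null` argument is assumed to mean "not configured at this level".

```java
import java.util.Objects;

/** Hypothetical stand-in for a message store; only a name is needed here. */
interface MessageStore {
    String name();
}

/** Picks the store an EIP processor would use, most specific setting first. */
final class MessageStoreResolver {

    private MessageStoreResolver() {
    }

    static MessageStore resolve(MessageStore contextDefault,
                                MessageStore routeStore,
                                MessageStore processorStore) {
        if (processorStore != null) {
            return processorStore;   // configured on the EIP processor itself
        }
        if (routeStore != null) {
            return routeStore;       // configured on the enclosing route
        }
        // Fall back to the default store of the Camel Context.
        return Objects.requireNonNull(contextDefault,
                "the context needs a default message store");
    }
}
```

With this sketch, `resolve(contextStore, routeStore, null)` would return the route-specific store, and `resolve(contextStore, null, null)` the context default.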
Message Store Data
To disambiguate stored exchanges and make them retrievable again, message store entries must carry attributes in addition to the marshalled exchange.
Item | Description/Reason |
---|---|
ID | Generated unique ID of the entry |
Exchange | The exchange (or parts thereof), usually in some marshalled form (except for in-memory stores) |
CamelContext | A message store may be used by several CamelContext instances |
Correlation | Correlation ID to be able to aggregate related exchanges |
Source | Identifier that uniquely describes the point in the route from which an exchange was stored. E.g. there might be more than one aggregator processor in a route. Could also be used to manually or automatically refeed exchanges back into the route |
Status | Status of an exchange (e.g. PROCESSING, DONE, FAILED) |
CreationTime | Timestamp when the entry was created |
ExpirationTime | Timestamp when the entry can be considered as expired and picked up by some cleaning process |
... | ... more? |
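To make the attribute list more concrete, an entry could be modelled as below. This is a sketch under assumptions, not a proposed API: the type and field names (`MessageStoreEntry`, `EntryStatus`, `camelContextId`, `sourceId`) are hypothetical, the marshalled exchange is assumed to be a byte array, and the ID is assumed to be a random UUID.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.UUID;

/** Status values taken from the examples in the table above. */
enum EntryStatus { PROCESSING, DONE, FAILED }

/** Hypothetical message store entry carrying the attributes listed above. */
record MessageStoreEntry(
        String id,               // generated unique ID of the entry
        byte[] exchange,         // marshalled exchange (assumed to be bytes)
        String camelContextId,   // CamelContext that stored the entry
        String correlationId,    // groups related exchanges
        String sourceId,         // point in the route that stored the exchange
        EntryStatus status,
        Instant creationTime,
        Instant expirationTime) {

    /** Creates a new entry in PROCESSING state that expires after timeToLive. */
    static MessageStoreEntry create(byte[] exchange, String camelContextId,
                                    String correlationId, String sourceId,
                                    Instant now, Duration timeToLive) {
        return new MessageStoreEntry(UUID.randomUUID().toString(), exchange,
                camelContextId, correlationId, sourceId,
                EntryStatus.PROCESSING, now, now.plus(timeToLive));
    }

    /** True once the expiration time has been reached. */
    boolean isExpired(Instant now) {
        return !now.isBefore(expirationTime);
    }
}
```

A cleaning process could then call `isExpired(Instant.now())` on each entry to decide whether it can be picked up.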
Code examples
Note: Sometimes it is easier to express thoughts by providing a fictional piece of code along with some comments.
This section intends to demonstrate the usage of a Message Store by providing hypothetical code snippets, e.g.

    ...
    from(...)
        .aggregate()
            .correlationExpression(header(id))
            .aggregationStrategy(myStrategy)
            .completionTimeout(10000)
            .messageStore(myStore)
    ...

Claim check
The claim check pattern temporarily reduces the data volume of a message by storing content in a message store in exchange for a claim check token. The content is retrieved later, before it is needed again.

    // Optionally: override default store from context
    // (ohr:) IMHO I don't think that this configuration level is really necessary
    defaultMessageStore(myStore);


    // 1) Store body.
    // 2) Set body to null.
    // 3) Set Exchange.CLAIM_CHECK header to unique claim id.
    from(...)
        .checkIn() // store body in default store
        // .checkIn(header('bigHeader'), customStore) : store header in custom store
        .to(...);


    // 1) Look up the Exchange.CLAIM_CHECK header value.
    // 2) Read the message.
    // 3) Set body to the value fetched from the store.
    from(...)
        // .setHeader(Exchange.CLAIM_CHECK, const("id")) : header should still contain the claim id
        .reclaim() // read body from default store
        // .reclaim().aggregationStrategy(myStrategy) : more generically, using an aggregation strategy
        // .reclaim(customStore).aggregationStrategy(myStrategy) : reclaim from custom store using an aggregation strategy
        .to(...);

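Independently of the route DSL, the check-in and reclaim steps described above can be sketched in plain Java. Everything here is an assumption made for illustration: `InMemoryClaimStore`, `ClaimCheckDemo` and the `CLAIM_CHECK` header name are hypothetical, the store keeps content in memory only, and reclaiming removes the content and fails loudly when the token is unknown.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory store that hands out claim ids for stored content. */
final class InMemoryClaimStore {

    private final Map<String, Object> contents = new ConcurrentHashMap<>();

    /** Stores the content and returns a unique claim id (the token). */
    String checkIn(Object content) {
        Objects.requireNonNull(content, "content");
        String claimId = UUID.randomUUID().toString();
        contents.put(claimId, content);
        return claimId;
    }

    /** Removes and returns the content; fails if nothing is stored for the id. */
    Object reclaim(String claimId) {
        Object content = contents.remove(claimId);
        if (content == null) {
            throw new NoSuchElementException("No content for claim id " + claimId);
        }
        return content;
    }
}

final class ClaimCheckDemo {

    // Hypothetical header name standing in for Exchange.CLAIM_CHECK.
    static final String CLAIM_CHECK = "ClaimCheck";

    public static void main(String[] args) {
        InMemoryClaimStore store = new InMemoryClaimStore();
        Map<String, Object> headers = new HashMap<>();
        Object body = "large payload";

        // Check in: store the body, keep only the claim id, drop the body.
        headers.put(CLAIM_CHECK, store.checkIn(body));
        body = null;

        // Reclaim: look up the claim id and restore the body.
        body = store.reclaim((String) headers.get(CLAIM_CHECK));
        System.out.println(body);
    }
}
```

In this sketch a second `reclaim` with the same id throws `NoSuchElementException`, which is one possible answer to the question of how to handle a token without data; content that is never reclaimed simply stays in the map.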
Open issues:
- exception handling if there's no data available for a specific token
- clean up of stale content that was never claimed back
- maybe return some kind of DataHandler instead of a token (cf. CXF MTOM attachments) and retrieve content transparently?