Camel 2.3 - Overhaul of Aggregator EIP

The Aggregator EIP needs an overhaul in Camel to add a few new features and improvements, and to fix an issue with using a completion predicate together with a timeout.

Current issues

  • Built on top of BatchProcessor, which has too complex logic for concurrent locking and accepting new messages.
  • BatchProcessor was originally designed to help with batch and transactions
  • Either BatchProcessor needs to be reworked or a new base class implemented
  • Completion predicate and batch timeout do not work together
  • Batch timeout is actually an interval check timeout
  • AggregationStrategy is invoked too late, as it is only invoked when the batch timeout triggers
  • BatchSender is single threaded
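
To illustrate why an interval check is not the same as a real timeout, here is a minimal sketch of the arithmetic. It is not Camel code: the class and method names are hypothetical, and it assumes the background check fires at fixed multiples of the interval, which simplifies what BatchProcessor actually does.

```java
// Hypothetical model, not Camel code: compares an interval-check "timeout"
// with a timeout that is measured per correlation key.
final class TimeoutModel {

    // Interval check (assumed behavior): a background task wakes up at
    // intervalMillis, 2 * intervalMillis, ... and flushes whatever has been
    // collected. Returns the first wake-up at or after the arrival time.
    static long intervalCheckCompletion(long firstArrivalMillis, long intervalMillis) {
        long ticks = (firstArrivalMillis + intervalMillis - 1) / intervalMillis; // ceiling division
        return Math.max(1, ticks) * intervalMillis;
    }

    // Per-key timeout: the clock starts when the first exchange for the key arrives.
    static long perKeyCompletion(long firstArrivalMillis, long timeoutMillis) {
        return firstArrivalMillis + timeoutMillis;
    }

    public static void main(String[] args) {
        long timeout = 1000;
        for (long arrival : new long[] {100, 900, 1001}) {
            System.out.printf("arrival=%d interval-check=%d per-key=%d%n",
                    arrival,
                    intervalCheckCompletion(arrival, timeout),
                    perKeyCompletion(arrival, timeout));
        }
    }
}
```

In this model an exchange arriving at 900 ms is flushed by the check at 1000 ms, so its group was open for only 100 ms, whereas a per-key timeout would keep it open until 1900 ms.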

New Design

  • BatchSender should use an ExecutorService that ensures that if a thread dies a new thread will be started, i.e. the task is always running. DONE
  • BatchSender is currently only a single thread. We should support concurrency, so you can have multiple outputs based on correlation key. DONE
  • BatchSender should have simpler logic for enqueuing and locking if possible. DONE
  • BatchProcessor should support persistence, so aggregated exchanges can be stored in a database using JPA. See how we do this with the Idempotent Consumer and/or Tracer. DONE
  • If possible: AggregationStrategy should be invoked on the fly so that in the future you can use it to determine whether the aggregation is complete. DONE
  • Batch timeout and completion predicate should work together so the completion is triggered by the first one. DONE
  • Batch timeout should be per correlation key, and not a single global timeout. DONE
  • Add POJO based aggregation strategy so you can avoid using the low level Exchange APIs
  • Add an option for what to do in case the correlation key cannot be evaluated, e.g. to ignore it or throw an exception back to the caller. DONE
  • Add an option to ignore exchanges that arrive too late, after a correlation key has been closed. DONE
  • Look through all JIRA tickets at Apache that we have for the Aggregator. DONE
  • Resequencer is also using the BatchProcessor, so we should ensure it works as well; maybe it can also benefit from some of the new features.
  • Add a persistent AggregationRepository out of the box. DONE
  • Configuration of completion options, such as timeout and size, should be more dynamic. DONE
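
Several of the design points above (invoking the strategy on the fly, letting predicate and timeout complete on a first-wins basis, a timeout per correlation key) can be sketched together in plain Java. This is only an illustration under stated assumptions, not the Camel implementation: `SketchAggregator` and all its members are hypothetical names, the strategy is modelled as a `BinaryOperator`, and time is passed in explicitly so the behavior is deterministic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Predicate;

// Hypothetical sketch, not the Camel implementation.
final class SketchAggregator<T> {

    // One open group per correlation key, with its own deadline.
    private static final class Group<V> {
        V aggregated;
        final long deadlineMillis;
        Group(V first, long deadlineMillis) {
            this.aggregated = first;
            this.deadlineMillis = deadlineMillis;
        }
    }

    private final BinaryOperator<T> strategy;        // merges old and new body
    private final Predicate<T> completionPredicate;  // tested after every merge
    private final long timeoutMillis;                // applied per correlation key
    private final Map<String, Group<T>> open = new HashMap<>();
    private final List<T> completed = new ArrayList<>();

    SketchAggregator(BinaryOperator<T> strategy, Predicate<T> completionPredicate, long timeoutMillis) {
        this.strategy = strategy;
        this.completionPredicate = completionPredicate;
        this.timeoutMillis = timeoutMillis;
    }

    // Aggregates on arrival, so the predicate can complete the group immediately.
    synchronized void add(String key, T body, long nowMillis) {
        if (key == null) {
            throw new IllegalArgumentException("Correlation key could not be evaluated");
        }
        Group<T> group = open.get(key);
        if (group == null) {
            // The timeout clock starts with the first exchange for this key.
            group = new Group<>(body, nowMillis + timeoutMillis);
            open.put(key, group);
        } else {
            group.aggregated = strategy.apply(group.aggregated, body);
        }
        if (completionPredicate.test(group.aggregated)) {
            open.remove(key);
            completed.add(group.aggregated);
        }
    }

    // Completes every group whose own deadline has passed.
    synchronized void checkTimeouts(long nowMillis) {
        Iterator<Group<T>> it = open.values().iterator();
        while (it.hasNext()) {
            Group<T> group = it.next();
            if (nowMillis >= group.deadlineMillis) {
                it.remove();
                completed.add(group.aggregated);
            }
        }
    }

    synchronized List<T> completed() {
        return new ArrayList<>(completed);
    }

    public static void main(String[] args) {
        SketchAggregator<String> agg = new SketchAggregator<>(
                (a, b) -> a + "+" + b,
                s -> s.split("\\+").length >= 3, // complete once three bodies are merged
                1000);
        agg.add("A", "a1", 0);
        agg.add("B", "b1", 100);
        agg.add("A", "a2", 200);
        agg.add("A", "a3", 300);   // predicate completes key A
        agg.checkTimeouts(1000);   // B's deadline is 1100, nothing happens
        agg.checkTimeouts(1100);   // timeout completes key B
        System.out.println(agg.completed()); // prints [a1+a2+a3, b1]
    }
}
```

Key A is completed by the predicate and key B by its own timeout, whichever triggers first for each key. A real implementation would also need the persistence, late-exchange handling and concurrent output described in the list, which this sketch leaves out.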