Aggregation Repository¶
Batch IoT events using Camel's aggregation pattern with a JDBC-backed repository for durability across restarts.
What You'll Learn¶
- How Forage creates a JDBC aggregation repository from a single property
- Configuring Camel's aggregator with size-based and timeout-based completion
- Writing a custom AggregationStrategy to collect events into a list
- Persisting aggregation state in PostgreSQL
Prerequisites¶
Start PostgreSQL:
Then create the aggregation repository tables:
What setup-db.sh does
CREATE TABLE IF NOT EXISTS event_aggregation (
    id varchar(255) NOT NULL,
    exchange bytea NOT NULL,
    version BIGINT NOT NULL,
    CONSTRAINT event_aggregation_pk PRIMARY KEY (id)
);
CREATE TABLE IF NOT EXISTS event_aggregation_completed (
    id varchar(255) NOT NULL,
    exchange bytea NOT NULL,
    version BIGINT NOT NULL,
    CONSTRAINT event_aggregation_completed_pk PRIMARY KEY (id)
);
Camel's JDBC aggregation repository requires two tables: one for in-progress aggregations and one for completed aggregations. The in-progress table must have the same name as the forage.jdbc.aggregation.repository.name property, and the completed table uses that name with a _completed suffix.
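The two-table naming rule can be sketched in plain Java. The helper below is hypothetical (AggregationTables, completedTable, ddlFor, and ddlForRepository are illustrative names, not part of Forage or Camel). It derives both table names from a single repository name and emits DDL with the same PostgreSQL column types as the script above.

```java
import java.util.List;

// Hypothetical helper, not part of Forage or Camel: derives the two table
// names the JDBC aggregation repository expects from one repository name.
final class AggregationTables {

    // Name of the table that holds completed aggregations.
    static String completedTable(String repositoryName) {
        return repositoryName + "_completed";
    }

    // Builds one CREATE TABLE statement with the columns used in setup-db.sh.
    static String ddlFor(String table) {
        return "CREATE TABLE IF NOT EXISTS " + table + " (\n"
             + "    id varchar(255) NOT NULL,\n"
             + "    exchange bytea NOT NULL,\n"
             + "    version BIGINT NOT NULL,\n"
             + "    CONSTRAINT " + table + "_pk PRIMARY KEY (id)\n"
             + ");";
    }

    // DDL for the in-progress table, followed by the completed table.
    static List<String> ddlForRepository(String repositoryName) {
        return List.of(ddlFor(repositoryName), ddlFor(completedTable(repositoryName)));
    }

    public static void main(String[] args) {
        ddlForRepository("event_aggregation").forEach(System.out::println);
    }
}
```

If the repository name changes, both tables have to be recreated under the new names, which is why deriving them from one value is convenient.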
Configuration¶
application.properties
# Database connection
forage.jdbc.db.kind=postgresql # (1)!
forage.jdbc.url=jdbc:postgresql://localhost:5432/postgres
forage.jdbc.username=test
forage.jdbc.password=test
# Connection pool
forage.jdbc.pool.initial.size=5
forage.jdbc.pool.min.size=2
forage.jdbc.pool.max.size=20
forage.jdbc.pool.acquisition.timeout.seconds=5
forage.jdbc.pool.validation.timeout.seconds=3
forage.jdbc.pool.leak.timeout.minutes=10
forage.jdbc.pool.idle.validation.timeout.minutes=3
# Transaction
forage.jdbc.transaction.enabled=true # (2)!
forage.jdbc.transaction.timeout.seconds=30
# Aggregation repository
forage.jdbc.aggregation.repository.name=event_aggregation # (3)!
- Uses the default (unnamed) prefix.
- Transactions are required for the aggregation repository to guarantee consistency.
- This creates a JdbcAggregationRepository bean named event_aggregation that stores aggregation state in the event_aggregation / event_aggregation_completed tables.
Route¶
event-batching.camel.yaml
- route:
    id: event-batching
    from:
      uri: direct
      parameters:
        name: events
      steps:
        - log:
            message: "Received event with id :${header.eventId} and body: ${body}"
        - aggregate:
            aggregationRepository: "#event_aggregation" # (1)!
            aggregationStrategy: "#groupedBodyAggregationStrategy" # (2)!
            completionSize: 5 # (3)!
            completionTimeout: "5000" # (4)!
            correlationExpression:
              header:
                expression: eventId # (5)!
            steps:
              - log:
                  message: >-
                    Batch complete with ${exchangeProperty.CamelAggregatedSize}
                    event id: ${header.eventId} and events: ${body}
- beans:
    - name: groupedBodyAggregationStrategy
      type: org.forage.MyAggregationStrategy
- References the JDBC aggregation repository bean created by Forage.
- Custom strategy that collects message bodies into a List.
- Complete the batch when 5 events with the same correlation key arrive.
- Or complete after 5 seconds of inactivity, whichever comes first.
- Events are grouped by their eventId header -- each unique value gets its own batch.
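To make the interaction between the correlation key, the size limit, and the inactivity timeout concrete, here is a minimal in-memory model in plain Java. It is only a sketch under stated assumptions: BatchSimulator, onEvent, and onTick are hypothetical names, time is passed in explicitly so the behaviour is deterministic, and nothing here reflects how Camel implements its aggregator internally.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Illustrative model of "complete by size or by inactivity, whichever is first".
// Hypothetical class; Camel's real aggregator is not implemented this way.
final class BatchSimulator {
    private final int completionSize;
    private final long completionTimeoutMillis;
    private final Map<String, List<String>> open = new LinkedHashMap<>();
    private final Map<String, Long> lastArrival = new LinkedHashMap<>();

    BatchSimulator(int completionSize, long completionTimeoutMillis) {
        this.completionSize = completionSize;
        this.completionTimeoutMillis = completionTimeoutMillis;
    }

    // Adds an event to the batch for its correlation key and resets that
    // key's inactivity clock. Returns the batch once the size limit is hit.
    Optional<List<String>> onEvent(String key, String body, long nowMillis) {
        List<String> batch = open.computeIfAbsent(key, k -> new ArrayList<>());
        batch.add(body);
        lastArrival.put(key, nowMillis);
        if (batch.size() >= completionSize) {
            open.remove(key);
            lastArrival.remove(key);
            return Optional.of(batch);
        }
        return Optional.empty();
    }

    // Completes every open batch that has been inactive for at least the timeout.
    Map<String, List<String>> onTick(long nowMillis) {
        Map<String, List<String>> completed = new LinkedHashMap<>();
        Iterator<Map.Entry<String, Long>> it = lastArrival.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() >= completionTimeoutMillis) {
                completed.put(entry.getKey(), open.remove(entry.getKey()));
                it.remove();
            }
        }
        return completed;
    }

    public static void main(String[] args) {
        BatchSimulator sim = new BatchSimulator(5, 5000);
        // Three events for key "1": too few to complete by size.
        for (int i = 1; i <= 3; i++) {
            sim.onEvent("1", "Event " + i, i * 100L);
        }
        // Five events for key "2": the fifth one completes the batch.
        for (int i = 1; i <= 5; i++) {
            sim.onEvent("2", "Batch " + i, 400L + i)
               .ifPresent(b -> System.out.println("by size: " + b));
        }
        // Five seconds after the last event for key "1", the timeout fires.
        sim.onTick(5300).forEach((k, b) -> System.out.println("by timeout, key " + k + ": " + b));
    }
}
```

The model keeps one open batch per key, which is why events with eventId=2 never affect the batch for eventId=1.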
Aggregation strategy¶
The custom MyAggregationStrategy collects each incoming exchange body into an ArrayList:
org/forage/MyAggregationStrategy.java
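package org.forage;

import java.util.ArrayList;
import java.util.List;

import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;

public class MyAggregationStrategy implements AggregationStrategy {
    @Override
    @SuppressWarnings("unchecked")
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        Object newBody = newExchange.getIn().getBody();
        // First event for this correlation key: start a new list.
        if (oldExchange == null) {
            List<Object> list = new ArrayList<>();
            list.add(newBody);
            newExchange.getIn().setBody(list);
            return newExchange;
        }
        // Later events: append to the list carried by the aggregated exchange.
        List<Object> list = oldExchange.getIn().getBody(List.class);
        list.add(newBody);
        oldExchange.getIn().setBody(list);
        return oldExchange;
    }
}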
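Because the JDBC repository stores each exchange in a bytea column, the aggregated body has to survive Java serialization; Camel's documentation for the JDBC aggregation repository notes that only Serializable body types are preserved. A quick sanity check that does not depend on Camel is to round-trip the list through ObjectOutputStream. This is a sketch of the idea only: SerializableBodyCheck and roundTrip are hypothetical names, and the code is not Camel's actual codec.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;

// Hypothetical check, independent of Camel: verifies that a batch body can
// be written to bytes and read back, as a persistent repository would need.
final class SerializableBodyCheck {

    // Serializes the value to a byte array and deserializes it again.
    // Throws java.io.NotSerializableException if any element is not Serializable.
    static Object roundTrip(Object value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Same shape as the body built by MyAggregationStrategy: an ArrayList of bodies.
        List<Object> batch = new ArrayList<>();
        batch.add("Event 1");
        batch.add("Event 2");
        Object restored = roundTrip(batch);
        System.out.println("Round trip preserved the batch: " + restored.equals(batch));
    }
}
```

String bodies in an ArrayList pass this check. If the events carry custom objects, those classes would need to implement java.io.Serializable for the same to hold.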
Running¶
Sending test events¶
Use camel cmd send to push events into the direct:events endpoint:
# 3 events with eventId=1 -- batch completes after 5s timeout
camel cmd send --body="Event 1" --header="eventId=1" \
    --endpoint="direct:events" event-batching
camel cmd send --body="Event 2" --header="eventId=1" \
    --endpoint="direct:events" event-batching
camel cmd send --body="Event 3" --header="eventId=1" \
    --endpoint="direct:events" event-batching
# 5 events with eventId=2 -- batch completes immediately by size
for i in 1 2 3 4 5; do
    camel cmd send --body="Batch $i" --header="eventId=2" \
        --endpoint="direct:events" event-batching
done
Expected output:
Received event with id :1 and body: Event 1
Received event with id :1 and body: Event 2
Received event with id :1 and body: Event 3
...
Batch complete with 3 event id: 1 and events: [Event 1, Event 2, Event 3]
Batch complete with 5 event id: 2 and events: [Batch 1, Batch 2, Batch 3, Batch 4, Batch 5]
- eventId=1: Only 3 events arrived, so the batch completed after the 5-second timeout.
- eventId=2: All 5 events arrived, so the batch completed immediately by size.
Key Takeaways¶
- Setting forage.jdbc.aggregation.repository.name creates a ready-to-use JdbcAggregationRepository bean.
- The repository persists in-flight aggregation state in PostgreSQL, surviving application restarts.
- Completion criteria (size and timeout) work together -- whichever triggers first completes the batch.
- Events are correlated by a header value, so each distinct eventId gets its own independent batch.