Event Sourcing Template
Capture every event and state change in your workflows. Build complete audit trails, enable time-travel debugging, support compliance requirements, and reconstruct system state at any point in time.
Time to build: 30 minutes
Difficulty: Advanced
Perfect for: Financial systems, compliance workflows, audit trails, debugging, regulated industries
What It Does
Action occurs → Store event → Update state → Maintain immutable event log
Every change is recorded permanently. System state can be reconstructed from event history.
Step-by-Step
1. Create Workflow
- New workflow → Name it "Order Processing with Event Sourcing"
- Webhook trigger for order events
2. Capture Event Metadata
- Drag Function Block to enrich event
- Connect Webhook to Function
// Generate unique event ID and add metadata
return {
event_id: crypto.randomUUID(),
event_type: input.webhook.body.event_type,
aggregate_id: input.webhook.body.order_id,
aggregate_type: "order",
data: input.webhook.body,
metadata: {
user_id: input.webhook.body.user_id,
ip_address: input.webhook.headers.x_forwarded_for,
user_agent: input.webhook.headers.user_agent,
timestamp: new Date().toISOString(),
workflow_execution_id: input.workflow.execution_id
},
version: 1
};
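The Function Block above assumes the webhook body always carries `event_type` and `order_id`. The following is a minimal sketch of the same enrichment with a validation guard, written as a standalone function. `buildEvent` and its argument shape are hypothetical names chosen for illustration and are not part of the platform.

```javascript
const { randomUUID } = require("node:crypto");

// Hypothetical helper: builds the event envelope from a webhook payload.
// Throws early so a malformed payload never reaches the events table.
function buildEvent(body, headers = {}, executionId = null) {
  for (const field of ["event_type", "order_id"]) {
    if (typeof body?.[field] !== "string" || body[field] === "") {
      throw new Error(`Missing required field: ${field}`);
    }
  }
  return {
    event_id: randomUUID(),
    event_type: body.event_type,
    aggregate_id: body.order_id,
    aggregate_type: "order",
    data: body,
    metadata: {
      // Optional fields default to null instead of undefined
      user_id: body.user_id ?? null,
      ip_address: headers.x_forwarded_for ?? null,
      user_agent: headers.user_agent ?? null,
      timestamp: new Date().toISOString(),
      workflow_execution_id: executionId,
    },
    version: 1,
  };
}
```

Rejecting incomplete payloads at this step matters more than usual here, because the events table is append-only and a bad row cannot be corrected later.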
3. Store Event (Immutable)
- Drag Database Query Block
- Connect Function to Database
-- Events table is append-only, never updated or deleted
INSERT INTO events (
event_id,
event_type,
aggregate_id,
aggregate_type,
event_data,
metadata,
event_version,
created_at,
sequence_number
) VALUES (
'<function.event_id>',
'<function.event_type>',
'<function.aggregate_id>',
'<function.aggregate_type>',
'<function.data>'::jsonb,
'<function.metadata>'::jsonb,
<function.version>,
NOW(),
nextval('event_sequence')
) RETURNING id, sequence_number
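The query above interpolates values directly into the SQL text, so a payload containing a single quote would break the statement. If your Database Query Block or driver supports bound parameters (an assumption; check your platform), the same insert can be written with placeholders. The sketch below builds a `{ text, values }` object in the shape that node-postgres accepts; `buildInsertEventQuery` is a hypothetical helper name.

```javascript
// Hypothetical helper: returns a parameterized version of the step 3 INSERT.
// Values travel separately from the SQL text, so quoting is handled by the driver.
function buildInsertEventQuery(event) {
  const text = `
    INSERT INTO events (
      event_id, event_type, aggregate_id, aggregate_type,
      event_data, metadata, event_version, created_at, sequence_number
    ) VALUES (
      $1, $2, $3, $4, $5::jsonb, $6::jsonb, $7, NOW(), nextval('event_sequence')
    ) RETURNING id, sequence_number`;
  const values = [
    event.event_id,
    event.event_type,
    event.aggregate_id,
    event.aggregate_type,
    JSON.stringify(event.data),     // serialized once, cast to jsonb by the query
    JSON.stringify(event.metadata),
    event.version,
  ];
  return { text, values };
}
```

With node-postgres, the result could be passed as `client.query(buildInsertEventQuery(event))`.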
4. Get Current Aggregate State
- Drag Database Query Block
- Connect previous Database to this one
-- Load the current order snapshot (fast path; avoids replaying every event)
-- FOR UPDATE locks the row until the surrounding transaction ends
SELECT
aggregate_id,
current_state,
version,
last_event_sequence
FROM order_snapshots
WHERE aggregate_id = '<function.aggregate_id>'
FOR UPDATE
5. Apply Event to State
- Drag Agent Block for state transition logic
Model: GPT-4o-mini
System Prompt:
"Apply this event to the current state and return the new state. Follow these rules:
Event types and state transitions:
- order_created: status → 'pending'
- payment_received: status → 'paid', add payment info
- order_shipped: status → 'shipped', add tracking
- order_delivered: status → 'delivered'
- order_cancelled: status → 'cancelled', add reason
Return the complete new state as JSON."
User Prompt:
"Current state: <database.current_state>
Event type: <function.event_type>
Event data: <function.data>"
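The transition rules in the system prompt are simple enough to write down as a pure function, which can serve as a reference for checking the agent's output in tests. This is a sketch, not the template's implementation. The payload field names (`payment`, `tracking_number`, `reason`) and the state field names (`tracking`, `cancel_reason`) are assumptions, since the template does not specify them.

```javascript
// Transition table mirroring the rules listed in the system prompt.
// Payload and state field names are assumptions made for illustration.
const transitions = new Map([
  ["order_created", (state) => ({ ...state, status: "pending" })],
  ["payment_received", (state, data) => ({
    ...state, status: "paid", payment: data.payment ?? null,
  })],
  ["order_shipped", (state, data) => ({
    ...state, status: "shipped", tracking: data.tracking_number ?? null,
  })],
  ["order_delivered", (state) => ({ ...state, status: "delivered" })],
  ["order_cancelled", (state, data) => ({
    ...state, status: "cancelled", cancel_reason: data.reason ?? null,
  })],
]);

// Pure reducer: same input always yields the same output, and the
// previous state object is never mutated.
function applyEvent(state, event) {
  const transition = transitions.get(event.event_type);
  if (!transition) {
    throw new Error(`Unknown event type: ${event.event_type}`);
  }
  return transition(state ?? {}, event.data ?? {});
}
```

Because the function is deterministic, replaying the same events always produces the same state, which is the property the consistency check in the replay workflow relies on.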
6. Store Updated State Snapshot
- Drag Database Query Block
-- Store current state snapshot for fast queries
INSERT INTO order_snapshots (
aggregate_id,
current_state,
version,
last_event_sequence,
updated_at
) VALUES (
'<function.aggregate_id>',
'<agent.content>'::jsonb,
1, -- first snapshot for this aggregate; no prior version exists
<database_event.sequence_number>,
NOW()
)
ON CONFLICT (aggregate_id)
DO UPDATE SET
current_state = EXCLUDED.current_state,
version = order_snapshots.version + 1, -- increment the stored version
last_event_sequence = EXCLUDED.last_event_sequence,
updated_at = NOW()
7. Route Based on Event Type
- Drag Condition Block for different event handlers
Condition: <function.event_type> == "payment_received"
If True: Handle payment
If False: Check next event type
8. Handle Payment Event (True Path)
- Drag HTTP Request Block for payment processor
Method: POST
URL: https://api.stripe.com/v1/charges/<function.data.charge_id>/capture
Headers:
Authorization: Bearer <env.STRIPE_SECRET_KEY>
9. Store Payment Event Result
- Drag Database Query Block
-- Store correlation between event and external action
INSERT INTO event_correlations (
event_id,
correlation_type,
external_id,
external_system,
response_data,
created_at
) VALUES (
'<function.event_id>',
'payment_capture',
'<http_request.body.id>',
'stripe',
'<http_request.body>'::jsonb,
NOW()
)
10. Publish Event to Subscribers
- Drag Parallel Block to notify multiple systems
Parallel Actions:
- Webhook: Notify customer system
- Slack: Alert fulfillment team
- Database: Update search index
- Email: Send customer confirmation
11. Add Event Replay Capability
- Create separate workflow: "Replay Events"
- Schedule or webhook trigger
-- Replay all events for an order to rebuild state
SELECT
event_id,
event_type,
event_data,
sequence_number
FROM events
WHERE aggregate_id = '<input.order_id>'
AND aggregate_type = 'order'
ORDER BY sequence_number ASC
12. Rebuild State from Events
- Drag Loop Block to process events
Items: <database.events>
Variable name: event
13. Apply Each Event (Inside Loop)
- Drag Agent Block
Model: GPT-4o-mini
System Prompt:
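"Apply this event to the current state and return the new state. Use the same rules as the live workflow (step 5):
- order_created: status → 'pending'
- payment_received: status → 'paid', add payment info
- order_shipped: status → 'shipped', add tracking
- order_delivered: status → 'delivered'
- order_cancelled: status → 'cancelled', add reason
Return the complete new state as JSON."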
User Prompt:
"Current state: <state>
Event: <event>"
14. Verify Rebuilt State
- Drag Condition Block after Loop
Condition: <rebuilt_state> == <snapshot_state>
If True: State is consistent
If False: State mismatch detected, alert team
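Steps 12 to 14 amount to a fold over the ordered event list followed by a structural comparison. The sketch below assumes the caller supplies a reducer of the form `(state, event) => newState`; `replay` and `verifySnapshot` are hypothetical names. It uses Node's `util.isDeepStrictEqual`, which ignores object key order, whereas comparing two serialized JSON strings would not.

```javascript
const { isDeepStrictEqual } = require("node:util");

// Rebuild state by applying events in sequence order.
// sequence_number may arrive as a string (BIGINT), so it is converted first.
function replay(events, reducer, initialState = {}) {
  return [...events]
    .sort((a, b) => Number(a.sequence_number) - Number(b.sequence_number))
    .reduce((state, event) => reducer(state, event), initialState);
}

// Compare the rebuilt state against the stored snapshot.
function verifySnapshot(events, snapshotState, reducer) {
  const rebuilt = replay(events, reducer);
  return { consistent: isDeepStrictEqual(rebuilt, snapshotState), rebuilt };
}
```

A `consistent: false` result corresponds to the "State mismatch detected" branch of the condition above.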
Event Design Patterns
Event Structure
{
"event_id": "uuid",
"event_type": "order_created",
"aggregate_id": "order_123",
"aggregate_type": "order",
"data": {
"customer_id": "cust_456",
"items": [...],
"total": 99.99
},
"metadata": {
"user_id": "user_789",
"timestamp": "2025-11-01T12:00:00Z",
"ip_address": "192.168.1.1"
},
"version": 1
}
Event Types
Commands: Requests to do something (imperative); they can be rejected
- CreateOrder, CancelOrder
Events: Facts that have already happened (past tense); they cannot be rejected
- OrderCreated, OrderCancelled
Store events, not commands, in the event log
Aggregate Pattern
Aggregate: Business entity (Order, User, Account)
Events: All changes to that aggregate
Stream: All events for one aggregate
Real-World Examples
Banking Transaction System
- Every transaction is an event
- Account balance rebuilt from events
- Complete audit trail for regulators
- Time-travel to any point in history
E-commerce Order Management
- Order lifecycle as events
- Track every status change
- Reconstruct order history
- Handle disputes with evidence
Healthcare Patient Records
- Every interaction is logged
- Audit logging that supports HIPAA compliance
- Complete medical history
- Past records are never modified; corrections are appended as new events
Inventory Management
- Track all stock movements
- Audit trail for discrepancies
- Reconstruct inventory at any date
- Find when/why items went missing
Event Sourcing Benefits
Complete Audit Trail
- Every change is recorded
- Who, what, when, why
- Evidence to support compliance requirements
- Regulatory reporting simplified
Time Travel
- Reconstruct state at any point
- Debug historical issues
- "What was the balance on Jan 15?"
- Replay events to find bugs
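A question like "What was the balance on Jan 15?" reduces to replaying only the events recorded at or before the cutoff. The sketch below uses a hypothetical account aggregate; the event types `funds_deposited` and `funds_withdrawn` and the `amount_cents` field are assumptions made for illustration. Amounts are kept as integer cents to avoid floating-point drift.

```javascript
// Replay only events at or before the cutoff to answer a point-in-time query.
// Event type names and the amount_cents field are hypothetical.
function balanceAt(events, cutoffIso) {
  const cutoff = Date.parse(cutoffIso);
  return events
    .filter((e) => Date.parse(e.metadata.timestamp) <= cutoff)
    .sort((a, b) => Number(a.sequence_number) - Number(b.sequence_number))
    .reduce((balance, e) => {
      if (e.event_type === "funds_deposited") return balance + e.data.amount_cents;
      if (e.event_type === "funds_withdrawn") return balance - e.data.amount_cents;
      return balance; // unrelated event types leave the balance unchanged
    }, 0);
}
```

The same filter-then-fold shape works for any aggregate: swap the reducer and keep the cutoff filter.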
Event Replay
- Fix a bug in the state logic, then replay events to rebuild correct state
- Test new logic on old events
- Data migration simplified
- Build new views from events
System Integration
- Events feed other systems
- Loose coupling
- Real-time updates
- Event-driven architecture
Advanced Patterns
CQRS (Command Query Responsibility Segregation)
Write side: Store events
Read side: Build optimized views
Separate models for read/write
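On the read side, a view is typically built by a projection: a function that consumes the event stream and maintains a query-friendly shape. The sketch below is one possible projection that counts orders by their latest status. It assumes events arrive already ordered by `sequence_number`, and `projectOrderStatusCounts` is a hypothetical name.

```javascript
// Maps each event type from the workflow to the status it produces.
const STATUS_FOR_EVENT = new Map([
  ["order_created", "pending"],
  ["payment_received", "paid"],
  ["order_shipped", "shipped"],
  ["order_delivered", "delivered"],
  ["order_cancelled", "cancelled"],
]);

// Read-side projection: latest status per order, then a count per status.
// Assumes events are already sorted by sequence_number.
function projectOrderStatusCounts(events) {
  const statusByOrder = new Map();
  for (const event of events) {
    const status = STATUS_FOR_EVENT.get(event.event_type);
    if (status) statusByOrder.set(event.aggregate_id, status);
  }
  const counts = {};
  for (const status of statusByOrder.values()) {
    counts[status] = (counts[status] ?? 0) + 1;
  }
  return counts;
}
```

Because the view is derived entirely from events, it can be dropped and rebuilt at any time without touching the write side.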
Snapshotting
Store state snapshot every N events
Replay from snapshot, not from beginning
Faster state reconstruction
Balance between speed and storage
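Replaying from a snapshot means starting from the stored state and applying only events with a higher sequence number. The sketch below assumes a snapshot shaped like a row of `order_snapshots` and a caller-supplied reducer; `restoreFromSnapshot` and `shouldSnapshot` are hypothetical names, and the default interval of 100 is only an example value.

```javascript
// Start from the snapshot (if any) and apply only newer events.
// Sequence numbers are assumed to start at 1, so 0 means "no snapshot".
function restoreFromSnapshot(snapshot, events, reducer) {
  const startSeq = snapshot ? Number(snapshot.last_event_sequence) : 0;
  const initial = snapshot ? snapshot.current_state : {};
  return events
    .filter((e) => Number(e.sequence_number) > startSeq)
    .sort((a, b) => Number(a.sequence_number) - Number(b.sequence_number))
    .reduce((state, e) => reducer(state, e), initial);
}

// Decide whether to write a snapshot after reaching a given aggregate version.
function shouldSnapshot(version, interval = 100) {
  return version > 0 && version % interval === 0;
}
```

A smaller interval shortens replay at the cost of more snapshot writes, which is the speed versus storage balance noted above.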
Event Versioning
v1: { "name": "John" }
v2: { "first_name": "John", "last_name": "Doe" }
Handle multiple versions
Migrate old events
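One common way to handle multiple versions without rewriting the immutable log is an upcaster: a function that converts an old event shape to the current one at read time. The sketch below converts the v1 shape above to v2. `upcastUserEvent` is a hypothetical name, and splitting the name on the first whitespace is a simplifying assumption that will not suit every naming convention.

```javascript
// Upcast a v1 event ({ name }) to the v2 shape ({ first_name, last_name }).
// The stored event is left untouched; a new object is returned.
function upcastUserEvent(event) {
  if (event.version >= 2) return event;
  const { name, ...otherFields } = event.data;
  // Assumption: the first token is the first name, the rest is the last name
  const [first_name = "", ...rest] = String(name ?? "").trim().split(/\s+/);
  return {
    ...event,
    version: 2,
    data: { ...otherFields, first_name, last_name: rest.join(" ") },
  };
}
```

Reducers and projections then only need to understand the latest version.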
Event Correlation
Track related events
Link cause and effect
Distributed tracing
End-to-end visibility
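A widely used convention for linking cause and effect is to carry two identifiers in the metadata: a correlation ID shared by every event in one business flow, and a causation ID pointing at the event that directly triggered this one. The sketch below assumes that convention; `correlation_id`, `causation_id`, and `causedBy` are not fields or functions defined by this template.

```javascript
const { randomUUID } = require("node:crypto");

// Create a follow-up event that records what caused it.
// correlation_id and causation_id are assumed metadata fields.
function causedBy(cause, eventType, data) {
  return {
    event_id: randomUUID(),
    event_type: eventType,
    data,
    metadata: {
      // The first event in a flow uses its own ID as the correlation ID
      correlation_id: cause.metadata?.correlation_id ?? cause.event_id,
      causation_id: cause.event_id,
      timestamp: new Date().toISOString(),
    },
  };
}
```

Filtering the log by `correlation_id` then returns the whole flow, while following `causation_id` links walks it step by step.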
Schema Design
Events Table
-- Sequence used by nextval('event_sequence') in step 3
CREATE SEQUENCE event_sequence;
CREATE TABLE events (
id BIGSERIAL PRIMARY KEY,
event_id UUID UNIQUE NOT NULL,
event_type VARCHAR(100) NOT NULL,
aggregate_id VARCHAR(100) NOT NULL,
aggregate_type VARCHAR(50) NOT NULL,
event_data JSONB NOT NULL,
metadata JSONB NOT NULL,
event_version INTEGER NOT NULL,
sequence_number BIGINT UNIQUE NOT NULL,
created_at TIMESTAMP NOT NULL
);
-- PostgreSQL does not accept inline INDEX clauses; create indexes separately
CREATE INDEX idx_aggregate ON events (aggregate_id, sequence_number);
CREATE INDEX idx_type ON events (event_type, created_at);
CREATE INDEX idx_created ON events (created_at);
-- Prevent updates and deletes (matching statements are silently ignored)
CREATE RULE no_update AS ON UPDATE TO events DO INSTEAD NOTHING;
CREATE RULE no_delete AS ON DELETE TO events DO INSTEAD NOTHING;
Snapshots Table
CREATE TABLE order_snapshots (
aggregate_id VARCHAR(100) PRIMARY KEY,
current_state JSONB NOT NULL,
version INTEGER NOT NULL,
last_event_sequence BIGINT NOT NULL,
updated_at TIMESTAMP NOT NULL
);
Performance Optimization
Snapshotting Strategy
- Snapshot every 100 events
- Replay from latest snapshot
- Replay cost is bounded by the snapshot interval, not the full history length
Indexing
- Index by aggregate_id + sequence
- Fast event stream queries
- Partition by date for old events
Archival
- Move old events to cold storage
- Keep snapshots for fast queries
- Reduce primary database size
Monitoring and Analytics
Event Analytics
- Most common event types
- Event frequency over time
- Event processing latency
- Failed event correlations
State Consistency Checks
- Compare snapshot vs replayed state
- Alert on inconsistencies
- Automated repair workflows
Cost
Storage costs (per 1M events):
- Events table: ~$1-2/month (PostgreSQL)
- Snapshots: ~$0.10-0.20/month
- Indexes: ~$0.50-1.00/month
- Total: ~$1.60-3.20/month
Processing costs (per 1,000 events):
- AI state transitions: ~$0.10-0.20
- Database operations: ~$0.01
- Total: ~$0.11-0.21
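To turn the figures above into a rough budget, the per-unit totals can be scaled by volume. The sketch below treats the ranges as linear rates, which is an assumption; real pricing depends on the database host and model provider. `estimateMonthlyCost` is a hypothetical name.

```javascript
// Rates taken from the totals above (USD).
const STORAGE_PER_MILLION = { low: 1.6, high: 3.2 };    // per 1M stored events, per month
const PROCESSING_PER_THOUSAND = { low: 0.11, high: 0.21 }; // per 1,000 processed events

// Rough linear estimate; assumes cost scales proportionally with volume.
function estimateMonthlyCost(storedEvents, processedEventsPerMonth) {
  const millions = storedEvents / 1_000_000;
  const thousands = processedEventsPerMonth / 1_000;
  return {
    low: millions * STORAGE_PER_MILLION.low + thousands * PROCESSING_PER_THOUSAND.low,
    high: millions * STORAGE_PER_MILLION.high + thousands * PROCESSING_PER_THOUSAND.high,
  };
}
```

For example, 5 million stored events and 200,000 processed events per month works out to roughly $30 to $58 per month, with processing rather than storage making up most of the total.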
Value:
- Complete audit trail: Priceless for compliance
- Debug capabilities: Save hours of investigation
- System integration: Enable event-driven architecture
Next Step
Combine with State Management for complete workflow state tracking with full audit history!