Event Sourcing Template

Capture every event and state change in your workflows. Build complete audit trails, enable time-travel debugging, support compliance requirements, and reconstruct system state at any point in time.

Time to build: 30 minutes
Difficulty: Advanced
Perfect for: Financial systems, compliance workflows, audit trails, debugging, regulated industries


What It Does

Action occurs → Store event → Update state → Maintain immutable event log

Every change is recorded permanently. System state can be reconstructed from event history.
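
The flow above can be sketched in memory as an append-only log. This is only an illustration under stated assumptions: the EventLog class and its method names are hypothetical, and a real workflow would persist events to a database rather than an array.

```javascript
// Minimal in-memory sketch of an append-only event log (hypothetical class).
class EventLog {
  #events = [];

  // Append-only: the log exposes no update or delete method.
  append(event) {
    // Object.freeze is shallow; it only guards the top-level fields.
    const stored = Object.freeze({
      ...event,
      sequence_number: this.#events.length + 1,
    });
    this.#events.push(stored);
    return stored;
  }

  // All events for one aggregate, in the order they were appended.
  stream(aggregateId) {
    return this.#events.filter((e) => e.aggregate_id === aggregateId);
  }
}

const log = new EventLog();
log.append({ aggregate_id: "order_123", event_type: "order_created" });
log.append({ aggregate_id: "order_456", event_type: "order_created" });
log.append({ aggregate_id: "order_123", event_type: "payment_received" });

const history = log.stream("order_123").map((e) => e.event_type);
console.log(history);
```

The stream for one order contains only that order's events, and each stored event carries the sequence number assigned at append time.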


Step-by-Step

1. Create Workflow

  • New workflow → Name it "Order Processing with Event Sourcing"
  • Webhook trigger for order events

2. Capture Event Metadata

  • Drag Function Block to enrich event
  • Connect Webhook to Function
// Generate unique event ID and add metadata
return {
  event_id: crypto.randomUUID(),
  event_type: input.webhook.body.event_type,
  aggregate_id: input.webhook.body.order_id,
  aggregate_type: "order",
  data: input.webhook.body,
  metadata: {
    user_id: input.webhook.body.user_id,
    ip_address: input.webhook.headers.x_forwarded_for,
    user_agent: input.webhook.headers.user_agent,
    timestamp: new Date().toISOString(),
    workflow_execution_id: input.workflow.execution_id
  },
  version: 1
};

3. Store Event (Immutable)

  • Drag Database Query Block
  • Connect Function to Database
-- Events table is append-only, never updated or deleted
INSERT INTO events (
  event_id,
  event_type,
  aggregate_id,
  aggregate_type,
  event_data,
  metadata,
  event_version,
  created_at,
  sequence_number
) VALUES (
  '<function.event_id>',
  '<function.event_type>',
  '<function.aggregate_id>',
  '<function.aggregate_type>',
  '<function.data>'::jsonb,
  '<function.metadata>'::jsonb,
  <function.version>,
  NOW(),
  nextval('event_sequence')
) RETURNING id, sequence_number

4. Get Current Aggregate State

  • Drag Database Query Block
  • Connect previous Database to this one
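-- Read the current order state from its snapshot (no replay happens here).
-- FOR UPDATE locks the row only for the duration of the enclosing transaction.
SELECT
  aggregate_id,
  current_state,
  version,
  last_event_sequence
FROM order_snapshots
WHERE aggregate_id = '<function.aggregate_id>'
FOR UPDATE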

5. Apply Event to State

  • Drag Agent Block for state transition logic
Model: GPT-4o-mini

System Prompt:
"Apply this event to the current state and return the new state. Follow these rules:

Event types and state transitions:
- order_created: status → 'pending'
- payment_received: status → 'paid', add payment info
- order_shipped: status → 'shipped', add tracking
- order_delivered: status → 'delivered'
- order_cancelled: status → 'cancelled', add reason

Return the complete new state as JSON."

User Prompt:
"Current state: <database.current_state>
Event type: <function.event_type>
Event data: <function.data>"
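
The transition rules in the system prompt can also be written as a plain function, which may help when replays need to produce identical results every time. The following is a minimal sketch, not part of the original template: the function name applyEvent and the field names payment, tracking, and cancel_reason are assumptions chosen for illustration.

```javascript
// Hypothetical deterministic version of the transition rules listed above.
// Field names (payment, tracking, cancel_reason) are illustrative assumptions.
function applyEvent(state, event) {
  const data = event.data || {};
  switch (event.event_type) {
    case "order_created":
      return { ...state, ...data, status: "pending" };
    case "payment_received":
      return { ...state, status: "paid", payment: data.payment };
    case "order_shipped":
      return { ...state, status: "shipped", tracking: data.tracking };
    case "order_delivered":
      return { ...state, status: "delivered" };
    case "order_cancelled":
      return { ...state, status: "cancelled", cancel_reason: data.reason };
    default:
      // Unknown event types are rejected rather than silently ignored.
      throw new Error(`Unknown event type: ${event.event_type}`);
  }
}

const created = applyEvent({}, {
  event_type: "order_created",
  data: { order_id: "order_123", total: 99.99 },
});
const paid = applyEvent(created, {
  event_type: "payment_received",
  data: { payment: { charge_id: "ch_1" } },
});
console.log(paid.status);
```

Each call returns a new state object and leaves the previous one untouched, so earlier states remain available for comparison.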

6. Store Updated State Snapshot

  • Drag Database Query Block
-- Store current state snapshot for fast queries
-- Note: <database.version> may be empty when no snapshot exists yet
-- (first event for an aggregate); default it to 0 before this step.
INSERT INTO order_snapshots (
  aggregate_id,
  current_state,
  version,
  last_event_sequence,
  updated_at
) VALUES (
  '<function.aggregate_id>',
  '<agent.content>'::jsonb,
  <database.version> + 1,
  <database_event.sequence_number>,
  NOW()
)
ON CONFLICT (aggregate_id)
DO UPDATE SET
  current_state = EXCLUDED.current_state,
  version = EXCLUDED.version,
  last_event_sequence = EXCLUDED.last_event_sequence,
  updated_at = NOW()

7. Route Based on Event Type

  • Drag Condition Block for different event handlers
Condition: <function.event_type> == "payment_received"

If True: Handle payment
If False: Check next event type
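
A chain of "check next event type" conditions is equivalent to a lookup table keyed by event type. The sketch below illustrates that idea only; the handler table, the route function, and the strings it returns are hypothetical placeholders, not blocks provided by the platform.

```javascript
// Hypothetical handler table; each entry stands in for one Condition branch.
const handlers = {
  payment_received: (event) => `capture charge ${event.data.charge_id}`,
  order_shipped: (event) => `send tracking ${event.data.tracking}`,
  order_cancelled: (event) => `cancel order ${event.aggregate_id}`,
};

function route(event) {
  // Own-property check avoids matching inherited keys such as "constructor".
  const known = Object.prototype.hasOwnProperty.call(handlers, event.event_type);
  return known ? handlers[event.event_type](event) : "no side effect";
}

console.log(route({ event_type: "payment_received", data: { charge_id: "ch_1" } }));
console.log(route({ event_type: "order_delivered", data: {} }));
```

Event types without a handler fall through to a default, which mirrors the final "False" branch of the condition chain.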

8. Handle Payment Event (True Path)

  • Drag HTTP Request Block for payment processor
Method: POST
URL: https://api.stripe.com/v1/charges/<function.data.charge_id>/capture

Headers:
Authorization: Bearer <env.STRIPE_SECRET_KEY>

9. Store Payment Event Result

  • Drag Database Query Block
-- Store correlation between event and external action
INSERT INTO event_correlations (
  event_id,
  correlation_type,
  external_id,
  external_system,
  response_data,
  created_at
) VALUES (
  '<function.event_id>',
  'payment_capture',
  '<http_request.body.id>',
  'stripe',
  '<http_request.body>'::jsonb,
  NOW()
)

10. Publish Event to Subscribers

  • Drag Parallel Block to notify multiple systems

Parallel Actions:

  1. Webhook: Notify customer system
  2. Slack: Alert fulfillment team
  3. Database: Update search index
  4. Email: Send customer confirmation

11. Add Event Replay Capability

  • Create separate workflow: "Replay Events"
  • Schedule or webhook trigger
-- Replay all events for an order to rebuild state
SELECT
  event_id,
  event_type,
  event_data,
  sequence_number
FROM events
WHERE aggregate_id = '<input.order_id>'
  AND aggregate_type = 'order'
ORDER BY sequence_number ASC

12. Rebuild State from Events

  • Drag Loop Block to process events
Items: <database.events>
Variable name: event

13. Apply Each Event (Inside Loop)

  • Drag Agent Block
Model: GPT-4o-mini

System Prompt:
"Apply this event to build up the state. Use the same logic as the live system."

User Prompt:
"Current state: <state>
Event: <event>"
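
Steps 12 and 13 amount to a fold over the ordered event list: start from an empty state and apply each event in sequence. A minimal sketch of that idea follows. The names rebuildState and mergeReducer are hypothetical, and the merge reducer is only a placeholder for whatever transition logic the live workflow uses.

```javascript
// The reducer is passed in so replay can use the same logic as the live path.
function rebuildState(events, applyEvent, initialState = {}) {
  return [...events] // copy so the caller's array is not reordered
    .sort((a, b) => a.sequence_number - b.sequence_number)
    .reduce((state, event) => applyEvent(state, event), initialState);
}

// Placeholder reducer: merges event_data and records the last sequence number.
const mergeReducer = (state, event) => ({
  ...state,
  ...event.event_data,
  last_event_sequence: event.sequence_number,
});

// Rows deliberately out of order to show that replay sorts by sequence_number.
const rows = [
  { sequence_number: 2, event_type: "payment_received", event_data: { status: "paid" } },
  { sequence_number: 1, event_type: "order_created", event_data: { status: "pending", total: 99.99 } },
];

const rebuilt = rebuildState(rows, mergeReducer);
console.log(rebuilt);
```

Sorting inside the function is a defensive choice; if the query already orders by sequence_number, the sort is a no-op.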

14. Verify Rebuilt State

  • Drag Condition Block after Loop
Condition: <rebuilt_state> == <snapshot_state>

If True: State is consistent
If False: State mismatch detected, alert team
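
Comparing two JSON states as raw strings can report a mismatch when only the key order differs. One way to avoid that, sketched here with hypothetical helper names canonicalize and statesMatch, is to sort object keys recursively before comparing.

```javascript
// Recursively rebuild a value with object keys in sorted order.
function canonicalize(value) {
  if (Array.isArray(value)) return value.map(canonicalize);
  if (value !== null && typeof value === "object") {
    return Object.keys(value)
      .sort()
      .reduce((out, key) => {
        out[key] = canonicalize(value[key]);
        return out;
      }, {});
  }
  return value;
}

// Two states match when their canonical JSON forms are identical.
function statesMatch(a, b) {
  return JSON.stringify(canonicalize(a)) === JSON.stringify(canonicalize(b));
}

const rebuiltState = { status: "paid", payment: { charge_id: "ch_1", amount: 9999 } };
const snapshotState = { payment: { amount: 9999, charge_id: "ch_1" }, status: "paid" };
console.log(statesMatch(rebuiltState, snapshotState));
```

Array order is still treated as significant in this sketch, which suits ordered data such as line items.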

Event Design Patterns

Event Structure

{
  "event_id": "uuid",
  "event_type": "order_created",
  "aggregate_id": "order_123",
  "aggregate_type": "order",
  "data": {
    "customer_id": "cust_456",
    "items": [...],
    "total": 99.99
  },
  "metadata": {
    "user_id": "user_789",
    "timestamp": "2025-11-01T12:00:00Z",
    "ip_address": "192.168.1.1"
  },
  "version": 1
}

Event Types

Commands: Requests for something to happen (imperative, may be rejected)
- CreateOrder, CancelOrder

Events: Things that happened (past tense)
- OrderCreated, OrderCancelled

Store events, not commands
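
The distinction can be made concrete with a command handler: it receives a request, checks it against the current state, and either rejects it or produces an event to store. This is a sketch under assumptions; handleCancelOrder, its result shape, and the rule that shipped orders cannot be cancelled are all hypothetical.

```javascript
// Hypothetical command handler: validates a CancelOrder command against state.
function handleCancelOrder(state, command) {
  // Assumed business rule for illustration only.
  if (state.status === "shipped" || state.status === "delivered") {
    return { accepted: false, reason: `order is already ${state.status}` };
  }
  return {
    accepted: true,
    // Only this event is stored; the command itself is not.
    event: {
      event_type: "order_cancelled",
      aggregate_id: command.order_id,
      data: { reason: command.reason },
    },
  };
}

const command = { order_id: "order_123", reason: "customer request" };
const accepted = handleCancelOrder({ status: "paid" }, command);
const rejected = handleCancelOrder({ status: "shipped" }, command);
console.log(accepted.event.event_type, rejected.accepted);
```

A rejected command leaves no trace in the event log, because nothing happened to the aggregate.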

Aggregate Pattern

Aggregate: Business entity (Order, User, Account)
Events: All changes to that aggregate
Stream: All events for one aggregate

Real-World Examples

Banking Transaction System

  • Every transaction is an event
  • Account balance rebuilt from events
  • Complete audit trail for regulators
  • Time-travel to any point in history

E-commerce Order Management

  • Order lifecycle as events
  • Track every status change
  • Reconstruct order history
  • Handle disputes with evidence

Healthcare Patient Records

  • Every interaction is logged
  • Audit logging that supports HIPAA compliance
  • Complete medical history
  • Past records cannot be modified; corrections are appended as new events

Inventory Management

  • Track all stock movements
  • Audit trail for discrepancies
  • Reconstruct inventory at any date
  • Find when/why items went missing

Event Sourcing Benefits

Complete Audit Trail

  • Every change is recorded
  • Who, what, when, why
  • Supports compliance requirements
  • Regulatory reporting simplified

Time Travel

  • Reconstruct state at any point
  • Debug historical issues
  • "What was the balance on Jan 15?"
  • Replay events to find bugs
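
A question like "What was the balance on Jan 15?" can be answered by replaying only the events recorded up to that moment. The sketch below assumes hypothetical event types funds_deposited and funds_withdrawn and an amount_cents field; amounts are kept in integer cents to avoid floating-point drift.

```javascript
// Replay only the events at or before the cutoff to get a historical balance.
function balanceAt(events, cutoffIso) {
  const cutoff = Date.parse(cutoffIso);
  return events
    .filter((e) => Date.parse(e.metadata.timestamp) <= cutoff)
    .reduce((balance, e) => {
      if (e.event_type === "funds_deposited") return balance + e.data.amount_cents;
      if (e.event_type === "funds_withdrawn") return balance - e.data.amount_cents;
      return balance; // other event types do not affect the balance
    }, 0);
}

const accountEvents = [
  { event_type: "funds_deposited", data: { amount_cents: 10000 }, metadata: { timestamp: "2025-01-10T09:00:00Z" } },
  { event_type: "funds_withdrawn", data: { amount_cents: 2500 }, metadata: { timestamp: "2025-01-14T16:30:00Z" } },
  { event_type: "funds_deposited", data: { amount_cents: 5000 }, metadata: { timestamp: "2025-01-20T11:00:00Z" } },
];

console.log(balanceAt(accountEvents, "2025-01-15T23:59:59Z")); // 7500
console.log(balanceAt(accountEvents, "2025-01-31T00:00:00Z")); // 12500
```

The deposit on Jan 20 is excluded from the first result because it falls after the cutoff.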

Event Replay

  • Fix a bug in the transition logic, then replay events to rebuild correct state
  • Test new logic on old events
  • Data migration simplified
  • Build new views from events

System Integration

  • Events feed other systems
  • Loose coupling
  • Real-time updates
  • Event-driven architecture

Advanced Patterns

CQRS (Command Query Responsibility Segregation)

Write side: Store events
Read side: Build optimized views
Separate models for read/write
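
As an illustration of the read side, a projection can turn the event stream into a view shaped for one specific query. The sketch below is an assumption-laden example: projectOrdersByStatus is a hypothetical function that builds an "orders grouped by status" view from the order event types used earlier in this template.

```javascript
// Maps each event type to the status it produces.
const STATUS_BY_EVENT = new Map([
  ["order_created", "pending"],
  ["payment_received", "paid"],
  ["order_shipped", "shipped"],
  ["order_delivered", "delivered"],
  ["order_cancelled", "cancelled"],
]);

// Read side: derive a query-optimized view from the write-side events.
function projectOrdersByStatus(events) {
  const latestStatus = new Map();
  for (const e of events) {
    const status = STATUS_BY_EVENT.get(e.event_type);
    if (status) latestStatus.set(e.aggregate_id, status);
  }
  const view = {};
  for (const [orderId, status] of latestStatus) {
    if (!view[status]) view[status] = [];
    view[status].push(orderId);
  }
  return view;
}

const orderEvents = [
  { aggregate_id: "order_1", event_type: "order_created" },
  { aggregate_id: "order_2", event_type: "order_created" },
  { aggregate_id: "order_1", event_type: "payment_received" },
];

const ordersByStatus = projectOrdersByStatus(orderEvents);
console.log(ordersByStatus);
```

Because the view is derived entirely from events, it can be dropped and rebuilt at any time, or replaced by a differently shaped view without touching the write side.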

Snapshotting

Store state snapshot every N events
Replay from snapshot, not from beginning
Faster state reconstruction
Balance between speed and storage
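
Replaying from a snapshot means starting from the stored state and applying only the events recorded after it. The following sketch assumes a snapshot shaped like a row of the order_snapshots table; loadState, shouldSnapshot, and the counting reducer are hypothetical names used for illustration.

```javascript
// Start from the snapshot (if any) and apply only the newer events.
function loadState(snapshot, events, applyEvent) {
  const base = snapshot ? snapshot.current_state : {};
  // Assumes sequence numbers are positive, so 0 means "replay everything".
  const after = snapshot ? snapshot.last_event_sequence : 0;
  return events
    .filter((e) => e.sequence_number > after)
    .sort((a, b) => a.sequence_number - b.sequence_number)
    .reduce((state, e) => applyEvent(state, e), base);
}

// Take a new snapshot every `interval` events.
const shouldSnapshot = (version, interval = 100) =>
  version > 0 && version % interval === 0;

// Placeholder reducer that counts applied events.
const countReducer = (state, e) => ({
  applied: (state.applied || 0) + 1,
  last: e.sequence_number,
});

const allEvents = [1, 2, 3, 4, 5].map((n) => ({ sequence_number: n }));
const snapshot = { current_state: { applied: 3, last: 3 }, last_event_sequence: 3 };

const fromSnapshot = loadState(snapshot, allEvents, countReducer);
const fromScratch = loadState(null, allEvents, countReducer);
console.log(fromSnapshot, fromScratch);
```

Both paths arrive at the same state, but the snapshot path applies two events instead of five.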

Event Versioning

v1: { "name": "John" }
v2: { "first_name": "John", "last_name": "Doe" }
Handle multiple versions
Migrate old events
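
One common way to handle multiple versions is to convert old events to the newest shape when they are read, leaving the stored events untouched. The sketch below shows that approach for the v1 to v2 change above. The upcast function is hypothetical, and splitting the name on whitespace is an illustrative assumption that will not suit every naming convention.

```javascript
// Convert a v1 event to the v2 shape at read time; v2 events pass through.
function upcast(event) {
  if (event.version >= 2) return event;
  const { name = "", ...rest } = event.data;
  // Assumption: the first word is the first name, the remainder the last name.
  const [first_name, ...others] = name.trim().split(/\s+/);
  return {
    ...event,
    version: 2,
    data: { ...rest, first_name, last_name: others.join(" ") },
  };
}

const v1Short = { version: 1, data: { name: "John" } };
const v1Full = { version: 1, data: { name: "John Doe" } };

console.log(upcast(v1Short).data);
console.log(upcast(v1Full).data);
```

The original event objects are not mutated, so the stored history stays exactly as it was written.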

Event Correlation

Track related events
Link cause and effect
Distributed tracing
End-to-end visibility
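
A widely used convention for linking cause and effect is to carry two identifiers in each event's metadata: one shared by the whole flow and one pointing at the event that directly caused it. The field names correlation_id and causation_id and the helper functions below are assumptions; the template's metadata does not define them.

```javascript
// Stamp a follow-up event with the flow's correlation ID and its direct cause.
function follow(cause, next) {
  return {
    ...next,
    metadata: {
      ...next.metadata,
      correlation_id: cause.metadata.correlation_id,
      causation_id: cause.event_id,
    },
  };
}

// Walk causation links backwards to list the chain that led to an event.
function causalChain(events, eventId) {
  const byId = new Map(events.map((e) => [e.event_id, e]));
  const seen = new Set();
  const chain = [];
  let current = byId.get(eventId);
  while (current && !seen.has(current.event_id)) {
    seen.add(current.event_id); // guards against accidental cycles
    chain.unshift(current.event_type);
    current = byId.get(current.metadata.causation_id);
  }
  return chain;
}

const e1 = { event_id: "e1", event_type: "order_created", metadata: { correlation_id: "corr_1", causation_id: null } };
const e2 = follow(e1, { event_id: "e2", event_type: "payment_received", metadata: {} });
const e3 = follow(e2, { event_id: "e3", event_type: "order_shipped", metadata: {} });

console.log(causalChain([e1, e2, e3], "e3"));
```

Every event in the flow shares the same correlation_id, so a single filter on that field returns the whole flow.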

Schema Design

Events Table

-- Sequence used by the INSERT in step 3
CREATE SEQUENCE event_sequence;

CREATE TABLE events (
  id BIGSERIAL PRIMARY KEY,
  event_id UUID UNIQUE NOT NULL,
  event_type VARCHAR(100) NOT NULL,
  aggregate_id VARCHAR(100) NOT NULL,
  aggregate_type VARCHAR(50) NOT NULL,
  event_data JSONB NOT NULL,
  metadata JSONB NOT NULL,
  event_version INTEGER NOT NULL,
  sequence_number BIGINT UNIQUE NOT NULL,
  created_at TIMESTAMP NOT NULL
);

-- PostgreSQL does not accept inline INDEX clauses in CREATE TABLE,
-- so the indexes are created separately
CREATE INDEX idx_aggregate ON events (aggregate_id, sequence_number);
CREATE INDEX idx_type ON events (event_type, created_at);
CREATE INDEX idx_created ON events (created_at);

-- Prevent updates and deletes (the statements are silently discarded)
CREATE RULE no_update AS ON UPDATE TO events DO INSTEAD NOTHING;
CREATE RULE no_delete AS ON DELETE TO events DO INSTEAD NOTHING;

Snapshots Table

CREATE TABLE order_snapshots (
  aggregate_id VARCHAR(100) PRIMARY KEY,
  current_state JSONB NOT NULL,
  version INTEGER NOT NULL,
  last_event_sequence BIGINT NOT NULL,
  updated_at TIMESTAMP NOT NULL
);

Performance Optimization

Snapshotting Strategy

  • Snapshot every 100 events
  • Replay from latest snapshot
  • Replay cost is bounded by the snapshot interval (at most 100 events) instead of the full history

Indexing

  • Index by aggregate_id + sequence
  • Fast event stream queries
  • Partition by date for old events

Archival

  • Move old events to cold storage
  • Keep snapshots for fast queries
  • Reduce primary database size

Monitoring and Analytics

Event Analytics

  • Most common event types
  • Event frequency over time
  • Event processing latency
  • Failed event correlations

State Consistency Checks

  • Compare snapshot vs replayed state
  • Alert on inconsistencies
  • Automated repair workflows

Cost

Storage costs (per 1M events):

  • Events table: ~$1-2/month (PostgreSQL)
  • Snapshots: ~$0.10-0.20/month
  • Indexes: ~$0.50-1.00/month
  • Total: ~$1.60-3.20/month

Processing costs (per 1,000 events):

  • AI state transitions: ~$0.10-0.20
  • Database operations: ~$0.01
  • Total: ~$0.11-0.21
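
The figures above can be combined into a rough monthly estimate. This sketch assumes costs scale linearly with volume, which is a simplification; estimateMonthlyCost and the example volumes are hypothetical.

```javascript
// Rates taken from the ranges listed above (USD).
const RATES = {
  storagePerMillionEvents: { low: 1.6, high: 3.2 },
  processingPerThousandEvents: { low: 0.11, high: 0.21 },
};

const round2 = (x) => Math.round(x * 100) / 100;

// Assumes linear scaling: storedEvents held in the database,
// processedEvents handled by the workflow during the month.
function estimateMonthlyCost(storedEvents, processedEvents) {
  const millions = storedEvents / 1e6;
  const thousands = processedEvents / 1e3;
  const storage = {
    low: millions * RATES.storagePerMillionEvents.low,
    high: millions * RATES.storagePerMillionEvents.high,
  };
  const processing = {
    low: thousands * RATES.processingPerThousandEvents.low,
    high: thousands * RATES.processingPerThousandEvents.high,
  };
  return {
    storage: { low: round2(storage.low), high: round2(storage.high) },
    processing: { low: round2(processing.low), high: round2(processing.high) },
    total: {
      low: round2(storage.low + processing.low),
      high: round2(storage.high + processing.high),
    },
  };
}

// Example: 5 million stored events, 200,000 processed in a month.
const estimate = estimateMonthlyCost(5_000_000, 200_000);
console.log(estimate);
```

For these example volumes the estimate comes to $8-16 for storage and $22-42 for processing, so processing dominates once AI state transitions run on every event.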

Value:

  • Complete audit trail: Priceless for compliance
  • Debug capabilities: Save hours of investigation
  • System integration: Enable event-driven architecture

Next Step

Combine with State Management for complete workflow state tracking with full audit history!