Data Pipeline Template
Build ETL (Extract, Transform, Load) pipelines that move and transform data between systems. Use AI to clean, enrich, and standardize data automatically.
Time to build: 25 minutes
Difficulty: Intermediate
Perfect for: Data migration, system integration, data enrichment, reporting pipelines
What It Does
Extract from source → Transform with AI → Validate → Load to destination
Automates data movement and transformation between systems.
Step-by-Step
1. Create Workflow with Schedule
- New workflow → Name it "CRM to Data Warehouse Pipeline"
- Set trigger to Schedule:
0 */4 * * * (every 4 hours)
2. Extract: Fetch Source Data
- Drag HTTP Request Block
- Connect Schedule to HTTP Request
Method: GET
URL: https://api.crm.com/contacts?updated_since=<schedule.last_run>
Headers:
Authorization: Bearer <env.CRM_API_KEY>
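If you want to prototype the extract step outside the workflow builder, the request above maps to a short script. This is a minimal sketch: `https://api.crm.com/contacts`, the `updated_since` parameter, the response shape, and the `CRM_API_KEY` variable are placeholders from this template, not a real CRM API.

```python
import os
import requests  # pip install requests

def extract_contacts(last_run_iso: str) -> list[dict]:
    """Fetch contacts changed since the last pipeline run (placeholder CRM API)."""
    resp = requests.get(
        "https://api.crm.com/contacts",                      # placeholder URL from this template
        params={"updated_since": last_run_iso},              # incremental extract window
        headers={"Authorization": f"Bearer {os.environ['CRM_API_KEY']}"},
        timeout=30,
    )
    resp.raise_for_status()                                  # fail loudly on 4xx/5xx
    return resp.json()["contacts"]                           # response shape assumed by this template
```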
3. Add Loop for Processing
- Drag Loop Block
- Connect HTTP Request to Loop
Items: <http_request.body.contacts>
Variable name: contact
4. Transform: Clean with AI
- Drag Agent Block inside Loop
- Connect Loop to Agent
Model: GPT-4o-mini (fast and cheap)
System Prompt:
"Clean and standardize this contact data. Fix formatting, standardize phone numbers, extract company domain from email, categorize industry."
User Prompt:
"Contact data: <contact>"
Output format (JSON):
{
"name": "standardized full name",
"email": "lowercase email",
"phone": "E.164 format",
"company": "company name",
"domain": "company domain",
"industry": "industry category",
"job_title": "standardized title"
}
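The Agent Block handles this for you; if you want to reproduce the transformation in code, a direct OpenAI call that requests JSON output looks roughly like the sketch below. The field names mirror the output format above; the prompt wording and response handling are illustrative assumptions, not the workflow's internal implementation.

```python
import json
from openai import OpenAI  # pip install openai

client = OpenAI()  # reads OPENAI_API_KEY from the environment

SYSTEM_PROMPT = (
    "Clean and standardize this contact data. Fix formatting, standardize phone "
    "numbers, extract company domain from email, categorize industry. "
    "Respond with JSON keys: name, email, phone, company, domain, industry, job_title."
)

def clean_contact(contact: dict) -> dict:
    """Ask the model for a standardized version of one raw CRM contact."""
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        response_format={"type": "json_object"},   # force valid JSON back
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": f"Contact data: {json.dumps(contact)}"},
        ],
    )
    return json.loads(response.choices[0].message.content)
```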
5. Transform: Enrich Data
- Drag HTTP Request Block
- Connect Agent to HTTP Request
Method: GET
URL: https://company.clearbit.com/v2/companies/find?domain=<agent.domain>
Headers:
Authorization: Bearer <env.CLEARBIT_API_KEY>
6. Validate Data Quality
- Drag Condition Block
- Connect HTTP Request to Condition
Condition: <agent.email> matches /^[^\s@]+@[^\s@]+\.[^\s@]+$/
AND <agent.phone> is not empty
AND <http_request.body.company> is not null
If True: Data is valid, proceed to load
If False: Log error and skip
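If you want to test these rules outside the Condition Block, the same checks fit in a small helper. The regex is the one above; `cleaned` stands for the agent output and `enriched` for the Clearbit response body (names assumed for this sketch).

```python
import re

EMAIL_RE = re.compile(r"^[^\s@]+@[^\s@]+\.[^\s@]+$")  # same pattern as the Condition Block

def is_valid(cleaned: dict, enriched: dict) -> bool:
    """Mirror the workflow's validation: real email, non-empty phone, company found."""
    return bool(
        EMAIL_RE.match(cleaned.get("email", ""))
        and cleaned.get("phone")
        and enriched.get("company") is not None
    )
```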
7. Load: Insert to Data Warehouse (True Path)
- Drag Database Query Block on True path
INSERT INTO contacts_warehouse (
crm_id,
name,
email,
phone,
company_name,
company_domain,
industry,
job_title,
company_size,
company_revenue,
last_synced
) VALUES (
'<contact.id>',
'<agent.name>',
'<agent.email>',
'<agent.phone>',
'<agent.company>',
'<agent.domain>',
'<agent.industry>',
'<agent.job_title>',
'<http_request.body.metrics.employees>',
'<http_request.body.metrics.revenue>',
NOW()
)
ON CONFLICT (crm_id)
DO UPDATE SET
name = EXCLUDED.name,
email = EXCLUDED.email,
phone = EXCLUDED.phone,
company_name = EXCLUDED.company_name,
industry = EXCLUDED.industry,
last_synced = NOW()
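If you ever run this upsert from your own code rather than the Database Query Block, pass the values as query parameters instead of interpolating them into the SQL string, so quotes or stray characters in a contact name cannot break (or inject into) the statement. A minimal sketch with psycopg against a PostgreSQL warehouse; the connection setup and dictionary shapes are assumptions:

```python
import psycopg  # pip install "psycopg[binary]"

UPSERT_SQL = """
INSERT INTO contacts_warehouse (
    crm_id, name, email, phone, company_name, company_domain,
    industry, job_title, company_size, company_revenue, last_synced
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())
ON CONFLICT (crm_id) DO UPDATE SET
    name = EXCLUDED.name,
    email = EXCLUDED.email,
    phone = EXCLUDED.phone,
    company_name = EXCLUDED.company_name,
    industry = EXCLUDED.industry,
    last_synced = NOW()
"""

def load_contact(conn: psycopg.Connection, contact_id: str, cleaned: dict, company: dict) -> None:
    """Upsert one cleaned + enriched contact; the driver fills the placeholders safely."""
    with conn.cursor() as cur:
        cur.execute(UPSERT_SQL, (
            contact_id,
            cleaned["name"], cleaned["email"], cleaned["phone"],
            cleaned["company"], cleaned["domain"],
            cleaned["industry"], cleaned["job_title"],
            company.get("metrics", {}).get("employees"),
            company.get("metrics", {}).get("revenue"),
        ))
    conn.commit()
```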
8. Handle Errors (False Path)
- Drag Database Query Block on False path
INSERT INTO pipeline_errors (
source_system,
record_id,
error_type,
raw_data,
created_at
) VALUES (
'crm',
'<contact.id>',
'validation_failed',
'<contact>',
NOW()
)
9. Send Summary Report (After Loop)
- Drag Agent Block after the Loop (it runs once all iterations finish)
Model: GPT-4o-mini
System Prompt:
"Create a brief summary of this data pipeline run."
User Prompt:
"Total contacts processed: <loop.count>
Successful: <loop.success_count>
Failed: <loop.error_count>
Duration: <workflow.duration>"
10. Notify Team
- Drag Slack Tool
Channel: #data-pipeline
Message:
"✅ CRM → Warehouse Sync Complete
<agent.content>
View errors: https://dashboard.yourapp.com/pipeline-errors"
Common Pipeline Patterns
Database to Database
- Extract from MySQL → Transform → Load to PostgreSQL
- Schedule: Hourly or real-time (webhook)
API to Database
- Extract from REST API → Enrich with AI → Load to warehouse
- Schedule: Every 4-6 hours
Database to API
- Extract from database → Format → POST to external API
- Schedule: Daily or event-driven
File to Database
- Extract from S3 CSV → Parse and clean → Load to database
- Trigger: New file uploaded
Real-World Examples
E-commerce Order Sync
- Shopify → Clean and categorize → Analytics database
- Run every 15 minutes
Marketing Data Pipeline
- Google Ads + Facebook Ads → Normalize → Marketing dashboard
- Run hourly
Financial Reconciliation
- Payment processor → Match transactions → Accounting system
- Run daily at 6 AM
Customer Data Platform
- Multiple CRMs → Deduplicate and merge → Unified customer view
- Run every 2 hours
Enhancements
Add Incremental Loading
- Track last sync timestamp
- Only process changed/new records
- Reduces API calls and processing time
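One simple way to track the high-water mark is a small sync-state table that the pipeline reads before extracting and updates after a successful load. The `sync_state` table name and columns below are assumptions for this sketch:

```python
import psycopg

def get_last_sync(conn: psycopg.Connection, pipeline: str) -> str | None:
    """Read the high-water mark recorded by the previous run."""
    with conn.cursor() as cur:
        cur.execute("SELECT last_synced_at FROM sync_state WHERE pipeline = %s", (pipeline,))
        row = cur.fetchone()
    return row[0].isoformat() if row else None

def set_last_sync(conn: psycopg.Connection, pipeline: str, ts: str) -> None:
    """Record the new high-water mark only after the load succeeded."""
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO sync_state (pipeline, last_synced_at)
            VALUES (%s, %s)
            ON CONFLICT (pipeline) DO UPDATE SET last_synced_at = EXCLUDED.last_synced_at
            """,
            (pipeline, ts),
        )
    conn.commit()
```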
Add Data Deduplication
- Check for duplicates before insert
- Merge duplicate records intelligently
- Use AI to match similar records
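A lightweight starting point is to merge on a normalized key such as the lowercased email before inserting, and reserve AI matching for records that share no obvious key. A minimal sketch of that first pass:

```python
def dedupe_contacts(contacts: list[dict]) -> list[dict]:
    """Keep one record per normalized email, filling gaps from later duplicates."""
    merged: dict[str, dict] = {}
    without_email: list[dict] = []
    for contact in contacts:
        key = (contact.get("email") or "").strip().lower()   # simple match key; AI matching can refine this
        if not key:
            without_email.append(contact)                     # no email: pass through untouched
            continue
        if key not in merged:
            merged[key] = dict(contact)
        else:
            for field, value in contact.items():
                if not merged[key].get(field) and value:      # only fill fields the survivor is missing
                    merged[key][field] = value
    return list(merged.values()) + without_email
```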
Add Data Quality Monitoring
- Track validation failure rate
- Alert if error rate exceeds threshold
- Generate data quality reports
Add Retry Logic
- Retry failed records with exponential backoff
- Store failed records for manual review
- Auto-retry on schedule
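The core of this is a wrapper that retries a failing call with exponentially growing delays before giving up and handing the record to manual review. A minimal sketch:

```python
import time

def with_retries(func, *args, max_attempts: int = 5, base_delay: float = 1.0):
    """Call func(*args); on failure wait 1s, 2s, 4s, ... before retrying."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func(*args)
        except Exception:
            if attempt == max_attempts:
                raise                                   # surface for manual review / scheduled auto-retry
            time.sleep(base_delay * 2 ** (attempt - 1))  # exponential backoff
```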
Add Transformation Caching
- Cache AI transformations for common values
- Can reduce AI costs by 50-80% on repetitive data
- Faster processing
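A simple version keys the cache on a hash of the raw record, so identical inputs never trigger a second AI call. The in-memory dict below stands in for Redis or a database table; `clean_contact` is the AI transform sketched earlier:

```python
import hashlib
import json

_cache: dict[str, dict] = {}   # swap for Redis or a cache table in production

def cached_clean_contact(contact: dict) -> dict:
    """Reuse a prior AI transformation when the same raw record appears again."""
    key = hashlib.sha256(json.dumps(contact, sort_keys=True).encode()).hexdigest()
    if key not in _cache:
        _cache[key] = clean_contact(contact)   # the AI call from the Transform step
    return _cache[key]
```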
Performance Optimization
Batch Processing
- Process records in batches of 100
- Reduces database connections
- Faster overall execution
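In code, batching usually means slicing the record list and issuing one multi-row statement per slice instead of a round trip per record. The sketch below reuses `UPSERT_SQL` from the load step sketch:

```python
def batched(items: list, size: int = 100):
    """Yield fixed-size slices so each database round trip carries many rows."""
    for start in range(0, len(items), size):
        yield items[start:start + size]

def load_in_batches(conn, rows: list[tuple]) -> None:
    """Insert up to 100 rows per statement instead of one execute per record."""
    with conn.cursor() as cur:
        for batch in batched(rows, 100):
            cur.executemany(UPSERT_SQL, batch)   # each row is a 10-tuple of values
    conn.commit()
```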
Parallel Processing
- Use Parallel Block for independent transformations
- Process multiple batches simultaneously
- Often 3-5x faster for API-bound steps
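When the bottleneck is waiting on the AI or enrichment APIs rather than CPU, a thread pool gives a similar effect to the Parallel Block. A minimal sketch; the worker count is an arbitrary starting point:

```python
from concurrent.futures import ThreadPoolExecutor

def transform_all(contacts: list[dict], workers: int = 5) -> list[dict]:
    """Run the AI cleanup for several contacts at once (latency-bound workloads)."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(cached_clean_contact, contacts))
```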
Selective Fields
- Only extract/load changed fields
- Reduces data transfer
- Faster syncs
Cost
Per 1,000 records processed:
- AI transformation: ~$0.20-0.50
- Data enrichment API: ~$5-20 (Clearbit)
- Database operations: Negligible
- Total: ~$5.20-20.50
Monthly cost (10,000 records/day):
- AI: ~$60-150/month
- APIs: ~$1,500-6,000/month
- Total: ~$1,560-6,150/month
Cost optimization:
- Cache transformed data
- Batch API requests
- Use GPT-4o-mini instead of GPT-4o
- Skip enrichment for low-value records
Next Step
Try Parallel Processing to speed up your data pipeline by 3-5x!