
Data Pipeline Template

Build ETL (Extract, Transform, Load) pipelines that move and transform data between systems. Use AI to clean, enrich, and standardize data automatically.

Time to build: 25 minutes
Difficulty: Intermediate
Perfect for: Data migration, system integration, data enrichment, reporting pipelines


What It Does

Extract from source → Transform with AI → Validate → Load to destination

Automates data movement and transformation between systems.


Step-by-Step

1. Create Workflow with Schedule

  • New workflow → Name it "CRM to Data Warehouse Pipeline"
  • Set trigger to Schedule: 0 */4 * * * (every 4 hours)
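
If you want to sanity-check the schedule before the first run, a tiny sketch using the third-party croniter package (an assumption; any cron parser will do) prints the next few run times for the expression above:

# Sanity-check the cron expression "0 */4 * * *" (every 4 hours).
from datetime import datetime
from croniter import croniter  # pip install croniter

itr = croniter("0 */4 * * *", datetime(2024, 1, 1, 0, 0))
print([itr.get_next(datetime) for _ in range(3)])  # 04:00, 08:00, 12:00 on Jan 1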

2. Extract: Fetch Source Data

  • Drag HTTP Request Block
  • Connect Schedule to HTTP Request
Method: GET
URL: https://api.crm.com/contacts?updated_since=<schedule.last_run>

Headers:
Authorization: Bearer <env.CRM_API_KEY>
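
To prototype the extract step outside the builder, here is a minimal Python sketch using the requests library; the URL, updated_since parameter, and CRM_API_KEY variable mirror the config above, while the "contacts" key is an assumption about the CRM's payload shape.

# Extract: fetch contacts changed since the last run.
import os
import requests

def extract_contacts(last_run_iso: str) -> list[dict]:
    resp = requests.get(
        "https://api.crm.com/contacts",
        params={"updated_since": last_run_iso},
        headers={"Authorization": f"Bearer {os.environ['CRM_API_KEY']}"},
        timeout=30,
    )
    resp.raise_for_status()  # fail loudly on auth or server errors
    return resp.json()["contacts"]  # assumed payload shape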

3. Add Loop for Processing

  • Drag Loop Block
  • Connect HTTP Request to Loop
Items: <http_request.body.contacts>
Variable name: contact

4. Transform: Clean with AI

  • Drag Agent Block inside Loop
  • Connect Loop to Agent
Model: GPT-4o-mini (fast and cheap)

System Prompt:
"Clean and standardize this contact data. Fix formatting, standardize phone numbers, extract company domain from email, categorize industry."

User Prompt:
"Contact data: <contact>"

Output format (JSON):
{
  "name": "standardized full name",
  "email": "lowercase email",
  "phone": "E.164 format",
  "company": "company name",
  "domain": "company domain",
  "industry": "industry category",
  "job_title": "standardized title"
}
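
The same transform can be sketched directly against the OpenAI Python SDK if you want to test the prompts locally; the model, prompts, and output keys mirror the Agent Block config above, and response_format is set to JSON so the reply parses cleanly.

# Transform: clean one contact with GPT-4o-mini and return parsed JSON.
import json
from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment

def clean_contact(contact: dict) -> dict:
    resp = client.chat.completions.create(
        model="gpt-4o-mini",
        response_format={"type": "json_object"},
        messages=[
            {"role": "system", "content": (
                "Clean and standardize this contact data. Fix formatting, "
                "standardize phone numbers, extract company domain from email, "
                "categorize industry. Respond with JSON using the keys: name, "
                "email, phone, company, domain, industry, job_title."
            )},
            {"role": "user", "content": f"Contact data: {json.dumps(contact)}"},
        ],
    )
    return json.loads(resp.choices[0].message.content)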

5. Transform: Enrich Data

  • Drag HTTP Request Block
  • Connect Agent to HTTP Request
Method: GET
URL: https://api.clearbit.com/v2/companies/find?domain=<agent.domain>

Headers:
Authorization: Bearer <env.CLEARBIT_API_KEY>
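
A quick sketch of the enrichment call, reusing the endpoint and CLEARBIT_API_KEY variable from the config above; treating any non-200 response as "no match" keeps the loop moving when a domain can't be enriched.

# Enrich: look up company details by domain; return None when nothing is found.
import os
import requests

def enrich_company(domain: str) -> dict | None:
    resp = requests.get(
        "https://api.clearbit.com/v2/companies/find",
        params={"domain": domain},
        headers={"Authorization": f"Bearer {os.environ['CLEARBIT_API_KEY']}"},
        timeout=30,
    )
    if resp.status_code != 200:
        return None  # unknown domain or lookup not ready; skip enrichment
    return resp.json()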

6. Validate Data Quality

  • Drag Condition Block
  • Connect HTTP Request to Condition
Condition: <agent.email> matches /^[^\s@]+@[^\s@]+\.[^\s@]+$/
AND <agent.phone> is not empty
AND <http_request.body.company> is not null

If True: Data is valid, proceed to load
If False: Log error and skip
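
Expressed as a plain function, the validation gate looks like the sketch below; the email regex and the three checks mirror the Condition Block exactly.

# Validate: same three checks as the Condition Block.
import re

EMAIL_RE = re.compile(r"^[^\s@]+@[^\s@]+\.[^\s@]+$")

def is_valid(cleaned: dict, company: dict | None) -> bool:
    return (
        bool(EMAIL_RE.match(cleaned.get("email", "")))  # well-formed email
        and bool(cleaned.get("phone"))                   # phone not empty
        and company is not None                          # enrichment returned data
    )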

7. Load: Insert to Data Warehouse (True Path)

  • Drag Database Query Block on True path
INSERT INTO contacts_warehouse (
  crm_id,
  name,
  email,
  phone,
  company_name,
  company_domain,
  industry,
  job_title,
  company_size,
  company_revenue,
  last_synced
) VALUES (
  '<contact.id>',
  '<agent.name>',
  '<agent.email>',
  '<agent.phone>',
  '<agent.company>',
  '<agent.domain>',
  '<agent.industry>',
  '<agent.job_title>',
  '<http_request.body.metrics.employees>',
  '<http_request.body.metrics.revenue>',
  NOW()
)
ON CONFLICT (crm_id)
DO UPDATE SET
  name = EXCLUDED.name,
  email = EXCLUDED.email,
  phone = EXCLUDED.phone,
  company_name = EXCLUDED.company_name,
  company_domain = EXCLUDED.company_domain,
  industry = EXCLUDED.industry,
  job_title = EXCLUDED.job_title,
  company_size = EXCLUDED.company_size,
  company_revenue = EXCLUDED.company_revenue,
  last_synced = NOW()
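
The query above interpolates values straight into the SQL. If you reproduce the load step in code, pass values as driver parameters instead so quoting and escaping are handled for you; a sketch using the DB-API style of psycopg2 is below (table and column names come from the query above).

# Load: parameterized upsert into the warehouse.
# `conn` is an open psycopg2 (or any DB-API) connection.
UPSERT_SQL = """
INSERT INTO contacts_warehouse (
  crm_id, name, email, phone, company_name, company_domain,
  industry, job_title, company_size, company_revenue, last_synced
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())
ON CONFLICT (crm_id) DO UPDATE SET
  name = EXCLUDED.name, email = EXCLUDED.email, phone = EXCLUDED.phone,
  company_name = EXCLUDED.company_name, company_domain = EXCLUDED.company_domain,
  industry = EXCLUDED.industry, job_title = EXCLUDED.job_title,
  company_size = EXCLUDED.company_size, company_revenue = EXCLUDED.company_revenue,
  last_synced = NOW()
"""

def load_contact(conn, contact_id: str, cleaned: dict, company: dict | None) -> None:
    metrics = (company or {}).get("metrics") or {}
    with conn.cursor() as cur:
        cur.execute(UPSERT_SQL, (
            contact_id, cleaned["name"], cleaned["email"], cleaned["phone"],
            cleaned["company"], cleaned["domain"], cleaned["industry"],
            cleaned["job_title"], metrics.get("employees"), metrics.get("revenue"),
        ))
    conn.commit()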

8. Handle Errors (False Path)

  • Drag Database Query Block on False path
INSERT INTO pipeline_errors (
  source_system,
  record_id,
  error_type,
  raw_data,
  created_at
) VALUES (
  'crm',
  '<contact.id>',
  'validation_failed',
  '<contact>',
  NOW()
)

9. Send Summary Report (After Loop)

  • Drag Agent Block after Loop completes
Model: GPT-4o-mini

System Prompt:
"Create a brief summary of this data pipeline run."

User Prompt:
"Total contacts processed: <loop.count>
Successful: <loop.success_count>
Failed: <loop.error_count>
Duration: <workflow.duration>"

10. Notify Team

  • Drag Slack Tool
Channel: #data-pipeline
Message:
"✅ CRM → Warehouse Sync Complete

<agent.content>

View errors: https://dashboard.yourapp.com/pipeline-errors"

Common Pipeline Patterns

Database to Database

  • Extract from MySQL → Transform → Load to PostgreSQL
  • Schedule: Hourly or real-time (webhook)

API to Database

  • Extract from REST API → Enrich with AI → Load to warehouse
  • Schedule: Every 4-6 hours

Database to API

  • Extract from database → Format → POST to external API
  • Schedule: Daily or event-driven

File to Database

  • Extract from S3 CSV → Parse and clean → Load to database
  • Trigger: New file uploaded
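
For the file-based pattern, a rough sketch that reads a CSV object from S3 and bulk-inserts the rows (the bucket, key, and staging table names are placeholders):

# File to Database: read a CSV from S3 and load it into a staging table.
import csv
import io

import boto3

def load_csv_from_s3(bucket: str, key: str, conn) -> int:
    body = boto3.client("s3").get_object(Bucket=bucket, Key=key)["Body"].read()
    rows = list(csv.DictReader(io.StringIO(body.decode("utf-8"))))
    with conn.cursor() as cur:  # conn: an open DB-API connection
        cur.executemany(
            "INSERT INTO staging_contacts (name, email) VALUES (%s, %s)",
            [(r.get("name"), r.get("email")) for r in rows],
        )
    conn.commit()
    return len(rows)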

Real-World Examples

E-commerce Order Sync

  • Shopify → Clean and categorize → Analytics database
  • Run every 15 minutes

Marketing Data Pipeline

  • Google Ads + Facebook Ads → Normalize → Marketing dashboard
  • Run hourly

Financial Reconciliation

  • Payment processor → Match transactions → Accounting system
  • Run daily at 6 AM

Customer Data Platform

  • Multiple CRMs → Deduplicate and merge → Unified customer view
  • Run every 2 hours

Enhancements

Add Incremental Loading

  • Track last sync timestamp
  • Only process changed/new records
  • Reduces API calls and processing time
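
A minimal sketch of the timestamp bookkeeping, assuming a small pipeline_state table with one row per pipeline:

# Incremental loading: remember when the last successful sync finished.
from datetime import datetime, timezone

def get_last_sync(conn, pipeline: str) -> str:
    with conn.cursor() as cur:
        cur.execute("SELECT last_synced_at FROM pipeline_state WHERE pipeline = %s", (pipeline,))
        row = cur.fetchone()
    return row[0].isoformat() if row else "1970-01-01T00:00:00Z"  # first run: fetch everything

def set_last_sync(conn, pipeline: str) -> None:
    with conn.cursor() as cur:
        cur.execute(
            "INSERT INTO pipeline_state (pipeline, last_synced_at) VALUES (%s, %s) "
            "ON CONFLICT (pipeline) DO UPDATE SET last_synced_at = EXCLUDED.last_synced_at",
            (pipeline, datetime.now(timezone.utc)),
        )
    conn.commit()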

Add Data Deduplication

  • Check for duplicates before insert
  • Merge duplicate records intelligently
  • Use AI to match similar records
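
A simple key-based version is sketched below; swapping the key function for an AI or fuzzy matcher upgrades it to intelligent merging:

# Deduplication: skip records whose normalized email was already seen this run.
def dedupe(contacts: list[dict]) -> list[dict]:
    seen: set[str] = set()
    unique: list[dict] = []
    for contact in contacts:
        key = (contact.get("email") or "").strip().lower()
        if key and key in seen:
            continue  # duplicate; a smarter version would merge fields here
        seen.add(key)
        unique.append(contact)
    return unique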

Add Data Quality Monitoring

  • Track validation failure rate
  • Alert if error rate exceeds threshold
  • Generate data quality reports
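
A small sketch of the threshold alert (the 5% threshold and the webhook parameter are assumptions):

# Data quality monitoring: alert when the validation failure rate is too high.
import requests

def check_error_rate(total: int, failed: int, webhook_url: str, threshold: float = 0.05) -> None:
    if total == 0:
        return
    rate = failed / total
    if rate > threshold:
        requests.post(
            webhook_url,  # e.g. a Slack incoming-webhook URL
            json={"text": f"⚠️ Pipeline error rate {rate:.1%} exceeds {threshold:.0%}"},
            timeout=10,
        )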

Add Retry Logic

  • Retry failed records with exponential backoff
  • Store failed records for manual review
  • Auto-retry on schedule
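
A sketch of the backoff loop, waiting 1s, 2s, 4s, ... between attempts before giving up:

# Retry logic: retry a flaky call with exponential backoff, then re-raise.
import time

def with_retries(fn, max_attempts: int = 4, base_delay: float = 1.0):
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts; store the record for manual review
            time.sleep(base_delay * (2 ** attempt))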

Add Transformation Caching

  • Cache AI transformations for common values
  • Reduces AI costs by 50-80%
  • Faster processing
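
A sketch of the cache keyed on a hash of the raw record, so unchanged records never hit the model; an in-memory dict is shown for brevity, but a Redis or database-backed cache works the same way:

# Transformation caching: reuse AI output for records already cleaned.
import hashlib
import json

_cache: dict[str, dict] = {}

def cached_clean(contact: dict, clean_fn) -> dict:
    key = hashlib.sha256(json.dumps(contact, sort_keys=True).encode()).hexdigest()
    if key not in _cache:
        _cache[key] = clean_fn(contact)  # model call only on a cache miss
    return _cache[key]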

Performance Optimization

Batch Processing

  • Process records in batches of 100
  • Reduces database connections
  • Faster overall execution
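
A sketch of the chunking (the reduced column list is for brevity):

# Batch processing: insert rows in chunks of 100 to cut database round trips.
def load_in_batches(conn, rows: list[tuple], batch_size: int = 100) -> None:
    with conn.cursor() as cur:
        for start in range(0, len(rows), batch_size):
            cur.executemany(
                "INSERT INTO contacts_warehouse (crm_id, name, email) VALUES (%s, %s, %s)",
                rows[start:start + batch_size],
            )
    conn.commit()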

Parallel Processing

  • Use Parallel Block for independent transformations
  • Process multiple batches simultaneously
  • 3-5x faster
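
A thread-pool sketch of the same idea; keep the worker count modest so you stay inside the AI and enrichment API rate limits:

# Parallel processing: transform several contacts concurrently.
from concurrent.futures import ThreadPoolExecutor

def transform_all(contacts: list[dict], clean_fn, workers: int = 5) -> list[dict]:
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(clean_fn, contacts))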

Selective Fields

  • Only extract/load changed fields
  • Reduces data transfer
  • Faster syncs

Cost

Per 1,000 records processed:

  • AI transformation: ~$0.20-0.50
  • Data enrichment API: ~$5-20 (Clearbit)
  • Database operations: Negligible
  • Total: ~$5.20-20.50

Monthly cost (10,000 records/day, roughly 300,000 records/month):

  • AI: ~$60-150/month
  • APIs: ~$1,500-6,000/month
  • Total: ~$1,560-6,150/month

Cost optimization:

  • Cache transformed data
  • Batch API requests
  • Use GPT-4o-mini instead of GPT-4o
  • Skip enrichment for low-value records

Next Step

Try Parallel Processing to speed up your data pipeline by 3-5x!