Data Engineering

ETL Pipeline Architecture

"Building production-grade data infrastructure for enterprise operations"

░▒▓ Overview

Built a production-grade data platform for a multi-tenant SaaS application serving manufacturing and compliance operations. The platform handles real-time data processing, ERP integrations, and analytics for enterprise customers.

Designed the complete data layer, from schema architecture through ETL pipelines to external system integrations, so that data flows reliably across the entire stack.

░▒▓ Database Architecture

Designed and implemented a comprehensive multi-tenant PostgreSQL schema with Row-Level Security enforcing tenant isolation at the database level.

Core Domain Models
Database Schema (~300 migrations):
├── Organizations (multi-tenancy root)
├── Inventory System
│   ├── stock_items, stock_levels, stock_lots
│   ├── stock_locations, stock_transactions
│   ├── stock_reservations, stock_cost_layers
│   └── stock_serials
├── Manufacturing
│   ├── work_orders, work_centers, routings
│   ├── boms (bills of materials)
│   └── mrp_planning
├── Supply Chain
│   ├── customers, suppliers
│   ├── sales_orders, purchase_orders
│   └── supplier_evaluations
└── Compliance
    ├── documents, document_versions
    ├── training_records, competencies
    └── risks, risk_assessments
Key Design Decisions
ARCHITECTURAL PATTERNS
═══════════════════════════════════════════════════════════════

Row-Level Security (RLS)
├── All tables enforce tenant isolation at DB level
├── Policies attached to organization_id column
└── Tenant filtering no longer relies on application-level WHERE clauses

Audit Trails
├── created_by, updated_by on all transactional tables
├── created_at, updated_at with automatic timestamps
└── Full history for compliance requirements

Soft Deletes
├── deleted_at, deleted_by columns
├── Data preservation for compliance audits
└── Recoverable deletes without data loss

Strategic Indexing
├── organization_id on every table (tenant queries)
├── Foreign key indexes for join performance
└── Composite indexes for common query patterns

═══════════════════════════════════════════════════════════════
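
A minimal sketch of what the RLS and indexing patterns above could look like for one table, written as a TypeScript helper that emits the migration SQL. The session setting `app.current_org_id`, the policy and index names, and the `uuid` type of `organization_id` are assumptions for illustration, not details taken from the actual schema.

```typescript
// Hypothetical helper: builds the SQL a migration could run to put one table
// under tenant-scoped Row-Level Security. The setting name
// "app.current_org_id" and the naming scheme are assumptions.
const IDENTIFIER = /^[a-z_][a-z0-9_]*$/;

function buildTenantPolicySql(table: string): string {
  // The table name is interpolated into SQL, so accept plain identifiers only.
  if (!IDENTIFIER.test(table)) {
    throw new Error(`Unsafe table name: ${table}`);
  }
  const tenantCheck = "organization_id = current_setting('app.current_org_id')::uuid";
  return [
    `ALTER TABLE ${table} ENABLE ROW LEVEL SECURITY;`,
    // FORCE applies the policy to the table owner as well.
    `ALTER TABLE ${table} FORCE ROW LEVEL SECURITY;`,
    `CREATE POLICY ${table}_tenant_isolation ON ${table}`,
    `  USING (${tenantCheck})`,
    `  WITH CHECK (${tenantCheck});`,
    // Tenant-scoped queries filter on organization_id, so index it.
    `CREATE INDEX IF NOT EXISTS idx_${table}_organization_id ON ${table} (organization_id);`,
  ].join("\n");
}

const stockItemsSql = buildTenantPolicySql("stock_items");
```

With a policy like this in place, a query that forgets its tenant filter returns only the current organization's rows instead of leaking another tenant's data.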

░▒▓ ETL Pipelines

Designed and implemented 7 production workers for async data processing using BullMQ with Redis-backed job queues.

Job Queue Architecture
┌─────────────────────────────────────────────────────────────────┐
│                    Job Queue Architecture                        │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│   ┌─────────┐    ┌─────────┐    ┌──────────────────────┐       │
│   │  API    │───▶│  Redis  │───▶│  BullMQ Workers      │       │
│   │ Request │    │  Queue  │    │                      │       │
│   └─────────┘    └─────────┘    │  ├─ Document Proc.   │       │
│                                  │  ├─ Gap Analysis     │       │
│                                  │  ├─ RFQ Extraction   │       │
│                                  │  ├─ Competency Gap   │       │
│                                  │  ├─ Email Notifs     │       │
│                                  │  ├─ Compliance Scan  │       │
│                                  │  └─ Trial Lifecycle  │       │
│                                  └──────────────────────┘       │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘
Worker Registry
┌─────────────────────┬─────────────────────────┬────────────────────────────┐
│ Worker              │ Queue                   │ Function                   │
├─────────────────────┼─────────────────────────┼────────────────────────────┤
│ Document Processing │ document-queue          │ OCR, classification,       │
│                     │                         │ entity extraction          │
├─────────────────────┼─────────────────────────┼────────────────────────────┤
│ Gap Analysis        │ gap-analysis-queue      │ Compliance assessment      │
│                     │                         │ via AI orchestration       │
├─────────────────────┼─────────────────────────┼────────────────────────────┤
│ RFQ Processor       │ rfq-queue               │ Quote extraction,          │
│                     │                         │ product matching           │
├─────────────────────┼─────────────────────────┼────────────────────────────┤
│ Competency Gap      │ competency-gap-queue    │ Skills gap detection,      │
│                     │                         │ auto-CAPA generation       │
├─────────────────────┼─────────────────────────┼────────────────────────────┤
│ Email Notifications │ email-queue             │ Queue-based email          │
│                     │                         │ dispatch                   │
├─────────────────────┼─────────────────────────┼────────────────────────────┤
│ Compliance Scanner  │ compliance-notif-queue  │ Overdue item detection     │
│                     │                         │ and alerting               │
├─────────────────────┼─────────────────────────┼────────────────────────────┤
│ Trial Lifecycle     │ trial-queue             │ Subscription management    │
│                     │                         │ and expiration             │
└─────────────────────┴─────────────────────────┴────────────────────────────┘

FEATURES:
├── Real-time progress tracking via Server-Sent Events (SSE)
├── Job retry logic with exponential backoff
├── Dead letter queues for failed jobs
└── Job prioritization and rate limiting
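
The retry and dead-letter behaviour can be sketched without a queue at all. The block below is a dependency-free simulation, not the production worker code: it doubles the delay after each failed attempt and hands the job to a dead-letter outcome once attempts run out. In BullMQ the same schedule is requested with the `attempts` and `backoff: { type: 'exponential', delay }` job options. How the dead letter queue is actually wired is not described here, so `runWithRetries` and its result shape are hypothetical.

```typescript
interface RetryPolicy {
  attempts: number;    // total tries, including the first one
  baseDelayMs: number; // delay before the first retry
}

// Delay after failed attempt number `attemptsMade` (1-based):
// base, 2x base, 4x base, ...
function backoffDelayMs(attemptsMade: number, baseDelayMs: number): number {
  return baseDelayMs * Math.pow(2, attemptsMade - 1);
}

type Outcome =
  | { status: "completed"; attemptsUsed: number; waitedMs: number }
  | { status: "dead-lettered"; attemptsUsed: number; waitedMs: number; lastError: string };

// Runs a task synchronously and records the delay that would have been
// waited, instead of actually sleeping, so the sketch stays easy to test.
function runWithRetries(task: (attempt: number) => void, policy: RetryPolicy): Outcome {
  let waitedMs = 0;
  let lastError = "";
  for (let attempt = 1; attempt <= policy.attempts; attempt++) {
    try {
      task(attempt);
      return { status: "completed", attemptsUsed: attempt, waitedMs };
    } catch (err) {
      lastError = err instanceof Error ? err.message : String(err);
      if (attempt < policy.attempts) {
        waitedMs += backoffDelayMs(attempt, policy.baseDelayMs);
      }
    }
  }
  // Out of attempts: a real worker would move the job to a dead letter queue.
  return { status: "dead-lettered", attemptsUsed: policy.attempts, waitedMs, lastError };
}

// A job that fails twice, then succeeds.
let calls = 0;
const flaky = runWithRetries(() => {
  calls++;
  if (calls < 3) throw new Error("OCR timeout");
}, { attempts: 5, baseDelayMs: 1000 });

// A job that can never succeed.
const poison = runWithRetries(() => {
  throw new Error("corrupt PDF");
}, { attempts: 3, baseDelayMs: 1000 });
```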

░▒▓ Data Import/Export System

Built a robust import system supporting 10 entity types with AI-assisted column mapping and comprehensive validation.

Import Pipeline
┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐
│   Upload   │───▶│   Parse    │───▶│  Validate  │───▶│   Execute  │
│ (XLSX/CSV) │    │ (SheetJS)  │    │   (Zod)    │    │  (Upsert)  │
└────────────┘    └────────────┘    └────────────┘    └────────────┘
                        │
                        ▼
                 ┌────────────┐
                 │ AI Analyze │
                 │  (OpenAI)  │
                 │            │
                 │ Auto-map   │
                 │ columns to │
                 │ schema     │
                 └────────────┘

SUPPORTED ENTITY TYPES:
├── Products / Stock Items
├── Customers & Suppliers
├── Sales Orders & Purchase Orders
├── Work Orders
├── Warehouse Locations
├── Opening Balances
├── Manufacturing Routings
├── Bills of Materials (BOMs)
├── Training Records
└── Equipment / Calibrations
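
To illustrate the map-then-validate stages of the import pipeline, here is a small sketch under stated assumptions. A fixed alias table stands in for the AI column-mapping step, and a hand-written check stands in for the Zod schema so the block has no dependencies. The `StockItemRow` shape and the alias lists are hypothetical.

```typescript
// Hypothetical target shape for one entity type (stock items).
interface StockItemRow { sku: string; name: string; quantity: number; }

// Assumed aliases; in the described system the mapping is proposed by the AI step.
const ALIASES: { [field: string]: string[] } = {
  sku: ["sku", "itemcode", "partnumber"],
  name: ["name", "description", "itemname"],
  quantity: ["quantity", "qty", "onhand"],
};

function normalize(header: string): string {
  return header.toLowerCase().replace(/[^a-z0-9]/g, "");
}

// Maps spreadsheet headers to schema fields; unknown headers are left unmapped.
function mapColumns(headers: string[]): { [header: string]: string } {
  const mapping: { [header: string]: string } = {};
  for (const header of headers) {
    const key = normalize(header);
    for (const field of Object.keys(ALIASES)) {
      if (ALIASES[field].indexOf(key) !== -1) mapping[header] = field;
    }
  }
  return mapping;
}

interface ImportResult {
  valid: StockItemRow[];
  errors: { row: number; message: string }[];
}

// Splits parsed rows into rows ready for upsert and per-row error reports.
function validateRows(
  rows: { [header: string]: string }[],
  mapping: { [header: string]: string },
): ImportResult {
  const result: ImportResult = { valid: [], errors: [] };
  rows.forEach((raw, index) => {
    const mapped: { [field: string]: string } = {};
    for (const header of Object.keys(raw)) {
      const field = mapping[header];
      if (field) mapped[field] = raw[header].trim();
    }
    const sku = mapped["sku"];
    const itemName = mapped["name"];
    const quantityText = mapped["quantity"];
    const quantity = Number(quantityText);
    if (!sku) {
      result.errors.push({ row: index + 1, message: "sku is required" });
    } else if (!itemName) {
      result.errors.push({ row: index + 1, message: "name is required" });
    } else if (!quantityText || !isFinite(quantity) || quantity < 0) {
      result.errors.push({ row: index + 1, message: "quantity must be a non-negative number" });
    } else {
      result.valid.push({ sku, name: itemName, quantity });
    }
  });
  return result;
}

const columnMapping = mapColumns(["Item Code", "Description", "Qty"]);
const importResult = validateRows(
  [
    { "Item Code": "A-100", Description: "Bracket", Qty: "25" },
    { "Item Code": "", Description: "Bolt", Qty: "10" },
    { "Item Code": "A-200", Description: "Nut", Qty: "abc" },
  ],
  columnMapping,
);
```

Collecting errors per row, rather than failing on the first bad row, lets the user fix a spreadsheet in one pass.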

░▒▓ ERP Integrations

Built a pluggable integration framework supporting multiple ERP systems with bidirectional sync and conflict resolution.

Integration Framework
Integration Framework
├── core/
│   ├── sync-engine.ts        # Main orchestrator
│   ├── base-integration.ts   # Abstract base class
│   ├── provider-registry.ts  # Provider management
│   ├── conflict-resolver.ts  # Conflict resolution
│   └── entity-mapper.ts      # Field mapping engine
└── providers/
    ├── quickbooks/           # OAuth 2.0, REST API
    └── odoo/                 # XML-RPC protocol

───────────────────────────────────────────────────────────────

QUICKBOOKS INTEGRATION
├── Auth: OAuth 2.0 with automatic token refresh
├── Entities: Customers, Suppliers, Products
├── Sync: Bidirectional push/pull
└── API: Intuit SDK

ODOO INTEGRATION
├── Protocol: XML-RPC via odoo-xmlrpc
├── Entities: 12 entity types
└── Sync: Dependency-aware ordering
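
Bidirectional sync needs a rule for records edited on both sides. The sketch below shows one way the two strategies named for this framework (last-write-wins and manual review) could be expressed. It is an illustration only: the `RecordVersion` shape, the `Resolution` type, and the tie-break in favour of the local copy are assumptions, not the contents of `conflict-resolver.ts`.

```typescript
type Strategy = "last-write-wins" | "manual-review";
type Fields = { [key: string]: string };

interface RecordVersion {
  id: string;
  updatedAt: string; // ISO 8601 timestamp
  fields: Fields;
}

type Resolution =
  | { action: "no-conflict" }
  | { action: "keep-local" }
  | { action: "take-remote" }
  | { action: "needs-review"; conflictingFields: string[] };

// Lists every field whose value differs between the two copies.
function diffFields(a: Fields, b: Fields): string[] {
  const keys = Object.keys(a).concat(Object.keys(b).filter((k) => !(k in a)));
  return keys.filter((k) => a[k] !== b[k]);
}

function resolveConflict(local: RecordVersion, remote: RecordVersion, strategy: Strategy): Resolution {
  const conflictingFields = diffFields(local.fields, remote.fields);
  if (conflictingFields.length === 0) return { action: "no-conflict" };
  if (strategy === "manual-review") return { action: "needs-review", conflictingFields };
  // last-write-wins: the newer timestamp wins; ties keep the local copy (assumption).
  return Date.parse(remote.updatedAt) > Date.parse(local.updatedAt)
    ? { action: "take-remote" }
    : { action: "keep-local" };
}

const localCustomer: RecordVersion = {
  id: "cust-1",
  updatedAt: "2024-05-01T10:00:00Z",
  fields: { name: "Acme Ltd", phone: "555-0100" },
};
const remoteCustomer: RecordVersion = {
  id: "cust-1",
  updatedAt: "2024-05-02T08:30:00Z",
  fields: { name: "Acme Limited", phone: "555-0100" },
};

const automatic = resolveConflict(localCustomer, remoteCustomer, "last-write-wins");
const reviewed = resolveConflict(localCustomer, remoteCustomer, "manual-review");
```

Last-write-wins is simple but can silently discard an edit, which is why a manual-review path is useful for sensitive entities.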
Odoo Sync Order (Dependency-Aware)
SYNC EXECUTION ORDER
═══════════════════════════════════════════════════════════════

 1. UOMs                 ──▶  Base unit definitions
 2. Work Centers         ──▶  Manufacturing capacity
 3. Stock Locations      ──▶  Warehouse structure
 4. Customers & Suppliers──▶  Business partners
 5. Stock Items          ──▶  Products (depends on UOMs)
 6. BOMs                 ──▶  Bills of materials (depends on items)
 7. Stock Lots           ──▶  Lot tracking (depends on items)
 8. Stock Levels         ──▶  Inventory quantities (pull-only)
 9. Sales Orders         ──▶  Customer orders
10. Purchase Orders      ──▶  Supplier orders
11. Work Orders          ──▶  Manufacturing jobs

SYNC ENGINE FEATURES:
├── Inline sync for small datasets (<500 records)
├── Queued async sync for larger datasets
├── Conflict resolution (last-write-wins, manual review)
├── Field mapping with transformations
└── Error handling with retry logic

═══════════════════════════════════════════════════════════════
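
The dependency-aware ordering can be derived instead of hard-coded. Below is a minimal sketch that topologically sorts entities from a dependency map. Only three edges come from the list above (stock items on UOMs, BOMs and stock lots on stock items). The remaining edges are plausible assumptions added for illustration, and `syncOrder` is a hypothetical name.

```typescript
// Each entity lists the entities that must be synced before it.
const DEPENDENCIES: { [entity: string]: string[] } = {
  uoms: [],
  work_centers: [],
  stock_locations: [],
  customers: [],
  suppliers: [],
  stock_items: ["uoms"],                           // from the sync order list
  boms: ["stock_items"],                           // from the sync order list
  stock_lots: ["stock_items"],                     // from the sync order list
  stock_levels: ["stock_items", "stock_locations"], // assumed
  sales_orders: ["customers", "stock_items"],      // assumed
  purchase_orders: ["suppliers", "stock_items"],   // assumed
  work_orders: ["boms", "work_centers"],           // assumed
};

// Repeatedly picks the first entity (in declaration order) whose
// dependencies have all been synced. Throws on unknown names or cycles.
function syncOrder(deps: { [entity: string]: string[] }): string[] {
  const entities = Object.keys(deps);
  for (const entity of entities) {
    for (const dep of deps[entity]) {
      if (!(dep in deps)) throw new Error(`Unknown dependency: ${dep}`);
    }
  }
  const done: { [entity: string]: boolean } = {};
  const order: string[] = [];
  while (order.length < entities.length) {
    const next: string | undefined = entities.filter(
      (entity) => !done[entity] && deps[entity].every((dep) => done[dep]),
    )[0];
    if (next === undefined) throw new Error("Dependency cycle detected");
    done[next] = true;
    order.push(next);
  }
  return order;
}

const odooSyncOrder = syncOrder(DEPENDENCIES);
```

Deriving the order from declared dependencies means a new entity type only needs its own dependency list, and a cycle fails loudly rather than producing a partial sync.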

░▒▓ Analytics Services

Built specialized services for real-time analytics and KPI calculation across inventory, manufacturing, and compliance domains.

[I]

Inventory Service

Stock levels, movements, valuations with FIFO, LIFO, and weighted average costing methods.
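
As an illustration of how FIFO and LIFO differ when stock is issued against cost layers, here is a small sketch. The `CostLayer` shape and the `issueStock` function are hypothetical and only loosely modelled on the `stock_cost_layers` table; weighted average costing is left out for brevity.

```typescript
type CostingMethod = "FIFO" | "LIFO";

// One receipt of stock at a given unit cost. Layers are ordered oldest first.
interface CostLayer { quantity: number; unitCost: number; }

// Issues `quantity` units and returns the cost of goods issued plus the
// layers left afterwards (still ordered oldest first).
function issueStock(
  layers: CostLayer[],
  quantity: number,
  method: CostingMethod,
): { cost: number; remaining: CostLayer[] } {
  if (quantity < 0) throw new Error("Quantity must be non-negative");
  const remaining = layers.map((l) => ({ quantity: l.quantity, unitCost: l.unitCost }));
  const indexes = remaining.map((_, i) => i);
  // FIFO consumes the oldest layers first, LIFO the newest.
  if (method === "LIFO") indexes.reverse();

  let toIssue = quantity;
  let cost = 0;
  for (const i of indexes) {
    const taken = Math.min(remaining[i].quantity, toIssue);
    cost += taken * remaining[i].unitCost;
    remaining[i].quantity -= taken;
    toIssue -= taken;
  }
  if (toIssue > 0) throw new Error("Insufficient stock");
  return { cost, remaining: remaining.filter((l) => l.quantity > 0) };
}

const layers: CostLayer[] = [
  { quantity: 10, unitCost: 2 }, // older receipt
  { quantity: 5, unitCost: 3 },  // newer receipt
];

const fifo = issueStock(layers, 12, "FIFO"); // 10 units at 2, then 2 units at 3
const lifo = issueStock(layers, 12, "LIFO"); // 5 units at 3, then 7 units at 2
```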

[M]

MRP Service

Material requirements planning with demand forecasting and production scheduling.

[A]

ATP Service

Available-to-promise calculations considering reservations, allocations, and incoming stock.
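
A simple form of the calculation is on-hand stock, minus reservations and allocations, plus receipts expected by the promise date. The sketch below assumes that formula. The field names are hypothetical, and clamping the result at zero is a design choice for illustration, not a confirmed behaviour of the service.

```typescript
interface IncomingReceipt {
  quantity: number;
  expectedOn: string; // ISO date, YYYY-MM-DD
}

interface AtpInput {
  onHand: number;
  reserved: number;
  allocated: number;
  incoming: IncomingReceipt[];
}

// Quantity that can be promised for delivery on `asOf`.
// ISO dates in YYYY-MM-DD form compare correctly as plain strings.
function availableToPromise(input: AtpInput, asOf: string): number {
  const arrivingInTime = input.incoming
    .filter((receipt) => receipt.expectedOn <= asOf)
    .reduce((sum, receipt) => sum + receipt.quantity, 0);
  const available = input.onHand - input.reserved - input.allocated + arrivingInTime;
  return Math.max(0, available);
}

const bracketStock: AtpInput = {
  onHand: 100,
  reserved: 20,
  allocated: 30,
  incoming: [
    { quantity: 40, expectedOn: "2024-03-10" },
    { quantity: 25, expectedOn: "2024-04-01" },
  ],
};

const midMarch = availableToPromise(bracketStock, "2024-03-15"); // only the first receipt counts
const aprilFirst = availableToPromise(bracketStock, "2024-04-01"); // both receipts count
```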

[K]

KPI Calculator

Auto-calculates ~22 operational metrics across supplier, quality, compliance, and customer dimensions.

[H]

Health Score Engine

Weighted compliance health score (0-100) with trend analysis and threshold-based alerting.
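
One way such a score could be computed is a weighted average of per-category scores, followed by threshold checks. The categories, weights, thresholds, and trend tolerance below are invented for illustration and are not the engine's real configuration.

```typescript
interface CategoryScore {
  name: string;
  score: number;  // 0-100
  weight: number; // relative importance
}

type AlertLevel = "ok" | "warning" | "critical";
type Trend = "improving" | "declining" | "stable";

// Weighted average of category scores, rounded to an integer in 0-100.
function healthScore(categories: CategoryScore[]): number {
  const totalWeight = categories.reduce((sum, c) => sum + c.weight, 0);
  if (totalWeight <= 0) throw new Error("Total weight must be positive");
  const weighted = categories.reduce(
    (sum, c) => sum + Math.min(100, Math.max(0, c.score)) * c.weight,
    0,
  );
  return Math.round(weighted / totalWeight);
}

// Thresholds are assumptions: below 50 is critical, below 70 is a warning.
function alertLevel(score: number, thresholds = { warning: 70, critical: 50 }): AlertLevel {
  if (score < thresholds.critical) return "critical";
  if (score < thresholds.warning) return "warning";
  return "ok";
}

// Small changes inside the tolerance band are reported as stable.
function trend(current: number, previous: number, tolerance = 2): Trend {
  const delta = current - previous;
  if (delta > tolerance) return "improving";
  if (delta < -tolerance) return "declining";
  return "stable";
}

const currentScore = healthScore([
  { name: "documents", score: 80, weight: 2 },
  { name: "training", score: 60, weight: 1 },
  { name: "risks", score: 40, weight: 1 },
]);
const currentAlert = alertLevel(currentScore);
const currentTrend = trend(currentScore, 72);
```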

░▒▓ AI/ML Data Pipelines

Built AI-powered data pipelines for document intelligence and automated compliance assessment.

Document Intelligence Pipeline
DOCUMENT PROCESSING FLOW
═══════════════════════════════════════════════════════════════

┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐
│  Upload  │───▶│  Vision  │───▶│ Classify │───▶│ Extract  │
│  (PDF/   │    │   API    │    │  (GPT-4) │    │ Entities │
│   IMG)   │    │  (OCR)   │    │          │    │          │
└──────────┘    └──────────┘    └──────────┘    └──────────┘
                                      │               │
                                      ▼               ▼
                               ┌──────────┐    ┌──────────┐
                               │ Document │    │  Match   │
                               │  Types:  │    │ to ERP   │
                               │ SOP,Form │    │ Records  │
                               │ Policy...│    │ (Fuzzy)  │
                               └──────────┘    └──────────┘
                                                     │
                                                     ▼
                                              ┌──────────┐
                                              │ Generate │
                                              │ Clause   │
                                              │Embeddings│
                                              └──────────┘
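
The matching algorithm is not specified here, so the sketch below swaps in a common stand-in for the fuzzy "Match to ERP Records" step: the Dice coefficient over character bigrams. The `ErpRecord` shape and the 0.6 cutoff are assumptions.

```typescript
interface ErpRecord { id: string; name: string; }

// Splits text into overlapping two-character chunks, ignoring case and punctuation.
function bigrams(text: string): string[] {
  const cleaned = text.toLowerCase().replace(/[^a-z0-9]/g, "");
  const out: string[] = [];
  for (let i = 0; i < cleaned.length - 1; i++) out.push(cleaned.slice(i, i + 2));
  return out;
}

// Dice coefficient: 2 * shared bigrams / total bigrams, in the range 0..1.
function similarity(a: string, b: string): number {
  const x = bigrams(a);
  const pool = bigrams(b);
  const total = x.length + pool.length;
  if (x.length === 0 || pool.length === 0) return 0;
  let shared = 0;
  for (const gram of x) {
    const at = pool.indexOf(gram);
    if (at !== -1) {
      shared++;
      pool.splice(at, 1); // each bigram in b can be matched only once
    }
  }
  return (2 * shared) / total;
}

// Returns the closest record, or null when nothing clears the cutoff.
function bestMatch(extracted: string, candidates: ErpRecord[], minScore = 0.6): ErpRecord | null {
  let best: ErpRecord | null = null;
  let bestScore = minScore;
  for (const candidate of candidates) {
    const score = similarity(extracted, candidate.name);
    if (score >= bestScore) {
      best = candidate;
      bestScore = score;
    }
  }
  return best;
}

const suppliers: ErpRecord[] = [
  { id: "c1", name: "ACME Industrial Supplies" },
  { id: "c2", name: "Apex Tooling" },
];

const matched = bestMatch("Acme Industrial Supply Ltd", suppliers);
const unmatched = bestMatch("Zenith", suppliers);
```

Returning null below the cutoff matters as much as the scoring itself: a weak match linked to the wrong supplier is worse than an entity left for manual review.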

───────────────────────────────────────────────────────────────

GAP ANALYSIS PIPELINE

Organization    Requirements    AI          Findings    Report
   Data     ───▶   Mapping   ───▶ Analysis ───▶  List  ───▶ Gen
     │                              │
     │                              │
     ▼                              ▼
 Collect all         Multi-step assessment
 compliance          with progress events
 evidence            via SSE streaming

═══════════════════════════════════════════════════════════════
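
Progress events streamed over SSE follow a simple text framing: optional `id:` and `event:` lines, a `data:` line, and a blank line to end the event. A minimal sketch of a formatter follows. The `GapAnalysisProgress` payload and the `progress` event name are hypothetical.

```typescript
// Hypothetical payload for one step of a gap analysis job.
interface GapAnalysisProgress {
  jobId: string;
  step: string;
  percent: number;
}

// Serializes one Server-Sent Event. Compact JSON contains no raw newlines,
// so a single "data:" line is enough. The trailing blank line ends the event.
function formatSseEvent(eventName: string, data: object, id?: string): string {
  const lines: string[] = [];
  if (id !== undefined) lines.push(`id: ${id}`);
  lines.push(`event: ${eventName}`);
  lines.push(`data: ${JSON.stringify(data)}`);
  return lines.join("\n") + "\n\n";
}

const progressUpdate: GapAnalysisProgress = {
  jobId: "j1",
  step: "requirements-mapping",
  percent: 40,
};

const frame = formatSseEvent("progress", progressUpdate, "3");
```

On the server, frames like this are written to a response with `Content-Type: text/event-stream`, and the browser's `EventSource` can subscribe to the named event with `addEventListener("progress", ...)`.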

░▒▓ Tech Stack

Database

PostgreSQL
Primary database with RLS
Supabase
Managed PostgreSQL + Auth
PL/pgSQL
Stored procedures and functions

Job Queue

BullMQ
Redis-backed job queues
Redis
Queue storage and caching

Data Processing

SheetJS
XLSX import/export
Papa Parse
CSV processing
Zod
Schema validation

Integrations

OAuth 2.0
QuickBooks authentication
XML-RPC
Odoo protocol
OpenAI API
GPT-4, Vision, Embeddings

░▒▓ Key Achievements

01

~300 Migrations with Zero Data Loss

Scaled schema evolution across production deployments with comprehensive migration testing and rollback procedures.

02

7 Production Workers Processing Async Jobs

Built reliable job queue system with retry logic, dead letter queues, and real-time progress tracking via SSE.

03

Multi-Tenant Architecture with RLS

Designed database-level tenant isolation so that cross-tenant reads and writes are blocked by policy rather than by application code.

04

ERP Integrations Syncing 12 Entity Types

Implemented bidirectional sync with QuickBooks and Odoo including dependency-aware execution ordering.

05

AI-Powered Document Pipeline

Built OCR, classification, and entity extraction system with fuzzy matching to existing ERP records.

░▒▓ What I Learned

>>

Multi-Tenant Schema Design

Designing schemas for SaaS with proper isolation using RLS, strategic indexing, and audit trail patterns.

::

Reliable Job Queue Architecture

Building production job queues with BullMQ including retry logic, dead letters, and progress streaming.

/\

External API Integration Patterns

Integrating with OAuth 2.0, XML-RPC, and REST APIs with proper error handling and token refresh.

$$

AI for Data Processing

Using LLMs for document classification, entity extraction, and intelligent column mapping.