This pillar covers how data enters the lake and how different systems integrate with it.
We use a modular, reusable framework (@stephen/datalake-sync) for all data ingestion. It moves data from each source through a standard layered pipeline:
```
┌─────────────┐
│ Data Source │  (Chatwoot, Calendar, Stripe, etc.)
└──────┬──────┘
       │
       ▼
┌─────────────┐
│  Connector  │  (Fetches data from source)
└──────┬──────┘
       │
       ▼
┌─────────────┐
│  Ingester   │  (Writes to Bronze)
└──────┬──────┘
       │
       ▼
┌─────────────┐
│   Bronze    │  (Raw, immutable data)
└──────┬──────┘
       │
       ▼
┌─────────────┐
│  Processor  │  (Bronze → Silver)
└──────┬──────┘
       │
       ▼
┌─────────────┐
│   Silver    │  (Cleaned, enriched)
└──────┬──────┘
       │
       ▼
┌─────────────┐
│ Aggregator  │  (Silver → Gold)
└──────┬──────┘
       │
       ▼
┌─────────────┐
│    Gold     │  (Business analytics)
└─────────────┘
```
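The layering can be illustrated with a small in-memory sketch. Everything in it is hypothetical and not part of the @stephen/datalake-sync API: the `LayerStore` class, the record shapes, and `runPipeline`. It only shows the intended data flow, plus the rule that Bronze objects are written once and never overwritten.

```typescript
// Hypothetical in-memory sketch of the layer flow; none of these names
// come from @stephen/datalake-sync.
interface RawEvent {
  id: string;
  date: string; // e.g. "2024-05-01"
  payload: string;
}

interface SilverEvent extends RawEvent {
  length: number; // example of an enrichment derived during processing
}

class LayerStore<T> {
  private readonly objects = new Map<string, T>();
  private readonly immutable: boolean;

  constructor(immutable: boolean) {
    this.immutable = immutable;
  }

  put(key: string, value: T): void {
    // Bronze is write-once: refuse to overwrite an existing object.
    if (this.immutable && this.objects.has(key)) {
      throw new Error(`object already exists: ${key}`);
    }
    this.objects.set(key, value);
  }

  values(): T[] {
    return Array.from(this.objects.values());
  }
}

function runPipeline(fetched: RawEvent[]): Record<string, number> {
  const bronze = new LayerStore<RawEvent>(true); // raw, immutable
  const silver = new LayerStore<SilverEvent>(false); // cleaned, enriched

  // Ingester: store each fetched record unchanged.
  for (const e of fetched) bronze.put(`${e.date}/${e.id}.json`, e);

  // Processor: derive Silver from Bronze without modifying Bronze.
  for (const e of bronze.values()) {
    const payload = e.payload.trim();
    silver.put(`${e.date}/${e.id}.json`, { ...e, payload, length: payload.length });
  }

  // Aggregator: summarize Silver into a Gold metric (events per date).
  const gold: Record<string, number> = {};
  for (const e of silver.values()) gold[e.date] = (gold[e.date] ?? 0) + 1;
  return gold;
}
```

Passing two events dated 2024-01-01 and one dated 2024-01-02 yields `{ "2024-01-01": 2, "2024-01-02": 1 }`. Passing the same id and date twice throws, because the Bronze store is write-once.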
Per-source layout:

Chatwoot
- Bronze: bronze-communication/chatwoot/conversations/{date}/{id}.json
- Silver: silver-communication/chatwoot/processed/{date}/{id}.json
- Connector: ChatwootConnector (reads from PostgreSQL)
- Webhook: /api/webhooks/chatwoot

Calendar
- Bronze: bronze-education/calendar/events/{year}/{month}/{eventId}.json
- Silver: silver-education/calendar/processed/{year}/{month}/{eventId}.json
- Connector: GoogleCalendarConnector (fetches from the Google Calendar API)
- Processor: CalendarSilverProcessor (enriches with student matching, duration, and time categorization)
- Database sync: CalendarDatabaseSync (syncs to the PostgreSQL Lesson table)

Notability
- Bronze: bronze-education/notability/metadata/{subject}/{student}/{file}.json + PDF files
- Silver: silver-education/{path}.metadata.json (AI analysis) + thumbnails/{path}/{size}.png
- Connector: NotabilityConnector (scans MinIO for PDFs)
- Processor: NotabilitySilverProcessor (AI analysis with OpenAI, taxonomy classification)
- Database sync: NotabilityDatabaseSync (syncs to the PostgreSQL Note table, with lesson linking)

Stripe
- Bronze: bronze-communication/stripe/payments/{date}/{id}.json

Sources are configured in /projects/datalake-sources.yaml:
```yaml
sources:
  - id: chatwoot
    name: Chatwoot CRM
    type: webhook
    connector: ChatwootConnector
    enabled: true
    buckets:
      bronze: bronze-communication
      silver: silver-communication
      gold: gold-analytics
    options:
      webhook_path: /api/webhooks/chatwoot
```
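A typed view of this configuration makes bucket lookups explicit. The sketch below mirrors the YAML fields shown above, but the `SourceConfig` interface and the `objectKey` and `enabledSources` helpers are hypothetical. The key layout `{bucket}/{source}/{entity}/{partition}/{id}.json` is inferred from the Chatwoot, Stripe, and Calendar paths listed earlier, not taken from the framework. The Notability paths follow a different layout and are not covered.

```typescript
// Hypothetical types mirroring datalake-sources.yaml; not the package's own types.
type Layer = "bronze" | "silver" | "gold";

interface SourceConfig {
  id: string;
  name: string;
  type: string; // only "webhook" appears in the example config
  connector: string;
  enabled: boolean;
  buckets: Record<Layer, string>;
  options?: Record<string, string>;
}

const chatwoot: SourceConfig = {
  id: "chatwoot",
  name: "Chatwoot CRM",
  type: "webhook",
  connector: "ChatwootConnector",
  enabled: true,
  buckets: { bronze: "bronze-communication", silver: "silver-communication", gold: "gold-analytics" },
  options: { webhook_path: "/api/webhooks/chatwoot" },
};

// Builds keys such as bronze-communication/chatwoot/conversations/2024-05-01/42.json.
// For Calendar, pass a partition like "2024/05" to get the {year}/{month} layout.
function objectKey(source: SourceConfig, layer: Layer, entity: string, partition: string, id: string): string {
  return `${source.buckets[layer]}/${source.id}/${entity}/${partition}/${id}.json`;
}

// Only enabled sources should be scheduled or accept webhooks.
function enabledSources(all: SourceConfig[]): SourceConfig[] {
  return all.filter((s) => s.enabled);
}
```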
Base classes and configuration for adding a source:
- BaseDataSourceConnector
- BaseBronzeIngester
- BaseSilverProcessor
- BaseGoldAggregator
- datalake-sources.yaml (source registration)

See the @stephen/datalake-sync README for detailed examples.
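The extension pattern can be illustrated with a hypothetical connector hierarchy. Only the class name `BaseDataSourceConnector` and the `isAvailable()` method appear in this document. The `fetch` signature, the `SourceRecord` type, `fetchIfAvailable`, and the `InMemoryConnector` subclass are assumptions, so check the package README for the real abstract members.

```typescript
// Hypothetical sketch: the real BaseDataSourceConnector in
// @stephen/datalake-sync may declare different abstract members.
interface SourceRecord {
  id: string;
  occurredAt: Date;
  data: unknown;
}

abstract class BaseDataSourceConnector {
  readonly sourceId: string;

  constructor(sourceId: string) {
    this.sourceId = sourceId;
  }

  // Health check (this document mentions connector.isAvailable()).
  abstract isAvailable(): Promise<boolean>;

  // Assumed signature: fetch records that occurred at or after an optional date.
  abstract fetch(since?: Date): Promise<SourceRecord[]>;

  // Shared behaviour every subclass inherits: skip unhealthy sources.
  async fetchIfAvailable(since?: Date): Promise<SourceRecord[]> {
    if (!(await this.isAvailable())) return [];
    return this.fetch(since);
  }
}

// Toy connector backed by an in-memory array, for illustration only.
class InMemoryConnector extends BaseDataSourceConnector {
  private readonly records: SourceRecord[];
  private readonly healthy: boolean;

  constructor(records: SourceRecord[], healthy = true) {
    super("in-memory");
    this.records = records;
    this.healthy = healthy;
  }

  async isAvailable(): Promise<boolean> {
    return this.healthy;
  }

  async fetch(since?: Date): Promise<SourceRecord[]> {
    return this.records.filter((r) => since === undefined || r.occurredAt.getTime() >= since.getTime());
  }
}
```

Putting the health check in the base class means every concrete connector gets the same "skip when unavailable" behaviour without repeating it.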
Sync triggers:
- Webhook: WebhookHandler.handle(sourceId, request) at /api/datalake/webhook/{source}
- Scheduled: CronScheduler with cron expressions, e.g. "*/15 * * * *" (every 15 minutes)
- Manual: POST /api/datalake/sync/{source}

Chatwoot flow (webhook):
1. Chatwoot sends webhook → /api/webhooks/chatwoot
2. Webhook handler validates HMAC signature
3. Stores to PostgreSQL (existing logic)
4. Triggers datalake sync (async):
   a. ChatwootConnector fetches conversation from PostgreSQL
   b. ChatwootIngester writes to Bronze
   c. ChatwootProcessor enriches to Silver (async)
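Step 2 of the Chatwoot flow can be sketched with Node's built-in `crypto` module. This assumes the signature is a hex-encoded HMAC-SHA256 of the raw request body, computed with a shared secret. This document does not state the header name, the encoding, or whether a timestamp is signed, so check Chatwoot's webhook documentation. `verifySignature` is a hypothetical helper.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Assumption: signature = hex(HMAC-SHA256(secret, rawBody)).
function verifySignature(rawBody: string, signatureHex: string, secret: string): boolean {
  const expected = createHmac("sha256", secret).update(rawBody).digest();
  const received = Buffer.from(signatureHex, "hex");
  // timingSafeEqual throws on length mismatch, so reject those first.
  if (received.length !== expected.length) return false;
  // Constant-time comparison avoids leaking how many bytes matched.
  return timingSafeEqual(received, expected);
}
```

Hash the raw body exactly as received. Re-serializing parsed JSON can change whitespace or key order and break the comparison.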
Calendar flow (scheduled):
1. Cron job triggers every 15 minutes
2. GoogleCalendarConnector fetches events from Google Calendar API
3. Groups events by student and month
4. GoogleCalendarIngester writes to Bronze
5. CalendarSilverProcessor enriches with student matching
6. GoldAggregator creates monthly statistics
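Steps 3 and 6 can be sketched as pure functions. The `CalendarEvent` shape is an assumption, and the sketch assumes each event already carries a student identifier (for example, parsed from the event title). The real student matching done by CalendarSilverProcessor is not described here. Months are keyed as `YYYY/MM` in UTC to line up with the `{year}/{month}` segment of the Bronze path. `monthKey`, `groupByStudentAndMonth`, and `monthlyStats` are hypothetical names.

```typescript
// Hypothetical event shape; field names are assumptions.
interface CalendarEvent {
  id: string;
  student: string; // assumed already extracted, e.g. from the event title
  start: Date;
  end: Date;
}

interface MonthlyStats {
  lessons: number;
  totalMinutes: number;
}

// "YYYY/MM" in UTC, matching the {year}/{month} segment of the Bronze path.
function monthKey(d: Date): string {
  return `${d.getUTCFullYear()}/${String(d.getUTCMonth() + 1).padStart(2, "0")}`;
}

// Step 3: group events by student, then by month of the start time.
function groupByStudentAndMonth(events: CalendarEvent[]): Map<string, Map<string, CalendarEvent[]>> {
  const groups = new Map<string, Map<string, CalendarEvent[]>>();
  for (const e of events) {
    const byMonth = groups.get(e.student) ?? new Map<string, CalendarEvent[]>();
    const key = monthKey(e.start);
    byMonth.set(key, [...(byMonth.get(key) ?? []), e]);
    groups.set(e.student, byMonth);
  }
  return groups;
}

// Step 6 (simplified): lesson count and total minutes per student and month.
function monthlyStats(events: CalendarEvent[]): Map<string, MonthlyStats> {
  const stats = new Map<string, MonthlyStats>();
  for (const [student, byMonth] of groupByStudentAndMonth(events)) {
    for (const [month, list] of byMonth) {
      const totalMinutes = list.reduce((sum, e) => sum + (e.end.getTime() - e.start.getTime()) / 60000, 0);
      stats.set(`${student}:${month}`, { lessons: list.length, totalMinutes });
    }
  }
  return stats;
}
```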
Observability:
- SyncResult reports success/failure counts, duration, and errors.
- connector.isAvailable() checks source health.
- _metadata records layer, source, and version.
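These pieces can fit together as a wrapper that checks availability, times a sync run, and records per-record outcomes. The `SyncResult` field names below are assumptions based on the description above (success/failure counts, duration, errors). `runSync` and its `processRecord` callback are hypothetical.

```typescript
// Hypothetical SyncResult shape based on the fields described above.
interface SyncResult {
  sourceId: string;
  succeeded: number;
  failed: number;
  durationMs: number;
  errors: string[];
}

// Processes every record, counting successes and collecting failures
// instead of aborting the whole run on the first error.
async function runSync<T>(
  sourceId: string,
  isAvailable: () => Promise<boolean>,
  records: T[],
  processRecord: (r: T) => Promise<void>,
): Promise<SyncResult> {
  const started = Date.now();
  const result: SyncResult = { sourceId, succeeded: 0, failed: 0, durationMs: 0, errors: [] };
  if (!(await isAvailable())) {
    // Unhealthy source: record the reason and skip processing entirely.
    result.errors.push(`${sourceId} unavailable`);
  } else {
    for (const r of records) {
      try {
        await processRecord(r);
        result.succeeded++;
      } catch (err) {
        result.failed++;
        result.errors.push(err instanceof Error ? err.message : String(err));
      }
    }
  }
  result.durationMs = Date.now() - started;
  return result;
}
```

Collecting errors per record, rather than throwing, lets one malformed record fail without blocking the rest of the batch, and the counts stay accurate for monitoring.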