This document provides a comprehensive overview of all migration patterns supported in the Medallion Architecture datalake. It covers forward flows, reverse flows, format conversions, cross-layer bypasses, bulk operations, and temporal operations.
The Medallion Architecture supports multiple data migration patterns to handle various scenarios:
Purpose: Aggregate AI-analyzed metadata from Silver layer into Gold analytics.
Data Flow:
Silver Layer (.metadata.json files)
↓
Gold Aggregator (generateAIAnalysisStats)
↓
Gold Layer JSON (ai_analysis_stats/{period}/{date}.json)
↓
Analytics Exporter (convertAIAnalysisStatsToCSV)
↓
CSV Files (analytics/ai_analysis_stats/{period}/{date}.csv)
↓
Looker Studio
Implementation:
Reads .metadata.json files from silver-education/notability/
Example:
// Generate AI analysis stats
const aggregator = new GoldAggregator();
await aggregator.generateAIAnalysisStats(weekStart, weekEnd, 'weekly');
// → gold-analytics/ai_analysis_stats/weekly/{date}.json
// Export to CSV
const exporter = new AnalyticsExporter();
await exporter.convertAIAnalysisStatsToCSV(weekStart, 'weekly');
// → gold-analytics/analytics/ai_analysis_stats/weekly/{date}.csv
Purpose: Ingest raw data from external sources into the datalake.
Implementation: @stephen/datalake-sync framework with connectors and ingesters.
Supported Sources:
Pattern:
Source → Connector → Ingester → Bronze Layer
Example:
// Google Calendar sync
const connector = new GoogleCalendarConnector();
const ingester = new GoogleCalendarIngester();
const events = await connector.fetchData();
await ingester.ingest(events);
// → bronze-education/calendar/events/{year}/{month}/{eventId}.json
Purpose: Clean, enrich, and standardize raw data.
Implementation: BaseSilverProcessor classes.
Supported Processors:
Pattern:
Bronze → Processor → Silver Layer
Example:
// Calendar enrichment
const processor = new CalendarSilverProcessor();
const bronzeData = await readFromBronze(path);
const silverData = await processor.process(bronzeData);
await writeToSilver(silverPath, silverData);
// → silver-education/calendar/processed/{year}/{month}/{eventId}.json
Purpose: Aggregate cleaned data into business-level analytics.
Implementation: BaseGoldAggregator classes.
Supported Aggregations:
Pattern:
Silver → Aggregator → Gold Layer
Example:
// Weekly progress aggregation (the aggregator reads the Silver date range internally)
const aggregator = new GoldAggregator();
const goldData = await aggregator.generateStudentProgressWeekly(weekStart);
// → gold-analytics/student_progress_weekly/{date}.json
// AI analysis stats aggregation
await aggregator.generateAIAnalysisStats(weekStart, weekEnd, 'weekly');
// → gold-analytics/ai_analysis_stats/weekly/{date}.json
// Reads from: silver-education/notability/.../*.metadata.json
Purpose: Export analytics data to external systems.
Implementation: AnalyticsExporter class.
Supported Formats:
Pattern:
Gold JSON (SSOT) → Exporter → CSV/Parquet
Example:
// CSV export for Looker Studio
const exporter = new AnalyticsExporter();
await exporter.exportAllToCSV(weekStart);
// → gold-analytics/analytics/student_progress_weekly/{date}.csv
Purpose: Sync datalake data to operational database.
Implementation: syncBronzeToPostgres() methods.
Supported Syncs:
Pattern:
Bronze → Database Sync → PostgreSQL
Example:
// Calendar sync to database
const syncService = new CalendarDatabaseSync();
await syncService.syncBronzeToPostgres(startDate, endDate);
// → PostgreSQL lessons table
Purpose: Reprocess Gold aggregations when aggregation logic changes.
Use Cases:
Implementation: BaseGoldToSilverReprocessor
Pattern:
Gold → Reprocessor → Silver (for reprocessing)
Example:
// Reprocess Gold aggregations
const reprocessor = new GoldToSilverReprocessor();
await reprocessor.reprocessGoldToSilver(dateRange);
// Reads Gold, extracts Silver components, writes back to Silver
Best Practices:
Purpose: Reprocess Silver data when processing logic changes.
Use Cases:
Implementation: BaseSilverToBronzeReprocessor
Pattern:
Silver → Reprocessor → Bronze (for reprocessing)
Example:
// Reprocess Silver data
const reprocessor = new SilverToBronzeReprocessor();
await reprocessor.reprocessSilverToBronze(dateRange);
// Reads Silver, extracts Bronze components, writes back to Bronze
Best Practices:
Purpose: Backfill historical data from PostgreSQL to Bronze layer.
Use Cases:
Implementation: PostgreSQLToBronzeSync
Pattern:
PostgreSQL → Reverse Sync → Bronze
Example:
// Backfill from PostgreSQL
const backfill = new PostgreSQLToBronzeSync();
await backfill.syncPostgresToBronze(startDate, endDate);
// → bronze-education/calendar/events/{year}/{month}/{eventId}.json
Best Practices:
Purpose: Export Gold layer JSON to analytics-friendly formats.
Status: ✅ Implemented via AnalyticsExporter
Pattern:
Gold JSON (SSOT) → Exporter → CSV/Parquet
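Since the AnalyticsExporter internals are not shown in this document, here is a minimal sketch of the Gold JSON → CSV step. The record shape and field names are illustrative assumptions, not the actual Gold schema; quoting follows RFC 4180.

```typescript
// Flatten an array of Gold-layer JSON records into a CSV string.
// Header row comes from the keys of the first record.
type GoldRecord = Record<string, string | number>;

function toCSV(records: GoldRecord[]): string {
  if (records.length === 0) return "";
  const headers = Object.keys(records[0]);
  const escape = (v: string | number): string => {
    const s = String(v);
    // Quote fields containing commas, quotes, or newlines (RFC 4180).
    return /[",\n]/.test(s) ? `"${s.replace(/"/g, '""')}"` : s;
  };
  const rows = records.map((r) => headers.map((h) => escape(r[h] ?? "")).join(","));
  return [headers.join(","), ...rows].join("\n");
}

const csv = toCSV([
  { week_start: "2024-01-01", lessons_completed: 12 },
  { week_start: "2024-01-08", lessons_completed: 9 },
]);
// → "week_start,lessons_completed\n2024-01-01,12\n2024-01-08,9"
```

Keeping the Gold JSON as the single source of truth and deriving CSV on export means the CSV can always be regenerated and never needs to be treated as authoritative.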
Purpose: Import CSV data back to Gold layer for recovery or external data import.
Use Cases:
Implementation: AnalyticsImporter
Pattern:
CSV → Importer → Gold JSON
Example:
// Import CSV to JSON
const importer = new AnalyticsImporter();
const jsonData = await importer.importCSVToJSON(csvPath);
await writeToGold(goldPath, jsonData);
Best Practices:
Purpose: Import Parquet data back to Gold layer.
Use Cases:
Implementation: AnalyticsImporter
Pattern:
Parquet → Importer → Gold JSON
Example:
// Import Parquet to JSON
const importer = new AnalyticsImporter();
const jsonData = await importer.importParquetToJSON(parquetPath);
await writeToGold(goldPath, jsonData);
Purpose: Skip Bronze layer for trusted, pre-processed sources.
Use Cases:
Implementation: BaseDirectSilverIngester
Pattern:
Trusted Source → Direct Ingester → Silver (bypass Bronze)
Example:
// Direct to Silver (requires explicit config flag)
const ingester = new DirectSilverIngester();
if (config.allowDirectSilver) {
  await ingester.ingestDirectToSilver(trustedData);
}
Best Practices:
Purpose: Skip aggregation for pre-aggregated Silver data.
Use Cases:
Implementation: BaseDirectGoldIngester
Pattern:
Pre-aggregated Silver → Direct Ingester → Gold (bypass Aggregator)
Example:
// Direct to Gold (requires explicit config flag)
const ingester = new DirectGoldIngester();
if (config.allowDirectGold) {
  await ingester.ingestDirectToGold(preAggregatedData);
}
Best Practices:
Require the explicit allowDirectGold flag
Purpose: Efficiently import large volumes of data.
Use Cases:
Implementation: BulkOperations utility
Pattern:
Large Dataset → Bulk Importer → Bronze/Silver/Gold
Example:
// Bulk import with progress tracking
const bulkOps = new BulkOperations();
await bulkOps.bulkImport({
  source: externalSystem,
  target: 'bronze-education',
  batchSize: 1000,
  onProgress: (progress) => console.log(`Progress: ${progress}%`)
});
Features:
Purpose: Efficiently export large volumes of data.
Use Cases:
Implementation: BulkOperations utility
Pattern:
Bronze/Silver/Gold → Bulk Exporter → External System
Example:
// Bulk export with progress tracking
const bulkOps = new BulkOperations();
await bulkOps.bulkExport({
  source: 'gold-analytics',
  target: externalSystem,
  dateRange: { start, end },
  onProgress: (progress) => console.log(`Progress: ${progress}%`)
});
Purpose: Query data at a specific point in time.
Use Cases:
Implementation: TemporalOperations framework
Pattern:
Query → Time Travel API → Historical Data
Example:
// Query data at specific timestamp
const temporal = new TemporalOperations();
const data = await temporal.queryAtTimestamp(
  'gold-analytics',
  'student_progress_weekly',
  timestamp
);
How It Works:
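One plausible mechanism (an assumption — this document does not describe the TemporalOperations internals): keep every write as an immutable snapshot keyed by an ISO-8601 write timestamp, and at query time select the latest snapshot at or before the requested timestamp.

```typescript
// Sketch: pick the newest snapshot written at or before the query timestamp.
// Assumes each snapshot is keyed by its ISO-8601 write time (hypothetical layout).
function snapshotAtTimestamp(snapshots: string[], ts: string): string | undefined {
  return snapshots
    .filter((s) => s <= ts) // ISO-8601 strings sort chronologically
    .sort()
    .pop();                 // latest snapshot not after ts
}

snapshotAtTimestamp(
  ["2024-01-01T00:00:00Z", "2024-03-01T00:00:00Z", "2024-06-01T00:00:00Z"],
  "2024-04-15T00:00:00Z"
);
// → "2024-03-01T00:00:00Z"
```

The lexicographic comparison is safe only because ISO-8601 timestamps in the same zone sort in time order, which is why timestamped filenames are a common convention for this pattern.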
Purpose: Reprocess historical data with new logic.
Use Cases:
Implementation: TemporalOperations framework
Pattern:
Historical Data → Reprocessor → Updated Data
Example:
// Reprocess historical data
const temporal = new TemporalOperations();
await temporal.reprocessHistorical({
  layer: 'silver',
  dateRange: { start: '2024-01-01', end: '2024-12-31' },
  processor: new UpdatedProcessor()
});
Best Practices:
Purpose: Manage schema evolution and data versions.
Use Cases:
Implementation: Version metadata in file paths or sidecar files
Pattern:
Data → Version Manager → Versioned Storage
Example:
// Versioned data storage
const versioned = new VersionedStorage();
await versioned.store({
  data: goldData,
  schema: 'v2',
  timestamp: new Date()
});
// → gold-analytics/v2/student_progress_weekly/{date}.json
POST /api/datalake/reprocess/gold-to-silver - Reprocess Gold to Silver
POST /api/datalake/reprocess/silver-to-bronze - Reprocess Silver to Bronze
POST /api/datalake/sync/postgres-to-bronze - Backfill from PostgreSQL
POST /api/datalake/bulk/import - Bulk import data
POST /api/datalake/bulk/export - Bulk export data
GET /api/datalake/bulk/status/{jobId} - Check bulk operation status
GET /api/datalake/temporal/query?timestamp={ts}&path={path} - Time travel query
POST /api/datalake/temporal/reprocess - Historical reprocessing
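For example, a time-travel query could be issued as follows. The host and response shape are assumptions; only the route and query parameter names come from the endpoint list above.

```typescript
// Build the time-travel query URL from the documented route and parameters.
const base = "https://example.internal/api/datalake/temporal/query"; // hypothetical host
const params = new URLSearchParams({
  timestamp: "2024-03-01T00:00:00Z",
  path: "gold-analytics/student_progress_weekly",
});
const url = `${base}?${params.toString()}`;

// const res = await fetch(url);   // issue the GET request
// const data = await res.json();  // historical Gold-layer payload (shape assumed)
```

URLSearchParams percent-encodes the timestamp colons and path slashes, so the parameters arrive intact on the server side.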