Modular framework for ingesting data from any source into the Medallion Architecture (Bronze → Silver → Gold).
This package provides a reusable, type-safe framework for syncing data from external sources (Chatwoot, Google Calendar, Stripe, etc.) into your MinIO datalake following the Medallion Architecture pattern.
Data Source → Connector → Ingester → Bronze Layer
                                        ↓
                         Processor → Silver Layer
                                        ↓
                        Aggregator → Gold Layer
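The flow in the diagram can be sketched with in-memory maps standing in for the three layers. This is only an illustration of how data moves between stages: `RawRecord`, `Layer`, and the four stage functions are hypothetical names, not the package's API, and everything is synchronous for brevity even though the real connector is awaited.

```typescript
// Hypothetical stand-ins for the framework's stages; not the package's real types.
interface RawRecord {
  id: string;
  timestamp: Date;
  data: Record<string, unknown>;
}

// path → stored object, standing in for a MinIO bucket
type Layer = Map<string, unknown>;

// Connector: pulls records from the external source (hard-coded here).
function fetchData(): RawRecord[] {
  return [
    { id: 'a1', timestamp: new Date('2024-01-15T08:30:00Z'), data: { text: 'hello' } },
    { id: 'a2', timestamp: new Date('2024-01-15T09:00:00Z'), data: { text: 'world' } },
  ];
}

// Ingester: writes each record unchanged to Bronze.
function ingest(records: RawRecord[], bronze: Layer): void {
  for (const record of records) {
    bronze.set(`demo/${record.id}.json`, record);
  }
}

// Processor: derives one enriched Silver object per Bronze object.
function processLayer(bronze: Layer, silver: Layer): void {
  bronze.forEach((value, path) => {
    silver.set(path, { ...(value as RawRecord), _enrichments: {} });
  });
}

// Aggregator: reduces the Silver objects to a single Gold summary.
function aggregate(silver: Layer, gold: Layer): void {
  gold.set('demo/summary.json', { recordCount: silver.size });
}

function runPipeline(): { bronze: Layer; silver: Layer; gold: Layer } {
  const bronze: Layer = new Map();
  const silver: Layer = new Map();
  const gold: Layer = new Map();
  ingest(fetchData(), bronze);
  processLayer(bronze, silver);
  aggregate(silver, gold);
  return { bronze, silver, gold };
}
```

Each stage only reads from the layer before it, which is what allows a source to stop at Bronze or Silver when no aggregation is needed.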
npm install @stephen/datalake-sync
Edit /projects/datalake-sources.yaml:
sources:
  - id: chatwoot
    name: Chatwoot CRM
    type: webhook
    connector: ChatwootConnector
    enabled: true
    buckets:
      bronze: bronze-communication
      silver: silver-communication
      gold: gold-analytics
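For orientation, the entry above could be modelled and checked roughly as follows. The `SourceConfig` shape is inferred from the YAML examples in this README and `validateSource` is a hypothetical helper; the types and validation rules actually used by `SourceConfigLoader` may differ.

```typescript
// Hypothetical shape inferred from the YAML in this README; not the package's own type.
type SourceType = 'webhook' | 'cron';

interface SourceConfig {
  id: string;
  name: string;
  type: SourceType;
  schedule?: string; // assumed to be required only for cron sources
  connector: string;
  enabled: boolean;
  buckets: { bronze: string; silver: string; gold?: string };
}

// Returns a list of human-readable problems; an empty list means the entry looks valid.
function validateSource(source: SourceConfig): string[] {
  const errors: string[] = [];
  if (!source.id) errors.push('id is required');
  if (!source.connector) errors.push(`${source.id}: connector is required`);
  if (source.type === 'cron' && !source.schedule) {
    errors.push(`${source.id}: cron sources need a schedule`);
  }
  if (!source.buckets.bronze) errors.push(`${source.id}: a bronze bucket is required`);
  return errors;
}

const chatwoot: SourceConfig = {
  id: 'chatwoot',
  name: 'Chatwoot CRM',
  type: 'webhook',
  connector: 'ChatwootConnector',
  enabled: true,
  buckets: {
    bronze: 'bronze-communication',
    silver: 'silver-communication',
    gold: 'gold-analytics',
  },
};

const chatwootErrors = validateSource(chatwoot);
```

Collecting every problem instead of throwing on the first one makes it easier to fix a config file in a single pass.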
import { SourceConfigLoader, CronScheduler, WebhookHandler } from '@stephen/datalake-sync';
import '@stephen/datalake-sync/sources/chatwoot'; // Auto-registers
const configLoader = new SourceConfigLoader();
configLoader.load('/path/to/datalake-sources.yaml');
const cronScheduler = new CronScheduler(configLoader);
const webhookHandler = new WebhookHandler(configLoader);
// Start cron jobs
cronScheduler.start();
Webhook Handler:
// POST /api/datalake/webhook/chatwoot
import { NextRequest } from 'next/server';

export async function POST(request: NextRequest) {
  // Delegate to the WebhookHandler created during setup
  return webhookHandler.handle('chatwoot', request);
}
Manual Sync:
// POST /api/datalake/sync/chatwoot
import { registry, configLoader } from '@stephen/datalake-sync';
const config = configLoader.get('chatwoot');
const connector = registry.getConnector(config.connector);
const ingester = registry.getIngester('chatwoot');
const records = await connector.fetchData();
const result = await ingester.ingest(records);
// FetchOptions is assumed to be exported by the package alongside the base classes
import { BaseDataSourceConnector, SourceRecord, FetchOptions } from '@stephen/datalake-sync';

// Record type produced by the connector
export interface MySourceRecord extends SourceRecord {
  id: string;
  timestamp: Date;
  data: { /* your data */ };
}

// Connector: fetches records from the external source
export class MySourceConnector extends BaseDataSourceConnector<MySourceRecord> {
  readonly sourceId = 'my-source';

  async fetchData(options?: FetchOptions): Promise<MySourceRecord[]> {
    // Fetch from your source
    return [];
  }
}
import { BaseBronzeIngester } from '@stephen/datalake-sync';
import { MedallionBuckets } from '@stephen/datalake';

// Ingester: decides where each raw record is stored in the Bronze bucket
export class MySourceIngester extends BaseBronzeIngester<MySourceRecord> {
  readonly bucket = MedallionBuckets.BRONZE_COMMUNICATION;

  getPath(record: MySourceRecord): string {
    // Partition by calendar date (UTC), one JSON object per record
    const date = record.timestamp.toISOString().split('T')[0];
    return `my-source/${date}/${record.id}.json`;
  }
}
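The path scheme used by `getPath` can be tried in isolation. The sketch below copies the same expression into a standalone function; `bronzePath` and the sample record are hypothetical and exist only for illustration.

```typescript
// Standalone copy of the date-partitioned path logic, for experimentation only.
function bronzePath(sourceId: string, record: { id: string; timestamp: Date }): string {
  // toISOString() always renders in UTC, e.g. "2024-03-05T23:30:00.000Z"
  const date = record.timestamp.toISOString().split('T')[0];
  return `${sourceId}/${date}/${record.id}.json`;
}

const examplePath = bronzePath('my-source', {
  id: 'abc123',
  timestamp: new Date('2024-03-05T23:30:00Z'),
});
// examplePath is "my-source/2024-03-05/abc123.json"
```

Because `toISOString()` works in UTC, a record created late in the evening in a timezone behind UTC lands in the next day's folder. If partitions should follow local dates, the date would have to be derived differently.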
import { BaseSilverProcessor } from '@stephen/datalake-sync';

// Processor: turns a Bronze object into its Silver counterpart
export class MySourceProcessor extends BaseSilverProcessor {
  readonly bronzeBucket = MedallionBuckets.BRONZE_COMMUNICATION;
  readonly silverBucket = MedallionBuckets.SILVER_COMMUNICATION;

  async process(bronzeData: unknown): Promise<unknown> {
    // Enrich, anonymize, transform
    // `unknown` cannot be spread directly, so narrow it to an object type first
    return { ...(bronzeData as Record<string, unknown>), _enrichments: {} };
  }
}
import { registry } from '@stephen/datalake-sync';
import { MySourceConnector, MySourceIngester, MySourceProcessor } from './my-source';
registry.registerConnector('MySourceConnector', MySourceConnector);
registry.registerIngester('my-source', MySourceIngester);
registry.registerProcessor('my-source', MySourceProcessor);
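The registration calls above map a string key to a class, which is what lets the YAML config refer to a connector by name. A minimal sketch of that idea follows, assuming the registry stores constructors and creates an instance on lookup. `SimpleRegistry`, `Connector`, and `DemoConnector` are hypothetical, and the package's `SourceRegistry` may behave differently.

```typescript
// Hypothetical minimal registry: names map to constructors, instances are created on lookup.
type Constructor<T> = new () => T;

class SimpleRegistry<T> {
  private readonly entries = new Map<string, Constructor<T>>();

  register(key: string, ctor: Constructor<T>): void {
    if (this.entries.has(key)) {
      throw new Error(`"${key}" is already registered`);
    }
    this.entries.set(key, ctor);
  }

  get(key: string): T {
    const ctor = this.entries.get(key);
    if (!ctor) {
      throw new Error(`No entry registered for "${key}"`);
    }
    return new ctor();
  }
}

// Stand-in for a connector class; only the field needed for the demo.
interface Connector {
  readonly sourceId: string;
}

class DemoConnector implements Connector {
  readonly sourceId = 'demo';
}

const connectors = new SimpleRegistry<Connector>();
connectors.register('DemoConnector', DemoConnector);
const demoConnector = connectors.get('DemoConnector');
```

Failing loudly on an unknown key surfaces a typo in the `connector:` field of the config at startup instead of at the first sync.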
sources:
  - id: my-source
    name: My Source
    type: cron
    schedule: "0 * * * *"   # at minute 0 of every hour
    connector: MySourceConnector
    enabled: true
    buckets:
      bronze: bronze-communication
      silver: silver-communication
IDataSourceConnector<TRecord> - Fetch data from source
IBronzeIngester<TRecord> - Ingest to Bronze layer
ISilverProcessor<TBronze, TSilver> - Process Bronze → Silver
IGoldAggregator<TSilver, TGold> - Aggregate Silver → Gold

BaseDataSourceConnector<TRecord> - Abstract connector
BaseBronzeIngester<TRecord> - Abstract ingester
BaseSilverProcessor<TBronze, TSilver> - Abstract processor
BaseGoldAggregator<TSilver, TGold> - Abstract aggregator

WebhookHandler - Handle webhook events
CronScheduler - Schedule cron-based syncs
SourceConfigLoader - Load and validate YAML config
SourceRegistry - Register and retrieve connectors/ingesters

MINIO_ENDPOINT=platform-minio # or localhost
MINIO_PORT=9000
MINIO_SECURE=false
MINIO_ACCESS_KEY=minioadmin
MINIO_SECRET_KEY=minioadmin
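How the package reads these variables is not shown here. As a rough sketch, they could be turned into a typed settings object like this. `readMinioSettings` is a hypothetical helper, the field names mirror the options accepted by the MinIO JavaScript client, and the fallback values are assumptions taken from the sample above.

```typescript
// Field names follow the MinIO JavaScript client's constructor options.
interface MinioSettings {
  endPoint: string;
  port: number;
  useSSL: boolean;
  accessKey: string;
  secretKey: string;
}

// Takes the environment as a parameter so it can be exercised without touching process.env.
function readMinioSettings(env: Record<string, string | undefined>): MinioSettings {
  const port = Number(env.MINIO_PORT ?? '9000'); // assumed default, matching the sample
  if (!Number.isInteger(port) || port <= 0) {
    throw new Error(`Invalid MINIO_PORT: ${env.MINIO_PORT}`);
  }
  const accessKey = env.MINIO_ACCESS_KEY;
  const secretKey = env.MINIO_SECRET_KEY;
  if (!accessKey || !secretKey) {
    throw new Error('MINIO_ACCESS_KEY and MINIO_SECRET_KEY must be set');
  }
  return {
    endPoint: env.MINIO_ENDPOINT ?? 'localhost', // assumed default
    port,
    useSSL: env.MINIO_SECURE === 'true',
    accessKey,
    secretKey,
  };
}

const sampleSettings = readMinioSettings({
  MINIO_ENDPOINT: 'platform-minio',
  MINIO_PORT: '9000',
  MINIO_SECURE: 'false',
  MINIO_ACCESS_KEY: 'minioadmin',
  MINIO_SECRET_KEY: 'minioadmin',
});
```

In an application the function would typically be called with `process.env`. The `minioadmin` credentials are MinIO's well-known defaults and are only suitable for local development.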
See /projects/packages/datalake-sync/src/sources/chatwoot/ for a complete example implementation.
Private - Stephen's Privélessen