diff --git a/src/directors/anomaly-detector.service.ts b/src/directors/anomaly-detector.service.ts new file mode 100644 index 0000000..d208c6f --- /dev/null +++ b/src/directors/anomaly-detector.service.ts @@ -0,0 +1,83 @@ +import { MemoryStore } from '../stores/memory.store'; +import { RawDatum, Anomaly } from '../types'; +import { genId, nowIso, safeNumber } from '../utils'; + +/** + * Real-time anomaly detector: + * - Maintains short-window statistics per source+metric (mean, std, count) + * - Uses z-score detection for spikes + * - Uses EWMA for drift detection (configurable alpha) + * + * This example treats numeric values extracted via a metric extractor function. + */ + +type MetricExtractor = (d: RawDatum) => { metric: string; value: number } | null; + +export class AnomalyDetectorService { + private store: MemoryStore; + private stats: Map; + private alpha: number; + private zThreshold: number; + + constructor(store: MemoryStore, opts?: { alpha?: number; zThreshold?: number }) { + this.store = store; + this.stats = new Map(); + this.alpha = opts?.alpha ?? 0.3; // EWMA smoothing + this.zThreshold = opts?.zThreshold ?? 4; // conservative default + } + + private keyFor(source: string, metric: string) { + return `${source}::${metric}`; + } + + ingest(datum: RawDatum, extractor: MetricExtractor) { + const m = extractor(datum); + if (!m) return; + const key = this.keyFor(datum.source, m.metric); + const v = safeNumber(m.value); + if (v === null) return; + + // update Welford online mean/std + let s = this.stats.get(key); + if (!s) { + s = { mean: v, m2: 0, n: 1, ewma: v }; + this.stats.set(key, s); + return; + } + s.n++; + const delta = v - s.mean; + s.mean += delta / s.n; + s.m2 += delta * (v - s.mean); + + // EWMA + s.ewma = (s.ewma ?? v) * (1 - this.alpha) + v * this.alpha; + + // check z-score if n>2 + const variance = s.n > 1 ? s.m2 / (s.n - 1) : 0; + const std = Math.sqrt(variance); + const z = std === 0 ? 0 : Math.abs((v - s.mean) / std); + const reasons: string[] = []; + + if (z > this.zThreshold) { + reasons.push(`z-score=${z.toFixed(2)} > ${this.zThreshold}`); + } + // ewma-based relative jump + if (s.ewma !== undefined && Math.abs(v - s.ewma) / (Math.abs(s.ewma) + 1e-9) > 0.2) { + reasons.push(`ewma jump ${(Math.abs(v - s.ewma) / (Math.abs(s.ewma) + 1e-9) * 100).toFixed(1)}%`); + } + + if (reasons.length) { + const anomaly: Anomaly = { + id: genId('anom-'), + source: datum.source, + timestamp: datum.timestamp, + metric: m.metric, + value: v, + reason: reasons.join('; '), + severity: z > this.zThreshold ? 'high' : 'medium', + }; + this.store.pushAnomaly(anomaly); + this.store.incAnomalyCount(datum.source); + } + } +} diff --git a/src/directors/completeness.service.ts b/src/directors/completeness.service.ts new file mode 100644 index 0000000..f26515e --- /dev/null +++ b/src/directors/completeness.service.ts @@ -0,0 +1,44 @@ +import { RawDatum } from '../types'; +import dayjs from 'dayjs'; + +/** + * Completeness / gap detection: + * - for feeds expected at a cadence (e.g., price ticks per symbol), detect missing windows + * - For simplicity, track last timestamp per (source, key) and if gap > expectedInterval*2, flag + */ + +export class CompletenessService { + private lastPerKey: Map; + private expectedIntervalSec: number; + + constructor(opts?: { expectedIntervalSec?: number }) { + this.lastPerKey = new Map(); + this.expectedIntervalSec = opts?.expectedIntervalSec ?? 30; // default expected cadence 30s + } + + keyFor(source: string, key: string) { + return `${source}::${key}`; + } + + mark(datum: RawDatum, key: string) { + const k = this.keyFor(datum.source, key); + this.lastPerKey.set(k, datum.timestamp); + } + + detectGap(source: string, key: string) { + const k = this.keyFor(source, key); + const last = this.lastPerKey.get(k); + if (!last) return null; + const now = dayjs(); + const diff = now.diff(dayjs(last), 'second'); + if (diff > this.expectedIntervalSec * 2) { + return { + source, + key, + last, + gapSec: diff, + }; + } + return null; + } +} diff --git a/src/directors/correction.service.ts b/src/directors/correction.service.ts new file mode 100644 index 0000000..85ba84e --- /dev/null +++ b/src/directors/correction.service.ts @@ -0,0 +1,71 @@ +import { MemoryStore } from '../stores/memory.store'; +import { RawDatum } from '../types'; +import { safeNumber, nowIso } from '../utils'; + +/** + * Automated correction: + * - type coercion, missing value imputation (last-known, mean), clipping to valid ranges + * - For safety, apply only for common, low-risk patterns; otherwise flag for manual review. + * + * Returns { correctedDatum, actionTaken: string | null } + */ + +export class CorrectionService { + private store: MemoryStore; + private lastValues: Map; + + constructor(store: MemoryStore) { + this.store = store; + this.lastValues = new Map(); + } + + registerLast(source: string, metric: string, value: any) { + this.lastValues.set(`${source}::${metric}`, value); + } + + getLast(source: string, metric: string) { + return this.lastValues.get(`${source}::${metric}`); + } + + async attemptCorrection(datum: RawDatum): Promise<{ corrected: RawDatum; action: string | null }> { + // Basic example: if payload.price is string, cast; if missing price, impute last known + const corrected = { ...datum, payload: { ...datum.payload } }; + let action: string | null = null; + + if (corrected.payload && 'price' in corrected.payload) { + let p = corrected.payload.price; + const n = safeNumber(p); + if (n === null) { + // try to impute from last value + const last = this.getLast(corrected.source, 'price'); + if (last !== undefined) { + corrected.payload.price = last; + action = 'imputed_price_from_last'; + } else { + action = 'could_not_impute_price'; + } + } else { + // valid number -> store as last known + corrected.payload.price = n; + this.registerLast(corrected.source, 'price', n); + action = 'type_cast_price'; + } + } + + // timestamp sanity: if timestamp in future or missing, replace with now + try { + const ts = new Date(corrected.timestamp); + if (!ts || isNaN(ts.getTime()) || ts.getTime() - Date.now() > 1000 * 60 * 5) { + corrected.timestamp = nowIso(); + action = action ? `${action}; fixed_timestamp` : 'fixed_timestamp'; + } + } catch (e) { + corrected.timestamp = nowIso(); + action = action ? `${action}; fixed_timestamp` : 'fixed_timestamp'; + } + + // push to store as corrected + await this.store.pushDatum(corrected); + return { corrected, action }; + } +} diff --git a/src/directors/dashboard.controller.ts b/src/directors/dashboard.controller.ts new file mode 100644 index 0000000..879d557 --- /dev/null +++ b/src/directors/dashboard.controller.ts @@ -0,0 +1,38 @@ +import express from 'express'; +import bodyParser from 'body-parser'; +import cors from 'cors'; +import { MemoryStore } from '../stores/memory.store'; +import { ReportService } from '../reports/report.service'; +import { FreshnessService } from '../detectors/freshness.service'; + +export function createDashboardApp(store: MemoryStore, reportService: ReportService, freshness: FreshnessService) { + const app = express(); + app.use(cors()); + app.use(bodyParser.json()); + + app.get('/health', (req, res) => res.json({ ok: true })); + + app.get('/anomalies', async (req, res) => { + const source = typeof req.query.source === 'string' ? req.query.source : undefined; + const list = await store.getAnomalies(source, 1000); + res.json(list); + }); + + app.get('/sources', async (req, res) => { + const s = await store.getSourceScores(); + res.json(s); + }); + + app.get('/report', async (req, res) => { + const r = await reportService.generateReport(); + res.json(r); + }); + + app.get('/freshness/:source', (req, res) => { + const source = req.params.source; + const last = freshness.getLastSeen(source); + res.json({ source, last }); + }); + + return app; +} diff --git a/src/directors/freshness.service.ts b/src/directors/freshness.service.ts new file mode 100644 index 0000000..5e3f553 --- /dev/null +++ b/src/directors/freshness.service.ts @@ -0,0 +1,52 @@ +import { MemoryStore } from '../stores/memory.store'; +import { RawDatum } from '../types'; +import dayjs from 'dayjs'; + +/** + * Freshness monitoring: + * - track last timestamp per source + * - if last received timestamp is older than threshold, raise stale event + * - acceptance criteria: alerts on delays >15 minutes (configurable) + */ + +export class FreshnessService { + private store: MemoryStore; + private lastSeenMap: Map; + private thresholdMin: number; + + constructor(store: MemoryStore, opts?: { thresholdMin?: number }) { + this.store = store; + this.lastSeenMap = new Map(); + this.thresholdMin = opts?.thresholdMin ?? 15; // acceptance criteria default + } + + markReceived(datum: RawDatum) { + // datum.timestamp is the data's own timestamp; we store it as last seen + this.lastSeenMap.set(datum.source, datum.timestamp); + } + + async checkFreshness() { + const now = dayjs(); + for (const [source, lastTs] of this.lastSeenMap) { + const diff = now.diff(dayjs(lastTs), 'minute'); + if (diff > this.thresholdMin) { + // stale + await this.store.incStaleCount(source); + // write an anomaly-like record + await this.store.pushAnomaly({ + id: `stale-${source}-${now.toISOString()}`, + source, + timestamp: now.toISOString(), + metric: 'freshness', + value: diff, + reason: `stale by ${diff} minutes (> ${this.thresholdMin}m)`, + severity: 'high', + }); + } + } + } + + getLastSeen(source: string) { + return this.lastSeenMap.get(source) ?? null; + } +} diff --git a/src/directors/lineage.service.ts b/src/directors/lineage.service.ts new file mode 100644 index 0000000..ef4e9c8 --- /dev/null +++ b/src/directors/lineage.service.ts @@ -0,0 +1,27 @@ +import { RawDatum, DataLineage } from '../types'; +import { nowIso } from '../utils'; + +/** + * Data lineage tracker: attach simple lineage metadata to each datum. + * For production, record transforms in DB and provide lineage graph. + */ + +export class LineageService { + attach(datum: RawDatum, transformation?: string): RawDatum { + const lineage: DataLineage = { + source: datum.source, + receivedAt: nowIso(), + originalId: datum.id, + transformations: transformation ? [transformation] : [], + }; + return { ...datum, lineage }; + } + + addTransform(datum: RawDatum, transformation: string): RawDatum { + const copy = { ...datum }; + copy.lineage = copy.lineage ?? { source: datum.source, receivedAt: nowIso(), transformations: [] }; + copy.lineage.transformations = copy.lineage.transformations ?? []; + copy.lineage.transformations.push(transformation); + return copy; + } +} diff --git a/src/directors/memory.store.ts b/src/directors/memory.store.ts new file mode 100644 index 0000000..f13d17f --- /dev/null +++ b/src/directors/memory.store.ts @@ -0,0 +1,86 @@ +import { RawDatum, Anomaly, SourceScore } from '../types'; +import { nowIso } from '../utils'; + +export class MemoryStore { + private data: RawDatum[] = []; + private anomalies: Anomaly[] = []; + private sources: Map = new Map(); + + async pushDatum(d: RawDatum) { + const copy = { ...d, _receivedAt: nowIso() }; + this.data.push(copy); + this.touchSource(copy.source); + return copy; + } + + async getRecentData(source?: string, limit = 100) { + const list = source ? this.data.filter(d => d.source === source) : this.data; + return list.slice(-limit); + } + + async pushAnomaly(a: Anomaly) { + this.anomalies.push(a); + } + + async getAnomalies(source?: string, limit = 100) { + const list = source ? this.anomalies.filter(a => a.source === source) : this.anomalies; + return list.slice(-limit); + } + + async touchSource(source: string) { + const now = nowIso(); + const s = this.sources.get(source) || { + source, + score: 100, + lastSeen: now, + staleCount: 0, + anomalyCount: 0, + completenessScore: 100, + }; + s.lastSeen = now; + this.sources.set(source, s); + } + + async incAnomalyCount(source: string) { + const s = this.sources.get(source) || { + source, + score: 100, + lastSeen: nowIso(), + staleCount: 0, + anomalyCount: 0, + completenessScore: 100, + }; + s.anomalyCount++; + this.sources.set(source, s); + } + + async incStaleCount(source: string) { + const s = this.sources.get(source) || { + source, + score: 100, + lastSeen: nowIso(), + staleCount: 0, + anomalyCount: 0, + completenessScore: 100, + }; + s.staleCount++; + this.sources.set(source, s); + } + + async updateSourceScore(source: string, score: number) { + const s = this.sources.get(source) || { + source, + score, + lastSeen: nowIso(), + staleCount: 0, + anomalyCount: 0, + completenessScore: 100, + }; + s.score = score; + this.sources.set(source, s); + } + + async getSourceScores() { + return Array.from(this.sources.values()); + } +} diff --git a/src/directors/outlier-detector.service.ts b/src/directors/outlier-detector.service.ts new file mode 100644 index 0000000..48f43cc --- /dev/null +++ b/src/directors/outlier-detector.service.ts @@ -0,0 +1,48 @@ +import { RawDatum } from '../types'; +import { safeNumber } from '../utils'; + +/** + * Statistical outlier detector using rolling quantiles (IQR method). + * For streaming data, we maintain a sliding buffer per metric and compute IQR. + */ + +export class OutlierDetectorService { + private buffers: Map; + private maxBuffer: number; + private multiplier: number; // IQR multiplier e.g., 1.5 + + constructor(opts?: { maxBuffer?: number; multiplier?: number }) { + this.buffers = new Map(); + this.maxBuffer = opts?.maxBuffer ?? 1000; + this.multiplier = opts?.multiplier ?? 1.5; + } + + private bufferKey(source: string, metric: string) { + return `${source}::${metric}`; + } + + ingestNumeric(source: string, metric: string, value: number) { + const key = this.bufferKey(source, metric); + const b = this.buffers.get(key) || []; + b.push(value); + if (b.length > this.maxBuffer) b.shift(); + this.buffers.set(key, b); + + if (b.length < 10) return false; // not enough data + const sorted = [...b].sort((a, z) => a - z); + const q1 = sorted[Math.floor(sorted.length * 0.25)]; + const q3 = sorted[Math.floor(sorted.length * 0.75)]; + const iqr = q3 - q1; + const lower = q1 - this.multiplier * iqr; + const upper = q3 + this.multiplier * iqr; + return value < lower || value > upper; + } + + ingest(datum: RawDatum, extractor: (d: RawDatum) => { metric: string; value: number } | null) { + const m = extractor(datum); + if (!m) return false; + const v = safeNumber(m.value); + if (v === null) return false; + return this.ingestNumeric(datum.source, m.metric, v); + } +} diff --git a/src/directors/reliability.service.ts b/src/directors/reliability.service.ts new file mode 100644 index 0000000..00b4c58 --- /dev/null +++ b/src/directors/reliability.service.ts @@ -0,0 +1,33 @@ +import { MemoryStore } from '../stores/memory.store'; + +/** + * Compute a simple reliability score per source: + * - base 100 + * - subtract points for stale events, anomalies, and completeness gaps + * - applied smoothing + clamp to 0..100 + */ + +export class ReliabilityService { + private store: MemoryStore; + + constructor(store: MemoryStore) { + this.store = store; + } + + async recomputeAll() { + const scores = await this.store.getSourceScores(); + const recomputed = scores.map(s => { + // simple formula: + let reduction = s.staleCount * 2 + s.anomalyCount * 3 + Math.max(0, 50 - s.completenessScore) * 0.1; + let base = 100 - reduction; + base = Math.max(0, Math.min(100, base)); + s.score = Math.round(base); + return s; + }); + // update store + for (const s of recomputed) { + await this.store.updateSourceScore(s.source, s.score); + } + return recomputed; + } +} diff --git a/src/directors/reports.service.ts b/src/directors/reports.service.ts new file mode 100644 index 0000000..e64cdc3 --- /dev/null +++ b/src/directors/reports.service.ts @@ -0,0 +1,31 @@ +import { MemoryStore } from '../stores/memory.store'; +import { nowIso } from '../utils'; + +/** + * Automated data quality reports: + * - periodic snapshot of anomalies, stale counts, top 5 sources by score + */ + +export class ReportService { + private store: MemoryStore; + + constructor(store: MemoryStore) { + this.store = store; + } + + async generateReport() { + const anomalies = await this.store.getAnomalies(undefined, 1000); + const sources = await this.store.getSourceScores(); + const top = [...sources].sort((a, b) => b.score - a.score).slice(0, 10); + return { + generatedAt: nowIso(), + totalAnomalies: anomalies.length, + anomaliesRecent: anomalies.slice(-50), + topSources: top, + summary: { + totalSources: sources.length, + avgScore: sources.length ? Math.round(sources.reduce((s, x) => s + x.score, 0) / sources.length) : 100, + }, + }; + } +} diff --git a/src/directors/scheduler.service.ts b/src/directors/scheduler.service.ts new file mode 100644 index 0000000..976927c --- /dev/null +++ b/src/directors/scheduler.service.ts @@ -0,0 +1,36 @@ +import { MemoryStore } from '../stores/memory.store'; +import { FreshnessService } from '../detectors/freshness.service'; +import { ReliabilityService } from '../scoring/reliability.service'; +import { ReportService } from '../reports/report.service'; + +export class SchedulerService { + private store: MemoryStore; + private freshness: FreshnessService; + private reliability: ReliabilityService; + private report: ReportService; + private timers: NodeJS.Timeout[] = []; + + constructor(store: MemoryStore, freshness: FreshnessService, reliability: ReliabilityService, report: ReportService) { + this.store = store; + this.freshness = freshness; + this.reliability = reliability; + this.report = report; + } + + start() { + // freshness check every minute + this.timers.push(setInterval(() => this.freshness.checkFreshness(), 60 * 1000)); + // recompute reliability every 5 minutes + this.timers.push(setInterval(() => this.reliability.recomputeAll(), 5 * 60 * 1000)); + // generate report every 15 minutes + this.timers.push(setInterval(async () => { + const r = await this.report.generateReport(); + // in production: push to S3/email/DB + console.log('[DQ Report]', r.generatedAt, 'anomalies=', r.totalAnomalies); + }, 15 * 60 * 1000)); + } + + stop() { + for (const t of this.timers) clearInterval(t); + } +} diff --git a/src/directors/schema.validator.ts b/src/directors/schema.validator.ts new file mode 100644 index 0000000..816fcdf --- /dev/null +++ b/src/directors/schema.validator.ts @@ -0,0 +1,35 @@ +import { RawDatum, ValidationResult } from '../types'; + +/** + * Lightweight schema validator. + * For production, replace with Joi/zod/ajv and precompiled JSON schemas. + * + * This example knows about two example feed types: + * - price feed: payload { symbol: string, price: number } + * - orderbook snapshot: payload { symbol: string, bids: Array<[price, size]>, asks: Array<[price, size]> } + * + * Returns ValidationResult: ok + errors array + */ + +export class SchemaValidator { + validate(d: RawDatum): ValidationResult { + const errors: string[] = []; + if (!d.source) errors.push('missing source'); + if (!d.timestamp) errors.push('missing timestamp'); + if (!d.payload) errors.push('missing payload'); + + // small heuristic: if payload has price -> price feed + if (d.payload && typeof d.payload === 'object') { + if ('price' in d.payload) { + if (typeof d.payload.price !== 'number') errors.push('price must be number'); + if (!d.payload.symbol || typeof d.payload.symbol !== 'string') errors.push('price feed missing symbol'); + } else if ('bids' in d.payload && 'asks' in d.payload) { + if (!Array.isArray(d.payload.bids) || !Array.isArray(d.payload.asks)) { + errors.push('orderbook bids/asks must be arrays'); + } + } // else: allow free-form for now + } + + return { ok: errors.length === 0, errors: errors.length ? errors : undefined }; + } +} diff --git a/src/directors/types.ts b/src/directors/types.ts new file mode 100644 index 0000000..34acd81 --- /dev/null +++ b/src/directors/types.ts @@ -0,0 +1,38 @@ +export type RawDatum = { + id?: string; + source: string; + timestamp: string; // ISO + payload: any; // schema depends on feed + lineage?: DataLineage; +}; + +export type ValidationResult = { + ok: boolean; + errors?: string[]; +}; + +export type Anomaly = { + id: string; + source: string; + timestamp: string; + metric: string; + value: number; + reason: string; + severity: 'low' | 'medium' | 'high'; +}; + +export type DataLineage = { + source: string; + receivedAt: string; + transformations?: string[]; // descriptions + originalId?: string; +}; + +export type SourceScore = { + source: string; + score: number; // 0..100 + lastSeen: string; + staleCount: number; + anomalyCount: number; + completenessScore: number; +}; diff --git a/src/directors/utils.ts b/src/directors/utils.ts new file mode 100644 index 0000000..07222a6 --- /dev/null +++ b/src/directors/utils.ts @@ -0,0 +1,18 @@ +import dayjs from 'dayjs'; +import { v4 as uuidv4 } from 'uuid'; + +export const nowIso = () => dayjs().toISOString(); + +export const minuteDiff = (aIso: string, bIso: string) => { + const a = dayjs(aIso); + const b = dayjs(bIso); + return a.diff(b, 'minute'); +}; + +export const genId = (prefix = '') => `${prefix}${uuidv4()}`; + +export function safeNumber(v: any): number | null { + if (v === null || v === undefined) return null; + const n = Number(v); + return Number.isFinite(n) ? n : null; +}