
Building self-healing data pipelines for market intelligence
Legacy scrapers break silently. Filings change structure. APIs deprecate fields. Reliable intelligence requires pipelines that detect these failures automatically and trigger corrective action before a human notices the gap.
The failure mode no one talks about
A research platform that fails loudly is manageable. A platform that fails silently is dangerous.
Loud failures are visible: the job throws an error, the alert fires, an engineer investigates. Silent failures are the ones that matter for intelligence work. The Reddit API returns a 200 with an empty array because the subreddit went private. The SEC EDGAR filing has a new field structure and the parser quietly drops the financials section. The survey platform changes its export schema and the Likert scores are now on a 7-point scale instead of a 5-point one, but the database still accepts the numbers.
All three of these scenarios produce clean-looking data. None of them produce correct data. And if the pipeline does not actively check for them, the research analyst receives a confident report built on gaps.
This post describes the engineering architecture that catches these failures before they reach the output layer.
Failure taxonomy: the three types worth engineering against
Not all pipeline failures are equal. The engineering response depends on the failure type.
Structural failures occur when the shape of incoming data changes without warning. A JSON response gains a new nesting layer. A field is renamed. A pagination schema changes. These are detected by schema validation against a known contract.
Completeness failures occur when data is missing but the pipeline does not know it. A date range query returns fewer results than expected. A source that reliably produces 200 records per week returns 12. The ingestion job succeeds, but the output is incomplete. These are detected by statistical bounds checking on volume metrics.
Staleness failures occur when data has not been refreshed within the expected window. A regulatory filing that should update monthly has not changed in 47 days. A competitor’s pricing page has not been re-fetched since the last pipeline run failed silently. These are detected by timestamp auditing at the record level.
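As a rough illustration of record-level timestamp auditing, the sketch below flags records whose last refresh is older than the expected window. The `TrackedRecord` shape, the `findStaleRecords` name, and the 31-day window are assumptions made for the example, not part of the platform described here.

```typescript
// Hypothetical record shape: only the fields the freshness audit needs.
interface TrackedRecord {
  sourceId: string;
  lastRefreshedAt: Date;
}

const DAY_MS = 24 * 60 * 60 * 1000;

// Returns the records whose last refresh is older than the expected window.
function findStaleRecords(
  records: TrackedRecord[],
  maxAgeDays: number,
  now: Date,
): TrackedRecord[] {
  return records.filter(
    (r) => now.getTime() - r.lastRefreshedAt.getTime() > maxAgeDays * DAY_MS,
  );
}

// A filing expected to update monthly, last refreshed 47 days ago,
// next to a pricing page that was re-fetched two days ago.
const auditNow = new Date('2024-03-01T00:00:00Z');
const staleRecords = findStaleRecords(
  [
    { sourceId: 'regulatory-filing', lastRefreshedAt: new Date(auditNow.getTime() - 47 * DAY_MS) },
    { sourceId: 'pricing-page', lastRefreshedAt: new Date(auditNow.getTime() - 2 * DAY_MS) },
  ],
  31,
  auditNow,
);
console.log(staleRecords.map((r) => r.sourceId)); // only the filing is flagged
```

Passing `now` in as a parameter, rather than reading the clock inside the function, keeps the audit deterministic and easy to test.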
Each failure type requires a different remediation path. The architecture below handles all three. These three types share a common assumption: you already know which sources to monitor. For the case where the source was never registered in the first place, see on-demand ingestion.
The pipeline architecture
[External Sources: Reddit API, EDGAR, Survey Platforms, PDFs]
│
▼
┌────────────────────────────────────────────────────────────┐
│ INGESTION WORKER (BullMQ + NestJS) │
│ • Fetches raw data per source, per schedule │
│ • Assigns job_id, source_id, ingestion_timestamp │
│ • Emits raw payload to Integrity Check queue │
└────────────────────────────────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────────────────┐
│ INTEGRITY CHECK (Stateless validator) │
│ • Schema validation against registered contracts │
│ • Volume bounds check (min/max record thresholds) │
│ • Timestamp freshness audit │
│ • PASS → Schema Validation queue │
│ • FAIL → Auto-Correct Trigger queue + alert │
└────────────────────────────────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────────────────┐
│ SCHEMA VALIDATION (Valibot contract enforcement) │
│ • Enforces typed field contracts per source │
│ • Coerces safe type differences (string "5" → number 5) │
│ • Hard rejects breaking changes │
│ • PASS → Enrichment queue │
│ • FAIL → Schema Drift alert + Auto-Correct Trigger │
└────────────────────────────────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────────────────┐
│ ENRICHMENT WORKER │
│ • Normalises fields across sources │
│ • Appends derived metadata (sentiment, category tags) │
│ • Deduplication against existing records │
└────────────────────────────────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────────────────┐
│ AUTO-CORRECT TRIGGER (Event-driven remediation) │
│ • Classifies failure type │
│ • Routes to: secondary source, schema updater, or alert │
│ • Re-queues corrected job with exponential backoff │
└────────────────────────────────────────────────────────────┘
│
▼
[Database: PostgreSQL + pgvector (verified, enriched records)]
No stage passes data downstream without explicit validation. Each worker is stateless: it receives a job, processes it, and emits an event. The queue is the shared state, not the worker. This is what makes the system testable, scalable, and recoverable.
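One way to picture the "queue is the shared state" idea is to write the PASS/FAIL routing from the diagram as a pure function. This is only a sketch: the queue names mirror the diagram, while `Stage`, `Routing`, `routeStageResult`, and the alert labels are hypothetical names chosen for illustration.

```typescript
// Stage and queue names mirror the diagram; type and function names are hypothetical.
type Stage = 'integrity-check' | 'schema-validation';
type Outcome = 'pass' | 'fail';

interface Routing {
  nextQueue: string;
  alert: string | null;
}

// Decides where a job goes next from the stage result alone.
// No worker-local state is consulted, so any worker instance can make the call.
function routeStageResult(stage: Stage, outcome: Outcome): Routing {
  if (outcome === 'fail') {
    // Every failure goes to the Auto-Correct Trigger; only the alert label differs.
    return {
      nextQueue: 'auto-correct-trigger',
      alert: stage === 'schema-validation' ? 'schema-drift' : 'integrity-failure',
    };
  }
  return {
    nextQueue: stage === 'integrity-check' ? 'schema-validation' : 'enrichment',
    alert: null,
  };
}

console.log(routeStageResult('integrity-check', 'pass').nextQueue); // "schema-validation"
```

Because the decision depends only on its inputs, it can be unit-tested without a running queue, which is a large part of what "testable" means in practice here.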
Idempotency: the property that makes retries safe
The most important property in a self-healing pipeline is idempotency. An idempotent operation produces the same result whether it is executed once or a hundred times. Without idempotency, retries cause duplicate records, double-counted metrics, and corrupted aggregations.
Citium workers achieve idempotency through composite job keys. Every ingestion job is identified by a combination of source_id, date_range_start, and date_range_end. Before processing, the worker checks whether a completed record with that composite key already exists in the database.
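To see why the composite key makes retries safe, here is a stripped-down, in-memory sketch of the same check. The `compositeKey` and `ingestOnce` helpers and the `Set` standing in for the database are assumptions for illustration only.

```typescript
interface IngestionKey {
  sourceId: string;
  dateRangeStart: string; // ISO date, e.g. '2024-11-01'
  dateRangeEnd: string;
}

// Hypothetical helper: serialises the three fields into one lookup key.
// Assumes sourceId never contains the '|' separator.
function compositeKey(k: IngestionKey): string {
  return [k.sourceId, k.dateRangeStart, k.dateRangeEnd].join('|');
}

// In-memory stand-in for the table of completed records.
const completedJobs = new Set<string>();
let fetchCount = 0;

function ingestOnce(k: IngestionKey): 'processed' | 'skipped' {
  const key = compositeKey(k);
  if (completedJobs.has(key)) {
    return 'skipped'; // a retry of finished work is a no-op
  }
  fetchCount += 1; // stands in for fetch + validate + store
  completedJobs.add(key);
  return 'processed';
}

const novemberJob: IngestionKey = {
  sourceId: 'edgar',
  dateRangeStart: '2024-11-01',
  dateRangeEnd: '2024-11-07',
};
const firstRun = ingestOnce(novemberJob); // 'processed'
const retryRun = ingestOnce(novemberJob); // 'skipped'
console.log(firstRun, retryRun, fetchCount);
```

In a real database a check followed by a write is not atomic when several workers run concurrently, so this pattern is usually backed by a unique constraint on the three key columns.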
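```typescript
// workers/ingestion.worker.ts
// @Process is the @nestjs/bull decorator; with @nestjs/bullmq the class would
// extend WorkerHost and implement process() instead.
import { InjectQueue, Process, Processor } from '@nestjs/bull';
import { Logger } from '@nestjs/common';
import { Job, Queue } from 'bull';
// SourceService, RecordRepository and IngestionJobPayload are application types (imports omitted).

@Processor('ingestion')
export class IngestionWorker {
  private readonly logger = new Logger(IngestionWorker.name);

  constructor(
    private readonly sourceService: SourceService,
    private readonly recordRepository: RecordRepository,
    // The queue name 'integrity-check' is an assumption; use the registered name.
    @InjectQueue('integrity-check') private readonly integrityQueue: Queue,
  ) {}

  @Process('ingest-source')
  async handleIngestion(job: Job<IngestionJobPayload>): Promise<void> {
    const { sourceId, dateRangeStart, dateRangeEnd } = job.data;

    // Idempotency check: skip if already successfully processed
    const existingRecord = await this.recordRepository.findByCompositeKey({
      sourceId,
      dateRangeStart,
      dateRangeEnd,
      status: 'completed',
    });
    if (existingRecord) {
      this.logger.log(`Job already processed: ${sourceId} ${dateRangeStart}→${dateRangeEnd}. Skipping.`);
      return;
    }

    // Fetch the raw payload and hand it to the Integrity Check stage
    const rawPayload = await this.sourceService.fetch(sourceId, dateRangeStart, dateRangeEnd);
    await this.integrityQueue.add('check', { rawPayload, jobMeta: job.data });
  }
}
```

Retries use exponential backoff, with jitter to prevent thundering-herd problems when a source is temporarily unavailable. The queue configuration sets the exponential schedule: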
```typescript
// queue configuration (app.module.ts)
BullModule.registerQueue({
  name: 'ingestion',
  defaultJobOptions: {
    attempts: 5, // 1 initial attempt + up to 4 retries
    backoff: {
      type: 'exponential',
      delay: 2000, // base delay in ms; the wait grows exponentially with each retry
    },
    removeOnComplete: false, // retain for audit
    removeOnFail: false, // retain for post-mortem
  },
})
```

Completed and failed jobs are both retained. This is not a cost oversight; it is an audit requirement. When a client asks “why is there a gap in the data on 14 November?”, the job history is the answer.
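The configuration above covers the exponential part of the retry policy but does not show jitter. One common way to add it is "full jitter", where each wait is drawn uniformly between zero and the exponential delay. The sketch below assumes a schedule that doubles from the 2000 ms base. `exponentialDelay` and `jitteredDelay` are hypothetical helpers, and wiring them into the queue library (for example through a custom backoff strategy) is left out.

```typescript
// Delay before retry number `retry` (1-based), doubling from the base delay.
function exponentialDelay(baseMs: number, retry: number): number {
  return baseMs * 2 ** (retry - 1);
}

// Full jitter: pick uniformly in [0, exponentialDelay).
// `rng` is injectable so the function can be tested deterministically.
function jitteredDelay(
  baseMs: number,
  retry: number,
  rng: () => number = Math.random,
): number {
  return Math.floor(rng() * exponentialDelay(baseMs, retry));
}

// With five attempts there are at most four retries.
const schedule = [1, 2, 3, 4].map((n) => exponentialDelay(2000, n));
console.log(schedule); // [2000, 4000, 8000, 16000]
console.log(jitteredDelay(2000, 3, () => 0.5)); // 4000
```

Without jitter, every job that failed at the same moment retries at the same moment. Spreading the waits randomly keeps a recovering source from being hit by all of them at once.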
Schema drift detection
Schema drift is the silent killer of intelligence pipelines. A source that has been stable for eight months changes its response structure, and the parser does not throw an error; it silently loses the fields it no longer recognises.
The schema validation stage registers a Valibot contract per source. When a new ingestion run produces a schema that differs from the registered contract, the worker classifies the difference:
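```typescript
// validators/schema.validator.ts
import * as v from 'valibot';

// Result shape inferred from the return statements below.
export type SchemaValidationResult =
  | { status: 'pass'; data: unknown }
  | {
      status: 'fail';
      failureType: 'breaking_schema_change' | 'additive_schema_change';
      issues: v.BaseIssue<unknown>[];
    };

export function validateAgainstContract(
  rawPayload: unknown,
  contract: v.GenericSchema,
): SchemaValidationResult {
  const result = v.safeParse(contract, rawPayload);
  if (result.success) {
    return { status: 'pass', data: result.output };
  }

  // Valibot tags each issue with a kind: 'schema', 'validation' or 'transformation'.
  // A 'schema' issue means the payload no longer has the contracted shape or types.
  const issues = result.issues;
  const isBreaking = issues.some((i) => i.kind === 'schema');
  return {
    status: 'fail',
    failureType: isBreaking ? 'breaking_schema_change' : 'additive_schema_change',
    issues,
  };
}
```

Additive changes (new optional fields) are logged and the contract is auto-updated. Breaking changes (required fields gone missing, type changes on critical fields) halt the pipeline and trigger an alert. The researcher sees a “data gap detected” flag on the affected date range rather than fabricated data.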
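The additive-versus-breaking distinction can also be illustrated without any validation library, by comparing the keys a contract requires with the keys a payload actually carries. This is a simplified sketch that only looks at top-level keys and ignores type changes. `classifyDrift`, `DriftReport`, and the example field names are hypothetical.

```typescript
type DriftKind = 'none' | 'additive' | 'breaking';

interface DriftReport {
  kind: DriftKind;
  addedKeys: string[];
  missingKeys: string[];
}

// `requiredKeys` would come from the registered contract; here it is a plain list.
function classifyDrift(
  requiredKeys: string[],
  payload: { [key: string]: unknown },
): DriftReport {
  const missingKeys = requiredKeys.filter((k) => !(k in payload));
  const addedKeys = Object.keys(payload).filter((k) => requiredKeys.indexOf(k) === -1);
  // A missing required key outranks any new key: the run cannot be trusted.
  const kind: DriftKind =
    missingKeys.length > 0 ? 'breaking' : addedKeys.length > 0 ? 'additive' : 'none';
  return { kind, addedKeys, missingKeys };
}

const contractKeys = ['ticker', 'revenue'];
console.log(classifyDrift(contractKeys, { ticker: 'ABC', revenue: 10, segment: 'retail' }).kind); // "additive"
console.log(classifyDrift(contractKeys, { ticker: 'ABC' }).kind); // "breaking"
```

A key-level diff like this is useful alongside contract validation when the contract itself does not report unknown keys, since new upstream fields would otherwise pass through unnoticed.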
Out-of-bounds monitoring and secondary source fallback
Volume anomaly detection runs as a separate monitoring layer, querying aggregate counts per source per time window and comparing them to a rolling baseline.
When a source returns significantly fewer records than its rolling average (the threshold is configurable, typically two standard deviations below the mean), the Auto-Correct Trigger fires. It attempts to fill the gap from a registered secondary source before escalating to a human alert.
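A minimal sketch of that bounds check, assuming the rolling baseline is simply the last few weekly counts and using the population standard deviation. `checkVolume` and the sample numbers are illustrative, not the production implementation.

```typescript
interface VolumeCheck {
  mean: number;
  stdDev: number;
  threshold: number; // lowest count still considered normal
  isAnomalous: boolean;
}

// Flags `current` when it falls more than `maxDeviations` standard
// deviations below the mean of the rolling `history` window.
function checkVolume(history: number[], current: number, maxDeviations = 2): VolumeCheck {
  if (history.length === 0) {
    throw new Error('checkVolume needs at least one historical count');
  }
  const mean = history.reduce((sum, n) => sum + n, 0) / history.length;
  const variance =
    history.reduce((sum, n) => sum + (n - mean) ** 2, 0) / history.length;
  const stdDev = Math.sqrt(variance);
  const threshold = mean - maxDeviations * stdDev;
  return { mean, stdDev, threshold, isAnomalous: current < threshold };
}

// A source that normally yields about 200 records per week returns 12.
const weeklyCheck = checkVolume([200, 190, 210, 205, 195], 12);
console.log(weeklyCheck.mean, weeklyCheck.isAnomalous); // 200 true
```

Only the lower bound is checked, because the failure described here is missing data. A symmetric check would also catch sudden spikes, such as duplicated pages.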
This is the engineering behind a guarantee that sounds simple in a client meeting: “If the primary source goes down, we do not leave a gap in your intelligence feed.” The guarantee is only possible because the monitoring layer knows what “normal” looks like for each source, and the remediation layer has a pre-configured fallback path.
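The "pre-configured fallback path" can be thought of as a small decision table. The sketch below is one hypothetical way to express it. The three actions follow the Auto-Correct Trigger box in the diagram, while `FailureKind`, `RemediationAction`, and `chooseRemediation` are names invented for the example.

```typescript
// Failure labels and function name are hypothetical; the three actions
// follow the Auto-Correct Trigger box in the architecture diagram.
type FailureKind = 'volume_anomaly' | 'additive_schema_change' | 'breaking_schema_change';
type RemediationAction = 'secondary-source' | 'schema-updater' | 'alert';

function chooseRemediation(
  failure: FailureKind,
  hasSecondarySource: boolean,
): RemediationAction {
  if (failure === 'volume_anomaly') {
    // Fill the gap from the fallback if one is registered, otherwise escalate.
    return hasSecondarySource ? 'secondary-source' : 'alert';
  }
  if (failure === 'additive_schema_change') {
    return 'schema-updater';
  }
  // Breaking schema changes always go to a human.
  return 'alert';
}

console.log(chooseRemediation('volume_anomaly', true)); // "secondary-source"
console.log(chooseRemediation('volume_anomaly', false)); // "alert"
```

Keeping the table explicit makes the guarantee auditable: a source without a registered secondary shows up as an `alert` path rather than as an implicit gap.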
The same monitoring infrastructure that detects volume anomalies feeds the observability dashboard. Every data gap, schema drift event, and retry burst is visible in the same view as infrastructure metrics. Pipeline health and system health are not separate concerns.
Why this matters for the research output
A research report built on incomplete data is not a research report. It is a guess with formatting.
The self-healing pipeline architecture described here is the engineering foundation for the Reliability guarantee in Citium’s research infrastructure. The deterministic retrieval approach in the deterministic RAG guide depends on this layer being correct: if the vector store contains records from a silently broken ingestion run, zero-temperature inference and audit trails do not save you. The correctness guarantee starts here, at the data collection boundary.
Every intelligence product is only as reliable as the least-monitored step in its pipeline. For queries that push beyond the boundaries of the current index entirely, on-demand ingestion describes how the system detects and fills those gaps at query time.