
On-demand ingestion: building systems that know what they don't know
Self-healing pipelines fix data that arrived broken. But what about data you never knew you needed? Here is how to build a system that detects knowledge gaps at query time and ingests the missing source before returning an answer.
The gap that self-healing pipelines cannot fix
There is a class of data problem that monitoring, schema validation, and retry logic cannot solve. Not because those tools are insufficient, but because the problem is upstream of them.
Self-healing pipelines are designed around a known corpus. You have decided what sources to monitor, registered their schemas, set their volume thresholds, and configured fallback paths. The system protects the integrity of data you already knew you needed.
But what about data you did not know you needed until a user asked a question that required it?
This is a fundamentally different failure mode. The pipeline is healthy. The ingestion jobs completed without error. The vector store is consistent and current. And yet the system cannot answer the question in front of it because the relevant source was never ingested in the first place.
Pre-ingesting every possible source is not a solution. For any domain with a large and continuously expanding body of source material, the cost and complexity of exhaustive upfront ingestion are prohibitive. The corpus is too large, too dynamic, and too unpredictable. You cannot know in advance which documents a user will need.
The solution is to move ingestion out of the setup phase and into the query path. When the system detects that it cannot answer a question fully with its current knowledge, it fetches and indexes the missing source material before returning a response.
What a knowledge gap looks like
A knowledge gap is not an error. The retrieval pipeline runs normally, the vector search completes, and chunks are returned. The gap is detectable only by evaluating the quality of what was retrieved against the requirements of the query.
There are two reliable signals:
Retrieval confidence below threshold. The semantic similarity scores of the top-k retrieved chunks fall below a configured minimum. The system found the closest matches it had, but none of them are close enough to constitute a relevant answer. This indicates the query is asking about a domain that is underrepresented or absent in the current index.
Coverage gap detection. The query contains identifiable entities, such as a named regulation, a specific filing, a dated publication, or a named organisation, that do not appear in any retrieved chunk. The system can answer questions about the topic in general but cannot answer this specific question because the specific source has not been ingested.
Both signals are computable before the summarisation step runs. Detecting them early means the system can branch: proceed to summarisation if coverage is sufficient, or trigger on-demand ingestion if it is not.
The architecture
User query
│
▼
┌─────────────────────────────────────────────────────────────┐
│ RETRIEVE (standard RAG) │
│ • Semantic search against current index │
│ • Returns top-k chunks with similarity scores │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ COVERAGE EVALUATOR │
│ • Checks max similarity score against threshold │
│ • Extracts named entities from query │
│ • Checks entity coverage in retrieved chunks │
│ • SUFFICIENT → proceed to summarisation │
│ • INSUFFICIENT → Knowledge Gap Detector │
└─────────────────────────────────────────────────────────────┘
│ (gap detected)
▼
┌─────────────────────────────────────────────────────────────┐
│ KNOWLEDGE GAP DETECTOR │
│ • Classifies gap type (domain, entity, temporal) │
│ • Resolves source candidates for missing material │
│ • Checks ingestion queue: already pending? wait. else add. │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ ON-DEMAND INGESTION WORKER (BullMQ, high-priority queue) │
│ • Fetches identified source │
│ • Runs standard integrity check and schema validation │
│ • Indexes into vector store │
│ • Emits ingestion-complete event │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ RE-RETRIEVE │
│ • Repeats original query against updated index │
│ • If coverage now sufficient: proceed to summarisation │
│ • If still insufficient: return partial answer + gap flag │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ SUMMARISATION (zero-temperature, source-anchored) │
│ • Standard deterministic RAG inference │
│ • Newly ingested source available as context │
└─────────────────────────────────────────────────────────────┘
│
▼
Response + audit trail (newly ingested source permanently indexed)
The newly ingested source is not discarded after the query completes. It becomes part of the permanent index, available for all future queries. On-demand ingestion is a one-time cost per source. The second user to ask a question that requires the same source gets a fast answer from the existing index.
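The flow in the diagram can be sketched as one orchestration function. This is a minimal illustration under stated assumptions, not the implementation behind the article: the `Pipeline` interface and its method names (`retrieve`, `evaluate`, `ingestMissing`, `summarise`) are hypothetical stand-ins for the boxes above, and the sketch waits for ingestion inline rather than answering in phases.

```typescript
// Hypothetical shapes for this sketch; none of these names come from a real library.
interface Chunk { content: string; similarityScore: number; }
interface Coverage { sufficient: boolean; missingEntities: string[]; }
interface Answer { text: string; gapFlag: boolean; }

interface Pipeline {
  retrieve(query: string): Promise<Chunk[]>;
  evaluate(query: string, chunks: Chunk[]): Promise<Coverage>;
  // Resolves true once a missing source has been fetched and indexed,
  // false if no source could be resolved for the gap.
  ingestMissing(query: string, coverage: Coverage): Promise<boolean>;
  summarise(query: string, chunks: Chunk[]): Promise<string>;
}

async function answerQuery(pipeline: Pipeline, query: string): Promise<Answer> {
  // RETRIEVE + COVERAGE EVALUATOR
  let chunks = await pipeline.retrieve(query);
  let coverage = await pipeline.evaluate(query, chunks);

  if (!coverage.sufficient) {
    // KNOWLEDGE GAP DETECTOR + ON-DEMAND INGESTION WORKER
    const ingested = await pipeline.ingestMissing(query, coverage);
    if (ingested) {
      // RE-RETRIEVE against the updated index and evaluate again
      chunks = await pipeline.retrieve(query);
      coverage = await pipeline.evaluate(query, chunks);
    }
  }

  // SUMMARISATION: if coverage is still insufficient, the answer is partial
  // and carries the gap flag.
  const text = await pipeline.summarise(query, chunks);
  return { text, gapFlag: !coverage.sufficient };
}
```

Only one ingestion attempt is made per query in this sketch, which keeps the worst-case latency bounded: a query that still lacks coverage after re-retrieval returns a flagged partial answer instead of looping.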
The coverage evaluator
The coverage evaluator sits between the retrieval step and the summarisation step. It makes a binary decision: proceed or pause.
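// evaluators/coverage.evaluator.ts

// Assumed minimal shapes for the types the evaluator depends on.
export interface RetrievedChunk {
  content: string;
  similarityScore: number;
}

export interface EntityExtractor {
  extract(text: string): Promise<string[]>;
}

export interface CoverageResult {
  sufficient: boolean;
  maxSimilarityScore: number;
  missingEntities: string[];
  gapType: 'domain' | 'entity' | 'temporal' | null;
}

export class CoverageEvaluator {
  private readonly SIMILARITY_THRESHOLD = 0.72;

  constructor(private readonly entityExtractor: EntityExtractor) {}

  async evaluate(
    query: string,
    retrievedChunks: RetrievedChunk[],
  ): Promise<CoverageResult> {
    // Math.max() over an empty list is -Infinity, so report 0 when nothing was retrieved.
    const maxSimilarityScore = retrievedChunks.length > 0
      ? Math.max(...retrievedChunks.map(c => c.similarityScore))
      : 0;

    // Extract named entities from the query
    const queryEntities = await this.entityExtractor.extract(query);

    // Check which entities appear in retrieved chunks (case-insensitive substring match)
    const retrievedText = retrievedChunks.map(c => c.content).join(' ').toLowerCase();
    const missingEntities = queryEntities.filter(
      entity => !retrievedText.includes(entity.toLowerCase())
    );

    const sufficient =
      maxSimilarityScore >= this.SIMILARITY_THRESHOLD &&
      missingEntities.length === 0;

    return {
      sufficient,
      maxSimilarityScore,
      missingEntities,
      gapType: sufficient ? null : this.classifyGap(maxSimilarityScore, missingEntities),
    };
  }

  private classifyGap(
    maxScore: number,
    missingEntities: string[],
  ): CoverageResult['gapType'] {
    if (missingEntities.length > 0) return 'entity';
    if (maxScore < this.SIMILARITY_THRESHOLD) return 'domain';
    // Not reachable from evaluate() as written: an insufficient result always has
    // a missing entity or a low score. Detecting a temporal gap needs a third
    // signal, such as comparing source dates, which this evaluator does not compute.
    return 'temporal';
  }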
}
The gap type informs the ingestion strategy. An entity gap means a specific named source is missing and can often be fetched directly. A domain gap means the entire subject area is underrepresented, which may require ingesting a broader set of sources. A temporal gap means the relevant material exists in the index but is outdated, requiring a refresh rather than a new ingestion.
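One way to make that mapping explicit is a small planning function that turns a gap type into an ingestion plan. This is a sketch only: `IngestionPlan`, its `action` values, and `planIngestion` are hypothetical names introduced for illustration, and using the raw query as the target for domain and temporal gaps is a simplifying assumption.

```typescript
type GapType = 'domain' | 'entity' | 'temporal';

// Hypothetical plan shape, introduced only for this illustration.
interface IngestionPlan {
  action: 'fetch-named-sources' | 'fetch-topic-sources' | 'refresh-indexed-sources';
  targets: string[];
}

function planIngestion(
  gapType: GapType,
  query: string,
  missingEntities: string[],
): IngestionPlan {
  if (gapType === 'entity') {
    // A specific named source is missing: fetch it directly.
    return { action: 'fetch-named-sources', targets: [...missingEntities] };
  }
  if (gapType === 'domain') {
    // The subject area is underrepresented: look for a broader set of sources.
    return { action: 'fetch-topic-sources', targets: [query] };
  }
  // Temporal: the material is indexed but stale, so refresh instead of adding.
  return { action: 'refresh-indexed-sources', targets: [query] };
}
```

Keeping the plan as plain data makes it easy to log alongside the query, so the reason for each ingestion remains visible afterwards.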
The knowledge gap detector
When the coverage evaluator returns a gap, the detector resolves which sources to fetch and queues the ingestion job.
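// detectors/knowledge-gap.detector.ts
// Assumes NestJS for dependency injection. SourceResolver, VectorStore,
// IngestionQueue, IngestionJob and GapResolution are assumed to be defined elsewhere.
import { Injectable } from '@nestjs/common';

@Injectable()
export class KnowledgeGapDetector {
  constructor(
    private readonly sourceResolver: SourceResolver,
    private readonly vectorStore: VectorStore,
    // isQueued() is not part of the BullMQ Queue API, so this is assumed to be
    // a thin wrapper around the queue.
    private readonly ingestionQueue: IngestionQueue,
  ) {}

  async resolve(
    query: string,
    coverageResult: CoverageResult,
  ): Promise<GapResolution> {
    const sourceCandidates = await this.sourceResolver.resolve({
      query,
      missingEntities: coverageResult.missingEntities,
      gapType: coverageResult.gapType,
    });
    if (sourceCandidates.length === 0) {
      return { resolvable: false, reason: 'no-source-candidates' };
    }

    const ingestionJobs: IngestionJob[] = [];
    const pendingSourceIds: string[] = [];
    for (const source of sourceCandidates) {
      // Idempotency: skip if already indexed or ingestion already pending
      const alreadyIndexed = await this.vectorStore.sourceExists(source.id);
      if (alreadyIndexed) continue;
      const alreadyQueued = await this.ingestionQueue.isQueued(source.id);
      if (alreadyQueued) {
        // Another request queued this source: wait for that job, do not add a second one
        pendingSourceIds.push(source.id);
        continue;
      }
      const job = await this.ingestionQueue.add(
        'on-demand-ingest',
        { sourceId: source.id, sourceUrl: source.url, priority: 'high' },
        { priority: 1 }, // BullMQ priority: lower number = higher priority
      );
      ingestionJobs.push(job);
    }

    return {
      // Resolvable if this request queued a job or can wait on one already pending
      resolvable: ingestionJobs.length > 0 || pendingSourceIds.length > 0,
      jobs: ingestionJobs,
      pendingSourceIds,
    };
  }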
}
The idempotency check matters here for the same reason it matters in scheduled ingestion pipelines: two users asking similar questions simultaneously should not trigger two parallel ingestion jobs for the same source. The second request waits for the first ingestion to complete, then re-retrieves from the now-populated index.
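A check followed by a separate add leaves a small window in which two concurrent requests can both see "not queued". Within a single process, one way to close that window is to share the in-flight promise per source, so that a second caller joins the first ingestion. The sketch below is an illustration only: `SingleFlightIngestion` is a hypothetical helper, and a multi-process deployment would need the same guarantee from the queue itself or from a shared lock.

```typescript
// Hypothetical in-process coordinator: one ingestion per source at a time.
class SingleFlightIngestion {
  private readonly inFlight = new Map<string, Promise<void>>();
  private readonly ingest: (sourceId: string) => Promise<void>;

  constructor(ingest: (sourceId: string) => Promise<void>) {
    this.ingest = ingest;
  }

  run(sourceId: string): Promise<void> {
    const existing = this.inFlight.get(sourceId);
    // A second caller for the same source waits on the first ingestion.
    if (existing !== undefined) return existing;

    const started = this.ingest(sourceId).then(
      () => { this.inFlight.delete(sourceId); },
      (err) => {
        // Clear the entry on failure too, so a later request can retry.
        this.inFlight.delete(sourceId);
        throw err;
      },
    );
    // Registered synchronously, before any other caller can run.
    this.inFlight.set(sourceId, started);
    return started;
  }
}
```

Because the map entry is set in the same synchronous turn as the lookup, there is no gap between "check" and "add" for another request to slip through.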
Handling the wait
On-demand ingestion introduces latency into the query path. Fetching, parsing, chunking, embedding, and indexing a source document takes seconds to minutes depending on its size. The user experience needs to handle this gracefully.
The approach is to split the response into two phases:
In the first phase, the system returns immediately with whatever partial answer the current index supports, along with an explicit signal that additional source material is being ingested and the answer will be updated. This is not a degraded experience; it is an honest one. The user knows the system is working and why.
In the second phase, when the ingestion-complete event fires, the system re-runs the query and pushes the updated answer. In a web interface this is a streaming update. In an asynchronous workflow this is a webhook or a polling endpoint.
The partial answer in phase one also has value. If the current index has adjacent knowledge, the partial answer gives the user something to work with while the gap is being filled. The system is never simply stuck.
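The two phases can be sketched as a function that pushes a partial answer first and a final answer once ingestion completes. This is a minimal sketch under stated assumptions: `PhasedAnswer`, `TwoPhaseDeps`, and `respondInPhases` are hypothetical names, and the `push` callback stands in for whatever transport is used (a streaming update, a webhook, or a polling endpoint).

```typescript
interface PhasedAnswer {
  phase: 'partial' | 'final';
  text: string;
  ingestionPending: boolean;
}

// Hypothetical dependencies, passed in so the sketch stays self-contained.
interface TwoPhaseDeps {
  answerFromIndex(query: string): Promise<string>;
  // Resolves when the ingestion-complete event has fired.
  waitForIngestion(): Promise<void>;
}

async function respondInPhases(
  query: string,
  deps: TwoPhaseDeps,
  push: (answer: PhasedAnswer) => void,
): Promise<void> {
  // Phase 1: answer immediately from the current index and flag the pending ingestion.
  push({
    phase: 'partial',
    text: await deps.answerFromIndex(query),
    ingestionPending: true,
  });

  // Phase 2: once the source is indexed, re-run the query and push the update.
  await deps.waitForIngestion();
  push({
    phase: 'final',
    text: await deps.answerFromIndex(query),
    ingestionPending: false,
  });
}
```

The `ingestionPending` flag is what lets a client distinguish "this is all the system knows" from "more is on the way".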
Why this changes the economics of knowledge infrastructure
The conventional approach to building a domain-specific research tool requires upfront decisions about scope. What sources will the system cover? What documents need to be ingested before launch? What happens when a user asks about something outside that scope?
On-demand ingestion changes the answer to the last question from “the system cannot help” to “the system will find out and get back to you in this session.”
This has a compounding effect. Every on-demand ingestion permanently expands the index. A system that launches covering a defined corpus grows organically toward the edges of its domain as users ask questions that push those boundaries. The index becomes more capable over time without requiring manual curation of what to add next.
The boundary of the system is no longer defined at build time. It is defined by what users actually need, discovered at query time, and preserved permanently for everyone who follows.
This connects directly to the traceability guarantees described elsewhere in this series. Every on-demand ingestion event is recorded in the chain of custody: what triggered it, what source was fetched, when it was indexed, and which query first required it. The audit trail covers not just the answer but the moment the system learned what it needed to know to produce it.
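The fields named above suggest a simple record shape. The following is a hypothetical sketch, not the chain-of-custody format used elsewhere in the series: `IngestionAuditRecord`, `IngestionAuditLog`, and every field name are assumptions chosen to mirror the four items listed in the paragraph.

```typescript
// Hypothetical record: what triggered the ingestion, what was fetched,
// when it was indexed, and which query first required it.
interface IngestionAuditRecord {
  sourceId: string;
  sourceUrl: string;
  gapType: 'domain' | 'entity' | 'temporal';
  firstRequiredByQuery: string;
  indexedAt: string; // ISO 8601 timestamp
}

class IngestionAuditLog {
  private readonly bySource = new Map<string, IngestionAuditRecord>();

  // Keeps the first record per source, so "which query first required it"
  // is never overwritten by later queries.
  record(entry: IngestionAuditRecord): IngestionAuditRecord {
    const existing = this.bySource.get(entry.sourceId);
    if (existing !== undefined) return existing;
    const stored = { ...entry };
    this.bySource.set(entry.sourceId, stored);
    return stored;
  }

  get(sourceId: string): IngestionAuditRecord | undefined {
    return this.bySource.get(sourceId);
  }
}
```

An in-memory map is used only to keep the example short; a real audit trail would be written to durable, append-only storage.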