Embedding Pipeline

The embedding pipeline is the core process that runs when a thread transitions to dormant. It extracts facts from the conversation, embeds them as vectors, deduplicates against existing memories, and saves the unique ones.

```typescript
import { runEmbeddingPipeline } from "vitamem";
```

In normal usage, you do not call this function directly — the Vitamem facade runs it automatically during triggerDormantTransition() and sweepThreads(). It is exported for advanced use cases where you need direct control over the pipeline.


Process a dormant thread’s messages into deduplicated, embedded memories.

```typescript
async function runEmbeddingPipeline(
  thread: Thread,
  messages: Message[],
  llm: LLMAdapter,
  storage: StorageAdapter,
  deduplicationThreshold?: number,
  supersedeThreshold?: number,
  embeddingConcurrency?: number,
  autoPinRules?: AutoPinRule[],
  structuredRules?: StructuredExtractionRule[],
  enableReflection?: boolean,
  reflectionPrompt?: string,
): Promise<EmbeddingPipelineResult>;
```
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `thread` | `Thread` | — | The thread that transitioned to dormant. Used for `userId` and `threadId` when saving memories. |
| `messages` | `Message[]` | — | The full conversation history to extract facts from. |
| `llm` | `LLMAdapter` | — | The LLM adapter used for extraction (`extractMemories`), embedding (`embed`), and reflection (`chat`). |
| `storage` | `StorageAdapter` | — | The storage adapter for reading existing memories, saving new ones, and updating profiles. |
| `deduplicationThreshold` | `number` | `0.92` | Cosine similarity threshold for deduplication. Facts with similarity >= this value to any existing memory are considered duplicates and discarded. |
| `supersedeThreshold` | `number` | `0.75` | Cosine similarity threshold for superseding. Facts with similarity >= this value and < `deduplicationThreshold` update the existing memory in-place. |
| `embeddingConcurrency` | `number` | `5` | Maximum number of concurrent embedding API calls. Controls how many `llm.embed()` calls run in parallel. |
| `autoPinRules` | `AutoPinRule[]` | `[]` | Rules that automatically pin critical memories during save/supersede. |
| `structuredRules` | `StructuredExtractionRule[]` | `undefined` | Rules for classifying extracted facts into structured profile fields. Facts matching rules are routed to profile updates instead of the embedding pipeline. |
| `enableReflection` | `boolean` | `false` | Enable a second LLM call to validate and enrich extracted facts before embedding. |
| `reflectionPrompt` | `string` | built-in | Custom prompt override for the reflection LLM call. |
```typescript
interface EmbeddingPipelineResult {
  memoriesSaved: number;      // Facts that passed deduplication and were saved as new memories
  memoriesDeduped: number;    // Facts that were discarded as duplicates
  memoriesSuperseded: number; // Facts that updated an existing memory in-place
  totalExtracted: number;     // Total facts extracted by the LLM (before structured classification)
  profileFieldsUpdated: number; // Structured facts routed to user profile updates
  reflection?: {
    factsModified: number;    // Facts enriched by the reflection pass
    factsRemoved: number;     // Facts removed by the reflection pass
    missedFactsAdded: number; // New facts discovered by the reflection pass
    conflictsFound: number;   // Conflicts detected between new and existing facts
  };
}
```

For freeform facts (those not routed to profile fields), memoriesSaved + memoriesDeduped + memoriesSuperseded equals the number of facts that entered the embedding pipeline. The reflection field is only present when enableReflection is true.

The function executes these steps in order:

  1. Extract facts — Calls llm.extractMemories(messages, sessionDate) to get structured facts from the conversation. Each fact has a content string, a source ("confirmed" or "inferred"), and optional tags and profile fields. The session date is derived from thread.lastMessageAt.

  2. Reflection pass (optional) — If enableReflection is true, calls reflectOnExtraction() to validate and enrich the extracted facts via a second LLM call. Catches contradictions, enriches vague facts, and detects missed information. On failure, the original facts are kept unchanged.

  3. Structured classification (optional) — If structuredRules are provided and the storage adapter supports updateProfileField, classifies facts against the rules. Matching facts are routed to profile field updates (no embedding needed). Only the remaining freeform facts continue to the embedding pipeline.

  4. Embed facts — Calls llm.embed(fact.content) for each freeform fact. This step respects the embeddingConcurrency parameter, running at most N embedding calls in parallel using a worker pool.

  5. Load existing memories — Calls storage.getMemories(thread.userId) to get all current memories for the user, needed for deduplication.

  6. Classify and persist — Compares each new fact’s embedding against all existing memory embeddings (and already-accepted new facts from this batch) using cosine similarity. Each fact is classified as one of:

    • Skip (similarity >= deduplicationThreshold): duplicate, discarded
    • Supersede (similarity >= supersedeThreshold and < deduplicationThreshold): updates the existing memory in-place via storage.updateMemory(), preserving the higher-confidence source and existing pin status
    • Save (similarity < supersedeThreshold): saved as a new memory via storage.saveMemory()

    During save and supersede, auto-pin rules are evaluated and matching memories are automatically pinned.
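Step 6 can be sketched as follows. The helper names are hypothetical (not Vitamem's internal API), the thresholds are the documented defaults, and for brevity the sketch compares only against existing memories, whereas the real pipeline also compares against already-accepted facts from the same batch:

```typescript
// Cosine similarity between two embedding vectors of equal length.
function cosineSimilarity(a: number[], b: number[]): number {
  let dot = 0, normA = 0, normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  return dot / (Math.sqrt(normA) * Math.sqrt(normB));
}

type Action = "skip" | "supersede" | "save";

// Classify one new fact against all existing memory embeddings.
function classifyFact(
  factEmbedding: number[],
  existingEmbeddings: number[][],
  deduplicationThreshold = 0.92,
  supersedeThreshold = 0.75,
): Action {
  // Only the closest existing memory matters for the decision.
  const best = existingEmbeddings.reduce(
    (max, e) => Math.max(max, cosineSimilarity(factEmbedding, e)),
    -1,
  );
  if (best >= deduplicationThreshold) return "skip";  // duplicate, discard
  if (best >= supersedeThreshold) return "supersede"; // update existing memory in-place
  return "save";                                      // persist as a new memory
}
```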

```typescript
import { runEmbeddingPipeline } from "vitamem";

const result = await runEmbeddingPipeline(
  dormantThread,
  messages,
  llmAdapter,
  storageAdapter,
  0.90, // slightly lower dedup threshold
  0.75, // supersede threshold
  3,    // conservative concurrency for rate-limited APIs
);

console.log(`Extracted: ${result.totalExtracted}`);
console.log(`Saved: ${result.memoriesSaved}`);
console.log(`Superseded: ${result.memoriesSuperseded}`);
console.log(`Deduped: ${result.memoriesDeduped}`);
console.log(`Profile fields updated: ${result.profileFieldsUpdated}`);
```

The embeddingConcurrency parameter controls how many llm.embed() calls execute simultaneously. This is implemented using a worker pool pattern where N workers each process one item at a time from a shared queue.

```
concurrency = 3, facts = 8
Worker 1: [fact1] [fact4] [fact7]
Worker 2: [fact2] [fact5] [fact8]
Worker 3: [fact3] [fact6]
```
| Scenario | Recommended value |
| --- | --- |
| OpenAI (paid tier) | 5 (default) |
| OpenAI (free tier) | 2-3 |
| Anthropic + OpenAI embeddings | 5 |
| Ollama (local, single GPU) | 1 (Ollama processes requests sequentially) |
| Ollama (multi-GPU / cluster) | 3-5 (match your throughput capacity) |
| Custom embedding service | Depends on your service's capacity |
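The worker-pool pattern described above can be sketched as a generic helper. This is a hypothetical illustration of the technique, not Vitamem's actual implementation:

```typescript
// Map `fn` over `items` with at most `concurrency` calls in flight.
// N workers each pull one item at a time from a shared queue (the `next` index).
async function mapWithConcurrency<T, R>(
  items: T[],
  concurrency: number,
  fn: (item: T) => Promise<R>,
): Promise<R[]> {
  const results: R[] = new Array(items.length);
  let next = 0; // index of the next unclaimed item in the shared queue

  async function worker(): Promise<void> {
    while (next < items.length) {
      const i = next++; // claim an item (safe: no await between check and claim)
      results[i] = await fn(items[i]);
    }
  }

  // Spawn at most `concurrency` workers; each finishes its current item
  // before claiming the next one.
  const workers = Array.from(
    { length: Math.min(concurrency, items.length) },
    () => worker(),
  );
  await Promise.all(workers);
  return results;
}

// Usage sketch: embed 8 facts with at most 3 concurrent llm.embed() calls.
// const embeddings = await mapWithConcurrency(facts, 3, (f) => llm.embed(f.content));
```

Results come back in input order regardless of which worker finishes first, since each worker writes into its claimed slot.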

The facade passes config.embeddingConcurrency (default: 5) to this function. Set it in your createVitamem config:

```typescript
const mem = await createVitamem({
  provider: "openai",
  apiKey: process.env.OPENAI_API_KEY,
  storage: "supabase",
  supabaseUrl: process.env.SUPABASE_URL,
  supabaseKey: process.env.SUPABASE_KEY,
  embeddingConcurrency: 3, // conservative
});
```

The result type returned by runEmbeddingPipeline().

```typescript
interface EmbeddingPipelineResult {
  memoriesSaved: number;
  memoriesDeduped: number;
  memoriesSuperseded: number;
  totalExtracted: number;
  profileFieldsUpdated: number;
  reflection?: {
    factsModified: number;
    factsRemoved: number;
    missedFactsAdded: number;
    conflictsFound: number;
  };
}
```
| Field | Type | Description |
| --- | --- | --- |
| `memoriesSaved` | `number` | Number of new unique memories persisted to storage |
| `memoriesDeduped` | `number` | Number of extracted facts discarded as duplicates |
| `memoriesSuperseded` | `number` | Number of existing memories updated in-place with newer content |
| `totalExtracted` | `number` | Total number of facts the LLM extracted from the conversation |
| `profileFieldsUpdated` | `number` | Number of structured facts routed to user profile updates |
| `reflection` | `object?` | Reflection pass statistics (only present when `enableReflection` is true) |

In the standard Vitamem facade, the pipeline runs in two scenarios:

  1. triggerDormantTransition(threadId) — Immediately transitions the thread to dormant and runs the pipeline.

  2. sweepThreads() — For each cooling thread that has exceeded the cooling timeout, transitions to dormant and runs the pipeline.

Both paths call runEmbeddingPipeline() with the configured thresholds (deduplicationThreshold, supersedeThreshold), embeddingConcurrency, autoPinRules, structuredExtractionRules, and reflection settings.