Embedding Pipeline
The embedding pipeline is the core process that runs when a thread transitions to dormant. It extracts facts from the conversation, embeds them as vectors, deduplicates them against existing memories, and saves the unique ones.
```ts
import { runEmbeddingPipeline } from "vitamem";
```

In normal usage, you do not call this function directly: the Vitamem facade runs it automatically during `triggerDormantTransition()` and `sweepThreads()`. It is exported for advanced use cases that need direct control over the pipeline.
runEmbeddingPipeline()
Process a dormant thread’s messages into deduplicated, embedded memories.
Signature
```ts
async function runEmbeddingPipeline(
  thread: Thread,
  messages: Message[],
  llm: LLMAdapter,
  storage: StorageAdapter,
  deduplicationThreshold?: number,
  supersedeThreshold?: number,
  embeddingConcurrency?: number,
  autoPinRules?: AutoPinRule[],
  structuredRules?: StructuredExtractionRule[],
  enableReflection?: boolean,
  reflectionPrompt?: string,
): Promise<EmbeddingPipelineResult>;
```

Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| `thread` | `Thread` | required | The thread that transitioned to dormant. Used for `userId` and `threadId` when saving memories. |
| `messages` | `Message[]` | required | The full conversation history to extract facts from. |
| `llm` | `LLMAdapter` | required | The LLM adapter used for extraction (`extractMemories`), embedding (`embed`), and reflection (`chat`). |
| `storage` | `StorageAdapter` | required | The storage adapter used to read existing memories, save new ones, and update profiles. |
| `deduplicationThreshold` | `number` | `0.92` | Cosine similarity threshold for deduplication. Facts with similarity >= this value to any existing memory (or to a fact already accepted earlier in the same batch) are treated as duplicates and discarded. |
| `supersedeThreshold` | `number` | `0.75` | Cosine similarity threshold for superseding. Facts with similarity >= this value and < `deduplicationThreshold` update the existing memory in place. |
| `embeddingConcurrency` | `number` | `5` | Maximum number of `llm.embed()` calls that run in parallel. |
| `autoPinRules` | `AutoPinRule[]` | `[]` | Rules that automatically pin critical memories during save and supersede. |
| `structuredRules` | `StructuredExtractionRule[]` | `undefined` | Rules for classifying extracted facts into structured profile fields. Matching facts are routed to profile updates instead of the embedding step. |
| `enableReflection` | `boolean` | `false` | Enables a second LLM call that validates and enriches extracted facts before embedding. |
| `reflectionPrompt` | `string` | built-in | Custom prompt that overrides the built-in reflection prompt. |
Returns
```ts
interface EmbeddingPipelineResult {
  memoriesSaved: number; // Facts that passed deduplication and were saved as new memories
  memoriesDeduped: number; // Facts that were discarded as duplicates
  memoriesSuperseded: number; // Facts that updated an existing memory in-place
  totalExtracted: number; // Total facts extracted by the LLM (before structured classification)
  profileFieldsUpdated: number; // Structured facts routed to user profile updates
  reflection?: {
    factsModified: number; // Facts enriched by the reflection pass
    factsRemoved: number; // Facts removed by the reflection pass
    missedFactsAdded: number; // New facts discovered by the reflection pass
    conflictsFound: number; // Conflicts detected between new and existing facts
  };
}
```

For freeform facts (those not routed to profile fields), `memoriesSaved + memoriesDeduped + memoriesSuperseded` equals the number of facts that reached the embedding step. The `reflection` field is present only when `enableReflection` is `true`.
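Because these counts are related, a caller can derive how many freeform facts reached the embedding step, and must treat `reflection` as optional. The sketch below is a hypothetical helper, not part of Vitamem: it redeclares the documented `EmbeddingPipelineResult` shape locally so it compiles on its own, and the names `freeformFactCount` and `summarizeResult` are illustrative.

```typescript
// Local copy of the documented result type, so this sketch is self-contained.
interface EmbeddingPipelineResult {
  memoriesSaved: number;
  memoriesDeduped: number;
  memoriesSuperseded: number;
  totalExtracted: number;
  profileFieldsUpdated: number;
  reflection?: {
    factsModified: number;
    factsRemoved: number;
    missedFactsAdded: number;
    conflictsFound: number;
  };
}

// Hypothetical helper: number of freeform facts that entered the embedding
// step, using the saved + deduped + superseded relationship.
function freeformFactCount(result: EmbeddingPipelineResult): number {
  return result.memoriesSaved + result.memoriesDeduped + result.memoriesSuperseded;
}

// Hypothetical helper: one-line summary. Reflection stats are appended only
// when the optional field exists (i.e. reflection was enabled).
function summarizeResult(result: EmbeddingPipelineResult): string {
  const parts = [
    `extracted=${result.totalExtracted}`,
    `freeform=${freeformFactCount(result)}`,
    `saved=${result.memoriesSaved}`,
    `superseded=${result.memoriesSuperseded}`,
    `deduped=${result.memoriesDeduped}`,
    `profile=${result.profileFieldsUpdated}`,
  ];
  if (result.reflection) {
    parts.push(`conflicts=${result.reflection.conflictsFound}`);
  }
  return parts.join(" ");
}
```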
Pipeline Steps
The function executes these steps in order:
1. Extract facts: calls `llm.extractMemories(messages, sessionDate)` to get structured facts from the conversation. Each fact has a `content` string, a `source` (`"confirmed"` or `"inferred"`), and optional `tags` and profile fields. The session date is derived from `thread.lastMessageAt`.
2. Reflection pass (optional): if `enableReflection` is `true`, calls `reflectOnExtraction()` to validate and enrich the extracted facts through a second LLM call. This pass catches contradictions, enriches vague facts, and detects missed information. If it fails, the original facts are kept unchanged.
3. Structured classification (optional): if `structuredRules` are provided and the storage adapter supports `updateProfileField`, classifies facts against the rules. Matching facts are routed to profile field updates and are not embedded. Only the remaining freeform facts continue to the next step.
4. Embed facts: calls `llm.embed(fact.content)` for each freeform fact. This step respects the `embeddingConcurrency` parameter, running at most `embeddingConcurrency` embedding calls in parallel using a worker pool.
5. Load existing memories: calls `storage.getMemories(thread.userId)` to fetch all of the user’s current memories, which are needed for deduplication.
6. Classify and persist: compares each new fact’s embedding against all existing memory embeddings, and against new facts already accepted from this batch, using cosine similarity. Each fact is classified as one of:
   - Skip (similarity >= `deduplicationThreshold`): a duplicate, discarded.
   - Supersede (similarity >= `supersedeThreshold` and < `deduplicationThreshold`): updates the existing memory in place via `storage.updateMemory()`, preserving the higher-confidence source and the existing pin status.
   - Save (similarity < `supersedeThreshold`): saved as a new memory via `storage.saveMemory()`.

   During save and supersede, auto-pin rules are evaluated and matching memories are pinned automatically.
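The classify-and-persist step can be sketched as below. This is a minimal illustration under stated assumptions, not Vitamem’s implementation: `StoredMemory`, `classifyAndPersist`, and the in-memory array that stands in for `storage.updateMemory()` and `storage.saveMemory()` are hypothetical; superseding is assumed to replace both content and embedding; source preference and auto-pinning are omitted. Facts saved earlier in the batch are pushed onto the same array, so later facts in the batch are compared against them as well.

```typescript
// Hypothetical memory shape for this sketch; the real Vitamem type may differ.
interface StoredMemory {
  id: string;
  content: string;
  embedding: number[];
}

type Decision = "skip" | "supersede" | "save";

// Cosine similarity; returns 0 for zero-length vectors to avoid dividing by zero.
function cosineSimilarity(a: number[], b: number[]): number {
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  if (normA === 0 || normB === 0) return 0;
  return dot / (Math.sqrt(normA) * Math.sqrt(normB));
}

// Classifies each fact against the best-matching memory (existing or accepted
// earlier in this batch) and applies the decision to the in-memory array.
function classifyAndPersist(
  facts: { content: string; embedding: number[] }[],
  memories: StoredMemory[],
  deduplicationThreshold = 0.92,
  supersedeThreshold = 0.75,
): { decisions: Decision[]; saved: number; deduped: number; superseded: number } {
  const decisions: Decision[] = [];
  let nextId = 0;
  for (const fact of facts) {
    let best: StoredMemory | undefined;
    let bestSim = -Infinity;
    for (const m of memories) {
      const sim = cosineSimilarity(fact.embedding, m.embedding);
      if (sim > bestSim) {
        bestSim = sim;
        best = m;
      }
    }
    if (best && bestSim >= deduplicationThreshold) {
      decisions.push("skip"); // duplicate: discard
    } else if (best && bestSim >= supersedeThreshold) {
      // Assumption: supersede overwrites content and embedding in place.
      best.content = fact.content;
      best.embedding = fact.embedding;
      decisions.push("supersede");
    } else {
      // New memory; later facts in this batch will be compared against it.
      memories.push({ id: `new-${nextId++}`, ...fact });
      decisions.push("save");
    }
  }
  const count = (d: Decision) => decisions.filter((x) => x === d).length;
  return { decisions, saved: count("save"), deduped: count("skip"), superseded: count("supersede") };
}
```

With the default thresholds, a fact whose best match scores 0.8 falls in the supersede band, while one scoring 0.6 is saved as new.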
Example
```ts
import { runEmbeddingPipeline } from "vitamem";

// Assumes dormantThread, messages, llmAdapter, and storageAdapter are already in scope.
const result = await runEmbeddingPipeline(
  dormantThread,
  messages,
  llmAdapter,
  storageAdapter,
  0.90, // slightly lower dedup threshold
  0.75, // supersede threshold
  3, // conservative concurrency for rate-limited APIs
);

console.log(`Extracted: ${result.totalExtracted}`);
console.log(`Saved: ${result.memoriesSaved}`);
console.log(`Superseded: ${result.memoriesSuperseded}`);
console.log(`Deduped: ${result.memoriesDeduped}`);
console.log(`Profile fields updated: ${result.profileFieldsUpdated}`);
```

Concurrency Behavior
The `embeddingConcurrency` parameter controls how many `llm.embed()` calls execute simultaneously. It is implemented with a worker pool pattern in which N workers each take one item at a time from a shared queue.
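A shared-queue worker pool of this kind fits in a few lines of TypeScript. `mapWithConcurrency` is a hypothetical helper for illustration, not a Vitamem export; it relies on JavaScript running one callback at a time, so incrementing a shared index between `await`s needs no locking.

```typescript
// Runs `fn` over `items` with at most `concurrency` calls in flight.
// Each worker repeatedly claims the next unprocessed index from a shared
// counter, so results keep input order even when calls finish out of order.
async function mapWithConcurrency<T, R>(
  items: readonly T[],
  concurrency: number,
  fn: (item: T, index: number) => Promise<R>,
): Promise<R[]> {
  const results = new Array<R>(items.length);
  let nextIndex = 0; // shared queue position

  async function worker(): Promise<void> {
    while (nextIndex < items.length) {
      const i = nextIndex++; // claim an item before awaiting
      results[i] = await fn(items[i], i);
    }
  }

  // Never start more workers than items, and always at least one.
  const workerCount = Math.max(1, Math.min(concurrency, items.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}
```

Under these assumptions, the embedding step would look roughly like `await mapWithConcurrency(facts, embeddingConcurrency, (fact) => llm.embed(fact.content))`. Indexing results by input position keeps each embedding paired with its fact. If any call rejects, the returned promise rejects, as with `Promise.all`.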
```text
concurrency = 3, facts = 8

Worker 1: [fact1] [fact4] [fact7]
Worker 2: [fact2] [fact5] [fact8]
Worker 3: [fact3] [fact6]
```

Choosing a Concurrency Value
| Scenario | Recommended value |
|---|---|
| OpenAI (paid tier) | 5 (default) |
| OpenAI (free tier) | 2-3 |
| Anthropic + OpenAI embeddings | 5 |
| Ollama (local, single GPU) | 1 (Ollama processes requests sequentially) |
| Ollama (multi-GPU / cluster) | 3-5 (match your throughput capacity) |
| Custom embedding service | Depends on your service’s capacity |
The facade passes config.embeddingConcurrency (default: 5) to this function. Set it in your createVitamem config:
```ts
const mem = await createVitamem({
  provider: "openai",
  apiKey: process.env.OPENAI_API_KEY,
  storage: "supabase",
  supabaseUrl: process.env.SUPABASE_URL,
  supabaseKey: process.env.SUPABASE_KEY,
  embeddingConcurrency: 3, // conservative
});
```

EmbeddingPipelineResult
The result type returned by `runEmbeddingPipeline()`.
```ts
interface EmbeddingPipelineResult {
  memoriesSaved: number;
  memoriesDeduped: number;
  memoriesSuperseded: number;
  totalExtracted: number;
  profileFieldsUpdated: number;
  reflection?: {
    factsModified: number;
    factsRemoved: number;
    missedFactsAdded: number;
    conflictsFound: number;
  };
}
```

| Field | Type | Description |
|---|---|---|
| `memoriesSaved` | `number` | Number of new unique memories persisted to storage |
| `memoriesDeduped` | `number` | Number of extracted facts discarded as duplicates |
| `memoriesSuperseded` | `number` | Number of existing memories updated in place with newer content |
| `totalExtracted` | `number` | Total number of facts the LLM extracted from the conversation |
| `profileFieldsUpdated` | `number` | Number of structured facts routed to user profile updates |
| `reflection` | `object` (optional) | Reflection pass statistics (present only when `enableReflection` is `true`) |
When the Pipeline Runs
In the standard Vitamem facade, the pipeline runs in two scenarios:
- `triggerDormantTransition(threadId)`: immediately transitions the thread to dormant and runs the pipeline.
- `sweepThreads()`: for each cooling thread that has exceeded the cooling timeout, transitions the thread to dormant and runs the pipeline.
Both paths call runEmbeddingPipeline() with the configured thresholds (deduplicationThreshold, supersedeThreshold), embeddingConcurrency, autoPinRules, structuredExtractionRules, and reflection settings.
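The `sweepThreads()` path can be pictured as a filter-then-transition loop. This is a hypothetical sketch only: `ThreadState`, `SketchThread`, its `coolingSince` timestamp, `sweepSketch`, and the `runPipeline` callback are illustrative assumptions, and the real facade may measure cooling time, order threads, and handle errors differently.

```typescript
// Hypothetical thread shape for this sketch; the real Thread type may differ.
type ThreadState = "active" | "cooling" | "dormant";

interface SketchThread {
  id: string;
  state: ThreadState;
  coolingSince: number; // assumption: epoch ms when the thread entered cooling
}

// Finds cooling threads whose cooling time has exceeded the timeout, marks
// them dormant, and runs the pipeline callback for each one in turn.
async function sweepSketch(
  threads: SketchThread[],
  now: number,
  coolingTimeoutMs: number,
  runPipeline: (thread: SketchThread) => Promise<void>,
): Promise<string[]> {
  const swept: string[] = [];
  for (const thread of threads) {
    if (thread.state !== "cooling") continue;
    if (now - thread.coolingSince <= coolingTimeoutMs) continue; // not expired yet
    thread.state = "dormant";
    // In Vitamem, this is where runEmbeddingPipeline() would be invoked.
    await runPipeline(thread);
    swept.push(thread.id);
  }
  return swept;
}
```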