
Here's a scenario you've probably encountered: you build a chatbot that feels brilliant in demo. It remembers what the user said, responds coherently, feels genuinely conversational. Then you put it in front of real users. After twenty exchanges, it starts contradicting itself. After forty, it forgets the user's name. After sixty, you get a context_length_exceeded error and the whole thing crashes.
The problem isn't your prompt engineering. It's that you've been treating conversational memory as an afterthought — just appending messages to a list and hoping the model keeps up. At scale, that approach collapses fast. Real conversational AI requires deliberate memory architecture: decisions about what to keep, what to compress, what to retrieve, and how to store it all across sessions, users, and time. By the end of this lesson, you'll have built a complete memory system that handles all of that.
What you'll learn:
You should be comfortable calling the OpenAI API (or a compatible LLM API) directly, understand what tokens are and roughly how they're counted, and have built at least one simple chatbot. You should know basic Python, be familiar with asyncio at a surface level, and have used an ORM or made raw database queries before. Experience with vector embeddings is helpful but not required — we'll explain what we need as we go.
Before building anything, let's be precise about the failure modes. An LLM has no persistent state between API calls. Every call is stateless. The only "memory" it has is whatever you put in the context window of that specific request. The standard naive approach looks like this:
import openai

messages = []

def chat(user_input: str) -> str:
    # Append the new user turn, then resend the entire history on every call.
    messages.append({"role": "user", "content": user_input})
    response = openai.chat.completions.create(
        model="gpt-4o",
        messages=messages
    )
    reply = response.choices[0].message.content
    messages.append({"role": "assistant", "content": reply})
    return reply
This works beautifully for short conversations. The failure is mathematical. GPT-4o's context window is 128,000 tokens. That sounds enormous until you remember that a thorough support conversation about a software issue might average 150 tokens per exchange. At that rate you have roughly 850 exchanges before you hit the limit. But because every call resends the full history, the cost of each call grows linearly with conversation length, so the cumulative cost of a conversation grows roughly quadratically, and latency grows along with it. For a high-volume production system with thousands of concurrent users, each keeping a full chat history, token costs compound quickly and the memory footprint becomes hard to manage.
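To make the growth concrete, here is a back-of-the-envelope calculation. It is a rough sketch: it assumes the 150 tokens per exchange used above, ignores the system prompt, and treats call k as resending k exchanges' worth of tokens. The function names are made up for illustration.

```python
def prompt_tokens_for_call(k: int, tokens_per_exchange: int = 150) -> int:
    """Approximate prompt size of the k-th call when the full history is resent."""
    return k * tokens_per_exchange


def cumulative_prompt_tokens(exchanges: int, tokens_per_exchange: int = 150) -> int:
    """Total prompt tokens billed over the whole conversation.

    Sum of k * t for k = 1..n, which is t * n * (n + 1) / 2.
    """
    return tokens_per_exchange * exchanges * (exchanges + 1) // 2


# The 100th call alone sends about 15,000 tokens...
per_call = prompt_tokens_for_call(100)
# ...but the conversation as a whole has consumed about 757,500 by then.
total = cumulative_prompt_tokens(100)
```

Doubling the length of the conversation roughly quadruples the total, which is why bounding the context matters long before the hard limit is reached.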
There's also a subtler problem called attention dilution. Research on transformer attention patterns shows that models pay disproportionate attention to the beginning and end of their context window, with a relative "valley" in the middle. In a very long conversation, important facts stated by the user thirty messages ago sit in that middle valley. The model may technically "see" them, but they don't influence the response as strongly as they should. Users notice this as the model seeming to "forget" things it was told.
The solution is a layered memory architecture — multiple storage and retrieval mechanisms working together, each optimized for a different timescale and type of information.
Think of conversational memory as having four tools in a toolkit. Production systems mix them; the skill is knowing which to reach for.
Buffer memory is the raw message history, kept in full fidelity. It's what the naive approach uses. Buffer memory is appropriate for the recent context — the last several exchanges — because recent history is the most syntactically relevant. The user's last message may contain pronouns referring to things three messages ago. Buffer memory handles this naturally.
The key decision is buffer size. A common heuristic is to keep the last 10–20 messages as a buffer, which covers most pronoun resolution and follow-up question patterns without growing unbounded.
Instead of keeping raw messages, summary memory periodically compresses older exchanges into a dense summary. When your buffer exceeds a threshold, you pass the oldest messages to the LLM and ask it to summarize them, then discard the originals.
Summary memory is lossy by design — you're trading verbatim accuracy for token efficiency. For most conversational applications, this is an acceptable trade. The summary "User is a senior data engineer at a fintech startup, troubleshooting a dbt model with a fan-out join issue. They've tried rewriting the CTE twice." is far more useful than the 800 raw tokens those exchanges consumed.
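The mechanics can be sketched without committing to a particular model. In the snippet below, `compact_history` and `fake_summarizer` are hypothetical names; the summarizer is a placeholder for an LLM call and only records how much it compressed.

```python
from typing import Callable

# Assumed signature for illustration: (prior_summary, older_messages) -> new_summary.
Summarizer = Callable[[str, list[str]], str]


def compact_history(
    messages: list[str],
    summary: str,
    max_messages: int,
    summarize: Summarizer,
) -> tuple[list[str], str]:
    """Fold the older half of `messages` into `summary` once the list is too long."""
    if len(messages) <= max_messages:
        return messages, summary
    split = len(messages) // 2
    older, recent = messages[:split], messages[split:]
    # The prior summary is passed in so the new one replaces it rather than forgetting it.
    return recent, summarize(summary, older)


def fake_summarizer(prior: str, older: list[str]) -> str:
    # Placeholder for an LLM call.
    note = f"[{len(older)} earlier messages summarized]"
    return f"{prior} {note}".strip()


history = [f"message {i}" for i in range(1, 7)]
recent, summary = compact_history(history, "", max_messages=4, summarize=fake_summarizer)
```

Passing the prior summary into each new summarization keeps the summary rolling, so earlier compressions are carried forward instead of being dropped.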
A sliding window discards the oldest messages once a token budget is exceeded, keeping only the most recent N tokens of raw history. Unlike summary memory, it doesn't try to preserve meaning from what it discards — it simply forgets it. This is the right approach when conversation context is inherently short-lived (like a customer service session where each turn is largely self-contained) or when you need a simpler implementation.
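A minimal sketch of the idea follows. The word-based counter is a crude stand-in used only to keep the example self-contained; a real system would plug in a proper tokenizer. Both function names are illustrative.

```python
from typing import Callable


def sliding_window(
    messages: list[dict],
    max_tokens: int,
    count_tokens: Callable[[str], int],
) -> list[dict]:
    """Keep the most recent messages whose combined size fits within max_tokens."""
    kept: list[dict] = []
    used = 0
    # Walk backwards from the newest message and stop at the first one that does not fit.
    for message in reversed(messages):
        cost = count_tokens(message["content"])
        if used + cost > max_tokens:
            break
        kept.append(message)
        used += cost
    kept.reverse()  # restore chronological order
    return kept


def rough_token_count(text: str) -> int:
    # Stand-in only: words are NOT tokens. Swap in a real tokenizer in practice.
    return len(text.split())


history = [
    {"role": "user", "content": "a b c"},
    {"role": "assistant", "content": "d e"},
    {"role": "user", "content": "f g h i"},
]
window = sliding_window(history, max_tokens=6, count_tokens=rough_token_count)
```

With a budget of 6, the two newest messages fit and the oldest is silently dropped, which is exactly the forgetting behavior described above.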
Vector retrieval is the most powerful primitive and the most complex. Every message (or every exchange, or every summarized chunk) is embedded into a vector and stored in a vector database. When generating a response, you perform a semantic search to find the most relevant historical context and inject only those results into the prompt.
Vector retrieval solves the attention dilution problem directly: instead of feeding the model 50 messages and hoping it finds the relevant one, you retrieve the three most relevant exchanges and feed those. The model's attention is focused exactly where it should be.
The tradeoff is latency (embedding + retrieval adds time), complexity (you need an embedding model and vector store), and potential for retrieval failure (if your similarity search returns the wrong chunks, the model may "confidently misremember").
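The retrieval step itself is small. The sketch below uses hand-written 3-dimensional vectors in place of real embeddings, which is an assumption made purely so the example runs without an embedding model; `top_k_by_cosine` is an illustrative name.

```python
import numpy as np


def top_k_by_cosine(query: np.ndarray, vectors: np.ndarray, k: int = 3) -> list[tuple[int, float]]:
    """Return (row index, cosine similarity) for the k rows most similar to the query."""
    q = query / np.linalg.norm(query)
    m = vectors / np.linalg.norm(vectors, axis=1, keepdims=True)
    scores = m @ q  # cosine similarity, because both sides are unit length
    order = np.argsort(-scores)[:k]
    return [(int(i), float(scores[i])) for i in order]


# Toy "embeddings" for three stored messages.
memory = np.array([
    [1.0, 0.0, 0.0],
    [0.0, 1.0, 0.0],
    [0.9, 0.1, 0.0],
])
query = np.array([1.0, 0.0, 0.0])

hits = top_k_by_cosine(query, memory, k=2)
```

Rows 0 and 2 point in nearly the same direction as the query, so they are returned ahead of row 1. A vector database does the same ranking, only with an index structure that avoids scanning every row.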
Let's build this properly. We'll implement a production-grade memory system with a SQLite database for persistence (easily swappable for PostgreSQL), a FAISS vector index for semantic retrieval, and a tiered memory manager that combines buffer and summary memory with optional vector lookup.
Install the dependencies:
pip install openai tiktoken faiss-cpu numpy sqlalchemy pydantic pydantic-settings python-dotenv
Create a project structure:
memory_system/
├── __init__.py
├── config.py
├── models.py
├── token_counter.py
├── storage.py
├── memory_manager.py
├── embeddings.py
└── chatbot.py
# config.py
try:
    # Pydantic v2 moved BaseSettings into the separate pydantic-settings package.
    from pydantic_settings import BaseSettings
except ImportError:
    # Pydantic v1 still exposes it from the main package.
    from pydantic import BaseSettings


class Settings(BaseSettings):
    openai_api_key: str
    model_name: str = "gpt-4o"
    embedding_model: str = "text-embedding-3-small"

    # Memory configuration
    buffer_message_limit: int = 20      # Max messages in hot buffer
    summary_trigger_tokens: int = 4000  # Summarize when buffer exceeds this
    max_context_tokens: int = 8000      # Total token budget for context
    retrieval_top_k: int = 5            # How many chunks to retrieve from vector store

    database_url: str = "sqlite:///./conversations.db"

    class Config:
        env_file = ".env"


settings = Settings()
# models.py
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
from enum import Enum


class MessageRole(str, Enum):
    SYSTEM = "system"
    USER = "user"
    ASSISTANT = "assistant"


@dataclass
class Message:
    role: MessageRole
    content: str
    timestamp: datetime = field(default_factory=datetime.utcnow)
    token_count: int = 0
    message_id: Optional[str] = None
    session_id: Optional[str] = None
    embedding: Optional[list] = None


@dataclass
class ConversationSummary:
    content: str
    covers_message_ids: list[str]
    created_at: datetime = field(default_factory=datetime.utcnow)
    token_count: int = 0
    session_id: Optional[str] = None
Every memory decision hinges on accurate token counts. A lot of production bugs come from treating tokens as equivalent to words — they're not. Let's build a reliable counter:
# token_counter.py
import tiktoken
from models import Message


class TokenCounter:
    def __init__(self, model_name: str = "gpt-4o"):
        try:
            self.encoding = tiktoken.encoding_for_model(model_name)
        except KeyError:
            # Unknown model name: fall back to o200k_base, the encoding gpt-4o uses
            self.encoding = tiktoken.get_encoding("o200k_base")
        # Roughly 3 tokens of overhead per message for role/formatting
        self.message_overhead = 3

    def count_text(self, text: str) -> int:
        return len(self.encoding.encode(text))

    def count_message(self, message: Message) -> int:
        return self.count_text(message.content) + self.message_overhead

    def count_messages(self, messages: list[Message]) -> int:
        # +3 for the reply priming tokens OpenAI adds
        return sum(self.count_message(m) for m in messages) + 3

    def truncate_to_token_limit(self, text: str, limit: int) -> str:
        """Hard truncate text to a token limit. Use sparingly."""
        tokens = self.encoding.encode(text)
        if len(tokens) <= limit:
            return text
        return self.encoding.decode(tokens[:limit])
Why this matters: If you're off by 20% on token counts, your buffer management will behave unpredictably. The tiktoken library implements the tokenizers OpenAI's models use, so its counts for message content are reliable for OpenAI models; the per-message overhead is an approximation. For other providers, use their equivalent library or their API's token counting endpoint.
Memory is only useful if it persists. Let's build a storage layer that keeps conversations in a database:
# storage.py
import uuid
import json
from datetime import datetime
from sqlalchemy import create_engine, Column, String, Text, DateTime, Integer, JSON
from sqlalchemy.orm import declarative_base, sessionmaker
from models import Message, MessageRole, ConversationSummary
from config import settings

Base = declarative_base()


class MessageRecord(Base):
    __tablename__ = "messages"

    id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
    session_id = Column(String, nullable=False, index=True)
    role = Column(String, nullable=False)
    content = Column(Text, nullable=False)
    timestamp = Column(DateTime, default=datetime.utcnow)
    token_count = Column(Integer, default=0)
    embedding_json = Column(Text, nullable=True)  # Store as JSON string


class SummaryRecord(Base):
    __tablename__ = "summaries"

    id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
    session_id = Column(String, nullable=False, index=True)
    content = Column(Text, nullable=False)
    covers_message_ids = Column(JSON, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    token_count = Column(Integer, default=0)


class ConversationStore:
    def __init__(self):
        self.engine = create_engine(settings.database_url)
        Base.metadata.create_all(self.engine)
        self.SessionLocal = sessionmaker(bind=self.engine)

    def save_message(self, message: Message) -> str:
        with self.SessionLocal() as session:
            record = MessageRecord(
                id=message.message_id or str(uuid.uuid4()),
                session_id=message.session_id,
                role=message.role.value,
                content=message.content,
                timestamp=message.timestamp,
                token_count=message.token_count,
                embedding_json=json.dumps(message.embedding) if message.embedding else None
            )
            session.add(record)
            session.commit()
            return record.id

    def load_session_messages(self, session_id: str) -> list[Message]:
        with self.SessionLocal() as session:
            records = session.query(MessageRecord)\
                .filter(MessageRecord.session_id == session_id)\
                .order_by(MessageRecord.timestamp)\
                .all()
            return [
                Message(
                    role=MessageRole(r.role),
                    content=r.content,
                    timestamp=r.timestamp,
                    token_count=r.token_count,
                    message_id=r.id,
                    session_id=r.session_id,
                    embedding=json.loads(r.embedding_json) if r.embedding_json else None
                )
                for r in records
            ]

    def save_summary(self, summary: ConversationSummary) -> str:
        with self.SessionLocal() as session:
            record = SummaryRecord(
                session_id=summary.session_id,
                content=summary.content,
                covers_message_ids=summary.covers_message_ids,
                created_at=summary.created_at,
                token_count=summary.token_count
            )
            session.add(record)
            session.commit()
            return record.id

    def load_latest_summary(self, session_id: str) -> ConversationSummary | None:
        with self.SessionLocal() as session:
            record = session.query(SummaryRecord)\
                .filter(SummaryRecord.session_id == session_id)\
                .order_by(SummaryRecord.created_at.desc())\
                .first()
            if not record:
                return None
            return ConversationSummary(
                content=record.content,
                covers_message_ids=record.covers_message_ids,
                created_at=record.created_at,
                token_count=record.token_count,
                session_id=session_id
            )
For long-running conversations or cases where users return after gaps (say, a support agent picking up a ticket three days later), you need semantic retrieval. Let's build it:
# embeddings.py
import numpy as np
import faiss
from openai import OpenAI
from models import Message
from config import settings

client = OpenAI(api_key=settings.openai_api_key)


class VectorMemory:
    """
    In-memory FAISS index for semantic retrieval within a session.
    For multi-session production use, swap FAISS for Pinecone,
    Weaviate, or pgvector.
    """

    def __init__(self, embedding_dim: int = 1536):
        self.index = faiss.IndexFlatIP(embedding_dim)  # Inner product = cosine on normalized vecs
        self.messages: list[Message] = []
        self.embedding_dim = embedding_dim

    def embed_text(self, text: str) -> np.ndarray:
        response = client.embeddings.create(
            model=settings.embedding_model,
            input=text
        )
        vector = np.array(response.data[0].embedding, dtype=np.float32)
        # Normalize for cosine similarity via inner product
        vector = vector / np.linalg.norm(vector)
        return vector

    def add_message(self, message: Message) -> None:
        # Combine role context with content for richer embeddings
        text_to_embed = f"{message.role.value}: {message.content}"
        embedding = self.embed_text(text_to_embed)
        message.embedding = embedding.tolist()
        self.index.add(embedding.reshape(1, -1))
        self.messages.append(message)

    def search(self, query: str, top_k: int = 5) -> list[tuple[Message, float]]:
        if len(self.messages) == 0:
            return []
        query_vec = self.embed_text(query)
        k = min(top_k, len(self.messages))
        scores, indices = self.index.search(query_vec.reshape(1, -1), k)
        results = []
        for score, idx in zip(scores[0], indices[0]):
            if idx >= 0:  # FAISS returns -1 for empty slots
                results.append((self.messages[idx], float(score)))
        return sorted(results, key=lambda x: x[1], reverse=True)

    def load_from_messages(self, messages: list[Message]) -> None:
        """Rebuild index from messages that already have embeddings."""
        for message in messages:
            if message.embedding:
                vec = np.array(message.embedding, dtype=np.float32)
                self.index.add(vec.reshape(1, -1))
                self.messages.append(message)
            else:
                self.add_message(message)
Production note: FAISS is an in-process index: it lives in RAM and disappears when your process dies. In production, use a persistent vector database. For simple setups, pgvector (a PostgreSQL extension) means you only need one database. For high-scale multi-tenant applications, Pinecone or Weaviate give you managed infrastructure with namespace support for per-user isolation.
This is the core of the system — the component that decides what goes into each LLM call:
# memory_manager.py
import uuid
from datetime import datetime
from openai import OpenAI
from models import Message, MessageRole, ConversationSummary
from storage import ConversationStore
from embeddings import VectorMemory
from token_counter import TokenCounter
from config import settings

client = OpenAI(api_key=settings.openai_api_key)


class MemoryManager:
    """
    Tiered memory system:
    1. Hot buffer: raw recent messages (last N, token-bounded)
    2. Summary layer: compressed older conversation history
    3. Vector retrieval: semantic search over full history
    """

    def __init__(self, session_id: str):
        self.session_id = session_id
        self.store = ConversationStore()
        self.token_counter = TokenCounter(settings.model_name)
        self.vector_memory = VectorMemory()
        # Hot buffer: the recent raw messages
        self.buffer: list[Message] = []
        # The most recent summary (covers everything before the buffer)
        self.current_summary: ConversationSummary | None = None
        self._load_session()

    def _load_session(self) -> None:
        """Restore state from persistent storage on initialization."""
        all_messages = self.store.load_session_messages(self.session_id)
        self.current_summary = self.store.load_latest_summary(self.session_id)
        if self.current_summary:
            # Identify which messages are already summarized
            summarized_ids = set(self.current_summary.covers_message_ids)
            unsummarized = [m for m in all_messages if m.message_id not in summarized_ids]
            self.buffer = unsummarized[-settings.buffer_message_limit:]
        else:
            self.buffer = all_messages[-settings.buffer_message_limit:]
        # Rebuild vector index from all messages
        self.vector_memory.load_from_messages(all_messages)
        print(f"Session {self.session_id} loaded: "
              f"{len(all_messages)} total messages, "
              f"{len(self.buffer)} in buffer, "
              f"summary: {'yes' if self.current_summary else 'no'}")

    def add_message(self, role: MessageRole, content: str) -> Message:
        message = Message(
            role=role,
            content=content,
            session_id=self.session_id,
            message_id=str(uuid.uuid4()),
            timestamp=datetime.utcnow()
        )
        message.token_count = self.token_counter.count_message(message)
        # Add to vector index first (this embeds the message via API call),
        # so the embedding is set on the message before it is persisted
        self.vector_memory.add_message(message)
        # Persist to database (now including the embedding)
        self.store.save_message(message)
        # Add to hot buffer
        self.buffer.append(message)
        # Check if we need to summarize
        buffer_tokens = self.token_counter.count_messages(self.buffer)
        if buffer_tokens > settings.summary_trigger_tokens:
            self._summarize_old_buffer()
        return message

    def _summarize_old_buffer(self) -> None:
        """
        Summarize the older half of the buffer, keeping the recent half intact.
        This prevents a situation where summarization swallows recent context.
        """
        split_point = len(self.buffer) // 2
        messages_to_summarize = self.buffer[:split_point]
        self.buffer = self.buffer[split_point:]
        if not messages_to_summarize:
            return
        # Build the summarization prompt
        prior_context = ""
        if self.current_summary:
            prior_context = f"Prior summary:\n{self.current_summary.content}\n\n"
        conversation_text = "\n".join(
            f"{m.role.value.upper()}: {m.content}"
            for m in messages_to_summarize
        )
        summary_prompt = f"""{prior_context}Please summarize the following conversation segment.
Capture: the user's identity/role (if mentioned), their primary goals,
key facts they've shared, decisions made, and any open questions.
Be dense and specific: this summary replaces the raw messages.

Conversation:
{conversation_text}

Summary:"""
        response = client.chat.completions.create(
            model=settings.model_name,
            messages=[{"role": "user", "content": summary_prompt}],
            max_tokens=500,
            temperature=0.1  # Low temp for factual compression
        )
        summary_content = response.choices[0].message.content
        # The new summary folds in the prior one, so it also covers the prior
        # summary's messages. Carrying those IDs forward keeps _load_session from
        # treating already-summarized messages as unsummarized after a restart.
        prior_ids = self.current_summary.covers_message_ids if self.current_summary else []
        self.current_summary = ConversationSummary(
            content=summary_content,
            covers_message_ids=list(prior_ids) + [m.message_id for m in messages_to_summarize],
            session_id=self.session_id,
            token_count=self.token_counter.count_text(summary_content)
        )
        self.store.save_summary(self.current_summary)
        print(f"Summarized {len(messages_to_summarize)} messages → "
              f"{self.current_summary.token_count} tokens")

    def build_context(
        self,
        current_query: str,
        system_prompt: str,
        use_vector_retrieval: bool = True
    ) -> list[dict]:
        """
        Construct the messages array to send to the LLM.
        Combines: system prompt + summary + retrieved context + buffer.
        """
        context_messages = []
        # Start with system prompt, enriched with the summary if available
        system_content = system_prompt
        if self.current_summary:
            system_content += (
                f"\n\n## Conversation History Summary\n"
                f"{self.current_summary.content}"
            )
        context_messages.append({
            "role": "system",
            "content": system_content
        })
        # Optionally inject semantically relevant historical context
        if use_vector_retrieval and len(self.vector_memory.messages) > settings.buffer_message_limit:
            retrieved = self.vector_memory.search(current_query, top_k=settings.retrieval_top_k)
            # Filter out messages already in the buffer to avoid duplication.
            # The 0.75 cutoff is a starting point; tune it against the score
            # distribution your embedding model actually produces.
            buffer_ids = {m.message_id for m in self.buffer}
            relevant_historical = [
                (msg, score) for msg, score in retrieved
                if msg.message_id not in buffer_ids and score > 0.75
            ]
            if relevant_historical:
                retrieval_block = "## Relevant Earlier Context\n"
                for msg, score in relevant_historical[:3]:
                    retrieval_block += f"[{msg.role.value}]: {msg.content}\n"
                # Inject as a system message so it doesn't look like a real turn
                context_messages.append({
                    "role": "system",
                    "content": retrieval_block
                })
        # Add the hot buffer (recent raw messages)
        for message in self.buffer:
            context_messages.append({
                "role": message.role.value,
                "content": message.content
            })
        return context_messages

    def get_memory_stats(self) -> dict:
        """Diagnostic information about current memory state."""
        buffer_tokens = self.token_counter.count_messages(self.buffer)
        return {
            "session_id": self.session_id,
            "buffer_messages": len(self.buffer),
            "buffer_tokens": buffer_tokens,
            "has_summary": self.current_summary is not None,
            "summary_tokens": self.current_summary.token_count if self.current_summary else 0,
            "vector_index_size": len(self.vector_memory.messages),
            "total_estimated_tokens": buffer_tokens + (
                self.current_summary.token_count if self.current_summary else 0
            )
        }
Now we wire everything into a usable interface:
# chatbot.py
from openai import OpenAI
from memory_manager import MemoryManager
from models import MessageRole
from config import settings

client = OpenAI(api_key=settings.openai_api_key)

SYSTEM_PROMPT = """You are a senior data engineering assistant at Wicked Smart Data.
You help practitioners with data pipelines, SQL optimization, dbt models,
Spark performance, and architectural decisions.
You maintain context across the conversation and refer back to earlier discussion
when relevant. When you're working from a summary of earlier context, you can
acknowledge this if asked."""


class DataEngineeringAssistant:
    def __init__(self, session_id: str):
        self.memory = MemoryManager(session_id)

    def chat(self, user_input: str) -> str:
        # Record the user's message
        self.memory.add_message(MessageRole.USER, user_input)
        # Build context for this turn
        context = self.memory.build_context(
            current_query=user_input,
            system_prompt=SYSTEM_PROMPT,
            use_vector_retrieval=True
        )
        # Call the LLM
        response = client.chat.completions.create(
            model=settings.model_name,
            messages=context,
            temperature=0.7,
            max_tokens=1000
        )
        reply = response.choices[0].message.content
        # Record the assistant's response
        self.memory.add_message(MessageRole.ASSISTANT, reply)
        return reply

    def status(self) -> None:
        stats = self.memory.get_memory_stats()
        print("\n--- Memory Status ---")
        for key, value in stats.items():
            print(f"  {key}: {value}")
        print("--------------------\n")


# Demo usage
if __name__ == "__main__":
    # Use a fixed session ID to demonstrate persistence across runs
    session_id = "session_ana_torres_2024"
    assistant = DataEngineeringAssistant(session_id)
    print(f"Starting session: {session_id}")
    print("Type 'status' to see memory diagnostics, 'quit' to exit.\n")
    while True:
        user_input = input("You: ").strip()
        if not user_input:
            continue
        if user_input.lower() == "quit":
            break
        if user_input.lower() == "status":
            assistant.status()
            continue
        response = assistant.chat(user_input)
        print(f"\nAssistant: {response}\n")
Build a multi-user version of this system where each user gets isolated memory, and implement a "memory quality" metric.
Part 1: Multi-user isolation
Modify the system so session IDs are derived from user identifiers. Add a UserMemoryRegistry class that maps user IDs to active MemoryManager instances, with a maximum of 100 active sessions in memory at once (evicting least-recently-used sessions to database and reloading on demand).
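One possible starting point for the registry is an `OrderedDict` used as an LRU map. This is a sketch, not the reference solution: the `factory` parameter and the `user-` session ID prefix are assumptions, and eviction simply drops the in-memory object, which relies on MemoryManager having already persisted every message as it was added.

```python
from collections import OrderedDict
from typing import Callable, Generic, TypeVar

M = TypeVar("M")


class UserMemoryRegistry(Generic[M]):
    """Maps user IDs to live memory managers, evicting the least recently used."""

    def __init__(self, factory: Callable[[str], M], max_active: int = 100):
        # `factory` builds a manager from a session ID, e.g. MemoryManager itself.
        self._factory = factory
        self._max_active = max_active
        self._active: "OrderedDict[str, M]" = OrderedDict()

    def get(self, user_id: str) -> M:
        if user_id in self._active:
            # Mark as most recently used.
            self._active.move_to_end(user_id)
            return self._active[user_id]
        # Assumed convention: derive the session ID from the user ID.
        manager = self._factory(f"user-{user_id}")
        self._active[user_id] = manager
        if len(self._active) > self._max_active:
            # Drop the least recently used entry; it is rebuilt from storage on demand.
            self._active.popitem(last=False)
        return manager

    def active_users(self) -> list[str]:
        return list(self._active)


# Demo with a stand-in factory so the example runs without a database or API key.
registry = UserMemoryRegistry(factory=lambda sid: {"session_id": sid}, max_active=2)
registry.get("a")
registry.get("b")
registry.get("a")  # "a" becomes most recently used
registry.get("c")  # evicts "b"
```

In the real system you would pass `MemoryManager` as the factory, so that a reload after eviction goes through `_load_session`.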
Part 2: Memory quality scoring
After each summarization, implement a check: take three random messages from the summarized batch and ask the LLM whether the information in those messages is accurately represented in the summary. Log a quality score (0–1) per summary. This gives you a feedback loop on whether your summarization prompts are working.
import json

from openai import OpenAI
from models import Message


def score_summary_quality(
    summary_content: str,
    sampled_messages: list[Message],
    llm_client: OpenAI
) -> float:
    """
    Ask the LLM to verify summary accuracy.
    Returns a score between 0.0 (poor) and 1.0 (excellent).
    """
    check_prompt = f"""Given this summary:
{summary_content}

And these original messages it should cover:
{chr(10).join(f'{m.role.value}: {m.content}' for m in sampled_messages)}

For each key fact or detail in the original messages, is it accurately
represented in the summary?
Respond with a JSON object: {{"score": 0.0-1.0, "missing": ["list of missing details"], "accurate": true/false}}"""
    response = llm_client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": check_prompt}],
        response_format={"type": "json_object"},
        temperature=0
    )
    result = json.loads(response.choices[0].message.content)
    # Fall back to a neutral 0.5 if the model omits the score field
    return float(result.get("score", 0.5))
Part 3: Token budget dashboard
Write a function that, given a MemoryManager instance, prints a breakdown of exactly how many tokens of your budget are consumed by: the system prompt, the summary, retrieved context, and the buffer. Use this to tune summary_trigger_tokens and max_context_tokens for your specific use case.
Mistake 1: Summarizing too aggressively
If you trigger summarization at a low token threshold (say 1,000 tokens), you'll summarize conversations before enough information has accumulated to make a useful summary. Worse, you'll summarize the very beginning of a conversation — the part that typically contains the most important context-setting. Set your trigger threshold at 30–50% of your total context budget.
Mistake 2: Including the buffer in vector search
A common bug is retrieving messages from the vector index without filtering out messages already in the hot buffer. This causes the same content to appear twice in the context — once as retrieved context and once as part of the buffer — which wastes tokens and can confuse the model with apparently duplicated "history."
Mistake 3: Using a single global FAISS index for all users
FAISS doesn't have namespaces. If you create a shared index and don't carefully track which vectors belong to which user, user A's queries will retrieve user B's conversation history. This is a data privacy disaster. Always maintain per-session indices, or use a vector database with proper namespace/tenant isolation.
Mistake 4: Not handling the cold start case
When a user sends their very first message, build_context returns just the system prompt and that one message. This is fine — but you need to make sure your code doesn't crash because self.current_summary is None or self.buffer is empty. Write your context-building logic to degrade gracefully.
Mistake 5: Embedding every message synchronously
If you call the embedding API synchronously in the hot path of every user message, you add 50–200ms of latency to every turn. At scale, do this asynchronously: queue messages for embedding after the user has already received the response. Accept that very new messages won't be in the vector index for a turn or two.
# Async embedding pattern
import asyncio
from openai import AsyncOpenAI
from config import settings
from models import Message
from embeddings import VectorMemory

async_client = AsyncOpenAI(api_key=settings.openai_api_key)

async def embed_and_index_async(message: Message, vector_memory: VectorMemory):
    """Run after returning response to user."""
    text = f"{message.role.value}: {message.content}"
    response = await async_client.embeddings.create(
        model=settings.embedding_model,
        input=text
    )
    # ... update index
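The queueing side of that pattern can be sketched with `asyncio.Queue`. Everything here is illustrative: `fake_embed` stands in for the real embedding call, and a plain dict stands in for the vector index.

```python
import asyncio


async def fake_embed(text: str) -> list[float]:
    # Stand-in for the embedding API call; yields control like a real network call would.
    await asyncio.sleep(0)
    return [float(len(text))]


async def embedding_worker(queue: asyncio.Queue, index: dict) -> None:
    """Consume (message_id, text) pairs and index them outside the request path."""
    while True:
        message_id, text = await queue.get()
        try:
            index[message_id] = await fake_embed(text)
        finally:
            queue.task_done()


async def demo() -> dict:
    queue: asyncio.Queue = asyncio.Queue()
    index: dict = {}
    worker = asyncio.create_task(embedding_worker(queue, index))
    # The request handler only enqueues, so the user is never kept waiting on embedding.
    queue.put_nowait(("m1", "user: hello"))
    queue.put_nowait(("m2", "assistant: hi there"))
    await queue.join()  # for the demo only: wait until the backlog is drained
    worker.cancel()
    await asyncio.gather(worker, return_exceptions=True)
    return index


indexed = asyncio.run(demo())
```

In a long-running service the worker would stay alive for the life of the process, and the `queue.join()` call would only appear in shutdown or test code.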
Mistake 6: Trusting that retrieval will always find the right thing
Vector similarity search is probabilistic. If a user asks "What did I say about my deployment deadline?" and the word "deadline" appeared in only one message buried in a very long conversation, the retrieval may or may not surface it depending on how it was phrased. Build in a graceful fallback: if the model's response suggests it's missing information ("I don't have information about your deadline"), have a secondary retrieval path that searches for specific entity types (dates, names, numbers) using keyword search rather than semantic search.
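A keyword fallback does not need much machinery. The sketch below ranks stored messages by how many of the query's longer words they contain; the function name and the minimum term length are arbitrary choices for illustration, and a real system might use a full-text index instead.

```python
import re


def keyword_fallback(query: str, messages: list[str], min_term_length: int = 4) -> list[str]:
    """Return messages sharing at least one longer word with the query, best match first."""
    terms = {
        t for t in re.findall(r"[a-z0-9]+", query.lower())
        if len(t) >= min_term_length  # skip short filler words such as "did" or "my"
    }
    hits: list[tuple[int, str]] = []
    for text in messages:
        words = set(re.findall(r"[a-z0-9]+", text.lower()))
        overlap = len(terms & words)
        if overlap:
            hits.append((overlap, text))
    # Stable sort: ties keep their original (chronological) order.
    hits.sort(key=lambda hit: hit[0], reverse=True)
    return [text for _, text in hits]


history = [
    "We deploy with Airflow on Kubernetes.",
    "My deployment deadline is March 14.",
    "Thanks, that fixed the join.",
]
found = keyword_fallback("What did I say about my deployment deadline?", history)
```

Exact matching is the point here: it catches the literal word "deadline" even when the surrounding phrasing makes the semantic match weak.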
Once you're past prototype and handling real user load, a few things need to change:
Decouple embedding from the request path. As mentioned, put embedding calls in an async background queue. A simple approach is a Redis queue consumed by a worker process.
Cache summaries aggressively. Summaries change rarely (only when a new summarization is triggered). Cache them in Redis with a TTL slightly longer than your typical session gap. This eliminates database reads on most session restorations.
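The caching logic looks roughly like this. The class below is an in-process stand-in used only to show the expire-on-read behavior; in the setup described above, Redis would hold the entries and enforce the TTL itself. The class name and the injectable clock are assumptions made for the example.

```python
import time
from typing import Callable, Optional


class SummaryCache:
    """Tiny TTL cache keyed by session ID."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so expiry can be tested without sleeping
        self._entries: dict[str, tuple[float, str]] = {}

    def set(self, session_id: str, summary: str) -> None:
        # Called whenever a new summarization is triggered; overwrites the old entry.
        self._entries[session_id] = (self._clock() + self._ttl, summary)

    def get(self, session_id: str) -> Optional[str]:
        entry = self._entries.get(session_id)
        if entry is None:
            return None
        expires_at, summary = entry
        if self._clock() >= expires_at:
            # Expired: drop it so the caller falls back to the database.
            del self._entries[session_id]
            return None
        return summary


now = [0.0]
cache = SummaryCache(ttl_seconds=10, clock=lambda: now[0])
cache.set("session-1", "User is debugging a dbt fan-out join.")
fresh = cache.get("session-1")
now[0] = 11.0  # simulate time passing beyond the TTL
stale = cache.get("session-1")
```

On a cache miss the caller loads the latest summary from the database and writes it back with `set`.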
Measure your p95 latency, not p50. Memory retrieval adds tails to your latency distribution. The FAISS search itself is fast (sub-millisecond for reasonable index sizes), but the embedding API call and database reads add up. Track p95 and p99 in your metrics.
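The difference is easy to see with a small calculation. The sketch below uses the nearest-rank definition of a percentile and invented latency figures; in practice your metrics system computes these for you.

```python
import math


def percentile(samples: list[float], pct: int) -> float:
    """Nearest-rank percentile: the smallest sample with at least pct% of values at or below it."""
    if not samples:
        raise ValueError("percentile of an empty sample is undefined")
    ordered = sorted(samples)
    rank = math.ceil(pct * len(ordered) / 100)
    return ordered[max(rank, 1) - 1]


# Invented data: 90 fast turns, 10 slow ones where retrieval added a tail.
latencies_ms = [120.0] * 90 + [900.0] * 10

p50 = percentile(latencies_ms, 50)
p95 = percentile(latencies_ms, 95)
p99 = percentile(latencies_ms, 99)
```

Here the median is 120 ms and looks healthy, while p95 and p99 are both 900 ms. One in ten users is waiting almost a second, and only the tail metrics show it.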
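Consider token budgets per user tier. If you're building a product, your free tier users might get a 4,000 token context budget (buffer-only, no vector retrieval), while paid users get 16,000 tokens with full semantic retrieval. Note that build_context as written exposes the use_vector_retrieval flag but does not enforce a token budget (max_context_tokens is defined in config.py and never read), so supporting tiers means passing a budget into build_context and trimming the buffer to fit it.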
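One way to parameterize this is a small policy object per tier plus a helper that decides how much of the buffer fits. The names `TierPolicy`, `TIERS` and `buffer_start_within_budget` are hypothetical; the budgets are the ones from the example above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TierPolicy:
    max_context_tokens: int
    use_vector_retrieval: bool


TIERS = {
    "free": TierPolicy(max_context_tokens=4000, use_vector_retrieval=False),
    "paid": TierPolicy(max_context_tokens=16000, use_vector_retrieval=True),
}


def buffer_start_within_budget(
    buffer_token_counts: list[int],
    reserved_tokens: int,
    policy: TierPolicy,
) -> int:
    """Return the index of the oldest buffer message that still fits the tier's budget.

    `reserved_tokens` covers everything sent before the buffer:
    system prompt, summary, and any retrieved context.
    """
    remaining = policy.max_context_tokens - reserved_tokens
    start = len(buffer_token_counts)
    # Walk from newest to oldest, keeping messages while they fit.
    for i in range(len(buffer_token_counts) - 1, -1, -1):
        if buffer_token_counts[i] > remaining:
            break
        remaining -= buffer_token_counts[i]
        start = i
    return start


counts = [1500, 1500, 1500, 500]  # per-message token counts, oldest first
free_start = buffer_start_within_budget(counts, reserved_tokens=1000, policy=TIERS["free"])
paid_start = buffer_start_within_budget(counts, reserved_tokens=1000, policy=TIERS["paid"])
```

A budget-aware build_context could then send `self.buffer[start:]` instead of the whole buffer, using each message's stored `token_count`.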
You've built a complete conversational memory system with four interlocking components: a token-accurate buffer that maintains recent raw history, a summarization layer that compresses older context without losing essential meaning, a vector retrieval layer that brings back semantically relevant long-ago context on demand, and a persistence layer that survives process restarts and enables cross-session continuity.
The key insight is that these aren't competing approaches — they're complementary, each serving a different timescale. The buffer handles syntax-level context (pronoun resolution, follow-up questions). The summary handles session-level context (who the user is, what they're trying to do). Vector retrieval handles episodic context (specific facts mentioned long ago that become relevant again).
Where to go next:
The architecture you've built here is genuinely production-grade. The main remaining work is operational: adding monitoring, rate limit handling, cost tracking, and the kind of defensive error handling that production services require. But the memory logic itself is solid — and that's the hard part.