Wicked Smart Data
LearnArticlesAbout
Sign InSign Up
LearnArticlesAboutContact
Sign InSign Up
Wicked Smart Data

The go-to platform for professionals who want to master data, automation, and AI — from Excel fundamentals to cutting-edge machine learning.

Platform

  • Learning Paths
  • Articles
  • About
  • Contact

Connect

  • Contact Us
  • RSS Feed

© 2026 Wicked Smart Data. All rights reserved.

Privacy PolicyTerms of Service
All Articles
Building a Production Document Q&A System with Vector Embeddings

Building a Production Document Q&A System with Vector Embeddings

AI & Machine Learning🔥 Expert25 min readApr 21, 2026Updated Apr 21, 2026
Table of Contents
  • Prerequisites
  • Understanding the Architecture
  • Document Ingestion and Preprocessing
  • Vector Embeddings and Similarity Search
  • Query Processing and Retrieval Optimization
  • Language Model Integration and Prompt Engineering
  • Production Deployment and Optimization
  • Advanced Evaluation and Quality Assurance
  • Hands-On Exercise
  • Common Mistakes & Troubleshooting
  • Summary & Next Steps

Building a Document Q&A System with Embeddings

Your legal team just dumped 500 regulatory compliance documents on your desk. Your boss needs answers to complex questions spanning multiple documents by tomorrow morning. Traditional search falls short—you need something that understands context, can synthesize information across documents, and handles nuanced queries like "What are the reporting requirements for financial institutions with assets over $10 billion that operate in multiple jurisdictions?"

This is where document Q&A systems powered by embeddings transform impossible tasks into routine queries. By the end of this lesson, you'll build a production-ready system that can ingest any document corpus, understand semantic relationships between concepts, and provide accurate answers with proper source attribution.

What you'll learn:

  • How to architect a scalable document Q&A system using vector embeddings and retrieval-augmented generation (RAG)
  • Advanced chunking strategies that preserve semantic coherence across document boundaries
  • Vector database selection, indexing, and query optimization techniques for production workloads
  • Prompt engineering patterns that improve answer accuracy and reduce hallucination
  • Evaluation frameworks for measuring system performance and detecting failure modes
  • Production deployment considerations including caching, rate limiting, and cost optimization

Prerequisites

You should have experience with Python, basic familiarity with transformer models and embeddings, and have worked with APIs. Knowledge of vector databases is helpful but not required—we'll cover the fundamentals as we build.

Understanding the Architecture

Document Q&A systems follow a two-phase pattern: an offline indexing phase and an online retrieval phase. Understanding this separation is crucial for building systems that scale.

During indexing, we transform unstructured documents into a queryable knowledge base. Documents get chunked into semantically coherent pieces, each chunk gets converted into a dense vector embedding, and these embeddings get stored in a vector database with metadata linking back to source documents.

During retrieval, user questions follow the same embedding process. We use vector similarity search to find the most relevant chunks, then feed both the question and retrieved context to a language model that synthesizes a final answer.

This architecture solves three critical problems: it makes semantic search possible (finding relevant information even when exact keywords don't match), it provides controllable context (you decide what information the LLM sees), and it enables source attribution (answers can cite specific documents and page numbers).

Let's build this step by step, starting with the document processing pipeline.

Document Ingestion and Preprocessing

Real-world documents come in messy formats—PDFs with inconsistent text extraction, Word documents with embedded images, HTML with navigation cruft. Your preprocessing pipeline needs to handle this chaos while preserving semantic structure.

import os
from pathlib import Path
from typing import List, Dict, Any
import PyPDF2
import docx
from bs4 import BeautifulSoup
import tiktoken
from dataclasses import dataclass

@dataclass
class DocumentChunk:
    content: str
    metadata: Dict[str, Any]
    chunk_id: str
    
class DocumentProcessor:
    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.tokenizer = tiktoken.encoding_for_model("text-embedding-ada-002")
        
    def extract_text(self, file_path: Path) -> Dict[str, Any]:
        """Extract text and metadata from various file formats."""
        suffix = file_path.suffix.lower()
        
        if suffix == '.pdf':
            return self._extract_from_pdf(file_path)
        elif suffix == '.docx':
            return self._extract_from_docx(file_path)
        elif suffix == '.html':
            return self._extract_from_html(file_path)
        elif suffix == '.txt':
            return self._extract_from_text(file_path)
        else:
            raise ValueError(f"Unsupported file type: {suffix}")
    
    def _extract_from_pdf(self, file_path: Path) -> Dict[str, Any]:
        """Extract text from PDF, preserving page structure."""
        text_by_page = []
        metadata = {"source": str(file_path), "type": "pdf", "pages": []}
        
        with open(file_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            
            for page_num, page in enumerate(pdf_reader.pages):
                page_text = page.extract_text().strip()
                if page_text:  # Skip empty pages
                    text_by_page.append(page_text)
                    metadata["pages"].append({
                        "page_num": page_num + 1,
                        "start_char": sum(len(p) for p in text_by_page[:-1]),
                        "end_char": sum(len(p) for p in text_by_page)
                    })
        
        return {
            "content": "\n\n".join(text_by_page),
            "metadata": metadata
        }
    
    def _extract_from_docx(self, file_path: Path) -> Dict[str, Any]:
        """Extract text from Word documents."""
        doc = docx.Document(file_path)
        
        paragraphs = []
        for paragraph in doc.paragraphs:
            if paragraph.text.strip():
                paragraphs.append(paragraph.text)
        
        content = "\n\n".join(paragraphs)
        metadata = {
            "source": str(file_path),
            "type": "docx",
            "paragraph_count": len(paragraphs)
        }
        
        return {"content": content, "metadata": metadata}

Notice how we preserve structural information during extraction. PDF pages, Word paragraphs, and HTML sections all become part of the metadata. This proves crucial for source attribution—users want to know not just which document contained an answer, but where exactly within that document.

The real challenge lies in chunking. Naive approaches split on sentence boundaries or fixed character counts, but this destroys semantic coherence. A paragraph about "quarterly reporting requirements" might get split, with half the context landing in one chunk and half in another.

def chunk_document(self, document: Dict[str, Any]) -> List[DocumentChunk]:
    """Create semantically coherent chunks with proper overlap."""
    content = document["content"]
    base_metadata = document["metadata"]
    
    # First, split into natural sections (paragraphs, etc.)
    sections = self._split_into_sections(content)
    
    chunks = []
    current_chunk = ""
    current_tokens = 0
    chunk_counter = 0
    
    for section in sections:
        section_tokens = len(self.tokenizer.encode(section))
        
        # If this section alone exceeds chunk size, split it further
        if section_tokens > self.chunk_size:
            # Save current chunk if it exists
            if current_chunk:
                chunks.append(self._create_chunk(
                    current_chunk, base_metadata, chunk_counter
                ))
                chunk_counter += 1
            
            # Split the oversized section
            subsection_chunks = self._split_oversized_section(
                section, base_metadata, chunk_counter
            )
            chunks.extend(subsection_chunks)
            chunk_counter += len(subsection_chunks)
            
            current_chunk = ""
            current_tokens = 0
            
        # If adding this section would exceed chunk size
        elif current_tokens + section_tokens > self.chunk_size:
            # Save current chunk
            if current_chunk:
                chunks.append(self._create_chunk(
                    current_chunk, base_metadata, chunk_counter
                ))
                chunk_counter += 1
            
            # Start new chunk with overlap from previous
            overlap_text = self._get_overlap_text(current_chunk)
            current_chunk = overlap_text + "\n\n" + section if overlap_text else section
            current_tokens = len(self.tokenizer.encode(current_chunk))
        
        else:
            # Add section to current chunk
            if current_chunk:
                current_chunk += "\n\n" + section
            else:
                current_chunk = section
            current_tokens += section_tokens
    
    # Handle final chunk
    if current_chunk:
        chunks.append(self._create_chunk(
            current_chunk, base_metadata, chunk_counter
        ))
    
    return chunks

def _split_into_sections(self, text: str) -> List[str]:
    """Split text into natural sections, preserving semantic units."""
    # Look for double line breaks (paragraph separators)
    sections = []
    
    # Split on double newlines first
    paragraphs = text.split('\n\n')
    
    for para in paragraphs:
        para = para.strip()
        if not para:
            continue
            
        # Check if paragraph is too long
        if len(self.tokenizer.encode(para)) > self.chunk_size // 2:
            # Split long paragraphs on sentence boundaries
            sentences = self._split_sentences(para)
            sections.extend(sentences)
        else:
            sections.append(para)
    
    return sections

def _get_overlap_text(self, text: str) -> str:
    """Extract overlap text from end of current chunk."""
    tokens = self.tokenizer.encode(text)
    if len(tokens) <= self.chunk_overlap:
        return text
    
    overlap_tokens = tokens[-self.chunk_overlap:]
    return self.tokenizer.decode(overlap_tokens)

This chunking strategy balances several competing concerns. We want chunks large enough to provide meaningful context but small enough that embeddings remain focused. We preserve semantic boundaries while ensuring reasonable overlap so related information doesn't get isolated.

Vector Embeddings and Similarity Search

With clean, chunked documents, we need to convert them into searchable vector representations. The choice of embedding model dramatically affects retrieval quality—different models excel at different types of content and query patterns.

For general document Q&A, OpenAI's text-embedding-ada-002 offers excellent performance across domains. For specialized fields like legal or medical documents, fine-tuned models often perform better, but require significantly more infrastructure.

import openai
import numpy as np
from typing import List, Tuple
import asyncio
import aiohttp
from tenacity import retry, stop_after_attempt, wait_exponential

class EmbeddingService:
    def __init__(self, model: str = "text-embedding-ada-002", batch_size: int = 100):
        self.model = model
        self.batch_size = batch_size
        self.client = openai.OpenAI()
        
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    def get_embedding(self, text: str) -> List[float]:
        """Get embedding for a single text."""
        response = self.client.embeddings.create(
            input=text,
            model=self.model
        )
        return response.data[0].embedding
    
    async def get_embeddings_batch(self, texts: List[str]) -> List[List[float]]:
        """Get embeddings for multiple texts efficiently."""
        # Process in batches to respect rate limits
        all_embeddings = []
        
        for i in range(0, len(texts), self.batch_size):
            batch = texts[i:i + self.batch_size]
            
            try:
                response = await self._async_embed_batch(batch)
                embeddings = [item.embedding for item in response.data]
                all_embeddings.extend(embeddings)
            except Exception as e:
                print(f"Batch {i//self.batch_size} failed: {e}")
                # Fallback to individual embeddings
                for text in batch:
                    try:
                        embedding = self.get_embedding(text)
                        all_embeddings.append(embedding)
                    except:
                        # Use zero vector as fallback
                        all_embeddings.append([0.0] * 1536)  # Ada-002 dimension
                        
        return all_embeddings
    
    async def _async_embed_batch(self, texts: List[str]):
        """Async wrapper for batch embedding requests."""
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(
            None, 
            lambda: self.client.embeddings.create(input=texts, model=self.model)
        )

The retry logic handles transient API failures, while batching maximizes throughput within rate limits. For production systems, you'll want more sophisticated error handling and potentially caching frequently requested embeddings.

Vector similarity search finds chunks semantically related to a query, but the choice of similarity metric and search algorithm affects both accuracy and performance.

from typing import Optional, Tuple
import faiss
import pickle

class VectorIndex:
    def __init__(self, dimension: int = 1536, index_type: str = "flat"):
        self.dimension = dimension
        self.index_type = index_type
        self.index = None
        self.chunk_metadata = []
        
    def build_index(self, embeddings: np.ndarray, metadata: List[Dict]):
        """Build FAISS index from embeddings."""
        embeddings = np.array(embeddings, dtype=np.float32)
        
        if self.index_type == "flat":
            # Exact search, slower but most accurate
            self.index = faiss.IndexFlatIP(self.dimension)  # Inner product
            
        elif self.index_type == "ivf":
            # Approximate search with inverted file index
            nlist = min(100, len(embeddings) // 10)  # Number of clusters
            quantizer = faiss.IndexFlatIP(self.dimension)
            self.index = faiss.IndexIVFFlat(quantizer, self.dimension, nlist)
            
            # Train the index
            self.index.train(embeddings)
            
        elif self.index_type == "hnsw":
            # Hierarchical navigable small world graph
            m = 16  # Number of bi-directional links for each node
            self.index = faiss.IndexHNSWFlat(self.dimension, m)
            self.index.hnsw.efConstruction = 200  # Controls index time/accuracy
            
        # Add vectors to index
        self.index.add(embeddings)
        self.chunk_metadata = metadata
        
        print(f"Built {self.index_type} index with {len(embeddings)} vectors")
    
    def search(self, query_embedding: List[float], k: int = 5, 
               score_threshold: float = 0.0) -> List[Tuple[Dict, float]]:
        """Search for similar chunks."""
        if self.index is None:
            raise ValueError("Index not built yet")
            
        query_vector = np.array([query_embedding], dtype=np.float32)
        
        if self.index_type == "ivf":
            # Set search parameters for IVF index
            self.index.nprobe = min(10, self.index.nlist)  # Number of clusters to search
            
        elif self.index_type == "hnsw":
            # Set search parameters for HNSW index
            self.index.hnsw.efSearch = max(k * 2, 50)  # Controls search accuracy
        
        scores, indices = self.index.search(query_vector, k)
        
        results = []
        for idx, score in zip(indices[0], scores[0]):
            if idx >= 0 and score >= score_threshold:  # Valid result above threshold
                results.append((self.chunk_metadata[idx], float(score)))
                
        return results
    
    def save(self, path: str):
        """Save index and metadata to disk."""
        faiss.write_index(self.index, f"{path}.index")
        with open(f"{path}.metadata", 'wb') as f:
            pickle.dump(self.chunk_metadata, f)
    
    def load(self, path: str):
        """Load index and metadata from disk."""
        self.index = faiss.read_index(f"{path}.index")
        with open(f"{path}.metadata", 'rb') as f:
            self.chunk_metadata = pickle.load(f)

Index choice involves performance trade-offs. Flat indexes provide exact results but scale linearly with corpus size. IVF indexes offer sub-linear search time through clustering but may miss relevant results. HNSW indexes provide excellent recall with fast search times but require more memory.

Performance Tip: For document corpora under 100K chunks, start with a flat index. Beyond that, HNSW typically offers the best balance of accuracy and speed for Q&A systems.

Query Processing and Retrieval Optimization

User queries rarely match document language exactly. Someone might ask "What's the penalty for late submission?" while the document says "Entities failing to file by the prescribed deadline incur monetary sanctions." Effective query processing bridges this semantic gap.

class QueryProcessor:
    def __init__(self, embedding_service: EmbeddingService, vector_index: VectorIndex):
        self.embedding_service = embedding_service
        self.vector_index = vector_index
        self.query_cache = {}  # Simple in-memory cache
    
    def process_query(self, query: str, k: int = 5, 
                     rerank: bool = True) -> List[Tuple[DocumentChunk, float]]:
        """Process a user query and return relevant chunks."""
        # Normalize and expand query
        processed_query = self._preprocess_query(query)
        
        # Check cache first
        cache_key = f"{processed_query}:{k}:{rerank}"
        if cache_key in self.query_cache:
            return self.query_cache[cache_key]
        
        # Get query embedding
        query_embedding = self.embedding_service.get_embedding(processed_query)
        
        # Initial retrieval - get more candidates than needed
        initial_k = k * 3 if rerank else k
        candidates = self.vector_index.search(
            query_embedding, 
            k=initial_k, 
            score_threshold=0.1
        )
        
        if not candidates:
            return []
        
        # Convert to DocumentChunk objects
        chunks_with_scores = [
            (self._metadata_to_chunk(metadata), score) 
            for metadata, score in candidates
        ]
        
        # Optional reranking for better relevance
        if rerank and len(chunks_with_scores) > k:
            chunks_with_scores = self._rerank_results(query, chunks_with_scores, k)
        
        # Cache results
        final_results = chunks_with_scores[:k]
        self.query_cache[cache_key] = final_results
        
        return final_results
    
    def _preprocess_query(self, query: str) -> str:
        """Enhance query for better retrieval."""
        # Remove common question words that don't add semantic value
        stop_words = {'what', 'when', 'where', 'who', 'why', 'how', 'is', 'are', 'the'}
        
        # Simple query expansion - in production, use more sophisticated methods
        expansions = {
            'penalty': 'penalty fine sanction',
            'requirement': 'requirement obligation must',
            'deadline': 'deadline due date time limit',
            'company': 'company organization entity firm business'
        }
        
        words = query.lower().split()
        expanded_words = []
        
        for word in words:
            if word not in stop_words:
                expanded_words.append(word)
                if word in expansions:
                    expanded_words.extend(expansions[word].split())
        
        return ' '.join(expanded_words)
    
    def _rerank_results(self, query: str, candidates: List[Tuple[DocumentChunk, float]], 
                       k: int) -> List[Tuple[DocumentChunk, float]]:
        """Rerank candidates using cross-encoder or other sophisticated scoring."""
        # For this example, we'll use a simple keyword-based reranking
        # In production, consider using cross-encoders like ms-marco-MiniLM
        
        query_terms = set(query.lower().split())
        
        def compute_rerank_score(chunk: DocumentChunk, similarity_score: float) -> float:
            content_lower = chunk.content.lower()
            
            # Keyword overlap score
            content_terms = set(content_lower.split())
            overlap = len(query_terms.intersection(content_terms))
            keyword_score = overlap / len(query_terms) if query_terms else 0
            
            # Position boost for title/header content
            position_boost = 1.2 if any(term in content_lower[:100] 
                                      for term in query_terms) else 1.0
            
            # Document type boost (some document types might be more authoritative)
            type_boost = 1.1 if chunk.metadata.get('type') == 'pdf' else 1.0
            
            return similarity_score * (1 + keyword_score * 0.3) * position_boost * type_boost
        
        # Recompute scores
        reranked = [
            (chunk, self.compute_rerank_score(chunk, score))
            for chunk, score in candidates
        ]
        
        # Sort by new scores
        reranked.sort(key=lambda x: x[1], reverse=True)
        
        return reranked[:k]

This multi-stage approach first casts a wide net with vector similarity, then applies more sophisticated scoring to the top candidates. The reranking step can incorporate domain-specific signals—regulatory documents might boost content from official sources, while technical documentation might prioritize recent updates.

Language Model Integration and Prompt Engineering

With relevant context retrieved, we need to synthesize coherent answers. The prompt design determines whether your system produces helpful responses or confident nonsense.

from typing import Optional
import openai
from dataclasses import dataclass

@dataclass
class QAResponse:
    answer: str
    sources: List[Dict[str, Any]]
    confidence: float
    query: str

class DocumentQASystem:
    def __init__(self, query_processor: QueryProcessor, model: str = "gpt-4"):
        self.query_processor = query_processor
        self.model = model
        self.client = openai.OpenAI()
        
    def answer_question(self, question: str, max_context_length: int = 4000) -> QAResponse:
        """Generate an answer to a question using retrieved context."""
        # Retrieve relevant chunks
        relevant_chunks = self.query_processor.process_query(question, k=8)
        
        if not relevant_chunks:
            return QAResponse(
                answer="I couldn't find relevant information to answer this question.",
                sources=[],
                confidence=0.0,
                query=question
            )
        
        # Build context and sources
        context_parts = []
        sources = []
        total_length = 0
        
        for i, (chunk, score) in enumerate(relevant_chunks):
            source_info = {
                "source": chunk.metadata.get('source', 'Unknown'),
                "page": chunk.metadata.get('page_num'),
                "chunk_id": chunk.chunk_id,
                "relevance_score": score
            }
            
            chunk_text = f"[Source {i+1}] {chunk.content}"
            
            # Check if adding this chunk would exceed context limit
            if total_length + len(chunk_text) > max_context_length:
                break
                
            context_parts.append(chunk_text)
            sources.append(source_info)
            total_length += len(chunk_text)
        
        context = "\n\n".join(context_parts)
        
        # Generate answer using carefully crafted prompt
        prompt = self._build_qa_prompt(question, context)
        
        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": self._get_system_prompt()},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.1,  # Low temperature for factual responses
                max_tokens=500
            )
            
            answer = response.choices[0].message.content
            confidence = self._estimate_confidence(answer, relevant_chunks)
            
            return QAResponse(
                answer=answer,
                sources=sources,
                confidence=confidence,
                query=question
            )
            
        except Exception as e:
            print(f"Error generating answer: {e}")
            return QAResponse(
                answer="I encountered an error while processing your question.",
                sources=sources,
                confidence=0.0,
                query=question
            )
    
    def _get_system_prompt(self) -> str:
        """System prompt that defines the assistant's behavior."""
        return """You are a helpful document analysis assistant. Your job is to answer questions based solely on the provided document excerpts.

CRITICAL INSTRUCTIONS:
- Only use information explicitly stated in the provided sources
- If the sources don't contain enough information to answer completely, say so
- When citing information, reference the source number (e.g., "According to Source 2...")
- Be precise and factual - avoid speculation or inference beyond what's directly stated
- If sources contradict each other, acknowledge the contradiction
- Structure your answers clearly with relevant details"""

    def _build_qa_prompt(self, question: str, context: str) -> str:
        """Build the main QA prompt with question and context."""
        return f"""Based on the following document excerpts, please answer this question: "{question}"

DOCUMENT EXCERPTS:
{context}

QUESTION: {question}

Please provide a comprehensive answer based on the information in the excerpts above. If the excerpts don't contain sufficient information to fully answer the question, please indicate what aspects cannot be answered based on the provided sources."""

    def _estimate_confidence(self, answer: str, chunks: List[Tuple[DocumentChunk, float]]) -> float:
        """Estimate confidence in the answer based on various signals."""
        # This is a simplified confidence estimation
        # Production systems might use more sophisticated methods
        
        base_confidence = 0.5
        
        # Boost confidence if multiple sources agree
        if len(chunks) >= 3:
            base_confidence += 0.2
        
        # Boost confidence for high similarity scores
        avg_score = sum(score for _, score in chunks) / len(chunks)
        if avg_score > 0.8:
            base_confidence += 0.2
        
        # Reduce confidence for hedged language
        hedge_words = ['might', 'could', 'possibly', 'unclear', 'insufficient']
        if any(word in answer.lower() for word in hedge_words):
            base_confidence -= 0.2
        
        # Reduce confidence for very short answers (might indicate missing info)
        if len(answer.split()) < 20:
            base_confidence -= 0.1
        
        return min(max(base_confidence, 0.0), 1.0)

The system prompt is crucial—it establishes ground rules that prevent hallucination and ensure source attribution. Notice how we explicitly instruct the model to acknowledge when information is insufficient rather than fabricating answers.

Prompt Engineering Tip: Test your prompts with edge cases—ambiguous questions, contradictory sources, and completely unrelated queries. The prompt should handle these gracefully rather than producing confident wrong answers.

Production Deployment and Optimization

Moving from prototype to production requires addressing scalability, reliability, and cost concerns. Document Q&A systems can become expensive quickly without proper optimization.

import redis
import hashlib
import json
from typing import Optional
import time
from contextlib import contextmanager

class ProductionQASystem:
    def __init__(self, base_qa_system: DocumentQASystem, redis_url: str = None):
        self.qa_system = base_qa_system
        self.redis_client = redis.from_url(redis_url) if redis_url else None
        self.cache_ttl = 3600  # 1 hour cache
        
        # Rate limiting
        self.rate_limit_window = 60  # 1 minute
        self.rate_limit_max = 10    # 10 requests per minute per user
        
        # Performance monitoring
        self.metrics = {
            'total_queries': 0,
            'cache_hits': 0,
            'avg_response_time': 0.0,
            'error_count': 0
        }
    
    @contextmanager
    def _performance_monitor(self):
        """Context manager for tracking performance metrics."""
        start_time = time.time()
        try:
            yield
        finally:
            duration = time.time() - start_time
            self.metrics['total_queries'] += 1
            
            # Update rolling average
            current_avg = self.metrics['avg_response_time']
            total_queries = self.metrics['total_queries']
            self.metrics['avg_response_time'] = (
                (current_avg * (total_queries - 1) + duration) / total_queries
            )
    
    def answer_question_cached(self, question: str, user_id: str = None) -> Optional[QAResponse]:
        """Answer question with caching and rate limiting."""
        
        with self._performance_monitor():
            # Check rate limits
            if user_id and not self._check_rate_limit(user_id):
                raise Exception("Rate limit exceeded")
            
            # Generate cache key
            cache_key = self._generate_cache_key(question)
            
            # Try cache first
            if self.redis_client:
                cached_result = self._get_from_cache(cache_key)
                if cached_result:
                    self.metrics['cache_hits'] += 1
                    return cached_result
            
            try:
                # Generate answer
                response = self.qa_system.answer_question(question)
                
                # Cache the result
                if self.redis_client and response.confidence > 0.7:
                    self._store_in_cache(cache_key, response)
                
                return response
                
            except Exception as e:
                self.metrics['error_count'] += 1
                print(f"Error in answer generation: {e}")
                return None
    
    def _check_rate_limit(self, user_id: str) -> bool:
        """Check if user has exceeded rate limits."""
        if not self.redis_client:
            return True
            
        key = f"rate_limit:{user_id}"
        current_count = self.redis_client.get(key)
        
        if current_count is None:
            # First request in window
            self.redis_client.setex(key, self.rate_limit_window, 1)
            return True
        
        current_count = int(current_count)
        if current_count >= self.rate_limit_max:
            return False
        
        # Increment counter
        self.redis_client.incr(key)
        return True
    
    def _generate_cache_key(self, question: str) -> str:
        """Generate a consistent cache key for a question."""
        # Normalize question for better cache hits
        normalized = question.lower().strip().replace(' ', '_')
        return hashlib.md5(normalized.encode()).hexdigest()
    
    def _get_from_cache(self, cache_key: str) -> Optional[QAResponse]:
        """Retrieve cached response."""
        try:
            cached_data = self.redis_client.get(f"qa:{cache_key}")
            if cached_data:
                data = json.loads(cached_data)
                return QAResponse(**data)
        except Exception as e:
            print(f"Cache retrieval error: {e}")
        return None
    
    def _store_in_cache(self, cache_key: str, response: QAResponse):
        """Store response in cache."""
        try:
            # Convert to dict for JSON serialization
            cache_data = {
                'answer': response.answer,
                'sources': response.sources,
                'confidence': response.confidence,
                'query': response.query
            }
            
            self.redis_client.setex(
                f"qa:{cache_key}", 
                self.cache_ttl, 
                json.dumps(cache_data)
            )
        except Exception as e:
            print(f"Cache storage error: {e}")
    
    def get_metrics(self) -> Dict[str, Any]:
        """Return current performance metrics."""
        cache_hit_rate = (
            self.metrics['cache_hits'] / max(self.metrics['total_queries'], 1)
        )
        
        return {
            **self.metrics,
            'cache_hit_rate': cache_hit_rate
        }

Caching is critical for both performance and cost control. High-confidence answers to common questions can be cached for hours, while lower-confidence responses might have shorter TTLs or skip caching entirely.

For large document corpora, consider these additional optimizations:

class ScalableVectorIndex:
    """Production-ready vector index with advanced features."""
    
    def __init__(self, dimension: int = 1536):
        self.dimension = dimension
        self.shards = {}  # Document type or date-based sharding
        self.global_index = None
        
    def build_sharded_index(self, documents_by_shard: Dict[str, List[Dict]]):
        """Build separate indexes for different document types/dates."""
        for shard_name, documents in documents_by_shard.items():
            embeddings = np.array([doc['embedding'] for doc in documents])
            metadata = [doc['metadata'] for doc in documents]
            
            # Use IVF-PQ for large shards (product quantization saves memory)
            if len(embeddings) > 50000:
                index = self._create_ivf_pq_index(embeddings)
            else:
                index = self._create_hnsw_index(embeddings)
            
            self.shards[shard_name] = {
                'index': index,
                'metadata': metadata
            }
    
    def search_with_filters(self, query_embedding: List[float], 
                          document_types: List[str] = None,
                          date_range: Tuple[str, str] = None,
                          k: int = 5) -> List[Tuple[Dict, float]]:
        """Search with filters to reduce search space."""
        
        # Determine which shards to search
        relevant_shards = self._filter_shards(document_types, date_range)
        
        all_results = []
        
        for shard_name in relevant_shards:
            shard_data = self.shards[shard_name]
            
            # Search within shard
            scores, indices = shard_data['index'].search(
                np.array([query_embedding], dtype=np.float32), 
                k * 2  # Get extra candidates from each shard
            )
            
            # Add shard results
            for idx, score in zip(indices[0], scores[0]):
                if idx >= 0:
                    metadata = shard_data['metadata'][idx].copy()
                    metadata['shard'] = shard_name
                    all_results.append((metadata, float(score)))
        
        # Sort globally and return top k
        all_results.sort(key=lambda x: x[1], reverse=True)
        return all_results[:k]
    
    def _create_ivf_pq_index(self, embeddings: np.ndarray):
        """Create memory-efficient IVF-PQ index for large datasets."""
        nlist = min(4096, len(embeddings) // 39)  # Heuristic for cluster count
        m = 8  # Number of subquantizers
        bits = 8  # Bits per subquantizer
        
        quantizer = faiss.IndexFlatL2(self.dimension)
        index = faiss.IndexIVFPQ(quantizer, self.dimension, nlist, m, bits)
        
        # Train the quantizer
        index.train(embeddings)
        index.add(embeddings)
        
        return index

Advanced Evaluation and Quality Assurance

Production Q&A systems require rigorous evaluation beyond simple accuracy metrics. You need to detect various failure modes and continuously monitor performance in the wild.

import pandas as pd
from typing import List, Dict
from dataclasses import dataclass
from sklearn.metrics import precision_recall_fscore_support
import re

@dataclass
class EvaluationResult:
    question: str
    expected_answer: str
    actual_answer: str
    sources_used: List[str]
    relevance_score: float
    faithfulness_score: float
    completeness_score: float
    overall_score: float

class QAEvaluator:
    def __init__(self, qa_system: DocumentQASystem):
        self.qa_system = qa_system
        
    def evaluate_dataset(self, test_cases: List[Dict]) -> List[EvaluationResult]:
        """Evaluate Q&A system on a test dataset."""
        results = []
        
        for case in test_cases:
            question = case['question']
            expected_answer = case['expected_answer']
            expected_sources = case.get('expected_sources', [])
            
            # Generate actual answer
            response = self.qa_system.answer_question(question)
            
            # Evaluate different aspects
            relevance = self._score_relevance(question, response.answer)
            faithfulness = self._score_faithfulness(response.answer, response.sources)
            completeness = self._score_completeness(expected_answer, response.answer)
            
            overall = (relevance + faithfulness + completeness) / 3
            
            results.append(EvaluationResult(
                question=question,
                expected_answer=expected_answer,
                actual_answer=response.answer,
                sources_used=[s['source'] for s in response.sources],
                relevance_score=relevance,
                faithfulness_score=faithfulness,
                completeness_score=completeness,
                overall_score=overall
            ))
            
        return results
    
    def _score_relevance(self, question: str, answer: str) -> float:
        """Score how well the answer addresses the question."""
        # Extract key entities and concepts from question
        question_keywords = self._extract_keywords(question)
        answer_keywords = self._extract_keywords(answer)
        
        # Simple overlap-based relevance
        if not question_keywords:
            return 0.5  # Neutral if no keywords extracted
            
        overlap = len(question_keywords.intersection(answer_keywords))
        relevance = overlap / len(question_keywords)
        
        # Boost for direct question patterns
        if self._contains_direct_answer_pattern(question, answer):
            relevance += 0.2
            
        return min(relevance, 1.0)
    
    def _score_faithfulness(self, answer: str, sources: List[Dict]) -> float:
        """Score how well the answer is supported by the sources."""
        if not sources:
            return 0.0
        
        # Extract claims from answer
        answer_claims = self._extract_claims(answer)
        
        # Check if claims are supported by sources
        supported_claims = 0
        
        for claim in answer_claims:
            if self._claim_supported_by_sources(claim, sources):
                supported_claims += 1
        
        if not answer_claims:
            return 0.5  # Neutral for very short answers
            
        return supported_claims / len(answer_claims)
    
    def _score_completeness(self, expected: str, actual: str) -> float:
        """Score how completely the answer addresses expected content."""
        expected_concepts = self._extract_keywords(expected)
        actual_concepts = self._extract_keywords(actual)
        
        if not expected_concepts:
            return 1.0  # No specific expectations
            
        coverage = len(expected_concepts.intersection(actual_concepts))
        return coverage / len(expected_concepts)
    
    def _extract_keywords(self, text: str) -> set:
        """Extract meaningful keywords from text."""
        # Remove common stopwords and extract meaningful terms
        stopwords = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for'}
        words = re.findall(r'\b\w+\b', text.lower())
        return {word for word in words if word not in stopwords and len(word) > 2}
    
    def generate_evaluation_report(self, results: List[EvaluationResult]) -> Dict:
        """Generate comprehensive evaluation report."""
        df = pd.DataFrame([
            {
                'question': r.question,
                'relevance': r.relevance_score,
                'faithfulness': r.faithfulness_score,
                'completeness': r.completeness_score,
                'overall': r.overall_score
            }
            for r in results
        ])
        
        # Aggregate statistics
        report = {
            'total_questions': len(results),
            'avg_relevance': df['relevance'].mean(),
            'avg_faithfulness': df['faithfulness'].mean(),
            'avg_completeness': df['completeness'].mean(),
            'avg_overall': df['overall'].mean(),
            'score_distribution': df['overall'].describe().to_dict()
        }
        
        # Identify failure modes
        low_relevance = df[df['relevance'] < 0.5]
        low_faithfulness = df[df['faithfulness'] < 0.5]
        
        report['failure_analysis'] = {
            'low_relevance_count': len(low_relevance),
            'low_faithfulness_count': len(low_faithfulness),
            'sample_low_relevance': low_relevance['question'].head().tolist(),
            'sample_low_faithfulness': low_faithfulness['question'].head().tolist()
        }
        
        return report

This evaluation framework catches multiple failure modes: answers that don't address the question (low relevance), answers that aren't supported by retrieved documents (low faithfulness), and answers that miss important information (low completeness).

Hands-On Exercise

Let's build a complete document Q&A system for a regulatory compliance use case. You'll process a corpus of financial regulations and create a system that can answer complex compliance questions.

# Complete implementation bringing everything together
import os
from pathlib import Path
import asyncio

async def build_regulatory_qa_system():
    """Build and test a complete document Q&A system."""
    
    # Initialize components
    doc_processor = DocumentProcessor(chunk_size=800, chunk_overlap=150)
    embedding_service = EmbeddingService(batch_size=50)
    vector_index = VectorIndex(dimension=1536, index_type="hnsw")
    
    # Process documents
    print("Processing documents...")
    document_dir = Path("regulatory_docs")  # Your document directory
    all_chunks = []
    
    for file_path in document_dir.glob("*.pdf"):
        try:
            # Extract and chunk document
            document = doc_processor.extract_text(file_path)
            chunks = doc_processor.chunk_document(document)
            all_chunks.extend(chunks)
            print(f"Processed {file_path.name}: {len(chunks)} chunks")
        except Exception as e:
            print(f"Error processing {file_path}: {e}")
    
    print(f"Total chunks: {len(all_chunks)}")
    
    # Generate embeddings
    print("Generating embeddings...")
    chunk_texts = [chunk.content for chunk in all_chunks]
    embeddings = await embedding_service.get_embeddings_batch(chunk_texts)
    
    # Build index
    print("Building vector index...")
    chunk_metadata = [chunk.metadata for chunk in all_chunks]
    vector_index.build_index(embeddings, chunk_metadata)
    
    # Save index for later use
    vector_index.save("regulatory_qa_index")
    
    # Initialize Q&A system
    query_processor = QueryProcessor(embedding_service, vector_index)
    qa_system = DocumentQASystem(query_processor, model="gpt-4")
    
    # Test with sample questions
    test_questions = [
        "What are the reporting requirements for banks with assets over $50 billion?",
        "What penalties apply for late regulatory filings?",
        "How often must stress tests be conducted?",
        "What documentation is required for risk management frameworks?"
    ]
    
    print("\nTesting Q&A system:")
    for question in test_questions:
        print(f"\nQ: {question}")
        response = qa_system.answer_question(question)
        print(f"A: {response.answer}")
        print(f"Confidence: {response.confidence:.2f}")
        print(f"Sources: {len(response.sources)}")
        
        # Show top source
        if response.sources:
            top_source = response.sources[0]
            print(f"Top source: {top_source['source']} (score: {top_source['relevance_score']:.3f})")
    
    return qa_system

# Run the exercise
if __name__ == "__main__":
    qa_system = asyncio.run(build_regulatory_qa_system())

This exercise demonstrates the complete pipeline from document ingestion through answer generation. Try it with your own document corpus and adjust the parameters based on your specific use case.

Common Mistakes & Troubleshooting

Poor chunk boundaries: The most common error is chunking documents without considering semantic structure. Symptoms include incomplete answers that reference "the aforementioned requirements" without context. Fix this by preserving paragraph structure and using overlap between chunks.

Inadequate prompt engineering: Generic prompts produce generic answers. If your system gives vague responses like "The document mentions several requirements," your prompt isn't specific enough. Include explicit instructions about answer structure, source citation, and handling uncertainty.

Ignoring retrieval quality: High-level metrics might look good while individual queries fail badly. Monitor retrieval performance separately—are the right chunks being found? Use query logging and regular spot-checks to catch retrieval failures.

Over-reliance on similarity scores: Vector similarity doesn't always correlate with relevance for your specific domain. Consider hybrid approaches that combine semantic similarity with keyword matching, especially for technical domains with specific terminology.

Memory and performance issues: Vector indexes can consume enormous amounts of memory. If you're hitting memory limits, try:

  • Product quantization (IVF-PQ indexes)
  • Dimension reduction techniques
  • Sharding by document type or date
  • Lazy loading of embeddings

Caching inappropriate content: Don't cache answers with low confidence scores or time-sensitive information. Set different TTLs based on content type—regulatory guidance might cache for hours, while news content should have much shorter lifespans.

Security and data leakage: Document Q&A systems can inadvertently expose sensitive information. Implement:

  • Access controls on document ingestion
  • Query filtering to prevent prompt injection
  • Answer filtering to remove sensitive patterns
  • Audit logging for all queries and responses

Debugging Tip: When answers seem wrong, trace through the entire pipeline. Check what chunks were retrieved, examine their relevance scores, and review the exact prompt sent to the language model. Often the issue is in retrieval, not generation.

Summary & Next Steps

You've built a production-ready document Q&A system that can handle real-world document corpora at scale. The system combines semantic search through vector embeddings with careful prompt engineering to produce accurate, well-sourced answers.

Key achievements from this lesson:

  • Robust document processing that handles multiple formats while preserving semantic structure
  • Sophisticated retrieval using vector similarity with reranking for improved relevance
  • Production optimizations including caching, rate limiting, and performance monitoring
  • Comprehensive evaluation frameworks that catch multiple failure modes
  • Scalability considerations for large document corpora

The next frontier involves several advanced techniques:

Multi-modal document understanding: Real documents contain images, tables, and complex layouts. Explore vision-language models that can process these elements alongside text.

Conversational Q&A: Extend your system to handle follow-up questions and maintain conversation context. This requires conversation memory and query disambiguation.

Active learning: Implement feedback loops where user ratings improve retrieval and generation quality over time.

Cross-lingual capabilities: For global organizations, consider embedding models that work across languages and translation strategies for multilingual document corpora.

Domain adaptation: Fine-tune embedding models on your specific document types for improved retrieval quality, especially in specialized domains like legal, medical, or technical documentation.

The architecture you've built provides a solid foundation for these extensions. Start with the use case that provides the most immediate value to your organization, and iterate based on user feedback and performance metrics.

Learning Path: Building with LLMs

Previous

Function Calling and Tool Use with LLMs: Building Intelligent Agents

Related Articles

AI & Machine Learning⚡ Practitioner

Function Calling and Tool Use with LLMs: Building Intelligent Agents

27 min
AI & Machine Learning⚡ Practitioner

When to Use Claude vs Codex: Strategic AI Tool Selection for Developers

18 min
AI & Machine Learning⚡ Practitioner

Claude Code Prompting Best Practices to Save Tokens

14 min

On this page

  • Prerequisites
  • Understanding the Architecture
  • Document Ingestion and Preprocessing
  • Vector Embeddings and Similarity Search
  • Query Processing and Retrieval Optimization
  • Language Model Integration and Prompt Engineering
  • Production Deployment and Optimization
  • Advanced Evaluation and Quality Assurance
  • Hands-On Exercise
  • Common Mistakes & Troubleshooting
  • Summary & Next Steps