Building a RAG Pipeline with Your Own Documents

AI & Machine Learning | Practitioner | 18 min read | May 7, 2026 | Updated May 7, 2026
Table of Contents
  • Prerequisites
  • Understanding the RAG Architecture
  • Setting Up the Document Ingestion Pipeline
  • Intelligent Document Chunking
  • Building the Vector Database
  • Implementing the Query Pipeline
  • Hands-On Exercise: Building Your Document RAG System
  • Common Mistakes & Troubleshooting
  • Advanced: Evaluating Your RAG System
  • Summary & Next Steps

When you're staring at a mountain of PDFs, documentation, and internal knowledge scattered across your organization, the promise of RAG (Retrieval-Augmented Generation) suddenly becomes very real. Instead of having your team dig through hundreds of documents to answer questions, you want to build a system that can intelligently retrieve relevant information and generate comprehensive answers.

But here's the reality: most RAG tutorials show you how to query a few Wikipedia articles or sample datasets. What you actually need is a production-ready pipeline that can handle your messy, real-world documents—the 50-page technical specifications, the poorly formatted meeting notes, the PowerPoint presentations converted to PDF. You need something that works with your actual data, not toy examples.

By the end of this lesson, you'll have built a complete RAG pipeline that ingests your organization's documents, creates searchable embeddings, and answers questions with cited sources. You'll understand not just the "what" but the "why" behind each component, and you'll know how to troubleshoot when things inevitably go wrong.

What you'll learn:

  • How to build a document ingestion pipeline that handles multiple file formats and messy data
  • Strategies for chunking documents that preserve context and improve retrieval quality
  • How to implement semantic search with vector databases for fast, relevant document retrieval
  • Techniques for prompt engineering that generate accurate answers with proper source attribution
  • Methods for evaluating and improving your RAG system's performance over time

Prerequisites

You should be comfortable with Python programming and have basic familiarity with machine learning concepts like embeddings. Experience with APIs and databases will help, though we'll explain the specific tools as we go. You don't need deep NLP expertise—we'll build up the concepts as needed.

Understanding the RAG Architecture

Before diving into code, let's map out what we're building. A RAG pipeline has three core phases: ingestion, retrieval, and generation. Think of it like a research assistant who first organizes all your documents (ingestion), then finds relevant passages when you ask a question (retrieval), and finally writes a comprehensive answer using those passages (generation).

The ingestion phase is where most production RAG systems succeed or fail. Your documents aren't clean markdown files—they're PDFs with embedded images, Word documents with complex formatting, and spreadsheets with data in unexpected places. We need a robust ingestion pipeline that can handle this reality.

Here's the architecture we'll build:

# Document Ingestion Pipeline
documents → parsing → chunking → embedding → vector_database

# Query Pipeline  
question → embedding → similarity_search → context_retrieval → llm_generation → answer

The key insight is that we're not just storing documents—we're storing semantically meaningful chunks that can be retrieved independently. When someone asks "What's our data retention policy for customer emails?", we want to retrieve the specific paragraphs about email retention, not the entire 200-page privacy policy.
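To make the idea of independently retrievable chunks concrete, here is a minimal, dependency-free sketch of the query side. It assumes a toy bag-of-words vector (`toy_embed`) in place of a real embedding model, and the chunk texts are invented for illustration; only the shape of the flow (embed, score, rank) mirrors the pipeline above.

```python
import math
import re
from collections import Counter

def toy_embed(text: str) -> Counter:
    """Stand-in for a real embedding model: a bag-of-words count vector."""
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))

def cosine_similarity(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[term] * b[term] for term in a.keys() & b.keys())
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)

def retrieve(question: str, chunks: list[str], top_k: int = 2) -> list[tuple[float, str]]:
    """Rank chunks by similarity to the question and return the best top_k."""
    q_vec = toy_embed(question)
    scored = [(cosine_similarity(q_vec, toy_embed(c)), c) for c in chunks]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return scored[:top_k]

# Hypothetical chunks standing in for pieces of a long policy document
chunks = [
    "Customer emails are retained for 24 months and then deleted.",
    "Employees accrue vacation days at a rate set by their contract.",
    "Backups of the billing database are kept for seven years.",
]

results = retrieve("How long do we keep customer emails?", chunks, top_k=1)
```

Word overlap is a crude proxy: it would miss a chunk that says "messages" instead of "emails", which is the gap a learned embedding model is meant to close.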

Setting Up the Document Ingestion Pipeline

Let's start by building the ingestion system. We'll use a combination of libraries that handle different document types gracefully:

import os
from pathlib import Path
from typing import List, Dict, Any
import hashlib
from dataclasses import dataclass
from datetime import datetime

# Document processing libraries
import pymupdf  # for PDFs
from docx import Document  # for Word docs
import pandas as pd  # for Excel/CSV
from bs4 import BeautifulSoup  # for HTML
import tiktoken  # for token counting

# Vector database and embeddings
import chromadb
from chromadb.config import Settings
from sentence_transformers import SentenceTransformer

@dataclass
class DocumentChunk:
    """Represents a chunk of a document with metadata."""
    content: str
    source_file: str
    chunk_id: str
    page_number: int = None
    section_title: str = None
    chunk_index: int = None
    token_count: int = None
    created_at: datetime = None

class DocumentProcessor:
    """Handles parsing of different document types."""
    
    def __init__(self):
        self.tokenizer = tiktoken.get_encoding("cl100k_base")
    
    def process_pdf(self, file_path: Path) -> List[Dict[str, Any]]:
        """Extract text from PDF with page information."""
        chunks = []
        
        try:
            pdf_document = pymupdf.open(file_path)
            
            for page_num, page in enumerate(pdf_document):
                text = page.get_text()
                
                # Skip empty pages
                if not text.strip():
                    continue
                    
                # Clean up common PDF artifacts
                text = self._clean_pdf_text(text)
                
                chunks.append({
                    'content': text,
                    'page_number': page_num + 1,
                    'source_file': str(file_path),
                    'document_type': 'pdf'
                })
                
            pdf_document.close()
            
        except Exception as e:
            print(f"Error processing PDF {file_path}: {str(e)}")
            
        return chunks
    
    def process_docx(self, file_path: Path) -> List[Dict[str, Any]]:
        """Extract text from Word documents with structure."""
        chunks = []
        
        try:
            doc = Document(file_path)
            current_section = None
            
            for para in doc.paragraphs:
                text = para.text.strip()
                
                if not text:
                    continue
                
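                # Detect if this might be a heading: either a Heading style, or a
                # short line without a trailing period (a heuristic that can
                # misfire on short list items)
                is_heading = (
                    para.style.name.startswith('Heading') or
                    (len(text) < 100 and not text.endswith('.'))
                )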
                
                if is_heading:
                    current_section = text
                else:
                    chunks.append({
                        'content': text,
                        'section_title': current_section,
                        'source_file': str(file_path),
                        'document_type': 'docx'
                    })
                    
        except Exception as e:
            print(f"Error processing DOCX {file_path}: {str(e)}")
            
        return chunks
    
    def _clean_pdf_text(self, text: str) -> str:
        """Clean common PDF extraction artifacts."""
        import re
        
        # Remove page headers/footers (simple heuristic). This has to run before
        # whitespace is collapsed, because it relies on the line breaks.
        lines = [line.strip() for line in text.split('\n') if line.strip()]
        if len(lines) > 3:
            # Remove first and last line if they're very short (likely headers/footers)
            if len(lines[0]) < 50:
                lines = lines[1:]
            if lines and len(lines[-1]) < 50:
                lines = lines[:-1]
        
        # Collapse excessive whitespace, including the remaining line breaks
        return re.sub(r'\s+', ' ', ' '.join(lines)).strip()
    
    def count_tokens(self, text: str) -> int:
        """Count tokens in text using tiktoken."""
        return len(self.tokenizer.encode(text))

This document processor handles the reality of messy documents. PDFs often have weird spacing and page artifacts, Word documents have complex formatting, and we need to preserve structural information like headings and page numbers for later citation.
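The first-line/last-line heuristic in `_clean_pdf_text` only looks at one page at a time. As an alternative sketch (not part of the processor above), headers and footers can also be detected by looking for lines that repeat across pages. The function names and the `min_fraction` and `min_pages` parameters here are assumptions chosen for illustration.

```python
from collections import Counter

def find_repeated_lines(pages: list[str], min_fraction: float = 0.6,
                        min_pages: int = 3) -> set[str]:
    """Return lines that appear on at least min_fraction of the pages.

    With fewer than min_pages pages there is not enough evidence, so
    nothing is flagged.
    """
    if len(pages) < min_pages:
        return set()
    counts = Counter()
    for page in pages:
        # Count each distinct line once per page
        unique_lines = {line.strip() for line in page.splitlines() if line.strip()}
        counts.update(unique_lines)
    threshold = min_fraction * len(pages)
    return {line for line, n in counts.items() if n >= threshold}

def strip_repeated_lines(pages: list[str], min_fraction: float = 0.6) -> list[str]:
    """Remove lines flagged as repeated headers/footers from every page."""
    repeated = find_repeated_lines(pages, min_fraction)
    cleaned = []
    for page in pages:
        kept = [line for line in page.splitlines() if line.strip() not in repeated]
        cleaned.append("\n".join(kept).strip())
    return cleaned

# Hypothetical page texts, as page.get_text() might return them
pages = [
    "ACME Corp Confidential\nIntro text here.\nPage 1",
    "ACME Corp Confidential\nMore body text.\nPage 2",
    "ACME Corp Confidential\nFinal remarks.\nPage 3",
]
cleaned_pages = strip_repeated_lines(pages)
```

Lines that change on every page, such as "Page 1" and "Page 2", are not caught by exact matching; handling them would need an extra normalization step.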

Intelligent Document Chunking

Now comes the crucial part: breaking documents into chunks that preserve context while staying within token limits. Poor chunking is one of the most common reasons RAG systems give irrelevant answers. You want chunks that are semantically coherent, not arbitrary 500-character slices that cut sentences in half.
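A small comparison shows the difference between the two approaches. This sketch measures size in characters rather than tokens to stay dependency-free, and the function names and sample text are made up for illustration.

```python
import re

def fixed_size_slices(text: str, size: int) -> list[str]:
    """Naive chunking: cut every `size` characters, ignoring sentence boundaries."""
    return [text[i:i + size] for i in range(0, len(text), size)]

def sentence_packed_chunks(text: str, max_chars: int) -> list[str]:
    """Pack whole sentences into chunks of at most max_chars characters.

    A single sentence longer than max_chars becomes its own oversized chunk.
    """
    sentences = [s for s in re.split(r"(?<=[.!?])\s+", text.strip()) if s]
    chunks, current = [], ""
    for sentence in sentences:
        candidate = f"{current} {sentence}".strip()
        if current and len(candidate) > max_chars:
            # Adding this sentence would overflow: close the current chunk
            chunks.append(current)
            current = sentence
        else:
            current = candidate
    if current:
        chunks.append(current)
    return chunks

text = (
    "Refunds are issued within 14 days. "
    "Requests must include the order number. "
    "Shipping fees are not refundable."
)
naive = fixed_size_slices(text, 50)       # first slice ends mid-word
packed = sentence_packed_chunks(text, 80)  # every chunk ends on a sentence
```

The naive slices reassemble into the original text, but no individual slice is guaranteed to be readable on its own, which is what retrieval needs.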

class IntelligentChunker:
    """Chunks documents while preserving semantic meaning."""
    
    def __init__(self, max_tokens: int = 500, overlap_tokens: int = 50):
        self.max_tokens = max_tokens
        self.overlap_tokens = overlap_tokens
        self.tokenizer = tiktoken.get_encoding("cl100k_base")
    
    def chunk_documents(self, raw_chunks: List[Dict[str, Any]]) -> List[DocumentChunk]:
        """Convert raw document chunks into semantic chunks."""
        semantic_chunks = []
        
        for doc_chunk in raw_chunks:
            # For each document section, create overlapping semantic chunks
            section_chunks = self._create_semantic_chunks(
                doc_chunk['content'],
                doc_chunk
            )
            semantic_chunks.extend(section_chunks)
            
        return semantic_chunks
    
    def _create_semantic_chunks(self, text: str, metadata: Dict) -> List[DocumentChunk]:
        """Create overlapping chunks that respect sentence boundaries."""
        sentences = self._split_into_sentences(text)
        chunks = []
        current_chunk = []
        current_tokens = 0
        chunk_index = 0
        
        for sentence in sentences:
            sentence_tokens = len(self.tokenizer.encode(sentence))
            
            # If adding this sentence exceeds our limit, finalize current chunk
            if current_tokens + sentence_tokens > self.max_tokens and current_chunk:
                chunk_content = ' '.join(current_chunk)
                
                chunks.append(DocumentChunk(
                    content=chunk_content,
                    source_file=metadata['source_file'],
                    chunk_id=self._generate_chunk_id(chunk_content, metadata['source_file']),
                    page_number=metadata.get('page_number'),
                    section_title=metadata.get('section_title'),
                    chunk_index=chunk_index,
                    token_count=current_tokens,
                    created_at=datetime.now()
                ))
                
                # Start new chunk with overlap from previous chunk
                overlap_sentences = self._get_overlap_sentences(current_chunk)
                current_chunk = overlap_sentences
                current_tokens = sum(len(self.tokenizer.encode(s)) for s in overlap_sentences)
                chunk_index += 1
            
            current_chunk.append(sentence)
            current_tokens += sentence_tokens
        
        # Don't forget the final chunk
        if current_chunk:
            chunk_content = ' '.join(current_chunk)
            chunks.append(DocumentChunk(
                content=chunk_content,
                source_file=metadata['source_file'],
                chunk_id=self._generate_chunk_id(chunk_content, metadata['source_file']),
                page_number=metadata.get('page_number'),
                section_title=metadata.get('section_title'),
                chunk_index=chunk_index,
                token_count=current_tokens,
                created_at=datetime.now()
            ))
        
        return chunks
    
    def _split_into_sentences(self, text: str) -> List[str]:
        """Split text into sentences using multiple heuristics."""
        import re
        
        # First, handle common abbreviations that shouldn't trigger sentence breaks
        abbreviations = r'\b(?:Dr|Mr|Mrs|Ms|Prof|Inc|Ltd|Corp|etc|vs|Ph\.D|M\.D|B\.A|M\.A)\.'
        text = re.sub(abbreviations, lambda m: m.group().replace('.', '<!DOT!>'), text)
        
        # Split after sentence-ending punctuation, keeping the punctuation with its sentence
        sentences = re.split(r'(?<=[.!?])\s+', text)
        
        # Restore abbreviation dots
        sentences = [s.replace('<!DOT!>', '.') for s in sentences if s.strip()]
        
        return sentences
    
    def _get_overlap_sentences(self, sentences: List[str]) -> List[str]:
        """Get the last few sentences for overlap with next chunk."""
        if not sentences:
            return []
        
        # Take last 1-2 sentences for overlap, respecting token limit
        overlap = []
        overlap_tokens = 0
        
        for sentence in reversed(sentences):
            sentence_tokens = len(self.tokenizer.encode(sentence))
            if overlap_tokens + sentence_tokens <= self.overlap_tokens:
                overlap.insert(0, sentence)
                overlap_tokens += sentence_tokens
            else:
                break
        
        return overlap
    
    def _generate_chunk_id(self, content: str, source_file: str) -> str:
        """Generate a unique ID for this chunk."""
        content_hash = hashlib.md5(content.encode()).hexdigest()[:8]
        file_name = Path(source_file).stem
        return f"{file_name}_{content_hash}"

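The overlap strategy is critical here. When someone asks about "quarterly revenue projections," the answer might span multiple chunks. Each new chunk starts with the trailing sentences of the previous one, as many as fit within overlap_tokens (typically one or two at the default of 50 tokens), so context is less likely to get lost at chunk boundaries. If the last sentence alone exceeds that budget, the new chunk starts with no overlap at all.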
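The overlap selection can be tried in isolation. The sketch below follows the same backwards walk as `_get_overlap_sentences`, but assumes a whitespace word count (`approx_tokens`) as a rough stand-in for tiktoken; the sentences are invented.

```python
def approx_tokens(text: str) -> int:
    """Rough token proxy: whitespace-separated words (an assumption, not tiktoken)."""
    return len(text.split())

def select_overlap(sentences: list[str], budget: int) -> list[str]:
    """Walk backwards from the end, keeping sentences while they fit the budget."""
    overlap, used = [], 0
    for sentence in reversed(sentences):
        cost = approx_tokens(sentence)
        if used + cost > budget:
            break
        overlap.insert(0, sentence)  # preserve original sentence order
        used += cost
    return overlap

previous_chunk = [
    "Q3 revenue is projected at 4.2 million.",
    "The projection assumes flat churn.",
    "Q4 guidance will follow in October.",
]

generous = select_overlap(previous_chunk, budget=12)  # last two sentences fit
tight = select_overlap(previous_chunk, budget=5)      # last sentence is too long
```

With the tight budget the result is empty, so a budget smaller than your typical sentence length effectively disables overlap.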

Building the Vector Database

Now we need to store our chunks in a way that enables fast semantic search. We'll use ChromaDB, which handles the complexities of vector storage and similarity search:

class RAGVectorStore:
    """Manages document chunks in a vector database with semantic search."""
    
    def __init__(self, persist_directory: str = "./chroma_db"):
        # Initialize ChromaDB with persistence
        self.client = chromadb.PersistentClient(path=persist_directory)
        
        # Create or get collection
        self.collection = self.client.get_or_create_collection(
            name="document_chunks",
            metadata={"hnsw:space": "cosine"}  # Use cosine similarity
        )
        
        # Initialize embedding model
        self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
        
        print(f"Vector store initialized. Current chunk count: {self.collection.count()}")
    
    def add_chunks(self, chunks: List[DocumentChunk]) -> None:
        """Add document chunks to the vector store."""
        if not chunks:
            return
        
        # Prepare data for ChromaDB
        documents = []
        metadatas = []
        ids = []
        embeddings = []
        
        print(f"Processing {len(chunks)} chunks for embedding...")
        
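        seen_ids = set()
        
        for chunk in chunks:
            # Skip chunks that are too short to be meaningful
            if len(chunk.content.strip()) < 50:
                continue
            
            # Skip repeated IDs: identical text from the same file hashes to the
            # same chunk_id, and ChromaDB expects IDs in a batch to be unique
            if chunk.chunk_id in seen_ids:
                continue
            seen_ids.add(chunk.chunk_id)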
            
            # Generate embedding
            embedding = self.embedding_model.encode(chunk.content).tolist()
            
            documents.append(chunk.content)
            embeddings.append(embedding)
            ids.append(chunk.chunk_id)
            
            # Store metadata
            metadata = {
                'source_file': chunk.source_file,
                'page_number': chunk.page_number or 0,
                'section_title': chunk.section_title or '',
                'chunk_index': chunk.chunk_index or 0,
                'token_count': chunk.token_count or 0,
                'created_at': chunk.created_at.isoformat() if chunk.created_at else ''
            }
            metadatas.append(metadata)
        
        # Add to ChromaDB
        if documents:
            self.collection.add(
                documents=documents,
                embeddings=embeddings,
                metadatas=metadatas,
                ids=ids
            )
            print(f"Added {len(documents)} chunks to vector store")
    
    def search_similar(self, query: str, n_results: int = 5) -> List[Dict]:
        """Search for similar document chunks."""
        # Generate query embedding
        query_embedding = self.embedding_model.encode(query).tolist()
        
        # Search ChromaDB
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=n_results,
            include=['documents', 'metadatas', 'distances']
        )
        
        # Format results
        formatted_results = []
        for i in range(len(results['documents'][0])):
            formatted_results.append({
                'content': results['documents'][0][i],
                'metadata': results['metadatas'][0][i],
                'similarity_score': 1 - results['distances'][0][i],  # Convert distance to similarity
                'chunk_id': results['ids'][0][i] if 'ids' in results else None
            })
        
        return formatted_results
    
    def get_collection_stats(self) -> Dict:
        """Get statistics about the document collection."""
        count = self.collection.count()
        
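        # Fetch metadata for every chunk so the per-document figures are exact.
        # (Counting files in a 100-chunk sample while dividing the full count
        # by that number overstates the average for larger collections.)
        sample = self.collection.get(include=['metadatas'])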
        
        if not sample['metadatas']:
            return {'total_chunks': count}
        
        # Analyze source files
        source_files = [meta['source_file'] for meta in sample['metadatas']]
        unique_files = len(set(source_files))
        
        return {
            'total_chunks': count,
            'unique_documents': unique_files,
            'average_chunks_per_doc': count / unique_files if unique_files > 0 else 0
        }

The embedding model choice matters here. all-MiniLM-L6-v2 is fast and good for general text, but if you're working in a specialized domain (legal, medical, technical), consider fine-tuning or using domain-specific models.
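Before switching models, it helps to measure candidates on a handful of your own questions. The sketch below is one way to do that, assuming you can label which chunk should answer each query. `hit_rate_at_k`, `keyword_embed`, and `VOCAB` are illustrative names, and the toy embedder exists only so the example runs without downloading a model.

```python
import math
from typing import Callable, Sequence

def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity between two dense vectors of equal length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0

def hit_rate_at_k(embed: Callable[[str], Sequence[float]],
                  chunks: list[str],
                  labeled_queries: list[tuple[str, int]],
                  k: int = 3) -> float:
    """Fraction of queries whose expected chunk index lands in the top k."""
    if not labeled_queries:
        return 0.0
    chunk_vecs = [embed(c) for c in chunks]
    hits = 0
    for query, expected_idx in labeled_queries:
        q_vec = embed(query)
        ranked = sorted(range(len(chunks)),
                        key=lambda i: cosine(q_vec, chunk_vecs[i]),
                        reverse=True)
        if expected_idx in ranked[:k]:
            hits += 1
    return hits / len(labeled_queries)

VOCAB = ["invoice", "patient", "contract"]  # hypothetical domain terms

def keyword_embed(text: str) -> list[float]:
    """Toy embedder: counts of a few fixed keywords. Replace with a real model."""
    lowered = text.lower()
    return [float(lowered.count(term)) for term in VOCAB]

chunks = [
    "The invoice is due in 30 days.",
    "The patient was discharged on Monday.",
    "The contract renews annually.",
]
labeled_queries = [("When is the invoice due?", 0), ("When does the contract renew?", 2)]
score = hit_rate_at_k(keyword_embed, chunks, labeled_queries, k=1)
```

To compare real models, pass each model's encode function as `embed` (for example `lambda t: model.encode(t)` with a loaded SentenceTransformer) and keep the model with the higher hit rate on your labeled set.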

Implementing the Query Pipeline

With our documents indexed, we need a query pipeline that retrieves relevant chunks and generates comprehensive answers:

import openai
from typing import Optional
import json

class RAGQueryEngine:
    """Handles queries against the RAG system."""
    
    def __init__(self, vector_store: RAGVectorStore, openai_api_key: str):
        self.vector_store = vector_store
        openai.api_key = openai_api_key
        
    def query(self, question: str, n_contexts: int = 5, model: str = "gpt-3.5-turbo") -> Dict:
        """Answer a question using retrieved context."""
        
        # Step 1: Retrieve relevant contexts
        print(f"Searching for contexts related to: {question}")
        contexts = self.vector_store.search_similar(question, n_results=n_contexts)
        
        if not contexts:
            return {
                'answer': "I couldn't find any relevant information in the documents to answer your question.",
                'sources': [],
                'confidence': 0.0
            }
        
        # Step 2: Filter contexts by relevance threshold
        relevant_contexts = [ctx for ctx in contexts if ctx['similarity_score'] > 0.3]
        
        if not relevant_contexts:
            return {
                'answer': "I couldn't find sufficiently relevant information to answer your question confidently.",
                'sources': [],
                'confidence': 0.0
            }
        
        # Step 3: Generate answer using retrieved contexts
        answer_data = self._generate_answer(question, relevant_contexts, model)
        
        # Step 4: Add source information
        answer_data['sources'] = self._format_sources(relevant_contexts)
        answer_data['confidence'] = self._calculate_confidence(relevant_contexts)
        
        return answer_data
    
    def _generate_answer(self, question: str, contexts: List[Dict], model: str) -> Dict:
        """Generate an answer using OpenAI with retrieved contexts."""
        
        # Prepare context string
        context_str = self._format_contexts_for_prompt(contexts)
        
        # Create the prompt
        system_prompt = """You are a helpful assistant that answers questions based on provided documents. 

Rules:
1. Answer based ONLY on the information provided in the contexts
2. If the contexts don't contain enough information to answer fully, say so
3. Cite specific information by referencing the source (e.g., "According to the Privacy Policy document...")
4. Be concise but comprehensive
5. If you're uncertain about any part of the answer, express that uncertainty

Do not make up information that isn't in the provided contexts."""

        user_prompt = f"""Based on the following document excerpts, please answer this question: {question}

Document excerpts:
{context_str}

Question: {question}

Answer:"""

        try:
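            # openai>=1.0 removed openai.ChatCompletion; the module-level client
            # below uses the openai.api_key set in __init__
            response = openai.chat.completions.create(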
                model=model,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt}
                ],
                temperature=0.1,  # Low temperature for factual accuracy
                max_tokens=500
            )
            
            answer = response.choices[0].message.content.strip()
            
            return {
                'answer': answer,
                'model_used': model,
                'prompt_tokens': response.usage.prompt_tokens,
                'completion_tokens': response.usage.completion_tokens
            }
            
        except Exception as e:
            print(f"Error generating answer: {str(e)}")
            return {
                'answer': "I encountered an error while generating an answer. Please try again.",
                'error': str(e)
            }
    
    def _format_contexts_for_prompt(self, contexts: List[Dict]) -> str:
        """Format retrieved contexts for the LLM prompt."""
        formatted_contexts = []
        
        for i, ctx in enumerate(contexts):
            source_info = self._get_source_description(ctx['metadata'])
            
            context_block = f"""--- Context {i+1} ---
Source: {source_info}
Content: {ctx['content']}
Relevance Score: {ctx['similarity_score']:.2f}
"""
            formatted_contexts.append(context_block)
        
        return '\n'.join(formatted_contexts)
    
    def _get_source_description(self, metadata: Dict) -> str:
        """Create a human-readable source description."""
        source_file = Path(metadata['source_file']).name
        
        parts = [source_file]
        
        if metadata.get('page_number'):
            parts.append(f"page {metadata['page_number']}")
        
        if metadata.get('section_title'):
            parts.append(f"section '{metadata['section_title']}'")
        
        return ', '.join(parts)
    
    def _format_sources(self, contexts: List[Dict]) -> List[Dict]:
        """Format source information for the response."""
        sources = []
        
        for ctx in contexts:
            source = {
                'file': Path(ctx['metadata']['source_file']).name,
                'page': ctx['metadata'].get('page_number'),
                'section': ctx['metadata'].get('section_title'),
                'relevance_score': round(ctx['similarity_score'], 2),
                'excerpt': ctx['content'][:200] + "..." if len(ctx['content']) > 200 else ctx['content']
            }
            sources.append(source)
        
        return sources
    
    def _calculate_confidence(self, contexts: List[Dict]) -> float:
        """Calculate confidence score based on context relevance."""
        if not contexts:
            return 0.0
        
        # Average similarity score, weighted by position (earlier results are more important)
        weighted_scores = []
        for i, ctx in enumerate(contexts):
            weight = 1.0 / (i + 1)  # Decreasing weight for later results
            weighted_scores.append(ctx['similarity_score'] * weight)
        
        confidence = sum(weighted_scores) / sum(1.0 / (i + 1) for i in range(len(contexts)))
        return round(confidence, 2)

This query engine does several important things: it filters out low-relevance results, provides source attribution, and calculates a confidence score. The confidence score is derived only from retrieval similarity, so it tells users how closely the retrieved chunks match the question, not whether the generated answer is correct.
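The arithmetic behind the threshold filter and the position-weighted confidence can be checked by hand. This standalone sketch reproduces the same formula on made-up similarity scores; `weighted_confidence` is an illustrative name, not a method of the class above.

```python
def weighted_confidence(similarity_scores: list[float]) -> float:
    """Rank-weighted mean: the result at rank i (0-based) gets weight 1 / (i + 1)."""
    if not similarity_scores:
        return 0.0
    weights = [1.0 / (rank + 1) for rank in range(len(similarity_scores))]
    weighted_sum = sum(s * w for s, w in zip(similarity_scores, weights))
    return round(weighted_sum / sum(weights), 2)

# Hypothetical scores, already sorted best-first as the vector store returns them
scores = [0.8, 0.6, 0.4, 0.2]

# Same 0.3 cut-off as the query engine: the 0.2 result is dropped
relevant = [s for s in scores if s > 0.3]

# Weights are 1, 1/2, 1/3, so:
# (0.8 * 1 + 0.6 * 0.5 + 0.4 * 0.3333) / (1 + 0.5 + 0.3333) = 1.2333 / 1.8333
confidence = weighted_confidence(relevant)
plain_mean = round(sum(relevant) / len(relevant), 2)
```

The weighted value (0.67) sits above the plain mean (0.6) because the top-ranked chunk counts the most.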

Hands-On Exercise: Building Your Document RAG System

Now let's put it all together into a complete system you can use with your own documents. Create this main pipeline class:

class DocumentRAGPipeline:
    """Complete RAG pipeline for your documents."""
    
    def __init__(self, openai_api_key: str, persist_directory: str = "./rag_system"):
        self.persist_directory = Path(persist_directory)
        self.persist_directory.mkdir(parents=True, exist_ok=True)
        
        # Initialize components
        self.processor = DocumentProcessor()
        self.chunker = IntelligentChunker(max_tokens=400, overlap_tokens=50)
        self.vector_store = RAGVectorStore(str(self.persist_directory / "vector_db"))
        self.query_engine = RAGQueryEngine(self.vector_store, openai_api_key)
        
        print("RAG Pipeline initialized successfully")
    
    def ingest_documents(self, document_directory: str) -> None:
        """Ingest all supported documents from a directory."""
        doc_path = Path(document_directory)
        
        if not doc_path.exists():
            print(f"Directory {document_directory} does not exist")
            return
        
        # Find supported files
        supported_extensions = {'.pdf', '.docx', '.txt'}
        files_to_process = []
        
        for ext in supported_extensions:
            files_to_process.extend(doc_path.glob(f"**/*{ext}"))
        
        print(f"Found {len(files_to_process)} documents to process")
        
        all_chunks = []
        
        for file_path in files_to_process:
            print(f"Processing: {file_path.name}")
            
            try:
                # Process based on file type
                if file_path.suffix.lower() == '.pdf':
                    raw_chunks = self.processor.process_pdf(file_path)
                elif file_path.suffix.lower() == '.docx':
                    raw_chunks = self.processor.process_docx(file_path)
                elif file_path.suffix.lower() == '.txt':
                    # Simple text file processing
                    with open(file_path, 'r', encoding='utf-8') as f:
                        content = f.read()
                    raw_chunks = [{
                        'content': content,
                        'source_file': str(file_path),
                        'document_type': 'txt'
                    }]
                else:
                    continue
                
                # Chunk the document
                if raw_chunks:
                    chunks = self.chunker.chunk_documents(raw_chunks)
                    all_chunks.extend(chunks)
                    print(f"  Created {len(chunks)} chunks")
                
            except Exception as e:
                print(f"  Error processing {file_path}: {str(e)}")
        
        # Add all chunks to vector store
        if all_chunks:
            self.vector_store.add_chunks(all_chunks)
            print(f"\nIngestion complete! Total chunks: {len(all_chunks)}")
        else:
            print("No chunks were created. Check your documents and try again.")
    
    def ask_question(self, question: str, verbose: bool = True) -> Dict:
        """Ask a question and get an answer with sources."""
        if verbose:
            print(f"\nQuestion: {question}")
            print("Searching documents...")
        
        result = self.query_engine.query(question)
        
        if verbose:
            print(f"\nAnswer: {result['answer']}")
            print(f"Confidence: {result['confidence']}")
            
            if result['sources']:
                print(f"\nSources ({len(result['sources'])}):")
                for i, source in enumerate(result['sources'], 1):
                    print(f"{i}. {source['file']}")
                    if source['page']:
                        print(f"   Page {source['page']}")
                    if source['section']:
                        print(f"   Section: {source['section']}")
                    print(f"   Relevance: {source['relevance_score']}")
                    print()
        
        return result
    
    def get_system_stats(self) -> Dict:
        """Get statistics about the RAG system."""
        stats = self.vector_store.get_collection_stats()
        return stats

# Usage example
if __name__ == "__main__":
    # Initialize the pipeline
    rag = DocumentRAGPipeline(
        openai_api_key="your-openai-api-key-here",
        persist_directory="./my_rag_system"
    )
    
    # Ingest your documents
    rag.ingest_documents("./my_documents")
    
    # Ask questions
    rag.ask_question("What is our company's data retention policy?")
    rag.ask_question("How do we handle customer complaints?")
    rag.ask_question("What are the technical requirements for our API?")
    
    # Get system statistics
    stats = rag.get_system_stats()
    print(f"\nSystem Stats: {stats}")

To test this with your own documents:

  1. Create a folder called my_documents and add some PDFs, Word docs, or text files
  2. Set your OpenAI API key (or use a local model like Ollama)
  3. Run the pipeline and start asking questions

Tip: Start with a small set of documents (5-10 files) to test the system, then scale up. This makes debugging much easier.
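For step 2, one common approach is to read the key from an environment variable instead of pasting it into the script. The helper below is a sketch; `load_api_key` is a hypothetical name, and `OPENAI_API_KEY` is the variable name conventionally used with the OpenAI client.

```python
import os

def load_api_key(env_var: str = "OPENAI_API_KEY") -> str:
    """Read the API key from the environment and fail early if it is missing."""
    key = os.environ.get(env_var, "").strip()
    if not key:
        raise RuntimeError(
            f"Set the {env_var} environment variable before running the pipeline."
        )
    return key
```

You could then construct the pipeline with `DocumentRAGPipeline(openai_api_key=load_api_key())`, which keeps the key out of version control.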

Common Mistakes & Troubleshooting

Problem: Answers are irrelevant or off-topic

This usually means your chunking strategy isn't preserving enough context. Try:

  • Increasing chunk size (more tokens per chunk)
  • Increasing overlap between chunks
  • Adjusting the similarity threshold in your query engine
  • Using a different embedding model that's better suited to your domain
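When adjusting the similarity threshold, it helps to see how many contexts survive at each setting before changing the query engine. The sketch below assumes you have collected similarity scores for a few test questions (the numbers shown are invented), and `sweep_thresholds` is an illustrative helper.

```python
def sweep_thresholds(scores_per_query: dict[str, list[float]],
                     thresholds: list[float]) -> dict[float, tuple[float, int]]:
    """For each threshold, report (average contexts kept, queries left with none)."""
    report = {}
    for threshold in thresholds:
        kept_counts = [
            sum(1 for s in scores if s > threshold)
            for scores in scores_per_query.values()
        ]
        avg_kept = sum(kept_counts) / len(kept_counts) if kept_counts else 0.0
        unanswered = sum(1 for n in kept_counts if n == 0)
        report[threshold] = (avg_kept, unanswered)
    return report

# Hypothetical similarity scores gathered from search_similar() for two questions
scores_per_query = {
    "retention policy": [0.82, 0.55, 0.31, 0.12],
    "api rate limits": [0.45, 0.28, 0.20, 0.10],
}
report = sweep_thresholds(scores_per_query, [0.3, 0.5])
```

A threshold that leaves many questions with zero contexts is probably too strict for your embedding model's score range.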

Problem: The system can't find information you know is in the documents

Check your document processing pipeline:

# Debug by examining what chunks were created
chunks = rag.vector_store.collection.get(limit=10, include=['documents', 'metadatas'])
for i, chunk in enumerate(chunks['documents']):
    print(f"Chunk {i}: {chunk[:200]}...")
    print(f"Metadata: {chunks['metadatas'][i]}")
    print()

Common issues:

  • PDF text extraction failed (try a different library like pdfplumber)
  • Text is buried in tables or images (consider OCR with pytesseract)
  • Document structure isn't being preserved (improve your section detection)
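To tell an extraction failure from a retrieval failure, check whether a phrase you know is in the source document appears in any stored chunk at all. This is a minimal sketch with hypothetical helper names; the chunk texts could come from the `collection.get(...)` call shown above.

```python
def find_phrase(chunks: list[str], phrase: str) -> list[int]:
    """Indices of chunks containing the phrase, ignoring case and whitespace runs."""
    needle = " ".join(phrase.lower().split())
    return [
        i for i, chunk in enumerate(chunks)
        if needle in " ".join(chunk.lower().split())
    ]

def diagnose(chunks: list[str], phrase: str, retrieved_indices: list[int]) -> str:
    """Classify a miss as an 'extraction' or 'retrieval' problem, else 'ok'."""
    hits = find_phrase(chunks, phrase)
    if not hits:
        return "extraction"  # the text never made it into the store
    if not set(hits) & set(retrieved_indices):
        return "retrieval"   # stored, but the search did not surface it
    return "ok"

# Hypothetical stored chunks
chunks = [
    "Customer emails are retained for 24 months.",
    "Vacation accrues   monthly.",
]
```

An "extraction" result points at parsing (try another PDF library or OCR), while "retrieval" points at chunking, the embedding model, or the threshold.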

Problem: Slow query performance

Vector search should be fast, but if you're seeing slowdowns:

  • Check your ChromaDB configuration (consider adjusting HNSW parameters)
  • Reduce the number of retrieved contexts
  • Use a smaller, faster embedding model for development
  • Consider batch processing for multiple queries

Problem: High costs from OpenAI API

Monitor your token usage and optimize:

# Track API costs (the per-token rates below are examples; check current pricing)
def calculate_cost(prompt_tokens, completion_tokens, model="gpt-3.5-turbo"):
    if model == "gpt-3.5-turbo":
        prompt_cost = prompt_tokens * 0.001 / 1000  # $0.001 per 1K tokens
        completion_cost = completion_tokens * 0.002 / 1000  # $0.002 per 1K tokens
        return prompt_cost + completion_cost
    return 0

Optimization strategies:

  • Use shorter, more focused contexts
  • Use gpt-3.5-turbo rather than gpt-4 for queries that don't need the stronger model
  • Implement caching for repeated questions
  • Consider using open-source models like Llama 2 for development
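
Caching repeated questions can be sketched with a small in-memory wrapper. `AnswerCache` and `fake_answer` below are hypothetical names for illustration; the cache key is a hash of the lowercased, whitespace-normalized question, so only near-identical phrasings share an entry.

```python
import hashlib
from typing import Callable, Dict


class AnswerCache:
    """In-memory cache keyed by a normalized form of the question."""

    def __init__(self, answer_fn: Callable[[str], dict]) -> None:
        self.answer_fn = answer_fn
        self.store: Dict[str, dict] = {}
        self.hits = 0
        self.misses = 0

    @staticmethod
    def _key(question: str) -> str:
        # Lowercase and collapse whitespace so trivial variations map to one key
        normalized = " ".join(question.lower().split())
        return hashlib.sha256(normalized.encode("utf-8")).hexdigest()

    def ask(self, question: str) -> dict:
        key = self._key(question)
        if key in self.store:
            self.hits += 1
            return self.store[key]
        self.misses += 1
        result = self.answer_fn(question)  # only pay for the call on a miss
        self.store[key] = result
        return result

    def clear(self) -> None:
        """Drop all entries, e.g. after re-ingesting documents."""
        self.store.clear()


def fake_answer(question: str) -> dict:
    # Stand-in for the paid pipeline call
    return {"answer": f"(answer to: {question})"}


cache = AnswerCache(fake_answer)
first = cache.ask("What is our data retention policy?")
second = cache.ask("  what is our DATA retention policy? ")
print(cache.hits, cache.misses)
```

With the pipeline from this lesson, the wrapped function would be something like `lambda q: rag.ask_question(q, verbose=False)`. Remember to clear the cache whenever the underlying documents change, or users will see stale answers.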

Advanced: Evaluating Your RAG System

Once your system is running, you need to evaluate how well it's performing. Here's a framework for systematic evaluation:

import time
from pathlib import Path
from typing import Dict, List


class RAGEvaluator:
    """Evaluates RAG system performance."""
    
    def __init__(self, rag_pipeline: DocumentRAGPipeline):
        self.rag = rag_pipeline
        self.test_questions = []
        self.results = []
    
    def create_test_set(self, questions_and_expected: List[Dict]) -> None:
        """Create a test set with questions and expected answer elements."""
        self.test_questions = questions_and_expected
    
    def evaluate_retrieval(self, question: str, expected_sources: List[str]) -> Dict:
        """Evaluate how well the system retrieves relevant documents."""
        contexts = self.rag.vector_store.search_similar(question, n_results=5)
        
        # Compare at the file level: several chunks from one file count once
        retrieved_files = {Path(ctx['metadata']['source_file']).name 
                          for ctx in contexts}
        expected_files = set(expected_sources)
        
        # Calculate retrieval metrics
        relevant_retrieved = len(retrieved_files.intersection(expected_files))
        precision = relevant_retrieved / len(retrieved_files) if retrieved_files else 0
        recall = relevant_retrieved / len(expected_files) if expected_files else 0
        
        return {
            'precision': precision,
            'recall': recall,
            'f1': 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0,
            'retrieved_sources': list(retrieved_files),
            'expected_sources': list(expected_files)
        }
    
    def run_evaluation(self) -> Dict:
        """Run complete evaluation on test set."""
        if not self.test_questions:
            print("No test questions defined. Use create_test_set() first.")
            return {}
        
        results = {
            'retrieval_metrics': [],
            'answer_quality': [],
            'response_times': []
        }
        
        for test_case in self.test_questions:
            question = test_case['question']
            expected_sources = test_case.get('expected_sources', [])
            
            # Time the full question-answering call (perf_counter suits interval timing)
            start_time = time.perf_counter()
            answer_data = self.rag.ask_question(question, verbose=False)
            response_time = time.perf_counter() - start_time
            
            # Evaluate retrieval
            if expected_sources:
                retrieval_metrics = self.evaluate_retrieval(question, expected_sources)
                results['retrieval_metrics'].append(retrieval_metrics)
            
            # Store results
            results['response_times'].append(response_time)
            results['answer_quality'].append({
                'question': question,
                'answer': answer_data['answer'],
                'confidence': answer_data['confidence'],
                'sources_count': len(answer_data['sources'])
            })
        
        # Calculate averages. Response time is reported even when no test case
        # lists expected sources, so it doesn't depend on retrieval metrics.
        results['average_metrics'] = {
            'avg_response_time': sum(results['response_times']) / len(results['response_times'])
        }
        if results['retrieval_metrics']:
            count = len(results['retrieval_metrics'])
            results['average_metrics'].update({
                'precision': sum(r['precision'] for r in results['retrieval_metrics']) / count,
                'recall': sum(r['recall'] for r in results['retrieval_metrics']) / count,
                'f1': sum(r['f1'] for r in results['retrieval_metrics']) / count
            })
        
        return results

# Example usage
evaluator = RAGEvaluator(rag)

# Define test cases
test_cases = [
    {
        'question': 'What is our data retention policy?',
        'expected_sources': ['privacy_policy.pdf', 'data_governance.docx']
    },
    {
        'question': 'How do we handle API rate limiting?',
        'expected_sources': ['api_documentation.pdf']
    }
]

evaluator.create_test_set(test_cases)
evaluation_results = evaluator.run_evaluation()

This evaluation framework helps you systematically improve your RAG system by identifying where retrieval fails and tracking performance over time.
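
To make the retrieval metrics concrete, here is a small worked example of the same precision, recall, and F1 arithmetic on file-name sets. `retrieval_scores` is a hypothetical standalone helper, and `hr_handbook.pdf` is an invented file name standing in for an irrelevant result.

```python
from typing import Dict, Set


def retrieval_scores(retrieved: Set[str], expected: Set[str]) -> Dict[str, float]:
    """Precision, recall, and F1 over sets of source file names."""
    hits = len(retrieved & expected)
    precision = hits / len(retrieved) if retrieved else 0.0
    recall = hits / len(expected) if expected else 0.0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0
    return {"precision": precision, "recall": recall, "f1": f1}


# Three files retrieved, two expected, one in common
scores = retrieval_scores(
    retrieved={"privacy_policy.pdf", "hr_handbook.pdf", "api_documentation.pdf"},
    expected={"privacy_policy.pdf", "data_governance.docx"},
)
print({name: round(value, 2) for name, value in scores.items()})
# {'precision': 0.33, 'recall': 0.5, 'f1': 0.4}
```

Here one of three retrieved files is relevant (precision 1/3) and one of two expected files was found (recall 1/2). Low recall suggests the right chunks are not being retrieved at all, while low precision suggests the retrieved context is padded with irrelevant material.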

Summary & Next Steps

You've now built a production-ready RAG pipeline that can handle real-world documents and answer questions with proper source attribution. Your system includes document ingestion for multiple formats, chunking that preserves context, semantic search over a vector database, and a query pipeline that grounds its answers in the retrieved passages.

The key insights from this implementation:

  • Document processing matters: Real documents are messy, and robust parsing makes the difference between a working system and a frustrating one
  • Chunking strategy is critical: How you break documents into chunks directly impacts answer quality
  • Source attribution builds trust: Users need to verify answers against original sources
  • Evaluation drives improvement: Systematic testing helps you identify and fix problems

Next steps to enhance your RAG system:

  1. Add more document types: Implement processors for Excel files, HTML pages, and emails
  2. Improve chunking: Experiment with semantic chunking using NLP libraries like spaCy
  3. Add conversation memory: Implement follow-up questions that reference previous context
  4. Scale up storage: Move from a local ChromaDB instance to a managed or distributed vector database like Pinecone or Weaviate once your corpus or query volume outgrows a single machine
  5. Implement caching: Cache frequent queries to reduce API costs and improve response times

Your RAG pipeline is now ready to handle your organization's documents and provide intelligent, source-backed answers to user questions. The foundation you've built can scale from hundreds to thousands of documents with the right infrastructure choices.
