
Tutorial 5: Building a RAG Pipeline

Complete end-to-end RAG pipeline with LLM integration.

Goal

Build a production RAG system that:

  1. Imports documents from various sources
  2. Builds a knowledge graph
  3. Retrieves context for user queries
  4. Generates responses with an LLM

Step 1: Data Pipeline

import json
from src.network_rag import NetworkRAG

def import_jsonl(rag, file_path):
    """Import documents from JSONL file."""
    count = 0

    with rag.batch() as batch:
        with open(file_path) as f:
            for line in f:
                doc = json.loads(line)
                batch.add(
                    doc['content'],
                    id=doc.get('id'),
                    **doc.get('metadata', {})
                )
                count += 1

    return count

# Initialize RAG
rag = (NetworkRAG.builder()
       .with_storage('knowledge.db')
       .from_config('config/documents.yaml')
       .build())

# Import data
count = import_jsonl(rag, 'data/documents.jsonl')
print(f"Imported {count} documents")

# Build network
graph = rag.build_network()
print(f"Built network: {len(graph.nodes())} nodes, {len(graph.edges())} edges")

Step 2: Retrieval Function

def retrieve_context(rag, query, n=5, strategy='hybrid'):
    """Retrieve relevant context for a query."""
    results = (rag.search(query)
               .with_strategy(strategy)
               .top(n))

    # Format context
    context_parts = []
    for i, result in enumerate(results, 1):
        context_parts.append(f"[{i}] {result.content}")

    return "\n\n".join(context_parts), results

# Test retrieval
query = "How do transformers work?"
context, results = retrieve_context(rag, query, n=5)

print(f"\n=== Retrieved Context ===")
print(context)

Step 3: LLM Integration

def generate_response(query, context, model='gpt-4'):
    """Generate response using retrieved context."""
    prompt = f"""Answer the following question using the provided context.

Context:
{context}

Question: {query}

Answer:"""

    # Call your LLM (OpenAI, Anthropic, Ollama, etc.); a concrete
    # sketch using the OpenAI SDK follows this block.
    # response = llm.complete(prompt)
    raise NotImplementedError("Plug an LLM client in here")
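
The stub above is provider-agnostic. As one concrete option (an assumption, not a requirement of this library), here is a sketch using the OpenAI Python SDK; it needs pip install openai and an OPENAI_API_KEY environment variable:

from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment

def generate_response(query, context, model='gpt-4'):
    """Generate a response via the OpenAI chat API (one provider among many)."""
    prompt = f"""Answer the following question using the provided context.

Context:
{context}

Question: {query}

Answer:"""

    completion = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
    )
    return completion.choices[0].message.content

With this (or any equivalent) definition in place, the rag_query pipeline below runs end to end.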

# Complete RAG pipeline
def rag_query(rag, query, n_context=5):
    """Complete RAG pipeline: retrieve + generate."""

    # Retrieve context
    context, results = retrieve_context(rag, query, n=n_context)

    # Generate response
    response = generate_response(query, context)

    return {
        'query': query,
        'response': response,
        'sources': [r.id for r in results],
        'scores': [r.score for r in results]
    }

# Use pipeline
result = rag_query(rag, "How do transformers work?")
print(f"\nQuery: {result['query']}")
print(f"\nResponse: {result['response']}")
print(f"\nSources: {', '.join(result['sources'])}")

Step 4: Evaluation

def evaluate_retrieval(rag, test_queries):
    """Evaluate retrieval quality."""
    metrics = []

    for query, relevant_docs in test_queries:
        # Retrieve candidates
        results = rag.search(query).top(10)
        retrieved_ids = results.ids()

        # Score the top 5 against the labeled relevant docs
        relevant_set = set(relevant_docs)
        retrieved_set = set(retrieved_ids[:5])

        hits = len(relevant_set & retrieved_set)
        precision = hits / len(retrieved_set) if retrieved_set else 0.0  # precision@5
        recall = hits / len(relevant_set) if relevant_set else 0.0  # recall@5

        metrics.append({
            'query': query,
            'precision': precision,
            'recall': recall
        })

    return metrics

# Test evaluation
test_queries = [
    ("transformer architecture", ["transformer", "bert", "gpt3"]),
    ("image recognition", ["vit", "resnet"]),
]

metrics = evaluate_retrieval(rag, test_queries)

for m in metrics:
    print(f"\nQuery: {m['query']}")
    print(f"  Precision: {m['precision']:.2f}")
    print(f"  Recall: {m['recall']:.2f}")

Step 5: Production Deployment

# app.py - FastAPI application
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from src.network_rag import NetworkRAG

app = FastAPI()

# Initialize RAG (once at startup)
rag = (NetworkRAG.builder()
       .with_storage('knowledge.db')
       .from_config('config/production.yaml')
       .build())

class Query(BaseModel):
    query: str
    n: int = 5
    strategy: str = 'hybrid'

@app.post("/search")
def search(q: Query):
    """Search endpoint."""
    try:
        results = (rag.search(q.query)
                   .with_strategy(q.strategy)
                   .top(q.n))

        return {
            'results': [
                {
                    'id': r.id,
                    'score': r.score,
                    'content': r.content,
                    'metadata': r.metadata
                }
                for r in results
            ]
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/add")
def add_document(doc: dict):
    """Add document endpoint."""
    try:
        node_id = rag.add(doc['content'], **doc.get('metadata', {}))
        # Note: rebuilding the whole network on every request is expensive;
        # in production, batch additions or rebuild on a schedule.
        rag.build_network(rebuild=True)
        return {'id': node_id}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

# Run with: uvicorn app:app --reload
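
To smoke-test the running service, a small client script (a sketch; assumes the default uvicorn address localhost:8000 and pip install requests):

# client.py - exercise the /search and /add endpoints
import requests

BASE = "http://localhost:8000"

# Add a document
r = requests.post(BASE + "/add", json={
    "content": "Transformers use self-attention over token embeddings.",
    "metadata": {"topic": "nlp"},
})
r.raise_for_status()
print("Added:", r.json()["id"])

# Search
r = requests.post(BASE + "/search", json={
    "query": "How do transformers work?", "n": 3, "strategy": "hybrid",
})
r.raise_for_status()
for hit in r.json()["results"]:
    print(f"{hit['id']}: {hit['score']:.3f}")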

Summary

Across this tutorial series, you've covered:

  1. Research Papers: Field-specific similarity with chunking
  2. Products: Composite embeddings with exact category matching
  3. Blogs: Hierarchical field weighting
  4. Conversations: Role-weighted embeddings
  5. RAG Pipeline: Complete retrieval-generation system

Complex Network RAG Tutorials - Learn by building.