RAG Pipeline
Tutorial 5: Building a RAG Pipeline
Complete end-to-end RAG pipeline with LLM integration.
Goal
Build a production RAG system that:
1. Imports documents from various sources
2. Builds a knowledge graph
3. Retrieves context for user queries
4. Generates responses with an LLM
Step 1: Data Pipeline
import json
from pathlib import Path
from src.network_rag import NetworkRAG
def import_jsonl(rag, file_path):
    """Import documents from a JSONL file into ``rag`` in a single batch.

    Each non-blank line must be a JSON object with a ``content`` key. It may
    also have an ``id`` and a ``metadata`` object. The ``metadata`` keys are
    forwarded to ``batch.add`` as keyword arguments.

    Args:
        rag: A NetworkRAG instance (anything exposing ``batch()`` as a
            context manager whose value has an ``add`` method).
        file_path: Path to the JSONL file (``str`` or ``pathlib.Path``).

    Returns:
        The number of documents added.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        KeyError: If a record has no ``content`` key.
    """
    count = 0
    # JSONL is UTF-8 by convention; don't depend on the platform's locale encoding.
    with rag.batch() as batch, open(file_path, encoding='utf-8') as f:
        for line in f:
            # Blank or whitespace-only lines (e.g. an extra newline at the end of
            # the file) carry no record; json.loads would raise on them.
            if not line.strip():
                continue
            doc = json.loads(line)
            batch.add(
                doc['content'],
                id=doc.get('id'),
                **doc.get('metadata', {}),
            )
            count += 1
    return count
# Initialize RAG: configure the builder with a storage location and a YAML
# config file, then build() the NetworkRAG instance that the rest of this
# tutorial uses as `rag`.
rag = (NetworkRAG.builder()
       .with_storage('knowledge.db')
       .from_config('config/documents.yaml')
       .build())
# Import data: load every record from the JSONL file via import_jsonl and
# report how many were added.
count = import_jsonl(rag, 'data/documents.jsonl')
print(f"Imported {count} documents")
# Build network: build_network() returns a graph object whose node and edge
# counts are printed as a quick sanity check of the import.
# NOTE(review): nodes()/edges() suggest a networkx-style graph API — confirm.
graph = rag.build_network()
print(f"Built network: {len(graph.nodes())} nodes, {len(graph.edges())} edges")
Step 2: Retrieval Function
def retrieve_context(rag, query, n=5, strategy='hybrid'):
    """Fetch the top-``n`` hits for ``query`` and render them as prompt context.

    Args:
        rag: NetworkRAG instance to search.
        query: The search text.
        n: Maximum number of hits to keep.
        strategy: Retrieval strategy name passed to ``with_strategy``.

    Returns:
        A ``(context, results)`` tuple.
        - ``context``: each hit's content prefixed with its 1-based position
          in brackets, with hits separated by a blank line.
        - ``results``: the collection returned by ``.top(n)``.
    """
    search = rag.search(query)
    ranked = search.with_strategy(strategy).top(n)
    # Bracketed 1-based labels give every passage a stable handle to cite.
    blocks = (f"[{rank}] {hit.content}" for rank, hit in enumerate(ranked, start=1))
    return "\n\n".join(blocks), ranked
# Test retrieval: fetch the top 5 results for a sample query and print the
# numbered context block built by retrieve_context.
query = "How do transformers work?"
context, results = retrieve_context(rag, query, n=5)
print(f"\n=== Retrieved Context ===")
print(context)
Step 3: LLM Integration
def generate_response(query, context, model='gpt-4', llm=None):
    """Generate an answer to ``query`` grounded in the retrieved ``context``.

    Args:
        query: The user's question.
        context: Numbered context passages (e.g. from ``retrieve_context``).
        model: Model name forwarded to ``llm``.
        llm: Callable invoked as ``llm(prompt, model=model)`` that returns the
            completion text. Wrap your client here (OpenAI, Anthropic,
            Ollama, etc.).

    Returns:
        The value returned by ``llm``, i.e. the generated answer.

    Raises:
        NotImplementedError: If no ``llm`` callable is supplied.
    """
    if llm is None:
        # Fail with an actionable message rather than a NameError on an
        # undefined response variable.
        raise NotImplementedError(
            "generate_response() needs an LLM: pass llm=callable(prompt, model=...) "
            "that wraps your client (OpenAI, Anthropic, Ollama, etc.)."
        )
    prompt = f"""Answer the following question using the provided context.
Context:
{context}
Question: {query}
Answer:"""
    return llm(prompt, model=model)
# Complete RAG pipeline
def rag_query(rag, query, n_context=5):
    """Answer ``query`` end to end: retrieve context, then generate.

    Args:
        rag: NetworkRAG instance to search.
        query: The user's question.
        n_context: How many retrieved passages to place in the prompt.

    Returns:
        A dict with these keys:
        - ``query``: the original question.
        - ``response``: the generated answer.
        - ``sources``: the ids of the retrieved passages, in the order
          retrieval returned them.
        - ``scores``: their scores, in the same order.
    """
    context, hits = retrieve_context(rag, query, n=n_context)
    answer = generate_response(query, context)
    source_ids = [hit.id for hit in hits]
    source_scores = [hit.score for hit in hits]
    return dict(query=query, response=answer, sources=source_ids, scores=source_scores)
# Use pipeline: run one end-to-end query and print the answer plus the ids of
# the documents it was grounded on.
# NOTE: generate_response must be wired to a real LLM before this can run.
result = rag_query(rag, "How do transformers work?")
print(f"\nQuery: {result['query']}")
print(f"\nResponse: {result['response']}")
# str() each id: str.join raises TypeError on non-string ids (e.g. ints or None).
print(f"\nSources: {', '.join(str(source) for source in result['sources'])}")
Step 4: Evaluation
def evaluate_retrieval(rag, test_queries, k=5):
    """Compute precision@k and recall@k for a set of labelled queries.

    Args:
        rag: NetworkRAG instance; ``rag.search(query).top(n).ids()`` must
            return the ranked document ids.
        test_queries: Iterable of ``(query, relevant_doc_ids)`` pairs.
        k: Rank cut-off at which precision and recall are measured
            (default 5).

    Returns:
        A list of ``{'query', 'precision', 'recall'}`` dicts, one per query.
        A metric whose denominator is empty is reported as 0.0 instead of
        raising. That happens when nothing was retrieved (precision) or no
        relevant ids were given (recall).
    """
    metrics = []
    for query, relevant_docs in test_queries:
        # Keep a candidate pool of at least 10 (or k, if larger) and score
        # only the top k of that ranking.
        results = rag.search(query).top(max(k, 10))
        retrieved_ids = results.ids()
        relevant_set = set(relevant_docs)
        retrieved_set = set(retrieved_ids[:k])
        hits = len(relevant_set & retrieved_set)
        # Guard both divisions: one empty result list or empty label list
        # would otherwise abort the whole evaluation with ZeroDivisionError.
        precision = hits / len(retrieved_set) if retrieved_set else 0.0
        recall = hits / len(relevant_set) if relevant_set else 0.0
        metrics.append({
            'query': query,
            'precision': precision,
            'recall': recall,
        })
    return metrics
# Test evaluation: each entry pairs a query with the ids of the documents
# considered relevant to it; evaluate_retrieval compares them against the
# retrieved ids and the per-query metrics are printed.
test_queries = [
    ("transformer architecture", ["transformer", "bert", "gpt3"]),
    ("image recognition", ["vit", "resnet"]),
]
metrics = evaluate_retrieval(rag, test_queries)
for m in metrics:
    print(f"\nQuery: {m['query']}")
    print(f" Precision: {m['precision']:.2f}")
    print(f" Recall: {m['recall']:.2f}")
Step 5: Production Deployment
# app.py - FastAPI application
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from src.network_rag import NetworkRAG
app = FastAPI()
# Initialize RAG (once at startup): built at module import time, so every
# request handler below shares this single instance instead of rebuilding it.
rag = (NetworkRAG.builder()
       .with_storage('knowledge.db')
       .from_config('config/production.yaml')
       .build())
class Query(BaseModel):
    """Request body for the /search endpoint."""
    query: str  # search text passed to rag.search()
    # Number of results, passed to .top().
    # NOTE(review): n is client-controlled and unbounded; consider validating a range.
    n: int = 5
    strategy: str = 'hybrid'  # strategy name passed to .with_strategy()
@app.post("/search")
def search(q: Query):
    """Search endpoint.

    Runs ``q.query`` through ``rag`` with the requested strategy and returns
    the top ``q.n`` hits as JSON-ready dicts (id, score, content, metadata).
    Any exception raised while searching is reported as HTTP 500, with the
    exception text as the detail.
    """
    try:
        ranked = rag.search(q.query).with_strategy(q.strategy).top(q.n)
        hits = []
        for hit in ranked:
            hits.append({
                'id': hit.id,
                'score': hit.score,
                'content': hit.content,
                'metadata': hit.metadata,
            })
        return {'results': hits}
    except Exception as err:
        raise HTTPException(status_code=500, detail=str(err))
@app.post("/add")
def add_document(doc: dict):
    """Add document endpoint.

    Expects a JSON object with a required ``content`` field and an optional
    ``metadata`` object, whose keys are forwarded to ``rag.add`` as keyword
    arguments. After adding, the network is rebuilt.

    Returns:
        ``{'id': <node id returned by rag.add>}``.

    Raises:
        HTTPException(400): ``content`` is missing or ``metadata`` is not an
            object. These are client errors, not server faults.
        HTTPException(500): adding the document or rebuilding the network failed.
    """
    # Validate outside the try below, so client errors are not converted to 500s.
    if 'content' not in doc:
        raise HTTPException(status_code=400, detail="Missing required field 'content'")
    metadata = doc.get('metadata') or {}
    if not isinstance(metadata, dict):
        raise HTTPException(status_code=400, detail="'metadata' must be a JSON object")
    try:
        node_id = rag.add(doc['content'], **metadata)
        # NOTE(review): a full rebuild on every insert may be costly for large graphs.
        rag.build_network(rebuild=True)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    return {'id': node_id}
# Run with: uvicorn app:app  (add --reload only during local development; it restarts the server on code changes)
Summary
Across this tutorial series, you've learned these techniques:
- Research Papers: Field-specific similarity with chunking
- Products: Composite embeddings with exact category matching
- Blogs: Hierarchical field weighting
- Conversations: Role-weighted embeddings
- RAG Pipeline: Complete retrieval-generation system
Next Steps
- Explore Core Concepts for deeper understanding
- See API Reference for all available methods
- Check YAML DSL Reference for configuration options
- Review examples/ for more working code
Complex Network RAG Tutorials - Learn by building.