feat(llm-application-dev): modernize to LangGraph and latest models v2.0.0

- Migrate from LangChain 0.x to LangChain 1.x/LangGraph patterns
- Update model references to Claude 4.5 and GPT-5.2
- Add Voyage AI as primary embedding recommendation
- Add structured outputs with Pydantic
- Replace deprecated initialize_agent() with StateGraph
- Fix security: use AST-based safe math instead of unsafe execution
- Add plugin.json and README.md for consistency
- Bump marketplace version to 1.3.3
This commit is contained in:
Seth Hobson
2026-01-19 15:43:25 -05:00
parent e827cc713a
commit 8be0e8ac7a
12 changed files with 1940 additions and 708 deletions


@@ -18,14 +18,18 @@ Guide to selecting and optimizing embedding models for vector search application
## Core Concepts
### 1. Embedding Model Comparison (2026)
| Model | Dimensions | Max Tokens | Best For |
|-------|------------|------------|----------|
| **voyage-3-large** | 1024 | 32000 | Claude apps (Anthropic recommended) |
| **voyage-3** | 1024 | 32000 | Claude apps, cost-effective |
| **voyage-code-3** | 1024 | 32000 | Code search |
| **voyage-finance-2** | 1024 | 32000 | Financial documents |
| **voyage-law-2** | 1024 | 32000 | Legal documents |
| **text-embedding-3-large** | 3072 | 8191 | OpenAI apps, high accuracy |
| **text-embedding-3-small** | 1536 | 8191 | OpenAI apps, cost-effective |
| **bge-large-en-v1.5** | 1024 | 512 | Open source, local deployment |
| **all-MiniLM-L6-v2** | 384 | 256 | Fast, lightweight |
| **multilingual-e5-large** | 1024 | 512 | Multi-language |
@@ -39,7 +43,34 @@ Document → Chunking → Preprocessing → Embedding Model → Vector
## Templates
### Template 1: Voyage AI Embeddings (Recommended for Claude)
```python
from langchain_voyageai import VoyageAIEmbeddings
from typing import List
import os
# Initialize Voyage AI embeddings (recommended by Anthropic for Claude)
embeddings = VoyageAIEmbeddings(
model="voyage-3-large",
voyage_api_key=os.environ.get("VOYAGE_API_KEY")
)
def get_embeddings(texts: List[str]) -> List[List[float]]:
"""Get embeddings from Voyage AI."""
return embeddings.embed_documents(texts)
def get_query_embedding(query: str) -> List[float]:
"""Get single query embedding."""
return embeddings.embed_query(query)
# Specialized models for domains
code_embeddings = VoyageAIEmbeddings(model="voyage-code-3")
finance_embeddings = VoyageAIEmbeddings(model="voyage-finance-2")
legal_embeddings = VoyageAIEmbeddings(model="voyage-law-2")
```
### Template 2: OpenAI Embeddings
```python
from openai import OpenAI
@@ -53,7 +84,7 @@ def get_embeddings(
model: str = "text-embedding-3-small",
dimensions: int = None
) -> List[List[float]]:
"""Get embeddings from OpenAI."""
"""Get embeddings from OpenAI with optional dimension reduction."""
# Handle batching for large lists
batch_size = 100
all_embeddings = []
@@ -63,6 +94,7 @@ def get_embeddings(
kwargs = {"input": batch, "model": model}
if dimensions:
# Matryoshka dimensionality reduction
kwargs["dimensions"] = dimensions
response = client.embeddings.create(**kwargs)
@@ -77,7 +109,7 @@ def get_embedding(text: str, **kwargs) -> List[float]:
return get_embeddings([text], **kwargs)[0]
# Dimension reduction with Matryoshka embeddings
def get_reduced_embedding(text: str, dimensions: int = 512) -> List[float]:
"""Get embedding with reduced dimensions (Matryoshka)."""
return get_embedding(
@@ -87,7 +119,7 @@ def get_reduced_embedding(text: str, dimensions: int = 512) -> List[float]:
)
```
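OpenAI's `dimensions` parameter performs the truncation server-side, but the same Matryoshka reduction can be sketched client-side: keep the leading components and re-normalize. A minimal pure-Python illustration (the `truncate_embedding` helper is hypothetical, not part of any SDK):

```python
import math
from typing import List

def truncate_embedding(embedding: List[float], dims: int) -> List[float]:
    """Matryoshka-style reduction: keep the first `dims` components,
    then L2-renormalize so cosine similarity still behaves."""
    head = embedding[:dims]
    norm = math.sqrt(sum(x * x for x in head))
    return [x / norm for x in head]

reduced = truncate_embedding([3.0, 4.0, 0.1, 0.1], dims=2)
print(reduced)  # [0.6, 0.8]
```

Models trained with a Matryoshka objective retain most of their retrieval quality under this truncation; arbitrary embedding models generally do not.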
### Template 3: Local Embeddings with Sentence Transformers
```python
from sentence_transformers import SentenceTransformer
@@ -103,6 +135,7 @@ class LocalEmbedder:
device: str = "cuda"
):
self.model = SentenceTransformer(model_name, device=device)
self.model_name = model_name
def embed(
self,
@@ -120,9 +153,9 @@ class LocalEmbedder:
return embeddings
def embed_query(self, query: str) -> np.ndarray:
"""Embed a query with BGE-style prefix."""
# BGE models benefit from query prefix
if "bge" in self.model.get_sentence_embedding_dimension():
"""Embed a query with appropriate prefix for retrieval models."""
# BGE and similar models benefit from query prefix
if "bge" in self.model_name.lower():
query = f"Represent this sentence for searching relevant passages: {query}"
return self.embed([query])[0]
@@ -137,13 +170,15 @@ class E5Embedder:
self.model = SentenceTransformer(model_name)
def embed_query(self, query: str) -> np.ndarray:
"""E5 requires 'query:' prefix for queries."""
return self.model.encode(f"query: {query}")
def embed_document(self, document: str) -> np.ndarray:
"""E5 requires 'passage:' prefix for documents."""
return self.model.encode(f"passage: {document}")
```
### Template 4: Chunking Strategies
```python
from typing import List, Tuple
@@ -288,20 +323,33 @@ def recursive_character_splitter(
return split_text(text, separators)
```
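As a baseline for comparison with the recursive splitter above, here is the simplest possible strategy: a fixed-size character window with overlap. `sliding_window_chunks` is an illustrative helper (its `chunk_size`/`overlap` parameters mirror the pipeline settings used elsewhere in this guide):

```python
from typing import List

def sliding_window_chunks(text: str, chunk_size: int = 512, overlap: int = 50) -> List[str]:
    """Fixed-size character chunks, each sharing `overlap` characters
    with its neighbor. Ignores semantic boundaries entirely."""
    if chunk_size <= overlap:
        raise ValueError("chunk_size must exceed overlap")
    step = chunk_size - overlap
    return [text[i:i + chunk_size] for i in range(0, max(len(text) - overlap, 1), step)]

chunks = sliding_window_chunks("a" * 1000, chunk_size=400, overlap=100)
print(len(chunks), [len(c) for c in chunks])  # 3 [400, 400, 400]
```

Recursive splitting exists precisely because this baseline cuts sentences and code blocks mid-stream; it is still useful as a fallback and for benchmarking.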
### Template 5: Domain-Specific Embedding Pipeline
```python
import re
from typing import List, Optional
from dataclasses import dataclass
from langchain_voyageai import VoyageAIEmbeddings
@dataclass
class EmbeddedDocument:
id: str
document_id: str
chunk_index: int
text: str
embedding: List[float]
metadata: dict
class DomainEmbeddingPipeline:
"""Pipeline for domain-specific embeddings."""
def __init__(
self,
embedding_model: str = "voyage-3-large",
chunk_size: int = 512,
chunk_overlap: int = 50,
preprocessing_fn=None
):
self.embeddings = VoyageAIEmbeddings(model=embedding_model)
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.preprocess = preprocessing_fn or self._default_preprocess
@@ -310,7 +358,7 @@ class DomainEmbeddingPipeline:
"""Default preprocessing."""
# Remove excessive whitespace
text = re.sub(r'\s+', ' ', text)
# Remove special characters (customize for your domain)
text = re.sub(r'[^\w\s.,!?-]', '', text)
return text.strip()
@@ -319,8 +367,8 @@ class DomainEmbeddingPipeline:
documents: List[dict],
id_field: str = "id",
content_field: str = "content",
metadata_fields: Optional[List[str]] = None
) -> List[EmbeddedDocument]:
"""Process documents for vector storage."""
processed = []
@@ -339,25 +387,26 @@ class DomainEmbeddingPipeline:
)
# Create embeddings
embeddings = await self.embeddings.aembed_documents(chunks)
# Create records
for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
metadata = {"document_id": doc_id, "chunk_index": i}
# Add specified metadata fields
if metadata_fields:
for field in metadata_fields:
if field in doc:
metadata[field] = doc[field]
processed.append(EmbeddedDocument(
id=f"{doc_id}_chunk_{i}",
document_id=doc_id,
chunk_index=i,
text=chunk,
embedding=embedding,
metadata=metadata
))
return processed
@@ -366,42 +415,77 @@ class DomainEmbeddingPipeline:
class CodeEmbeddingPipeline:
"""Specialized pipeline for code embeddings."""
def __init__(self):
# Use Voyage's code-specific model
self.embeddings = VoyageAIEmbeddings(model="voyage-code-3")
def chunk_code(self, code: str, language: str) -> List[dict]:
"""Chunk code by functions/classes."""
import tree_sitter
"""Chunk code by functions/classes using tree-sitter."""
try:
import tree_sitter_languages
parser = tree_sitter_languages.get_parser(language)
tree = parser.parse(bytes(code, "utf8"))
chunks = []
# Extract function and class definitions
self._extract_nodes(tree.root_node, code, chunks)
return chunks
except ImportError:
# Fallback to simple chunking
return [{"text": code, "type": "module"}]
def _extract_nodes(self, node, source_code: str, chunks: list):
"""Recursively extract function/class definitions."""
if node.type in ['function_definition', 'class_definition', 'method_definition']:
text = source_code[node.start_byte:node.end_byte]
chunks.append({
"text": text,
"type": node.type,
"name": self._get_name(node),
"start_line": node.start_point[0],
"end_line": node.end_point[0]
})
for child in node.children:
self._extract_nodes(child, source_code, chunks)
def _get_name(self, node) -> str:
"""Extract name from function/class node."""
for child in node.children:
if child.type == 'identifier' or child.type == 'name':
return child.text.decode('utf8')
return "unknown"
async def embed_with_context(
self,
chunk: str,
context: str = ""
) -> List[float]:
"""Embed code with surrounding context."""
if context:
combined = f"Context: {context}\n\nCode:\n{chunk}"
else:
combined = chunk
return await self.embeddings.aembed_query(combined)
```
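The `_default_preprocess` regexes above are easy to get wrong for a new domain, so it is worth exercising them standalone. The sketch below lifts the same two substitutions into a free function for testing (`default_preprocess` is a hypothetical helper, not part of the pipeline API):

```python
import re

def default_preprocess(text: str) -> str:
    """Same two substitutions as DomainEmbeddingPipeline._default_preprocess."""
    text = re.sub(r'\s+', ' ', text)          # collapse all whitespace runs
    text = re.sub(r'[^\w\s.,!?-]', '', text)  # drop chars outside the whitelist
    return text.strip()

print(default_preprocess("Hello,\n\n   world! (section §4.2)"))
# Hello, world! section 4.2
```

Note that the whitelist silently deletes parentheses, quotes, and symbols like `§`; for legal or financial text you would likely widen it.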
### Template 6: Embedding Quality Evaluation
```python
import numpy as np
from typing import List, Dict
def evaluate_retrieval_quality(
queries: List[str],
relevant_docs: List[List[str]], # List of relevant doc IDs per query
retrieved_docs: List[List[str]], # List of retrieved doc IDs per query
k: int = 10
) -> Dict[str, float]:
"""Evaluate embedding quality for retrieval."""
def precision_at_k(relevant: set, retrieved: List[str], k: int) -> float:
retrieved_k = retrieved[:k]
relevant_retrieved = len(set(retrieved_k) & relevant)
return relevant_retrieved / k if k > 0 else 0
def recall_at_k(relevant: set, retrieved: List[str], k: int) -> float:
retrieved_k = retrieved[:k]
@@ -446,7 +530,7 @@ def compute_embedding_similarity(
) -> np.ndarray:
"""Compute similarity matrix between embedding sets."""
if metric == "cosine":
# Normalize and compute dot product
norm1 = embeddings1 / np.linalg.norm(embeddings1, axis=1, keepdims=True)
norm2 = embeddings2 / np.linalg.norm(embeddings2, axis=1, keepdims=True)
return norm1 @ norm2.T
@@ -455,25 +539,68 @@ def compute_embedding_similarity(
return -cdist(embeddings1, embeddings2, metric='euclidean')
elif metric == "dot":
return embeddings1 @ embeddings2.T
else:
raise ValueError(f"Unknown metric: {metric}")
def compare_embedding_models(
texts: List[str],
models: Dict[str, callable],
queries: List[str],
relevant_indices: List[List[int]],
k: int = 5
) -> Dict[str, Dict[str, float]]:
"""Compare multiple embedding models on retrieval quality."""
results = {}
for model_name, embed_fn in models.items():
# Embed all texts
doc_embeddings = np.array(embed_fn(texts))
retrieved_per_query = []
for query in queries:
query_embedding = np.array(embed_fn([query])[0])
# Compute similarities
similarities = compute_embedding_similarity(
query_embedding.reshape(1, -1),
doc_embeddings,
metric="cosine"
)[0]
# Get top-k indices
top_k_indices = np.argsort(similarities)[::-1][:k]
retrieved_per_query.append([str(i) for i in top_k_indices])
# Convert relevant indices to string IDs
relevant_docs = [[str(i) for i in indices] for indices in relevant_indices]
results[model_name] = evaluate_retrieval_quality(
queries, relevant_docs, retrieved_per_query, k
)
return results
```
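To make the metrics concrete, here is precision@k and recall@k worked on a toy query — a standalone re-implementation of the inner helpers above, with made-up document IDs:

```python
from typing import List, Set

def precision_at_k(relevant: Set[str], retrieved: List[str], k: int) -> float:
    """Fraction of the top-k results that are relevant."""
    return len(set(retrieved[:k]) & relevant) / k if k > 0 else 0.0

def recall_at_k(relevant: Set[str], retrieved: List[str], k: int) -> float:
    """Fraction of all relevant docs that appear in the top-k."""
    return len(set(retrieved[:k]) & relevant) / len(relevant) if relevant else 0.0

relevant = {"d1", "d3", "d7"}               # ground truth for one query
retrieved = ["d1", "d2", "d3", "d4", "d5"]  # ranked retrieval results
print(precision_at_k(relevant, retrieved, 5))  # 0.4   (2 of 5 results are hits)
print(recall_at_k(relevant, retrieved, 5))     # 0.666… (2 of 3 relevant found)
```

Precision penalizes noise in the result list; recall penalizes misses. Report both, since either can be gamed alone.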
## Best Practices
### Do's
- **Match model to use case**: Code vs prose vs multilingual
- **Chunk thoughtfully**: Preserve semantic boundaries
- **Normalize embeddings**: For cosine similarity search
- **Batch requests**: More efficient than one-by-one
- **Cache embeddings**: Avoid recomputing for static content
- **Use Voyage AI for Claude apps**: Recommended by Anthropic
### Don'ts
- **Don't ignore token limits**: Truncation loses information
- **Don't mix embedding models**: Incompatible vector spaces
- **Don't skip preprocessing**: Garbage in, garbage out
- **Don't over-chunk**: Lose important context
- **Don't forget metadata**: Essential for filtering and debugging
## Resources
- [Voyage AI Documentation](https://docs.voyageai.com/)
- [OpenAI Embeddings Guide](https://platform.openai.com/docs/guides/embeddings)
- [Sentence Transformers](https://www.sbert.net/)
- [MTEB Benchmark](https://huggingface.co/spaces/mteb/leaderboard)
- [LangChain Embedding Models](https://python.langchain.com/docs/integrations/text_embedding/)


@@ -1,11 +1,11 @@
---
name: langchain-architecture
description: Design LLM applications using LangChain 1.x and LangGraph for agents, memory, and tool integration. Use when building LangChain applications, implementing AI agents, or creating complex LLM workflows.
---
# LangChain & LangGraph Architecture
Master modern LangChain 1.x and LangGraph for building sophisticated LLM applications with agents, state management, memory, and tool integration.
## When to Use This Skill
@@ -17,126 +17,100 @@ Master the LangChain framework for building sophisticated LLM applications with
- Implementing document processing pipelines
- Building production-grade LLM applications
## Package Structure (LangChain 1.x)
```
langchain (1.2.x) # High-level orchestration
langchain-core (1.2.x) # Core abstractions (messages, prompts, tools)
langchain-community # Third-party integrations
langgraph # Agent orchestration and state management
langchain-openai # OpenAI integrations
langchain-anthropic # Anthropic/Claude integrations
langchain-voyageai # Voyage AI embeddings
langchain-pinecone # Pinecone vector store
```
## Core Concepts
### 1. LangGraph Agents
LangGraph is the standard for building agents in 2026. It provides:
**Key Features:**
- **StateGraph**: Explicit state management with typed state
- **Durable Execution**: Agents persist through failures
- **Human-in-the-Loop**: Inspect and modify state at any point
- **Memory**: Short-term and long-term memory across sessions
- **Checkpointing**: Save and resume agent state
**Agent Patterns:**
- **ReAct**: Reasoning + Acting with `create_react_agent`
- **Plan-and-Execute**: Separate planning and execution nodes
- **Multi-Agent**: Supervisor routing between specialized agents
- **Tool-Calling**: Structured tool invocation with Pydantic schemas
### 2. State Management
LangGraph uses TypedDict for explicit state:
```python
from typing import Annotated, TypedDict
from langgraph.graph import MessagesState
# Simple message-based state
class AgentState(MessagesState):
"""Extends MessagesState with custom fields."""
context: Annotated[list, "retrieved documents"]
# Custom state for complex agents
class CustomState(TypedDict):
messages: Annotated[list, "conversation history"]
context: Annotated[dict, "retrieved context"]
current_step: str
results: list
```
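Nodes in a StateGraph return partial updates, and LangGraph merges them into the full state (optionally via per-key reducers such as message appending). The core merge idea can be sketched in plain Python; `apply_update` below is a toy stand-in for LangGraph's internals, not a real API:

```python
from typing import TypedDict

class CustomState(TypedDict, total=False):
    messages: list
    current_step: str
    results: list

def apply_update(state: dict, update: dict) -> dict:
    """Last-write-wins merge: keys present in `update` overwrite,
    everything else is carried forward unchanged."""
    return {**state, **update}

state: CustomState = {"messages": [], "current_step": "extract", "results": []}
# A node only has to return the fields it changed
state = apply_update(state, {"current_step": "analyze"})
print(state)  # {'messages': [], 'current_step': 'analyze', 'results': []}
```

This is why nodes in the patterns below return dicts like `{"context": docs}` rather than reconstructing the whole state.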
### 3. Memory Systems
Modern memory implementations:
- **ConversationBufferMemory**: Stores all messages (short conversations)
- **ConversationSummaryMemory**: Summarizes older messages (long conversations)
- **ConversationTokenBufferMemory**: Token-based windowing
- **VectorStoreRetrieverMemory**: Semantic similarity retrieval
- **LangGraph Checkpointers**: Persistent state across sessions
### 4. Document Processing
Loading, transforming, and storing documents:
**Components:**
- **Document Loaders**: Load from various sources
- **Text Splitters**: Chunk documents intelligently
- **Vector Stores**: Store and retrieve embeddings
- **Retrievers**: Fetch relevant documents
- **Indexes**: Organize documents for efficient access
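The retrieval half of this pipeline reduces to "embed, store, rank by similarity". A dependency-free toy version shows the moving parts (`ToyVectorStore` is hypothetical, standing in for Chroma/Pinecone plus a retriever; the 2-D "embeddings" are made up):

```python
import math
from typing import List, Tuple

def cosine(a: List[float], b: List[float]) -> float:
    """Cosine similarity between two dense vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return dot / (na * nb)

class ToyVectorStore:
    """Minimal in-memory stand-in for a vector store + retriever."""
    def __init__(self) -> None:
        self._docs: List[Tuple[str, List[float]]] = []

    def add(self, text: str, embedding: List[float]) -> None:
        self._docs.append((text, embedding))

    def retrieve(self, query_embedding: List[float], k: int = 2) -> List[str]:
        ranked = sorted(self._docs, key=lambda d: cosine(query_embedding, d[1]), reverse=True)
        return [text for text, _ in ranked[:k]]

store = ToyVectorStore()
store.add("cats", [1.0, 0.0])
store.add("dogs", [0.9, 0.1])
store.add("stocks", [0.0, 1.0])
print(store.retrieve([1.0, 0.0], k=2))  # ['cats', 'dogs']
```

Real stores replace the linear scan with an approximate nearest-neighbor index, but the interface is the same.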
### 5. Callbacks & Tracing
LangSmith is the standard for observability:
**Use Cases:**
- Request/response logging
- Token usage tracking
- Latency monitoring
- Custom metrics collection
- Error tracking
- Trace visualization
## Quick Start
### Modern ReAct Agent with LangGraph
```python
from langgraph.prebuilt import create_react_agent
from langgraph.checkpoint.memory import MemorySaver
from langchain_anthropic import ChatAnthropic
from langchain_core.tools import tool
import ast
import operator
# Initialize LLM (Claude Sonnet 4.5 recommended)
llm = ChatAnthropic(model="claude-sonnet-4-5", temperature=0)
# Define tools with Pydantic schemas
@tool
def search_database(query: str) -> str:
"""Search internal database for information."""
@@ -144,195 +118,541 @@ def search_database(query: str) -> str:
return f"Results for: {query}"
@tool
def calculate(expression: str) -> str:
"""Safely evaluate a mathematical expression.
Supports: +, -, *, /, **, %, parentheses
Example: '(2 + 3) * 4' returns '20'
"""
# Safe math evaluation using ast
allowed_operators = {
ast.Add: operator.add,
ast.Sub: operator.sub,
ast.Mult: operator.mul,
ast.Div: operator.truediv,
ast.Pow: operator.pow,
ast.Mod: operator.mod,
ast.USub: operator.neg,
}
def _eval(node):
if isinstance(node, ast.Constant):
return node.value
elif isinstance(node, ast.BinOp):
left = _eval(node.left)
right = _eval(node.right)
return allowed_operators[type(node.op)](left, right)
elif isinstance(node, ast.UnaryOp):
operand = _eval(node.operand)
return allowed_operators[type(node.op)](operand)
else:
raise ValueError(f"Unsupported operation: {type(node)}")
try:
tree = ast.parse(expression, mode='eval')
return str(_eval(tree.body))
except Exception as e:
return f"Error: {e}"
tools = [search_database, calculate]
# Create checkpointer for memory persistence
checkpointer = MemorySaver()
# Create ReAct agent
agent = create_react_agent(
llm,
tools,
checkpointer=checkpointer
)
# Run agent with thread ID for memory
config = {"configurable": {"thread_id": "user-123"}}
result = await agent.ainvoke(
{"messages": [("user", "Search for Python tutorials and calculate 25 * 4")]},
config=config
)
```
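Because the whole point of the AST approach is refusing arbitrary code, it pays to verify that standalone. The sketch below extracts the same whitelist logic from `calculate` into a plain function (`safe_eval` is an illustrative name, not part of the agent):

```python
import ast
import operator

# Same operator whitelist as the calculate tool above
ALLOWED = {
    ast.Add: operator.add, ast.Sub: operator.sub,
    ast.Mult: operator.mul, ast.Div: operator.truediv,
    ast.Pow: operator.pow, ast.Mod: operator.mod,
    ast.USub: operator.neg,
}

def safe_eval(expression: str):
    """Evaluate arithmetic only; any other AST node raises ValueError."""
    def _eval(node):
        if isinstance(node, ast.Constant):
            return node.value
        if isinstance(node, ast.BinOp):
            return ALLOWED[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp):
            return ALLOWED[type(node.op)](_eval(node.operand))
        raise ValueError(f"Unsupported operation: {type(node).__name__}")
    return _eval(ast.parse(expression, mode="eval").body)

print(safe_eval("(2 + 3) * 4"))  # 20
print(safe_eval("2 ** 10 % 7"))  # 2
try:
    safe_eval("__import__('os')")  # a Call node, not arithmetic
except ValueError as e:
    print("blocked:", e)
```

Unlike `eval()`, names, calls, and attribute access never reach the evaluator, so there is no sandbox to escape.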
## Architecture Patterns
### Pattern 1: RAG with LangGraph
```python
from langgraph.graph import StateGraph, START, END
from langchain_anthropic import ChatAnthropic
from langchain_voyageai import VoyageAIEmbeddings
from langchain_pinecone import PineconeVectorStore
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate
from typing import TypedDict, Annotated
class RAGState(TypedDict):
question: str
context: Annotated[list[Document], "retrieved documents"]
answer: str
# Initialize components
llm = ChatAnthropic(model="claude-sonnet-4-5")
embeddings = VoyageAIEmbeddings(model="voyage-3-large")
vectorstore = PineconeVectorStore(index_name="docs", embedding=embeddings)
retriever = vectorstore.as_retriever(search_kwargs={"k": 4})
# Define nodes
async def retrieve(state: RAGState) -> RAGState:
"""Retrieve relevant documents."""
docs = await retriever.ainvoke(state["question"])
return {"context": docs}
async def generate(state: RAGState) -> RAGState:
"""Generate answer from context."""
prompt = ChatPromptTemplate.from_template(
"""Answer based on the context below. If you cannot answer, say so.
Context: {context}
Question: {question}
Answer:"""
)
context_text = "\n\n".join(doc.page_content for doc in state["context"])
response = await llm.ainvoke(
prompt.format(context=context_text, question=state["question"])
)
return {"answer": response.content}
# Build graph
builder = StateGraph(RAGState)
builder.add_node("retrieve", retrieve)
builder.add_node("generate", generate)
builder.add_edge(START, "retrieve")
builder.add_edge("retrieve", "generate")
builder.add_edge("generate", END)
rag_chain = builder.compile()
# Use the chain
result = await rag_chain.ainvoke({"question": "What is the main topic?"})
```
### Pattern 2: Custom Agent with Structured Tools
```python
from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field
class SearchInput(BaseModel):
"""Input for database search."""
query: str = Field(description="Search query")
filters: dict = Field(default={}, description="Optional filters")
class EmailInput(BaseModel):
"""Input for sending email."""
recipient: str = Field(description="Email recipient")
subject: str = Field(description="Email subject")
content: str = Field(description="Email body")
async def search_database(query: str, filters: dict = {}) -> str:
"""Search internal database for information."""
# Your database search logic
return f"Results for '{query}' with filters {filters}"
async def send_email(recipient: str, subject: str, content: str) -> str:
"""Send an email to specified recipient."""
# Email sending logic
return f"Email sent to {recipient}"
tools = [
StructuredTool.from_function(
coroutine=search_database,
name="search_database",
description="Search internal database",
args_schema=SearchInput
),
StructuredTool.from_function(
coroutine=send_email,
name="send_email",
description="Send an email",
args_schema=EmailInput
)
]
agent = create_react_agent(llm, tools)
```
### Pattern 3: Multi-Step Workflow with StateGraph
```python
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Literal
class WorkflowState(TypedDict):
text: str
entities: list
analysis: str
summary: str
current_step: str
async def extract_entities(state: WorkflowState) -> WorkflowState:
"""Extract key entities from text."""
prompt = f"Extract key entities from: {state['text']}\n\nReturn as JSON list."
response = await llm.ainvoke(prompt)
return {"entities": response.content, "current_step": "analyze"}
async def analyze_entities(state: WorkflowState) -> WorkflowState:
"""Analyze extracted entities."""
prompt = f"Analyze these entities: {state['entities']}\n\nProvide insights."
response = await llm.ainvoke(prompt)
return {"analysis": response.content, "current_step": "summarize"}
async def generate_summary(state: WorkflowState) -> WorkflowState:
"""Generate final summary."""
prompt = f"""Summarize:
Entities: {state['entities']}
Analysis: {state['analysis']}
Provide a concise summary."""
response = await llm.ainvoke(prompt)
return {"summary": response.content, "current_step": "complete"}
def route_step(state: WorkflowState) -> Literal["analyze", "summarize", "end"]:
"""Route to next step based on current state."""
step = state.get("current_step", "extract")
if step == "analyze":
return "analyze"
elif step == "summarize":
return "summarize"
return "end"
# Build workflow
builder = StateGraph(WorkflowState)
builder.add_node("extract", extract_entities)
builder.add_node("analyze", analyze_entities)
builder.add_node("summarize", generate_summary)
builder.add_edge(START, "extract")
builder.add_conditional_edges("extract", route_step, {
"analyze": "analyze",
"summarize": "summarize",
"end": END
})
builder.add_conditional_edges("analyze", route_step, {
"summarize": "summarize",
"end": END
})
builder.add_edge("summarize", END)
workflow = builder.compile()
```
### Pattern 4: Multi-Agent Orchestration
```python
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import create_react_agent
from langchain_core.messages import HumanMessage
from typing import Literal, TypedDict
class MultiAgentState(TypedDict):
messages: list
next_agent: str
# Create specialized agents
researcher = create_react_agent(llm, research_tools)
writer = create_react_agent(llm, writing_tools)
reviewer = create_react_agent(llm, review_tools)
async def supervisor(state: MultiAgentState) -> MultiAgentState:
"""Route to appropriate agent based on task."""
prompt = f"""Based on the conversation, which agent should handle this?
Options:
- researcher: For finding information
- writer: For creating content
- reviewer: For reviewing and editing
- FINISH: Task is complete
Messages: {state['messages']}
Respond with just the agent name."""
response = await llm.ainvoke(prompt)
return {"next_agent": response.content.strip().lower()}
def route_to_agent(state: MultiAgentState) -> Literal["researcher", "writer", "reviewer", "end"]:
"""Route based on supervisor decision."""
next_agent = state.get("next_agent", "").lower()
if next_agent == "finish":
return "end"
return next_agent if next_agent in ["researcher", "writer", "reviewer"] else "end"
# Build multi-agent graph
builder = StateGraph(MultiAgentState)
builder.add_node("supervisor", supervisor)
builder.add_node("researcher", researcher)
builder.add_node("writer", writer)
builder.add_node("reviewer", reviewer)
builder.add_edge(START, "supervisor")
builder.add_conditional_edges("supervisor", route_to_agent, {
"researcher": "researcher",
"writer": "writer",
"reviewer": "reviewer",
"end": END
})
# Each agent returns to supervisor
for agent in ["researcher", "writer", "reviewer"]:
builder.add_edge(agent, "supervisor")
multi_agent = builder.compile()
```
## Memory Management
### Token-Based Memory with LangGraph
```python
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent
# In-memory checkpointer (development)
checkpointer = MemorySaver()
# Create agent with persistent memory
agent = create_react_agent(llm, tools, checkpointer=checkpointer)
# Each thread_id maintains separate conversation
config = {"configurable": {"thread_id": "session-abc123"}}
# Messages persist across invocations with same thread_id
result1 = await agent.ainvoke({"messages": [("user", "My name is Alice")]}, config)
result2 = await agent.ainvoke({"messages": [("user", "What's my name?")]}, config)
# Agent remembers: "Your name is Alice"
```
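Conceptually, a checkpointer is just per-thread persistent state. A toy analogue makes the thread-scoping behavior obvious (`InMemoryCheckpointer` is hypothetical, not the real `MemorySaver` API):

```python
class InMemoryCheckpointer:
    """Toy analogue of MemorySaver: conversation state keyed by thread_id."""
    def __init__(self) -> None:
        self._store: dict = {}

    def load(self, thread_id: str) -> list:
        return list(self._store.get(thread_id, []))

    def save(self, thread_id: str, messages: list) -> None:
        self._store[thread_id] = list(messages)

cp = InMemoryCheckpointer()
# First turn on thread "session-abc123"
history = cp.load("session-abc123") + [("user", "My name is Alice")]
cp.save("session-abc123", history)
# A different thread sees nothing; the original thread keeps its history
print(cp.load("other-thread"))    # []
print(cp.load("session-abc123"))  # [('user', 'My name is Alice')]
```

Real checkpointers additionally version each step (enabling time travel and resume-after-crash), which is why production setups back them with Postgres rather than process memory.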
### Production Memory with PostgreSQL
```python
from langgraph.checkpoint.postgres import PostgresSaver
# Production checkpointer
checkpointer = PostgresSaver.from_conn_string(
"postgresql://user:pass@localhost/langgraph"
)
agent = create_react_agent(llm, tools, checkpointer=checkpointer)
```
### Vector Store Memory for Long-Term Context
```python
from langchain_community.vectorstores import Chroma
from langchain_voyageai import VoyageAIEmbeddings
embeddings = VoyageAIEmbeddings(model="voyage-3-large")
memory_store = Chroma(
collection_name="conversation_memory",
embedding_function=embeddings,
persist_directory="./memory_db"
)
async def retrieve_relevant_memory(query: str, k: int = 5) -> list:
"""Retrieve relevant past conversations."""
docs = await memory_store.asimilarity_search(query, k=k)
return [doc.page_content for doc in docs]
async def store_memory(content: str, metadata: dict | None = None):
    """Store conversation in long-term memory."""
    await memory_store.aadd_texts([content], metadatas=[metadata or {}])
```
## Callback System & LangSmith
### LangSmith Tracing
```python
import os
from langchain_anthropic import ChatAnthropic
# Enable LangSmith tracing
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
os.environ["LANGCHAIN_PROJECT"] = "my-project"
# All LangChain/LangGraph operations are automatically traced
llm = ChatAnthropic(model="claude-sonnet-4-5")
```
### Custom Callback Handler
```python
from langchain_core.callbacks import BaseCallbackHandler
from typing import Any, Dict, List

class CustomCallbackHandler(BaseCallbackHandler):
    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs
    ) -> None:
        print(f"LLM started with {len(prompts)} prompts")

    def on_llm_end(self, response, **kwargs) -> None:
        print(f"LLM completed: {len(response.generations)} generations")

    def on_llm_error(self, error: Exception, **kwargs) -> None:
        print(f"LLM error: {error}")

    def on_tool_start(
        self, serialized: Dict[str, Any], input_str: str, **kwargs
    ) -> None:
        print(f"Tool started: {serialized.get('name')}")

    def on_tool_end(self, output: str, **kwargs) -> None:
        print(f"Tool completed: {output[:100]}...")
# Use callbacks
result = await agent.ainvoke(
{"messages": [("user", "query")]},
config={"callbacks": [CustomCallbackHandler()]}
)
```
## Streaming Responses
```python
from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(model="claude-sonnet-4-5", streaming=True)
# Stream tokens
async for chunk in llm.astream("Tell me a story"):
print(chunk.content, end="", flush=True)
# Stream agent events
async for event in agent.astream_events(
{"messages": [("user", "Search and summarize")]},
version="v2"
):
if event["event"] == "on_chat_model_stream":
print(event["data"]["chunk"].content, end="")
elif event["event"] == "on_tool_start":
print(f"\n[Using tool: {event['name']}]")
```
## Testing Strategies
```python
import pytest
from unittest.mock import AsyncMock, patch

@pytest.mark.asyncio
async def test_agent_tool_selection():
    """Test agent selects correct tool."""
    with patch.object(llm, 'ainvoke') as mock_llm:
        mock_llm.return_value = AsyncMock(content="Using search_database")
        result = await agent.ainvoke({
            "messages": [("user", "search for documents")]
        })
    # Verify the correct tool was selected
    assert "search_database" in str(result)

@pytest.mark.asyncio
async def test_memory_persistence():
    """Test memory persists across invocations."""
    config = {"configurable": {"thread_id": "test-thread"}}
    # First message
    await agent.ainvoke(
        {"messages": [("user", "Remember: the code is 12345")]},
        config
    )
    # Second message should remember
    result = await agent.ainvoke(
        {"messages": [("user", "What was the code?")]},
        config
    )
    assert "12345" in result["messages"][-1].content
```
## Performance Optimization
### 1. Caching with Redis
```python
from langchain_community.cache import RedisCache
from langchain_core.globals import set_llm_cache
import redis

redis_client = redis.Redis.from_url("redis://localhost:6379")
set_llm_cache(RedisCache(redis_client))
```
### 2. Async Batch Processing
```python
import asyncio
from langchain_core.documents import Document

async def process_documents(documents: list[Document]) -> list:
    """Process documents in parallel."""
    tasks = [process_single(doc) for doc in documents]
    return await asyncio.gather(*tasks)

async def process_single(doc: Document) -> dict:
    """Process a single document."""
    chunks = text_splitter.split_documents([doc])
    embeddings = await embeddings_model.aembed_documents(
        [c.page_content for c in chunks]
    )
    return {"doc_id": doc.metadata.get("id"), "embeddings": embeddings}
```
### 3. Connection Pooling
```python
import os

from langchain_pinecone import PineconeVectorStore
from pinecone import Pinecone

# Reuse a single Pinecone client across requests
pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])
index = pc.Index("my-index")

# Create the vector store with the existing index
vectorstore = PineconeVectorStore(index=index, embedding=embeddings)
```
## Resources
- **references/agents.md**: Deep dive on agent architectures
- **references/memory.md**: Memory system patterns
- **references/chains.md**: Chain composition strategies
- **references/document-processing.md**: Document loading and indexing
- **references/callbacks.md**: Monitoring and observability
- **assets/agent-template.py**: Production-ready agent template
- **assets/memory-config.yaml**: Memory configuration examples
- **assets/chain-example.py**: Complex chain examples
- [LangChain Documentation](https://python.langchain.com/docs/)
- [LangGraph Documentation](https://langchain-ai.github.io/langgraph/)
- [LangSmith Platform](https://smith.langchain.com/)
- [LangChain GitHub](https://github.com/langchain-ai/langchain)
- [LangGraph GitHub](https://github.com/langchain-ai/langgraph)
## Common Pitfalls
1. **Using Deprecated APIs**: Use LangGraph for agents, not `initialize_agent`
2. **Memory Overflow**: Use checkpointers with TTL for long-running agents
3. **Poor Tool Descriptions**: Clear descriptions help LLM select correct tools
4. **Context Window Exceeded**: Use summarization or sliding window memory
5. **No Error Handling**: Wrap tool functions with try/except
6. **Blocking Operations**: Use async methods (`ainvoke`, `astream`)
7. **Missing Observability**: Always enable LangSmith tracing in production
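Pitfall 5 is easy to guard against with a small decorator; a framework-agnostic sketch (the `divide` tool is a toy example) that returns the failure as text the model can react to, instead of raising:

```python
import functools

def safe_tool(fn):
    """Wrap a tool so failures come back as a readable message
    instead of an exception that aborts the whole agent run."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except Exception as exc:
            return f"Tool '{fn.__name__}' failed: {type(exc).__name__}: {exc}"
    return wrapper

@safe_tool
def divide(a: float, b: float) -> float:
    """Toy tool: divide two numbers."""
    return a / b

print(divide(6, 3))  # 2.0
print(divide(1, 0))  # Tool 'divide' failed: ZeroDivisionError: division by zero
```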
## Production Checklist
- [ ] Add request/response logging
- [ ] Monitor token usage and costs
- [ ] Use LangGraph StateGraph for agent orchestration
- [ ] Implement async patterns throughout (`ainvoke`, `astream`)
- [ ] Add production checkpointer (PostgreSQL, Redis)
- [ ] Enable LangSmith tracing
- [ ] Implement structured tools with Pydantic schemas
- [ ] Add timeout limits for agent execution
- [ ] Implement rate limiting
- [ ] Add input validation
- [ ] Test with edge cases
- [ ] Set up observability (callbacks)
- [ ] Implement fallback strategies
- [ ] Add comprehensive error handling
- [ ] Set up health checks
- [ ] Version control prompts and configurations
- [ ] Write integration tests for agent workflows
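The timeout item on this checklist needs no framework support; a sketch using plain `asyncio` (the stub agent and fallback payload are illustrative):

```python
import asyncio

async def invoke_with_timeout(agent_fn, payload: dict, timeout_s: float = 30.0) -> dict:
    """Run an agent call with a hard deadline and a graceful fallback."""
    try:
        return await asyncio.wait_for(agent_fn(payload), timeout=timeout_s)
    except asyncio.TimeoutError:
        return {"error": f"agent timed out after {timeout_s}s"}

# Stub agent that takes longer than the deadline
async def slow_agent(payload: dict) -> dict:
    await asyncio.sleep(1.0)
    return {"ok": True}

result = asyncio.run(invoke_with_timeout(slow_agent, {}, timeout_s=0.05))
print(result)  # {'error': 'agent timed out after 0.05s'}
```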

Use stronger LLMs to evaluate weaker model outputs.
## Quick Start
```python
from dataclasses import dataclass
from typing import Callable
import numpy as np
# Define evaluation suite
@dataclass
class Metric:
name: str
fn: Callable
@staticmethod
def accuracy():
return Metric("accuracy", calculate_accuracy)
@staticmethod
def bleu():
return Metric("bleu", calculate_bleu)
@staticmethod
def bertscore():
return Metric("bertscore", calculate_bertscore)
@staticmethod
def custom(name: str, fn: Callable):
return Metric(name, fn)
class EvaluationSuite:
def __init__(self, metrics: list[Metric]):
self.metrics = metrics
async def evaluate(self, model, test_cases: list[dict]) -> dict:
results = {m.name: [] for m in self.metrics}
for test in test_cases:
prediction = await model.predict(test["input"])
for metric in self.metrics:
score = metric.fn(
prediction=prediction,
reference=test.get("expected"),
context=test.get("context")
)
results[metric.name].append(score)
return {
"metrics": {k: np.mean(v) for k, v in results.items()},
"raw_scores": results
}
# Usage
suite = EvaluationSuite([
Metric.accuracy(),
Metric.bleu(),
Metric.bertscore(),
Metric.custom("groundedness", check_groundedness)
])
# Prepare test cases
test_cases = [
{
"input": "What is the capital of France?",
"expected": "Paris",
"context": "France is a country in Europe. Paris is its capital."
},
# ... more test cases
]
# Run evaluation
results = await suite.evaluate(model=your_model, test_cases=test_cases)
```
## Automated Metrics Implementation
```python
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
def calculate_bleu(reference: str, hypothesis: str, **kwargs) -> float:
"""Calculate BLEU score between reference and hypothesis."""
smoothie = SmoothingFunction().method4
    return sentence_bleu(
        [reference.split()],  # BLEU expects a list of reference token lists
hypothesis.split(),
smoothing_function=smoothie
)
# Usage
bleu = calculate_bleu(
reference="The cat sat on the mat",
hypothesis="A cat is sitting on the mat"
)
```
### ROUGE Score
```python
from rouge_score import rouge_scorer
def calculate_rouge(reference: str, hypothesis: str, **kwargs) -> dict:
"""Calculate ROUGE scores."""
scorer = rouge_scorer.RougeScorer(
['rouge1', 'rouge2', 'rougeL'],
use_stemmer=True
)
scores = scorer.score(reference, hypothesis)
    return {
        "rouge1": scores['rouge1'].fmeasure,
        "rouge2": scores['rouge2'].fmeasure,
        "rougeL": scores['rougeL'].fmeasure
    }
```

### BERTScore
```python
from bert_score import score
def calculate_bertscore(
references: list[str],
hypotheses: list[str],
**kwargs
) -> dict:
"""Calculate BERTScore using pre-trained model."""
P, R, F1 = score(
hypotheses,
references,
        lang="en"
    )
    return {
        "precision": P.mean().item(),
        "recall": R.mean().item(),
        "f1": F1.mean().item()
    }
```
### Custom Metrics
```python
def calculate_groundedness(response: str, context: str, **kwargs) -> float:
"""Check if response is grounded in provided context."""
# Use NLI model to check entailment
from transformers import pipeline
nli = pipeline(
"text-classification",
model="microsoft/deberta-large-mnli"
)
result = nli(f"{context} [SEP] {response}")[0]
# Return confidence that response is entailed by context
return result['score'] if result['label'] == 'ENTAILMENT' else 0.0
def calculate_toxicity(text: str, **kwargs) -> float:
"""Measure toxicity in generated text."""
from detoxify import Detoxify
results = Detoxify('original').predict(text)
return max(results.values()) # Return highest toxicity score
def calculate_factuality(claim: str, sources: list[str], **kwargs) -> float:
"""Verify factual claims against sources."""
from transformers import pipeline
nli = pipeline("text-classification", model="facebook/bart-large-mnli")
scores = []
for source in sources:
result = nli(f"{source}</s></s>{claim}")[0]
if result['label'] == 'entailment':
scores.append(result['score'])
return max(scores) if scores else 0.0
```
## LLM-as-Judge Patterns
### Single Output Evaluation
```python
from anthropic import Anthropic
from pydantic import BaseModel, Field
import json
class QualityRating(BaseModel):
accuracy: int = Field(ge=1, le=10, description="Factual correctness")
helpfulness: int = Field(ge=1, le=10, description="Answers the question")
clarity: int = Field(ge=1, le=10, description="Well-written and understandable")
reasoning: str = Field(description="Brief explanation")
async def llm_judge_quality(
response: str,
question: str,
context: str = None
) -> QualityRating:
"""Use Claude to judge response quality."""
client = Anthropic()
system = """You are an expert evaluator of AI responses.
Rate responses on accuracy, helpfulness, and clarity (1-10 scale).
Provide brief reasoning for your ratings."""
prompt = f"""Rate the following response:
Question: {question}
{f'Context: {context}' if context else ''}
Response: {response}
Provide ratings in JSON format:
{{
  "accuracy": <1-10>,
"helpfulness": <1-10>,
"clarity": <1-10>,
"reasoning": "<brief explanation>"
}}"""
message = client.messages.create(
model="claude-sonnet-4-5",
max_tokens=500,
system=system,
messages=[{"role": "user", "content": prompt}]
)
return QualityRating(**json.loads(message.content[0].text))
```
### Pairwise Comparison
```python
from pydantic import BaseModel, Field
from typing import Literal
class ComparisonResult(BaseModel):
winner: Literal["A", "B", "tie"]
reasoning: str
confidence: int = Field(ge=1, le=10)
async def compare_responses(
question: str,
response_a: str,
response_b: str
) -> ComparisonResult:
"""Compare two responses using LLM judge."""
client = Anthropic()
prompt = f"""Compare these two responses and determine which is better.
Question: {question}
Response A: {response_a}
Response B: {response_b}
Consider accuracy, helpfulness, and clarity.
Answer with JSON:
{{
"winner": "A" or "B" or "tie",
"reasoning": "<explanation>",
"confidence": <1-10>
}}"""
message = client.messages.create(
model="claude-sonnet-4-5",
max_tokens=500,
messages=[{"role": "user", "content": prompt}]
)
return ComparisonResult(**json.loads(message.content[0].text))
```
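Pairwise judges tend to favor whichever response appears first; a small order-randomizing wrapper in plain Python (`judge` is a stand-in for the LLM comparison above, returning "A", "B", or "tie" based on presentation order):

```python
import random

def debiased_compare(question: str, first: str, second: str, judge, seed=None) -> str:
    """Randomize presentation order, then map the verdict back
    to the original responses ("first", "second", or "tie")."""
    rng = random.Random(seed)
    swapped = rng.random() < 0.5
    a, b = (second, first) if swapped else (first, second)
    verdict = judge(question, a, b)
    if verdict == "tie":
        return "tie"
    preferred_first_slot = verdict == "A"
    if swapped:
        return "second" if preferred_first_slot else "first"
    return "first" if preferred_first_slot else "second"

# A maximally position-biased judge: always prefers slot A
def biased_judge(question: str, a: str, b: str) -> str:
    return "A"

verdicts = {debiased_compare("q", "x", "y", biased_judge, seed=s) for s in range(20)}
# With randomized order, pure position bias is spread across both responses
```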
### Reference-Based Evaluation
```python
class ReferenceEvaluation(BaseModel):
semantic_similarity: float = Field(ge=0, le=1)
factual_accuracy: float = Field(ge=0, le=1)
completeness: float = Field(ge=0, le=1)
issues: list[str]
async def evaluate_against_reference(
response: str,
reference: str,
question: str
) -> ReferenceEvaluation:
"""Evaluate response against gold standard reference."""
client = Anthropic()
prompt = f"""Compare the response to the reference answer.
Question: {question}
Reference Answer: {reference}
Response to Evaluate: {response}
Evaluate:
1. Semantic similarity (0-1): How similar is the meaning?
2. Factual accuracy (0-1): Are all facts correct?
3. Completeness (0-1): Does it cover all key points?
4. List any specific issues or errors.
Respond in JSON:
{{
"semantic_similarity": <0-1>,
"factual_accuracy": <0-1>,
"completeness": <0-1>,
"issues": ["issue1", "issue2"]
}}"""
message = client.messages.create(
model="claude-sonnet-4-5",
max_tokens=500,
messages=[{"role": "user", "content": prompt}]
)
return ReferenceEvaluation(**json.loads(message.content[0].text))
```
## Human Evaluation Frameworks
### Annotation Guidelines
```python
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class AnnotationTask:
"""Structure for human annotation task."""
response: str
question: str
context: Optional[str] = None
def get_annotation_form(self) -> dict:
return {
"question": self.question,
"context": self.context,
            "response": self.response,
            "ratings": {"accuracy": None, "helpfulness": None, "clarity": None}
        }
```

### Inter-Rater Agreement
```python
from sklearn.metrics import cohen_kappa_score
def calculate_agreement(
rater1_scores: list[int],
rater2_scores: list[int]
) -> dict:
"""Calculate inter-rater agreement."""
kappa = cohen_kappa_score(rater1_scores, rater2_scores)
if kappa < 0:
interpretation = "Poor"
elif kappa < 0.2:
interpretation = "Slight"
elif kappa < 0.4:
interpretation = "Fair"
elif kappa < 0.6:
interpretation = "Moderate"
elif kappa < 0.8:
interpretation = "Substantial"
else:
interpretation = "Almost Perfect"
return {
"kappa": kappa,
"interpretation": interpretation
}
```
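`cohen_kappa_score` can be sanity-checked by hand; a stdlib sketch of the two-rater categorical case (note the degenerate case where expected agreement equals 1 would divide by zero):

```python
from collections import Counter

def cohen_kappa(rater1: list, rater2: list) -> float:
    """Two-rater Cohen's kappa: (observed - expected) / (1 - expected)."""
    n = len(rater1)
    observed = sum(a == b for a, b in zip(rater1, rater2)) / n
    counts1, counts2 = Counter(rater1), Counter(rater2)
    labels = set(rater1) | set(rater2)
    # Chance agreement from the two raters' marginal label frequencies
    expected = sum(counts1[label] * counts2[label] for label in labels) / (n * n)
    return (observed - expected) / (1 - expected)

kappa = cohen_kappa([1, 2, 1, 2], [1, 2, 2, 2])  # 0.5, i.e. "Moderate"
```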
## A/B Testing
```python
from scipy import stats
import numpy as np
from dataclasses import dataclass, field
@dataclass
class ABTest:
variant_a_name: str = "A"
variant_b_name: str = "B"
variant_a_scores: list[float] = field(default_factory=list)
variant_b_scores: list[float] = field(default_factory=list)
def add_result(self, variant: str, score: float):
"""Add evaluation result for a variant."""
if variant == "A":
self.variant_a_scores.append(score)
else:
self.variant_b_scores.append(score)
def analyze(self, alpha: float = 0.05) -> dict:
"""Perform statistical analysis."""
a_scores = np.array(self.variant_a_scores)
b_scores = np.array(self.variant_b_scores)
# T-test
t_stat, p_value = stats.ttest_ind(a_scores, b_scores)
        # Cohen's d (pooled standard deviation)
        pooled_std = np.sqrt((a_scores.std() ** 2 + b_scores.std() ** 2) / 2)
        cohens_d = (np.mean(b_scores) - np.mean(a_scores)) / pooled_std

        return {
            "mean_a": np.mean(a_scores),
            "mean_b": np.mean(b_scores),
"p_value": p_value,
"statistically_significant": p_value < alpha,
"cohens_d": cohens_d,
"effect_size": self._interpret_cohens_d(cohens_d),
"winner": self.variant_b_name if np.mean(b_scores) > np.mean(a_scores) else self.variant_a_name
}
@staticmethod
def _interpret_cohens_d(d: float) -> str:
"""Interpret Cohen's d effect size."""
abs_d = abs(d)
if abs_d < 0.2:
            return "negligible"
        elif abs_d < 0.5:
            return "small"
        elif abs_d < 0.8:
            return "medium"
        else:
            return "large"
```
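The effect size used in `analyze` can be verified by hand; a stdlib sketch of pooled-SD Cohen's d (using sample standard deviations, so small demo arrays work):

```python
import statistics

def cohens_d(a: list[float], b: list[float]) -> float:
    """Cohen's d with a pooled (average-of-variances) standard deviation."""
    pooled = ((statistics.stdev(a) ** 2 + statistics.stdev(b) ** 2) / 2) ** 0.5
    return (statistics.mean(b) - statistics.mean(a)) / pooled

# Means differ by 1.0 and the pooled SD is 0.5, so d = 2.0 (a "large" effect)
d = cohens_d([7.0, 7.5, 8.0], [8.0, 8.5, 9.0])
```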
### Regression Detection
```python
from dataclasses import dataclass
@dataclass
class RegressionResult:
metric: str
baseline: float
current: float
change: float
is_regression: bool
class RegressionDetector:
def __init__(self, baseline_results: dict, threshold: float = 0.05):
self.baseline = baseline_results
self.threshold = threshold
def check_for_regression(self, new_results: dict) -> dict:
"""Detect if new results show regression."""
regressions = []
        for metric, new_score in new_results.items():
            if metric not in self.baseline:
                continue
            baseline_score = self.baseline[metric]
relative_change = (new_score - baseline_score) / baseline_score
# Flag if significant decrease
is_regression = relative_change < -self.threshold
if is_regression:
regressions.append(RegressionResult(
metric=metric,
baseline=baseline_score,
current=new_score,
change=relative_change,
is_regression=True
))
return {
"has_regression": len(regressions) > 0,
"regressions": regressions,
"summary": f"{len(regressions)} metric(s) regressed"
}
```
## LangSmith Evaluation Integration
```python
from langsmith import Client
from langsmith.evaluation import aevaluate, LangChainStringEvaluator
# Initialize LangSmith client
client = Client()
# Create dataset
dataset = client.create_dataset("qa_test_cases")
client.create_examples(
    inputs=[{"question": q} for q in questions],
    outputs=[{"answer": a} for a in expected_answers],
    dataset_id=dataset.id
)
# Define evaluators
evaluators = [
    LangChainStringEvaluator("qa"),          # QA correctness
    LangChainStringEvaluator("context_qa"),  # Context-grounded QA
    LangChainStringEvaluator("cot_qa"),      # Chain-of-thought QA
]
# Run evaluation
async def target_function(inputs: dict) -> dict:
    result = await your_chain.ainvoke(inputs)
    return {"answer": result}
experiment_results = await aevaluate(
target_function,
data=dataset.name,
evaluators=evaluators,
experiment_prefix="v1.0.0",
metadata={"model": "claude-sonnet-4-5", "version": "1.0.0"}
)
print(f"Mean score: {experiment_results.aggregate_metrics['qa']['mean']}")
```
## Benchmarking
### Running Benchmarks
```python
from dataclasses import dataclass
import numpy as np
@dataclass
class BenchmarkResult:
metric: str
mean: float
std: float
min: float
max: float
class BenchmarkRunner:
def __init__(self, benchmark_dataset: list[dict]):
self.dataset = benchmark_dataset
async def run_benchmark(
self,
model,
metrics: list[Metric]
) -> dict[str, BenchmarkResult]:
"""Run model on benchmark and calculate metrics."""
results = {metric.name: [] for metric in metrics}
for example in self.dataset:
# Generate prediction
prediction = await model.predict(example["input"])
# Calculate each metric
for metric in metrics:
score = metric.fn(
prediction=prediction,
reference=example["reference"],
context=example.get("context")
                )
                results[metric.name].append(score)
# Aggregate results
return {
metric: BenchmarkResult(
metric=metric,
mean=np.mean(scores),
std=np.std(scores),
min=min(scores),
max=max(scores)
)
for metric, scores in results.items()
}
```
## Resources
- **references/metrics.md**: Comprehensive metric guide
- **references/human-evaluation.md**: Annotation best practices
- **references/benchmarking.md**: Standard benchmarks
- **references/a-b-testing.md**: Statistical testing guide
- **references/regression-testing.md**: CI/CD integration
- **assets/evaluation-framework.py**: Complete evaluation harness
- **assets/benchmark-dataset.jsonl**: Example datasets
- **scripts/evaluate-model.py**: Automated evaluation runner
- [LangSmith Evaluation Guide](https://docs.smith.langchain.com/evaluation)
- [RAGAS Framework](https://docs.ragas.io/)
- [DeepEval Library](https://docs.deepeval.com/)
- [Arize Phoenix](https://docs.arize.com/phoenix/)
- [HELM Benchmark](https://crfm.stanford.edu/helm/)
## Best Practices
## Common Pitfalls
- **Data Contamination**: Testing on training data
- **Ignoring Variance**: Not accounting for statistical uncertainty
- **Metric Mismatch**: Using metrics not aligned with business goals
- **Position Bias**: In pairwise evals, randomize order
- **Overfitting Prompts**: Optimizing for test set instead of real use

Master advanced prompt engineering techniques to maximize LLM performance and reliability.
- Creating reusable prompt templates with variable interpolation
- Debugging and refining prompts that produce inconsistent outputs
- Implementing system prompts for specialized AI assistants
- Using structured outputs (JSON mode) for reliable parsing
## Core Capabilities
- Self-consistency techniques (sampling multiple reasoning paths)
- Verification and validation steps
### 3. Structured Outputs
- JSON mode for reliable parsing
- Pydantic schema enforcement
- Type-safe response handling
- Error handling for malformed outputs
### 4. Prompt Optimization
- Iterative refinement workflows
- A/B testing prompt variations
- Measuring prompt performance metrics (accuracy, consistency, latency)
- Reducing token usage while maintaining quality
- Handling edge cases and failure modes
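The A/B-testing idea above can be sketched without any framework; everything here (the stub model, the variants, the keyword check) is a hypothetical placeholder for a real LLM call and a real grader:

```python
def ab_test_prompts(variants: dict[str, str], cases: list[dict], run_model) -> str:
    """Return the name of the variant with the highest mean score.

    run_model is a stand-in for an LLM call: it takes a rendered
    prompt and returns the model's answer as a string.
    """
    def mean_score(template: str) -> float:
        scores = []
        for case in cases:
            answer = run_model(template.format(**case["vars"]))
            scores.append(1.0 if case["expected"] in answer else 0.0)
        return sum(scores) / len(scores)

    return max(variants, key=lambda name: mean_score(variants[name]))

# Stub model: only the explicit variant elicits the right answer
def stub_model(prompt: str) -> str:
    return "The capital is Paris." if "Answer concisely" in prompt else "Unsure."

variants = {
    "terse": "Capital of {country}?",
    "explicit": "Answer concisely. What is the capital of {country}?",
}
cases = [{"vars": {"country": "France"}, "expected": "Paris"}]
winner = ab_test_prompts(variants, cases, stub_model)  # → "explicit"
```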
### 5. Template Systems
- Variable interpolation and formatting
- Conditional prompt sections
- Multi-turn conversation templates
- Role-based prompt composition
- Modular prompt components
### 6. System Prompt Design
- Setting model behavior and constraints
- Defining output formats and structure
- Establishing role and expertise
## Quick Start
```python
from langchain_anthropic import ChatAnthropic
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field
# Define structured output schema
class SQLQuery(BaseModel):
query: str = Field(description="The SQL query")
explanation: str = Field(description="Brief explanation of what the query does")
tables_used: list[str] = Field(description="List of tables referenced")
# Initialize model with structured output
llm = ChatAnthropic(model="claude-sonnet-4-5")
structured_llm = llm.with_structured_output(SQLQuery)
# Create prompt template
prompt = ChatPromptTemplate.from_messages([
("system", """You are an expert SQL developer. Generate efficient, secure SQL queries.
Always use parameterized queries to prevent SQL injection.
Explain your reasoning briefly."""),
("user", "Convert this to SQL: {query}")
])
# Create chain
chain = prompt | structured_llm
# Use
result = await chain.ainvoke({
"query": "Find all users who registered in the last 30 days"
})
print(result.query)
print(result.explanation)
```
## Key Patterns
### Pattern 1: Structured Output with Pydantic
```python
from anthropic import Anthropic
from pydantic import BaseModel, Field
from typing import Literal
import json
class SentimentAnalysis(BaseModel):
sentiment: Literal["positive", "negative", "neutral"]
confidence: float = Field(ge=0, le=1)
key_phrases: list[str]
reasoning: str
async def analyze_sentiment(text: str) -> SentimentAnalysis:
"""Analyze sentiment with structured output."""
client = Anthropic()
message = client.messages.create(
model="claude-sonnet-4-5",
max_tokens=500,
messages=[{
"role": "user",
"content": f"""Analyze the sentiment of this text.
Text: {text}
Respond with JSON matching this schema:
{{
"sentiment": "positive" | "negative" | "neutral",
"confidence": 0.0-1.0,
"key_phrases": ["phrase1", "phrase2"],
"reasoning": "brief explanation"
}}"""
}]
)
return SentimentAnalysis(**json.loads(message.content[0].text))
```
### Pattern 2: Chain-of-Thought with Self-Verification
```python
from langchain_core.prompts import ChatPromptTemplate
cot_prompt = ChatPromptTemplate.from_template("""
Solve this problem step by step.
Problem: {problem}
Instructions:
1. Break down the problem into clear steps
2. Work through each step showing your reasoning
3. State your final answer
4. Verify your answer by checking it against the original problem
Format your response as:
## Steps
[Your step-by-step reasoning]
## Answer
[Your final answer]
## Verification
[Check that your answer is correct]
""")
```
### Pattern 3: Few-Shot with Dynamic Example Selection
```python
from langchain_voyageai import VoyageAIEmbeddings
from langchain_core.example_selectors import SemanticSimilarityExampleSelector
from langchain_chroma import Chroma
# Create example selector with semantic similarity
example_selector = SemanticSimilarityExampleSelector.from_examples(
examples=[
{"input": "How do I reset my password?", "output": "Go to Settings > Security > Reset Password"},
{"input": "Where can I see my order history?", "output": "Navigate to Account > Orders"},
{"input": "How do I contact support?", "output": "Click Help > Contact Us or email support@example.com"},
],
embeddings=VoyageAIEmbeddings(model="voyage-3-large"),
vectorstore_cls=Chroma,
k=2 # Select 2 most similar examples
)
async def get_few_shot_prompt(query: str) -> str:
"""Build prompt with dynamically selected examples."""
examples = await example_selector.aselect_examples({"input": query})
examples_text = "\n".join(
f"User: {ex['input']}\nAssistant: {ex['output']}"
for ex in examples
)
return f"""You are a helpful customer support assistant.
Here are some example interactions:
{examples_text}
Now respond to this query:
User: {query}
Assistant:"""
```
### Pattern 4: Progressive Disclosure
Start with simple prompts, add complexity only when needed:
```python
PROMPT_LEVELS = {
    # Level 1: Direct instruction
    "simple": "Summarize this article: {text}",

    # Level 2: Add constraints
    "constrained": """Summarize this article in 3 bullet points, focusing on:
- Key findings
- Main conclusions
- Practical implications

Article: {text}""",

    # Level 3: Add reasoning
    "reasoning": """Read this article carefully.
1. First, identify the main topic and thesis
2. Then, extract the key supporting points
3. Finally, summarize in 3 bullet points

Article: {text}

Summary:""",

    # Level 4: Add examples
    "few_shot": """Read articles and provide concise summaries.

Example:
Article: "New research shows that regular exercise can reduce anxiety by up to 40%..."
Summary:
• Regular exercise reduces anxiety by up to 40%
• 30 minutes of moderate activity 3x/week is sufficient
• Benefits appear within 2 weeks of starting

Now summarize this article:
Article: {text}
Summary:"""
}
```
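The hierarchy above can be turned into a small assembly helper (a sketch; `build_prompt` and its section arguments are illustrative, not a library API):

```python
def build_prompt(
    system_context: str,
    task_instruction: str,
    examples: list[tuple[str, str]],
    input_data: str,
    output_format: str,
) -> str:
    """Assemble a prompt in hierarchy order:
    system context -> task -> examples -> input -> output format."""
    example_block = "\n\n".join(
        f"Input: {inp}\nOutput: {out}" for inp, out in examples
    )
    sections = [system_context, task_instruction, example_block, input_data, output_format]
    # Drop empty sections so optional parts (e.g. examples) can be omitted
    return "\n\n".join(s for s in sections if s)
```

Empty sections are dropped, so the same helper covers Level 1 prompts (no examples) and few-shot prompts alike.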
### Pattern 5: Error Recovery and Fallback
Build prompts that gracefully handle failures: include fallback instructions, request confidence scores, ask for alternative interpretations when uncertain, and specify how to indicate missing information.
```python
from pydantic import BaseModel, ValidationError
import json
class ResponseWithConfidence(BaseModel):
answer: str
confidence: float
sources: list[str]
alternative_interpretations: list[str] = []
ERROR_RECOVERY_PROMPT = """
Answer the question based on the context provided.
Context: {context}
Question: {question}
Instructions:
1. If you can answer confidently (>0.8), provide a direct answer
2. If you're somewhat confident (0.5-0.8), provide your best answer with caveats
3. If you're uncertain (<0.5), explain what information is missing
4. Always provide alternative interpretations if the question is ambiguous
Respond in JSON:
{{
"answer": "your answer or 'I cannot determine this from the context'",
"confidence": 0.0-1.0,
"sources": ["relevant context excerpts"],
"alternative_interpretations": ["if question is ambiguous"]
}}
"""
async def answer_with_fallback(
context: str,
question: str,
llm
) -> ResponseWithConfidence:
"""Answer with error recovery and fallback."""
prompt = ERROR_RECOVERY_PROMPT.format(context=context, question=question)
try:
response = await llm.ainvoke(prompt)
return ResponseWithConfidence(**json.loads(response.content))
except (json.JSONDecodeError, ValidationError) as e:
# Fallback: try to extract answer without structure
simple_prompt = f"Based on: {context}\n\nAnswer: {question}"
simple_response = await llm.ainvoke(simple_prompt)
return ResponseWithConfidence(
answer=simple_response.content,
confidence=0.5,
sources=["fallback extraction"],
alternative_interpretations=[]
)
```
### Pattern 6: Role-Based System Prompts
```python
SYSTEM_PROMPTS = {
"analyst": """You are a senior data analyst with expertise in SQL, Python, and business intelligence.
Your responsibilities:
- Write efficient, well-documented queries
- Explain your analysis methodology
- Highlight key insights and recommendations
- Flag any data quality concerns
Communication style:
- Be precise and technical when discussing methodology
- Translate technical findings into business impact
- Use clear visualizations when helpful""",
"assistant": """You are a helpful AI assistant focused on accuracy and clarity.
Core principles:
- Always cite sources when making factual claims
- Acknowledge uncertainty rather than guessing
- Ask clarifying questions when the request is ambiguous
- Provide step-by-step explanations for complex topics
Constraints:
- Do not provide medical, legal, or financial advice
- Redirect harmful requests appropriately
- Protect user privacy""",
"code_reviewer": """You are a senior software engineer conducting code reviews.
Review criteria:
- Correctness: Does the code work as intended?
- Security: Are there any vulnerabilities?
- Performance: Are there efficiency concerns?
- Maintainability: Is the code readable and well-structured?
- Best practices: Does it follow language idioms?
Output format:
1. Summary assessment (approve/request changes)
2. Critical issues (must fix)
3. Suggestions (nice to have)
4. Positive feedback (what's done well)"""
}
```
## Integration Patterns
### With RAG Systems
```python
RAG_PROMPT = """You are a knowledgeable assistant that answers questions based on provided context.
Context (retrieved from knowledge base):
{context}
Instructions:
1. Answer ONLY based on the provided context
2. If the context doesn't contain the answer, say "I don't have information about that in my knowledge base"
3. Cite specific passages using [1], [2] notation
4. If the question is ambiguous, ask for clarification
Question: {question}
Answer:"""
```
### With Validation and Verification
```python
VALIDATED_PROMPT = """Complete the following task:
Task: {task}
After generating your response, verify it meets ALL these criteria:
✓ Directly addresses the original request
✓ Contains no factual errors
✓ Is appropriately detailed (not too brief, not too verbose)
✓ Uses proper formatting
✓ Is safe and appropriate
If verification fails on any criterion, revise before responding.
Response:"""
```
## Performance Optimization
### Token Efficiency
```python
# Before: Verbose prompt (150+ tokens)
verbose_prompt = """
I would like you to please take the following text and provide me with a comprehensive
summary of the main points. The summary should capture the key ideas and important details
while being concise and easy to understand.
"""
# After: Concise prompt (30 tokens)
concise_prompt = """Summarize the key points concisely:
{text}
Summary:"""
```
### Caching Common Prefixes
```python
from anthropic import Anthropic
client = Anthropic()
# Use prompt caching for repeated system prompts
response = client.messages.create(
model="claude-sonnet-4-5",
max_tokens=1000,
system=[
{
"type": "text",
"text": LONG_SYSTEM_PROMPT,
"cache_control": {"type": "ephemeral"}
}
],
messages=[{"role": "user", "content": user_query}]
)
```
## Best Practices
1. **Be Specific**: Vague prompts produce inconsistent results
2. **Show, Don't Tell**: Examples are more effective than descriptions
3. **Use Structured Outputs**: Enforce schemas with Pydantic for reliability
4. **Test Extensively**: Evaluate on diverse, representative inputs
5. **Iterate Rapidly**: Small changes can have large impacts
6. **Monitor Performance**: Track metrics in production
7. **Version Control**: Treat prompts as code with proper versioning
8. **Document Intent**: Explain why prompts are structured as they are
## Common Pitfalls
- **Context overflow**: Exceeding token limits with excessive examples
- **Ambiguous instructions**: Leaving room for multiple interpretations
- **Ignoring edge cases**: Not testing on unusual or boundary inputs
- **No error handling**: Assuming outputs will always be well-formed
- **Hardcoded values**: Not parameterizing prompts for reuse
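Context overflow and missing error handling are both easier to avoid with an explicit token budget. A rough sketch using a chars-per-token heuristic (the helper name is illustrative; swap in a real tokenizer such as tiktoken for exact counts):

```python
def trim_examples_to_budget(
    examples: list[str],
    budget_tokens: int,
    chars_per_token: float = 4.0,
) -> list[str]:
    """Keep examples (highest priority first) until the estimated
    token budget is exhausted."""
    kept: list[str] = []
    used = 0
    for ex in examples:
        # Rough estimate: ~4 characters per token for English text
        cost = int(len(ex) / chars_per_token) + 1
        if used + cost > budget_tokens:
            break
        kept.append(ex)
        used += cost
    return kept
```

Ordering examples by priority before trimming ensures the most valuable ones survive when the budget is tight.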
## Success Metrics
Track these KPIs for your prompts:
- **Consistency**: Reproducibility across similar inputs
- **Latency**: Response time (P50, P95, P99)
- **Token Usage**: Average tokens per request
- **Success Rate**: Percentage of valid, parseable outputs
- **User Satisfaction**: Ratings and feedback
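Tracking these KPIs needs little machinery; a minimal rolling-metrics sketch (illustrative, not a monitoring API — production systems would use Prometheus, Datadog, or similar):

```python
from dataclasses import dataclass, field

@dataclass
class PromptMetrics:
    """Rolling metrics for a single prompt."""
    latencies_ms: list[float] = field(default_factory=list)
    tokens_used: list[int] = field(default_factory=list)
    successes: int = 0
    failures: int = 0

    def record(self, latency_ms: float, tokens: int, ok: bool) -> None:
        self.latencies_ms.append(latency_ms)
        self.tokens_used.append(tokens)
        if ok:
            self.successes += 1
        else:
            self.failures += 1

    def percentile(self, p: float) -> float:
        """Crude nearest-rank percentile (fine for dashboards)."""
        s = sorted(self.latencies_ms)
        idx = min(int(len(s) * p / 100), len(s) - 1)
        return s[idx]

    @property
    def success_rate(self) -> float:
        total = self.successes + self.failures
        return self.successes / total if total else 0.0
```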
## Resources
- [Anthropic Prompt Engineering Guide](https://docs.anthropic.com/en/docs/build-with-claude/prompt-engineering)
- [Claude Prompt Caching](https://docs.anthropic.com/en/docs/build-with-claude/prompt-caching)
- [OpenAI Prompt Engineering](https://platform.openai.com/docs/guides/prompt-engineering)
- [LangChain Prompts](https://python.langchain.com/docs/concepts/prompts/)

---

Master Retrieval-Augmented Generation (RAG) to build LLM applications that provide accurate, grounded answers.

### 1. Vector Databases
**Purpose**: Store and retrieve document embeddings efficiently
**Options:**
- **Pinecone**: Managed, scalable, serverless
- **Weaviate**: Open-source, hybrid search, GraphQL
- **Milvus**: High performance, on-premise
- **Chroma**: Lightweight, easy to use, local development
- **Qdrant**: Fast, filtered search, Rust-based
- **FAISS**: Meta's library, local deployment
- **pgvector**: PostgreSQL extension, SQL integration
### 2. Embeddings
**Purpose**: Convert text to numerical vectors for similarity search
**Models (2026):**
| Model | Dimensions | Best For |
|-------|------------|----------|
| **voyage-3-large** | 1024 | Claude apps (Anthropic recommended) |
| **voyage-code-3** | 1024 | Code search |
| **text-embedding-3-large** | 3072 | OpenAI apps, high accuracy |
| **text-embedding-3-small** | 1536 | OpenAI apps, cost-effective |
| **bge-large-en-v1.5** | 1024 | Open source, local deployment |
| **multilingual-e5-large** | 1024 | Multi-language support |
### 3. Retrieval Strategies
**Approaches:**
- **Dense Retrieval**: Semantic similarity via embeddings
- **Sparse Retrieval**: Keyword matching (BM25, TF-IDF)
- **Hybrid Search**: Combine dense + sparse with weighted fusion
- **Multi-Query**: Generate multiple query variations
- **HyDE**: Generate hypothetical documents for better retrieval
### 4. Reranking
**Purpose**: Improve retrieval quality by reordering results
**Methods:**
- **Cross-Encoders**: BERT-based reranking (ms-marco-MiniLM)
- **Cohere Rerank**: API-based reranking
- **Maximal Marginal Relevance (MMR)**: Diversity + relevance
- **LLM-based**: Use LLM to score relevance
## Quick Start with LangGraph
```python
from langgraph.graph import StateGraph, START, END
from langchain_anthropic import ChatAnthropic
from langchain_voyageai import VoyageAIEmbeddings
from langchain_pinecone import PineconeVectorStore
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate
from typing import TypedDict

class RAGState(TypedDict):
    question: str
    context: list[Document]
    answer: str

# Initialize components
llm = ChatAnthropic(model="claude-sonnet-4-5")
embeddings = VoyageAIEmbeddings(model="voyage-3-large")
vectorstore = PineconeVectorStore(index_name="docs", embedding=embeddings)
retriever = vectorstore.as_retriever(search_kwargs={"k": 4})

# RAG prompt
rag_prompt = ChatPromptTemplate.from_template(
    """Answer based on the context below. If you cannot answer, say so.

Context:
{context}

Question: {question}

Answer:"""
)

async def retrieve(state: RAGState) -> RAGState:
    """Retrieve relevant documents."""
    docs = await retriever.ainvoke(state["question"])
    return {"context": docs}

async def generate(state: RAGState) -> RAGState:
    """Generate answer from context."""
    context_text = "\n\n".join(doc.page_content for doc in state["context"])
    messages = rag_prompt.format_messages(
        context=context_text,
        question=state["question"]
    )
    response = await llm.ainvoke(messages)
    return {"answer": response.content}

# Build RAG graph
builder = StateGraph(RAGState)
builder.add_node("retrieve", retrieve)
builder.add_node("generate", generate)
builder.add_edge(START, "retrieve")
builder.add_edge("retrieve", "generate")
builder.add_edge("generate", END)
rag_chain = builder.compile()

# Use
result = await rag_chain.ainvoke({"question": "What are the main features?"})
print(result["answer"])
```
## Advanced RAG Patterns
### Pattern 1: Hybrid Search with RRF
```python
from langchain_community.retrievers import BM25Retriever
from langchain.retrievers import EnsembleRetriever

# Sparse retriever (BM25 for keyword matching)
bm25_retriever = BM25Retriever.from_documents(documents)
bm25_retriever.k = 10

# Dense retriever (embeddings for semantic search)
dense_retriever = vectorstore.as_retriever(search_kwargs={"k": 10})

# Combine with Reciprocal Rank Fusion weights
ensemble_retriever = EnsembleRetriever(
    retrievers=[bm25_retriever, dense_retriever],
    weights=[0.3, 0.7]  # 30% keyword, 70% semantic
)
```
### Pattern 2: Multi-Query Retrieval
```python
from langchain.retrievers.multi_query import MultiQueryRetriever

# Generate multiple query perspectives for better recall
multi_query_retriever = MultiQueryRetriever.from_llm(
    retriever=vectorstore.as_retriever(search_kwargs={"k": 5}),
    llm=llm
)

# Single query → multiple variations → combined results
results = await multi_query_retriever.ainvoke("What is the main topic?")
```
### Pattern 3: Contextual Compression
```python
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor

# Compressor extracts only relevant portions
compressor = LLMChainExtractor.from_llm(llm)

compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=vectorstore.as_retriever(search_kwargs={"k": 10})
)

# Returns only relevant parts of documents
compressed_docs = await compression_retriever.ainvoke("specific query")
```
### Pattern 4: Parent Document Retriever
```python
from langchain.retrievers import ParentDocumentRetriever
from langchain.storage import InMemoryStore
from langchain_text_splitters import RecursiveCharacterTextSplitter

# Small chunks for precise retrieval, large chunks for context
child_splitter = RecursiveCharacterTextSplitter(chunk_size=400, chunk_overlap=50)
parent_splitter = RecursiveCharacterTextSplitter(chunk_size=2000, chunk_overlap=200)

# Store for parent documents
docstore = InMemoryStore()

parent_retriever = ParentDocumentRetriever(
    vectorstore=vectorstore,
    docstore=docstore,
    child_splitter=child_splitter,
    parent_splitter=parent_splitter
)

# Add documents (splits children, stores parents)
await parent_retriever.aadd_documents(documents)

# Retrieval returns parent documents with full context
results = await parent_retriever.ainvoke("query")
```
### Pattern 5: HyDE (Hypothetical Document Embeddings)
```python
from langchain_core.prompts import ChatPromptTemplate
class HyDEState(TypedDict):
question: str
hypothetical_doc: str
context: list[Document]
answer: str
hyde_prompt = ChatPromptTemplate.from_template(
"""Write a detailed passage that would answer this question:
Question: {question}
Passage:"""
)
async def generate_hypothetical(state: HyDEState) -> HyDEState:
"""Generate hypothetical document for better retrieval."""
messages = hyde_prompt.format_messages(question=state["question"])
response = await llm.ainvoke(messages)
return {"hypothetical_doc": response.content}
async def retrieve_with_hyde(state: HyDEState) -> HyDEState:
"""Retrieve using hypothetical document."""
# Use hypothetical doc for retrieval instead of original query
docs = await retriever.ainvoke(state["hypothetical_doc"])
return {"context": docs}
# Build HyDE RAG graph
builder = StateGraph(HyDEState)
builder.add_node("hypothetical", generate_hypothetical)
builder.add_node("retrieve", retrieve_with_hyde)
builder.add_node("generate", generate)
builder.add_edge(START, "hypothetical")
builder.add_edge("hypothetical", "retrieve")
builder.add_edge("retrieve", "generate")
builder.add_edge("generate", END)
hyde_rag = builder.compile()
```
## Document Chunking Strategies
### Recursive Character Text Splitter
```python
from langchain_text_splitters import RecursiveCharacterTextSplitter
splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
length_function=len,
    separators=["\n\n", "\n", ". ", " ", ""]  # Try in order
)
chunks = splitter.split_documents(documents)
```
### Token-Based Splitting
```python
from langchain_text_splitters import TokenTextSplitter
splitter = TokenTextSplitter(
chunk_size=512,
    chunk_overlap=50,
encoding_name="cl100k_base" # OpenAI tiktoken encoding
)
```
### Semantic Chunking
```python
from langchain_experimental.text_splitter import SemanticChunker
splitter = SemanticChunker(
    embeddings=embeddings,
    breakpoint_threshold_type="percentile",
    breakpoint_threshold_amount=95
)
```
### Markdown Header Splitter
```python
from langchain_text_splitters import MarkdownHeaderTextSplitter
headers_to_split_on = [
("#", "Header 1"),
    ("##", "Header 2"),
("###", "Header 3"),
]
splitter = MarkdownHeaderTextSplitter(
    headers_to_split_on=headers_to_split_on,
    strip_headers=False
)
```
## Vector Store Configurations
### Pinecone (Serverless)
```python
import os

from pinecone import Pinecone, ServerlessSpec
from langchain_pinecone import PineconeVectorStore

# Initialize Pinecone client
pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])

# Create index if needed
if "my-index" not in pc.list_indexes().names():
    pc.create_index(
        name="my-index",
        dimension=1024,  # voyage-3-large dimensions
        metric="cosine",
        spec=ServerlessSpec(cloud="aws", region="us-east-1")
    )

# Create vector store
index = pc.Index("my-index")
vectorstore = PineconeVectorStore(index=index, embedding=embeddings)
```
### Weaviate
```python
import weaviate
from langchain_weaviate import WeaviateVectorStore

client = weaviate.connect_to_local()  # or connect_to_weaviate_cloud()

vectorstore = WeaviateVectorStore(
client=client,
index_name="Documents",
text_key="content",
embedding=embeddings
)
```
### Chroma (Local Development)
```python
from langchain_chroma import Chroma
vectorstore = Chroma(
collection_name="my_collection",
    embedding_function=embeddings,
    persist_directory="./chroma_db"
)
```
### pgvector (PostgreSQL)
```python
from langchain_postgres.vectorstores import PGVector
connection_string = "postgresql+psycopg://user:pass@localhost:5432/vectordb"
vectorstore = PGVector(
embeddings=embeddings,
collection_name="documents",
connection=connection_string,
)
```
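### FAISS (Local)
FAISS, listed among the options above, also works as a fully local store (a sketch; assumes the `faiss-cpu` package and the `documents` and `embeddings` objects defined earlier):

```python
from langchain_community.vectorstores import FAISS

# Build an in-memory index from documents
vectorstore = FAISS.from_documents(documents, embeddings)

# Persist to disk and reload later
vectorstore.save_local("./faiss_index")
vectorstore = FAISS.load_local(
    "./faiss_index",
    embeddings,
    allow_dangerous_deserialization=True  # pickle-backed docstore
)
```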
## Retrieval Optimization
### 1. Metadata Filtering
```python
from datetime import datetime

from langchain_core.documents import Document

# Add metadata during indexing
docs_with_metadata = []
for doc in documents:
    doc.metadata.update({
        "source": doc.metadata.get("source", "unknown"),
        "category": determine_category(doc.page_content),
        "date": datetime.now().isoformat()
    })
    docs_with_metadata.append(doc)

# Filter during retrieval
results = await vectorstore.asimilarity_search(
    "query",
    filter={"category": "technical"},
    k=5
)
```
### 2. Maximal Marginal Relevance (MMR)
```python
# Balance relevance with diversity
results = await vectorstore.amax_marginal_relevance_search(
"query",
k=5,
fetch_k=20, # Fetch 20, return top 5 diverse
)
```

### 3. Cross-Encoder Reranking
```python
from sentence_transformers import CrossEncoder
reranker = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
async def retrieve_and_rerank(query: str, k: int = 5) -> list[Document]:
    # Get initial results
    candidates = await vectorstore.asimilarity_search(query, k=20)

    # Rerank
    pairs = [[query, doc.page_content] for doc in candidates]
    scores = reranker.predict(pairs)

    # Sort by score and take top k
    ranked = sorted(zip(candidates, scores), key=lambda x: x[1], reverse=True)
    return [doc for doc, score in ranked[:k]]
```
### 4. Cohere Rerank
```python
from langchain_cohere import CohereRerank
reranker = CohereRerank(model="rerank-english-v3.0", top_n=5)
# Wrap retriever with reranking
reranked_retriever = ContextualCompressionRetriever(
base_compressor=reranker,
base_retriever=vectorstore.as_retriever(search_kwargs={"k": 20})
)
```
## Prompt Engineering for RAG
### Contextual Prompt with Citations
```python
rag_prompt = ChatPromptTemplate.from_template(
    """Answer the question based on the context below. Include citations using [1], [2], etc.

If you cannot answer based on the context, say "I don't have enough information."

Context:
{context}

Question: {question}

Instructions:
1. Use only information from the context
2. Cite sources with [1], [2] format
3. If uncertain, express uncertainty

Answer (with citations):"""
)
```
### Structured Output for RAG
```python
from pydantic import BaseModel, Field

class RAGResponse(BaseModel):
    answer: str = Field(description="The answer based on context")
    confidence: float = Field(description="Confidence score 0-1")
    sources: list[str] = Field(description="Source document IDs used")
    reasoning: str = Field(description="Brief reasoning for the answer")

# Use with structured output
structured_llm = llm.with_structured_output(RAGResponse)
```
## Evaluation Metrics
```python
from typing import TypedDict

class RAGEvalMetrics(TypedDict):
    retrieval_precision: float  # Relevant docs / retrieved docs
    retrieval_recall: float     # Retrieved relevant / total relevant
    answer_relevance: float     # Answer addresses question
    faithfulness: float         # Answer grounded in context
    context_relevance: float    # Context relevant to question

async def evaluate_rag_system(
    rag_chain,
    test_cases: list[dict]
) -> RAGEvalMetrics:
    """Evaluate RAG system on test cases."""
    metrics = {k: [] for k in RAGEvalMetrics.__annotations__}

    for test in test_cases:
        result = await rag_chain.ainvoke({"question": test["question"]})

        # Retrieval metrics
        retrieved_ids = {doc.metadata["id"] for doc in result["context"]}
        relevant_ids = set(test["relevant_doc_ids"])
        precision = len(retrieved_ids & relevant_ids) / len(retrieved_ids)
        recall = len(retrieved_ids & relevant_ids) / len(relevant_ids)
        metrics["retrieval_precision"].append(precision)
        metrics["retrieval_recall"].append(recall)

        # Use LLM-as-judge for quality metrics
        quality = await evaluate_answer_quality(
            question=test["question"],
            answer=result["answer"],
            context=result["context"],
            expected=test.get("expected_answer")
        )
        metrics["answer_relevance"].append(quality["relevance"])
        metrics["faithfulness"].append(quality["faithfulness"])
        metrics["context_relevance"].append(quality["context_relevance"])

    return {k: sum(v) / len(v) for k, v in metrics.items()}
```
## Resources
- [LangChain RAG Tutorial](https://python.langchain.com/docs/tutorials/rag/)
- [LangGraph RAG Examples](https://langchain-ai.github.io/langgraph/tutorials/rag/)
- [Pinecone Best Practices](https://docs.pinecone.io/guides/get-started/overview)
- [Voyage AI Embeddings](https://docs.voyageai.com/)
- [RAG Evaluation Guide](https://docs.ragas.io/)
## Best Practices
1. **Chunk Size**: Balance between context (larger) and specificity (smaller), typically 500-1000 tokens
2. **Overlap**: Use 10-20% overlap to preserve context at boundaries
3. **Metadata**: Include source, page, timestamp for filtering and debugging
4. **Hybrid Search**: Combine semantic and keyword search for best recall
5. **Reranking**: Use cross-encoder reranking for precision-critical applications
6. **Citations**: Always return source documents for transparency
7. **Evaluation**: Continuously test retrieval quality and answer accuracy
8. **Monitoring**: Track retrieval metrics and latency in production
## Common Issues
- **Poor Retrieval**: Check embedding quality, chunk size, query formulation
- **Irrelevant Results**: Add metadata filtering, use hybrid search, rerank
- **Missing Information**: Ensure documents are properly indexed, check chunking
- **Slow Queries**: Optimize vector store, use caching, reduce k
- **Hallucinations**: Improve grounding prompt, add verification step
- **Context Too Long**: Use compression or parent document retriever
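For hallucination triage before reaching for LLM-as-judge scoring, a crude lexical groundedness heuristic can flag obviously unsupported answers (a sketch; the thresholds are arbitrary and it will miss paraphrased support):

```python
import re

def rough_groundedness(answer: str, context: str, min_overlap: float = 0.5) -> float:
    """Fraction of answer sentences whose content words mostly
    appear in the context."""
    sentences = [s for s in re.split(r"[.!?]+\s*", answer) if s.strip()]
    context_words = set(re.findall(r"\w+", context.lower()))
    grounded = 0
    for sent in sentences:
        # Only consider words longer than 3 chars as "content" words
        words = [w for w in re.findall(r"\w+", sent.lower()) if len(w) > 3]
        if not words:
            grounded += 1
            continue
        overlap = sum(w in context_words for w in words) / len(words)
        if overlap >= min_overlap:
            grounded += 1
    return grounded / len(sentences) if sentences else 1.0
```

Scores well below 1.0 are a cheap signal to route the response through a verification step.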