RAG in Practice: The Secret to Giving Large Models a "Memory"
Introduction

Have you ever hit this scenario: you ask ChatGPT about your company's internal documents and it knows nothing? You ask a large model for last week's project status and all it can say is "I don't have that information"?

This happens because large models have a fundamental limitation: they can only answer from their training data, and they cannot access real-time information or private knowledge.

RAG (Retrieval-Augmented Generation) is the core technique for solving this problem. It has the model "look things up" before answering, giving it a practically unbounded way to extend its knowledge.

This article goes from principles to practice and walks you through building a high-quality RAG system.
What Is RAG

Core concept

The core idea behind RAG is simple:

Before answering, first retrieve relevant information from a knowledge base, then have the LLM generate the answer grounded in the retrieved results.

As a formula:
Answer = LLM(question + retrieved relevant documents)
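Expressed as code, the formula is a two-step function. This is only a sketch; `retrieve` and `llm` are hypothetical stand-ins for the components built later in this article:

```python
def rag_answer(question: str, retrieve, llm) -> str:
    # Answer = LLM(question + retrieved relevant documents)
    context = "\n".join(retrieve(question))
    return llm(f"Context:\n{context}\n\nQuestion: {question}")
```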
Comparison with traditional approaches

| Approach | Pros | Cons |
|---|---|---|
| Plain LLM | Fast, no extra cost | Cannot access new knowledge; prone to "hallucination" |
| Fine-tuning | Highly customizable | Expensive to train; knowledge is hard to update |
| RAG | Knowledge can be updated in real time; low cost | Requires maintaining a knowledge base; adds retrieval latency |
Advantages of RAG

- Extensible knowledge: add new documents at any time, no retraining needed
- Traceable answers: you can tell users which document the information came from
- Fewer hallucinations: answers are grounded in evidence and therefore more accurate
- Controlled cost: no expensive fine-tuning runs
- Privacy control: sensitive data can stay in a local knowledge base
RAG System Architecture

A complete RAG system contains the following components:

```
Ingestion:  Documents ──▶ Text splitting ──▶ Embedding ──▶ Vector store
Query:      User question ──▶ Query vector ──▶ Similarity search ──▶ LLM generation
```

Core components in detail
1. Document processing module

Turns raw documents into retrievable vectors:

- Document loading: PDF, Word, Markdown, web pages, and more
- Text splitting: break long documents into appropriately sized chunks
- Cleaning and preprocessing: strip noise, normalize formatting

2. Embedding module

Converts text into vector representations:

- Choose an embedding model
- Generate vectors via an API or a local model
- Optimize throughput for batches of text

3. Vector database

Stores and retrieves vectors:

- Vector index construction
- Similarity search
- Metadata filtering

4. Retrieval and generation module

Combines retrieval results with the LLM:

- Re-ranking (optional)
- Prompt construction
- LLM invocation and answer generation (a minimal end-to-end sketch follows below)
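To show how the four modules chain into the two pipelines from the diagram, here is a minimal sketch; `split`, `embed`, `generate`, and `store` are hypothetical placeholders, not the concrete classes built later:

```python
def ingest(raw_texts, split, embed, store):
    """Ingestion pipeline: split -> embed -> store."""
    for text in raw_texts:
        chunks = split(text)              # text splitting
        store.add(chunks, embed(chunks))  # embedding + vector storage

def answer(question, embed, store, generate, top_k=5):
    """Query pipeline: embed the question -> similarity search -> generate."""
    hits = store.search(embed([question])[0], top_k=top_k)
    return generate(question, context="\n\n".join(hits))
```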
Document Splitting Strategies

Document splitting is a key driver of RAG quality: if the splits are poor, retrieval cannot surface the right information.

Comparison of splitting methods

| Method | Best for | Pros | Cons |
|---|---|---|---|
| Fixed length | Structured documents | Simple to implement | May cut across semantic units |
| Semantic splitting | Long documents | Preserves semantic integrity | Split points are unpredictable |
| Recursive splitting | Technical documents | Clear hierarchy | Requires defining separators |
| Sliding window (see sketch below) | Context-heavy text | Coherent information | Redundant content |
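The sliding-window strategy from the table is not covered by the splitter below, so here is a minimal sketch of it (the window and stride sizes are illustrative; overlap = window − stride):

```python
def sliding_window_split(text: str, window: int = 400, stride: int = 300) -> list[str]:
    """Split text into overlapping fixed-size windows."""
    chunks = []
    for start in range(0, max(len(text) - window + stride, 1), stride):
        chunk = text[start:start + window]
        if chunk:
            chunks.append(chunk)
    return chunks
```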
Hands-on code: smart splitting
```python
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.text_splitter import MarkdownHeaderTextSplitter


class SmartTextSplitter:
    """Smart document splitter."""

    def __init__(
        self,
        chunk_size: int = 500,
        chunk_overlap: int = 50,
        separators: list = None
    ):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

        # Default separator priority, from strongest to weakest
        self.separators = separators or [
            "\n\n\n",  # section breaks
            "\n\n",    # paragraph breaks
            "\n",      # line breaks
            "。",       # Chinese full stop
            ".",       # English period
            " ",       # spaces
            ""         # forced split as a last resort
        ]

        self.splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=self.separators,
            length_function=self._count_tokens
        )

    def _count_tokens(self, text: str) -> int:
        """Token count heuristic that is friendlier to Chinese text."""
        # Simplified: roughly 1.5 chars/token for Chinese, 4 chars/token for English
        chinese_chars = sum(1 for c in text if '\u4e00' <= c <= '\u9fff')
        other_chars = len(text) - chinese_chars
        return int(chinese_chars * 0.7 + other_chars * 0.25)

    def split_text(self, text: str) -> list[str]:
        """Split plain text."""
        return self.splitter.split_text(text)

    def split_markdown(self, text: str) -> list[dict]:
        """Split a Markdown document while preserving header metadata."""
        # First split on headers
        headers_to_split_on = [
            ("#", "header1"),
            ("##", "header2"),
            ("###", "header3"),
        ]

        markdown_splitter = MarkdownHeaderTextSplitter(
            headers_to_split_on=headers_to_split_on
        )
        md_splits = markdown_splitter.split_text(text)

        # Then sub-split each fragment
        final_chunks = []
        for split in md_splits:
            content = split.page_content
            metadata = split.metadata

            # If a fragment is still too long, keep splitting
            if len(content) > self.chunk_size:
                sub_chunks = self.splitter.split_text(content)
                for i, sub_chunk in enumerate(sub_chunks):
                    final_chunks.append({
                        "content": sub_chunk,
                        "metadata": {**metadata, "chunk_index": i}
                    })
            else:
                final_chunks.append({
                    "content": content,
                    "metadata": metadata
                })

        return final_chunks


# Usage example (long_markdown_text is your Markdown document string)
splitter = SmartTextSplitter(chunk_size=500, chunk_overlap=50)
chunks = splitter.split_markdown(long_markdown_text)
```

Tuning the splitting parameters
| Document type | chunk_size | chunk_overlap | Tip |
|---|---|---|---|
| Technical docs | 300-500 | 50-100 | Split by section |
| Legal contracts | 500-800 | 100-150 | Keep clauses intact |
| News articles | 200-400 | 30-50 | Split by paragraph |
| Academic papers | 400-600 | 80-100 | Preserve citation context |
| FAQ | 100-200 | 0 | Split by Q&A pair |
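One convenient way to apply the table is a small preset lookup that feeds `SmartTextSplitter`. The values below simply restate the table, with midpoints chosen arbitrarily for illustration:

```python
# Presets derived from the tuning table above (illustrative midpoints)
SPLIT_PRESETS = {
    "technical": {"chunk_size": 400, "chunk_overlap": 80},
    "contract":  {"chunk_size": 650, "chunk_overlap": 120},
    "news":      {"chunk_size": 300, "chunk_overlap": 40},
    "paper":     {"chunk_size": 500, "chunk_overlap": 90},
    "faq":       {"chunk_size": 150, "chunk_overlap": 0},
}

def make_splitter(doc_type: str) -> SmartTextSplitter:
    """Build a splitter configured for the given document type."""
    preset = SPLIT_PRESETS.get(doc_type, {"chunk_size": 500, "chunk_overlap": 50})
    return SmartTextSplitter(**preset)
```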
Choosing an Embedding Model

Embedding quality directly determines retrieval quality: a good embedding places semantically similar texts closer together in vector space.
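As a quick sanity check of "closer together", you can compare cosine similarities directly. A minimal sketch, assuming the local bge-large-zh-v1.5 model used throughout this article:

```python
import numpy as np
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("BAAI/bge-large-zh-v1.5")
# With normalize_embeddings=True, the dot product equals cosine similarity
vecs = model.encode(
    ["如何重置密码?", "忘记密码怎么办?", "今天天气真好"],
    normalize_embeddings=True,
)
print(np.dot(vecs[0], vecs[1]))  # semantically related pair -> higher score
print(np.dot(vecs[0], vecs[2]))  # unrelated pair -> lower score
```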
Comparison of mainstream embedding models

| Model | Dimensions | Chinese support | MTEB score | Deployment |
|---|---|---|---|---|
| text-embedding-3-small | 1536 | ⭐⭐⭐⭐ | 62.3 | API |
| text-embedding-3-large | 3072 | ⭐⭐⭐⭐ | 64.6 | API |
| bge-large-zh-v1.5 | 1024 | ⭐⭐⭐⭐⭐ | 64.5 | Local/API |
| bge-m3 | 1024 | ⭐⭐⭐⭐⭐ | 65.0 | Local |
| m3e-large | 1024 | ⭐⭐⭐⭐⭐ | 63.5 | Local |
| jina-embeddings-v2 | 768 | ⭐⭐⭐ | 61.5 | Local/API |
Hands-on code: a multi-embedding strategy
```python
from typing import List

import numpy as np
from sentence_transformers import SentenceTransformer


class EmbeddingManager:
    """Embedding manager that supports switching between models."""

    def __init__(self, model_name: str = "BAAI/bge-large-zh-v1.5"):
        self.model_name = model_name
        self.model = None
        self._load_model()

    def _load_model(self):
        """Load the model."""
        # Short aliases for supported local models
        local_models = {
            "bge-large-zh": "BAAI/bge-large-zh-v1.5",
            "bge-m3": "BAAI/bge-m3",
            "m3e-large": "moka-ai/m3e-large",
        }

        if self.model_name in local_models:
            self.model = SentenceTransformer(local_models[self.model_name])
        elif "/" in self.model_name:
            # Treat the name as a HuggingFace model path
            self.model = SentenceTransformer(self.model_name)
        else:
            self.model = None  # fall back to the OpenAI API

    def embed_texts(self, texts: List[str]) -> np.ndarray:
        """Embed a batch of texts."""
        if self.model:
            # Local model
            return self.model.encode(texts, normalize_embeddings=True)
        # OpenAI API
        return self._embed_with_openai(texts)

    def embed_query(self, query: str) -> np.ndarray:
        """Embed a query (with model-specific preprocessing)."""
        # Some models expect an instruction prefix on queries
        if "bge" in self.model_name.lower():
            query = "为这个句子生成表示以用于检索相关文章:" + query

        return self.embed_texts([query])[0]

    def _embed_with_openai(self, texts: List[str]) -> np.ndarray:
        """Embed via the OpenAI API."""
        import openai

        response = openai.embeddings.create(
            model="text-embedding-3-small",
            input=texts
        )

        embeddings = [item.embedding for item in response.data]
        return np.array(embeddings)


# Usage example
embedding_manager = EmbeddingManager("bge-large-zh")
vectors = embedding_manager.embed_texts(["这是第一段文本", "这是第二段文本"])
query_vector = embedding_manager.embed_query("搜索这段相关内容")
```

Embedding selection guide
| Scenario | Recommended model | Why |
|---|---|---|
| Primarily Chinese | bge-large-zh-v1.5 | Best Chinese performance; open source and free |
| Mixed multilingual | bge-m3 | Supports 100+ languages and multi-granularity retrieval |
| Highest accuracy | text-embedding-3-large | OpenAI's strongest model |
| Cost-sensitive | bge-large-zh (local) | No API call costs |
| Low latency | m3e-base | Small model, fast inference |
Choosing a Vector Database

The vector database is the RAG system's "memory store" and directly affects retrieval efficiency and accuracy.

Comparison of mainstream vector databases

| Database | Deployment | Performance | Highlights | Best for |
|---|---|---|---|---|
| Chroma | Local/cloud | ⭐⭐⭐ | Lightweight, easy to use, open source | Personal projects, prototyping |
| Milvus | Local/cloud | ⭐⭐⭐⭐⭐ | High performance, distributed | Large-scale production |
| Pinecone | Cloud | ⭐⭐⭐⭐ | Fully managed, low maintenance | Enterprise SaaS |
| Weaviate | Local/cloud | ⭐⭐⭐⭐ | Hybrid search, GraphQL | Multimodal retrieval |
| Qdrant | Local/cloud | ⭐⭐⭐⭐ | Implemented in Rust, efficient | High-concurrency workloads |
| PGVector | PostgreSQL extension | ⭐⭐⭐ | Simple to integrate (minimal sketch below) | Projects already on Postgres |
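PGVector is the only option in the table without example code later in this article, so here is a minimal sketch, assuming PostgreSQL with the pgvector extension and the `pgvector` Python package installed; the table name, the 1024-dim column, and the precomputed numpy arrays `embedding` and `query_embedding` are illustrative assumptions:

```python
import psycopg2
from pgvector.psycopg2 import register_vector

conn = psycopg2.connect("dbname=rag user=postgres")
register_vector(conn)  # registers numpy <-> vector type adaptation

with conn.cursor() as cur:
    cur.execute("CREATE EXTENSION IF NOT EXISTS vector")
    cur.execute(
        "CREATE TABLE IF NOT EXISTS chunks ("
        "id bigserial PRIMARY KEY, content text, embedding vector(1024))"
    )
    # Insert one chunk (embedding: numpy array of 1024 floats)
    cur.execute(
        "INSERT INTO chunks (content, embedding) VALUES (%s, %s)",
        ("example chunk", embedding),
    )
    # <=> is pgvector's cosine-distance operator; smaller means more similar
    cur.execute(
        "SELECT content FROM chunks ORDER BY embedding <=> %s LIMIT 5",
        (query_embedding,),
    )
    print(cur.fetchall())
conn.commit()
```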
Hands-on code: getting started with Chroma
```python
from typing import List

import chromadb


class ChromaVectorStore:
    """A thin wrapper around Chroma."""

    def __init__(self, collection_name: str = "documents", persist_dir: str = "./chroma_db"):
        self.client = chromadb.PersistentClient(path=persist_dir)
        self.collection = self.client.get_or_create_collection(
            name=collection_name,
            metadata={"hnsw:space": "cosine"}  # use cosine similarity
        )

    def add_documents(
        self,
        documents: List[str],
        embeddings: List[List[float]],
        metadatas: List[dict] = None,
        ids: List[str] = None
    ):
        """Add documents."""
        if not ids:
            ids = [f"doc_{i}" for i in range(len(documents))]

        if not metadatas:
            metadatas = [{"source": "unknown"} for _ in documents]

        self.collection.add(
            documents=documents,
            embeddings=embeddings,
            metadatas=metadatas,
            ids=ids
        )

    def search(
        self,
        query_embedding: List[float],
        top_k: int = 5,
        where_filter: dict = None
    ) -> dict:
        """Similarity search."""
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=top_k,
            where=where_filter  # metadata filtering
        )

        return {
            "documents": results["documents"][0],
            "metadatas": results["metadatas"][0],
            "distances": results["distances"][0],
            "ids": results["ids"][0]
        }

    def delete_by_metadata(self, metadata_filter: dict):
        """Delete by metadata filter."""
        self.collection.delete(where=metadata_filter)


# Usage example
store = ChromaVectorStore("my_knowledge_base")
store.add_documents(chunks, embeddings, metadatas)
results = store.search(query_vector, top_k=5)
```

Hands-on code: production-grade Milvus
```python
from typing import List

from pymilvus import (
    connections, Collection, FieldSchema, CollectionSchema, DataType, utility
)


class MilvusVectorStore:
    """Milvus vector store (production-grade)."""

    def __init__(
        self,
        collection_name: str = "documents",
        host: str = "localhost",
        port: str = "19530",
        dimension: int = 1024
    ):
        self.collection_name = collection_name
        self.dimension = dimension

        # Connect to Milvus
        connections.connect(host=host, port=port)

        # Create or fetch the collection
        if utility.has_collection(collection_name):
            self.collection = Collection(collection_name)
        else:
            self._create_collection()

        # Build the index (needed on first run)
        self._create_index()

    def _create_collection(self):
        """Create the collection schema."""
        fields = [
            FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
            FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=self.dimension),
            FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=2000),
            FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=200),
            FieldSchema(name="chunk_index", dtype=DataType.INT64),
        ]

        schema = CollectionSchema(fields=fields, description="knowledge base vectors")
        self.collection = Collection(name=self.collection_name, schema=schema)

    def _create_index(self):
        """Create the vector index."""
        index_params = {
            "metric_type": "COSINE",
            "index_type": "HNSW",
            "params": {"M": 8, "efConstruction": 64}
        }

        self.collection.create_index(
            field_name="embedding",
            index_params=index_params
        )

    def insert(self, data: List[dict]):
        """Bulk insert."""
        embeddings = [item["embedding"] for item in data]
        contents = [item["content"][:2000] for item in data]
        sources = [item.get("source", "unknown")[:200] for item in data]
        chunk_indices = [item.get("chunk_index", 0) for item in data]

        self.collection.insert([embeddings, contents, sources, chunk_indices])

        # Flush to make the writes durable
        self.collection.flush()

    def search(
        self,
        query_vector: List[float],
        top_k: int = 10,
        expr: str = None
    ) -> List[dict]:
        """Similarity search."""
        # Load the collection into memory
        self.collection.load()

        search_params = {"metric_type": "COSINE", "params": {"ef": 64}}

        results = self.collection.search(
            data=[query_vector],
            anns_field="embedding",
            param=search_params,
            limit=top_k,
            expr=expr,  # filter expression
            output_fields=["content", "source"]  # needed so hits carry these fields
        )

        # Format the results
        formatted_results = []
        for hit in results[0]:
            formatted_results.append({
                "id": hit.id,
                "distance": hit.distance,
                "content": hit.entity.get("content"),
                "source": hit.entity.get("source"),
            })

        return formatted_results
```

Retrieval Optimization Strategies
Basic RAG often retrieves poorly out of the box; several optimization strategies are needed to raise quality.

Common problems and fixes

| Problem | Cause | Fix |
|---|---|---|
| Relevant content not retrieved | Chunks too fine or too coarse | Tune chunk_size; split at multiple granularities |
| Imprecise results | Unsuitable embedding model | Switch to a Chinese-optimized model |
| Answers feel stitched together | Retrieved chunks are scattered | Increase chunk_overlap; re-rank |
| Context lost across turns | Only the current question is used for retrieval | Retrieve with conversation history (see the sketch below) |
| Inaccurate answers | Too much noise in the results | Filter, re-rank, cap the result count |
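The "context lost across turns" fix has no example elsewhere in this article, so here is a minimal sketch that condenses the dialogue history into a standalone retrieval query; `llm_generate` is a hypothetical completion helper:

```python
def build_retrieval_query(history: list[tuple[str, str]], question: str, llm_generate) -> str:
    """Rewrite a follow-up question into a self-contained search query."""
    if not history:
        return question
    dialogue = "\n".join(f"User: {q}\nAssistant: {a}" for q, a in history)
    prompt = (
        "Given the conversation below, rewrite the final question as a "
        "standalone search query.\n\n"
        f"{dialogue}\n\nFinal question: {question}\n\nStandalone query:"
    )
    return llm_generate(prompt).strip()
```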
Advanced retrieval techniques

1. Hybrid search

Combine vector retrieval with keyword (BM25) retrieval:
```python
from typing import List

import numpy as np
from rank_bm25 import BM25Okapi


class HybridSearch:
    """Hybrid retrieval: vector search + BM25."""

    def __init__(self, vector_store, documents: List[str]):
        self.vector_store = vector_store
        self.documents = documents

        # Build the BM25 index (whitespace tokenization suits English;
        # for Chinese, use a segmenter such as jieba instead of split())
        tokenized_docs = [doc.split() for doc in documents]
        self.bm25 = BM25Okapi(tokenized_docs)

    def search(
        self,
        query: str,
        query_vector: List[float],
        top_k: int = 10,
        alpha: float = 0.5  # weight of the vector-search score
    ):
        """Hybrid search."""
        # Vector retrieval
        vector_results = self.vector_store.search(query_vector, top_k=top_k * 2)

        # BM25 retrieval
        bm25_scores = self.bm25.get_scores(query.split())
        bm25_top_indices = np.argsort(bm25_scores)[-top_k * 2:]

        # Merge and re-score, keyed by document text so both sources align
        combined_scores = {}

        for i, doc in enumerate(vector_results["documents"]):
            vector_score = 1 - vector_results["distances"][i]  # distance -> similarity
            combined_scores[doc] = alpha * vector_score

        norm = bm25_scores.max() or 1.0
        for idx in bm25_top_indices:
            doc = self.documents[idx]
            bm25_score = bm25_scores[idx] / norm  # normalize to [0, 1]
            # If the doc was also found by vector search, blend the scores
            if doc in combined_scores:
                combined_scores[doc] += (1 - alpha) * bm25_score
            else:
                combined_scores[doc] = (1 - alpha) * bm25_score

        # Sort and return the top_k
        sorted_docs = sorted(combined_scores.items(), key=lambda x: x[1], reverse=True)
        return sorted_docs[:top_k]
```
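A brief usage sketch, assuming the `store` and `chunks` from the Chroma example and the `embedding_manager` built earlier:

```python
hybrid = HybridSearch(vector_store=store, documents=chunks)

query = "如何重置密码"
query_vec = embedding_manager.embed_query(query)
for doc, score in hybrid.search(query, query_vec.tolist(), top_k=5):
    print(round(score, 3), doc[:50])
```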
2. Re-ranking

Run a second, more precise ordering pass over the retrieved candidates:
```python
from typing import List

from sentence_transformers import CrossEncoder


class ReRanker:
    """Re-rank retrieved results with a cross-encoder."""

    def __init__(self, model_name: str = "BAAI/bge-reranker-large"):
        self.reranker = CrossEncoder(model_name)

    def rerank(
        self,
        query: str,
        documents: List[str],
        top_k: int = 5
    ) -> List[tuple]:
        """Re-rank documents against the query."""
        # Build query-document pairs
        pairs = [(query, doc) for doc in documents]

        # Score relevance with the cross-encoder
        scores = self.reranker.predict(pairs)

        # Sort by score, descending
        ranked_results = sorted(
            zip(documents, scores),
            key=lambda x: x[1],
            reverse=True
        )

        return ranked_results[:top_k]


# Usage example
reranker = ReRanker()
initial_results = vector_store.search(query_vector, top_k=20)
reranked = reranker.rerank(query, initial_results["documents"], top_k=5)
```

3. Multi-query retrieval
Generate several related queries to improve recall:
```python
from typing import List


class MultiQueryRetriever:
    """Multi-query retrieval strategy."""

    def __init__(self, llm_client, vector_store, embedding_manager):
        self.llm = llm_client
        self.vector_store = vector_store
        self.embedding_manager = embedding_manager

    def generate_queries(self, original_query: str) -> List[str]:
        """Generate several related search queries."""
        prompt = f"""用户问题:{original_query}

请生成3-5个与这个问题相关的搜索查询,帮助找到更全面的信息。每个查询一行,不要编号。"""

        response = self.llm.generate(prompt)
        queries = [line.strip() for line in response.split("\n") if line.strip()]

        # Always include the original question
        queries.append(original_query)

        return queries

    def retrieve(self, original_query: str, top_k: int = 5) -> List[str]:
        """Retrieve with multiple queries."""
        queries = self.generate_queries(original_query)

        all_results = []
        for query in queries:
            query_vector = self.embedding_manager.embed_query(query)
            results = self.vector_store.search(query_vector.tolist(), top_k=top_k)
            all_results.extend(results["documents"])

        # Deduplicate
        unique_results = list(set(all_results))

        # Re-rank against the original question
        reranker = ReRanker()
        final_results = reranker.rerank(original_query, unique_results, top_k=top_k)

        return final_results
```

Building a Complete RAG System
Putting the code together
"""完整的RAG系统实现"""
from typing import List, Dict, Optionalfrom dataclasses import dataclassimport chromadb
@dataclassclass RAGConfig: """RAG配置""" embedding_model: str = "BAAI/bge-large-zh-v1.5" vector_db: str = "chroma" chunk_size: int = 500 chunk_overlap: int = 50 top_k: int = 5 rerank: bool = True llm_model: str = "qwen-plus"
class RAGSystem: """完整RAG系统"""
def __init__(self, config: RAGConfig = None): self.config = config or RAGConfig()
# 初始化各组件 self.text_splitter = SmartTextSplitter( chunk_size=self.config.chunk_size, chunk_overlap=self.config.chunk_overlap )
self.embedding_manager = EmbeddingManager(self.config.embedding_model)
self.vector_store = ChromaVectorStore("knowledge_base")
if self.config.rerank: self.reranker = ReRanker()
def ingest_documents(self, documents: List[Dict]): """文档入库""" all_chunks = [] all_embeddings = [] all_metadatas = [] all_ids = []
for doc_idx, doc in enumerate(documents): # 切分 chunks = self.text_splitter.split_text(doc["content"])
# 生成向量 embeddings = self.embedding_manager.embed_texts(chunks)
# 构建元数据 for chunk_idx, chunk in enumerate(chunks): all_chunks.append(chunk) all_embeddings.append(embeddings[chunk_idx].tolist()) all_metadatas.append({ "source": doc.get("source", "unknown"), "doc_idx": doc_idx, "chunk_idx": chunk_idx, "title": doc.get("title", "") }) all_ids.append(f"doc_{doc_idx}_chunk_{chunk_idx}")
# 存入向量库 self.vector_store.add_documents( documents=all_chunks, embeddings=all_embeddings, metadatas=all_metadatas, ids=all_ids )
return len(all_chunks)
def query( self, question: str, top_k: int = None, rerank: bool = None, return_sources: bool = True ) -> Dict: """查询""" top_k = top_k or self.config.top_k rerank = rerank if rerank is not None else self.config.rerank
# 生成查询向量 query_vector = self.embedding_manager.embed_query(question)
# 检索(多取一些用于重排序) retrieve_k = top_k * 3 if rerank else top_k results = self.vector_store.search(query_vector, top_k=retrieve_k)
# 重排序 if rerank: ranked = self.reranker.rerank(question, results["documents"], top_k=top_k) final_docs = [item[0] for item in ranked] # 找到对应的元数据 final_metadatas = [] for doc in final_docs: idx = results["documents"].index(doc) final_metadatas.append(results["metadatas"][idx]) else: final_docs = results["documents"][:top_k] final_metadatas = results["metadatas"][:top_k]
# 构建Prompt context = "\n\n".join([f"[文档{i+1}] {doc}" for i, doc in enumerate(final_docs)])
prompt = f"""基于以下文档内容回答用户问题。如果文档中没有相关信息,请明确说明。
参考文档:{context}
用户问题:{question}
请给出准确、简洁的回答,并注明信息来源。"""
# 调用LLM answer = self._call_llm(prompt)
# 构建返回结果 result = { "question": question, "answer": answer, }
if return_sources: result["sources"] = [ { "content": doc, "metadata": meta } for doc, meta in zip(final_docs, final_metadatas) ]
return result
def _call_llm(self, prompt: str) -> str: """调用LLM""" # 这里可以根据config使用不同的LLM # 示例使用OpenAI格式API import openai
client = openai.OpenAI( api_key="your-api-key", base_url="https://dashscope.aliyuncs.com/compatible-mode/v1" )
response = client.chat.completions.create( model=self.config.llm_model, messages=[{"role": "user", "content": prompt}], temperature=0.1 # 低温度减少幻觉 )
return response.choices[0].message.content
# 使用示例def main(): # 配置 config = RAGConfig( embedding_model="bge-large-zh", chunk_size=500, top_k=5, rerank=True )
# 初始化系统 rag = RAGSystem(config)
# 入库文档 documents = [ { "content": "长文档内容...", "source": "产品手册", "title": "产品使用指南" }, { "content": "另一篇文档...", "source": "FAQ", "title": "常见问题解答" } ]
rag.ingest_documents(documents)
# 查询 result = rag.query("如何使用这个产品?")
print(f"问题: {result['question']}") print(f"回答: {result['answer']}") print(f"来源: {len(result['sources'])} 个文档片段")
if __name__ == "__main__": main()生产环境最佳实践
Performance optimization
```python
import asyncio
from typing import Dict, List
from concurrent.futures import ThreadPoolExecutor


class AsyncRAGSystem(RAGSystem):
    """Async RAG system for higher throughput."""

    def __init__(self, config: RAGConfig = None, max_workers: int = 4):
        super().__init__(config)
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    async def async_ingest(self, documents: List[Dict]) -> int:
        """Async ingestion."""
        loop = asyncio.get_event_loop()

        # Process documents in parallel
        tasks = [
            loop.run_in_executor(self.executor, self._process_single_document, doc)
            for doc in documents
        ]

        results = await asyncio.gather(*tasks)

        total_chunks = sum(r["chunk_count"] for r in results)

        # Merge everything and write to the vector store in one batch
        all_data = []
        for r in results:
            all_data.extend(r["data"])

        self.vector_store.add_documents(
            documents=[d["content"] for d in all_data],
            embeddings=[d["embedding"] for d in all_data],
            metadatas=[d["metadata"] for d in all_data],
            ids=[d["id"] for d in all_data]
        )

        return total_chunks

    def _process_single_document(self, doc: Dict) -> Dict:
        """Process one document: split and embed."""
        chunks = self.text_splitter.split_text(doc["content"])
        embeddings = self.embedding_manager.embed_texts(chunks)

        data = []
        for i, (chunk, emb) in enumerate(zip(chunks, embeddings)):
            data.append({
                "content": chunk,
                "embedding": emb.tolist(),
                "metadata": {
                    "source": doc.get("source", "unknown"),
                    "chunk_idx": i
                },
                "id": f"{doc.get('source', 'doc')}_{i}"
            })

        return {"chunk_count": len(chunks), "data": data}

    async def async_query(self, question: str) -> Dict:
        """Async query."""
        loop = asyncio.get_event_loop()

        # Embed the query off the event loop
        query_vector = await loop.run_in_executor(
            self.executor, self.embedding_manager.embed_query, question
        )

        # Retrieve (over-fetch for re-ranking)
        results = await loop.run_in_executor(
            self.executor,
            self.vector_store.search,
            query_vector.tolist(),
            self.config.top_k * 3
        )

        # Re-rank, then generate
        if self.config.rerank:
            ranked = await loop.run_in_executor(
                self.executor,
                self.reranker.rerank,
                question,
                results["documents"],
                self.config.top_k
            )
            final_docs = [item[0] for item in ranked]
        else:
            final_docs = results["documents"][:self.config.top_k]

        # Generate with the LLM
        context = "\n\n".join(final_docs)
        prompt = f"基于以下内容回答:\n{context}\n问题:{question}"

        answer = await loop.run_in_executor(self.executor, self._call_llm, prompt)

        return {"question": question, "answer": answer, "sources": final_docs}
```

Monitoring and evaluation
```python
from typing import Dict, List


class RAGMonitor:
    """RAG system monitoring."""

    def __init__(self):
        self.metrics = {
            "query_count": 0,
            "avg_latency": 0,
            "avg_retrieval_latency": 0,
            "avg_llm_latency": 0,
            "retrieval_accuracy": []  # filled in by manual labeling
        }

    def log_query(
        self,
        question: str,
        retrieval_latency: float,
        llm_latency: float,
        answer: str,
        sources: List[str]
    ):
        """Record one query."""
        self.metrics["query_count"] += 1

        # Update the running average of total latency
        n = self.metrics["query_count"]
        old_avg = self.metrics["avg_latency"]
        new_latency = retrieval_latency + llm_latency
        self.metrics["avg_latency"] = old_avg + (new_latency - old_avg) / n

        # Track the two stages separately
        self.metrics["avg_retrieval_latency"] = (
            self.metrics["avg_retrieval_latency"]
            + (retrieval_latency - self.metrics["avg_retrieval_latency"]) / n
        )
        self.metrics["avg_llm_latency"] = (
            self.metrics["avg_llm_latency"]
            + (llm_latency - self.metrics["avg_llm_latency"]) / n
        )

    def get_report(self) -> Dict:
        """Produce a monitoring report."""
        return {
            "total_queries": self.metrics["query_count"],
            "avg_total_latency_ms": round(self.metrics["avg_latency"] * 1000, 2),
            "avg_retrieval_ms": round(self.metrics["avg_retrieval_latency"] * 1000, 2),
            "avg_llm_ms": round(self.metrics["avg_llm_latency"] * 1000, 2),
            "retrieval_accuracy": self._calculate_accuracy()
        }

    def _calculate_accuracy(self) -> float:
        """Retrieval accuracy over manually labeled samples."""
        if not self.metrics["retrieval_accuracy"]:
            return None

        return sum(self.metrics["retrieval_accuracy"]) / len(self.metrics["retrieval_accuracy"])
```
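To populate `retrieval_accuracy` without manually reviewing every live query, a small labeled set can be scored offline. A minimal sketch that computes hit-rate@k against the `RAGSystem` built above; the labeled-set format is an assumption:

```python
def hit_rate_at_k(rag, labeled_set: list[dict], k: int = 5) -> float:
    """labeled_set items: {"question": str, "relevant_ids": set of chunk ids}."""
    hits = 0
    for item in labeled_set:
        query_vec = rag.embedding_manager.embed_query(item["question"])
        results = rag.vector_store.search(query_vec.tolist(), top_k=k)
        # Count a hit if any labeled-relevant chunk appears in the top k
        if set(results["ids"]) & item["relevant_ids"]:
            hits += 1
    return hits / len(labeled_set)
```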
Summary

RAG lets large models break through their "knowledge boundary" and has become core infrastructure for enterprise AI applications.
Key takeaways

- Document splitting: pick chunk_size and overlap carefully; preserve semantic integrity
- Embedding choice: prefer the bge family for Chinese; bge-m3 for multilingual corpora
- Vector database: Chroma for small projects, Milvus for large-scale production
- Retrieval optimization: hybrid search, re-ranking, and multi-query retrieval bring significant gains
- Monitoring and evaluation: continuously track latency and accuracy, and iterate
Future directions

- Multimodal RAG: retrieval over images, audio, and video
- Adaptive retrieval: adjust the strategy dynamically based on question type
- RAG + fine-tuning: combine with fine-tuning for further gains
- Agentic RAG: RAG as a knowledge tool inside AI agents
Master RAG, and you hold the key to giving large models real knowledge.