# A Hands-On Guide to RAG: The Secret to Giving LLMs "Memory"

## Introduction

Have you run into scenarios like these: you ask ChatGPT about your company's internal documents and it knows nothing? You ask an LLM for last week's project status and all it can say is "I don't have that information"?

That is because LLMs have a fundamental limitation: they can only answer from their training data, and cannot access real-time information or private knowledge.

RAG (Retrieval-Augmented Generation) is the core technique for solving this problem. It has the LLM "look things up" before answering, giving it a nearly unlimited ability to extend its knowledge.

This article goes from principles to practice and walks you through building a high-quality RAG system.

## What Is RAG

### Core Concept

The core idea behind RAG is simple:

Before answering a question, first retrieve relevant information from a knowledge base, then have the LLM generate the answer grounded in what was retrieved.

Expressed as a formula:

Answer = LLM(Question + Retrieved relevant documents)
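
To make the flow concrete, here is a minimal sketch of that formula in code. The `embed`, `search`, and `llm` helpers are hypothetical placeholders; the rest of this article builds real versions of each:

```python
# Minimal RAG loop; embed(), search(), and llm() are hypothetical placeholders
def rag_answer(question: str, top_k: int = 5) -> str:
    docs = search(embed(question), top_k=top_k)  # retrieve relevant chunks
    context = "\n\n".join(docs)                  # stitch the evidence together
    prompt = f"Context:\n{context}\n\nQuestion: {question}"
    return llm(prompt)                           # generate a grounded answer
```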

### Comparison with Traditional Approaches

| Approach | Pros | Cons |
|---|---|---|
| Pure LLM | Fast, no extra cost | Cannot access new knowledge, prone to "hallucination" |
| Fine-tuning | Highly customizable | Expensive to train, hard to update knowledge |
| RAG | Knowledge updates in real time, low cost | Knowledge base to maintain, retrieval latency |

### Advantages of RAG

1. Extensible knowledge: add new documents at any time, no retraining required
2. Traceable answers: you can tell users which document the information came from
3. Fewer hallucinations: answers are grounded in evidence, so they are more accurate
4. Controllable cost: no expensive fine-tuning process
5. Privacy control: sensitive data can stay in a local knowledge base

## RAG System Architecture

A complete RAG system consists of the following components (top: ingestion pipeline; bottom: query pipeline):

```
                 RAG System Architecture

┌──────────┐   ┌────────────┐   ┌───────────┐   ┌────────────┐
│ Documents│──▶│ Text Split │──▶│ Embedding │──▶│Vector Store│
└──────────┘   └────────────┘   └───────────┘   └────────────┘

┌──────────┐   ┌────────────┐   ┌───────────┐   ┌────────────┐
│User Query│──▶│Query Vector│──▶│ Similarity│──▶│LLM Generate│
└──────────┘   └────────────┘   └───────────┘   └────────────┘
```

## Core Components in Detail

### 1. Document Processing Module

Turns raw documents into retrievable vectors:

- Document loading: PDF, Word, Markdown, web pages, etc. (a minimal loading sketch follows this list)
- Text splitting: break long documents into appropriately sized chunks
- Cleaning and preprocessing: strip noise, normalize formatting
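
Here is a minimal loading sketch. The file paths are placeholders, `PyPDFLoader` needs the `pypdf` package, and on newer LangChain versions these loaders live in `langchain_community.document_loaders`:

```python
# Hypothetical file paths; swap in your own documents
from langchain.document_loaders import PyPDFLoader, TextLoader

pdf_pages = PyPDFLoader("manual.pdf").load()              # one Document per page
md_docs = TextLoader("notes.md", encoding="utf-8").load() # whole file as one Document
```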

### 2. Embedding Module

Converts text into vector representations:

- Choosing an embedding model
- Generating vectors via an API or a local model
- Optimizing throughput when embedding batches of text (see the batching sketch below)
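
A simple batching sketch for that last point. The batch size of 64 is an assumption; set it to whatever your API or GPU memory allows:

```python
from typing import Callable, List

def embed_in_batches(texts: List[str], embed_fn: Callable, batch_size: int = 64) -> list:
    """Embed texts in fixed-size batches to respect API/model limits."""
    out = []
    for i in range(0, len(texts), batch_size):
        out.extend(embed_fn(texts[i:i + batch_size]))
    return out
```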

### 3. Vector Database

Stores and retrieves vectors:

- Building the vector index
- Similarity search
- Metadata filtering

### 4. Retrieval and Generation Module

Combines retrieval results with the LLM:

- Re-ranking (optional)
- Prompt construction
- LLM invocation and answer generation

## Document Splitting Strategies

Document splitting is a key factor in RAG quality: if the splitting is poor, retrieval cannot surface accurate information.

### Comparing Splitting Methods

| Method | Best For | Pros | Cons |
|---|---|---|---|
| Fixed length | Structured documents | Simple to implement | May cut across semantics |
| Semantic splitting | Long documents | Preserves semantic integrity | Split points are unpredictable |
| Recursive splitting | Technical documents | Clear hierarchy | Requires defining separators |
| Sliding window | Context-heavy content | Coherent information | Redundant content |

### Hands-On Code: Smart Splitting

```python
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.text_splitter import MarkdownHeaderTextSplitter


class SmartTextSplitter:
    """Smart document splitter"""

    def __init__(
        self,
        chunk_size: int = 500,
        chunk_overlap: int = 50,
        separators: list = None
    ):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        # Default separator priority: strongest to weakest
        self.separators = separators or [
            "\n\n\n",  # section break
            "\n\n",    # paragraph break
            "\n",      # line break
            "。",      # Chinese full stop
            ".",       # English period
            " ",       # space
            ""         # last resort: force a split
        ]
        self.splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=self.separators,
            length_function=self._count_tokens
        )

    def _count_tokens(self, text: str) -> int:
        """More accurate token count (Chinese-friendly)"""
        # Rough rule: ~1.5 chars/token for Chinese, ~4 chars/token for English
        chinese_chars = sum(1 for c in text if '\u4e00' <= c <= '\u9fff')
        other_chars = len(text) - chinese_chars
        return int(chinese_chars * 0.7 + other_chars * 0.25)

    def split_text(self, text: str) -> list[str]:
        """Split plain text"""
        return self.splitter.split_text(text)

    def split_markdown(self, text: str) -> list[dict]:
        """Split a Markdown document, preserving header metadata"""
        # First split on headers
        headers_to_split_on = [
            ("#", "header1"),
            ("##", "header2"),
            ("###", "header3"),
        ]
        markdown_splitter = MarkdownHeaderTextSplitter(
            headers_to_split_on=headers_to_split_on
        )
        md_splits = markdown_splitter.split_text(text)
        # Then sub-split each fragment
        final_chunks = []
        for split in md_splits:
            content = split.page_content
            metadata = split.metadata
            # If the fragment is too long, keep splitting
            if len(content) > self.chunk_size:
                sub_chunks = self.splitter.split_text(content)
                for i, sub_chunk in enumerate(sub_chunks):
                    final_chunks.append({
                        "content": sub_chunk,
                        "metadata": {
                            **metadata,
                            "chunk_index": i
                        }
                    })
            else:
                final_chunks.append({
                    "content": content,
                    "metadata": metadata
                })
        return final_chunks


# Usage example (long_markdown_text is your document string)
splitter = SmartTextSplitter(chunk_size=500, chunk_overlap=50)
chunks = splitter.split_markdown(long_markdown_text)
```

### Tuning the Splitting Parameters

| Document Type | chunk_size | chunk_overlap | Suggestion |
|---|---|---|---|
| Technical docs | 300-500 | 50-100 | Split by section |
| Legal contracts | 500-800 | 100-150 | Keep clauses intact |
| News articles | 200-400 | 30-50 | Split by paragraph |
| Academic papers | 400-600 | 80-100 | Preserve citation context |
| FAQ | 100-200 | 0 | Split by Q&A pair |
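
One way to apply the table is a set of presets for the `SmartTextSplitter` defined above. The exact numbers here are illustrative midpoints from the table, not tested optima:

```python
# Illustrative presets derived from the tuning table above
SPLITTER_PRESETS = {
    "technical_doc": {"chunk_size": 400, "chunk_overlap": 80},
    "legal_contract": {"chunk_size": 650, "chunk_overlap": 120},
    "news": {"chunk_size": 300, "chunk_overlap": 40},
    "academic_paper": {"chunk_size": 500, "chunk_overlap": 90},
    "faq": {"chunk_size": 150, "chunk_overlap": 0},
}

splitter = SmartTextSplitter(**SPLITTER_PRESETS["technical_doc"])
```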

## Choosing an Embedding Model

Embedding quality directly determines retrieval quality. A good embedding places semantically similar texts closer together in vector space.
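
"Closer in vector space" here usually means higher cosine similarity. A quick sanity check, assuming plain numpy vectors:

```python
import numpy as np

def cosine_sim(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine similarity: 1.0 = same direction, ~0 = unrelated."""
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
```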

### Comparing Mainstream Embedding Models

| Model | Dimensions | Chinese Support | MTEB Score | Deployment |
|---|---|---|---|---|
| text-embedding-3-small | 1536 | ⭐⭐⭐⭐ | 62.3 | API |
| text-embedding-3-large | 3072 | ⭐⭐⭐⭐ | 64.6 | API |
| bge-large-zh-v1.5 | 1024 | ⭐⭐⭐⭐⭐ | 64.5 | Local/API |
| bge-m3 | 1024 | ⭐⭐⭐⭐⭐ | 65.0 | Local |
| m3e-large | 1024 | ⭐⭐⭐⭐⭐ | 63.5 | Local |
| jina-embeddings-v2 | 768 | ⭐⭐⭐ | 61.5 | Local/API |

### Hands-On Code: A Multi-Embedding Strategy

```python
from sentence_transformers import SentenceTransformer
import numpy as np
from typing import List


class EmbeddingManager:
    """Embedding manager that can switch between multiple models"""

    def __init__(self, model_name: str = "BAAI/bge-large-zh-v1.5"):
        self.model_name = model_name
        self.model = None
        self._load_model()

    def _load_model(self):
        """Load the model"""
        # Locally hosted models
        local_models = {
            "bge-large-zh": "BAAI/bge-large-zh-v1.5",
            "bge-m3": "BAAI/bge-m3",
            "m3e-large": "moka-ai/m3e-large",
        }
        if self.model_name in local_models:
            self.model = SentenceTransformer(local_models[self.model_name])
        else:
            # Fall back to the OpenAI API
            self.model = None

    def embed_texts(self, texts: List[str]) -> np.ndarray:
        """Generate embeddings in batch"""
        if self.model:
            # Local model
            return self.model.encode(texts, normalize_embeddings=True)
        else:
            # OpenAI API
            return self._embed_with_openai(texts)

    def embed_query(self, query: str) -> np.ndarray:
        """Generate a query embedding (with model-specific handling)"""
        # Some models expect an instruction prefix on queries; this is the
        # official bge-zh retrieval instruction string
        if "bge" in self.model_name.lower():
            query = "为这个句子生成表示以用于检索相关文章:" + query
        return self.embed_texts([query])[0]

    def _embed_with_openai(self, texts: List[str]) -> np.ndarray:
        """Embed via the OpenAI API"""
        import openai
        response = openai.embeddings.create(
            model="text-embedding-3-small",
            input=texts
        )
        embeddings = [item.embedding for item in response.data]
        return np.array(embeddings)


# Usage example
embedding_manager = EmbeddingManager("bge-large-zh")
vectors = embedding_manager.embed_texts(["This is the first passage", "This is the second passage"])
query_vector = embedding_manager.embed_query("search for related content")
```

### Embedding Selection Guide

| Scenario | Recommended Model | Why |
|---|---|---|
| Mostly Chinese | bge-large-zh-v1.5 | Best Chinese performance, open source and free |
| Mixed multilingual | bge-m3 | Supports 100+ languages, multi-granularity retrieval |
| Highest accuracy | text-embedding-3-large | OpenAI's strongest model |
| Cost-sensitive | bge-large-zh (local) | No API call costs |
| Low latency | m3e-base | Small model, fast |

## Choosing a Vector Database

The vector database is the RAG system's "memory store" and directly affects retrieval efficiency and accuracy.

### Comparing Mainstream Vector Databases

| Database | Deployment | Performance | Characteristics | Best For |
|---|---|---|---|---|
| Chroma | Local/cloud | ⭐⭐⭐ | Lightweight, easy to use, open source | Personal projects, prototyping |
| Milvus | Local/cloud | ⭐⭐⭐⭐⭐ | High performance, distributed | Large-scale production |
| Pinecone | Cloud | ⭐⭐⭐⭐ | Fully managed, low maintenance | Enterprise SaaS |
| Weaviate | Local/cloud | ⭐⭐⭐⭐ | Hybrid search, GraphQL | Multimodal retrieval |
| Qdrant | Local/cloud | ⭐⭐⭐⭐ | Rust implementation, efficient | High-concurrency workloads |
| PGVector | PostgreSQL extension | ⭐⭐⭐ | Simple integration | Projects already on Postgres |

### Hands-On Code: Getting Started with Chroma

```python
import chromadb
from typing import List


class ChromaVectorStore:
    """A thin wrapper around Chroma"""

    def __init__(self, collection_name: str = "documents", persist_dir: str = "./chroma_db"):
        self.client = chromadb.PersistentClient(path=persist_dir)
        self.collection = self.client.get_or_create_collection(
            name=collection_name,
            metadata={"hnsw:space": "cosine"}  # use cosine similarity
        )

    def add_documents(
        self,
        documents: List[str],
        embeddings: List[List[float]],
        metadatas: List[dict] = None,
        ids: List[str] = None
    ):
        """Add documents"""
        if not ids:
            ids = [f"doc_{i}" for i in range(len(documents))]
        if not metadatas:
            metadatas = [{"source": "unknown"} for _ in documents]
        self.collection.add(
            documents=documents,
            embeddings=embeddings,
            metadatas=metadatas,
            ids=ids
        )

    def search(
        self,
        query_embedding: List[float],
        top_k: int = 5,
        where_filter: dict = None
    ) -> dict:
        """Similarity search"""
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=top_k,
            where=where_filter  # metadata filter
        )
        return {
            "documents": results["documents"][0],
            "metadatas": results["metadatas"][0],
            "distances": results["distances"][0],
            "ids": results["ids"][0]
        }

    def delete_by_metadata(self, metadata_filter: dict):
        """Delete by metadata"""
        self.collection.delete(where=metadata_filter)


# Usage example (chunks, embeddings, metadatas from the earlier steps)
store = ChromaVectorStore("my_knowledge_base")
store.add_documents(chunks, embeddings, metadatas)
results = store.search(query_vector, top_k=5)
```

### Hands-On Code: Production-Grade Milvus Deployment

```python
from typing import List
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility


class MilvusVectorStore:
    """Milvus vector store (production-grade)"""

    def __init__(
        self,
        collection_name: str = "documents",
        host: str = "localhost",
        port: str = "19530",
        dimension: int = 1024
    ):
        self.collection_name = collection_name
        self.dimension = dimension
        # Connect to Milvus
        connections.connect(host=host, port=port)
        # Create or fetch the collection
        if utility.has_collection(collection_name):
            self.collection = Collection(collection_name)
        else:
            self._create_collection()
        # Build the index (needed on first run)
        self._create_index()

    def _create_collection(self):
        """Create the collection schema"""
        fields = [
            FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
            FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=self.dimension),
            FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=2000),
            FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=200),
            FieldSchema(name="chunk_index", dtype=DataType.INT64),
        ]
        schema = CollectionSchema(fields=fields, description="knowledge base vectors")
        self.collection = Collection(name=self.collection_name, schema=schema)

    def _create_index(self):
        """Create the vector index"""
        index_params = {
            "metric_type": "COSINE",
            "index_type": "HNSW",
            "params": {"M": 8, "efConstruction": 64}
        }
        self.collection.create_index(
            field_name="embedding",
            index_params=index_params
        )

    def insert(self, data: List[dict]):
        """Batch insert"""
        embeddings = [item["embedding"] for item in data]
        contents = [item["content"][:2000] for item in data]
        sources = [item.get("source", "unknown")[:200] for item in data]
        chunk_indices = [item.get("chunk_index", 0) for item in data]
        self.collection.insert([
            embeddings,
            contents,
            sources,
            chunk_indices
        ])
        # Flush to make sure the data is persisted
        self.collection.flush()

    def search(
        self,
        query_vector: List[float],
        top_k: int = 10,
        expr: str = None
    ) -> List[dict]:
        """Similarity search"""
        # Load the collection into memory
        self.collection.load()
        search_params = {"metric_type": "COSINE", "params": {"ef": 64}}
        results = self.collection.search(
            data=[query_vector],
            anns_field="embedding",
            param=search_params,
            limit=top_k,
            expr=expr,  # boolean filter expression
            output_fields=["content", "source"]  # required so hits carry these fields
        )
        # Format the results
        formatted_results = []
        for hit in results[0]:
            formatted_results.append({
                "id": hit.id,
                "distance": hit.distance,
                "content": hit.entity.get("content"),
                "source": hit.entity.get("source"),
            })
        return formatted_results
```
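
A short usage sketch. The vectors and contents are placeholders, and the `expr` string uses Milvus's boolean expression syntax:

```python
# Hypothetical vectors; the dimension must match the collection schema
store = MilvusVectorStore(dimension=1024)
store.insert([{
    "embedding": doc_vector.tolist(),
    "content": "chunk text...",
    "source": "manual",
    "chunk_index": 0,
}])
hits = store.search(query_vector.tolist(), top_k=5, expr='source == "manual"')
```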

## Retrieval Optimization Strategies

Basic RAG often retrieves poorly; several optimization strategies can markedly improve quality.

### Common Problems and Solutions

| Problem | Cause | Solution |
|---|---|---|
| Relevant content not retrieved | Chunks too fine or too coarse | Tune chunk_size, split at multiple granularities |
| Imprecise results | Unsuitable embedding model | Switch to a Chinese-optimized model |
| Answers feel stitched together | Retrieved fragments are scattered | Increase chunk_overlap, re-rank |
| Context lost across turns | Only the current question is used for retrieval | Retrieve with conversation history (see the sketch below) |
| Inaccurate answers | Too much retrieval noise | Filter, re-rank, cap the result count |
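
For the multi-turn row above, a common fix is to condense the conversation into a standalone query before embedding it. A sketch, assuming an `llm` callable that takes a prompt and returns text (the prompt wording is illustrative):

```python
from typing import Callable, List, Tuple

def build_retrieval_query(history: List[Tuple[str, str]], question: str, llm: Callable) -> str:
    """Condense multi-turn history into a standalone query for retrieval."""
    dialog = "\n".join(f"User: {q}\nAssistant: {a}" for q, a in history)
    prompt = (
        "Rewrite the final user question as a standalone search query, "
        f"using the conversation for context.\n\n{dialog}\n\nFinal question: {question}"
    )
    return llm(prompt)
```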

### Advanced Retrieval Techniques

#### 1. Hybrid Search

Combine vector search with keyword (BM25) search:

```python
import numpy as np
from typing import List
from rank_bm25 import BM25Okapi


class HybridSearch:
    """Hybrid retrieval: vector + BM25"""

    def __init__(self, vector_store, documents: List[str]):
        self.vector_store = vector_store
        self.documents = documents
        # Build the BM25 index (whitespace tokenization; use a proper
        # tokenizer such as jieba for Chinese text)
        tokenized_docs = [doc.split() for doc in documents]
        self.bm25 = BM25Okapi(tokenized_docs)

    def search(
        self,
        query: str,
        query_vector: List[float],
        top_k: int = 10,
        alpha: float = 0.5  # weight of the vector-search score
    ):
        """Hybrid search"""
        # Vector search
        vector_results = self.vector_store.search(query_vector, top_k=top_k * 2)
        # BM25 search
        bm25_scores = self.bm25.get_scores(query.split())
        bm25_top_indices = np.argsort(bm25_scores)[-top_k * 2:]
        # Merge and re-score, keyed by document text so both sources line up
        combined_scores = {}
        for i, doc in enumerate(vector_results["documents"]):
            vector_score = 1 - vector_results["distances"][i]  # distance -> similarity
            combined_scores[doc] = alpha * vector_score
        max_bm25 = max(bm25_scores.max(), 1e-9)  # avoid division by zero
        for idx in bm25_top_indices:
            doc = self.documents[idx]
            bm25_score = bm25_scores[idx] / max_bm25  # normalize
            # Blend with the vector score if the doc was already found
            combined_scores[doc] = combined_scores.get(doc, 0.0) + (1 - alpha) * bm25_score
        # Sort and return the top_k
        sorted_docs = sorted(combined_scores.items(), key=lambda x: x[1], reverse=True)
        return sorted_docs[:top_k]
```

#### 2. Re-ranking

Run a second-pass sort over the retrieved results:

```python
from typing import List
from sentence_transformers import CrossEncoder


class ReRanker:
    """Re-rank retrieved results"""

    def __init__(self, model_name: str = "BAAI/bge-reranker-large"):
        self.reranker = CrossEncoder(model_name)

    def rerank(
        self,
        query: str,
        documents: List[str],
        top_k: int = 5
    ) -> List[tuple]:
        """Re-rank documents against the query"""
        # Build query-document pairs
        pairs = [(query, doc) for doc in documents]
        # Score relevance with the cross-encoder
        scores = self.reranker.predict(pairs)
        # Sort by score, descending
        ranked_results = sorted(
            zip(documents, scores),
            key=lambda x: x[1],
            reverse=True
        )
        return ranked_results[:top_k]


# Usage example
reranker = ReRanker()
initial_results = vector_store.search(query_vector, top_k=20)
reranked = reranker.rerank(query, initial_results["documents"], top_k=5)
```

#### 3. Multi-Query Retrieval

Generate several related questions to improve recall:

```python
from typing import List


class MultiQueryRetriever:
    """Multi-query retrieval strategy"""

    def __init__(self, llm_client, embedding_manager, vector_store):
        self.llm = llm_client  # assumed to expose generate(prompt) -> str
        self.embedding_manager = embedding_manager
        self.vector_store = vector_store

    def generate_queries(self, original_query: str) -> List[str]:
        """Generate several related queries"""
        prompt = f"""User question: {original_query}
Generate 3-5 search queries related to this question to help find more comprehensive information.
One query per line, no numbering."""
        response = self.llm.generate(prompt)
        queries = [line.strip() for line in response.split("\n") if line.strip()]
        # Keep the original question as well
        queries.append(original_query)
        return queries

    def retrieve(self, original_query: str, top_k: int = 5) -> List[str]:
        """Retrieve with multiple queries"""
        queries = self.generate_queries(original_query)
        all_results = []
        for query in queries:
            query_vector = self.embedding_manager.embed_query(query)
            results = self.vector_store.search(query_vector, top_k=top_k)
            all_results.extend(results["documents"])
        # Deduplicate
        unique_results = list(set(all_results))
        # Re-rank against the original question
        reranker = ReRanker()
        final_results = reranker.rerank(original_query, unique_results, top_k=top_k)
        return final_results
```

## Building the Complete RAG System

### Putting the Code Together

"""
完整的RAG系统实现
"""
from typing import List, Dict, Optional
from dataclasses import dataclass
import chromadb
@dataclass
class RAGConfig:
"""RAG配置"""
embedding_model: str = "BAAI/bge-large-zh-v1.5"
vector_db: str = "chroma"
chunk_size: int = 500
chunk_overlap: int = 50
top_k: int = 5
rerank: bool = True
llm_model: str = "qwen-plus"
class RAGSystem:
"""完整RAG系统"""
def __init__(self, config: RAGConfig = None):
self.config = config or RAGConfig()
# 初始化各组件
self.text_splitter = SmartTextSplitter(
chunk_size=self.config.chunk_size,
chunk_overlap=self.config.chunk_overlap
)
self.embedding_manager = EmbeddingManager(self.config.embedding_model)
self.vector_store = ChromaVectorStore("knowledge_base")
if self.config.rerank:
self.reranker = ReRanker()
def ingest_documents(self, documents: List[Dict]):
"""文档入库"""
all_chunks = []
all_embeddings = []
all_metadatas = []
all_ids = []
for doc_idx, doc in enumerate(documents):
# 切分
chunks = self.text_splitter.split_text(doc["content"])
# 生成向量
embeddings = self.embedding_manager.embed_texts(chunks)
# 构建元数据
for chunk_idx, chunk in enumerate(chunks):
all_chunks.append(chunk)
all_embeddings.append(embeddings[chunk_idx].tolist())
all_metadatas.append({
"source": doc.get("source", "unknown"),
"doc_idx": doc_idx,
"chunk_idx": chunk_idx,
"title": doc.get("title", "")
})
all_ids.append(f"doc_{doc_idx}_chunk_{chunk_idx}")
# 存入向量库
self.vector_store.add_documents(
documents=all_chunks,
embeddings=all_embeddings,
metadatas=all_metadatas,
ids=all_ids
)
return len(all_chunks)
def query(
self,
question: str,
top_k: int = None,
rerank: bool = None,
return_sources: bool = True
) -> Dict:
"""查询"""
top_k = top_k or self.config.top_k
rerank = rerank if rerank is not None else self.config.rerank
# 生成查询向量
query_vector = self.embedding_manager.embed_query(question)
# 检索(多取一些用于重排序)
retrieve_k = top_k * 3 if rerank else top_k
results = self.vector_store.search(query_vector, top_k=retrieve_k)
# 重排序
if rerank:
ranked = self.reranker.rerank(question, results["documents"], top_k=top_k)
final_docs = [item[0] for item in ranked]
# 找到对应的元数据
final_metadatas = []
for doc in final_docs:
idx = results["documents"].index(doc)
final_metadatas.append(results["metadatas"][idx])
else:
final_docs = results["documents"][:top_k]
final_metadatas = results["metadatas"][:top_k]
# 构建Prompt
context = "\n\n".join([f"[文档{i+1}] {doc}" for i, doc in enumerate(final_docs)])
prompt = f"""基于以下文档内容回答用户问题。如果文档中没有相关信息,请明确说明。
参考文档:
{context}
用户问题:{question}
请给出准确、简洁的回答,并注明信息来源。"""
# 调用LLM
answer = self._call_llm(prompt)
# 构建返回结果
result = {
"question": question,
"answer": answer,
}
if return_sources:
result["sources"] = [
{
"content": doc,
"metadata": meta
}
for doc, meta in zip(final_docs, final_metadatas)
]
return result
def _call_llm(self, prompt: str) -> str:
"""调用LLM"""
# 这里可以根据config使用不同的LLM
# 示例使用OpenAI格式API
import openai
client = openai.OpenAI(
api_key="your-api-key",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)
response = client.chat.completions.create(
model=self.config.llm_model,
messages=[{"role": "user", "content": prompt}],
temperature=0.1 # 低温度减少幻觉
)
return response.choices[0].message.content
# 使用示例
def main():
# 配置
config = RAGConfig(
embedding_model="bge-large-zh",
chunk_size=500,
top_k=5,
rerank=True
)
# 初始化系统
rag = RAGSystem(config)
# 入库文档
documents = [
{
"content": "长文档内容...",
"source": "产品手册",
"title": "产品使用指南"
},
{
"content": "另一篇文档...",
"source": "FAQ",
"title": "常见问题解答"
}
]
rag.ingest_documents(documents)
# 查询
result = rag.query("如何使用这个产品?")
print(f"问题: {result['question']}")
print(f"回答: {result['answer']}")
print(f"来源: {len(result['sources'])} 个文档片段")
if __name__ == "__main__":
main()

## Production Best Practices

### Performance Optimization

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List


class AsyncRAGSystem(RAGSystem):
    """Async RAG system for higher throughput"""

    def __init__(self, config: RAGConfig = None, max_workers: int = 4):
        super().__init__(config)
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    async def async_ingest(self, documents: List[Dict]) -> int:
        """Async ingestion"""
        loop = asyncio.get_running_loop()
        # Process documents in parallel
        tasks = [
            loop.run_in_executor(
                self.executor,
                self._process_single_document,
                doc
            )
            for doc in documents
        ]
        results = await asyncio.gather(*tasks)
        total_chunks = sum(r["chunk_count"] for r in results)
        # Merge everything and write to the vector store in one batch
        all_data = []
        for r in results:
            all_data.extend(r["data"])
        self.vector_store.add_documents(
            documents=[d["content"] for d in all_data],
            embeddings=[d["embedding"] for d in all_data],
            metadatas=[d["metadata"] for d in all_data],
            ids=[d["id"] for d in all_data]
        )
        return total_chunks

    def _process_single_document(self, doc: Dict) -> Dict:
        """Process a single document"""
        chunks = self.text_splitter.split_text(doc["content"])
        embeddings = self.embedding_manager.embed_texts(chunks)
        data = []
        for i, (chunk, emb) in enumerate(zip(chunks, embeddings)):
            data.append({
                "content": chunk,
                "embedding": emb.tolist(),
                "metadata": {
                    "source": doc.get("source", "unknown"),
                    "chunk_idx": i
                },
                "id": f"{doc.get('source', 'doc')}_{i}"
            })
        return {"chunk_count": len(chunks), "data": data}

    async def async_query(self, question: str) -> Dict:
        """Async query"""
        loop = asyncio.get_running_loop()
        # Embed the query off the event loop
        query_vector = await loop.run_in_executor(
            self.executor,
            self.embedding_manager.embed_query,
            question
        )
        # Retrieve
        results = await loop.run_in_executor(
            self.executor,
            self.vector_store.search,
            query_vector.tolist(),
            self.config.top_k * 3
        )
        # Optional re-ranking
        if self.config.rerank:
            ranked = await loop.run_in_executor(
                self.executor,
                self.reranker.rerank,
                question,
                results["documents"],
                self.config.top_k
            )
            final_docs = [item[0] for item in ranked]
        else:
            final_docs = results["documents"][:self.config.top_k]
        # LLM generation
        context = "\n\n".join(final_docs)
        prompt = f"Answer based on the following content:\n{context}\nQuestion: {question}"
        answer = await loop.run_in_executor(
            self.executor,
            self._call_llm,
            prompt
        )
        return {"question": question, "answer": answer, "sources": final_docs}
```

### Monitoring and Evaluation

```python
from typing import Dict, List


class RAGMonitor:
    """RAG system monitoring"""

    def __init__(self):
        self.metrics = {
            "query_count": 0,
            "avg_latency": 0,
            "avg_retrieval_latency": 0,
            "avg_llm_latency": 0,
            "retrieval_accuracy": []  # filled from human labels
        }

    def log_query(
        self,
        question: str,
        retrieval_latency: float,
        llm_latency: float,
        answer: str,
        sources: List[str]
    ):
        """Log one query"""
        self.metrics["query_count"] += 1
        # Update the running average total latency
        n = self.metrics["query_count"]
        old_avg = self.metrics["avg_latency"]
        new_latency = retrieval_latency + llm_latency
        self.metrics["avg_latency"] = old_avg + (new_latency - old_avg) / n
        # Track the stages separately
        self.metrics["avg_retrieval_latency"] = (
            self.metrics["avg_retrieval_latency"] +
            (retrieval_latency - self.metrics["avg_retrieval_latency"]) / n
        )
        self.metrics["avg_llm_latency"] = (
            self.metrics["avg_llm_latency"] +
            (llm_latency - self.metrics["avg_llm_latency"]) / n
        )

    def get_report(self) -> Dict:
        """Produce a monitoring report"""
        return {
            "total_queries": self.metrics["query_count"],
            "avg_total_latency_ms": round(self.metrics["avg_latency"] * 1000, 2),
            "avg_retrieval_ms": round(self.metrics["avg_retrieval_latency"] * 1000, 2),
            "avg_llm_ms": round(self.metrics["avg_llm_latency"] * 1000, 2),
            "retrieval_accuracy": self._calculate_accuracy()
        }

    def _calculate_accuracy(self) -> float:
        """Compute retrieval accuracy (requires human labels)"""
        if not self.metrics["retrieval_accuracy"]:
            return None
        return sum(self.metrics["retrieval_accuracy"]) / len(self.metrics["retrieval_accuracy"])
```
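
A usage sketch wiring the monitor to `RAGSystem.query`. Since `query` does not expose per-stage timings, the stage split below is a coarse assumption:

```python
import time

monitor = RAGMonitor()
start = time.perf_counter()
result = rag.query("How do I use this product?")  # `rag` from the earlier example
elapsed = time.perf_counter() - start
# Without internal hooks, attribute all latency to the retrieval stage
monitor.log_query(
    question=result["question"],
    retrieval_latency=elapsed,
    llm_latency=0.0,
    answer=result["answer"],
    sources=[s["content"] for s in result["sources"]],
)
print(monitor.get_report())
```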

## Conclusion

RAG lets LLMs break through their "knowledge boundary" and has become core infrastructure for enterprise AI applications.

### Key Takeaways

1. Document splitting: choose an appropriate chunk_size and overlap, preserve semantic integrity
2. Embedding choice: prefer the bge family for Chinese, bge-m3 for multilingual scenarios
3. Vector database: Chroma for small projects, Milvus for large-scale production
4. Retrieval optimization: hybrid search, re-ranking, and multi-query retrieval deliver clear gains
5. Monitoring and evaluation: track latency and accuracy continuously, iterate on the results

### Future Directions

- Multimodal RAG: retrieval over images, audio, and video
- Adaptive retrieval: dynamically adjust the strategy to the question type
- RAG + fine-tuning: combine with fine-tuning for further gains
- Agentic RAG: RAG as the knowledge tool of an AI agent

Master RAG, and you hold the key that lets LLMs truly "possess knowledge".

