跳到主要内容

记忆管理系统

在 LangGraph 中,记忆就是"持久化的状态(Persisted State)"。LangChain 1.0 的记忆管理与 LangGraph 的状态机制深度绑定。

你需要掌握三个核心要素:

  • State (状态): 定义用来存储消息的结构(通常是 MessagesState)。
  • Checkpointer (检查点保存器): 负责在每一步结束后把状态保存下来(短期记忆通常用 MemorySaver)。
  • Thread ID (线程ID): 在调用时通过 config 传入,用来隔离不同用户的对话上下文。

记忆类型概览

┌─────────────────────────────────────────────────────────────┐
│ Agent 记忆系统 │
├─────────────────────────────────────────────────────────────┤
│ 短期记忆 (Checkpointer) │
│ └── 与 thread_id 绑定,会话级生命周期 │
│ - InMemorySaver: 内存存储(开发用) │
│ - PostgresSaver: 数据库存储(生产用) │
├─────────────────────────────────────────────────────────────┤
│ 长期记忆 (VectorStore) │
│ └── 模糊知识片段,跨会话持久化 │
│ - Chroma: 本地向量数据库 │
│ - Pinecone/Milvus: 云端向量数据库 │
├─────────────────────────────────────────────────────────────┤
│ 跨线程记忆 (BaseStore) │
│ └── 结构化用户档案,跨会话精确检索 │
│ - PostgresStore: 数据库持久化 │
└─────────────────────────────────────────────────────────────┘

短期 vs 长期记忆的分界

维度短期记忆长期记忆
生命周期与会话(thread)绑定,会话结束即清理与用户/业务实体绑定,跨会话持久
检索方式自动加载到上下文需主动调用搜索工具
存储形态对话历史(完整上下文)知识片段(语义向量)
核心场景长对话维持、失败恢复用户偏好、经验积累

短期记忆(Checkpointer)

短期记忆通过 LangGraph 的 AgentState(一个 TypedDict)来管理。对话历史、中间步骤等信息被保存在状态中,并通过检查点(Checkpoints)机制在每次迭代后持久化。

核心原理:当你再次 invoke 并传入 thread_id 时,LangGraph 会先去存储里查"这个 ID 上次停在哪里?状态是什么?",然后加载状态,把新消息 append 进去,再继续运行。

InMemorySaver(开发环境)

from langgraph.checkpoint.memory import InMemorySaver
from langchain.agents import create_agent

# 创建内存检查点
memory = InMemorySaver()

# 创建 Agent
agent = create_agent(
model=model,
tools=[tools],
checkpointer=memory # 启用短期记忆
)

# 配置 thread_id(会话 ID)
config = {"configurable": {"thread_id": "user_123"}}

# 第一轮对话
response1 = agent.invoke(
{"messages": [{"role": "user", "content": "你好,我叫陈明"}]},
config=config
)

# 第二轮对话(自动携带上下文)
response2 = agent.invoke(
{"messages": [{"role": "user", "content": "你还记得我叫什么吗?"}]},
config=config # 相同 thread_id
)

# 验证:检查状态
state = agent.get_state(config)
print(f"当前记忆轮次: {len(state.values['messages'])} 条消息")

关键点

  • thread_id 是短期记忆的"钥匙"
  • 不同 thread_id 的会话记忆隔离
  • 内存存储,进程重启后记忆丢失

PostgresSaver(生产环境)

首先安装依赖

# 安装 psycopg-binary(推荐,纯 Python 实现)
pip install psycopg[binary]

# 安装 LangChain PostgreSQL 适配器
pip install langgraph-checkpoint-postgres
#%%
from langgraph.checkpoint.postgres import PostgresSaver

"""
生产环境使用数据库存储,支持:
- 持久化(重启不丢失)
- 多实例共享(分布式部署)
- 大规模并发
"""

print("\n" + "=" * 60)
print("场景 2: Postgres 持久化记忆(生产环境)")
print("=" * 60)

# 定义工具函数:查询用户信息
@tool
def get_user_info(name: str) -> str:
"""查询用户信息,返回姓名、年龄、爱好"""
user_db = {
"陈明": {"age": 28, "hobby": "旅游、滑雪、喝茶"},
"张三": {"age": 32, "hobby": "编程、阅读、电影"}
}
info = user_db.get(name, {"age": "未知", "hobby": "未知"})
return f"姓名: {name}, 年龄: {info['age']}岁, 爱好: {info['hobby']}"

# 创建模型
model = load_chat_model(model="gpt-4o-mini",provider="openai")

# 数据库连接字符串
DB_URI = "postgresql://myuser:123456@localhost:5432/mydatabase"

# 使用上下文管理器确保连接正确关闭
with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
# 自动创建表结构(仅首次运行)
checkpointer.setup()

# 创建智能体
agent = create_agent(
model=model,
tools=[get_user_info],
checkpointer=checkpointer
)

# 配置线程 ID(用于区分不同用户)
config = {"configurable": {"thread_id": "production_user_001"}}

# 模拟用户注册流程
agent.invoke(
{"messages": [{"role": "user", "content": "我是新用户张三,请记录我的信息"}]},
config=config
)

response = agent.invoke(
{"messages": [{"role": "user", "content": "我是谁?"}]},
config=config
)
print(f"AI: {response['messages'][-1].content}")

[//]: # (============================================================)
[//]: # (场景 2: Postgres 持久化记忆(生产环境))
[//]: # (============================================================)
[//]: # (AI: 您是张三,32岁,您的爱好包括编程、阅读和电影。如果您有其他问题或者需要进一步的帮助,请随时告诉我!)

注意:PostgresSaver 即使存储到数据库,仍然属于短期记忆,因为:

  • 作用域限制:只检索和加载当前 thread_id 的数据
  • 无跨会话检索能力:无法在新会话中自动访问旧会话数据

两种检查点对比

特性InMemorySaverPostgresSaver
存储位置内存PostgreSQL
生命周期会话级会话级
持久化进程重启丢失保留
适用环境开发/测试生产

上下文裁剪

为什么要裁剪?

如果不处理,随着对话进行,state["messages"] 会包含几千条消息。直接全部传给 LLM 会导致:

  1. 烧钱:输入 token 越多,费用越高
  2. 超限报错:超过 128k/8k 上下文窗口限制

解决思路:在调用 LLM 之前使用 trimmer 裁剪。

  • State 中:依然保存完整历史(为了审计或回溯)
  • 传给 LLM 时:只传最近的 N 个 Token(或 N 条消息)

start_on 参数:这是一个细节最佳实践。如果截断导致第一条消息是 AI 的回复(没有对应的 User 问题),某些模型会感到困惑。设置 start_on="human" 确保截断后的对话总是以 User 消息开始。

使用 trim_messages

使用 trim_messages 可以裁剪对话历史,保留关键信息。

from langchain_core.messages import trim_messages
from langchain_core.language_models import count_tokens_tiktoken

# 裁剪配置
MAX_TOKENS = 100

# 获取当前状态
state = agent.get_state(config)
existing_messages = state.values.get("messages", [])

# 裁剪消息
trimmed_messages = trim_messages(
existing_messages,
max_tokens=MAX_TOKENS,
token_counter=count_tokens_tiktoken,
strategy="last", # 保留最新消息
include_system=True,
start_on="human" # 从人类消息开始裁剪
)

# 构建新输入
new_messages = trimmed_messages + [HumanMessage(content=user_input)]

裁剪策略

策略说明适用场景
last保留最近 N 条消息多数场景,推荐
first保留最早期消息需要历史上下文时
middle保留首尾,裁剪中间超长对话

长期记忆(向量数据库)

长期记忆通过外部向量数据库或键值存储集成来实现。可以在 Agent 执行的关键节点(如对话结束时)提取关键信息、用户偏好等,并存入长期记忆库,供未来的对话使用。

向量数据库是实现长期记忆的核心技术,通过语义相似度搜索实现知识的长期存储和检索。

技术实现:

  • 向量化存储:将对话内容、用户偏好等转换为向量表示
  • 语义检索:基于向量相似度实现智能搜索
  • 多模态支持:支持文本、图像、音频等多种数据类型
  • 高性能查询:支持大规模数据的快速检索
import os
import uuid
from typing import List

# --- 1. 导入组件 ---
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_chroma import Chroma # 向量数据库
from langchain_core.documents import Document
from langchain.agents import AgentState, create_agent
from langgraph.checkpoint.memory import MemorySaver

# 确保配置了 OPENAI_API_KEY
# os.environ["OPENAI_API_KEY"] = "sk-..."

# ==========================================
# 2. 初始化向量数据库 (长期记忆的物理载体)
# ==========================================
# 在生产环境中,这里应该是连接到 Pinecone, Milvus 或本地持久化的 Chroma
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vector_store = Chroma(
collection_name="agent_long_term_memory",
embedding_function=embeddings,
#persist_directory="./chroma_db" # 如果想存到硬盘,取消注释这一行
)

# ==========================================
# 3. 定义记忆工具 (Agent 的手)
# ==========================================
# 3.1 定义记忆保存工具
# ==========================================
@tool
def save_memory(content: str):
"""
将重要信息保存到长期记忆中。
当你获知用户的喜好、职业、计划或其他长期有效的事实时,调用此工具。
参数:
content (str): 要保存的记忆内容。
"""
print(f"\n[记忆操作] 正在保存记忆: '{content}'")
# 将文本封装为 Document
doc = Document(
page_content=content,
metadata={"source": "user_interaction", "timestamp": "simulated_time"}
)
# 写入向量库
vector_store.add_documents([doc])
return "记忆已成功保存。"

# 3.2 定义记忆搜索工具
# ==========================================
@tool
def search_memory(query: str):
"""
从长期记忆中搜索相关信息。
当你被问及关于用户过去的问题,或者你不确定答案时,使用此工具进行查找。
参数:
query (str): 要搜索的查询语句。
"""
print(f"\n[记忆操作] 正在搜索记忆: '{query}'")

# 执行语义搜索 (k=2 表示只取最相关的2条)
results = vector_store.similarity_search(query, k=2)

if not results:
return "没有找到相关的记忆。"

# 将搜索结果拼接成字符串返回给 Agent
memory_content = "\n".join([f"- {doc.page_content}" for doc in results])
return f"找到以下相关记忆:\n{memory_content}"

# 将工具放入列表
tools = [save_memory, search_memory]

# ==========================================
# 4. 创建 Agent
# ==========================================

# 定义系统提示词:教会 Agent 何时使用记忆工具
SYSTEM_PROMPT = """你是一个拥有长期记忆的私人助手。
你的目标是记住用户的喜好和重要信息,以便提供个性化服务。

1. 如果用户告诉你任何关于他们自己的事实(如名字、喜好、居住地),请务必调用 'save_memory' 工具保存。
2. 如果用户问你一个问题,而答案可能在你之前的记忆中,请先调用 'search_memory' 工具查找。
3. 如果只是闲聊,不需要调用工具。
"""

llm = ChatOpenAI(model="gpt-4o", temperature=0) # 建议使用 GPT-4 或更强的模型以保证工具调用准确率

# 使用 checkpointer 依然是必要的,用于维持当前这一轮对话的上下文
checkpointer = MemorySaver()

# 创建 Agent 应用
agent_app = create_agent(
llm,
tools,
system_prompt=SYSTEM_PROMPT, # 注入系统提示词
checkpointer=checkpointer
)

# ==========================================
# 5. 运行演示
# ==========================================

def run_demo():
# === 场景 A:存入记忆 ===
# 使用一个 thread_id,代表这是今天的对话
config_a = {"configurable": {"thread_id": "session_today"}}

print("--- 场景 A:用户告诉 Agent 喜好 ---")
user_input_1 = "你好,记住我最喜欢的水果是草莓,而且我对花生过敏。"

# 运行 Agent,stream_mode="values"参数,返回每个时间步的中间结果
for chunk in agent_app.stream({"messages": [HumanMessage(content=user_input_1)]}, config=config_a, stream_mode="values"):
# 只打印最后一条机器人的回复
pass
print(f"Agent: {chunk['messages'][-1].content}")

# === 场景 B:模拟遗忘 (开启新线程) ===
# 我们换一个 thread_id,这意味着 Agent 失去了“短期记忆” (MemorySaver 里的东西访问不到了)
# 但是,长期记忆在 VectorStore 里,是可以跨 thread 访问的!
config_b = {"configurable": {"thread_id": "session_tomorrow"}}

print("\n--- 场景 B:第二天 (新的 Session,短期记忆已清空) ---")
user_input_2 = "我想吃点零食,但我忘了我有什么忌口,你能帮我查查吗?"

print(f"User: {user_input_2}")

# 观察控制台输出,你会看到 Agent 自动调用 search_memory
final_response = None
for chunk in agent_app.stream({"messages": [HumanMessage(content=user_input_2)]}, config=config_b, stream_mode="values"):
final_response = chunk['messages'][-1]

print(f"Agent: {final_response.content}")

if __name__ == "__main__":
run_demo()

跨线程记忆(BaseStore)

跨线程记忆用于存储结构化的用户档案(如姓名、VIP等级、偏好设置),无论用户开多少个新聊天窗口,这些信息都必须存在。

三种记忆的区别

记忆类型存储内容作用域检索方式
短期记忆 (Checkpointer)对话历史thread_id 内自动加载
长期记忆 (VectorStore)模糊知识片段跨会话语义搜索
跨线程记忆 (BaseStore)结构化用户档案user_id 级别精确查询

BaseStore 核心特性

BaseStore 是 LangGraph 提供的通用键值存储抽象接口,专为结构化长期记忆设计。

命名空间(Namespace)机制:采用层次化元组路径组织数据,类似文件系统目录结构:

namespace = ("users", "user_123", "preferences")
# 对应逻辑路径:users/user_123/preferences

核心操作

  • put(namespace, key, value):存储键值对(支持 TTL 过期)
  • get(namespace, key):精确检索单个记忆
  • search(namespace, query):语义搜索(需子类支持)
  • delete(namespace, key):删除记忆
import os
from dotenv import load_dotenv
import time
import uuid
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage
from langchain.agents import create_agent, AgentState
from typing import Annotated
from pydantic import BaseModel, Field

# --- 核心组件:Postgres 持久化检查点 ---
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.store.postgres import PostgresStore
from langgraph.store.base import BaseStore
from langgraph.prebuilt import InjectedStore, InjectedState
from psycopg_pool import ConnectionPool

# 配置 API KEY
load_dotenv(override=True)

# ==========================================
# 1. 数据库配置
# ==========================================
DB_URI = "postgresql://postgres:password@host:5432/appdb"

# ==========================================
# 2. 定义工具 (Tools)
# ==========================================
@tool
def magic_calculation(a: int, b: int) -> int:
"""进行一次特殊的加法计算"""
return (a + b) * 10


class CrossThreadState(AgentState):
user_id: str # 跨线程记忆的唯一标识


# ============ 定义 Pydantic 模型用于提取用户信息 ===========
class UserInfo(BaseModel):
"""从文本中提取的用户信息"""
user_name: str = Field(description="用户的名字,例如:Alice、Bob、张三等")
additional_info: str = Field(description="关于用户的其他信息,例如职业、兴趣爱好等")


class QueryInfo(BaseModel):
"""从查询文本中提取的信息"""
user_name: str = Field(
description="要查询的用户名字。如果查询中包含'我的'、'我是'等第一人称,请从对话历史中提取用户名;如果没有明确的用户名,返回'all_users'"
)
query_content: str = Field(description="查询的具体内容,例如:职业、兴趣爱好等")


# ============ 定义记忆管理工具(使用 BaseStore)===========
@tool
def remember_user_info(
info: str,
state: Annotated[dict, InjectedState()],
store: Annotated[BaseStore, InjectedStore()]
) -> str:
"""
将用户信息存入跨线程记忆

参数说明:
info: 要记忆的信息(例如:用户的名字、职业、偏好等)
"""
# 使用 Pydantic 提取用户信息
structured_llm = llm.with_structured_output(UserInfo)

try:
extracted_info = structured_llm.invoke(
f"从以下文本中提取用户名和其他信息:{info}"
)

extracted_user_name = extracted_info.user_name.lower()
state_user_id = state.get("user_id", "unknown_user")

if extracted_user_name and extracted_user_name != "unknown":
user_id = extracted_user_name
else:
user_id = state_user_id

full_info = f"{extracted_info.user_name}: {extracted_info.additional_info}"

except Exception as e:
user_id = state.get("user_id", "unknown_user")
full_info = info

namespace = (user_id, "profile")
memory_id = str(uuid.uuid4())

store.put(
namespace,
memory_id,
{
"info": full_info,
"timestamp": "2025-11-25",
"source": "user_input"
}
)

return f"✅ 已将信息存入长期记忆 (用户: {user_id}): {full_info}"


@tool
def recall_user_info(
query: str,
state: Annotated[dict, InjectedState()],
store: Annotated[BaseStore, InjectedStore()]
) -> str:
"""
从跨线程记忆中检索用户信息

参数说明:
query: 查询关键词
"""
state_user_id = state.get("user_id", None)

if not state_user_id:
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
structured_llm = llm.with_structured_output(QueryInfo)

try:
prompt = f"""从以下查询中提取用户名和查询内容。
查询文本:{query}
注意:
1. 如果查询中包含"我的"、"我是"等第一人称词汇,说明用户在询问自己的信息
2. 如果能从查询中推断出具体的用户名(如 Alice、Bob),请提取该用户名
3. 如果无法确定具体用户,返回 'all_users'
"""
extracted_query = structured_llm.invoke(prompt)

if extracted_query.user_name.lower() in ['all_users', 'current_user', 'unknown']:
user_id = None
else:
user_id = extracted_query.user_name.lower()

except Exception as e:
user_id = None
else:
user_id = state_user_id

try:
if user_id:
namespace_prefix = (user_id,)
memories = store.search(namespace_prefix, limit=20)
else:
memories = []
for uid in ['alice', 'bob', 'unknown_user']:
namespace_prefix = (uid,)
user_memories = store.search(namespace_prefix, limit=20)
memories.extend(user_memories)

if not memories:
return f"未找到相关记忆。请先告诉我一些信息,我会记住它们。"

results = []
for item in memories:
info = item.value.get('info', '未知信息')
timestamp = item.value.get('timestamp', '未知时间')
results.append(f"- {info} (记录时间: {timestamp})")

return f"找到 {len(results)} 条记忆:\n" + "\n".join(results)

except Exception as e:
return f"检索记忆时出错: {str(e)}"


# ==========================================
# 3. 主程序逻辑
# ==========================================
def run_postgres_agent():
print("--- 正在连接 PostgreSQL 数据库 ---")

with ConnectionPool(conninfo=DB_URI, max_size=20, kwargs={"autocommit": True}) as pool:
# --- A. 初始化 Checkpointer 和 Store ---
checkpointer = PostgresSaver(pool)
store = PostgresStore(pool)

print("🔧 初始化 Checkpointer 表结构...")
checkpointer.setup()
print("✅ Checkpointer 表结构初始化完成")

print("🔧 初始化 Store 表结构...")
store.setup()
print("✅ Store 表结构初始化完成")

# --- B. 创建 Agent ---
tools = [magic_calculation, remember_user_info, recall_user_info]

agent = create_agent(
model=llm,
tools=tools,
state_schema=CrossThreadState,
system_prompt="""
你是一个具备跨线程记忆的智能助手。

你的能力:
1. 使用 remember_user_info 工具将用户的重要信息存入长期记忆(跨会话持久化)
2. 使用 recall_user_info 工具从长期记忆中检索用户信息
3. 使用 magic_calculation 工具进行特殊计算

工作流程:
- 当用户告诉你他的名字、职业、偏好等信息时,主动调用 remember_user_info 存储
- 当用户询问"你还记得我吗"或类似问题时,调用 recall_user_info 检索
- 记忆是跨会话的,即使在新的对话中也能记住用户信息
""",
store=store,
checkpointer=checkpointer
)

# ==========================================
# 4. 测试场景:跨线程记忆功能
# ==========================================

print("\n" + "=" * 70)
print("场景 1:用户 Alice 第一次对话(会话 1)")
print("=" * 70)

thread1_config = {"configurable": {"thread_id": "session_alice_001"}}

print("\n👤 用户 Alice: 你好,我是 Alice,一名 Python 开发工程师,我喜欢深度学习。")

for chunk in agent.stream(
{
"messages": [HumanMessage(content="你好,我是 Alice,一名 Python 开发工程师,我喜欢深度学习。")],
"user_id": "alice"
},
config=thread1_config,
stream_mode="values"
):
last_msg = chunk["messages"][-1]
if last_msg.type == "ai" and last_msg.content:
print(f"🤖 Agent: {last_msg.content}")
if hasattr(last_msg, "tool_calls") and last_msg.tool_calls:
for tool_call in last_msg.tool_calls:
print(f" 🔧 [调用工具]: {tool_call['name']}")

print("\n" + "=" * 70)
print("场景 2:用户 Alice 第二次对话(会话 2 - 不同 thread_id)")
print("=" * 70)
print("💡 模拟:Alice 关闭浏览器,第二天重新打开,开始新会话")

thread2_config = {"configurable": {"thread_id": "session_alice_002"}}

print("\n👤 用户 Alice: 你还记得我是谁吗?我的职业是什么?")

for chunk in agent.stream(
{
"messages": [HumanMessage(content="你还记得我是谁吗?我的职业是什么?")],
"user_id": "alice"
},
config=thread2_config,
stream_mode="values"
):
last_msg = chunk["messages"][-1]
if last_msg.type == "ai" and last_msg.content:
print(f"🤖 Agent: {last_msg.content}")
if hasattr(last_msg, "tool_calls") and last_msg.tool_calls:
for tool_call in last_msg.tool_calls:
print(f" 🔧 [调用工具]: {tool_call['name']}")

print("\n" + "=" * 70)
print("场景 3:用户 Bob 的对话(不同用户)")
print("=" * 70)

thread3_config = {"configurable": {"thread_id": "session_bob_001"}}

print("\n👤 用户 Bob: 你好,我是 Bob,一名产品经理,帮我算一下 10 + 20 的特殊结果。")

for chunk in agent.stream(
{
"messages": [HumanMessage(content="你好,我是 Bob,一名产品经理,帮我算一下 10 + 20 的特殊结果。")],
"user_id": "bob"
},
config=thread3_config,
stream_mode="values"
):
last_msg = chunk["messages"][-1]
if last_msg.type == "ai" and last_msg.content:
print(f"🤖 Agent: {last_msg.content}")
if hasattr(last_msg, "tool_calls") and last_msg.tool_calls:
for tool_call in last_msg.tool_calls:
print(f" 🔧 [调用工具]: {tool_call['name']}")

print("\n" + "=" * 70)
print("场景 4:Alice 第三次对话(验证记忆隔离)")
print("=" * 70)
print("💡 验证:Alice 的记忆不会被 Bob 的信息污染")

thread4_config = {"configurable": {"thread_id": "session_alice_003"}}

print("\n👤 用户 Alice: 我的兴趣爱好是什么?")

for chunk in agent.stream(
{
"messages": [HumanMessage(content="我的兴趣爱好是什么?")],
"user_id": "alice"
},
config=thread4_config,
stream_mode="values"
):
last_msg = chunk["messages"][-1]
if last_msg.type == "ai" and last_msg.content:
print(f"🤖 Agent: {last_msg.content}")
if hasattr(last_msg, "tool_calls") and last_msg.tool_calls:
for tool_call in last_msg.tool_calls:
print(f" 🔧 [调用工具]: {tool_call['name']}")

print("\n" + "=" * 70)
print("✅ 跨线程记忆测试完成!")
print("=" * 70)
print("\n 测试总结:")
print(" ✅ 场景 1: Alice 首次对话,Agent 自动存储用户信息到 Store")
print(" ✅ 场景 2: Alice 新会话(不同 thread_id),Agent 成功从 Store 检索记忆")
print(" ✅ 场景 3: Bob 的对话,Agent 为 Bob 创建独立的记忆空间")
print(" ✅ 场景 4: Alice 再次对话,记忆未被 Bob 的信息污染")
print("\n💡 关键特性:")
print(" - Checkpointer: 管理单个会话的对话历史(基于 thread_id)")
print(" - Store: 管理跨会话的长期记忆(基于 user_id)")
print(" - 记忆隔离: 不同用户的记忆完全隔离(通过 namespace)")
print(" - 持久化: 所有数据存储在 PostgreSQL,重启程序后依然可用")
print("=" * 70)


run_postgres_agent()

记忆核心原则

  1. 隔离性:每个用户必须分配唯一 thread_id,避免串话
  2. 持久化:生产环境必须使用数据库检查点
  3. 可控性:使用中间件实现业务逻辑与记忆分离
  4. 性能:长对话启用摘要机制,防止 token 超限

企业级架构

用户请求 → thread_id + user_id → Checkpointer(会话) + BaseStore(用户)

记忆类型选择

场景推荐方案隔离级别
单会话对话历史Checkpointerthread_id
跨会话用户档案BaseStoreuser_id
模糊知识检索VectorStore语义搜索
全部保留Checkpointer + BaseStore + VectorStore组合

BaseStore vs VectorStore

维度BaseStoreVectorStore
存储内容结构化字典非结构化文本
检索方式精确匹配语义相似搜索
写入速度小于1ms50-200ms

清理与优化

  • TTLstore.put(namespace, key, value, ttl=3600)
  • 清理store.delete(namespace, key)
  • 优化:建立索引、使用缓存、批量操作

扩展 State

核心目的

  • 跨步骤持久化上下文:Agent 执行是多步骤的(LLM调用 → 工具调用 → 结果解析),扩展的 State 字段能在所有步骤间共享。
  • 实现条件分支与动态路由:根据 State 中的字段值,决定 Agent 的下一步走向。
  • 支持多模态与复杂输入:现代 Agent 需要处理图片、文件等非文本数据,扩展到 State 中。
  • 实现记忆与持久化:扩展字段用于存储长期记忆,跨会话保持。
  • 性能监控与调试:扩展字段用于记录性能指标,便于分析优化。

实现方式

#%%
# ============ 自定义 State 扩展 ============
"""
通过 TypedDict 扩展 AgentState,添加业务字段(用户ID、偏好等)。

**为什么使用 TypedDict 而非 Pydantic?**

| 场景 | TypedDict | Pydantic |
|------|-----------|----------|
| Agent State | ✅ 轻量高效 | ❌ 过度设计 |
| FastAPI 请求体 | ❌ 需手动验证 | ✅ 原生集成 |
| 内部函数参数 | ✅ 轻量有效 | ⚠️ 不必要 |
| 数据处理流水线 | ✅ 零开销传递 | ⚠️ 转换有成本 |
| 微服务 DTO | ⚠️ 需结合 mypy | ✅ 天然支持序列化 |

LangChain 1.0 推荐使用 TypedDict 而非 Pydantic。
"""
from typing import TypedDict, Optional
from langchain.agents import AgentState, create_agent
from langgraph.checkpoint.memory import InMemorySaver

# 定义自定义 State 结构
class CustomAgentState(AgentState):
"""扩展的 Agent 状态,包含业务上下文"""
user_id: str # 用户唯一标识
preferences: dict # 用户偏好(主题、语言等)
visit_count: int # 访问次数

# ============ 定义带状态访问的工具 ============
from langchain.tools import ToolRuntime
from langgraph.types import Command
from langchain.messages import ToolMessage

# 定义工具函数:更新用户偏好
@tool
def update_user_preference(runtime: ToolRuntime, theme: str) -> Command:
"""
更新用户主题偏好,写入短期记忆

ToolRuntime 提供对 state 和 context 的访问能力:
- runtime.state: 当前状态(含自定义字段)
- runtime.context: 调用上下文
- runtime.tool_call_id: 工具调用ID
"""
# 从当前状态获取偏好(如果不存在则初始化)
current_prefs = runtime.state.get("preferences", {})
current_prefs["theme"] = theme

# 返回 Command 对象,指示状态更新
return Command(update={
"preferences": current_prefs,
"messages": [
ToolMessage(
content=f"成功更新主题为: {theme}",
tool_call_id=runtime.tool_call_id
)
]
})

# 定义工具函数:根据用户偏好生成问候
@tool
def greet_user(runtime: ToolRuntime) -> str:
"""根据用户偏好生成个性化问候"""
user_name = runtime.state.get("user_id", "访客")
prefs = runtime.state.get("preferences", {})
theme = prefs.get("theme", "默认")

return f"欢迎回来,{user_name}!当前主题: {theme}"

# ============ 创建带自定义状态的 Agent ============
def demo_custom_state():
print("\n" + "=" * 60)
print("场景 3: 自定义 State 扩展记忆维度")
print("=" * 60)

# 使用内存存储
checkpointer = InMemorySaver()

# 创建 Agent,指定自定义 state_schema
agent = create_agent(
model=model,
tools=[update_user_preference, greet_user],
state_schema=CustomAgentState, # 关键:传入自定义状态类型
checkpointer=checkpointer
)

# 配置线程 ID(用于区分不同用户)
config = {"configurable": {"thread_id": "custom_state_user"}}

# 第一轮:初始化用户信息
result1 = agent.invoke(
{
"messages": [{"role": "user", "content": "设置主题为暗黑模式"}],
"user_id": "user_789", # 自定义字段
"preferences": {"language": "zh-CN"}, # 初始偏好
"visit_count": 1
},
config=config
)
print(f"第一轮: {result1['messages'][-1].content}")
print("-" * 40)

# 第二轮:读取记忆
result2 = agent.invoke(
{"messages": [{"role": "user", "content": "打个招呼"}]},
config=config
)
print(f"第二轮: {result2['messages'][-1].content}")
print("-" * 40)

# 查看完整状态
state = agent.get_state(config)
print("当前记忆状态:")
print(f" 用户ID: {state.values.get('user_id')}")
print(f" 偏好: {state.values.get('preferences')}")
print(f" 消息数: {len(state.values['messages'])}")

# ============ 运行自定义状态示例 ============
if __name__ == "__main__":
demo_custom_state()