跳到主要内容

智能小秘书案例

案例概述

需求:开发一个智能小秘书,具备以下能力:

  • 查询火车票
  • 生成分析图
  • 获取网上数据

这类应用的核心是工具调用能力,通过 MCP(Model Context Protocol)协议连接各种外部服务,让 AI 能够执行真实世界的操作。


MCP 工具

MCP(Model Context Protocol)是一种用于 AI 模型与外部系统交互的标准化协议。它允许 AI 应用连接到各种数据源和工具,如数据库、API、文件系统等。

MCP 架构

MCP 架构

MCP 服务配置示例

from langchain_mcp_adapters.client import MultiServerMCPClient
from dotenv import load_dotenv
import os

load_dotenv()
ZHIPU_API_KEY = os.environ.get('ZHIPU_API_KEY')

# MCP 服务配置
MCP_SERVERS = {
# 搜索引擎 MCP
'zhipuai_mcp': {
'url': 'https://open.bigmodel.cn/api/mcp/web_search_prime/mcp',
'transport': 'http',
'headers': {"Authorization": f"Bearer {ZHIPU_API_KEY}"}
},
# 火车票查询 MCP
'my12306_mcp': {
'url': 'https://mcp.api-inference.modelscope.net/71c1767580c047/mcp',
'transport': 'streamable_http',
},
# 图表生成 MCP
'chart_mcp': {
'url': 'https://mcp.api-inference.modelscope.net/17efa5fc50dc48/mcp',
'transport': 'streamable_http',
}
}

# 创建 MCP 客户端
mcp_client = MultiServerMCPClient(MCP_SERVERS)

# 获取工具列表
tools = await mcp_client.get_tools()

自定义工具调用

当内置的 ToolNode 无法满足需求时,可以自定义工具节点来实现:

  • 异步调用
  • 并发执行多个工具
  • 参数验证
  • 异常处理
  • 人工介入

工具调用时机

什么时候调用工具?

大模型返回的 AIMessage 中包含调用工具的请求或指令时。此时:

  • AIMessage 内容为空
  • 但包含 tool_calls 列表,记录了要调用的工具、参数等信息

并发执行

一个 AIMessage 中可能包含多个工具调用请求,这些工具可以并行执行以提高效率。


ToolNode

ToolNode 是 LangGraph 预置的工具节点,用于执行 AIMessage 中请求的工具调用。

基本用法

from langgraph.prebuilt import ToolNode
from langchain_core.tools import tool

@tool
def calculator(a: int, b: int) -> int:
"""加法计算器"""
return a + b

# 创建工具节点
tool_node = ToolNode([calculator])

核心特性

特性说明
并发执行多个工具调用会并行运行
输出格式返回 ToolMessage 列表
状态键支持自定义 message_key

工作原理

AIMessage (含 tool_calls)


┌───────────────────┐
│ ToolNode │
│ ┌─────────────┐ │
│ │ 工具1 (异步) │ │
│ ├─────────────┤ │
│ │ 工具2 (异步) │ │ ← 并发执行
│ ├─────────────┤ │
│ │ 工具3 (异步) │ │
│ └─────────────┘ │
└───────────────────┘


ToolMessage 列表

人机协作

在 AI 应用中,模型输出可能需要人工验证、修正或补充额外上下文。LangGraph 提供了人机协作功能,允许在工作流的任意点暂停,等待人工确认后再继续执行。

核心组件

组件作用
interrupt()在节点内调用,暂停图执行,等待人工输入
Command恢复执行时传递人工输入
checkpointer保存图状态,支持中断恢复

使用步骤

  1. 创建检查点:编译图时添加 checkpointer
  2. 配置中断:选择 interrupt_beforeinterrupt()
  3. 运行并等待:使用线程 ID 运行,触发中断后暂停
  4. 恢复执行:通过 Command 传递人工输入继续

实现方式一:interrupt_before

在编译图时指定中断点,在指定节点执行之前暂停。

from langgraph.checkpoint.memory import MemorySaver

# 在 tools 节点执行前中断
graph = builder.compile(
checkpointer=MemorySaver(),
interrupt_before=['tools'] # 中断后等待人工确认
)

恢复执行

config = {'configurable': {'thread_id': 'session_001'}}

# 传入 None 表示继续执行
async for chunk in graph.astream(None, config, stream_mode='values'):
result = chunk['messages'][-1].content

适用场景

  • 所有工具调用都需要人工审核
  • 固定的审批流程
  • 简单的一对一中断

实现方式二:interrupt()

在节点内部调用 interrupt(),可以传递自定义信息给审核者。

from langgraph.types import interrupt

def tool_node(state):
tool_call = state['messages'][-1].tool_calls[0]

# 触发中断,传递审核信息
response = interrupt(
f"即将执行工具: {tool_call['name']}\n"
f"参数: {tool_call['args']}\n"
f"请确认是否执行?(y/n)"
)

# 根据人工响应继续处理
if response['answer'].lower() != 'y':
return {"messages": [...]}

恢复执行

from langgraph.types import Command

# 传递人工输入
human_cmd = Command(resume={'answer': user_input})

async for chunk in graph.astream(human_cmd, config, stream_mode='values'):
result = chunk['messages'][-1].content

适用场景

  • 需要根据参数内容决定是否中断
  • 需要向审核者展示详细信息
  • 复杂的中断逻辑

两种方式对比

特性interrupt_beforeinterrupt()
配置位置编译时运行时
信息传递仅节点名自定义提示信息
条件中断不支持支持
复杂度简单较复杂

选择建议

  • 简单固定流程 → interrupt_before
  • 复杂灵活需求 → interrupt()

四种典型场景

场景说明
批准或拒绝在关键步骤前暂停,审查后决定是否执行
编辑状态暂停图,审查和修正状态数据
审查工具调用工具执行前暂停,审核参数或替换结果
验证输入暂停验证人工输入的有效性

完整示例:带人工审核的工具调用

from langgraph.types import Command, interrupt
from langgraph.checkpoint.memory import MemorySaver

# 需要审核的工具
REVIEW_TOOLS = {'web_search', 'send_email'}

class HumanReviewToolNode:
"""支持人工审核的工具节点"""

def __init__(self, tools):
self.tools = {t.name: t for t in tools}

async def __call__(self, state):
messages = state['messages']
tool_call = messages[-1].tool_calls[0]
tool_name = tool_call['name']

# 判断是否需要审核
if tool_name in REVIEW_TOOLS:
# 触发中断,传递详细信息
response = interrupt({
'tool': tool_name,
'args': tool_call['args'],
'message': f'即将执行 {tool_name},是否批准?'
})

# 处理审核结果
if response['action'] == 'reject':
return {'messages': [ToolMessage(
content=f"人工拒绝执行 {tool_name}",
name=tool_name,
tool_call_id=tool_call['id']
)]}

# 执行工具
result = await self.tools[tool_name].ainvoke(tool_call['args'])
return {'messages': [ToolMessage(
content=str(result),
name=tool_name,
tool_call_id=tool_call['id']
)]}


# 构建图
builder.add_node('tools', HumanReviewToolNode(tools))
graph = builder.compile(
checkpointer=MemorySaver(),
interrupt_before=['tools']
)


# 运行循环
async def run():
config = {'configurable': {'thread_id': 'main'}}

while True:
user_input = input("\n用户: ").strip()
if user_input.lower() in ('q', 'exit'):
break

# 检查是否有中断需要恢复
state = graph.get_state(config)

if state.next:
# 恢复中断
cmd = Command(resume={'action': 'approve' if user_input == 'y' else 'reject'})
async for chunk in graph.astream(cmd, config, stream_mode='values'):
print(f"AI: {chunk['messages'][-1].content}")
else:
# 正常执行
async for chunk in graph.astream({'messages': [('user', user_input)]}, config, stream_mode='values'):
if chunk.get('messages'):
msg = chunk['messages'][-1]
if hasattr(msg, 'tool_calls') and msg.tool_calls:
print(f"\n【待审核】工具: {msg.tool_calls[0]['name']}")
print("输入 'y' 批准,其他拒绝")
else:
print(f"AI: {msg.content}")

示例一:基础 MCP 工具调用

构建一个基本的 MCP 工具调用工作流。

import asyncio
import json
from typing import Dict, Any, List

from dotenv import load_dotenv
from langchain_core.messages import ToolMessage, AIMessage
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.constants import END, START
from langgraph.graph import MessagesState, StateGraph

from all_llm.my_llm import llm

load_dotenv()

# MCP 配置
mcp_client = MultiServerMCPClient({
'chart_mcp': {'url': '...', 'transport': 'streamable_http'},
'my12306_mcp': {'url': '...', 'transport': 'streamable_http'},
'zhipuai_mcp': {'url': '...', 'transport': 'http'},
})

# 状态定义
class State(MessagesState):
pass

# 路由函数
def route_tools_func(state: State):
"""动态路由:有工具调用则到 tools,否则结束"""
messages = state.get('messages', [])
if not messages:
return END

ai_message = messages[-1]
if hasattr(ai_message, 'tool_calls') and len(ai_message.tool_calls) > 0:
return 'tools'
return END

# 构建图
async def create_graph():
tools = await mcp_client.get_tools()
builder = StateGraph(State)
llm_with_tools = llm.bind_tools(tools)

# 聊天节点
async def chatbot(state: State):
return {'messages': [await llm_with_tools.ainvoke(state['messages'])]}

builder.add_node('chatbot', chatbot)

# 工具节点(自定义实现)
tool_node = BasicToolsNode(tools)
builder.add_node('tools', tool_node)

# 边
builder.add_conditional_edges(
'chatbot', route_tools_func,
{'tools': 'tools', END: END}
)
builder.add_edge('tools', 'chatbot')
builder.add_edge(START, 'chatbot')

return builder.compile()

# 自定义工具节点
class BasicToolsNode:
"""异步工具节点,用于并发执行工具调用"""

def __init__(self, tools: list) -> None:
self.tools_by_name = {tool.name: tool for tool in tools}

async def __call__(self, state: Dict[str, Any], **kwargs) -> Dict[str, List[ToolMessage]]:
messages = state.get('messages')
if not messages:
raise ValueError('未找到消息内容')

message = messages[-1]
outputs = await self._execute_tool_calls(message.tool_calls)
return {'messages': outputs}

async def _execute_tool_calls(self, tool_calls: List[Dict]) -> List[ToolMessage]:

async def _invoke_tool(tool_call: Dict) -> ToolMessage:
tool = self.tools_by_name.get(tool_call['name'])
if not tool:
raise KeyError(f"未注册的工具:{tool_call['name']}")

# 异步调用
if hasattr(tool, 'ainvoke'):
tool_result = await tool.ainvoke(tool_call['args'])
else:
loop = asyncio.get_running_loop()
tool_result = await loop.run_in_executor(None, tool.invoke, tool_call['args'])

return ToolMessage(
content=json.dumps(tool_result, ensure_ascii=False),
name=tool_call['name'],
tool_call_id=tool_call['id']
)

# 并发执行所有工具
return await asyncio.gather(*[_invoke_tool(tc) for tc in tool_calls])

示例二:使用预置 ToolNode

使用 LangGraph 预置的 ToolNode 简化实现。

from langgraph.prebuilt import ToolNode, tools_condition

async def create_graph():
tools = await mcp_client.get_tools()
builder = StateGraph(State)
llm_with_tools = llm.bind_tools(tools)

async def chatbot(state: State):
return {'messages': [await llm_with_tools.ainvoke(state['messages'])]}

builder.add_node('chatbot', chatbot)

# 使用预置 ToolNode
tool_node = ToolNode(tools)
builder.add_node('tools', tool_node)

# 使用预置条件边
builder.add_conditional_edges('chatbot', tools_condition)
builder.add_edge('tools', 'chatbot')
builder.add_edge(START, 'chatbot')

return builder.compile()

示例三:使用 interrupt_before 暂停

在工具节点执行前暂停,等待人工确认。

async def create_graph():
tools = await mcp_client.get_tools()
builder = StateGraph(State)
llm_with_tools = llm.bind_tools(tools)

async def chatbot(state: State):
return {'messages': [await llm_with_tools.ainvoke(state['messages'])]}

builder.add_node('chatbot', chatbot)
builder.add_node('tools', ToolNode(tools))

builder.add_conditional_edges('chatbot', tools_condition)
builder.add_edge('tools', 'chatbot')
builder.add_edge(START, 'chatbot')

# 在 tools 节点前中断
graph = builder.compile(interrupt_before=['tools'])
return graph

示例四:带检查点的手动恢复

使用 MemorySaver 作为检查点,支持手动恢复执行。

from langgraph.checkpoint.memory import MemorySaver

async def create_graph():
tools = await mcp_client.get_tools()
builder = StateGraph(State)
llm_with_tools = llm.bind_tools(tools)

async def chatbot(state: State):
return {'messages': [await llm_with_tools.ainvoke(state['messages'])]}

builder.add_node('chatbot', chatbot)
builder.add_node('tools', ToolNode(tools))

builder.add_conditional_edges('chatbot', tools_condition)
builder.add_edge('tools', 'chatbot')
builder.add_edge(START, 'chatbot')

# 添加检查点
memory = MemorySaver()
graph = builder.compile(checkpointer=memory, interrupt_before=['tools'])
return graph

# 手动执行与恢复
async def run_graph():
graph = await create_graph()
config = {'configurable': {'thread_id': 'session_001'}}

async def execute_graph(user_input: str) -> str:
result = ''
current_state = graph.get_state(config)

if current_state.next:
# 恢复中断
tools_message = current_state.values['messages'][-1]
tool_name = tools_message.tool_calls[0]['name']

if user_input.lower() == 'y':
# 批准执行
async for chunk in graph.astream(None, config, stream_mode='values'):
result = chunk.get('messages', [{}])[-1].content
else:
# 拒绝执行,自定义返回
answer = f"人工拒绝执行工具: {tool_name},理由: {user_input}"
new_message = [
ToolMessage(content=answer, tool_call_id=tools_message.tool_calls[0]['id']),
AIMessage(content=answer)
]
graph.update_state(config, {'messages': new_message})
result = answer
else:
# 正常执行
async for chunk in graph.astream({'messages': [('user', user_input)]}, config, stream_mode='values'):
messages = chunk.get('messages', [])
if messages:
result = messages[-1].content

# 检查中断
current_state = graph.get_state(config)
if current_state.next:
tool_call = current_state.values['messages'][-1].tool_calls[0]
return f"即将执行工具: {tool_call['name']},是否批准?(y/n)"

return result

示例五:使用 interrupt() 人工审核

在工具节点内部使用 interrupt() 实现人工审核。注意:官方的 ToolNode 无法实现中断,必须自定义工具节点

自定义工具节点(支持人工审核)

import logging
from typing import Any, Dict, List
from langchain_core.messages import AIMessage, ToolMessage
from langgraph.types import Command, interrupt
from langgraph.checkpoint.memory import MemorySaver

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# 需要人工审核的工具列表
REVIEW_TOOLS = {'webSearchPrime', 'webSearchStd'}


class BasicToolsNode:
"""异步工具节点,支持并发执行和人工审核"""

def __init__(self, tools: list) -> None:
self.tools_by_name = {tool.name: tool for tool in tools}
logger.info(f"工具节点初始化完成,共加载 {len(tools)} 个工具")

async def __call__(self, state: Dict[str, Any], **kwargs) -> Dict[str, List[ToolMessage]]:
"""异步调用入口"""
messages = state.get('messages')
if not messages:
raise ValueError('未找到消息内容')

message: AIMessage = messages[-1]
tool_calls = message.tool_calls

if not tool_calls:
return {'messages': []}

# 检查是否有需要审核的工具
review_tools = [tc for tc in tool_calls if tc['name'] in REVIEW_TOOLS]

if review_tools:
# 有需要审核的工具,触发中断
tool_name = review_tools[0]['name']
logger.info(f"触发人工审核: {tool_name}")

# 触发中断,传递审核信息给人工
response = interrupt(
f"AI 尝试调用工具 `{tool_name}`\n"
f"参数: {json.dumps(review_tools[0]['args'], ensure_ascii=False, indent=2)}\n"
f"请选择: 批准(y) 或 提供答案"
)

if response['answer'].lower() == 'y':
# 批准执行,继续原有流程
logger.info(f"用户批准执行工具: {tool_name}")
else:
# 拒绝执行,返回自定义结果(不执行工具)
logger.info(f"用户拒绝执行工具: {tool_name}, 答案: {response['answer']}")
return {'messages': [ToolMessage(
content=f"人工审核拒绝。理由/答案: {response['answer']}",
name=tool_name,
tool_call_id=review_tools[0]['id']
)]}

# 执行工具调用
outputs = await self._execute_tool_calls(tool_calls)
return {'messages': outputs}

async def _execute_tool_calls(self, tool_calls: List[Dict]) -> List[ToolMessage]:
"""并发执行工具调用"""

async def _invoke_tool(tool_call: Dict) -> ToolMessage:
tool_name = tool_call['name']
tool = self.tools_by_name.get(tool_name)

if not tool:
raise KeyError(f"未找到工具: {tool_name}")

logger.info(f"执行工具: {tool_name}")

try:
# 异步调用
if hasattr(tool, 'ainvoke'):
result = await tool.ainvoke(tool_call['args'])
else:
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(
None, tool.invoke, tool_call['args']
)

return ToolMessage(
content=json.dumps(result, ensure_ascii=False),
name=tool_name,
tool_call_id=tool_call['id']
)
except Exception as e:
logger.error(f"工具 {tool_name} 执行失败: {e}")
raise RuntimeError(f"工具执行失败: {tool_name}") from e

# 并发执行所有工具
return await asyncio.gather(*[_invoke_tool(tc) for tc in tool_calls])

构建图并运行

# 构建图
async def create_graph():
tools = await mcp_client.get_tools()
builder = StateGraph(State)
llm_with_tools = llm.bind_tools(tools)

# 聊天节点
async def chatbot(state: State):
return {'messages': [await llm_with_tools.ainvoke(state['messages'])]}

builder.add_node('chatbot', chatbot)
builder.add_node('tools', BasicToolsNode(tools))

# 边定义
builder.add_conditional_edges('chatbot', route_tools, {'tools': 'tools', END: END})
builder.add_edge('tools', 'chatbot')
builder.add_edge(START, 'chatbot')

# 编译图(添加检查点)
graph = builder.compile(checkpointer=MemorySaver())
return graph


# 运行图
async def run_graph():
graph = await create_graph()
config = {'configurable': {'thread_id': 'main_session'}}

async def execute(user_input: str) -> str:
current_state = graph.get_state(config)

# 恢复中断(用户已输入审核结果)
if current_state.next:
human_cmd = Command(resume={'answer': user_input})
result = ''
async for chunk in graph.astream(human_cmd, config, stream_mode='values'):
messages = chunk.get('messages', [])
if messages:
result = messages[-1].content
return result

# 正常执行
result = ''
async for chunk in graph.astream(
{'messages': [('user', user_input)]},
config,
stream_mode='values'
):
messages = chunk.get('messages', [])
if messages:
result = messages[-1].content

# 检查是否触发中断
current_state = graph.get_state(config)
if current_state.next:
tool_call = current_state.values['messages'][-1].tool_calls[0]
return f"\n【人工审核】\n工具: {tool_call['name']}\n参数: {json.dumps(tool_call['args'], ensure_ascii=False)}\n\n请回复 'y' 批准,或提供答案"

return result

# 主循环
while True:
user_input = input("\n用户: ").strip()
if user_input.lower() in ('quit', 'exit'):
break

response = await execute(user_input)
print(f"\nAI: {response}")


if __name__ == '__main__':
asyncio.run(run_graph())

关键点说明

  • interrupt()BasicToolsNode 内部被调用,触发中断并传递审核信息
  • 使用 Command(resume={'answer': user_input}) 恢复执行,传递人工输入
  • 检查 graph.get_state(config).next 判断是否有待处理的中断

总结

概念说明
MCP模型上下文协议,连接 AI 与外部服务
ToolNode预置工具节点,执行 AIMessage 中的工具调用
interrupt()暂停图执行,等待人工输入
Command恢复执行,传递人工输入
checkpointer保存图状态,支持恢复和中断

通过这些组件的组合,可以构建出功能强大的智能助手,支持外部工具调用、并发执行和人工审核等复杂场景。