工作流状态与节点编排
核心理念
LangGraph 将 Agent 和 工作流(Workflow) 建模为有向图(Directed Graph),通过状态图(StateGraph)将多个智能体和任务节点组织成可动态调整的流程结构。
| 概念 | 角色 | 说明 |
|---|---|---|
| Agent | 乐高积木 | 负责实现具体功能,“思考+行动” |
| 工作流 | 搭建规则 | 定义积木如何组合、何时执行 |
| 状态图 | 组织结构 | 将 Agent 和节点解耦,实现“搭积木”式开发 |
一句话总结:Agent 是一种特殊的节点(Node),而工作流是整个图的运行流程(Graph)。Agent负责 “思考+行动”,工作流负责“编排+调度”。
StateGraph 状态图
基本组成
StateGraph 由三个核心组件构成:
| 组件 | 说明 |
|---|---|
| State(状态) | 共享数据结构,表示应用程序的当前快照 |
| Nodes(节点) | Python 函数,接收当前状态、执行计算、返回更新后的状态 |
| Edges(边) | Python 函数,决定根据当前状态接下来执行哪个节点 |
构建流程
from langgraph.graph import StateGraph
# 1. 定义状态
class State(TypedDict):
messages: list
result: str
# 2. 创建图构建器
builder = StateGraph(State)
# 3. 添加节点
builder.add_node("node_name", node_function)
# 4. 添加边(定义流程)
builder.add_edge("start", "node_name")
builder.add_edge("node_name", "end")
# 5. 编译图
graph = builder.compile()
要构建图,首先定义状态,然后添加节点和边,最后进行编译。
State 状态定义
TypedDict 方式
状态是图中所有节点和边的输入 schema,可以使用 TypedDict 或 Pydantic 定义:
from typing_extensions import TypedDict
from typing import Annotated
from operator import add
class State(TypedDict):
foo: int # 默认覆盖式更新
bar: Annotated[list[str], add] # 使用 add reducer 追加更新
messages: Annotated[list, add_messages] # 消息专用 reducer
Reducer 函数
Reducer 决定如何将节点的更新应用到当前状态:
| 类型 | 行为 | 适用场景 |
|---|---|---|
| 默认(无 reducer) | 覆盖更新 | 简单值替换 |
| operator.add | 列表拼接/数值相加 | 累积数据 |
| add_messages | 消息追加(支持 ID 追踪) | 对话历史 |
实例对比
默认覆盖更新:
class State(TypedDict):
foo: int
bar: list[str]
# 输入: {"foo": 1, "bar": ["hi"]}
# 节点A返回: {"foo": 2}
# 结果: {"foo": 2, "bar": ["hi"]} # bar 被保留,未覆盖
# 节点B返回: {"bar": ["bye"]}
# 结果: {"foo": 2, "bar": ["bye"]} # bar 被覆盖
使用 operator.add:
from operator import add
class State(TypedDict):
foo: int
bar: Annotated[list[str], add]
# 输入: {"foo": 1, "bar": ["hi"]}
# 节点B返回: {"bar": ["bye"]}
# 结果: {"foo": 2, "bar": ["hi", "bye"]} # 列表合并
add_messages vs operator.add
| 特性 | operator.add | add_messages |
|---|---|---|
| 追加方式 | 直接拼接 | 追加到列表 |
| 消息 ID | 不追踪 | 追踪消息 ID |
| 现有消息更新 | 无法更新 | 支持覆盖更新 |
| 适用场景 | 简单列表累积 | 对话历史管理 |
# 推荐:使用 add_messages 管理对话历史
from langgraph.graph.message import add_messages
class State(TypedDict):
messages: Annotated[list[AnyMessage], add_messages]
MessagesState 预定义状态
LangGraph 提供了预定义的 MessagesState,适合简单场景:
from langgraph.graph import MessagesState
# 扩展 MessagesState
class State(MessagesState):
documents: list[str] # 添加额外字段
user_info: dict # 添加额外字段
Node 节点
定义节点
节点是 Python 函数(同步或异步),接收状态作为输入,返回状态更新:
from langgraph.graph import StateGraph
from langchain_core.runnables import RunnableConfig
from typing import TypedDict
class State(TypedDict):
input: str
result: str
def my_node(state: State, config: RunnableConfig):
"""节点函数"""
user_id = config["configurable"]["user_id"]
return {"result": f"处理: {state['input']}"}
# 第二个参数是可选的
def other_node(state: State):
return {"result": "其他处理"}
添加节点
builder = StateGraph(State)
builder.add_node("my_node", my_node)
builder.add_node("other_node", other_node)
特殊节点
| 节点 | 说明 |
|---|---|
| START | 入口节点,代表用户输入进入图的位置 |
| END | 终止节点,表示流程结束 |
Edge 边
普通边
固定流程:从节点 A 直接转到节点 B
builder.add_edge("node_a", "node_b") # 必然执行
builder.add_edge(START, "first_node") # 入口
builder.add_edge("last_node", END) # 出口
条件边
动态路由:根据状态决定下一步走向
def routing_function(state: State) -> str:
"""路由函数:返回下一个节点名称"""
if some_condition:
return "node_b"
return "node_c"
# 方式一:直接返回节点名
builder.add_conditional_edges("node_a", routing_function)
# 方式二:映射到具体节点
builder.add_conditional_edges(
"node_a",
routing_function,
{
True: "node_b",
False: "node_c",
"end": END # 也可返回 END
}
)
多出口并行
一个节点可以有多个出口,目标节点将在下一个超级步骤并行执行:
# 条件边返回列表,多个节点并行执行
def routing_function(state: State):
return ["node_b", "node_c"] # 并行执行 B 和 C
builder.add_conditional_edges("node_a", routing_function)
路由函数返回值
| 返回值 | 行为 |
|---|---|
| 字符串 | 转到指定节点 |
| 布尔值 | 需配合映射字典 |
| 列表 | 多个节点并行执行 |
| "END" | 终止流程 |