批处理与流式
批处理流程
在使用大模型 时,如果需要同时处理多条独立请求(例如多个问题或多段文本),则可以使用 批量调用(Batch) 方法一次性提交这些请求。LangChain 中的 batch() 方法允许你同时发送一组请求,模型会在后台并行处理,然后返回所有结果:
import time
from datetime import datetime
# 记录开始时间
start_time = time.time()
print(f"⏱️ 开始时间: {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
# 批量提问
responses = model.batch([
"请介绍下你自己。",
"请问什么是机器学习?",
"你知道机器学习和深度学习区别么?"
])
# 记录结束时间
end_time = time.time()
total_duration = end_time - start_time
print(f"⏱️ 结束时间: {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
print(f"📊 总耗时: {total_duration:.2f}s")
for response in responses:
print(response)
| 特性 | 说明 |
|---|---|
| 执行位置 | batch() 在客户端(Client-side)并行调用模型,而非调用模型提供商的批量API |
| 返回结果 | 默认会在所有任务完成后,统一返回完整结果列表 |
| 并行优势 | 多条独立请求可同时执行,无需等待彼此完成 |
| 适用场景 | 文档摘要、批量问答、数据预处理、多样本分类等 |
batch_as_completed()
也可以进行流式批处理,也就是每个任务完成后就立即获取结果(而不是等待全部完成),可以使用 batch_as_completed() 方法:
# 使用 model.batch_as_completed 批量提交多个问题,并逐个获取回答
for response in model.batch_as_completed([
"请介绍下你自己。",
"请问什么是机器学习?",
"你知道机器学习和深度学习区别么?"
]):
print(response)
异步并发处理 RunnableConfig
为了更好的控制并发,我们可以在config参数中设置批处理的并发数:
import time
from datetime import datetime
from langchain_core.runnables import RunnableConfig
# 设置并发数为3
config = RunnableConfig(max_concurrency=3)
# 记录开始时间
start_time = time.time()
print(f"⏱️ 开始时间: {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
# 并发调用模型,批量处理三个问题
responses = await model.abatch([
"请介绍下你自己。",
"请问什么是机器学习?",
"你知道机器学习和深度学习区别么?"
], config=config)
# 记录结束时间
end_time = time.time()
total_duration = end_time - start_time
print(f"⏱️ 结束时间: {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
print(f"📊 总耗时: {total_duration:.2f}s")
for response in responses:
print(response)
特别注意
RunnableConfig(max_concurrency=N)只是告诉 LangChain 在执行 abatch/batch 时最 多并发 N 个子任务- 是否能提速,取决于整个 pipeline 是否为 I/O-bound(等待网络/模型服务)或 CPU/GPU-bound(单次推理占满资源)
- 如果单次推理把 GPU/CPU 占满(例如单卡的 vLLM 同步推理),增加并发不会变快,甚至更慢
- 确认你使用的是
abatch(异步)而不是batch(同步)
RunnableConfig 参数详解
from langchain_core.runnables import RunnableConfig
# 配置:最多 2 个并发任务
config = RunnableConfig(
max_concurrency=2, # 最大并行执行数
timeout=8.0, # 单个任务超时时间(秒)
callbacks=None, # 触发事件回调,用于日志或监控
metadata={"request_id": "abc123", "task": "query"}, # 额外的上下文信息
)
# 准备一个输入列表
prompt_template = PromptTemplate.from_template("为生产{product}的公司起好名字?")
inputs = ["彩色袜子", "环保咖啡杯", "智能水杯"]
formatted_prompts = [prompt_template.format(product=product) for product in inputs]
# 使用异步批处理
results = await model.abatch(formatted_prompts, config=config)
for i, r in enumerate(results):
print(f"=== Query {i + 1} ===")
print(r.content)
| 属性名 | 类型 | 说明 |
|---|---|---|
max_concurrency | int | 最大并行执行数 |
timeout | float | 每个请求的最大超时时间(秒) |
callbacks | list | 触发事件回调,用于日志或监控 |
metadata | dict | 额外的上下文信息,可用于追踪 |
流式传输 (Streaming)
需要注意的是:
- 流式输出依赖于整个程序链路都支持"逐块处理"。如果程序中的某个环节必须等待完整输出(如需一次性写入数据库),则无法直接使用 Streaming;
- LangChain 1.0 进一步优化了流式机制,引入自动流式模式(Auto-streaming)。
基本流式输出
# 使用.stream()方法进行流式传输
for chunk in model.stream("用一段话描述大海。"):
print(chunk.content, end="", flush=True)
# 输出会像真正的打字效果一样,一个一个词地出现。
流式消息拼接
每个 AIMessageChunk 都可以通过加法 + 操作符拼接。LangChain 内部为此设计了"消息块相加(chunk summation)"机制:
# 初始化变量,用于累积模型返回的完整内容
full = None # 初始值为空
# 使用流式方式调用模型,逐块接收返回内容
for chunk in model.stream("你好,好久不见"):
# 如果是第一块内容,则直接赋值;否则拼接到已有内容
full = chunk if full is None else full + chunk
# 打印当前累积的文本内容
print(full.text)
print(full.content_blocks)
astream_events() 事件监听
LangChain 还支持通过 astream_events() 对语义事件进行异步流式监听,适合需要过滤不同事件类型的复杂场景。
你能看到完整语义生命周期事件,包括:
- on_chain_start
- on_prompt_start / on_prompt_end
- on_llm_start
- on_llm_stream(逐 Token)
- on_llm_end
- on_chain_end
这非常适合:
- 调试 LLM 推理过程
- 了解 LangChain pipeline 的执行顺序
- 构建 UI(如 web 前端的逐 token streaming)
- 实现日志、可观测性、监控系统
import asyncio
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
# 1. 构建最简单的 Prompt + LLM
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个专业的 AI 助手。"),
("human", "{question}")
])
# 2. 初始化 ChatOpenAI 实例
llm = ChatOpenAI(model="gpt-3.5-turbo")
# 3. 使用管道符将 prompt 模板与 llm 连接,构建可运行的链
chain = prompt | llm
# 4. 使用 as tream_events() 监听所有语义事件
events = chain.astream_events(
{"question": "请用一句话介绍一下 LangChain 1.0 的核心思想。"},
version="v1", # 必须指明版本,v1 才有语义事件
)
async for event in events:
# 打印事件类型
print(f"[Event] type={event['event']}")
# 展示关键字段
if "data" in event:
print(" data:", event["data"])
print("-----------------------------")
stream_mode 模式对比
| 模式 | 输出内容 | 使用场景 | 优点 | 缺点 |
|---|---|---|---|---|
"values" | 每步后的完整状态 | 调试Agent执行流程 | 状态完整可追溯,无需拼接历史 | 数据量大 |
"updates" | 仅状态变更部分 | 前端增量更新UI | 数据量小传输快 | 需手动维护完整状态 |
"messages" | LLM生成的token流 | 实时显示打字效果 | 响应即时用户体验好 | 不包含工具调用信息 |
"custom" | 工具函数自定义输出 | 插入业务日志 | 灵活控制输出内容 | 需手动调用stream writer |
Agent 流式输出示例
from langchain.agents import create_agent
from langchain_core.tools import tool
# 1. 定义天气查询工具
@tool
def get_weather(city: str) -> str:
"""获取指定城市的天气信息。"""
weather_data = {
"北京": "晴朗,气温25°C",
"上海": "多云,气温28°C",
"广州": "小雨,气温30°C"
}
return f"{city}的天气是:{weather_data.get(city, '未知')}"
# 2. 定义数学计算工具
@tool
def calculate(expression: str) -> str:
"""计算一个数学表达式的结果。"""
try:
result = eval(expression)
return f"计算结果是:{result}"
except Exception as e:
return f"计算出错:{str(e)}"
# 3. 初始化LLM
llm = load_chat_model(model="gpt-4o-mini", provider="openai")
# 4. 创建Agent
agent = create_agent(
model=llm,
tools=[get_weather, calculate],
system_prompt=("""
你是一个多功能的 AI 助手,能够调用以下工具:
1. `get_weather(city)`:查询指定城市的天气信息。参数 city 为城市名称(如"北京")。
2. `calculate(expression)`:计算数学表达式 。参数 expression 为合法的 Python 表达式(如"25 - 28")。
请始终遵循以下最佳实践:
• 当用户询问天气时,先提取城市名,再调用 `get_weather`,并返回自然语言总结。
• 当用户需要计算时,先提取表达式,再调用 `calculate`,并给出易读的结果说明。
• 若问题同时涉及天气与计算,按顺序依次调用对应工具,最后整合答案。
• 禁止编造数据,必须调用工具获取结果后再回答。
• 所有数字、单位、符号务必与工具返回保持一致,避免主观臆断。
"""
)
)
# 5. 配置会话 ID
config = {"configurable": {"thread_id": "user_123"}}
# 6. 流式输出,实时观察推理过程
for step in agent.stream(
{"messages": [{"role": "user", "content": "北京和上海的天气怎么样?"}]},
config=config,
stream_mode="values" # 返回每个step步骤的完整消息列表,便于调试和观察
):
message = step["messages"][-1]
message.pretty_print()
print("-" * 50)