跳到主要内容

批处理与流式

批处理流程

在使用大模型时,如果需要同时处理多条独立请求(例如多个问题或多段文本),则可以使用 批量调用(Batch) 方法一次性提交这些请求。LangChain 中的 batch() 方法允许你同时发送一组请求,模型会在后台并行处理,然后返回所有结果:

import time
from datetime import datetime

# 记录开始时间
start_time = time.time()
print(f"⏱️ 开始时间: {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")

# 批量提问
responses = model.batch([
"请介绍下你自己。",
"请问什么是机器学习?",
"你知道机器学习和深度学习区别么?"
])

# 记录结束时间
end_time = time.time()
total_duration = end_time - start_time

print(f"⏱️ 结束时间: {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
print(f"📊 总耗时: {total_duration:.2f}s")

for response in responses:
print(response)
特性说明
执行位置batch() 在客户端(Client-side)并行调用模型,而非调用模型提供商的批量API
返回结果默认会在所有任务完成后,统一返回完整结果列表
并行优势多条独立请求可同时执行,无需等待彼此完成
适用场景文档摘要、批量问答、数据预处理、多样本分类等

batch_as_completed()

也可以进行流式批处理,也就是每个任务完成后就立即获取结果(而不是等待全部完成),可以使用 batch_as_completed() 方法:

# 使用 model.batch_as_completed 批量提交多个问题,并逐个获取回答
for response in model.batch_as_completed([
"请介绍下你自己。",
"请问什么是机器学习?",
"你知道机器学习和深度学习区别么?"
]):
print(response)

异步并发处理 RunnableConfig

为了更好的控制并发,我们可以在config参数中设置批处理的并发数:

import time
from datetime import datetime
from langchain_core.runnables import RunnableConfig

# 设置并发数为3
config = RunnableConfig(max_concurrency=3)

# 记录开始时间
start_time = time.time()
print(f"⏱️ 开始时间: {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")

# 并发调用模型,批量处理三个问题
responses = await model.abatch([
"请介绍下你自己。",
"请问什么是机器学习?",
"你知道机器学习和深度学习区别么?"
], config=config)

# 记录结束时间
end_time = time.time()
total_duration = end_time - start_time

print(f"⏱️ 结束时间: {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
print(f"📊 总耗时: {total_duration:.2f}s")

for response in responses:
print(response)

特别注意

  • RunnableConfig(max_concurrency=N) 只是告诉 LangChain 在执行 abatch/batch 时最多并发 N 个子任务
  • 是否能提速,取决于整个 pipeline 是否为 I/O-bound(等待网络/模型服务)或 CPU/GPU-bound(单次推理占满资源)
  • 如果单次推理把 GPU/CPU 占满(例如单卡的 vLLM 同步推理),增加并发不会变快,甚至更慢
  • 确认你使用的是 abatch(异步)而不是 batch(同步)

RunnableConfig 参数详解

from langchain_core.runnables import RunnableConfig

# 配置:最多 2 个并发任务
config = RunnableConfig(
max_concurrency=2, # 最大并行执行数
timeout=8.0, # 单个任务超时时间(秒)
callbacks=None, # 触发事件回调,用于日志或监控
metadata={"request_id": "abc123", "task": "query"}, # 额外的上下文信息
)

# 准备一个输入列表
prompt_template = PromptTemplate.from_template("为生产{product}的公司起好名字?")
inputs = ["彩色袜子", "环保咖啡杯", "智能水杯"]
formatted_prompts = [prompt_template.format(product=product) for product in inputs]

# 使用异步批处理
results = await model.abatch(formatted_prompts, config=config)

for i, r in enumerate(results):
print(f"=== Query {i + 1} ===")
print(r.content)
属性名类型说明
max_concurrencyint最大并行执行数
timeoutfloat每个请求的最大超时时间(秒)
callbackslist触发事件回调,用于日志或监控
metadatadict额外的上下文信息,可用于追踪

流式传输 (Streaming)

需要注意的是:

  1. 流式输出依赖于整个程序链路都支持"逐块处理"。如果程序中的某个环节必须等待完整输出(如需一次性写入数据库),则无法直接使用 Streaming;
  2. LangChain 1.0 进一步优化了流式机制,引入自动流式模式(Auto-streaming)。

基本流式输出

# 使用.stream()方法进行流式传输
for chunk in model.stream("用一段话描述大海。"):
print(chunk.content, end="", flush=True)

# 输出会像真正的打字效果一样,一个一个词地出现。

流式消息拼接

每个 AIMessageChunk 都可以通过加法 + 操作符拼接。LangChain 内部为此设计了"消息块相加(chunk summation)"机制:

# 初始化变量,用于累积模型返回的完整内容
full = None # 初始值为空

# 使用流式方式调用模型,逐块接收返回内容
for chunk in model.stream("你好,好久不见"):
# 如果是第一块内容,则直接赋值;否则拼接到已有内容
full = chunk if full is None else full + chunk
# 打印当前累积的文本内容
print(full.text)

print(full.content_blocks)

astream_events() 事件监听

LangChain 还支持通过 astream_events() 对语义事件进行异步流式监听,适合需要过滤不同事件类型的复杂场景。

你能看到完整语义生命周期事件,包括:

  • on_chain_start
  • on_prompt_start / on_prompt_end
  • on_llm_start
  • on_llm_stream(逐 Token)
  • on_llm_end
  • on_chain_end

这非常适合:

  • 调试 LLM 推理过程
  • 了解 LangChain pipeline 的执行顺序
  • 构建 UI(如 web 前端的逐 token streaming)
  • 实现日志、可观测性、监控系统
import asyncio
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

# 1. 构建最简单的 Prompt + LLM
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个专业的 AI 助手。"),
("human", "{question}")
])

# 2. 初始化 ChatOpenAI 实例
llm = ChatOpenAI(model="gpt-3.5-turbo")

# 3. 使用管道符将 prompt 模板与 llm 连接,构建可运行的链
chain = prompt | llm

# 4. 使用 as tream_events() 监听所有语义事件
events = chain.astream_events(
{"question": "请用一句话介绍一下 LangChain 1.0 的核心思想。"},
version="v1", # 必须指明版本,v1 才有语义事件
)

async for event in events:
# 打印事件类型
print(f"[Event] type={event['event']}")
# 展示关键字段
if "data" in event:
print(" data:", event["data"])
print("-----------------------------")

stream_mode 模式对比

模式输出内容使用场景优点缺点
"values"每步后的完整状态调试Agent执行流程状态完整可追溯,无需拼接历史数据量大
"updates"仅状态变更部分前端增量更新UI数据量小传输快需手动维护完整状态
"messages"LLM生成的token流实时显示打字效果响应即时用户体验好不包含工具调用信息
"custom"工具函数自定义输出插入业务日志灵活控制输出内容需手动调用stream writer

Agent 流式输出示例

from langchain.agents import create_agent
from langchain_core.tools import tool

# 1. 定义天气查询工具
@tool
def get_weather(city: str) -> str:
"""获取指定城市的天气信息。"""
weather_data = {
"北京": "晴朗,气温25°C",
"上海": "多云,气温28°C",
"广州": "小雨,气温30°C"
}
return f"{city}的天气是:{weather_data.get(city, '未知')}"

# 2. 定义数学计算工具
@tool
def calculate(expression: str) -> str:
"""计算一个数学表达式的结果。"""
try:
result = eval(expression)
return f"计算结果是:{result}"
except Exception as e:
return f"计算出错:{str(e)}"

# 3. 初始化LLM
llm = load_chat_model(model="gpt-4o-mini", provider="openai")

# 4. 创建Agent
agent = create_agent(
model=llm,
tools=[get_weather, calculate],
system_prompt=("""
你是一个多功能的 AI 助手,能够调用以下工具:
1. `get_weather(city)`:查询指定城市的天气信息。参数 city 为城市名称(如"北京")。
2. `calculate(expression)`:计算数学表达式。参数 expression 为合法的 Python 表达式(如"25 - 28")。
请始终遵循以下最佳实践:
• 当用户询问天气时,先提取城市名,再调用 `get_weather`,并返回自然语言总结。
• 当用户需要计算时,先提取表达式,再调用 `calculate`,并给出易读的结果说明。
• 若问题同时涉及天气与计算,按顺序依次调用对应工具,最后整合答案。
• 禁止编造数据,必须调用工具获取结果后再回答。
• 所有数字、单位、符号务必与工具返回保持一致,避免主观臆断。
"""
)
)

# 5. 配置会话 ID
config = {"configurable": {"thread_id": "user_123"}}

# 6. 流式输出,实时观察推理过程
for step in agent.stream(
{"messages": [{"role": "user", "content": "北京和上海的天气怎么样?"}]},
config=config,
stream_mode="values" # 返回每个step步骤的完整消息列表,便于调试和观察
):
message = step["messages"][-1]
message.pretty_print()
print("-" * 50)

监控执行时间

start = time.time()
steps = 0
for chunk in agent.stream(..., stream_mode="values"):
steps += 1
elapsed = time.time() - start
print(f"步骤 {steps} 耗时:{elapsed:.2f}s")

print(f"总耗时:{time.time() - start:.2f}s,总步骤:{steps}")

捕获中间步骤错误

for chunk in agent.stream(..., stream_mode="values"):
messages = chunk["messages"]
if messages[-1].type == "error":
print(f"步骤出错:{messages[-1].content}")
# 回滚到上一步的状态
last_valid_state = messages[-2]

常见误区

误区真相
stream_mode="values" 会流式返回 LLM token它返回的是步骤级的完整状态,不是字符级token。想看token需用 stream_mode="messages"
values 和 updates 返回数据量差不多values 在每一步都返回所有历史消息,数据量线性增长;updates 只返回增量
可以混用多种 stream_mode可以同时指定多个模式(如 stream_mode=["values", "custom"]),但返回的是元组,需分别处理