跳到主要内容

多进程写入Milvus

当需要处理海量数据时,单线程写入效率较低。本文介绍如何使用多进程方式并行处理文档解析和写入。

架构设计

┌─────────────────┐     Queue     ┌─────────────────┐
│ 进程1:文档解析 │ ──────────> │ 进程2:写入Milvus │
│ (Producer) │ 阻塞队列 │ (Consumer) │
└─────────────────┘ └─────────────────┘
  • 进程1(Producer):解析目录下的所有 Markdown 文件,将 Document 放入队列
  • 进程2(Consumer):从队列中读取 Document,写入 Milvus 数据库
  • 队列:使用 multiprocessing.Queue 实现进程间通信,可设置最大容量防止内存溢出

完整代码

import multiprocessing  # 多进程模块
import os # 文件操作模块
from multiprocessing import Queue # 进程间通信队列

from document.markdown_parser import MarkdownParser # Markdown解析器
from document.milvus_db import MilvusVectorSave # Milvus向量存储工具类
from utils.log_utils import log # 日志工具


# 一个进程专门去做文档的解析,解析后把 document 数据放到队列中
# 进程2 专门从队列中取数据,然后写入 Milvus 数据库
# 左右两边进程数可以动态扩展


def file_parser_process(dir_path: str, output_queue: Queue, batch_size: int = 20):
"""
进程组1:解析目录下的所有 md 文件,分批放入队列

参数:
dir_path: Markdown文件所在目录
output_queue: 进程间通信队列
batch_size: 每批次包含的文档数量,默认20
"""
log.info(f'解析进程开始扫描目录:{dir_path}')

# 获取目录下的所有 .md 文件
md_files = [
os.path.join(dir_path, f)
for f in os.listdir(dir_path)
if f.endswith('.md')
]

if not md_files:
log.warning('警告:未找到任何.md文件')
output_queue.put(None) # 发送终止信号
return

parser = MarkdownParser()
doc_batch = [] # 文档批次缓冲区

for file_path in md_files:
try:
# 解析Markdown文件为Document列表
docs = parser.parse_markdown_to_documents(file_path)
if docs:
doc_batch.extend(docs)

# 达到批次大小时发送到队列中
if len(doc_batch) >= batch_size:
output_queue.put(doc_batch.copy()) # 拷贝后放入队列
doc_batch.clear() # 清空当前缓冲区的所有批次数据

except Exception as e:
log.error(f'解析失败 {file_path}{str(e)}')
log.exception(e)

# 发送剩余文档(不足batch_size的剩余部分)
if doc_batch:
output_queue.put(doc_batch)

# 发送终止信号
output_queue.put(None)
log.info(f'解析完成,共处理 {len(md_files)} 个文件')


def milvus_writer_process(input_queue: Queue):
"""
进程组2:从队列中读取数据并写入 Milvus

参数:
input_queue: 进程间通信队列
"""
log.info("Milvus 写入进程启动")

# 创建 Milvus 向量存储实例
mv = MilvusVectorSave()
mv.create_connection() # 建立连接

total = 0 # 累计写入文档数

while True:
try:
# 阻塞等待队列数据,队列中没有数据会阻塞不动
datas = input_queue.get()

# 收到了终止信号
if datas is None:
break # 退出循环

# 写入数据到Milvus
if isinstance(datas, list):
mv.add_documents(datas)
total += len(datas)
log.info(f'累计已写入:{total} 个文档')

except Exception as e:
log.error(f'写入数据失败:{str(e)}')
log.exception(e)

log.info(f'写入进程结束,总计写入 {total} 个文档')


if __name__ == '__main__':
# ============ 配置参数 ============
md_dir = r'C:\Users\21129\Documents\百度网盘\资料\ai大模型课件\15 RAG企业知识库项目\md\md'
queue_maxsize = 20 # 队列最大容量(防止内存溢出)

# ============ 创建表结构 ============
mv = MilvusVectorSave()
mv.create_collection() # 创建Collection(只需执行一次)

# ============ 创建进程间通信队列 ============
docs_queue = Queue(maxsize=queue_maxsize)

# ============ 启动子进程 ============
# 进程1:文档解析(Producer)
parser_proc = multiprocessing.Process(
target=file_parser_process,
args=(md_dir, docs_queue)
)

# 进程2:Milvus写入(Consumer)
writer_proc = multiprocessing.Process(
target=milvus_writer_process,
args=(docs_queue,)
)

parser_proc.start()
writer_proc.start()

# 等待进程结束
parser_proc.join()
writer_proc.join()

print('系统提示:所有任务完成')

代码说明

核心组件

组件说明
multiprocessing.Process创建子进程
Queue进程间通信队列,maxsize 防止内存溢出
batch_size每处理20个文档放入队列,避免频繁IPC

关键点

  1. 终止信号output_queue.put(None) 表示解析完成,消费者收到 None 后退出循环
  2. 阻塞队列input_queue.get() 是阻塞函数,队列为空时会等待
  3. 批次处理:累积达到 batch_size 才发送到队列,减少进程间通信开销

运行结果示例

20260217 10:30:15 | INFO | 解析进程开始扫描目录:C:\Users\21129\Documents\...\md
20260217 10:30:15 | INFO | 累计已写入:20 个文档
20260217 10:30:15 | INFO | 累计已写入:40 个文档
20260217 10:30:15 | INFO | 累计已写入:60 个文档
20260217 10:30:15 | INFO | 解析完成,共处理 15 个文件
20260217 10:30:15 | INFO | 写入进程结束,总计写入 85 个文档
系统提示:所有任务完成

核心改进点

改进项说明
连接复用mv.create_connection() 在写入进程启动时调用一次
批量写入每 20 个文档为一批次写入,提高效率
终止信号output_queue.put(None) 通知消费者退出
队列容量控制maxsize=20 防止内存溢出

扩展:多进程并行解析

如果解析速度较慢,可以启动多个解析进程并行处理:

if __name__ == '__main__':
# 配置参数
md_dir = r'C:\Users\21129\Documents\百度网盘\资料\ai大模型课件\15 RAG企业知识库项目\md\md'
queue_maxsize = 20

# 创建表结构
mv = MilvusVectorSave()
mv.create_collection()

# 创建进程间通信队列
docs_queue = Queue(maxsize=queue_maxsize)

# 启动多个解析进程(Producer)
parser_procs = []
for i in range(3): # 3个解析进程
proc = multiprocessing.Process(
target=file_parser_process,
args=(md_dir, docs_queue)
)
parser_procs.append(proc)
proc.start()

# 启动写入进程(Consumer)
writer_proc = multiprocessing.Process(
target=milvus_writer_process,
args=(docs_queue,)
)
writer_proc.start()

# 等待所有进程结束
for proc in parser_procs:
proc.join()
writer_proc.join()

print('系统提示:所有任务完成')

注意:多个 Producer 时,需要发送多个 None 信号(每个解析进程发送一个),消费者需要计数收到相同数量的终止信号后才能退出。