多进程写入Milvus
当需要处理海量数据时,单线程写入效率较低。本文介绍如何使用多进程方式并行处理文档解析和写入。
架构设计
┌─────────────────┐ Queue ┌─────────────────┐
│ 进程1:文档解析 │ ──────────> │ 进程2:写入Milvus │
│ (Producer) │ 阻塞队列 │ (Consumer) │
└─────────────────┘ └─────────────────┘
- 进程1(Producer):解析目录下的所有 Markdown 文件,将 Document 放入队列
- 进程2(Consumer):从队列中读取 Document,写入 Milvus 数据库
- 队列:使用
multiprocessing.Queue实现进程间通信,可设置最大容量防止内存溢出
完整代码
import multiprocessing # 多进程模块
import os # 文件操作模块
from multiprocessing import Queue # 进程间通信队列
from document.markdown_parser import MarkdownParser # Markdown解析器
from document.milvus_db import MilvusVectorSave # Milvus向量存储工具类
from utils.log_utils import log # 日志工具
# 一个进程专门去做文档的解析,解析后把 document 数据放到队列中
# 进程2 专门从队列中取数据,然后写入 Milvus 数据库
# 左右两边进程数可以动态扩展
def file_parser_process(dir_path: str, output_queue: Queue, batch_size: int = 20):
"""
进程组1:解析目录下的所有 md 文件,分批放入队列
参数:
dir_path: Markdown文件所在目录
output_queue: 进程间通信队列
batch_size: 每批次包含的文档数量,默认20
"""
log.info(f'解析进程开始扫描目录:{dir_path}')
# 获取目录下的所有 .md 文件
md_files = [
os.path.join(dir_path, f)
for f in os.listdir(dir_path)
if f.endswith('.md')
]
if not md_files:
log.warning('警告:未找到任何.md文件')
output_queue.put(None) # 发送终止信号
return
parser = MarkdownParser()
doc_batch = [] # 文档批次缓冲区
for file_path in md_files:
try:
# 解析Markdown文件为Document列表
docs = parser.parse_markdown_to_documents(file_path)
if docs:
doc_batch.extend(docs)
# 达到批次大小时发送到队列中
if len(doc_batch) >= batch_size:
output_queue.put(doc_batch.copy()) # 拷贝后放入队列
doc_batch.clear() # 清空当前缓冲区的所有批次数据
except Exception as e:
log.error(f'解析失败 {file_path}:{str(e)}')
log.exception(e)
# 发送剩余文档(不足batch_size的剩余部分)
if doc_batch:
output_queue.put(doc_batch)
# 发送终止信号
output_queue.put(None)
log.info(f'解析完成,共处理 {len(md_files)} 个文件')
def milvus_writer_process(input_queue: Queue):
"""
进程组2:从队列中读取数据并写入 Milvus
参数:
input_queue: 进程间通信队列
"""
log.info("Milvus 写入进程启动")
# 创建 Milvus 向量存储实例
mv = MilvusVectorSave()
mv.create_connection() # 建立连接
total = 0 # 累计写入文档数
while True:
try:
# 阻塞等待队列数据,队列中没有数据会阻塞不动
datas = input_queue.get()
# 收到了终止信号
if datas is None:
break # 退出循环
# 写入数据到Milvus
if isinstance(datas, list):
mv.add_documents(datas)
total += len(datas)
log.info(f'累计已写入:{total} 个文档')
except Exception as e:
log.error(f'写入数据失败:{str(e)}')
log.exception(e)
log.info(f'