跳到主要内容

ETL 数据管道

ETL(Extract, Transform, Load)是 RAG 应用的数据处理基础。它的作用是把原始文档转换成向量数据库能用的格式,确保数据以最优方式存储,方便后续检索。

简单说,ETL 管道负责:读取文档 → 转换处理 → 存储到向量库

ETL 的核心组件

Spring AI 的 ETL 管道由三个核心接口组成:

DocumentReader(文档读取器)

负责从各种数据源读取文档,转换成 Document 对象。

public interface DocumentReader extends Supplier<List<Document>> {
default List<Document> read() {
return get();
}
}

DocumentTransformer(文档转换器)

对文档进行转换处理,比如分块、清洗、增强等。

public interface DocumentTransformer extends Function<List<Document>, List<Document>> {
default List<Document> transform(List<Document> documents) {
return apply(documents);
}
}

DocumentWriter(文档写入器)

把处理好的文档写入目标存储,比如向量数据库、文件等。

public interface DocumentWriter extends Consumer<List<Document>> {
default void write(List<Document> documents) {
accept(documents);
}
}

基本使用

最简单的 ETL 流程就是把这三个组件链起来:

// 假设我们有这三个组件
PagePdfDocumentReader pdfReader = new PagePdfDocumentReader(resource);
TokenTextSplitter textSplitter = new TokenTextSplitter(1000, 200);
VectorStore vectorStore = ...;

// 方式一:函数式风格
vectorStore.accept(textSplitter.apply(pdfReader.get()));

// 方式二:更直观的方法名
vectorStore.write(textSplitter.split(pdfReader.read()));

流程就是:读取 PDF → 文本分块 → 写入向量库。

Document 类

Document 是 ETL 管道的核心数据结构,包含:

  • 文本内容:文档的主要内容
  • 元数据:文档的附加信息(来源、页码、时间等)
  • 媒体内容:可选的图片、音频、视频
Document doc = new Document("这是文档内容");
doc.getMetadata().put("source", "product-manual.pdf");
doc.getMetadata().put("page", 1);

DocumentReader 实现

Spring AI 提供了多种文档读取器,支持不同的数据源。

PDF 文档

PagePdfDocumentReader

按页读取 PDF 文档,每页生成一个 Document

@Component
public class PdfDocumentService {

public List<Document> loadPdf(Resource pdfResource) {
PagePdfDocumentReader pdfReader = new PagePdfDocumentReader(pdfResource);
return pdfReader.get();
}
}

TikaPdfDocumentReader

基于 Apache Tika 的 PDF 读取器,功能更强大。

@Component
public class TikaPdfService {

public List<Document> loadPdfWithTika(Resource pdfResource) {
TikaPdfDocumentReader reader = new TikaPdfDocumentReader(pdfResource);
return reader.get();
}
}

文本文件

TextReader

读取纯文本文件。

@Component
public class TextDocumentService {

public List<Document> loadText(Resource textResource) {
TextReader textReader = new TextReader(textResource);
return textReader.get();
}
}

JSON 文档

JsonReader

从 JSON 文件中提取内容,可以指定哪些字段作为文档内容。

@Component
public class JsonDocumentService {

private final Resource jsonResource;

public JsonDocumentService(@Value("classpath:bikes.json") Resource jsonResource) {
this.jsonResource = jsonResource;
}

public List<Document> loadJson() {
// 使用 "description" 和 "content" 字段作为文档内容
JsonReader jsonReader = new JsonReader(
this.jsonResource,
"description",
"content"
);
return jsonReader.get();
}
}

构造函数选项:

  1. JsonReader(Resource resource) - 使用所有字段
  2. JsonReader(Resource resource, String... jsonKeysToUse) - 指定字段
  3. JsonReader(Resource resource, JsonMetadataGenerator generator, String... jsonKeysToUse) - 自定义元数据生成

网页内容

WebPdfDocumentReader

从 URL 读取 PDF 文档。

@Component
public class WebDocumentService {

public List<Document> loadWebPdf(String url) {
WebPdfDocumentReader reader = new WebPdfDocumentReader(url);
return reader.get();
}
}

其他格式

Spring AI 还支持:

  • MarkdownMarkdownReader
  • Word 文档TikaDocumentReader(通过 Tika)
  • ExcelTikaDocumentReader
  • PowerPointTikaDocumentReader

DocumentTransformer 实现

文档转换器用于处理文档,常见操作包括分块、清洗、增强等。

文本分块

TokenTextSplitter

按 token 数量分块,适合控制输入长度。

@Component
public class TextSplitterService {

public List<Document> splitDocuments(List<Document> documents) {
// 每块最多 1000 token,重叠 200 token
TokenTextSplitter splitter = new TokenTextSplitter(1000, 200);
return splitter.apply(documents);
}
}

ParagraphTextSplitter

按段落分块,保持语义完整。

@Component
public class ParagraphSplitterService {

public List<Document> splitByParagraph(List<Document> documents) {
ParagraphTextSplitter splitter = new ParagraphTextSplitter();
return splitter.apply(documents);
}
}

SentenceTextSplitter

按句子分块,更细粒度。

@Component
public class SentenceSplitterService {

public List<Document> splitBySentence(List<Document> documents) {
SentenceTextSplitter splitter = new SentenceTextSplitter();
return splitter.apply(documents);
}
}

文档增强

SummaryMetadataEnricher

使用 AI 模型为文档生成摘要,并添加到元数据中。可以生成当前文档、前一个文档、后一个文档的摘要。

@Configuration
public class EnricherConfig {

@Bean
public SummaryMetadataEnricher summaryMetadataEnricher(OpenAiChatModel chatModel) {
return new SummaryMetadataEnricher(
chatModel,
List.of(
SummaryType.PREVIOUS, // 前一个文档的摘要
SummaryType.CURRENT, // 当前文档的摘要
SummaryType.NEXT // 后一个文档的摘要
)
);
}
}

@Component
public class DocumentEnrichmentService {

private final SummaryMetadataEnricher enricher;

public DocumentEnrichmentService(SummaryMetadataEnricher enricher) {
this.enricher = enricher;
}

public List<Document> enrichDocuments(List<Document> documents) {
return this.enricher.apply(documents);
}
}

生成的元数据:

  • section_summary:当前文档的摘要
  • prev_section_summary:前一个文档的摘要(如果有)
  • next_section_summary:后一个文档的摘要(如果有)

自定义摘要模板:

String customTemplate = """
这是文档内容:
{context_str}

请总结关键主题和实体。

摘要:
""";

SummaryMetadataEnricher enricher = new SummaryMetadataEnricher(
chatModel,
List.of(SummaryType.CURRENT),
customTemplate,
MetadataMode.ALL
);

元数据处理

MetadataEnricher

为文档添加自定义元数据。

@Component
public class CustomMetadataEnricher implements DocumentTransformer {

@Override
public List<Document> apply(List<Document> documents) {
return documents.stream()
.map(doc -> {
doc.getMetadata().put("processed_at", Instant.now().toString());
doc.getMetadata().put("source_type", "manual");
return doc;
})
.collect(Collectors.toList());
}
}

DocumentWriter 实现

文档写入器负责把处理好的文档存储到目标位置。

VectorStore(向量存储)

最常用的写入器,把文档存储到向量数据库。

@Component
public class VectorStoreWriter {

private final VectorStore vectorStore;

public VectorStoreWriter(VectorStore vectorStore) {
this.vectorStore = vectorStore;
}

public void writeDocuments(List<Document> documents) {
vectorStore.add(documents);
}
}

VectorStore 会自动调用 EmbeddingClient 生成向量,然后存储。

FileDocumentWriter(文件写入器)

把文档写入文件,主要用于调试或备份。

@Component
public class FileWriterService {

public void writeToFile(List<Document> documents) {
// 写入文件,包含文档标记,使用所有元数据,追加模式
FileDocumentWriter writer = new FileDocumentWriter(
"output.txt",
true, // 包含文档标记
MetadataMode.ALL, // 写入所有元数据
false // 不追加,覆盖文件
);
writer.accept(documents);
}
}

构造函数选项:

  1. FileDocumentWriter(String fileName) - 基本写入
  2. FileDocumentWriter(String fileName, boolean withDocumentMarkers) - 是否包含标记
  3. FileDocumentWriter(String fileName, boolean withDocumentMarkers, MetadataMode metadataMode, boolean append) - 完整配置

文档标记格式:

withDocumentMarkerstrue 时,会写入如下格式:

### Doc: [index], pages:[start_page_number,end_page_number]

完整示例

示例 1:PDF 文档处理流程

@Service
public class PdfEtlService {

private final VectorStore vectorStore;
private final EmbeddingClient embeddingClient;

public PdfEtlService(VectorStore vectorStore, EmbeddingClient embeddingClient) {
this.vectorStore = vectorStore;
this.embeddingClient = embeddingClient;
}

public void processPdfDocument(Resource pdfResource) {
// 1. 读取 PDF
PagePdfDocumentReader pdfReader = new PagePdfDocumentReader(pdfResource);
List<Document> documents = pdfReader.read();

// 2. 添加元数据
documents.forEach(doc -> {
doc.getMetadata().put("source", pdfResource.getFilename());
doc.getMetadata().put("processed_at", Instant.now().toString());
});

// 3. 文本分块
TokenTextSplitter splitter = new TokenTextSplitter(1000, 200);
List<Document> chunks = splitter.split(documents);

// 4. 写入向量库
vectorStore.write(chunks);
}
}

示例 2:JSON 数据增强流程

@Service
public class JsonEnrichmentService {

private final VectorStore vectorStore;
private final ChatModel chatModel;

public JsonEnrichmentService(VectorStore vectorStore, ChatModel chatModel) {
this.vectorStore = vectorStore;
this.chatModel = chatModel;
}

public void processJsonWithSummaries(Resource jsonResource) {
// 1. 读取 JSON
JsonReader jsonReader = new JsonReader(jsonResource, "description", "content");
List<Document> documents = jsonReader.read();

// 2. 生成摘要
SummaryMetadataEnricher enricher = new SummaryMetadataEnricher(
chatModel,
List.of(SummaryType.CURRENT, SummaryType.NEXT)
);
List<Document> enriched = enricher.transform(documents);

// 3. 分块
ParagraphTextSplitter splitter = new ParagraphTextSplitter();
List<Document> chunks = splitter.split(enriched);

// 4. 存储
vectorStore.write(chunks);
}
}

示例 3:多步骤处理管道

@Service
public class ComplexEtlService {

private final VectorStore vectorStore;
private final ChatModel chatModel;

public ComplexEtlService(VectorStore vectorStore, ChatModel chatModel) {
this.vectorStore = vectorStore;
this.chatModel = chatModel;
}

public void processDocuments(Resource resource) {
// 读取
PagePdfDocumentReader reader = new PagePdfDocumentReader(resource);
List<Document> docs = reader.read();

// 添加源信息
docs.forEach(doc -> {
doc.getMetadata().put("source", resource.getFilename());
doc.getMetadata().put("type", "pdf");
});

// 生成摘要
SummaryMetadataEnricher summaryEnricher = new SummaryMetadataEnricher(
chatModel,
List.of(SummaryType.CURRENT)
);
docs = summaryEnricher.transform(docs);

// 分块
TokenTextSplitter splitter = new TokenTextSplitter(800, 150);
List<Document> chunks = splitter.split(docs);

// 为每个块添加索引
for (int i = 0; i < chunks.size(); i++) {
chunks.get(i).getMetadata().put("chunk_index", i);
}

// 写入向量库
vectorStore.write(chunks);

// 同时备份到文件(用于调试)
FileDocumentWriter fileWriter = new FileDocumentWriter(
"backup.txt",
true,
MetadataMode.ALL,
false
);
fileWriter.write(chunks);
}
}

最佳实践

1. 合理选择分块策略

不同文档类型适合不同的分块方式:

  • 技术文档:按段落分块,保持代码示例完整
  • 长文本:按 token 数量分块,控制大小
  • 结构化数据:保持表格、列表的完整性
// 技术文档用段落分块
ParagraphTextSplitter paragraphSplitter = new ParagraphTextSplitter();

// 长文本用 token 分块
TokenTextSplitter tokenSplitter = new TokenTextSplitter(1000, 200);

2. 添加有用的元数据

元数据对后续检索和过滤很重要:

documents.forEach(doc -> {
doc.getMetadata().put("source", "product-manual.pdf");
doc.getMetadata().put("chapter", "第三章");
doc.getMetadata().put("page", pageNumber);
doc.getMetadata().put("timestamp", Instant.now().toString());
doc.getMetadata().put("version", "1.0");
});

3. 使用摘要增强上下文

为文档生成摘要可以帮助模型更好地理解上下文关系:

SummaryMetadataEnricher enricher = new SummaryMetadataEnricher(
chatModel,
List.of(SummaryType.PREVIOUS, SummaryType.CURRENT, SummaryType.NEXT)
);

这样检索时,模型可以看到相邻文档的摘要,理解更大的上下文。

4. 处理特殊内容

对于代码、表格等特殊内容,要特殊处理:

// 代码块要保持完整
if (doc.getContent().contains("```")) {
// 不要在这个文档中间切分
// 或者使用专门处理代码的分块器
}

5. 错误处理

ETL 过程中可能遇到各种错误,要做好处理:

public void safeProcess(Resource resource) {
try {
PagePdfDocumentReader reader = new PagePdfDocumentReader(resource);
List<Document> docs = reader.read();

if (docs.isEmpty()) {
log.warn("文档为空: {}", resource.getFilename());
return;
}

// 处理文档...

} catch (Exception e) {
log.error("处理文档失败: {}", resource.getFilename(), e);
// 记录失败,但不中断整个流程
}
}

6. 批量处理优化

处理大量文档时,考虑批量操作:

public void batchProcess(List<Resource> resources) {
resources.parallelStream()
.forEach(resource -> {
try {
processDocument(resource);
} catch (Exception e) {
log.error("处理失败: {}", resource.getFilename(), e);
}
});
}

7. 增量更新

如果文档会更新,实现增量处理:

public void incrementalUpdate(Resource resource) {
String fileHash = calculateHash(resource);

// 检查是否已处理
if (isAlreadyProcessed(fileHash)) {
log.info("文档已处理,跳过: {}", resource.getFilename());
return;
}

// 处理文档
processDocument(resource);

// 记录处理状态
markAsProcessed(fileHash);
}

常见问题

文档读取失败

可能原因:

  1. 文件格式不支持
  2. 文件损坏
  3. 编码问题

解决方案:

  • 使用 Tika 读取器,支持更多格式
  • 检查文件完整性
  • 指定正确的字符编码

分块效果不好

可能原因:

  1. 分块大小不合适
  2. 切分位置不当,破坏了语义

解决方案:

  • 根据文档类型调整分块策略
  • 使用重叠避免边界信息丢失
  • 保持代码块、表格等结构完整

向量化失败

可能原因:

  1. EmbeddingClient 配置错误
  2. API 调用失败
  3. 文本过长

解决方案:

  • 检查 EmbeddingClient 配置
  • 添加重试机制
  • 确保文本长度在限制内

总结

ETL 管道是 RAG 应用的基础,负责把原始文档转换成向量数据库能用的格式。Spring AI 提供了完整的 ETL 支持,包括:

  • 多种文档读取器:支持 PDF、文本、JSON、网页等
  • 灵活的转换器:分块、增强、元数据处理
  • 多种写入器:向量库、文件等

关键是要根据实际场景选择合适的组件,合理配置参数,并做好错误处理。ETL 的质量直接影响后续检索的效果,所以这一步很重要。

在实际应用中,建议:

  1. 先小规模测试,验证效果
  2. 根据文档类型选择合适的分块策略
  3. 添加有用的元数据,方便后续过滤
  4. 使用摘要增强上下文理解
  5. 做好错误处理和日志记录

参考文档:Spring AI ETL Pipeline