vectorizer_factory_and_model_typing 模块技术深度解析
概述
在向量数据库系统中,文本、图像等非结构化数据需要被转换为高维向量才能进行相似度检索。这个转换过程由**向量化器(Vectorizer)**完成。然而,系统面临着多模型、多后端的现实需求:可能需要在本地模型、远程 HTTP 服务或火山引擎云服务之间切换。
这个模块解决的核心问题是:如何以统一的方式创建和切换不同的向量化实现,同时保持代码的简洁性和可扩展性?
vectorizer_factory_and_model_typing 模块通过工厂模式与注册表机制的结合,提供了一种轻量级但强大的解决方案。它定义了向量化器的基础类型(ModelType 枚举)和工厂类(VectorizerFactory),使得调用方只需关心配置字典和模型类型,无需直接耦合具体的向量化实现类。
架构设计
核心抽象
┌─────────────────────────────────────────────────────────────────────┐
│ VectorizerFactory │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ _registry: Dict[str, type] │ │
│ │ ┌─────────────┬────────────────────┐ │ │
│ │ │ "local" │ LocalVectorizer │ ← 已注册 │ │
│ │ │ "http" │ HttpVectorizer │ ← 已注册 │ │
│ │ │ "volcengine"│ VolcengineVectorizer│ ← 已注册(默认) │ │
│ │ └─────────────┴────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
│ register(model_type, vectorizer_class) ──→ 注册新的向量化器 │
│ create(config, model_type) ──→ 创建向量化器实例 │
└─────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ BaseVectorizer (ABC) │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ + vectorize_query(texts: List[str]) -> VectorizeResult │ │
│ │ + vectorize_document(data, dense_model, sparse_model) │ │
│ │ + close() │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
△
│ implements
┌─────────────────────┼─────────────────────┐
▼ ▼ ▼
┌───────────────┐ ┌─────────────────┐ ┌─────────────────┐
│Volcengine │ │ Local │ │ Http │
│Vectorizer │ │ Vectorizer │ │ Vectorizer │
│ │ │ │ │ │
│ 火山引擎 API │ │ 本地模型推理 │ │ 远程 HTTP 调用 │
└───────────────┘ └─────────────────┘ └─────────────────┘
数据流
典型使用场景:将文本集合批量向量化并存入向量数据库
1. 配置加载
Config Dict ──────────────────────────────────────────┐
(AK/SK/Host/ModelName/Version等) │
▼
2. 工厂创建 ┌───────────────────┐
VectorizerFactory.create(config, ModelType) ──→│ VolcengineVectorizer │
(根据model_type查找注册表,返回对应实例) └───────────────────┘
│
▼
3. 元数据适配 ┌───────────────────┐
VectorizerAdapter(vectorizer, VectorizeMeta) ──→│ 字段映射/模型参数 │
(将集合级别的向量化配置转换为向量器可用的格式) └───────────────────┘
│
▼
4. 向量化执行 ┌───────────────────┐
adapter.vectorize_raw_data(raw_data_list) ──→│ vectorize_document│
(批量向量化,返回dense+sparse向量) │ 返回VectorizeResult│
└───────────────────┘
│
▼
5. 数据入库 ┌───────────────────┐
CollectionAdapter.upsert(records) ──→│ 向量写入VectorDB │
└───────────────────┘
核心组件详解
ModelType 枚举
class ModelType(Enum):
"""Model type enumeration."""
LOCAL = "local" # 本地模型
HTTP = "http" # HTTP 远程模型
VOLCENGINE = "Volcengine" # 火山引擎远程模型
设计意图:ModelType 是整个系统的类型锚点。它将纷繁复杂的向量化实现收敛为三种主要模式,这种分类方式反映了实际的部署形态:
- LOCAL:适用于隐私敏感场景,数据不离开本地;或需要极低延迟的离线批处理
- HTTP:通用的远程调用模式,可以对接任何提供 RESTful API 的向量化服务
- VOLCENGINE:针对火山引擎embedding服务的深度定制,支持 AK/SK 签名认证和特定的 API 契约
VectorizerFactory 工厂类
class VectorizerFactory:
"""Vectorizer factory."""
_registry: Dict[str, type] = {}
@classmethod
def register(cls, model_type: ModelType, vectorizer_class: type):
cls._registry[model_type.value.lower()] = vectorizer_class
@classmethod
def create(
cls, config: Dict[str, Any], model_type: ModelType = ModelType.VOLCENGINE
) -> BaseVectorizer:
vectorizer_class = cls._registry.get(model_type.value.lower())
if not vectorizer_class:
raise ValueError(f"Unknown model type: {model_type}")
return vectorizer_class(config)
注册-创建模式:这个工厂采用了经典的注册-创建模式。模块初始化时默认注册了 VolcengineVectorizer:
VectorizerFactory.register(ModelType.VOLCENGINE, VolcengineVectorizer)
为什么不用继承而用注册? 如果使用静态继承,每增加一种新的向量化后端,就需要修改工厂类的代码。注册机制允许在运行时动态添加新的向量化实现,符合开闭原则。
大小写处理:注意到注册和查找都使用 .value.lower(),这是为了允许配置文件中使用任意大小写的模型类型名称(如 "Volcengine"、"VOLCENGINE"、"volcengine"),提供更好的配置灵活性。
依赖关系分析
上游依赖(谁调用这个模块)
| 调用方 | 调用方式 | 期望的契约 |
|---|---|---|
VectorizerAdapter |
VectorizerFactory.create(config, model_type) |
返回 BaseVectorizer 实例 |
| 业务层代码 | 直接导入 | 提供统一的 create() 入口 |
下游依赖(这个模块调用谁)
| 被调用方 | 调用关系 | 作用 |
|---|---|---|
BaseVectorizer |
继承 | 定义向量化器抽象接口 |
VolcengineVectorizer |
实例化 | 默认的向量化实现 |
ClientForDataApi |
间接依赖 | 火山引擎数据 API 客户端(通过 VolcengineVectorizer) |
设计决策与权衡
1. 注册表 vs 插件系统
当前选择:简单的类注册表(_registry: Dict[str, type])
权衡分析:
- 优点:实现极其简洁,不需要额外的依赖注入框架,新成员容易理解
- 缺点:不支持热插拔(运行时动态加载新模块),不支持优先级和版本选择
对于当前系统的复杂度而言,这种轻量级方案是合理的。如果未来需要支持插件市场或更复杂的模型路由逻辑,可以考虑迁移到 pluggy 或 entrypoints 等成熟的插件框架。
2. 默认值的选择
def create(cls, config: Dict[str, Any], model_type: ModelType = ModelType.VOLCENGINE) -> BaseVectorizer:
决策:将 VOLCENGINE 作为默认模型类型。
原因:这是公司内部云服务的模型,对于内部使用场景最友好。同时,这个设计隐含地假设了大多数部署场景都会使用火山引擎的向量化服务。
潜在风险:如果系统在外部环境使用,可能需要在配置中显式指定模型类型。建议在文档中明确说明这一假设。
3. 错误处理策略
if not vectorizer_class:
print(f"Unknown model type: {model_type.value.lower()}. Available: {list(cls._registry.keys())}")
raise ValueError(...)
观察:错误信息同时打印到 stdout 和抛出异常。
权衡:
- 好的一方面:开发阶段更容易调试,可以看到可用选项
- 风险:生产环境中
print语句可能会干扰日志收集,建议后续考虑使用标准日志框架
4. 配置对象的灵活性
def create(cls, config: Dict[str, Any], model_type: ModelType = ModelType.VOLCENGINE) -> BaseVectorizer:
使用 Dict[str, Any] 而非强类型的配置类(如 Pydantic 模型),提供了极大的灵活性——不同向量化实现可以接收完全不同的配置键。
代价:配置错误只能在运行时才能发现,没有静态类型检查的保障。这是运行时灵活性和静态安全性之间的经典权衡。
扩展点与最佳实践
如何添加新的向量化后端
- 创建向量器类:继承
BaseVectorizer,实现vectorize_query和vectorize_document方法 - 注册到工厂:
VectorizerFactory.register(ModelType.NEW_TYPE, NewVectorizer) - 在配置中使用:
config = {"ModelName": "...", ...} vectorizer = VectorizerFactory.create(config, ModelType.NEW_TYPE)
实际使用示例
from openviking.storage.vectordb.vectorize.vectorizer_factory import (
VectorizerFactory,
ModelType
)
from openviking.storage.vectordb.vectorize.vectorizer import VectorizeMeta
# 1. 创建火山引擎向量化器
config = {
"AK": os.environ.get("VOLC_AK"),
"SK": os.environ.get("VOLC_SK"),
"Host": "open.volcengineapi.com",
"Region": "cn-beijing",
"DenseModelName": "text-embedding-3-large",
"DenseModelVersion": "v1.0",
}
vectorizer = VectorizerFactory.create(config, ModelType.VOLCENGINE)
# 2. 使用适配器封装(可选,推荐)
vectorize_meta: VectorizeMeta = {
"Dense": {
"ModelName": "text-embedding-3-large",
"Version": "v1.0",
"TextField": "content",
"Dim": 1024
}
}
adapter = VectorizerAdapter(vectorizer, vectorize_meta)
# 3. 执行向量化
raw_data = [
{"content": "第一段文本", "id": "1"},
{"content": "第二段文本", "id": "2"}
]
dense_vectors, sparse_vectors = adapter.vectorize_raw_data(raw_data)
边缘情况与注意事项
1. 注册表为空
如果调用 VectorizerFactory.create() 时,没有任何向量化器被注册,会抛出 ValueError。模块初始化时会自动注册 VolcengineVectorizer,但在某些测试场景或动态导入场景下可能出现问题。
建议:在测试中使用 unittest.mock 或确保模块正确初始化。
2. 模型类型大小写敏感问题
虽然代码中使用了 .lower() 进行大小写规范化,但配置中传入无效的大小写组合(如 "VOlCeNgInE")会被正确处理。然而,如果注册时使用了非标准的字符串(如 "local-model"),查找时需要匹配精确的字符串。
3. 配置验证
VectorizerFactory.create() 不对配置进行预验证。配置错误(如缺少必需的 AK/SK)会在创建 VolcengineVectorizer 实例时抛出 ValueError。
最佳实践:在使用前对配置进行预验证,或在 VolcengineVectorizer.__init__ 中添加更友好的错误提示。
4. 资源管理
BaseVectorizer 提供了 close() 方法用于释放资源,但在当前的 VectorizerFactory 中没有对应的生命周期管理。如果向量化器持有连接池或其他资源,需要调用方负责在适当时机调用 close()。
vectorizer = VectorizerFactory.create(config)
try:
result = vectorizer.vectorize_query(["query"])
finally:
vectorizer.close() # 重要:避免资源泄漏
5. VolcengineVectorizer 的重试机制
VolcengineVectorizer 实现了指数退避重试(默认3次),但重试逻辑会捕获所有异常。对于临时性网络错误这是合适的,但可能导致真正的配置错误被延迟发现。建议在生产环境中监控重试次数。
相关模块参考
- BaseVectorizer(定义于
openviking.storage.vectordb.vectorize.base):向量化器的抽象基类,定义了vectorize_query()和vectorize_document()两个核心接口 - VolcengineVectorizer(定义于
openviking.storage.vectordb.vectorize.volcengine_vectorizer):火山引擎向量化服务的具体实现,包含 AK/SK 签名认证、请求构建、响应解析和重试逻辑 - VectorizerAdapter(定义于
openviking.storage.vectordb.vectorize.vectorizer):适配器层,位于向量化工厂和具体向量化器之间,处理字段映射(如 "content" → "text")和模型参数转换 - CollectionAdapter 体系(定义于
openviking.storage.vectordb_adapters):向量数据库适配层,在数据入库时调用 VectorizerFactory 创建向量化器,将文档转换为向量后存储