🏠

volcengine_data_api_integration 模块技术深度解析

模块概述

volcengine_data_api_integration 模块是 OpenViking 系统与火山引擎 VikingDB 服务之间的桥梁。它提供了两个轻量级的 HTTP 客户端——ClientForConsoleApiClientForDataApi——负责处理与 VikingDB 服务的安全通信。这两个客户端封装了 AWS Signature V4 签名算法,使得 OpenViking 能够通过标准的 HTTP 请求与 VikingDB 的两类 API 进行交互:一类用于管理 Collection 和 Index 等元数据操作(Console API),另一类用于执行向量搜索、数据写入等核心数据操作(Data API)。

为什么需要这个模块? VikingDB 作为向量数据库服务,其 API 设计分为两个层面:Console API 负责元数据管理(类似数据库的 DDL 操作),Data API 负责实际的数据 CRUD 和检索(类似数据库的 DML 操作)。这两个 API 不仅端点不同、请求格式不同,而且使用了不同的认证机制。手动处理这些差异容易出错,而本模块将认证逻辑抽象化,使上层业务代码可以专注于业务逻辑而非 HTTP 底层细节。


架构定位与数据流

在存储层中的角色

该模块位于 vectordb_domain_models_and_service_schemas 层级之下,是整个 VikingDB 集成栈的最底层组件之一。它的上游是 VolcengineCollection(位于 volcengine_collection.py),下游直接与火山引擎的 VikingDB 服务进行 HTTP 通信。

flowchart TB subgraph "上层业务" A["VolcengineCollection
(collection/volcengine_collection.py)"] end subgraph "本模块 (volcengine_data_api_integration)" B["ClientForConsoleApi"] C["ClientForDataApi"] end subgraph "火山引擎服务" D["VikingDB Console API
(vikingdb.cn-*.volcengineapi.com)"] E["VikingDB Data API
(api-vikingdb.vikingdb.cn-*.volces.com)"] end A -->|"管理操作
(create_index, update, drop)"| B A -->|"数据操作
(search, upsert, fetch)"| C B -->|"HTTPS + SigV4"| D C -->|"HTTPS + SigV4"| E

数据流转路径

以一次典型的向量搜索操作为例,数据流经过以下路径:

  1. 业务层调用VolcengineCollection.search_by_vector(...) 被业务代码调用
  2. 请求构造VolcengineCollection 将搜索参数封装为字典,调用 _data_post(path, data)
  3. 数据清洗_data_post 首先调用 _sanitize_payload 对 URI 字段进行规范化处理
  4. 客户端转发VolcengineCollection 将清洗后的数据传递给 ClientForDataApi.do_req()
  5. 请求签名ClientForDataApi 使用 SignerV4 对请求进行 AWS Signature V4 签名
  6. HTTP 发送:签名后的请求通过 requests 库发送到 VikingDB Data API
  7. 响应解析:响应被解析为 SearchResult 返回给调用方

核心组件详解

ClientForConsoleApi

ClientForConsoleApi 专门用于与 VikingDB 的 Console API 交互。Console API 负责所有元数据相关的操作,包括 Collection 的创建、更新、删除,以及 Index 的管理。

设计意图:将元数据操作与数据操作分离是 VikingDB API 的设计决策。Console API 使用标准的火山引擎 API 网关_endpoint(vikingdb.cn-{region}.volcengineapi.com),采用 Action + Version 的请求模式,类似于 AWS 的传统 API 设计。

class ClientForConsoleApi:
    _global_host = {
        "cn-beijing": "vikingdb.cn-beijing.volcengineapi.com",
        "cn-shanghai": "vikingdb.cn-shanghai.volcengineapi.com",
        "cn-guangzhou": "vikingdb.cn-guangzhou.volcengineapi.com",
    }

关键设计细节

  • 区域支持:目前仅支持北京(cn-beijing)、上海(cn-shanghai)、广州(cn-guangzhou)三个区域
  • 认证方式:使用 SignerV4 进行请求签名,这是火山引擎的标准认证方式
  • 超时配置:默认 30 秒超时,通过 DEFAULT_TIMEOUT 常量控制
  • 路径固定:所有请求都发送到根路径 /,Action 通过 Query 参数传递

ClientForDataApi

ClientForDataApi 负责与 VikingDB 的 Data API 交互,执行实际的数据操作。这是向量检索的核心入口。

设计意图:Data API 是 VikingDB 的高性能数据平面,使用了独立的域名和 API 版本(2025-06-09)。它采用 RESTful 风格的路径设计(如 /api/vikingdb/data/search/vector),支持更丰富的查询能力。

class ClientForDataApi:
    _global_host = {
        "cn-beijing": "api-vikingdb.vikingdb.cn-beijing.volces.com",
        "cn-shanghai": "api-vikingdb.vikingdb.cn-shanghai.volces.com",
        "cn-guangzhou": "api-vikingdb.vikingdb.cn-guangzhou.volces.com",
    }

关键设计细节

  • 独立端点:Data API 使用 .volces.com 域名(而非 .volcengineapi.com),这是火山引擎新版 API 的统一后缀
  • 动态路径:每个操作对应不同的 HTTP 路径(如搜索用 /api/vikingdb/data/search/vector
  • 版本锁定:使用 VIKING_DB_VERSION = "2025-06-09" 锁定 API 版本,确保请求兼容性
  • 返回原始响应:直接返回 requests.Response 对象,由调用方负责解析

请求准备流程 (prepare_request)

两个客户端的 prepare_request 方法实现了标准化的请求构建流程:

def prepare_request(self, method, path, params=None, data=None):
    # 1. 创建 Request 对象
    r = Request()
    r.set_shema("https")
    r.set_method(method)
    r.set_connection_timeout(DEFAULT_TIMEOUT)
    r.set_socket_timeout(DEFAULT_TIMEOUT)
    
    # 2. 设置标准请求头
    mheaders = {
        "Accept": "application/json",
        "Content-Type": "application/json",
        "Host": self.host,
    }
    r.set_headers(mheaders)
    
    # 3. 设置查询参数和请求体
    if params:
        r.set_query(params)
    r.set_host(self.host)
    r.set_path(path)
    if data is not None:
        r.set_body(json.dumps(data))
    
    # 4. 应用 AWS Signature V4 签名
    credentials = Credentials(self.ak, self.sk, "vikingdb", self.region)
    SignerV4.sign(r, credentials)
    return r

签名机制说明:AWS Signature V4 是 AWS 风格的请求签名算法,它将请求的各个部分(Method、Path、Headers、Query)进行哈希处理,并使用 Secret Key 生成签名。签名会被添加到 Authorization 请求头中发送给服务器。火山引擎采用这一标准,使得开发者可以使用与 AWS S3 兼容的方式进行认证。


依赖分析与契约

上游依赖:谁调用这个模块?

主要调用方VolcengineCollection(位于 collection/volcengine_collection.py

VolcengineCollectionICollection 接口的实现类,它组合使用 ClientForConsoleApiClientForDataApi 来提供完整的 Collection 操作能力:

VolcengineCollection 方法 使用的客户端 操作类型
update(), get_meta_data(), drop() ClientForConsoleApi 元数据修改
create_index(), list_indexes(), drop_index() ClientForConsoleApi 索引管理
upsert_data(), fetch_data(), delete_data() ClientForDataApi 数据写入
search_by_vector(), search_by_id() ClientForDataApi 向量检索

调用契约

  • 调用方必须提供有效的 ak(Access Key)和 sk(Secret Key)
  • 调用方必须指定有效的 region(当前仅支持 cn-beijing, cn-shanghai, cn-guangzhou
  • 调用方需要自行处理响应解析(ClientForDataApi.do_req 返回原始 requests.Response

下游依赖:这个模块调用什么?

直接依赖

  • requests:Python 标准 HTTP 库,用于发送实际的网络请求
  • volcengine:火山引擎官方 Python SDK,提供 SignerV4RequestCredentials 等认证组件
  • json:Python 标准库,用于序列化请求体

间接依赖

  • 火山引擎 VikingDB 服务本身

与其他 VikingDB 客户端的关系

在同一目录下还存在 vikingdb_clients.py,它可能提供与 VikingDB 私有部署版本兼容的客户端。本模块的 ClientForDataApiClientForConsoleApi 是针对火山引擎公有云 VikingDB 的实现。


设计决策与权衡

决策一:薄客户端设计(Thin Client)

本模块选择做"薄客户端"而非"胖客户端"。具体来说:

  • 薄客户端:只负责请求的组装、签名和发送,不包含业务逻辑
  • 胖客户端:会包含数据验证、结果映射、错误处理等业务逻辑

选择薄客户端的理由:将业务逻辑上移到 VolcengineCollection 层,使得客户端可以更专注于认证和 HTTP 传输的本质职责,同时也便于在不同 Collection 实现之间共享认证逻辑。

权衡:上层代码需要编写更多的样板代码来处理响应,但获得了更大的灵活性。

决策二:直接返回原始 HTTP 响应

do_req 方法返回 requests.Response 对象而非解析后的数据结构。

选择理由

  1. 通用性:不同的 API 操作返回不同的 JSON 结构,难以统一建模
  2. 效率:避免不必要的对象转换,调用方可以直接获取需要的字段
  3. 容错性:即使 API 返回非 200 状态码,也能让调用方决定如何处理

权衡:调用方必须检查 response.status_code 并自行处理 JSON 解析,增加了上层代码的复杂度。

决策三:静态区域映射

两个客户端都使用静态字典 _global_host 来映射区域与主机地址:

_global_host = {
    "cn-beijing": "...",
    "cn-shanghai": "...",
    "cn-guangzhou": "...",
}

选择理由

  1. 确定性:VikingDB 的区域端点是固定的,不需要动态发现
  2. 简单性:简单的字典查找比动态配置更直观
  3. 显式化:开发者可以一目了然地看到支持哪些区域

权衡

  • 新增区域需要修改代码(但 VikingDB 区域本身是有限的)
  • 不支持自定义端点(可通过 host 参数覆盖,但会绕过默认映射)

使用指南与最佳实践

初始化客户端

from openviking.storage.vectordb.collection.volcengine_clients import (
    ClientForConsoleApi,
    ClientForDataApi,
)

# 标准初始化
data_client = ClientForDataApi(
    ak="AKIAIOSFODNN7EXAMPLE",
    sk="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
    region="cn-beijing"
)

# 自定义端点(用于测试或私有部署)
data_client = ClientForDataApi(
    ak="...",
    sk="...",
    region="cn-beijing",
    host="api-vikingdb.vikingdb.cn-beijing.volces.com"
)

发起请求

# POST 请求(Data API)
response = data_client.do_req(
    req_method="POST",
    req_path="/api/vikingdb/data/search/vector",
    req_body={
        "project": "my_project",
        "collection_name": "my_collection",
        "index_name": "dense_index",
        "dense_vector": [0.1, 0.2, ...],
        "limit": 10
    }
)

if response.status_code == 200:
    result = response.json()
    print(result.get("data", []))
else:
    print(f"Request failed: {response.status_code} {response.text}")

# GET 请求
response = console_client.do_req(
    req_method="GET",
    req_params={"Action": "ListVikingdbIndex", "Version": "2025-06-09"}
)

在 VolcengineCollection 中使用

更常见的用法是通过 VolcengineCollection 间接使用:

from openviking.storage.vectordb.collection.volcengine_collection import (
    get_or_create_volcengine_collection
)

# 获取或创建 Collection
collection = get_or_create_volcengine_collection(
    config={"AK": "...", "SK": "...", "Region": "cn-beijing"},
    meta_data={"CollectionName": "my_collection", "ProjectName": "default"}
)

# 执行向量搜索
result = collection.search_by_vector(
    index_name="dense_index",
    dense_vector=[0.1, 0.2, 0.3],
    limit=10
)

# 处理结果
for item in result.data:
    print(f"ID: {item.id}, Score: {item.score}, Fields: {item.fields}")

边缘情况与注意事项

1. 认证失败

如果 aksk 无效,VikingDB 会返回 403 Forbidden。SignerV4.sign() 会在请求发送前抛出异常,因此:

try:
    response = client.do_req("POST", path, req_body=data)
except Exception as e:
    # 可能是签名失败或网络问题
    print(f"Request error: {e}")

2. 区域不支持

当前仅支持三个区域,如果传入不支持的区域会抛出 KeyError

client = ClientForDataApi(ak="...", sk="..., region="cn-hangzhou")
# KeyError: 'cn-hangzhou'

3. 请求超时

默认超时为 30 秒,对于大规模向量搜索可能不够。如果需要调整,有两种方式:

# 方式一:修改全局默认(不推荐,影响所有请求)
# 修改 DEFAULT_TIMEOUT 常量

# 方式二:创建自定义客户端类
class CustomClient(ClientForDataApi):
    DEFAULT_TIMEOUT = 120  # 覆盖为 2 分钟

4. URI 规范化

VolcengineCollection 包含一个重要的数据清洗逻辑(_sanitize_payload),用于处理 URI 字段:

# 输入: {"uri": "viking://docs/readme.md"}
# 输出: {"/docs/readme.md/"}

这是因为 VikingDB 内部使用 viking:// 前缀表示虚拟路径,而 API 期望规范化的 /path/format。这个转换在 _data_post 中自动完成,但如果直接使用 ClientForDataApi,需要手动处理。

5. API 版本兼容性

VIKING_DB_VERSION = "2025-06-09" 是硬编码的。如果 VikingDB 发布新版本,可能需要更新此常量。调用方无法动态指定版本,这保证了请求的一致性,但也意味着升级 SDK 可能是获取新 API 版本的唯一途径。


相关模块参考


总结

volcengine_data_api_integration 模块是 OpenViking 与火山引擎 VikingDB 交互的底层 HTTP 抽象层。它通过封装 AWS Signature V4 签名算法,使得上层业务代码可以专注于数据操作逻辑,而无需关心认证细节。该模块采用了薄客户端的设计哲学,保持了职责的单一性,同时也为上层的 VolcengineCollection 提供了足够的灵活性来应对复杂的业务场景。

对于新加入团队的开发者,重要的是理解:这两个客户端本身不包含业务逻辑,它们是纯粹的请求发送器。所有的业务规则(如 URI 规范化、结果解析、错误处理)都发生在 VolcengineCollection 层。这种分层使得 VikingDB 的 API 升级可以限制在客户端层,而业务逻辑无需改动。

On this page