🏠

compose-chain_parallel 模块深度解析

1. 引言

欢迎来到 compose-chain_parallel 模块的技术深度解析文档。本模块是 CloudWeGo Eino 框架中负责构建并行处理组件的关键部分,它允许开发者以简洁的方式在链式流程中嵌入并行执行的节点组,从而提升系统的处理效率和并发能力。

在本文档中,我们将深入探讨这个模块的设计理念、实现原理、核心组件、依赖关系以及使用场景,帮助你全面理解并有效地应用这一模块。

2. 问题空间与模块定位

2.1 问题背景

在构建复杂的 AI 应用和工作流时,我们经常会遇到需要同时执行多个相互独立操作的场景。例如:

  1. 多模型并发调用:同时向多个不同的大语言模型发送相同的查询,以便比较结果或进行投票
  2. 并行数据处理:同时对同一输入进行多种不同的转换或处理
  3. 多源信息检索:同时从多个不同的数据源或检索器中获取相关信息

在传统的串行处理方式中,这些操作需要依次执行,导致总处理时间等于所有操作时间之和。而通过并行处理,我们可以将总时间降低到最慢的那个操作的时间,显著提升系统的响应速度和吞吐量。

2.2 模块定位

compose-chain_parallel 模块正是为了解决上述问题而设计的。它是 Eino 框架中 compose-graph_engine 模块下的一个子模块,专门用于在链式(Chain)工作流中创建并行执行的节点组。

该模块的核心价值在于:

  • 简化并行逻辑的构建:提供直观的 API,让开发者无需关心底层并发控制细节
  • 与链式工作流无缝集成:作为 Chain 的一个组成部分,保持了整体工作流的连贯性
  • 灵活的节点类型支持:支持各种类型的节点,包括模型、模板、工具、Lambda 函数等

3. 核心概念与心智模型

3.1 核心抽象

在理解 compose-chain_parallel 模块时,有几个核心概念需要掌握:

  1. Parallel:这是模块的核心结构体,代表一个并行执行的节点组容器。
  2. Output Key:每个添加到 Parallel 中的节点都需要一个唯一的输出键,用于在最终结果中标识该节点的输出。
  3. 节点对(nodeOptionsPair):包含实际的图节点和其配置选项的内部结构。

3.2 心智模型

理解 compose-chain_parallel 的一个有效心智模型是将其想象为**"分流器-收集器"**系统:

  1. 分流器:当工作流执行到 Parallel 节点时,输入数据会被复制并分发给 Parallel 中的所有子节点,就像水流被分配到多个平行的管道中一样。
  2. 并行处理:每个子节点在各自的"管道"中独立处理输入数据,互不干扰。
  3. 收集器:所有子节点执行完成后,它们的输出会被收集起来,组装成一个以 output key 为键的 map,然后传递给下一个节点。

这种设计使得并行逻辑的构建变得非常直观,开发者只需要关注每个子节点的功能,而无需担心并发控制、同步等底层细节。

4. 架构与数据流

4.1 模块架构

compose-chain_parallel 模块的架构相对简洁,主要由以下几个部分组成:

┌─────────────────────────────────────────────────────────────┐
│                         Parallel                              │
│  ┌───────────────────────────────────────────────────────┐  │
│  │  nodes: []nodeOptionsPair                              │  │
│  │  ┌──────────────┐  ┌──────────────┐  ┌─────────────┐ │  │
│  │  │ Node 1       │  │ Node 2       │  │ Node N      │ │  │
│  │  │ (outputKey1) │  │ (outputKey2) │  │ (outputKeyN)│ │  │
│  │  └──────────────┘  └──────────────┘  └─────────────┘ │  │
│  └───────────────────────────────────────────────────────┘  │
│  ┌───────────────────────────────────────────────────────┐  │
│  │  outputKeys: map[string]bool                           │  │
│  └───────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────┘

4.2 构建时数据流

当构建包含 Parallel 的 Chain 时,数据流如下:

  1. 开发者创建一个 Parallel 实例
  2. 通过各种 Add* 方法向 Parallel 中添加节点,每个节点都指定一个唯一的 output key
  3. 将 Parallel 实例通过 Chain.AppendParallel() 方法添加到 Chain 中
  4. 在 AppendParallel 内部,系统会:
    • 验证 Parallel 的有效性(非空、有足够的节点等)
    • 为每个节点生成唯一的节点键
    • 将所有节点添加到底层的 Graph 中
    • 添加从起始节点到所有并行节点的边
    • 更新 Chain 的 preNodeKeys 为所有并行节点的键

4.3 运行时数据流

在运行时,当执行到 Parallel 部分时:

  1. 前一个节点的输出作为输入传递给所有并行节点
  2. 所有并行节点同时开始执行,处理相同的输入
  3. 每个节点执行完毕后,将结果存储在自己的 output key 下
  4. 等待所有节点执行完毕
  5. 将所有节点的结果组装成一个 map[string]any,其中键是节点的 output key
  6. 将这个 map 作为输入传递给 Chain 中的下一个节点

5. 核心组件深度解析

5.1 Parallel 结构体

Parallel 是模块的核心结构体,定义如下:

type Parallel struct {
    nodes      []nodeOptionsPair
    outputKeys map[string]bool
    err        error
}

组件职责

  • nodes:存储所有添加到 Parallel 中的节点及其配置选项
  • outputKeys:跟踪已使用的输出键,确保唯一性
  • err:存储构建过程中可能发生的错误,实现错误传播

设计亮点

  • 使用构建器模式,所有 Add* 方法都返回 *Parallel,支持链式调用
  • 延迟错误处理,错误会在添加到 Chain 时才被检查
  • 内部使用 map 来确保输出键的唯一性

5.2 NewParallel 函数

func NewParallel() *Parallel {
    return &Parallel{
        outputKeys: make(map[string]bool),
    }
}

这个函数简单但重要,它创建并初始化一个新的 Parallel 实例。注意它只初始化了 outputKeys,而 nodes 和 err 则保持它们的零值(nil),这是符合 Go 语言习惯的做法。

5.3 Add* 方法族

Parallel 提供了一系列 Add* 方法,用于添加不同类型的节点:

  • AddChatModel:添加聊天模型节点
  • AddChatTemplate:添加聊天模板节点
  • AddToolsNode:添加工具节点
  • AddLambda:添加 Lambda 函数节点
  • AddEmbedding:添加嵌入模型节点
  • AddRetriever:添加检索器节点
  • AddLoader:添加文档加载器节点
  • AddIndexer:添加索引器节点
  • AddDocumentTransformer:添加文档转换器节点
  • AddGraph:添加子图/子链节点
  • AddPassthrough:添加直通节点

所有这些方法都遵循相似的模式:

  1. 接收输出键、节点实例和可选配置
  2. 将节点转换为内部的 graphNode 格式
  3. 调用内部的 addNode 方法进行实际添加
  4. 返回 Parallel 实例以支持链式调用

这种设计提供了一致的 API 体验,同时确保了各种类型节点的正确处理。

5.4 addNode 内部方法

addNode 是 Parallel 的内部方法,实现了节点添加的核心逻辑:

func (p *Parallel) addNode(outputKey string, node *graphNode, options *graphAddNodeOpts) *Parallel {
    // 检查是否已有错误
    if p.err != nil {
        return p
    }

    // 验证节点不为空
    if node == nil {
        p.err = fmt.Errorf("chain parallel add node invalid, node is nil")
        return p
    }

    // 确保 outputKeys 已初始化
    if p.outputKeys == nil {
        p.outputKeys = make(map[string]bool)
    }

    // 检查输出键是否重复
    if _, ok := p.outputKeys[outputKey]; ok {
        p.err = fmt.Errorf("parallel add node err, duplicate output key= %s", outputKey)
        return p
    }

    // 验证节点信息不为空
    if node.nodeInfo == nil {
        p.err = fmt.Errorf("chain parallel add node invalid, nodeInfo is nil")
        return p
    }

    // 设置节点的输出键
    node.nodeInfo.outputKey = outputKey
    
    // 添加节点到列表
    p.nodes = append(p.nodes, nodeOptionsPair{node, options})
    p.outputKeys[outputKey] = true
    
    return p
}

这个方法展示了几个重要的设计决策:

  1. 错误快速失败但延迟报告:一旦出现错误,后续操作都会被跳过,但错误会保存直到 Parallel 被添加到 Chain 时才报告
  2. 防御性编程:多处验证确保状态一致性,即使是在内部方法中
  3. 输出键唯一性保证:通过 map 来确保每个输出键只被使用一次
  4. 节点信息完整性:确保节点的 nodeInfo 存在并正确设置 outputKey

6. 与其他模块的关系

6.1 依赖关系

compose-chain_parallel 模块与以下模块有紧密的依赖关系:

  1. compose-graph_engine:Parallel 最终会被转换为 Graph 中的节点和边
  2. 各种组件模块:如 model、prompt、retriever 等,这些是可以添加到 Parallel 中的节点类型
  3. compose-chain:Parallel 主要是作为 Chain 的一部分使用的

6.2 被调用关系

主要被以下模块调用:

  1. compose-chain:通过 AppendParallel 方法将 Parallel 集成到 Chain 中
  2. 用户代码:开发者直接创建和配置 Parallel 实例

7. 设计决策与权衡

7.1 构建器模式 vs 函数选项模式

Parallel 采用了构建器模式(Builder Pattern)而不是函数选项模式(Functional Options Pattern)。

选择原因

  • 节点添加是一个累积过程,构建器模式更自然
  • 支持流畅的链式调用,提高代码可读性
  • 每个 Add* 方法都有明确的语义,比通用的选项函数更直观

权衡

  • 相比函数选项模式,构建器模式通常会产生更多的代码
  • 但在这个场景下,不同类型节点的添加逻辑确实不同,构建器模式是更合适的选择

7.2 延迟错误处理

Parallel 采用了延迟错误处理策略:错误在发生时被保存,但不会立即返回,而是等到 Parallel 被添加到 Chain 时才被检查。

选择原因

  • 支持流畅的链式调用,不需要在每一步都检查错误
  • 简化了用户代码,错误处理可以集中在一个地方

权衡

  • 错误发生的位置和报告的位置可能有距离,调试时可能稍微困难
  • 一旦发生错误,后续的操作都会被静默跳过,直到错误被报告

7.3 输出键唯一性验证

Parallel 强制要求每个节点的输出键必须唯一。

选择原因

  • 避免结果覆盖,确保每个节点的输出都能被正确获取
  • 简化下游节点的逻辑,它们可以确定每个键对应唯一的值

权衡

  • 增加了一定的约束,用户需要确保键的唯一性
  • 但这是一个合理的约束,因为重复的键在并行执行场景下几乎总是一个错误

7.4 输入复用 vs 输入分区

Parallel 设计为将相同的输入传递给所有节点,而不是将输入分区给不同节点。

选择原因

  • 更符合常见的 AI 应用场景,如多模型比较、多源信息收集等
  • 简化了使用模型,用户不需要考虑如何分区输入

权衡

  • 不适合需要将大数据集分区并行处理的场景
  • 但这可以通过其他模式(如先分区再并行)来实现,不属于本模块的核心职责

8. 使用指南与示例

8.1 基本使用

以下是使用 compose-chain_parallel 的基本步骤:

// 1. 创建 Parallel 实例
parallel := compose.NewParallel()

// 2. 添加节点,每个节点指定一个唯一的输出键
parallel.
    AddChatModel("model_a", modelA).
    AddChatModel("model_b", modelB).
    AddLambda("custom_process", myLambdaFunction)

// 3. 创建 Chain 并添加 Parallel
chain := compose.NewChain[InputType, OutputType]()
chain.
    AppendSomeNode(...).
    AppendParallel(parallel).
    AppendAnotherNode(...)

// 4. 编译并使用 Chain
runnable, err := chain.Compile(ctx)
if err != nil {
    // 处理错误
}

result, err := runnable.Invoke(ctx, input)

8.2 完整示例

让我们看一个更完整的示例,展示如何在实际场景中使用 Parallel:

func TestParallelExample(t *testing.T) {
    // 创建两个不同的聊天模型
    modelGpt := createChatModel("gpt-4")
    modelClaude := createChatModel("claude-3")
    
    // 创建一个并行组件,同时调用两个模型
    parallel := compose.NewParallel()
    parallel.
        AddChatModel("gpt_result", modelGpt).
        AddChatModel("claude_result", modelClaude).
        AddLambda("timestamp", compose.InvokableLambda(
            func(ctx context.Context, input any) (string, error) {
                return time.Now().Format(time.RFC3339), nil
            }))
    
    // 创建一个处理并行结果的 Lambda
    resultProcessor := compose.InvokableLambda(
        func(ctx context.Context, results map[string]any) (string, error) {
            gptResult := results["gpt_result"].(*schema.Message)
            claudeResult := results["claude_result"].(*schema.Message)
            timestamp := results["timestamp"].(string)
            
            // 这里可以比较、聚合或选择结果
            return fmt.Sprintf("[%s]\nGPT: %s\nClaude: %s", 
                timestamp, gptResult.Content, claudeResult.Content), nil
        })
    
    // 构建完整的链
    chain := compose.NewChain[*schema.Message, string]()
    chain.
        AppendParallel(parallel).
        AppendLambda(resultProcessor)
    
    // 编译和运行
    runnable, err := chain.Compile(context.Background())
    if err != nil {
        t.Fatalf("Failed to compile chain: %v", err)
    }
    
    input := schema.UserMessage("What is AI?")
    result, err := runnable.Invoke(context.Background(), input)
    if err != nil {
        t.Fatalf("Failed to invoke chain: %v", err)
    }
    
    t.Logf("Result: %s", result)
}

这个示例展示了如何:

  1. 同时调用两个不同的模型
  2. 添加一个自定义的 Lambda 来提供额外信息(时间戳)
  3. 处理并行执行的结果,将它们组合成一个单一的输出

8.3 嵌套使用

Parallel 也可以与其他结构嵌套使用,例如在 Branch 中使用 Parallel,或者在 Parallel 中使用 Graph/Chain:

// 创建一个内部链
innerChain := compose.NewChain[map[string]any, string]()
innerChain.AppendLambda(someProcessingLambda)

// 创建并行组件,其中包含一个链
parallel := compose.NewParallel()
parallel.
    AddChatModel("direct_model", model).
    AddGraph("chain_result", innerChain)

// 在分支中使用并行
branch := compose.NewChainBranch(conditionFunc)
branch.
    AddParallel("branch_a", parallelA).
    AddParallel("branch_b", parallelB)

// 主链
mainChain := compose.NewChain[InputType, OutputType]()
mainChain.
    AppendBranch(branch).
    AppendSomeMoreNodes(...)

9. 注意事项与常见陷阱

9.1 输出键唯一性

最常见的错误是忘记确保输出键的唯一性。如果尝试添加具有相同输出键的两个节点,Parallel 会记录一个错误,该错误会在添加到 Chain 时暴露出来。

解决方法

  • 为每个节点使用清晰、描述性的键名
  • 考虑在键名中包含节点类型,如 "gpt_response"、"retrieval_results" 等

9.2 节点数量限制

Parallel 至少需要两个节点才能正常工作。如果你尝试添加只有一个节点的 Parallel,AppendParallel 会返回错误。

解决方法

  • 确保在添加到 Chain 之前,Parallel 中至少有两个节点
  • 如果只需要一个节点,直接添加到 Chain 中即可,不需要使用 Parallel

9.3 错误处理

由于 Parallel 采用延迟错误处理,有时很难确定错误发生的确切位置。

解决方法

  • 在构建 Parallel 时逐步测试,每添加几个节点就尝试将其添加到 Chain 中编译,看是否有错误
  • 如果遇到错误,可以通过检查 Parallel.err 字段来获取更早的错误信息(虽然这是内部字段,但在调试时可以临时访问)

9.4 下游节点输入类型

使用 Parallel 后,下一个节点会收到一个 map[string]any 类型的输入,其中键是你在 Parallel 中指定的输出键。

常见陷阱

  • 忘记下游节点需要处理 map 类型的输入
  • 对 map 中的值类型进行不正确的类型断言

解决方法

  • 确保下游节点能够处理 map[string]any 类型的输入
  • 在类型断言时使用逗号-ok 语法来避免 panic
  • 考虑使用 Lambda 节点来进行类型安全的转换

9.5 并行执行的独立性

Parallel 中的所有节点都是独立执行的,它们之间不能直接通信或共享状态。

解决方法

  • 如果节点之间需要协调,考虑使用不同的模式,如顺序执行或使用 Graph 进行更复杂的编排
  • 所有需要共享的信息都应该通过输入传递,或者在后续的聚合节点中处理

9.6 性能考虑

虽然 Parallel 可以并行执行多个操作,但这并不总是意味着性能会线性提升。

注意事项

  • 如果节点受限于相同的资源(如 API 速率限制、数据库连接池),并行执行可能不会带来预期的性能提升
  • 并行执行会增加内存使用,因为需要同时保存所有节点的输入和中间状态
  • 如果节点执行时间差异很大,总执行时间将由最慢的节点决定

优化建议

  • 考虑为不同的节点设置适当的超时
  • 如果某些节点明显比其他节点慢,可能需要重新考虑是否应该将它们放在同一个 Parallel 中
  • 对于非常大量的并行节点,考虑分批处理或使用其他模式

10. 总结

compose-chain_parallel 模块是 Eino 框架中一个强大而灵活的组件,它简化了在链式工作流中添加并行执行逻辑的过程。通过提供直观的 API 和处理底层并发细节,它使开发者能够轻松构建更高效、更强大的 AI 应用。

在本模块的设计中,我们看到了几个关键原则:

  1. 简洁性:提供简单直观的 API,隐藏复杂的并发细节
  2. 灵活性:支持多种类型的节点,并与其他结构良好组合
  3. 安全性:进行适当的验证,防止常见错误
  4. 一致性:与框架的其他部分保持一致的设计风格和使用模式

当你需要在工作流中同时执行多个独立操作时,compose-chain_parallel 是一个值得考虑的强大工具。通过合理使用它,你可以显著提升应用的性能和响应速度,同时保持代码的清晰和可维护性。

On this page