如果你曾写过这样的代码——

12345678910Result process(Request req) { req = validate(req); if (!req.valid) return error; req = enrich(req); req = transform(req); req = filterFields(req); saveToDB(req); sendNotification(req); return success;}

——你一定感受过它的问题:函数体越来越长、每个步骤紧耦合、加一步就要改主流程、单元测试只能测整体。

Pipeline 模式就是为这种"多步骤顺序处理"场景而生的。它把每一步封装成独立的阶段(Stage),数据像流水线一样在阶段之间传递——每个阶段只做一件事,且只关心自己的输入和输出。

一、什么是 Pipeline 模式1.1 核心思想Pipeline 模式将复杂处理流程拆解为一系列独立的阶段(Stage),每个阶段接收数据、加工数据、输出数据,阶段之间通过一个统一的**管道(Pipeline)**串联。

1输入 → [Stage 1] → [Stage 2] → [Stage 3] → ... → [Stage N] → 输出

四个关键特征:

特征

说明

单向流动

数据从第一个阶段流入,依次经过每个阶段,最终流出

阶段独立

每个阶段不依赖其他阶段的内部实现,只依赖接口约定

可组合

阶段可以自由排列组合,形成不同的处理链

可复用

同一个阶段可以在多条 Pipeline 中复用

1.2 与责任链模式的本质区别Pipeline 和责任链(Chain of Responsibility)是两种容易混淆的模式,但它们的意图截然不同:

维度

Pipeline

责任链

数据流

每个阶段都必须处理数据

处理者可以选择不处理,跳过

终止条件

数据走完所有阶段

任一处理者处理后即可终止

数据变换

每个阶段修改数据并传递给下一阶段

通常不修改数据,只做判断

典型场景

ETL、编译管道、HTTP 中间件

审批流、事件冒泡、异常处理

阶段感知

阶段知道自己的位置(有时)

处理者不知道自己在链中的位置

一句话区分:Pipeline 是在"加工数据",责任链是在"寻找谁来处理"。

二、基础实现2.1 最小可用 Pipeline1234567891011121314151617181920212223242526// pipeline.h#include #include #include templateclass Pipeline {public: using StageFunc = std::function; Pipeline& addStage(StageFunc stage) { stages.push_back(std::move(stage)); return *this; // 支持链式调用 } T execute(T input) const { T current = std::move(input); for (const auto& stage : stages) { current = stage(current); } return current; } private: std::vector stages;};

使用示例:一个最简单的字符串处理管道

1234567891011121314151617181920212223242526272829303132Pipeline textPipeline;textPipeline .addStage([](const std::string& s) { // 阶段1:移除首尾空白 auto start = s.find_first_not_of(" \t\n\r"); auto end = s.find_last_not_of(" \t\n\r"); return (start == std::string::npos) ? "" : s.substr(start, end - start + 1); }) .addStage([](const std::string& s) { // 阶段2:全部转大写 std::string result = s; std::transform(result.begin(), result.end(), result.begin(), ::toupper); return result; }) .addStage([](const std::string& s) { // 阶段3:将多个空格合并为一个 std::string result; bool lastWasSpace = false; for (char c : s) { if (std::isspace(c)) { if (!lastWasSpace) result += ' '; lastWasSpace = true; } else { result += c; lastWasSpace = false; } } return result; });std::string result = textPipeline.execute(" hello world ");// 结果:"HELLO WORLD"

2.2 带错误的 Pipeline实际项目中,处理过程可能失败。这时需要对结果类型建模:

1234567891011121314151617181920212223242526272829303132333435363738394041424344templateclass Result {public: static Result ok(T value) { return Result(true, std::move(value), ""); } static Result error(std::string msg) { return Result(false, T{}, std::move(msg)); } bool isOk() const { return success; } const T& value() const { return data; } const std::string& errorMsg() const { return errMsg; } private: bool success; T data; std::string errMsg; Result(bool ok, T val, std::string err) : success(ok), data(std::move(val)), errMsg(std::move(err)) {}};templateclass FalliblePipeline {public: using StageFunc = std::function(const T&)>; FalliblePipeline& addStage(StageFunc stage) { stages.push_back(std::move(stage)); return *this; } // 一旦失败就短路,跳过后续阶段 Result execute(T input) const { T current = std::move(input); for (const auto& stage : stages) { auto result = stage(current); if (!result.isOk()) { return result; // 短路:立即返回错误 } current = result.value(); } return Result::ok(current); } private: std::vector stages;};

短路行为是 Pipeline 的关键设计决策之一。上面选择"遇错即停",但也可以设计为"收集所有错误继续执行",取决于业务场景。

三、进阶:并行 Pipeline当 Pipeline 中某几个阶段可以并行执行时(阶段之间无数据依赖),可以进一步榨干多核性能。

3.1 并行阶段示意123 ┌→ [Stage 2a: OCR识别] ──┐输入 → [Stage 1] ─┼→ [Stage 2b: 人脸检测] ─┼→ [Stage 3: 结果聚合] → 输出 └→ [Stage 2c: 关键词提取] ┘

Stage 2a、2b、2c 互不依赖,可以并行。Stage 3 等它们全部完成后再聚合。

3.2 C++ 实现12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970#include #include templateclass ParallelPipeline {public: struct StageGroup { // 同组内的阶段可以并行执行 std::vector> stages; }; ParallelPipeline& addSequential(std::function stage) { // 单阶段 = 只含一个阶段的组 groups.push_back({{std::move(stage)}}); return *this; } ParallelPipeline& addParallel(std::vector> stages) { groups.push_back({std::move(stages)}); return *this; } T execute(T input) const { T current = std::move(input); for (const auto& group : groups) { if (group.stages.size() == 1) { // 顺序阶段 current = group.stages[0](current); } else { // 并行阶段:每个 stage 拿到的是 same 输入,输出需要合并 auto results = runParallel(current, group.stages); current = mergeResults(std::move(results)); } } return current; } private: std::vector groups; std::vector runParallel(const T& input, const std::vector>& stages) const { std::vector> futures; for (const auto& stage : stages) { futures.push_back(std::async(std::launch::async, [&stage, &input]() { return stage(input); })); } std::vector results; for (auto& f : futures) { results.push_back(f.get()); } return results; } T mergeResults(std::vector results) const { // 默认:合并所有字符串 if constexpr (std::is_same_v) { std::string merged; for (const auto& r : results) { merged += r + "\n"; } return merged; } // 其他类型:取第一个结果(可根据业务自定义) return results.empty() ? T{} : results[0]; }};

3.3 何时用并行 Pipeline并行 Pipeline 不是银弹。判断标准:

条件

适合并行?

阶段之间无数据依赖

每个阶段的耗时较均匀

阶段数量超过 CPU 核心数

❌ 反而引入调度开销

数据量小、阶段计算轻

❌ 线程创建开销大于计算本身

阶段有共享状态

❌ 需要同步,抵消并行收益

经验法则:单个阶段耗时 > 100μs 且阶段之间无依赖时,并行才有正向收益。

四、典型场景4.1 编译管道这是 Pipeline 模式最经典的实现。从源码到可执行文件,数据(源代码文本)依次经过:

1源码 → [词法分析] → [语法分析] → [语义分析] → [中间代码生成] → [优化] → [目标代码生成] → 可执行文件

LLVM 和 GCC 的内部架构都遵循这个 Pipeline 模型。每个 Pass 是一个阶段,Pass 之间通过 IR(中间表示)传递数据。

4.2 HTTP 中间件Web 框架的中间件机制本质上是 Pipeline:

1请求 → [日志] → [鉴权] → [限流] → [参数校验] → [业务处理] → 响应

Go 的 net/http 中间件、Express/Koa 的中间件、ASP.NET Core 的 Middleware Pipeline,都是同一模式的不同实现。

1234567891011121314// 伪代码:HTTP 中间件 Pipelineclass HttpPipeline { std::vector middlewares; Response handle(Request req) { auto handler = wrapHandler(businessLogic); for (auto it = middlewares.rbegin(); it != middlewares.rend(); ++it) { handler = it->wrap(handler); // 洋葱模型包装 } return handler(req); }};// 执行顺序:日志 → 鉴权 → 限流 → 业务 → 限流 → 鉴权 → 日志// ────── 请求方向 → ← 响应方向 ──────

4.3 ETL 数据处理在数据工程中,ETL(Extract-Transform-Load)流程天然适合 Pipeline:

1数据源 → [抽取] → [清洗] → [转换] → [校验] → [聚合] → [加载到数据仓库]

4.4 图像/视频处理12原始帧 → [解码] → [缩放] → [降噪] → [色彩校正] → [人脸检测] → [编码输出] ↘ [OCR识别] ↗

两个下游阶段(人脸检测、OCR)可以并行。

五、设计考量5.1 阶段粒度太粗:一个阶段做太多事,丧失灵活性和可复用性太细:阶段数量爆炸,Pipeline 的执行开销超过业务逻辑

经验法则:一个阶段 = 一个明确的、可独立命名的职责。如果命名时不得不使用"和"字("解析和校验"),就该拆成两个。

5.2 阶段间数据格式是所有阶段共享同一数据类型,还是每个阶段有不同的输入/输出类型?

123456789方案 A(统一类型): Pipeline pipeline; 优点:简单 缺点:Request 会成为"上帝对象",携带所有阶段可能需要的字段方案 B(类型转换): Pipeline pipeline; 优点:类型安全,每个阶段明确表达自己的输入输出 缺点:实现复杂,需要类型列表或 variant

建议:初期用方案 A(统一类型),当 Request 膨胀到不可维护时再迁移到方案 B。

5.3 错误处理策略

策略

行为

适用场景

短路停止

遇错立即返回,跳过后续阶段

多数业务场景

收集继续

记录错误仍继续执行,最后汇总

批量处理、数据校验

降级跳过

遇错跳过当前阶段,继续后续

非关键步骤(如日志、分析)

12345678910111213141516171819enum class ErrorPolicy { FailFast, CollectAndContinue, SkipOnError };templateclass ConfigurablePipeline { FalliblePipeline mainPipeline; FalliblePipeline fallbackPipeline; // 降级 Pipeline public: Result execute(const T& input, ErrorPolicy policy) { switch (policy) { case ErrorPolicy::FailFast: return mainPipeline.execute(input); case ErrorPolicy::SkipOnError: // 主 Pipeline 失败时走降级 Pipeline auto result = mainPipeline.execute(input); return result.isOk() ? result : fallbackPipeline.execute(input); } }};

5.4 生命周期管理Pipeline 对象本身应该无状态还是有状态?

无状态 Pipeline:阶段本身不保存状态,每次 execute 独立。线程安全,可复用。推荐。

有状态 Pipeline:阶段内部有缓存或计数器。需要关注线程安全和重置逻辑。

12345678910111213141516171819202122// 无状态阶段(推荐)class TrimStage {public: std::string operator()(const std::string& input) const { // const 成员函数,无副作用 auto start = input.find_first_not_of(" \t\n\r"); auto end = input.find_last_not_of(" \t\n\r"); return (start == std::string::npos) ? "" : input.substr(start, end - start + 1); }};// 有状态阶段(需要谨慎)class CounterStage { std::atomic count{0}; // 线程安全public: Request operator()(const Request& input) { count.fetch_add(1); // 利用计数做某些处理... return input; } uint64_t getCount() const { return count.load(); }};

六、实战:日志处理 Pipeline用一个完整的例子来串联所有概念——一个服务端日志处理 Pipeline。

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081struct LogEntry { std::string timestamp; std::string level; // INFO, WARN, ERROR std::string service; std::string message; bool isValid = true;};// 阶段1:解析原始文本Result parseStage(const std::string& raw) { // 格式:"2026-05-06T20:11:00Z|ERROR|PaymentService|timeout:3000ms" std::istringstream ss(raw); std::string ts, level, service, msg; if (!std::getline(ss, ts, '|') || !std::getline(ss, level, '|') || !std::getline(ss, service, '|') || !std::getline(ss, msg)) { return Result::error("Parse failed: " + raw); } return Result::ok({ts, level, service, msg, true});}// 阶段2:校验Result validateStage(const LogEntry& entry) { static const std::set validLevels = {"INFO", "WARN", "ERROR", "FATAL"}; if (entry.timestamp.empty() || entry.service.empty()) { return Result::error("Missing required fields"); } if (!validLevels.count(entry.level)) { return Result::error("Invalid level: " + entry.level); } LogEntry validated = entry; validated.isValid = true; return Result::ok(validated);}// 阶段3:脱敏Result maskStage(const LogEntry& entry) { LogEntry masked = entry; // 脱敏手机号 static std::regex phoneRegex(R"(\b1[3-9]\d{9}\b)"); masked.message = std::regex_replace(entry.message, phoneRegex, "1**********"); return Result::ok(masked);}// 阶段4:分级存储(ERROR 走告警通道)Result routeStage(const LogEntry& entry) { if (entry.level == "ERROR" || entry.level == "FATAL") { sendAlert(entry); // 发送告警 } writeToStorage(entry); // 持久化 return Result::ok(entry);}// 组装 Pipelineint main() { FalliblePipeline logPipeline; logPipeline .addStage(validateStage) .addStage(maskStage) .addStage(routeStage); // 处理日志流 std::string rawLine; while (std::getline(std::cin, rawLine)) { auto parsed = parseStage(rawLine); if (!parsed.isOk()) { std::cerr << "[SKIP] " << parsed.errorMsg() << std::endl; continue; } auto result = logPipeline.execute(parsed.value()); if (!result.isOk()) { std::cerr << "[FAIL] " << result.errorMsg() << std::endl; } }}

这个例子展示了 Pipeline 的核心价值:

每个阶段独立:你可以单独测试 parseStage、validateStage、maskStage

易于扩展:想加一个"采样"阶段?addStage(samplingStage) 一行搞定

错误短路:解析失败的日志直接跳过,不会进入校验和脱敏

可替换:生产环境和测试环境可以用不同的 routeStage

七、总结Pipeline 模式的本质是用组合替代过程。它不是技术上的创新,而是组织上的优化——把一个大函数拆成一系列小阶段,然后声明式地组合它们。

三个最重要的原则:

每个阶段做且只做一件事——如果一个阶段既校验又脱敏,拆成两个

阶段之间通过数据耦合,不通过控制流耦合——阶段不调用其他阶段,只返回结果

Pipeline 本身是可配置、可替换的——不同环境、不同场景可以组装不同的 Pipeline

当你下次面对一个超过 50 行的 process() 函数时,可以问自己一个问题:这里面的步骤,哪些可以独立成一个阶段? 答案通常决定了 Pipeline 的边界。

延伸阅读

责任链模式:Pipeline 的"近亲",适用于"谁处理不确定"的场景

本系列《Runtime Architecture》中的其他运行时架构模式