C++可以写出性能高效的程序,一个原因来自语言本身的因素,例如

  1. C++程序编译器可以进行优化,编译直接得到机器码,这让编译后需要执行的指令更少(解释器性能比编译期差的主要原因就是是解释器单行编译执行,而编译器是文件编译执行,获得的信息更多,优化空间更大。明确的类型信息也让编译期获得内存信息,可以在无须创建对象情况下进行优化)
  2. 没有虚函数的C++程序编译后的执行码和C语言一样,没有golang interface{}, java 虚函数等额外的内存开销。C++的class, template等功能抽象不会带来额外的性能开销
  3. C++可以直接管理内存,轻松写出内存零拷贝的程序,无需GC额外的性能开销。

除了语言本身的因素,生态因素对于高性能同样重要。用户程序不可能每次都造轮子,如果没有高性能的库,C++不会成为性能高效程序的首选。例如Python语言的性能虽然差,但python有tensorflow, pytorch等高性能神经网络框架,这让python写出的神经网络性能同样高效。

dpdk和spdk工具链让C++开发高性能网络服务器和高性能存储服务变得容易,GPU等新硬件和cuda等生态让C++成为高性能计算的基础。C++20提供了协程支持,deepseek 开源的3FS 就是C++20高性能编程的典型例子。

C++20协程

C++20 提供了协程支持,协程可以看做任务,任务执行期间可以在某个位置暂停-继续。

C++ 提供std::coroutine_handle<>作为协程句柄, 协程句柄需要传入自定义的Promise类型,用来指定协程的返回值,初始化和退出行为等。
promise_type 必须实现以下成员函数

  1. std::coroutine_handle<promise> get_return_object() 返回coroutine_handle对象
  2. initial_suspend() 控制协程启动时是否立即挂起,返回 suspend_always 表示协程创建后挂起, 需要调用一次resume才会执行, 返回 suspend_never表示协程创建后立即执行
  3. final_suspend() 控制协程结束时是否挂起, 同样可以返回 suspend_always 或者 suspend_never
  4. yield_value(value) 接受co_yield 表达式返回值, 可以处理
  5. return_value(value) 接受co_return value返回值,可以处理
  6. return_void() 执行co_return后执行该函数
  7. unhandled_exception() 处理协程中未捕获的异常。

通过以上函数,可以实现协程创建后,销毁前,co_return/co_yield之后的行为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
template<typename T>
struct promise_type {
T current_value;
// 协程挂起时的返回值
auto yield_value(T value) {
current_value = value;
return std::suspend_always{}; // 每次生成后暂停
}
// 协程初始化设置
auto initial_suspend() { return std::suspend_always{}; }
auto final_suspend() noexcept { return std::suspend_always{}; }
Generator get_return_object() { return std::coroutine_handle<promise_type>::from_promise(*this); }
void unhandled_exception() { std::terminate(); }
void return_void() {}
};

std::coroutine_handle<promise_type>;

协程一重要的关键字是co_await,表示挂起当前协程,执行流切换到其他任务。co_await 后面需要加Awaiter对象.
awaiter必须要实现的三个函数

  1. bool await_ready() const noexcept; 表示调用co_await后是否立即执行,如果返回true,则直接执行,不会挂起当前协程。
  2. void await_suspend(std::coroutine_handle<> handle) noexcept; 参数为当前协程句柄, 可以将执行流切给指定的coroutine_handle
  3. T await_resume() noexcept; 当协程处于co_await状态,调用resume时执行该函数。
1
2
3
4
5
6
7
8
9
10
struct custom_awaiter {
// 判断是否直接继续执行(true=不挂起)
bool await_ready() noexcept;

// 挂起时执行(参数为当前协程句柄)
void await_suspend(std::coroutine_handle<> h) noexcept;

// 恢复时执行的逻辑与返回值
int await_resume() noexcept;
};

协程执行例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#include <coroutine>
#include <cstdio>
#include <thread>

// 1. 定义协程返回类型 Task
struct Task {
struct promise_type {
Task get_return_object() {
printf ("创建协程对象\n");
return Task{std::coroutine_handle<promise_type>::from_promise(*this)};
}
std::suspend_always initial_suspend() { // 返回suspend_always 对象, 协程初始化后挂起
printf ("初始化挂起\n");
return {};
}
std::suspend_always final_suspend() noexcept { // 协程结束后挂起
printf ("最终挂起\n");
return {};
}
void return_void() {
printf ("协程返回\n");
}
void unhandled_exception() {
std::terminate();
}
};
std::coroutine_handle<promise_type> handle;
explicit Task(std::coroutine_handle<promise_type> h) : handle(h) {}
~Task() {
if (handle) handle.destroy();
printf ("销毁协程\n");
}
// 恢复当前协程执行
void resume() {
if (!handle.done()) {
printf ("恢复协程执行\n");
handle.resume();
}
}
};

// 2. 定义可等待对象(Awaiter)
struct AsyncOperation {
bool await_ready() const {
printf("检查是否就绪\n");
return false; // 总是挂起
}
void await_suspend(std::coroutine_handle<> h) {
printf ("开始异步操作...\n");
// 新建额外线程, suspend不阻塞
std::thread([h] {
std::this_thread::sleep_for(std::chrono::seconds(1));
printf ("异步操作完成\n");
h.resume(); // 完成后恢复协程
}).detach();
}
void await_resume() {
printf ("处理操作结果\n");
}
};

// 3. 协程函数定义
Task my_coroutine() {
printf ("协程开始执行\n");
co_await AsyncOperation{}; // 等待异步操作
printf ("继续执行协程体\n");
co_await std::suspend_always{}; // 主动挂起
printf ("协程最终阶段\n");
}

// 4. 主函数
int main() {
Task task = my_coroutine(); // 创建并初始化协程, 协程挂起
printf ("首次恢复协程:\n");
task.resume(); // 第一次, 启动协程
printf ("执行流切回主线程:\n");
sleep(5);
printf ("二次恢复协程:\n");
task.resume(); // 第二次恢复

printf ("\n程序结束\n");
return 0;
}
// 执行结果
创建协程对象
初始化挂起
首次恢复协程:
恢复协程执行
协程开始执行
检查是否就绪
开始异步操作...
执行流切回主线程:
异步操作完成
处理操作结果
继续执行协程体
二次恢复协程:
恢复协程执行
协程最终阶段
协程返回
最终挂起

程序结束
销毁协程

执行流分析

  1. Task task = my_coroutine() 时,创建协程和初始化对象, 分别执行Task get_return_object() 和std::suspend_always initial_suspend()两个函数
  2. task.resume(); 执行会启动协程,协程执行到co_await,挂起。协程挂起后,执行流交给main函数, 也就是调用resume()的函数
  3. await_suspend 创建了新线程用来执行异步任务,异步任务执行完成后在新线程中调用h.resume(),协程继续在新线程执行,主线程这时候在sleep()
  4. 新线程继续执行协程,直到co_await std::suspend_always{}; 直接挂起(显然std::suspend_always{}; 也是一种awaiter表示无条件挂起)。这时候新线程执行完毕退出,但协程对象挂在co_await
  5. 最后主线程sleep()完执行task.resume(); 协程又继续执行, 直到协程执行完毕, 调用final_suspend由于final_suspend返回std::suspend_always{};,协程被挂起而不是销毁
  6. 主线程执行完毕, 最后调用handle.destroy();销毁协程

执行co_await 时将当前协程交给线程池异步执行, 当前协程切回协程resume的位置

1
2
3
4
5
6
7
8
9
10
11
12
13
struct ThreadPoolAwaiter {
ThreadPool& pool;

bool await_ready() { return false; }

void await_suspend(std::coroutine_handle<> h) {
pool.enqueue([h] { h.resume(); }); // 提交到线程池
}

void await_resume() {}
};

co_await ThreadPoolAwaiter{my_thread_pool};

协程链式调用

我们想要像函数调用那样实现协程调用, 也是co_awaiter一个协程, 当子协程执行完毕后, 返回给父协程继续执行。

  1. 这样的子协程是一个类, 需要实现promise_type管理协程生命周期, 也需要实现Awaiter 接受co_await调用
  2. await_suspend函数会传父协程handle,需要把它记住。当子协程执行完毕, 需要在final_suspend()中把父协程恢复。

举例,

  1. 每个协程函数都返回Task类型。执行co_await Task, 会调用Task的await_suspend(), 将父协程设置成子协程的nextjob, 返回当前协程(Task同时是Awaiter和协程), 表示执行当前协程
  2. 对于PromiseBase, final_suspend() 返回一个FinalAwaiter.协程销毁后会调用FinalAwaiter的await_suspend(), 执行协程的nextjob,也就是恢复父协程
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
      struct FinalAwaiter {
    auto await_ready() const noexcept -> bool { return false; }
    template <typename Promise>
    auto await_suspend(std::coroutine_handle<Promise> handle) noexcept -> void
    {
    assert(handle.done() && "handle should done here");
    auto& promise = handle.promise();
    // 这一步同时更新了promise.mNextJob==nullptr, 也就是标识task执行完了
    auto next = promise.mNextJob.exchange(nullptr);
    if (next == nullptr) {
    if (promise.getState() != nullptr) [[unlikely]] {
    promise.getState()->store(JobState::Final, std::memory_order_release);
    promise.getState()->notify_one();
    }
    } else if (next == &detail::kDetachJob) {
    if (promise.getState() != nullptr) [[unlikely]] {
    promise.getState()->store(JobState::Final, std::memory_order_release);
    promise.getState()->notify_one();
    }
    promise.mThisHandle.destroy();
    } else if (next != &detail::kEmptyJob) {
    // 把nextjob加入到loop, 由主线程执行(ExeOpt::prefInOne)
    Proactor::get().execute(next, ExeOpt::prefInOne());
    }
    }
    auto await_resume() noexcept -> void {}
    };

    struct PromiseBase {

    auto initial_suspend() noexcept -> std::suspend_always { return {}; }
    auto final_suspend() noexcept -> FinalAwaiter { return {}; }
    auto unhandled_exception() noexcept -> void { mExceptionPtr = std::current_exception(); }

    auto setNextJob(WorkerJob* next) noexcept -> void { mNextJob = next; }
    auto getNextJob() noexcept -> std::atomic<WorkerJob*>& { return mNextJob; }

    CoroJob mThisJob{this, &CoroJob::run};
    std::coroutine_handle<> mThisHandle;
    std::atomic<WorkerJob*> mNextJob{nullptr};
    std::exception_ptr mExceptionPtr;
    };

    template <typename T>
    class Task {
    public:
    using promise_type = Promise<T>;
    using coroutine_handle_type = std::coroutine_handle<promise_type>;
    using value_type = T;

    Task() noexcept = default;
    explicit Task(coroutine_handle_type handle) noexcept : mHandle(handle)
    {
    assert(mHandle != nullptr);
    mHandle.promise().setCoHandle(mHandle);
    }
    struct AwaiterBase {
    auto await_ready() const noexcept -> bool { return false; }
    template <typename Promise>
    auto await_suspend(std::coroutine_handle<Promise> handle) noexcept -> void
    {
    // 设置为handle.promise.nextjob
    mHandle.promise().setNextJob(handle.promise().getThisJob());
    mHandle.promise().setState(handle.promise().getState());
    // 执行当前协程
    Proactor::get().execute(mHandle.promise().getThisJob(), ExeOpt::prefInOne());
    }
    coroutine_handle_type mHandle;
    };

folly库实现的协程链式调用

  1. 保存父协程promise.continuation_ = continuation;
  2. 返回当前task的coro_handle, coro_, 执行当前task
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
bool await_ready() noexcept { return false; }

template <typename Promise>
FOLLY_NOINLINE auto await_suspend(
coroutine_handle<Promise> continuation) noexcept {
DCHECK(coro_);
auto& promise = coro_.promise();

promise.continuation_ = continuation;

auto& calleeFrame = promise.getAsyncFrame();
calleeFrame.setReturnAddress();

if constexpr (detail::promiseHasAsyncFrame_v<Promise>) {
auto& callerFrame = continuation.promise().getAsyncFrame();
folly::pushAsyncStackFrameCallerCallee(callerFrame, calleeFrame);
return coro_;
} else {
folly::resumeCoroutineWithNewAsyncStackRoot(coro_);
return;
}
}

T await_resume() {
DCHECK(coro_);
SCOPE_EXIT {
std::exchange(coro_, {}).destroy();
};
return std::move(coro_.promise().result()).value();
}

协程和异步

我们可以思考链式协程模式的特点

  1. 每个协程Task既是一个挂起-恢复的协程任务,也是一个Awaiter。协程的await_suspend会记录父协程, final_suspend恢复父协程。
  2. 协程的阻塞任务, 包括申请锁, sleep, IO等都可以封装成一个Awaiter, 这个awaiter可以在suspend的时候挂起,如果需要调用read等阻塞调用,需要创建新线程执行
  3. 执行协程的线程会执行一个循环(eventloop),这个线程用于不会阻塞。这个线程做的事情就是1. 循环遍历,如果有协程挂起并创建新线程执行异步任务,就查看异步任务是否执行完,若执行完恢复该协程的调用。 2. 如果协程全部执行完了, 线程根据链式调用, 恢复该协程父协程的执行。3. 尝试接受新的协程任务

协程锁可以通过标志和等待队列实现,

  1. 尝试申请锁时, 如果锁未被申请, 则持有锁, 协程继续执行
  2. 如果锁已被申请, 则将当前协程加入等待队列, 并挂起当前协程
  3. 释放锁时,唤醒处于等待队列的一个协程恢复它执行
    执行协程的线程会执行一个循环,如果无协程可执行(协程全部在挂起),线程则进入下一个循环

协程也可以实现类似golang的channel。channel 由一个ringbuffer和reader, writer两个队列组成。

  1. reader 进入时, 如果ringbuffer为空,则挂起reader; ringbuffer不为空,则读取ringbuffer中的数, 同时唤醒等待队列的writer
  2. writer写入时,如果ringbuffer满了,则挂起;否则写入并唤醒所有reader; reader被唤醒时,读取writer写入的数据。
    显然协程的一个核心是《唤醒》的实现,只需要coroutine.resume()即可, 这比线程的唤醒要简单很多。

协程可以在await_suspend中调用liburing等异步io函数,然后挂起。同时主线程循环检查liburing的cqe(complete queue entry),对于完成的IO,唤醒对应的协程继续处理。

有栈协程

C++20的协程是无栈协程,无栈协程就是一个Task对象,这个task对象可以通过co_await挂起, 并记录当前的状态。等到resume时,可以从当前的状态继续执行。coroutine对象会记录当前协程函数内的局部变量(包括参数)、挂起点等状态。

相比无栈协程,有栈协程更容易理解,在有栈协程中,每个协程函数都相当于给指定线程的任务队列加入一个任务

  1. 一般来说,每个线程会执行一个loop循环, 该循环从任务队列取出一个协程任务,执行,然后下一个
  2. 当协程需要挂起时,会保存当前的上下文,将自己放到等待队列,将挂起的任务提交到异步执行
  3. 当异步任务完成后,从等待队列唤醒对应的任务加到执行队列,继续执行。

执行有栈协程的线程 相当于操作系统的线程CPU调度,典型的就是golang的协程。golang的GMP 调度模型 就类似操作系统的进程-CPU-调度器。有栈协程不需要固定的对象保存状态,需要挂起协程时就创建栈保存状态,协程继续执行时就清理栈恢复状态。而无栈协程通过coroutine对象来保存状态和管理协程生命周期。无栈协程的状态保存和恢复相比有栈协程性能更高,同时执行流更清晰,有利于编译器的优化。

代码越静态,结果越容易预测,越有利于编译期的优化。

异步编程模型

SQE/CQE 模型

​​SQE(Submission Queue Entry,提交队列项)和 CQE(Completion Queue Entry,完成队列项)​​ 是高性能异步 I/O 框架(如 Linux io_uring、SPDK、DPDK)中的核心机制,用于实现高效的 ​​生产者-消费者模型​​。

SQE和CQE 一般是两个环形队列,对于SQE,用户程序是生产者,内核/硬件是消费者。对于CQE,内核/硬件是生产者,应用程序是消费者。

  1. ​SQE(任务提交)​​, 应用程序将 I/O 请求(如读、写)封装为 SQE,提交到​​提交队列(Submission Queue, SQ)​​,通知硬件或内核处理。
  2. ​CQE(完成通知)​​:硬件或内核处理完请求后,生成 CQE 并放入​​完成队列(Completion Queue, CQ)​​,应用程序轮询或异步接收结果。

C++ 协程可以很容易的和SQE/CQE 模型结合,await_suspend函数将任务加到SQE队列等待执行, 而主线程定期轮询CQE队列,对于CQE中完成的异步任务,主线程会唤醒协程继续执行。

io_uring 的SQE和CQE模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <linux/io_uring.h>

// 提交队列项(SQE)结构体
struct io_uring_sqe {
__u8 opcode; // 操作类型(如 IORING_OP_READV)
__u64 addr; // 数据地址(如缓冲区指针)
__u32 len; // 数据长度
__u64 user_data; // 用户自定义数据(用于关联请求上下文)
// ... 其他字段(文件描述符、标志位等)
};

// 完成队列项(CQE)结构体
struct io_uring_cqe {
__u64 user_data; // 对应 SQE 的 user_data
__s32 res; // 操作结果(成功时为字节数,失败时为负的错误码)
__u32 flags; // 附加标志
};

// 提交请求到 SQ
void io_uring_prep_read(struct io_uring_sqe *sqe, int fd, void *buf, unsigned nbytes, off_t offset);
io_uring_submit(&ring); // 提交 SQEs 到内核

// 从 CQ 获取结果
struct io_uring_cqe *cqe;
io_uring_peek_cqe(&ring, &cqe); // 非阻塞获取 CQE
io_uring_cq_advance(&ring, 1); // 标记 CQE 已处理

SPDK 通过SQE/CQE模型向nvme ssd驱动提交IO请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
struct spdk_nvme_ns *ns = ...;
struct spdk_nvme_qpair *qpair = ...;
char *buffer = ...;

// 提交读请求(SQE)
spdk_nvme_ns_cmd_read(ns, qpair, buffer, lba, lba_count,
completion_cb, NULL, 0);

// 处理 CQE(回调函数)
void completion_cb(void *ctx, const struct spdk_nvme_cpl *cpl) {
if (spdk_nvme_cpl_is_error(cpl)) {
// 错误处理
} else {
// 处理数据
}
}

DPDK 通过SQE/CQE模型向提交网络包

1
2
3
4
5
6
7
8
9
10
11
struct rte_mbuf *tx_pkts[32];
// 填充发送包(SQE)
for (int i = 0; i < 32; i++) {
tx_pkts[i] = ...; // 构造数据包
}
// 提交发送请求
uint16_t sent = rte_eth_tx_burst(port_id, queue_id, tx_pkts, 32);

// 接收完成的数据包(CQE)
struct rte_mbuf *rx_pkts[32];
uint16_t received = rte_eth_rx_burst(port_id, queue_id, rx_pkts, 32);

事件通知模型

事件通知模型不像sqe/cqe模型有明确的任务提交队列和任务完成队列,而是通过接收事件来确定是否可读可写。典型的就是linux的epoll。对于读操作,sqe/cqe模型只需要提交一个读任务,然后等待完成队列中的读任务完成即可。但对epoll,需要等待读事件触发,才能执行读操作。sqe/cqe是一个主动请求等待返回的模型, 而epoll是一个被动等待触发的模型。

mtcp 实现了一个用户态的epoll,

  1. 通过 DPDK 的轮询模式驱动(Poll Mode Driver, PMD)直接从网卡收取数据包,解析为 TCP 报文。
  2. 当收到TCP报文时,会将对应的连接设置成数据可读(EPOLLIN)、可写(EPOLLOUT)或新连接到达(EPOLLACCEPT)事件,并将该事件加入到就绪队列中
  3. 应用层通过 mtcp_epoll_wait() 等接口从就绪队列中获取事件,执行回调或进一步处理。

协程和事件通知模型结合时,await_suspend可以将操作加入等待队列,而epoll_wait 返回触发的事件时,可以将数据写到指定的buffer,然后通知等待队列里对应的协程继续执行。

可见相比epoll的事件通知模型,协程liburing等sqe/cqe 模型更自然的结合, 通过轮询cqe的方式对IO完成的协程继续执行。

DPDK(Data Plane Development Kit)和SPDK(Storage Performance Development Kit)是两款由英特尔发起的开源项目,分别专注于提升网络数据平面和存储I/O的性能。DPDK目的是成为用户态网络包处理的标准框架,SPDK则是成为用户态绕过内核操作nvme 协议的块设备IO处理的标准框架。

DPDK​, 包括mTCP, 是一个用户态TCP协议栈。数据包直接从网卡读取,发给用户态组成TCP包。

  1. ​用户态网络驱动​​。绕过内核协议栈,直接在用户态处理网络包,减少数据拷贝和上下文切换。
  2. ​​零拷贝技术​​。通过大页内存(HugePage)和内存池(Memory Pool)减少内存访问开销。
  3. 轮询模式​​。使用无锁队列(Ring)和CPU轮询(Poll-mode Driver)避免中断延迟。
  4. ​多核扩展​​ 基于线程绑定(CPU affinity)和流水线模型实现高性能多核处理。

​​SPDK​,是用户态的NVME设备IO框架。IO数据直接从nvme设备到用户态

  1. ​用户态NVMe驱动​​ 完全用户态实现NVMe协议,避免内核存储栈的开销。
  2. ​异步无锁设计​​ 使用异步I/O和事件驱动模型,减少锁竞争。
  3. ​​零拷贝访问​​ 通过内存映射(Memory-mapped I/O)和直接访问SSD的PRP(Physical Region Page)列表提升效率。
  4. ​高并发优化​​ 支持多队列(Multi-queue)和并行I/O请求,充分利用NVMe SSD的多核能力。

iouring 是linux 内核提供的异步IO接口,相比spdk, 存储层还是会经过内核,走vfs接口, 利用内核的pagecache缓存和块设备抽象。

rdma是远程内存访问,通过支持RDMA的网卡(RNIC)直接读写远程内存,需要专门的RDMA网卡。RDMA可以和dpdk/spdk/iouring结合,rdma 是针对rdma专用网卡的网络协议、数据收发,而后者侧重描述的是从用户传输层到网卡的通用架构。rdma 侧重于高性能计算存储,dpdk侧重于更通用的tcp服务。

dpdk, spdk, rdma, iouring 采用的模型都是cqe/sqe 生产消费模型。C++ 通过以上生态,实现了网络和存储IO的高性能。再加上流行的gpu cuda高性能计算框架,使C++在高性能计算/存储/网络领域不可替代。