并发编程是提高程序处理能力的重要手段,也是编程语言必备的能力。并发的对象是函数,函数作为任务被多个并发计算对象执行。

并发的单位可以是进程、线程和协程(用户态线程),并发需要保证的三个特点:原子性、顺序性和可见性。

  1. 原子性表示操作要么全部成功,要么全部失败,执行中不能被中断。原子性通常使用锁(包括悲观锁、乐观锁)保证。
  2. 顺序性表示任务执行的顺序与任务提交时的顺序一致。顺序性通常使用队列(先进先出)保证,常见的是是生产者-消费队列保证顺序性,例如golang的channel。好的顺序设计,可以保证每个阶段只有少量线程执行,减少锁的使用。减少共享内存的使用,多用顺序性的信号通知,减少锁争抢。在分布式系统中,面对ABA问题,还会对操作和共享内存增加版本号来保证顺序性。
  3. 可见性表示对共享内存的修改对其他线程可见。常见的是使用内存屏障(memory barrier)保证可见性,内存屏障是CPU提供的一种同步机制,保证CPU执行到内存屏障之前的指令都执行完成,再执行内存屏障之后的指令。C++原子类的内存序就是在保证可见性。

C语言

pthread

C语言中的多线程编程一般使用pthread库, pthread(POSIX线程)库是用于在类Unix系统中实现多线程编程的API实现了posix标准的线程接口。

pthread支持的接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <pthread.h>
// 线程创建,线程创建后立即执行,函数用void函数指针类型void *(*start_routine) (void *) 传参
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
void pthread_exit(void *retval); // 使当前线程退出,并返回一个状态。
int pthread_join(pthread_t thread, void **retval); // 主线程等待子线程执行完毕。
pthread_t pthread_self(void); // 返回当前线程的Id

// 初始化互斥锁, attr通常为NULL
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);
int pthread_mutex_lock(pthread_mutex_t *mutex); // 申请互斥锁. 返回错误码
int pthread_mutex_unlock(pthread_mutex_t *mutex); // 释放互斥锁
int pthread_mutex_destroy(pthread_mutex_t *mutex); // 销毁互斥锁, 互斥锁释放内存前需手动销毁

// 初始化条件变量, attr通常为NULL
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
// 阻塞当前线程,等待条件变量通知。线程唤醒后立即尝试申请锁
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_signal(pthread_cond_t *cond); // 唤醒一个等待条件变量的线程。
int pthread_cond_broadcast(pthread_cond_t *cond); // 唤醒所有等待条件变量的线程。
int pthread_cond_destroy(pthread_cond_t *cond); // 销毁条件变量

// 初始化线程属性对象
int pthread_attr_init(pthread_attr_t *attr);
// 设置线程PTHREAD_CREATE_DETACHED(分离线程)或 PTHREAD_CREATE_JOINABLE(可等待的线程)。
int pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate);
int pthread_attr_destroy(pthread_attr_t *attr); // 销毁线程等待对象

// 设置cpu亲和力, 将线程绑定到指定的 CPU 核心上。
// cpu_set_t 是一个位图类型,每一位表示一个 CPU 核心。可以用CPU_SET(cpu, cpuset)将指定的 CPU 核心(cpu)添加到 cpuset 中
int pthread_setaffinity_np(pthread_t thread, size_t cpusetsize, const cpu_set_t *cpuset);
// 获取cpu亲和性
int pthread_getaffinity_np(pthread_t thread, size_t cpusetsize, cpu_set_t *cpuset);

// 设置线程的调度策略和优先级
int pthread_setschedparam(pthread_t thread, int policy, const struct sched_param *param);
// 获取线程的调度策略和优先级。
int pthread_getschedparam(pthread_t thread, int *policy, struct sched_param *param);

int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex); 需要传入锁, 表示条件变量等待函数需要先获取锁, 发现条件不满足, 才释放锁进入等待状态。这说明条件变量的设计目的是减少申请锁的冲突, 将不符合条件的线程进入等待,而不是让所有线程都竞争锁。(事实上锁竞争的线程也是忙等待,一般线程没有抢到锁会先执行一段时间的自旋等待,如果还没有抢到锁则进入锁等待队列, 释放锁的线程会唤醒处于锁等待状态的线程)

如果要单纯的等待-通知, 不用使用锁,可以使用信号

1
2
3
4
5
6
7
8
9
#include <sys/signalfd.h>

sigset_t mask;
sigaddset(&mask, SIGUSR1);
int sfd = signalfd(-1, &mask, SFD_NONBLOCK);

// 线程等待信号
struct signalfd_siginfo info;
read(sfd, &info, sizeof(info)); // 阻塞直到收到信号

pthread利用锁、条件变量和数组模拟的环形缓冲区实现生产者消费者模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#define BUFFER_SIZE 10 // 缓冲区大小

// 定义缓冲区和相关变量
int buffer[BUFFER_SIZE];
int in = 0; // 指向缓冲区的生产者插入位置
int out = 0; // 指向缓冲区的消费者取出位置
pthread_mutex_t mutex; // 互斥锁
pthread_cond_t empty; // 条件变量,表示缓冲区非空
pthread_cond_t full; // 条件变量,表示缓冲区非满

// 生产者线程函数
void* producer(void* arg) {
int item;
while (1) {
item = rand() % 100; // 生成一个随机项

pthread_mutex_lock(&mutex); // 加锁,访问共享缓冲区

// 如果缓冲区满了,等待消费者消费
while ((in + 1) % BUFFER_SIZE == out) {
pthread_cond_wait(&empty, &mutex); // 等待缓冲区有空位
}

// 将项放入缓冲区
buffer[in] = item;
printf("Producer produced: %d\n", item);
in = (in + 1) % BUFFER_SIZE; // 更新生产者插入位置

// 唤醒消费者线程
pthread_cond_signal(&full);

pthread_mutex_unlock(&mutex); // 解锁

sleep(1); // 模拟生产的时间
}
return NULL;
}

// 消费者线程函数
void* consumer(void* arg) {
int item;
while (1) {
pthread_mutex_lock(&mutex); // 加锁,访问共享缓冲区

// 如果缓冲区为空,等待生产者生产
while (in == out) {
pthread_cond_wait(&full, &mutex); // 等待缓冲区有数据
}

// 从缓冲区取出项
item = buffer[out];
printf("Consumer consumed: %d\n", item);
out = (out + 1) % BUFFER_SIZE; // 更新消费者取出位置

// 唤醒生产者线程
pthread_cond_signal(&empty);

pthread_mutex_unlock(&mutex); // 解锁

sleep(2); // 模拟消费的时间
}
return NULL;
}

int main() {
pthread_t producer_thread, consumer_thread;

// 初始化互斥锁和条件变量
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&empty, NULL);
pthread_cond_init(&full, NULL);

// 创建生产者和消费者线程
pthread_create(&producer_thread, NULL, producer, NULL);
pthread_create(&consumer_thread, NULL, consumer, NULL);

// 等待线程结束
pthread_join(producer_thread, NULL);
pthread_join(consumer_thread, NULL);

// 销毁互斥锁和条件变量
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&empty);
pthread_cond_destroy(&full);

return 0;
}

// 输出
Producer produced: 83
Consumer consumed: 83
Producer produced: 86
Consumer consumed: 86
Producer produced: 77
Producer produced: 15
Consumer consumed: 77
Producer produced: 93
...

线程cpu绑核,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sched.h>
#include <unistd.h>

void* thread_function(void* arg) {
cpu_set_t mask;
pthread_t thread = pthread_self();

// 获取当前线程的 CPU 亲和性
int result = pthread_getaffinity_np(thread, sizeof(cpu_set_t), &mask);
if (result != 0) {
perror("pthread_getaffinity_np");
exit(EXIT_FAILURE);
}

// 输出当前线程可以运行的 CPU 核心
for (int i = 0; i < CPU_SETSIZE; i++) {
if (CPU_ISSET(i, &mask)) {
printf("Thread can run on CPU core: %d\n", i);
}
}

return NULL;
}

int main() {
pthread_t thread;
cpu_set_t cpuset;

// 创建线程
pthread_create(&thread, NULL, thread_function, NULL);

// 设置线程绑定到 CPU 核心 0 和 1
CPU_ZERO(&cpuset);
CPU_SET(0, &cpuset); // 允许线程在 CPU 核心 0 上运行
CPU_SET(1, &cpuset); // 允许线程在 CPU 核心 1 上运行

// 设置线程的 CPU 亲和性
int result = pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
if (result != 0) {
perror("pthread_setaffinity_np");
exit(EXIT_FAILURE);
}

// 等待线程结束
pthread_join(thread, NULL);

return 0;
}
// 输出
Thread can run on CPU core: 0
Thread can run on CPU core: 1

线程调度策略,

  1. SCHED_FIFO:先来先服务(First-In-First-Out)实时调度,
  2. SCHED_RR:轮转调度(Round-Robin)实时调度,
  3. SCHED_OTHER默认的操作系统决定调度线程
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    #include <pthread.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <sched.h>

    void* thread_function(void* arg) {
    printf("Thread running\n");
    return NULL;
    }

    int main() {
    pthread_t thread;
    struct sched_param param;
    int policy;

    // 创建线程
    pthread_create(&thread, NULL, thread_function, NULL);

    // 设置调度策略为 SCHED_FIFO,并设置优先级为 10
    param.sched_priority = 10;
    pthread_setschedparam(thread, SCHED_FIFO, &param);

    // 获取并输出当前线程的调度策略和优先级
    pthread_getschedparam(thread, &policy, &param);
    printf("Thread policy: %d, Priority: %d\n", policy, param.sched_priority);

    pthread_join(thread, NULL); // 等待线程结束
    return 0;
    }

    // 输出
    Thread policy: 1, Priority: 10
    Thread running

原子操作

编程语言的赋值一般是原子的,但算术操作不是原子的(需要访存、操作、写回三个操作)。c语言可以直接使用gcc提供的内建原子操作函数, 原子函数在操作函数会加内存屏障,保证常见操作的原子性、可见性。

  1. __sync_fetch_and_add, 原子地将一个值加到目标变量,返回旧值。
  2. __sync_fetch_and_sub:原子地目标变量减去一个值,返回旧值。
  3. __sync_lock_test_and_set:原子地设置一个值,返回旧值。

原子函数的实现原理

  1. 默认包含一个 ​​完整的顺序一致性屏障(Full Memory Barrier)​​,确保操作前的所有内存访问(读/写)在原子操作前完成:操作后的所有内存访问(读/写)在原子操作后开始。也就是是原子操作时可以读到最新的旧值, 原子操作后设置的新值其他线程可见
  2. 硬件支持,在 x86/x64 架构中,原子操作通常有对应的硬件指令,例如 xchg(Exchange)指令(带 lock 前缀),确保操作的原子性。

虽然赋值操作是原子的,但C和C++没有java volatile 关键字保证可见性, 因此赋值操作还是要用原子操作__sync_lock_test_and_set。java的volatile关键字可以保证赋值操作的可见性,无须加锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include <stdio.h>
#include <pthread.h>

int counter = 0; // 用于计数的共享变量

void* increment(void* arg) {
for (int i = 0; i < 1000; i++) {
__sync_fetch_and_add(&counter, 1); // 原子地增加 1
}
return NULL;
}

int main() {
pthread_t threads[10];

// 创建 10 个线程
for (int i = 0; i < 10; i++) {
pthread_create(&threads[i], NULL, increment, NULL);
}

// 等待线程结束
for (int i = 0; i < 10; i++) {
pthread_join(threads[i], NULL);
}

printf("Counter value: %d\n", counter); // 输出最终值

return 0;
}
// 输出10000

GCC 4.7+ 引入了更灵活的 __atomic 系列函数,允许显式控制内存顺序:

1
2
// __atomic 版本(可指定内存顺序)
type __atomic_exchange_n(type *ptr, type value, int memorder);

__thread, 是gcc提供的线程局部存储,每个线程操作自己的变量副本,互不影响。

  1. 只能用于全局和静态变量
  2. ​静态初始化​​:变量必须在编译时初始化,且只能为 ​​简单数据类型​​(如 int、float、指针),不支持动态初始化或复杂类型(如结构体、动态数组)。
    1
    2
    3
    4
    5
    6
    7
    static __thread int thread_local_var;  // 必须为 static 或全局变量

    static __thread unsigned long task_count; // 线程私有计数器

    void process_task() {
    task_count++; // 无锁操作
    }

C++

C++11的并发类

C++11 提供的并发类主要包括

  1. std::thread线程, 创建立即执行, 提供join(), detach(), get_id()等方。头文件#include <thread>
  2. std::mutex互斥锁, 提供lock(), unlock(), try_lock(), std::lock_guard<std::mutex>, std::unique_lock<std::mutex>。头文件#include <mutex>
  3. std::condition_variable 条件变量,提供wait(), notify_one(), notify_all()等方法, 头文件#include <condition_variable>
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    // std::thread(Func&& f, Args&&... args):创建一个线程并开始执行传入的函数 构造函数
    // std::thread::join():等待线程执行完毕。
    // std::thread::detach():将线程与主线程分离。
    // std::thread::get_id(), 返回线程id

    #include <iostream>
    #include <thread>

    void print_thread_id() {
    std::thread::id this_id = std::this_thread::get_id();
    std::cout << "Thread ID: " << this_id << std::endl;
    }

    int main() {
    std::cout << "Main thread ID: " << std::this_thread::get_id() << std::endl;

    std::thread t(print_thread_id);
    t.join();

    return 0;
    }
    // 输出
    Main thread ID: 140509241898816
    Thread ID: 140509241894656

线程同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// std::mutex::lock()
// std::mutex::unlock()
// std::mutex::try_lock():尝试锁定互斥量
// std::lock_guard<std::mutex> guard(mtx); 自动加锁和释放锁
// std::unique_lock<std::mutex> lck(mtx); 可手动释放锁,若未手动释放则自动释放锁
// std::condition_variable::wait() 阻塞当前线程
// std::condition_variable::notify_one():通知一个等待的线程。
// std::condition_variable::notify_all():通知所有等待的线程。

#include <iostream>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <chrono>

std::queue<int> buffer; // 共享缓冲区
std::mutex mtx; // 互斥锁,保护缓冲区
std::condition_variable cv; // 条件变量,通知生产者或消费者

const int MAX_BUFFER_SIZE = 5; // 缓冲区最大大小

// 生产者线程函数
void producer() {
for (int i = 0; i < 10; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟生产耗时
std::unique_lock<std::mutex> lock(mtx);

// 如果缓冲区已满,生产者等待
cv.wait(lock, []() { return buffer.size() < MAX_BUFFER_SIZE; });
buffer.push(i);
std::cout << "Produced: " << i << std::endl;
cv.notify_all();
}
}

// 消费者线程函数
void consumer() {
for (int i = 0; i < 10; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(150)); // 模拟消费耗时
std::unique_lock<std::mutex> lock(mtx);

// 如果缓冲区为空,消费者等待
cv.wait(lock, []() { return !buffer.empty(); });
int item = buffer.front();
buffer.pop();
std::cout << "Consumed: " << item << std::endl;
cv.notify_all();
}
}

int main() {
// 创建生产者和消费者线程
std::thread producer_thread(producer);
std::thread consumer_thread(consumer);

// 等待线程执行完毕
producer_thread.join();
consumer_thread.join();

return 0;
}
// 输出
Produced: 0
Consumed: 0
Produced: 1
Consumed: 1
Produced: 2
Produced: 3
Consumed: 2
Produced: 4
...

异步任务

  1. std::promise和std::future。允诺(Promise)对象,用于在线程中存储一个值或异常,供其他线程通过 future 获取。未来(Future)对象,用于从 promise 中异步获取结果。提供阻塞或非阻塞方式访问数据。头文件#include <future>
  2. std::future和std::async, std::aync 直接传入函数, 返回一个future。该future可以通过get()获取函数返回值。std::async会自动创建异步执行任务

std::future和std::promise联用,用于线程间传递数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include <iostream>
#include <thread>
#include <future>

void set_value(std::promise<int>& p) {
std::this_thread::sleep_for(std::chrono::seconds(2));
p.set_value(42); // 设置 promise 的值
}

int main() {
// 创建 promise 和 future 对象
std::promise<int> p; // 可以存储一个变量
std::future<int> f = p.get_future(); // 可以从promise中获取存储的变量

// 创建一个线程
std::thread t(set_value, std::ref(p));

std::cout << "Doing some work in the main thread...\n";
std::this_thread::sleep_for(std::chrono::seconds(1));

// 获取 promise 设置的结果,调用 get() 会阻塞直到线程设置值
int result = f.get(); // 阻塞,直到获取结果
std::cout << "The result from promise is: " << result << std::endl;
t.join();

return 0;
}
// 输出
Doing some work in the main thread...
The result from promise is: 42

std::future和std::async联用,获取异步任务执行完成后的返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <iostream>
#include <thread>
#include <future>
#include <chrono>

int calculate_square(int x) {
std::this_thread::sleep_for(std::chrono::seconds(2));
return x * x;
}

int main() {
// 使用 std::async 启动异步任务,返回一个 std::future 对象
std::future<int> result = std::async(std::launch::async, calculate_square, 5);

std::cout << "Doing other work in main thread...\n";
std::this_thread::sleep_for(std::chrono::seconds(1));

// 获取异步任务的结果,调用 get() 会阻塞,直到结果计算完成
int square = result.get();
std::cout << "The square of 5 is: " << square << std::endl;

return 0;
}

在并发处理上,C++20 提供了协程和std::jthread。​jthread​​兼容 std::thread 接口​​和方法,提供了​自动 Join​​:在析构时自动调用 join(),避免未回收线程导致程序终止(传统 std::thread 析构时若未 join() 或 detach() 会触发 std::terminate)。

C++20 还引入了std::latch, 用于等待线程完成(这种特性加入的也太晚了)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <iostream>
#include <latch>
#include <thread>
#include <vector>

int main() {
std::latch completion_latch(3); // 初始计数器为3

auto worker = [&](int id) {
std::cout << "Worker " << id << " started\n";
std::this_thread::sleep_for(std::chrono::seconds(1));
completion_latch.count_down(); // 完成任务,计数器减1
};

std::vector<std::jthread> threads;
for (int i = 0; i < 3; ++i) {
threads.emplace_back(worker, i + 1);
}

completion_latch.wait(); // 主线程等待所有子线程完成
std::cout << "All workers completed!\n";

return 0;
}

C++17 中引入的 std::scoped_lock, 可以一次性锁住多个互斥量,避免死锁

1
2
3
4
5
6
7
8
9
10
11
12
13
std::mutex mtx;

void safe_function() {
std::scoped_lock lock(mtx); // 锁定 mtx
// 操作共享资源...
} // 自动解锁 mtx

std::mutex mtx1, mtx2;

void safe_function() {
std::scoped_lock lock(mtx1, mtx2); // 同时锁定 mtx1 和 mtx2
// 操作共享资源...
} // 自动解锁 mtx2 和 mtx1(逆序)

智能指针

std::unique_ptr<T>,创建对象时使用,具有对象的所有权。

std::shared_ptr<T>,共享所有权, 使用引用计数维护对象生命周期, 在所有权不明确或对象析构时刻不明确时使用, 尽量使用unique_ptr

std::weak_ptr<T>,弱指针,不维护对象生命周期。当对象处于观察者状态时使用,可通过尝试lock()函数判断对象是否存在,如果存在则访问。当对象A 持有对象B的shared_ptr,同时B又想持有对象A的shared_ptr时,为了避免循环引用,B应当持有A的weak_ptr 。

C++线程局部变量

thread_local 是 C++11 引入的关键字, 相比__thread, 前者只能修饰​​简单数据类型​​(如 int、float、指针),thread_local可用于局部变量、全局变量、类成员变量等。性能上两者均接近普通全局变量,无显著差异。

线程局部变量没有数据竞争,其他线程对变量的修改也不会影响当前线程的变量值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include <stdio.h>
#include <pthread.h>

thread_local int thread_local_var = 0; // 声明线程局部变量

void* thread_function(void* arg) {
thread_local_var++;
printf("Thread %ld: thread_local_var = %d\n", (long)arg, thread_local_var);
return NULL;
}

int main() {
pthread_t threads[3];

// 创建多个线程
for (long i = 0; i < 3; i++) {
pthread_create(&threads[i], NULL, thread_function, (void*)i);
}

// 等待线程结束
for (int i = 0; i < 3; i++) {
pthread_join(threads[i], NULL);
}

return 0;
}
// 输出
Thread 0: thread_local_var = 1
Thread 1: thread_local_var = 1
Thread 2: thread_local_var = 1


class MyClass {
public:
MyClass() { /* 构造函数 */ }
~MyClass() { /* 析构函数 */ }
};
thread_local MyClass obj; // 合法,每个线程独立构造/析构

C++11 的原子类

std::atomic提供了线程安全的原子操作。std::atomic 是一个模板类,支持不同的数据类型(如整数、指针)。常用的方法

  1. load() 读取原子变量的值。
  2. store(x) 写入值到原子变量
  3. fetch_add(x) 和 fetch_sub(x) 原子加减法操作,返回旧值
  4. exchange(x) 将原子对象的值替换为给定值,并返回旧值
  5. compare_exchange_weak(expected, desired) 和 compare_exchange_strong(expected, desired) 执行原子比较并交换(CAS,Compare and Swap)操作。expected是当前期望的值,desired是期望修改后的值。返回true表示更新成功,false表示未更新
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

#include <iostream>
#include <atomic>
#include <thread>

std::atomic<int> counter(0);

void increment(int id) {
for (int i = 0; i < 10; ++i) {
int expected = counter.load();
while (!counter.compare_exchange_strong(expected, expected + 1)) {
std::this_thread::yield(); // 如果失败,线程主动让出 CPU
}
std::cout << "Thread " << id << " incremented counter to " << counter.load() << std::endl;
}
}

int main() {
std::thread t1(increment, 1);
std::thread t2(increment, 2);

t1.join();
t2.join();

std::cout << "Final counter value: " << counter.load() << std::endl;
return 0;
}
// 输出
Thread 1 incremented counter to 2
Thread 1 incremented counter to 3
Thread 1 incremented counter to 4
Thread 1 incremented counter to 5
Thread 1 incremented counter to 6
Thread 1 incremented counter to 7

内存序和可见性

内存序是为了保证多线程在多核CPU的可见性,是一种内存屏障。作用和JAVA的volatile类似。

  1. memory_order_seq_cst 是默认的内存顺序,相当于JAVA的volatile,对原子类的修改操作会立刻刷到内存。
  2. memory_order_acquire和load连用,memory_order_release和store连用。这个保证,如果某线程执行了store+memory_order_release的操作,在其他线程执行load+memory_order_acquire时,前面的store操作是可见的

由于C++没有类似java 的volatile关键字(C++的volatile 只是避免编译层的优化的重排序, 只有底层比较hack的代码告知编译期不要优化我hack的代码时才会用到, C++ 编程很少遇到),因此一旦涉及多线程共享变量,要么加锁,要么上原子类(一般不会手动使用内存屏障)。

  1. memory_order_relaxed:不保证任何顺序,只提供原子性,一般不会使用

  2. memory_order_consume:保证在该原子操作之后的所有依赖操作的顺序, 这个类似memory_order_acquire, 比 memory_order_acquire 更轻量,但适用范围更窄, 只保证依赖原子类的顺序。
    以下常用

  3. memory_order_acquire:保证acquire之后的操作实际也在该操作后执行,不会重排序到操作之前,常修饰load操作

  4. memory_order_release:保证release之前的所有操作在该操作前执行,不会重排序到操作之后,常修饰store操作
    store+memory_order_release 和load+memory_order_acquire 保证写之后读之前是有序的。写-读是一个pair。release-acquire之间不是原子类的操作, 也会保证有序性。

  5. memory_order_acq_rel:同时具备获取和释放语义,用来修饰load+store操作,如fetch_add

  6. memory_order_seq_cst:保证严格的顺序一致性,即原子类变量多线程并发执行和单线程循环执行顺序完全一致, 原子类默认内存序

内存序本身不提供“线程必须等待其他线程写了数据,才可以读”这样的同步语义,需要使用一个load循环,直到load数据。它能保证的是,只要某个线程store了数据,其他线程立刻可以load到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include <iostream>
#include <atomic>
#include <thread>

std::atomic<bool> ready(false);
std::atomic<int> data(0);

void producer() {
data.store(42, std::memory_order_relaxed); // 写入数据
ready.store(true, std::memory_order_release); // 通知消费者
}

void consumer() {
while (!ready.load(std::memory_order_acquire)) { // 等待生产者通知
std::this_thread::yield(); // cpu从当前线程切走, 但线程不会进入等待队列,后面还可能被cpu调度
}
std::cout << "Consumer read data: " << data.load(std::memory_order_relaxed) << std::endl;
}

int main() {
std::thread t1(producer);
std::thread t2(consumer);

t1.join();
t2.join();

return 0;
}

C++的lambda表达式和函数类型

Lambda 是一种匿名闭包对象,​​每个 Lambda 表达式的类型都是唯一的闭包类型​​,无法显式声明,但可以通过 auto 或 decltype 推导。

std::function<ReturnType(Args...)>可用来存储可调用对象,如函数对象,lambda表达式等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include <functional>
#include <vector>

int main() {
std::vector<std::function<void()>> tasks;

// 添加不同类型的可调用对象
tasks.push_back([]{ std::cout << "Task 1"; }); // Lambda
tasks.push_back(&some_function); // 函数指针
tasks.push_back(Functor{}); // 仿函数, 实现operator()的对象

for (auto& task : tasks) {
task(); // 统一调用
}
return 0;
}

异常处理

C++异常处理基于三个关键字:try、catch、throw。

1
2
3
4
5
6
7
try {
double result = divide(10, 0);
} catch (const std::runtime_error& e) {
std::cerr << "Error: " << e.what() << std::endl;
} catch (...) { // 捕获所有异常
std::cerr << "Unknown error!" << std::endl;
}

异常处理的缺陷

  1. 栈展开Stack Unwinding,抛出异常时,编译器需生成回溯调用栈,逐个退出函数作用域的逻辑,调用局部对象的析构函数,直到找到匹配的 catch 块。

  2. 编译器需为每个 try 块生成额外的元数据(如异常表),增加二进制文件体积。

  3. 异常改变了代码的显式执行路径,错误处理逻辑分散在 catch 块中,而非就近处理。

  4. C++推荐使用错误码来进行错误处理,无额外性能开销;错误处理需要保证资源释放

  5. 如果调用的函数可能抛出异常,那么调用者需要捕获,防止程序崩溃。

C++ noexcept关键字指定函数不抛异常,编译器可能为 noexcept 函数生成更精简的代码(无需准备栈展开逻辑)。析构函数和移动构造/赋值函数默认是noexcept函数。若 noexcept 函数意外抛异常,程序直接终止。

JAVA

JAVA 并发类

Thread类,继承 Thread 类并重写其 run() 方法,实现多线程。JAVA原生支持, 不需要import 库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class MyThread extends Thread {
public void run() {
System.out.println("Thread is running...");
}
}

public class Main {
public static void main(String[] args) {
MyThread t1 = new MyThread();
t1.start();
}
}

// 实现 Runnable 接口并重写 run() 方法, 实现多线程。不需要import 库
class MyRunnable implements Runnable {
public void run() {
System.out.println("Runnable thread is running...");
}
}

public class Main {
public static void main(String[] args) {
Thread t1 = new Thread(new MyRunnable());
t1.start();
}
}

Callable和FutureTask接口,与 Runnable 类似,但可以返回结果和抛出异常。需要import java.util.concurrent.Callable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

class MyCallable implements Callable<Integer> {
public Integer call() throws Exception {
return 42; // 返回结果
}
}

public class Main {
public static void main(String[] args) throws Exception {
FutureTask<Integer> futureTask = new FutureTask<>(new MyCallable());
Thread t1 = new Thread(futureTask);
t1.start();

System.out.println("Result: " + futureTask.get());
}
}

线程池类, Executor 和 ExecutorService。C++标准库里没有线程池类, 需要引用第三方库。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);

executor.execute(() -> System.out.println("Task 1"));
executor.execute(() -> System.out.println("Task 2"));

executor.shutdown();
}
}
// 输出
Task 1
Task 2

CountDownLatch, latch计数器,用于等待其他线程完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import java.util.concurrent.CountDownLatch;

public class Main {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);

Runnable task = () -> {
System.out.println(Thread.currentThread().getName() + " finished task");
latch.countDown();
};

new Thread(task).start();
new Thread(task).start();
new Thread(task).start();

latch.await();
System.out.println("All tasks finished");
}
}

ThreadLocal,线程内部变量

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Main {
public static void main(String[] args) {
ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 0);

Runnable task = () -> {
threadLocal.set(threadLocal.get() + 1);
System.out.println(Thread.currentThread().getName() + " value: " + threadLocal.get());
};

new Thread(task).start();
new Thread(task).start();
}
}

synchronized 锁, Java 内置的关键字,可用于方法或代码块, 用于线程同步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// 同步方法,整个方法需要互斥执行
public synchronized void method() {
System.out.println(Thread.currentThread().getName() + " is executing");
}
// 同步对象,整个this对象需要互斥执行
public void method() {
synchronized (this) {
System.out.println(Thread.currentThread().getName() + " is executing");
}
}


class Counter {
private int count = 0;

public synchronized void increment() {
count++;
}

public synchronized int getCount() {
return count;
}
}

public class Main {
public static void main(String[] args) throws InterruptedException {
Counter counter = new Counter();

Thread t1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) counter.increment();
});

Thread t2 = new Thread(() -> {
for (int i = 0; i < 1000; i++) counter.increment();
});

t1.start();
t2.start();
t1.join();
t2.join();

System.out.println("Final count: " + counter.getCount());
}
}

java提供volatile关键字保证变量的可见性, volatile关键字实现如果某线程只对变量进行赋值或读取操作,操作时不需要加锁。对于volatile使用场景, C++一般要使用原子类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class WorkerThread extends Thread {
private volatile boolean running = true;

public void stopTask() {
running = false; // 设置为线程终止标志, 其他线程修改后立即可见, 不需要加锁
}

@Override
public void run() {
while (running) {
// 执行任务...
}
}
}

public class Singleton {
private static volatile Singleton instance;

public static Singleton getInstance() {
if (instance == null) { // 第一次检查无锁, 使用volatile保证第一次检查时的可见性
synchronized (Singleton.class) {
if (instance == null) { // 第二次检查
instance = new Singleton(); // volatile禁止重排序
}
}
}
return instance;
}
}

线程安全数据结构

JAVA提供了一些并发安全的数据结构,位于java.util.concurrent包中。C++标准库不提供线程安全结构, 需要第三方库。

  1. ConcurrentHashMap
    线程安全的哈希表实现,支持高效的并发读写。它将内部数据分割为多个段,每个段一个锁,可以减少锁竞争。
  2. CopyOnWriteArrayList
    线程安全的 List 实现
  3. BlockingQueue
    线程安全的队列接口,通常用于生产者-消费者模型。

java.util中的HashMap,ArrayList,LinkedList等都是非并发安全的

JAVA多线程和虚拟机

相比C/C++ 多线程通过pthread直接在linux操作系统中起进程,JAVA的多线程需要经过JAVA虚拟机这一层。

java虚拟机维护了JavaThread结构,JVM是C++实现的。所以JavaThread是C++定义的类。JavaThread维护了线程的状态,一个指针指向java.lang.Thread创建的对象(oop),另一个指针指向对应的操作系统创建的OSThread

java线程模型的实现取决于jvm虚拟机,只要jvm愿意,可以选择类似go使用协程来实现线程。但以市场占有率最大的HotSpot虚拟机举例,一个java线程都是直接映射到操作系统的原生线程来实现的,所有的线程调度都是由操作系统完成的。

原子类

volatile 关键字可以实现变量的可见性,但不提供原子性。 JAVA常见的原子类有 AtomicInteger、AtomicLong、AtomicBoolean、AtomicReference

.set()设置值, .get()获取值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import java.util.concurrent.atomic.AtomicInteger;

public class Main {
public static void main(String[] args) {
AtomicInteger atomicInt = new AtomicInteger(0);
atomicInt.set(10); // 设置值
System.out.println("Value: " + atomicInt.get()); // 获取值
System.out.println("Incremented: " + atomicInt.incrementAndGet()); // 增加并获取
boolean success = atomicInt.compareAndSet(15, 100); // 比较并设置
System.out.println("CAS Success: " + success + ", New Value: " + atomicInt.get());
}
}
// 输出
Value: 10
Incremented: 11
CAS Success: false, New Value: 11

异常处理

java 的推荐使用异常处理,因此函数调用者需要通过try-catch-finally结构来捕获异常,防止程序崩溃。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

try {
// 可能抛出异常的代码
FileInputStream file = new FileInputStream("data.txt");
} catch (FileNotFoundException e) {
System.err.println("文件未找到: " + e.getMessage());
} catch (IOException e) {
e.printStackTrace();
} finally {
// 无论是否发生异常,都会执行的代码(如资源释放)
if (file != null) {
try {
file.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

java 函数可以加throws关键字,声明该方法可能抛出的 ​​受检异常(Checked Exceptions)​​,函数调用者必须处理异常,否则编译器报错

1
2
3
4
public void readFile(String path) throws FileNotFoundException, IOException {
FileInputStream file = new FileInputStream(path);
// 其他操作
}

Golang

并发实现

Golang 在语言层面使用协程实现并发,Go内置协程调度器,能自动在协程阻塞时将协程挂起。Golang的线程是无阻塞的,这意味Golang线程不能使用线程阻塞的系统调用。

GMP调度器, Go调度器基于 ​​G(Goroutine)、M(Machine)、P(Processor)​​ 三者的协作:

  1. ​G(Goroutine)​​:轻量级协程,初始栈仅2KB(可动态扩缩),创建和切换成本极低。
  2. ​​M(Machine)​​:对应操作系统的线程(OS Thread),负责实际执行代码。
  3. P(Processor)​​:逻辑处理器(调度器),管理一组本地队列(存储待运行的Goroutines)。P的数量默认等于CPU核心数(可通过GOMAXPROCS调整)。

Goroutine 的创建与执行​

  1. 每个协程(G)由Go运行时创建,并被分配到某个P的本地队列。
  2. M需要绑定一个P才能执行Goroutines。
  3. 当P的本地队列为空时,会从全局队列或其他P的队列中​​窃取Goroutines​​(Work Stealing)。

Golang的线程不会执行任何阻塞的系统调用, 当协程阻塞时,协程主动让出执行权,M会释放绑定的P,P转去服务其他M。系统调用完成后,G尝试获取新的P继续执行(若无可用P,则G进入全局队列)。

Go运行时有专门的​​网络轮询器(NetPoller)​​,将阻塞的G挂起,待IO就绪后唤醒,避免占用M。

一个Go程序可轻松创建数十万个Goroutines,而同等规模的OS线程会耗尽资源。M的数量由调度器动态管理(通常远少于Goroutines的数量),避免频繁创建/销毁OS线程。

golang提供sync.WaitGroup等待协程运行完, 相当于latch。golang的无缓冲channel 可以用来实现阻塞-等待,有缓冲channel可以用来当任务队列。golang利用channel可以实现顺序性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package main

import (
"fmt"
"math/rand"
"sync"
"time"
)

func producer(ch chan<- int, wg sync.WaitGroup) {
for i := 0; i < 10; i++ {
item := rand.Intn(100)
fmt.Println("Produced:", item)
ch <- item // 发送数据到channel
time.Sleep(time.Second) // 模拟生产过程的延迟
}
close(ch)
wg.Done()
}

func consumer(ch <-chan int, wg sync.WaitGroup) {
for item := range ch {
fmt.Println("Consumed:", item)
time.Sleep(2 * time.Second) // 模拟消费过程的延迟
}
wg.Done()

}

func main() {
var wg sync.WaitGroup
// 创建一个缓冲区大小为 5 的channel
ch := make(chan int, 5)

// 启动多个消费者
for i := 0; i < 3; i++ {
go consumer(ch, wg)
wg.Add(1)
}

// 启动一个生产者
go producer(ch, wg)
wg.Add(1)

// 主程序运行一段时间后退出
time.Sleep(10 * time.Second)
}

// 输出
Produced: 63
Consumed: 63
Produced: 47
Consumed: 47
Produced: 47
Consumed: 47
Produced: 99
Consumed: 99
Produced: 51
Consumed: 51
...

for range语句可以用来遍历channel的数组,直到channel被close。select 语句可以等待接收多个channel的数据,只要有一个channel写入了数据,select就会执行处理函数并退出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
"fmt"
"time"
)

func main() {
ch := make(chan int)

go func() {
ch <- 42
}()

select {
case msg := <-ch:
fmt.Println("Received:", msg)
case <-time.After(1 * time.Second): // 1秒后超时, select执行fmt.Println("Timeout!")
fmt.Println("Timeout!")
}
}
// 输出
Received: 42

sync.Mutex,互斥锁。sync.RWMutex 读写锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package main

import (
"fmt"
"sync"
)

var (
counter int
mu sync.Mutex // 创建一个互斥锁
)

// 增加计数器的值
func increment() {
mu.Lock()
defer mu.Unlock()

counter++
}

func main() {
var wg sync.WaitGroup

// 启动多个 goroutine 来增加计数器
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
increment()
}()
}

// 等待所有 goroutine 执行完毕
wg.Wait()

fmt.Println("Final counter:", counter)
}

import (
"fmt"
"sync"
)

var (
data int
rwMutex sync.RWMutex
)

func read() int {
rwMutex.RLock()
defer rwMutex.RUnlock()
return data
}

func write(value int) {
rwMutex.Lock()
defer rwMutex.Unlock()
data = value
}

func main() {
var wg sync.WaitGroup

// 启动多个读 goroutine
for i := 0; i < 5; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
fmt.Printf("Reader %d: data = %d\n", i, read())
}(i)
}

// 启动一个写 goroutine
wg.Add(1)
go func() {
defer wg.Done()
write(42)
fmt.Println("Writer: data updated")
}()

wg.Wait() // 等待所有 goroutine 完成
}

sync.Once 确保函数只执行一次,无论是多次调用还是多线程多次执行。可用来实现单例模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import "sync"

type Singleton struct {
// 单例对象的字段
}

var (
instance *Singleton
once sync.Once
)

func GetInstance() *Singleton {
once.Do(func() {
instance = &Singleton{
// 初始化字段
}
})
return instance
}

sync.Cond 条件变量,执行条件变量wait前需要持有锁

1
2
3
4
5
6
7
8
9
10
11
12
func (b *Buffer) Produce(item int) {
b.lock.Lock()
defer b.lock.Unlock()

for len(b.data) == bufferSize {
b.cond.Wait()
}
b.data = append(b.data, item)
fmt.Println("Produced:", item)

b.cond.Signal()
}

原子操作

sync/atomic 包提供了多种原子操作函数,

  1. AddInt32, func AddInt32(addr *int32, delta int32) (new int32)原子地将一个 int32 值加上一个指定的值。返回操作后的 ​​新值​​
  2. CompareAndSwapInt32, func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool) 原子地比较并交换 int32 值。如果当前值等于预期值,则将其更改为新值。返回是否更改
  3. LoadInt32, func LoadInt32(addr *int32) (val int32) 原子地读取一个 int32 值。
  4. StoreInt32, func StoreInt32(addr *int32, val int32) 原子地写入一个 int32 值。

例子, 无锁计数器

1
2
3
4
5
6
7
8
9
var counter int64

func increment() {
atomic.AddInt64(&counter, 1) // 无需关心返回值
}

func get() int64 {
return atomic.LoadInt64(&counter)
}

自旋锁

1
2
3
4
5
6
7
8
9
10
11
var lock int32

func acquireLock() {
for !atomic.CompareAndSwapInt32(&lock, 0, 1) {
// 自旋等待
}
}

func releaseLock() {
atomic.StoreInt32(&lock, 0)
}

Context 协程生命周期管理

channel用来协程间通信,而Context主要用来管理协程的生命周期。

异常处理

panic用来抛出异常,recover用来捕获异常。
panic,

  1. 运行时错误自动触发​​:例如数组越界、空指针解引用等。
  2. ​主动调用 panic(v)​​:开发者可手动抛出任意类型的值(通常是 error 或 string)。

行为

  1. 立即​​终止当前函数​​的执行。
  2. ​逐层向上回溯调用栈​​,执行每个函数的 defer 语句。
  3. 若未被 recover 捕获,最终​​程序崩溃​​并打印堆栈信息。

recover:捕获异常​​

  1. ​必须在 defer 函数中调用​​:recover 仅在 defer 上下文中生效。
  2. ​仅在发生 panic 后生效​​:若无 panic,recover 返回 nil。

Go 的设计者觉得 try/catch 机制的使用太泛滥了,而且从底层向更高的层级抛异常太耗费资源。他们给 Go 设计的机制也可以“捕捉”异常,但是更轻量,并且只应该作为(处理错误的)最后的手段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import "fmt"

func riskyOperation() {
panic("something went wrong!")
}

func safeOperation() {
defer func() {
if r := recover(); r != nil {
fmt.Println("Recovered from panic:", r)
}
}()
riskyOperation() // 触发 panic
fmt.Println("This line will NOT be executed")
}

func main() {
safeOperation()
fmt.Println("Program continues normally")
}

// 输出
Recovered from panic: something went wrong!
Program continues normally

golang推荐使用error错误码来处理错误,产生错误的函数会返回两个变量,一个值和一个错误码;如果后者是 nil 就是成功,非 nil 就是发生了错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
if value, err := pack1.Func1(param1); err != nil {
fmt.Printf("Error %s in pack1.Func1 with parameter %v", err.Error(), param1)
return // or: return err
} else {
// Process(value)
}

type error interface {
Error() string
}


// 自定义错误码
type MyError struct {
Code int
Message string
Details map[string]interface{}
}

func (e *MyError) Error() string {
return fmt.Sprintf("code=%d, msg=%s", e.Code, e.Message)
}

func ProcessRequest() error {
return &MyError{
Code: 400,
Message: "invalid request",
Details: map[string]interface{}{"field": "username"},
}
}

Python

多线程和多进程

Python 的 Cpython有一个全局解释器锁(GIL)。这意味着在任何时刻,只有一个线程可以执行 Python 字节码。这简化了python的多线程管理,GIL​​单条字节码指令是原子操作​​。向x = 42, shared_list.append(item) 是线程安全的。但这让Python的多线程只能利用到单核。

虽然Python可以使用多进程,但多进程之间是独立的地址空间,难以访问共享变量,使用比较难受。因此threading 多线程模块是python常用的并发模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import threading
import time

# 定义一个任务
def print_numbers():
for i in range(5):
print(i)
time.sleep(1)

# 创建线程
thread1 = threading.Thread(target=print_numbers)
thread2 = threading.Thread(target=print_numbers)
# 创建进程
process1 = multiprocessing.Process(target=print_numbers)
process2 = multiprocessing.Process(target=print_numbers)

thread1.start()
thread2.start()
process1.start()
process2.start()

thread1.join()
thread2.join()
process1.join()
process2.join()

线程锁和条件变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import threading

class Counter:
def __init__(self):
self.value = 0
self.lock = threading.Lock()

def increment(self):
with self.lock: # 自动加锁/解锁
self.value += 1


cond = threading.Condition()
shared_data = []

def producer():
with cond: # 自动加锁
shared_data.append(42)
cond.notify() # 通知消费者

def consumer():
with cond: # 自动加锁
while not shared_data:
cond.wait() # 自动释放锁并等待,唤醒后重新加锁
print("Received:", shared_data.pop())

线程池,concurrent.futures.ThreadPoolExecutor

1
2
3
4
5
6
7
8
9
10
11
import concurrent.futures

def print_square(number):
print(f"Square of {number} is {number * number}")

# 创建一个线程池,最多允许 5 个线程同时执行
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# 提交任务给线程池
executor.submit(print_square, 2)
executor.submit(print_square, 3)
executor.submit(print_square, 4)

生成器和协程

生成器。python生成器是一种无栈协程,可以实现手动切换执行流。普通函数可以 yield 语句返回生成器,普通函数执行到yield后会转向执行接收yield返回值的函数,当再执行next(),线程回到原先的函数继续执行。

使用next() 和 send() 方法可以控制生成器的执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
def simple_gen():
value = yield "Ready" # 初始返回值
print("Received:", value)
yield "End"

gen = simple_gen()
print(next(gen)) # 第一次执行生成器,收到"Ready"
print(gen.send(42)) # 执行流再次切换到simple_gen,simple_gen执行到"End

# 输出
Ready
('Received:', 42)
End

asyncio 是python3提供的无栈协程模块,内部实现了事件循环。

  1. async 用于定义协程函数,await 用于暂停协程并等待另一个协程完成。async定义的协程函数中不能调用可能导致线程阻塞或者等待的函数,包括阻塞读写、线程锁、sleep等。
  2. await 后面跟的是io耗时的操作,表示把线程从当前协程切换走;同时await会注册事件通知,当耗时的操作执行完时,调度器会回来再执行当前协程

await 同时维护了执行的先后顺序,对于async函数A await aysnc函数B,则必须等待B执行完函数A才会继续执行。这个执行逻辑避免了回调地狱,即对于回调函数链路A->B->C,可以直接在A里面写await B, B里面写 await C,代码更加简洁易懂

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import asyncio

async def task_1():
print("Task 1 starts")
await asyncio.sleep(2)
print("Task 1 ends")

async def task_2():
print("Task 2 starts")
await task_1()
print("Task 2 ends")

# 并发执行两个任务
async def main():
await asyncio.gather(task_1(), task_2()) # 需等待所有task执行完

asyncio.run(main())

# 输出
Task 1 starts
Task 2 starts
Task 1 starts
Task 1 ends
Task 1 ends
Task 2 ends