并发编程是提高程序处理能力的重要手段,也是编程语言必备的能力。并发的单位是函数,函数作为任务被多个并发计算对象执行。并发执行的函数任务可分为一次性任务和定时任务两种。任务可以由主线程直接分发给多线程,也可以借助线程安全的任务/消息队列分发任务。

C语言

pthread

C语言中的多线程编程一般使用pthread库, pthread在linux系统调用上二次开发,实现了posix标准的线程接口。由于linux不区分线程和进程,线程只是一种共享内存、fd等资源的特殊进程,因此pthread本身也是多进程编程接口。

pthread支持的接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// 线程创建,线程创建后立即执行,函数用void函数指针类型void *(*start_routine) (void *) 传参
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
// 使当前线程退出,并返回一个状态。
void pthread_exit(void *retval);
// 主线程等待子线程执行完毕。
int pthread_join(pthread_t thread, void **retval);
// 返回当前线程的Id
pthread_t pthread_self(void);

// 初始化互斥锁, attr通常为NULL
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);
// 申请互斥锁
int pthread_mutex_lock(pthread_mutex_t *mutex);
// 释放互斥锁
int pthread_mutex_unlock(pthread_mutex_t *mutex);
// 销毁互斥锁, 互斥锁释放内存前需手动销毁
int pthread_mutex_destroy(pthread_mutex_t *mutex);

// 初始化条件变量, attr通常为NULL
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
// 阻塞当前线程,等待条件变量通知。线程唤醒后立即尝试申请锁
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
// 唤醒一个等待条件变量的线程。
int pthread_cond_signal(pthread_cond_t *cond);
// 唤醒所有等待条件变量的线程。
int pthread_cond_broadcast(pthread_cond_t *cond);
// 销毁条件变量
int pthread_cond_destroy(pthread_cond_t *cond);

// 初始化线程属性对象
int pthread_attr_init(pthread_attr_t *attr);
// 设置线程PTHREAD_CREATE_DETACHED(分离线程)或 PTHREAD_CREATE_JOINABLE(可等待的线程)。
int pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate);
// 销毁线程等待对象
int pthread_attr_destroy(pthread_attr_t *attr);

// 将线程绑定到指定的 CPU 核心上。cpu_set_t 是一个位图类型,每一位表示一个 CPU 核心。可以用CPU_SET(cpu, cpuset)将指定的 CPU 核心(cpu)添加到 cpuset 中
int pthread_setaffinity_np(pthread_t thread, size_t cpusetsize, const cpu_set_t *cpuset);
// 获取cpu亲和性
int pthread_getaffinity_np(pthread_t thread, size_t cpusetsize, cpu_set_t *cpuset);

// 设置线程的调度策略和优先级
int pthread_setschedparam(pthread_t thread, int policy, const struct sched_param *param);
// 获取线程的调度策略和优先级。
int pthread_getschedparam(pthread_t thread, int *policy, struct sched_param *param);

pthread利用锁和条件变量实现生产者消费者模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#define BUFFER_SIZE 10 // 缓冲区大小

// 定义缓冲区和相关变量
int buffer[BUFFER_SIZE];
int in = 0; // 指向缓冲区的生产者插入位置
int out = 0; // 指向缓冲区的消费者取出位置
pthread_mutex_t mutex; // 互斥锁
pthread_cond_t empty; // 条件变量,表示缓冲区非空
pthread_cond_t full; // 条件变量,表示缓冲区非满

// 生产者线程函数
void* producer(void* arg) {
int item;
while (1) {
item = rand() % 100; // 生成一个随机项

pthread_mutex_lock(&mutex); // 加锁,访问共享缓冲区

// 如果缓冲区满了,等待消费者消费
while ((in + 1) % BUFFER_SIZE == out) {
pthread_cond_wait(&empty, &mutex); // 等待缓冲区有空位
}

// 将项放入缓冲区
buffer[in] = item;
printf("Producer produced: %d\n", item);
in = (in + 1) % BUFFER_SIZE; // 更新生产者插入位置

// 唤醒消费者线程
pthread_cond_signal(&full);

pthread_mutex_unlock(&mutex); // 解锁

sleep(1); // 模拟生产的时间
}
return NULL;
}

// 消费者线程函数
void* consumer(void* arg) {
int item;
while (1) {
pthread_mutex_lock(&mutex); // 加锁,访问共享缓冲区

// 如果缓冲区为空,等待生产者生产
while (in == out) {
pthread_cond_wait(&full, &mutex); // 等待缓冲区有数据
}

// 从缓冲区取出项
item = buffer[out];
printf("Consumer consumed: %d\n", item);
out = (out + 1) % BUFFER_SIZE; // 更新消费者取出位置

// 唤醒生产者线程
pthread_cond_signal(&empty);

pthread_mutex_unlock(&mutex); // 解锁

sleep(2); // 模拟消费的时间
}
return NULL;
}

int main() {
pthread_t producer_thread, consumer_thread;

// 初始化互斥锁和条件变量
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&empty, NULL);
pthread_cond_init(&full, NULL);

// 创建生产者和消费者线程
pthread_create(&producer_thread, NULL, producer, NULL);
pthread_create(&consumer_thread, NULL, consumer, NULL);

// 等待线程结束
pthread_join(producer_thread, NULL);
pthread_join(consumer_thread, NULL);

// 销毁互斥锁和条件变量
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&empty);
pthread_cond_destroy(&full);

return 0;
}

// 输出
Producer produced: 83
Consumer consumed: 83
Producer produced: 86
Consumer consumed: 86
Producer produced: 77
Producer produced: 15
Consumer consumed: 77
Producer produced: 93
...

线程cpu绑核,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sched.h>
#include <unistd.h>

void* thread_function(void* arg) {
cpu_set_t mask;
pthread_t thread = pthread_self();

// 获取当前线程的 CPU 亲和性
int result = pthread_getaffinity_np(thread, sizeof(cpu_set_t), &mask);
if (result != 0) {
perror("pthread_getaffinity_np");
exit(EXIT_FAILURE);
}

// 输出当前线程可以运行的 CPU 核心
for (int i = 0; i < CPU_SETSIZE; i++) {
if (CPU_ISSET(i, &mask)) {
printf("Thread can run on CPU core: %d\n", i);
}
}

return NULL;
}

int main() {
pthread_t thread;
cpu_set_t cpuset;

// 创建线程
pthread_create(&thread, NULL, thread_function, NULL);

// 设置线程绑定到 CPU 核心 0 和 1
CPU_ZERO(&cpuset);
CPU_SET(0, &cpuset); // 允许线程在 CPU 核心 0 上运行
CPU_SET(1, &cpuset); // 允许线程在 CPU 核心 1 上运行

// 设置线程的 CPU 亲和性
int result = pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
if (result != 0) {
perror("pthread_setaffinity_np");
exit(EXIT_FAILURE);
}

// 等待线程结束
pthread_join(thread, NULL);

return 0;
}
// 输出
Thread can run on CPU core: 0
Thread can run on CPU core: 1

线程调度策略, SCHED_FIFO:先来先服务(First-In-First-Out)实时调度, SCHED_RR:轮转调度(Round-Robin)实时调度,SCHED_OTHER默认的操作系统决定调度线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sched.h>

void* thread_function(void* arg) {
printf("Thread running\n");
return NULL;
}

int main() {
pthread_t thread;
struct sched_param param;
int policy;

// 创建线程
pthread_create(&thread, NULL, thread_function, NULL);

// 设置调度策略为 SCHED_FIFO,并设置优先级为 10
param.sched_priority = 10;
pthread_setschedparam(thread, SCHED_FIFO, &param);

// 获取并输出当前线程的调度策略和优先级
pthread_getschedparam(thread, &policy, &param);
printf("Thread policy: %d, Priority: %d\n", policy, param.sched_priority);

pthread_join(thread, NULL); // 等待线程结束
return 0;
}

// 输出
Thread policy: 1, Priority: 10
Thread running

原子类型

c语言可以直接使用gcc提供的内建原子操作函数, 这些函数保证操作的原子性、顺序性和可见性。

__sync_fetch_and_add:原子地将一个值加到目标变量,并返回加之前的值。

__sync_fetch_and_sub:原子地从目标变量减去一个值,并返回减之前的值。

__sync_lock_test_and_set:原子地设置一个值,并返回设置之前的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include <stdio.h>
#include <pthread.h>

int counter = 0; // 用于计数的共享变量

void* increment(void* arg) {
for (int i = 0; i < 1000; i++) {
__sync_fetch_and_add(&counter, 1); // 原子地增加 1
}
return NULL;
}

int main() {
pthread_t threads[10];

// 创建 10 个线程
for (int i = 0; i < 10; i++) {
pthread_create(&threads[i], NULL, increment, NULL);
}

// 等待线程结束
for (int i = 0; i < 10; i++) {
pthread_join(threads[i], NULL);
}

printf("Counter value: %d\n", counter); // 输出最终值

return 0;
}
// 输出10000

__thread, 是gcc提供的线程局部存储,只能用于全局和静态变量

C++

C++11的并发类

std::thread线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// std::thread(Func&& f, Args&&... args):创建一个线程并开始执行传入的函数 构造函数
// std::thread::join():等待线程执行完毕。
// std::thread::detach():将线程与主线程分离。
// std::thread::get_id(), 返回线程id

#include <iostream>
#include <thread>

void print_thread_id() {
std::thread::id this_id = std::this_thread::get_id();
std::cout << "Thread ID: " << this_id << std::endl;
}

int main() {
std::cout << "Main thread ID: " << std::this_thread::get_id() << std::endl;

std::thread t(print_thread_id);
t.join();

return 0;
}
// 输出
Main thread ID: 140509241898816
Thread ID: 140509241894656

线程同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// std::mutex::lock()
// std::mutex::unlock()
// std::mutex::try_lock():尝试锁定互斥量
// std::lock_guard<std::mutex> guard(mtx); 自动加锁和释放锁
// std::unique_lock<std::mutex> lck(mtx); 可手动释放锁,若未手动释放则自动释放锁
// std::condition_variable::wait() 阻塞当前线程
// std::condition_variable::notify_one():通知一个等待的线程。
// std::condition_variable::notify_all():通知所有等待的线程。

#include <iostream>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <chrono>

std::queue<int> buffer; // 共享缓冲区
std::mutex mtx; // 互斥锁,保护缓冲区
std::condition_variable cv; // 条件变量,通知生产者或消费者

const int MAX_BUFFER_SIZE = 5; // 缓冲区最大大小

// 生产者线程函数
void producer() {
for (int i = 0; i < 10; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟生产耗时
std::unique_lock<std::mutex> lock(mtx);

// 如果缓冲区已满,生产者等待
cv.wait(lock, []() { return buffer.size() < MAX_BUFFER_SIZE; });
buffer.push(i);
std::cout << "Produced: " << i << std::endl;
cv.notify_all();
}
}

// 消费者线程函数
void consumer() {
for (int i = 0; i < 10; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(150)); // 模拟消费耗时
std::unique_lock<std::mutex> lock(mtx);

// 如果缓冲区为空,消费者等待
cv.wait(lock, []() { return !buffer.empty(); });
int item = buffer.front();
buffer.pop();
std::cout << "Consumed: " << item << std::endl;
cv.notify_all();
}
}

int main() {
// 创建生产者和消费者线程
std::thread producer_thread(producer);
std::thread consumer_thread(consumer);

// 等待线程执行完毕
producer_thread.join();
consumer_thread.join();

return 0;
}
// 输出
Produced: 0
Consumed: 0
Produced: 1
Consumed: 1
Produced: 2
Produced: 3
Consumed: 2
Produced: 4
...

异步任务

1
2
3
4
5
6
// std::async 启动异步任务,返回std::future
// std::future<int> result = std::async(std::launch::async, calculate_square, 5);

// std::future, 调用 get() 方法会阻塞等待异步任务执行完成,返回任务的结果。

// std::promise,用于在线程之间传递数据
  1. std::future和std::promise联用,用于线程间传递数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    #include <iostream>
    #include <thread>
    #include <future>

    void set_value(std::promise<int>& p) {
    std::this_thread::sleep_for(std::chrono::seconds(2));
    p.set_value(42); // 设置 promise 的值
    }

    int main() {
    // 创建 promise 和 future 对象
    std::promise<int> p;
    std::future<int> f = p.get_future();

    // 创建一个线程
    std::thread t(set_value, std::ref(p));

    std::cout << "Doing some work in the main thread...\n";
    std::this_thread::sleep_for(std::chrono::seconds(1));

    // 获取 promise 设置的结果,调用 get() 会阻塞直到线程设置值
    int result = f.get(); // 阻塞,直到获取结果
    std::cout << "The result from promise is: " << result << std::endl;
    t.join();

    return 0;
    }
    // 输出
    Doing some work in the main thread...
    The result from promise is: 42
  2. std::future和std::async联用,获取异步任务执行完成后的返回值

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    #include <iostream>
    #include <thread>
    #include <future>
    #include <chrono>

    int calculate_square(int x) {
    std::this_thread::sleep_for(std::chrono::seconds(2));
    return x * x;
    }

    int main() {
    // 使用 std::async 启动异步任务,返回一个 std::future 对象
    std::future<int> result = std::async(std::launch::async, calculate_square, 5);

    std::cout << "Doing other work in main thread...\n";
    std::this_thread::sleep_for(std::chrono::seconds(1));

    // 获取异步任务的结果,调用 get() 会阻塞,直到结果计算完成
    int square = result.get();
    std::cout << "The square of 5 is: " << square << std::endl;

    return 0;
    }

C++线程局部变量

全局变量,静态变量,局部变量只要是可能被多线程同时访问的,都是线程共享变量。thread_local 是 C++11 引入的关键字, 可以使用thread_local声明线程局部变量。相比__thread, thread_local可用于局部变量、全局变量、类成员变量等

线程局部变量没有数据竞争,其他线程对变量的修改也不会影响当前线程的变量值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include <stdio.h>
#include <pthread.h>

thread_local int thread_local_var = 0; // 声明线程局部变量

void* thread_function(void* arg) {
thread_local_var++;
printf("Thread %ld: thread_local_var = %d\n", (long)arg, thread_local_var);
return NULL;
}

int main() {
pthread_t threads[3];

// 创建多个线程
for (long i = 0; i < 3; i++) {
pthread_create(&threads[i], NULL, thread_function, (void*)i);
}

// 等待线程结束
for (int i = 0; i < 3; i++) {
pthread_join(threads[i], NULL);
}

return 0;
}
// 输出
Thread 0: thread_local_var = 1
Thread 1: thread_local_var = 1
Thread 2: thread_local_var = 1

C++11 的原子类

std::atomic提供了线程安全的原子操作。std::atomic 是一个模板类,支持不同的数据类型(如整数、指针)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// load() 读取原子变量的值。
// store(x) 写入值到原子变量
// fetch_add(x) 和 fetch_sub(x) 原子加减法操作
// exchange(x) 将原子对象的值替换为给定值,并返回原始值。
// compare_exchange_weak(expected, desired) 和 compare_exchange_strong(expected, desired) 执行原子比较并交换(CAS,Compare and Swap)操作。expected是当前期望的值,desired是期望修改后的值。返回true表示更新成功,false表示未更新

#include <iostream>
#include <atomic>
#include <thread>

std::atomic<int> counter(0);

void increment(int id) {
for (int i = 0; i < 10; ++i) {
int expected = counter.load();
while (!counter.compare_exchange_strong(expected, expected + 1)) {
std::this_thread::yield(); // 如果失败,主动让出 CPU
}
std::cout << "Thread " << id << " incremented counter to " << counter.load() << std::endl;
}
}

int main() {
std::thread t1(increment, 1);
std::thread t2(increment, 2);

t1.join();
t2.join();

std::cout << "Final counter value: " << counter.load() << std::endl;
return 0;
}
// 输出
Thread 1 incremented counter to 2
Thread 1 incremented counter to 3
Thread 1 incremented counter to 4
Thread 1 incremented counter to 5
Thread 1 incremented counter to 6
Thread 1 incremented counter to 7

内存序。内存序是为了保证多线程在多核CPU的可见性,是一种内存屏障。作用和JAVA的volatile类似。既然是可见性,就肯定是有的线程写,有的线程读。如果只有写线程,没必要保证可见性。

  1. memory_order_seq_cst 是默认的内存顺序,相当于JAVA的volatile,对原子类的修改操作会立刻刷到内存。
  2. memory_order_acquire和load连用,memory_order_release和store连用。这个保证,如果某线程执行了store+memory_order_release的操作,在其他线程执行load+memory_order_acquire时,是可见的
    1
    2
    3
    4
    5
    6
    7
    memory_order_relaxed:不保证任何顺序, 只提供原子性,只有写没有读可以使用
    memory_order_consume:保证在该原子操作之后的所有依赖操作的顺序。
    # 下面四个常用
    memory_order_acquire:保证该操作之前的所有操作在该操作后执行。
    memory_order_release:保证该操作之后的所有操作在该操作前执行。
    memory_order_acq_rel:结合了 acquire 和 release 的效果。
    memory_order_seq_cst:保证严格的顺序一致性,即原子类变量多线程并发执行和单线程循环执行顺序完全一致

内存序本身不提供“线程必须等待其他线程写了数据,才可以读”这样的同步语义,需要使用一个load循环,直到load数据。它能保证的是,只要某个线程store了数据,其他线程立刻可以load到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include <iostream>
#include <atomic>
#include <thread>

std::atomic<bool> ready(false);
std::atomic<int> data(0);

void producer() {
data.store(42, std::memory_order_relaxed); // 写入数据
ready.store(true, std::memory_order_release); // 通知消费者
}

void consumer() {
while (!ready.load(std::memory_order_acquire)) { // 等待生产者通知
std::this_thread::yield(); // cpu从当前线程切走, 但线程不会进入等待队列,后面还可能被cpu调度
}
std::cout << "Consumer read data: " << data.load(std::memory_order_relaxed) << std::endl;
}

int main() {
std::thread t1(producer);
std::thread t2(consumer);

t1.join();
t2.join();

return 0;
}

C++的lambda表达式和函数类型

JAVA

JAVA 并发类

Thread类,继承 Thread 类并重写其 run() 方法,实现多线程。不需要import 库

1
2
3
4
5
6
7
8
9
10
11
12
13
class MyThread extends Thread {
public void run() {
System.out.println("Thread is running...");
}
}

public class Main {
public static void main(String[] args) {
MyThread t1 = new MyThread();
t1.start();
}
}

实现 Runnable 接口并重写 run() 方法, 实现多线程。不需要import 库

1
2
3
4
5
6
7
8
9
10
11
12
class MyRunnable implements Runnable {
public void run() {
System.out.println("Runnable thread is running...");
}
}

public class Main {
public static void main(String[] args) {
Thread t1 = new Thread(new MyRunnable());
t1.start();
}
}

Callable和FutureTask接口,与 Runnable 类似,但可以返回结果和抛出异常。需要import java.util.concurrent.Callable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

class MyCallable implements Callable<Integer> {
public Integer call() throws Exception {
return 42; // 返回结果
}
}

public class Main {
public static void main(String[] args) throws Exception {
FutureTask<Integer> futureTask = new FutureTask<>(new MyCallable());
Thread t1 = new Thread(futureTask);
t1.start();

System.out.println("Result: " + futureTask.get());
}
}

线程池类, Executor 和 ExecutorService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);

executor.execute(() -> System.out.println("Task 1"));
executor.execute(() -> System.out.println("Task 2"));

executor.shutdown();
}
}
// 输出
Task 1
Task 2

CountDownLatch, 倒计时计数器,用于等待其他线程完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import java.util.concurrent.CountDownLatch;

public class Main {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);

Runnable task = () -> {
System.out.println(Thread.currentThread().getName() + " finished task");
latch.countDown();
};

new Thread(task).start();
new Thread(task).start();
new Thread(task).start();

latch.await();
System.out.println("All tasks finished");
}
}

ThreadLocal,线程独立变量

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Main {
public static void main(String[] args) {
ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 0);

Runnable task = () -> {
threadLocal.set(threadLocal.get() + 1);
System.out.println(Thread.currentThread().getName() + " value: " + threadLocal.get());
};

new Thread(task).start();
new Thread(task).start();
}
}

synchronized 锁, Java 内置的关键字,可用于方法或代码块, 用于线程同步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// 同步方法
public synchronized void method() {
System.out.println(Thread.currentThread().getName() + " is executing");
}
// 同步代码块
public void method() {
synchronized (this) {
System.out.println(Thread.currentThread().getName() + " is executing");
}
}


class Counter {
private int count = 0;

public synchronized void increment() {
count++;
}

public synchronized int getCount() {
return count;
}
}

public class Main {
public static void main(String[] args) throws InterruptedException {
Counter counter = new Counter();

Thread t1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) counter.increment();
});

Thread t2 = new Thread(() -> {
for (int i = 0; i < 1000; i++) counter.increment();
});

t1.start();
t2.start();
t1.join();
t2.join();

System.out.println("Final count: " + counter.getCount());
}
}

并发安全结构

JAVA提供了一些并发安全的数据结构,位于java.util.concurrent包中

  1. ConcurrentHashMap
    线程安全的哈希表实现,支持高效的并发读写。它将内部数据分割为多个段,以减少锁竞争。
  2. CopyOnWriteArrayList
    线程安全的 List 实现
  3. BlockingQueue
    线程安全的队列接口,通常用于生产者-消费者模型。

java.util中的HashMap,ArrayList,LinkedList等都是非并发安全的

JAVA多线程和虚拟机

相比C/C++ 多线程通过pthread直接在linux操作系统中起进程,JAVA的多线程需要经过JAVA虚拟机这一层。

java虚拟机维护了JavaThread结构,JVM是C++实现的。所以JavaThread是C++定义的类。JavaThread维护了线程的状态,一个指针指向java.lang.Thread创建的对象(oop),另一个指针指向对应的操作系统创建的OSThread

java线程模型的实现取决于jvm虚拟机,只要jvm愿意,可以选择类似go使用协程来实现线程。但以市场占有率最大的HotSpot虚拟机举例,一个java线程都是直接映射到操作系统的原生线程来实现的,所有的线程调度都是由操作系统完成的。

原子类

volatile 关键字可以实现变量的可见性,但不提供原子性。

JAVA常见的原子类有 AtomicInteger、AtomicLong、AtomicBoolean、AtomicReference

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import java.util.concurrent.atomic.AtomicInteger;

public class Main {
public static void main(String[] args) {
AtomicInteger atomicInt = new AtomicInteger(0);
// 设置值
atomicInt.set(10);
// 获取值
System.out.println("Value: " + atomicInt.get());
// 原子加操作
System.out.println("Incremented: " + atomicInt.incrementAndGet());
// 比较并交换
boolean success = atomicInt.compareAndSet(15, 100);
System.out.println("CAS Success: " + success + ", New Value: " + atomicInt.get());
}
}
// 输出
Value: 10
Incremented: 11
CAS Success: false, New Value: 11

Golang

并发实现

Golang 在语言层面使用协程实现并发,Go内置协程调度器,能自动在协程阻塞时将协程挂起。Golang的线程是无阻塞的,这意味Golang线程不能使用线程阻塞的系统调用。

go 关键字自动启动一个协程运行函数任务。golang提供sync.WaitGroup等待协程运行完。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package main

import (
"fmt"
"math/rand"
"sync"
"time"
)

func producer(ch chan<- int, wg sync.WaitGroup) {
for i := 0; i < 10; i++ {
item := rand.Intn(100)
fmt.Println("Produced:", item)
ch <- item // 发送数据到channel
time.Sleep(time.Second) // 模拟生产过程的延迟
}
close(ch)
wg.Done()
}

func consumer(ch <-chan int, wg sync.WaitGroup) {
for item := range ch {
fmt.Println("Consumed:", item)
time.Sleep(2 * time.Second) // 模拟消费过程的延迟
}
wg.Done()

}

func main() {
var wg sync.WaitGroup
// 创建一个缓冲区大小为 5 的channel
ch := make(chan int, 5)

// 启动多个消费者
for i := 0; i < 3; i++ {
go consumer(ch, wg)
wg.Add(1)
}

// 启动一个生产者
go producer(ch, wg)
wg.Add(1)

// 主程序运行一段时间后退出
time.Sleep(10 * time.Second)
}

// 输出
Produced: 63
Consumed: 63
Produced: 47
Consumed: 47
Produced: 47
Consumed: 47
Produced: 99
Consumed: 99
Produced: 51
Consumed: 51
...

select 语句可以等待接收多个channel的数据,只要有一个channel写入了数据,select就会执行处理函数并退出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
"fmt"
"time"
)

func main() {
ch := make(chan int)

go func() {
ch <- 42
}()

select {
case msg := <-ch:
fmt.Println("Received:", msg)
case <-time.After(1 * time.Second): // 1秒后超时, select执行fmt.Println("Timeout!")
fmt.Println("Timeout!")
}
}
// 输出
Received: 42

sync.Mutex,互斥锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package main

import (
"fmt"
"sync"
)

var (
counter int
mu sync.Mutex // 创建一个互斥锁
)

// 增加计数器的值
func increment() {
mu.Lock()
defer mu.Unlock()

counter++
}

func main() {
var wg sync.WaitGroup

// 启动多个 goroutine 来增加计数器
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
increment()
}()
}

// 等待所有 goroutine 执行完毕
wg.Wait()

fmt.Println("Final counter:", counter)
}

sync.RWMutex 读写锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package main

import (
"fmt"
"sync"
)

var (
data int
rwMutex sync.RWMutex
)

func read() int {
rwMutex.RLock()
defer rwMutex.RUnlock()
return data
}

func write(value int) {
rwMutex.Lock()
defer rwMutex.Unlock()
data = value
}

func main() {
var wg sync.WaitGroup

// 启动多个读 goroutine
for i := 0; i < 5; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
fmt.Printf("Reader %d: data = %d\n", i, read())
}(i)
}

// 启动一个写 goroutine
wg.Add(1)
go func() {
defer wg.Done()
write(42)
fmt.Println("Writer: data updated")
}()

wg.Wait() // 等待所有 goroutine 完成
}

sync.Once 是用来确保函数只执行一次,无论是多次调用还是多线程多次执行

sync.Cond 包提供了条件变量,执行条件变量wait前需要持有锁

1
2
3
4
5
6
7
8
9
10
11
12
func (b *Buffer) Produce(item int) {
b.lock.Lock()
defer b.lock.Unlock()

for len(b.data) == bufferSize {
b.cond.Wait()
}
b.data = append(b.data, item)
fmt.Println("Produced:", item)

b.cond.Signal()
}

原子操作

sync/atomic 包提供了多种原子操作函数,

AddInt32:原子地将一个 int32 值加上一个指定的值。

CompareAndSwapInt32:原子地比较并交换 int32 值。如果当前值等于预期值,则将其更改为新值。

LoadInt32:原子地加载一个 int32 值。

StoreInt32:原子地存储一个 int32 值。

Context 信息传递

Python

多线程和多进程

Python 的 Cpython有一个全局解释器锁(GIL)。这意味着在任何时刻,只有一个线程可以执行 Python 字节码。Python的多线程实际是单核的并行。

multiprocessing 和threading 模块使用。Python多进程的问题是,多进程之间是独立的地址空间,难以共享变量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import threading
import time

# 定义一个任务
def print_numbers():
for i in range(5):
print(i)
time.sleep(1)

# 创建线程
thread1 = threading.Thread(target=print_numbers)
thread2 = threading.Thread(target=print_numbers)
# 创建进程
process1 = multiprocessing.Process(target=print_numbers)
process2 = multiprocessing.Process(target=print_numbers)

thread1.start()
thread2.start()
process1.start()
process2.start()

thread1.join()
thread2.join()
process1.join()
process2.join()

线程池

1
2
3
4
5
6
7
8
9
10
11
12
import concurrent.futures

def print_square(number):
print(f"Square of {number} is {number * number}")

# 创建一个线程池,最多允许 5 个线程同时执行
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# 提交任务给线程池
executor.submit(print_square, 2)
executor.submit(print_square, 3)
executor.submit(print_square, 4)

线程锁,lock = threading.Lock()

生成器和协程

生成器。python生成器是一种无栈协程,可以实现手动切换执行流。普通函数可以 yield 语句返回生成器,普通函数执行到yield后会转向执行接收yield返回值的函数,当再执行next(),线程回到原先的函数继续执行。

使用next() 和 send() 方法可以控制生成器的执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
def simple_gen():
value = yield "Ready" # 初始返回值
print("Received:", value)
yield "End"

gen = simple_gen()
print(next(gen)) # 第一次执行生成器,收到"Ready"
print(gen.send(42)) # 执行流再次切换到simple_gen,simple_gen执行到"End

# 输出
Ready
('Received:', 42)
End

asyncio 是python3提供的无栈协程模块,async 用于定义协程函数,await 用于暂停协程并等待另一个协程完成。async定义的协程函数中不能调用可能导致线程阻塞或者等待的函数,包括阻塞读写、线程锁、sleep等。

await 后面跟的是io耗时的操作,表示把线程从当前协程切换走;同时await会注册事件通知,当耗时的操作执行完时,调度器会回来再执行当前协程

await 同时维护了执行的先后顺序,对于async函数A await aysnc函数B,则必须等待B执行完函数A才会继续执行。这个执行逻辑避免了回调地狱,即对于回调函数链路A->B->C,可以直接在A里面写await B, B里面写 await C,代码更加简洁易懂

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import asyncio

async def task_1():
print("Task 1 starts")
await asyncio.sleep(2)
print("Task 1 ends")

async def task_2():
print("Task 2 starts")
await task_1()
print("Task 2 ends")

# 并发执行两个任务
async def main():
await asyncio.gather(task_1(), task_2()) # 需等待所有task执行完

asyncio.run(main())

# 输出
Task 1 starts
Task 2 starts
Task 1 starts
Task 1 ends
Task 1 ends
Task 2 ends

golang协程是有栈协程,同时不需要显示指定await进行协程切换,调度器会自动发现io阻塞的调用并自动切换协程。golang执行回调函数只需要对无脑的执行go func(),缺点是降低了灵活性。