并发编程是提高程序处理能力的重要手段,也是编程语言必备的能力。并发的对象是函数,函数作为任务被多个并发计算对象执行。
并发的单位可以是进程、线程和协程(用户态线程),并发需要保证的三个特点:原子性、顺序性和可见性。
原子性表示操作要么全部成功,要么全部失败,执行中不能被中断。原子性通常使用锁(包括悲观锁、乐观锁)保证。
顺序性表示任务执行的顺序与任务提交时的顺序一致。顺序性通常使用队列(先进先出)保证,常见的是是生产者-消费队列保证顺序性,例如golang的channel。好的顺序设计,可以保证每个阶段只有少量线程执行,减少锁的使用。减少共享内存的使用,多用顺序性的信号通知,减少锁争抢。在分布式系统中,面对ABA问题,还会对操作和共享内存增加版本号来保证顺序性。
可见性表示对共享内存的修改对其他线程可见。常见的是使用内存屏障(memory barrier)保证可见性,内存屏障是CPU提供的一种同步机制,保证CPU执行到内存屏障之前的指令都执行完成,再执行内存屏障之后的指令。C++原子类的内存序就是在保证可见性。
C语言 pthread C语言中的多线程编程一般使用pthread库, pthread(POSIX线程)库是用于在类Unix系统中实现多线程编程的API实现了posix标准的线程接口。
pthread支持的接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 #include <pthread.h> int pthread_create (pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg) ;void pthread_exit (void *retval) ; int pthread_join (pthread_t thread, void **retval) ; pthread_t pthread_self (void ) ; int pthread_mutex_init (pthread_mutex_t *mutex, const pthread_mutexattr_t *attr) ;int pthread_mutex_lock (pthread_mutex_t *mutex) ; int pthread_mutex_unlock (pthread_mutex_t *mutex) ; int pthread_mutex_destroy (pthread_mutex_t *mutex) ; int pthread_cond_init (pthread_cond_t *cond, const pthread_condattr_t *attr) ;int pthread_cond_wait (pthread_cond_t *cond, pthread_mutex_t *mutex) ;int pthread_cond_signal (pthread_cond_t *cond) ; int pthread_cond_broadcast (pthread_cond_t *cond) ; int pthread_cond_destroy (pthread_cond_t *cond) ; int pthread_attr_init (pthread_attr_t *attr) ;int pthread_attr_setdetachstate (pthread_attr_t *attr, int detachstate) ;int pthread_attr_destroy (pthread_attr_t *attr) ; int pthread_setaffinity_np (pthread_t thread, size_t cpusetsize, const cpu_set_t *cpuset) ;int pthread_getaffinity_np (pthread_t thread, size_t cpusetsize, cpu_set_t *cpuset) ;int pthread_setschedparam (pthread_t thread, int policy, const struct sched_param *param) ;int pthread_getschedparam (pthread_t thread, int *policy, struct sched_param *param) ;
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
需要传入锁, 表示条件变量等待函数需要先获取锁, 发现条件不满足, 才释放锁进入等待状态。这说明条件变量的设计目的是减少申请锁的冲突, 将不符合条件的线程进入等待,而不是让所有线程都竞争锁。(事实上锁竞争的线程也是忙等待,一般线程没有抢到锁会先执行一段时间的自旋等待,如果还没有抢到锁则进入锁等待队列, 释放锁的线程会唤醒处于锁等待状态的线程)
如果要单纯的等待-通知, 不用使用锁,可以使用信号
1 2 3 4 5 6 7 8 9 #include <sys/signalfd.h> sigset_t mask;sigaddset (&mask, SIGUSR1);int sfd = signalfd (-1 , &mask, SFD_NONBLOCK);struct signalfd_siginfo info;read (sfd, &info, sizeof (info));
pthread利用锁、条件变量和数组模拟的环形缓冲区实现生产者消费者模型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 #include <pthread.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #define BUFFER_SIZE 10 int buffer[BUFFER_SIZE];int in = 0 ; int out = 0 ; pthread_mutex_t mutex; pthread_cond_t empty; pthread_cond_t full; void * producer (void * arg) { int item; while (1 ) { item = rand () % 100 ; pthread_mutex_lock (&mutex); while ((in + 1 ) % BUFFER_SIZE == out) { pthread_cond_wait (&empty, &mutex); } buffer[in] = item; printf ("Producer produced: %d\n" , item); in = (in + 1 ) % BUFFER_SIZE; pthread_cond_signal (&full); pthread_mutex_unlock (&mutex); sleep (1 ); } return NULL ; } void * consumer (void * arg) { int item; while (1 ) { pthread_mutex_lock (&mutex); while (in == out) { pthread_cond_wait (&full, &mutex); } item = buffer[out]; printf ("Consumer consumed: %d\n" , item); out = (out + 1 ) % BUFFER_SIZE; pthread_cond_signal (&empty); pthread_mutex_unlock (&mutex); sleep (2 ); } return NULL ; } int main () { pthread_t producer_thread, consumer_thread; pthread_mutex_init (&mutex, NULL ); pthread_cond_init (&empty, NULL ); pthread_cond_init (&full, NULL ); pthread_create (&producer_thread, NULL , producer, NULL ); pthread_create (&consumer_thread, NULL , consumer, NULL ); pthread_join (producer_thread, NULL ); pthread_join (consumer_thread, NULL ); pthread_mutex_destroy (&mutex); pthread_cond_destroy (&empty); pthread_cond_destroy (&full); return 0 ; } Producer produced: 83 Consumer consumed: 83 Producer produced: 86 Consumer consumed: 86 Producer produced: 77 Producer produced: 15 Consumer consumed: 77 Producer produced: 93 ...
线程cpu绑核,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 #include <pthread.h> #include <stdio.h> #include <stdlib.h> #include <sched.h> #include <unistd.h> void * thread_function (void * arg) { cpu_set_t mask; pthread_t thread = pthread_self (); int result = pthread_getaffinity_np (thread, sizeof (cpu_set_t ), &mask); if (result != 0 ) { perror ("pthread_getaffinity_np" ); exit (EXIT_FAILURE); } for (int i = 0 ; i < CPU_SETSIZE; i++) { if (CPU_ISSET (i, &mask)) { printf ("Thread can run on CPU core: %d\n" , i); } } return NULL ; } int main () { pthread_t thread; cpu_set_t cpuset; pthread_create (&thread, NULL , thread_function, NULL ); CPU_ZERO (&cpuset); CPU_SET (0 , &cpuset); CPU_SET (1 , &cpuset); int result = pthread_setaffinity_np (thread, sizeof (cpu_set_t ), &cpuset); if (result != 0 ) { perror ("pthread_setaffinity_np" ); exit (EXIT_FAILURE); } pthread_join (thread, NULL ); return 0 ; } Thread can run on CPU core: 0 Thread can run on CPU core: 1
线程调度策略,
SCHED_FIFO:先来先服务(First-In-First-Out)实时调度,
SCHED_RR:轮转调度(Round-Robin)实时调度,
SCHED_OTHER默认的操作系统决定调度线程1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 #include <pthread.h> #include <stdio.h> #include <stdlib.h> #include <sched.h> void * thread_function (void * arg) { printf ("Thread running\n" ); return NULL ; } int main () { pthread_t thread; struct sched_param param; int policy; pthread_create (&thread, NULL , thread_function, NULL ); param.sched_priority = 10 ; pthread_setschedparam (thread, SCHED_FIFO, ¶m); pthread_getschedparam (thread, &policy, ¶m); printf ("Thread policy: %d, Priority: %d\n" , policy, param.sched_priority); pthread_join (thread, NULL ); return 0 ; } Thread policy: 1 , Priority: 10 Thread running
原子操作 编程语言的赋值一般是原子的,但算术操作不是原子的(需要访存、操作、写回三个操作)。c语言可以直接使用gcc提供的内建原子操作函数, 原子函数在操作函数会加内存屏障,保证常见操作的原子性、可见性。
__sync_fetch_and_add, 原子地将一个值加到目标变量,返回旧值。
__sync_fetch_and_sub:原子地目标变量减去一个值,返回旧值。
__sync_lock_test_and_set:原子地设置一个值,返回旧值。
原子函数的实现原理
默认包含一个 完整的顺序一致性屏障(Full Memory Barrier),确保操作前的所有内存访问(读/写)在原子操作前完成:操作后的所有内存访问(读/写)在原子操作后开始。也就是是原子操作时可以读到最新的旧值, 原子操作后设置的新值其他线程可见
硬件支持,在 x86/x64 架构中,原子操作通常有对应的硬件指令,例如 xchg(Exchange)指令(带 lock 前缀),确保操作的原子性。
虽然赋值操作是原子的,但C和C++没有java volatile 关键字保证可见性, 因此赋值操作还是要用原子操作__sync_lock_test_and_set。java的volatile关键字可以保证赋值操作的可见性,无须加锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 #include <stdio.h> #include <pthread.h> int counter = 0 ; void * increment (void * arg) { for (int i = 0 ; i < 1000 ; i++) { __sync_fetch_and_add(&counter, 1 ); } return NULL ; } int main () { pthread_t threads[10 ]; for (int i = 0 ; i < 10 ; i++) { pthread_create (&threads[i], NULL , increment, NULL ); } for (int i = 0 ; i < 10 ; i++) { pthread_join (threads[i], NULL ); } printf ("Counter value: %d\n" , counter); return 0 ; }
GCC 4.7+ 引入了更灵活的 __atomic 系列函数,允许显式控制内存顺序:
1 2 type __atomic_exchange_n(type *ptr, type value, int memorder);
__thread, 是gcc提供的线程局部存储,每个线程操作自己的变量副本,互不影响。
只能用于全局和静态变量
静态初始化:变量必须在编译时初始化,且只能为 简单数据类型(如 int、float、指针),不支持动态初始化或复杂类型(如结构体、动态数组)。1 2 3 4 5 6 7 static __thread int thread_local_var; static __thread unsigned long task_count; void process_task () { task_count++; }
C++ C++11的并发类 C++11 提供的并发类主要包括
std::thread线程, 创建立即执行, 提供join(), detach(), get_id()等方。头文件#include <thread>
std::mutex互斥锁, 提供lock(), unlock(), try_lock(), std::lock_guard<std::mutex>
, std::unique_lock<std::mutex>
。头文件#include <mutex>
std::condition_variable 条件变量,提供wait(), notify_one(), notify_all()等方法, 头文件#include <condition_variable>
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 #include <iostream> #include <thread> void print_thread_id () { std::thread::id this_id = std::this_thread::get_id (); std::cout << "Thread ID: " << this_id << std::endl; } int main () { std::cout << "Main thread ID: " << std::this_thread::get_id () << std::endl; std::thread t (print_thread_id) ; t.join (); return 0 ; } Main thread ID: 140509241898816 Thread ID: 140509241894656
线程同步
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 #include <iostream> #include <thread> #include <queue> #include <mutex> #include <condition_variable> #include <chrono> std::queue<int > buffer; std::mutex mtx; std::condition_variable cv; const int MAX_BUFFER_SIZE = 5 ; void producer () { for (int i = 0 ; i < 10 ; ++i) { std::this_thread::sleep_for (std::chrono::milliseconds (100 )); std::unique_lock<std::mutex> lock (mtx) ; cv.wait (lock, []() { return buffer.size () < MAX_BUFFER_SIZE; }); buffer.push (i); std::cout << "Produced: " << i << std::endl; cv.notify_all (); } } void consumer () { for (int i = 0 ; i < 10 ; ++i) { std::this_thread::sleep_for (std::chrono::milliseconds (150 )); std::unique_lock<std::mutex> lock (mtx) ; cv.wait (lock, []() { return !buffer.empty (); }); int item = buffer.front (); buffer.pop (); std::cout << "Consumed: " << item << std::endl; cv.notify_all (); } } int main () { std::thread producer_thread (producer) ; std::thread consumer_thread (consumer) ; producer_thread.join (); consumer_thread.join (); return 0 ; } Produced: 0 Consumed: 0 Produced: 1 Consumed: 1 Produced: 2 Produced: 3 Consumed: 2 Produced: 4 ...
异步任务
std::promise和std::future。允诺(Promise)对象,用于在线程中存储一个值或异常,供其他线程通过 future 获取。未来(Future)对象,用于从 promise 中异步获取结果。提供阻塞或非阻塞方式访问数据。头文件#include <future>
std::future和std::async, std::aync 直接传入函数, 返回一个future。该future可以通过get()获取函数返回值。std::async会自动创建异步执行任务
std::future和std::promise联用,用于线程间传递数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 #include <iostream> #include <thread> #include <future> void set_value (std::promise<int >& p) { std::this_thread::sleep_for (std::chrono::seconds (2 )); p.set_value (42 ); } int main () { std::promise<int > p; std::future<int > f = p.get_future (); std::thread t (set_value, std::ref(p)) ; std::cout << "Doing some work in the main thread...\n" ; std::this_thread::sleep_for (std::chrono::seconds (1 )); int result = f.get (); std::cout << "The result from promise is: " << result << std::endl; t.join (); return 0 ; } Doing some work in the main thread... The result from promise is: 42
std::future和std::async联用,获取异步任务执行完成后的返回值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 #include <iostream> #include <thread> #include <future> #include <chrono> int calculate_square (int x) { std::this_thread::sleep_for (std::chrono::seconds (2 )); return x * x; } int main () { std::future<int > result = std::async (std::launch::async, calculate_square, 5 ); std::cout << "Doing other work in main thread...\n" ; std::this_thread::sleep_for (std::chrono::seconds (1 )); int square = result.get (); std::cout << "The square of 5 is: " << square << std::endl; return 0 ; }
在并发处理上,C++20 提供了协程和std::jthread。jthread兼容 std::thread 接口和方法,提供了自动 Join:在析构时自动调用 join(),避免未回收线程导致程序终止(传统 std::thread 析构时若未 join() 或 detach() 会触发 std::terminate)。
C++20 还引入了std::latch, 用于等待线程完成(这种特性加入的也太晚了)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 #include <iostream> #include <latch> #include <thread> #include <vector> int main () { std::latch completion_latch (3 ) ; auto worker = [&](int id) { std::cout << "Worker " << id << " started\n" ; std::this_thread::sleep_for (std::chrono::seconds (1 )); completion_latch.count_down (); }; std::vector<std::jthread> threads; for (int i = 0 ; i < 3 ; ++i) { threads.emplace_back (worker, i + 1 ); } completion_latch.wait (); std::cout << "All workers completed!\n" ; return 0 ; }
C++17 中引入的 std::scoped_lock, 可以一次性锁住多个互斥量,避免死锁
1 2 3 4 5 6 7 8 9 10 11 12 13 std::mutex mtx; void safe_function () { std::scoped_lock lock (mtx) ; } std::mutex mtx1, mtx2; void safe_function () { std::scoped_lock lock (mtx1, mtx2) ; }
智能指针 std::unique_ptr<T>
,创建对象时使用,具有对象的所有权。
std::shared_ptr<T>
,共享所有权, 使用引用计数维护对象生命周期, 在所有权不明确或对象析构时刻不明确时使用, 尽量使用unique_ptr
std::weak_ptr<T>
,弱指针,不维护对象生命周期。当对象处于观察者状态时使用,可通过尝试lock()函数判断对象是否存在,如果存在则访问。当对象A 持有对象B的shared_ptr,同时B又想持有对象A的shared_ptr时,为了避免循环引用,B应当持有A的weak_ptr 。
C++线程局部变量 thread_local 是 C++11 引入的关键字, 相比__thread, 前者只能修饰简单数据类型(如 int、float、指针),thread_local可用于局部变量、全局变量、类成员变量等。性能上两者均接近普通全局变量,无显著差异。
线程局部变量没有数据竞争,其他线程对变量的修改也不会影响当前线程的变量值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 #include <stdio.h> #include <pthread.h> thread_local int thread_local_var = 0 ; void * thread_function (void * arg) { thread_local_var++; printf ("Thread %ld: thread_local_var = %d\n" , (long )arg, thread_local_var); return NULL ; } int main () { pthread_t threads[3 ]; for (long i = 0 ; i < 3 ; i++) { pthread_create (&threads[i], NULL , thread_function, (void *)i); } for (int i = 0 ; i < 3 ; i++) { pthread_join (threads[i], NULL ); } return 0 ; } Thread 0 : thread_local_var = 1 Thread 1 : thread_local_var = 1 Thread 2 : thread_local_var = 1 class MyClass {public : MyClass () { } ~MyClass () { } }; thread_local MyClass obj;
C++11 的原子类 std::atomic提供了线程安全的原子操作。std::atomic 是一个模板类,支持不同的数据类型(如整数、指针)。常用的方法
load() 读取原子变量的值。
store(x) 写入值到原子变量
fetch_add(x) 和 fetch_sub(x) 原子加减法操作,返回旧值
exchange(x) 将原子对象的值替换为给定值,并返回旧值
compare_exchange_weak(expected, desired) 和 compare_exchange_strong(expected, desired) 执行原子比较并交换(CAS,Compare and Swap)操作。expected是当前期望的值,desired是期望修改后的值。返回true表示更新成功,false表示未更新
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 #include <iostream> #include <atomic> #include <thread> std::atomic<int > counter (0 ) ;void increment (int id) { for (int i = 0 ; i < 10 ; ++i) { int expected = counter.load (); while (!counter.compare_exchange_strong (expected, expected + 1 )) { std::this_thread::yield (); } std::cout << "Thread " << id << " incremented counter to " << counter.load () << std::endl; } } int main () { std::thread t1 (increment, 1 ) ; std::thread t2 (increment, 2 ) ; t1. join (); t2. join (); std::cout << "Final counter value: " << counter.load () << std::endl; return 0 ; } Thread 1 incremented counter to 2 Thread 1 incremented counter to 3 Thread 1 incremented counter to 4 Thread 1 incremented counter to 5 Thread 1 incremented counter to 6 Thread 1 incremented counter to 7
内存序和可见性 内存序是为了保证多线程在多核CPU的可见性 ,是一种内存屏障。作用和JAVA的volatile类似。
memory_order_seq_cst 是默认的内存顺序,相当于JAVA的volatile,对原子类的修改操作会立刻刷到内存。
memory_order_acquire和load连用,memory_order_release和store连用。这个保证,如果某线程执行了store+memory_order_release的操作,在其他线程执行load+memory_order_acquire时,前面的store操作是可见的
由于C++没有类似java 的volatile关键字(C++的volatile 只是避免编译层的优化的重排序, 只有底层比较hack的代码告知编译期不要优化我hack的代码时才会用到, C++ 编程很少遇到),因此一旦涉及多线程共享变量,要么加锁,要么上原子类(一般不会手动使用内存屏障)。
memory_order_relaxed:不保证任何顺序,只提供原子性,一般不会使用
memory_order_consume:保证在该原子操作之后的所有依赖操作的顺序, 这个类似memory_order_acquire, 比 memory_order_acquire 更轻量,但适用范围更窄, 只保证依赖原子类的顺序。 以下常用
memory_order_acquire:保证acquire之后的操作实际也在该操作后执行,不会重排序到操作之前,常修饰load操作
memory_order_release:保证release之前的所有操作在该操作前执行,不会重排序到操作之后,常修饰store操作 store+memory_order_release 和load+memory_order_acquire 保证写之后读之前是有序的。写-读是一个pair。release-acquire之间不是原子类的操作, 也会保证有序性。
memory_order_acq_rel:同时具备获取和释放语义,用来修饰load+store操作,如fetch_add
memory_order_seq_cst:保证严格的顺序一致性,即原子类变量多线程并发执行和单线程循环执行顺序完全一致, 原子类默认内存序
内存序本身不提供“线程必须等待其他线程写了数据,才可以读”这样的同步语义,需要使用一个load循环,直到load数据。它能保证的是,只要某个线程store了数据,其他线程立刻可以load到。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 #include <iostream> #include <atomic> #include <thread> std::atomic<bool > ready (false ) ;std::atomic<int > data (0 ) ;void producer () { data.store (42 , std::memory_order_relaxed); ready.store (true , std::memory_order_release); } void consumer () { while (!ready.load (std::memory_order_acquire)) { std::this_thread::yield (); } std::cout << "Consumer read data: " << data.load (std::memory_order_relaxed) << std::endl; } int main () { std::thread t1 (producer) ; std::thread t2 (consumer) ; t1. join (); t2. join (); return 0 ; }
C++的lambda表达式和函数类型 Lambda 是一种匿名闭包对象,每个 Lambda 表达式的类型都是唯一的闭包类型,无法显式声明,但可以通过 auto 或 decltype 推导。
std::function<ReturnType(Args...)>
可用来存储可调用对象,如函数对象,lambda表达式等。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 #include <functional> #include <vector> int main () { std::vector<std::function<void ()>> tasks; tasks.push_back ([]{ std::cout << "Task 1" ; }); tasks.push_back (&some_function); tasks.push_back (Functor{}); for (auto & task : tasks) { task (); } return 0 ; }
异常处理 C++异常处理基于三个关键字:try、catch、throw。
1 2 3 4 5 6 7 try { double result = divide (10 , 0 ); } catch (const std::runtime_error& e) { std::cerr << "Error: " << e.what () << std::endl; } catch (...) { std::cerr << "Unknown error!" << std::endl; }
异常处理的缺陷
栈展开Stack Unwinding,抛出异常时,编译器需生成回溯调用栈,逐个退出函数作用域的逻辑 ,调用局部对象的析构函数,直到找到匹配的 catch 块。
编译器需为每个 try 块生成额外的元数据(如异常表),增加二进制文件体积。
异常改变了代码的显式执行路径,错误处理逻辑分散在 catch 块中,而非就近处理。
C++推荐使用错误码来进行错误处理,无额外性能开销;错误处理需要保证资源释放
如果调用的函数可能抛出异常,那么调用者需要捕获,防止程序崩溃。
C++ noexcept关键字指定函数不抛异常,编译器可能为 noexcept 函数生成更精简的代码(无需准备栈展开逻辑)。析构函数和移动构造/赋值函数默认是noexcept函数。若 noexcept 函数意外抛异常,程序直接终止。
JAVA JAVA 并发类 Thread类,继承 Thread 类并重写其 run() 方法,实现多线程。JAVA原生支持, 不需要import 库
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 class MyThread extends Thread { public void run () { System.out.println("Thread is running..." ); } } public class Main { public static void main (String[] args) { MyThread t1 = new MyThread (); t1.start(); } } class MyRunnable implements Runnable { public void run () { System.out.println("Runnable thread is running..." ); } } public class Main { public static void main (String[] args) { Thread t1 = new Thread (new MyRunnable ()); t1.start(); } }
Callable和FutureTask接口,与 Runnable 类似,但可以返回结果和抛出异常。需要import java.util.concurrent.Callable
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import java.util.concurrent.Callable;import java.util.concurrent.FutureTask;class MyCallable implements Callable <Integer> { public Integer call () throws Exception { return 42 ; } } public class Main { public static void main (String[] args) throws Exception { FutureTask<Integer> futureTask = new FutureTask <>(new MyCallable ()); Thread t1 = new Thread (futureTask); t1.start(); System.out.println("Result: " + futureTask.get()); } }
线程池类, Executor 和 ExecutorService。C++标准库里没有线程池类, 需要引用第三方库。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class Main { public static void main (String[] args) { ExecutorService executor = Executors.newFixedThreadPool(2 ); executor.execute(() -> System.out.println("Task 1" )); executor.execute(() -> System.out.println("Task 2" )); executor.shutdown(); } } Task 1 Task 2
CountDownLatch, latch计数器,用于等待其他线程完成。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import java.util.concurrent.CountDownLatch;public class Main { public static void main (String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch (3 ); Runnable task = () -> { System.out.println(Thread.currentThread().getName() + " finished task" ); latch.countDown(); }; new Thread (task).start(); new Thread (task).start(); new Thread (task).start(); latch.await(); System.out.println("All tasks finished" ); } }
ThreadLocal,线程内部变量
1 2 3 4 5 6 7 8 9 10 11 12 13 public class Main { public static void main (String[] args) { ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 0 ); Runnable task = () -> { threadLocal.set(threadLocal.get() + 1 ); System.out.println(Thread.currentThread().getName() + " value: " + threadLocal.get()); }; new Thread (task).start(); new Thread (task).start(); } }
synchronized 锁, Java 内置的关键字,可用于方法或代码块, 用于线程同步。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 public synchronized void method () { System.out.println(Thread.currentThread().getName() + " is executing" ); } public void method () { synchronized (this ) { System.out.println(Thread.currentThread().getName() + " is executing" ); } } class Counter { private int count = 0 ; public synchronized void increment () { count++; } public synchronized int getCount () { return count; } } public class Main { public static void main (String[] args) throws InterruptedException { Counter counter = new Counter (); Thread t1 = new Thread (() -> { for (int i = 0 ; i < 1000 ; i++) counter.increment(); }); Thread t2 = new Thread (() -> { for (int i = 0 ; i < 1000 ; i++) counter.increment(); }); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println("Final count: " + counter.getCount()); } }
java提供volatile关键字保证变量的可见性, volatile关键字实现如果某线程只对变量进行赋值或读取操作,操作时不需要加锁 。对于volatile使用场景, C++一般要使用原子类。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class WorkerThread extends Thread { private volatile boolean running = true ; public void stopTask () { running = false ; } @Override public void run () { while (running) { } } } public class Singleton { private static volatile Singleton instance; public static Singleton getInstance () { if (instance == null ) { synchronized (Singleton.class) { if (instance == null ) { instance = new Singleton (); } } } return instance; } }
线程安全数据结构 JAVA提供了一些并发安全的数据结构,位于java.util.concurrent包中。C++标准库不提供线程安全结构, 需要第三方库。
ConcurrentHashMap 线程安全的哈希表实现,支持高效的并发读写。它将内部数据分割为多个段,每个段一个锁,可以减少锁竞争。
CopyOnWriteArrayList 线程安全的 List 实现
BlockingQueue 线程安全的队列接口,通常用于生产者-消费者模型。
java.util中的HashMap,ArrayList,LinkedList等都是非并发安全的
JAVA多线程和虚拟机 相比C/C++ 多线程通过pthread直接在linux操作系统中起进程,JAVA的多线程需要经过JAVA虚拟机这一层。
java虚拟机维护了JavaThread结构,JVM是C++实现的。所以JavaThread是C++定义的类。JavaThread维护了线程的状态,一个指针指向java.lang.Thread创建的对象(oop),另一个指针指向对应的操作系统创建的OSThread
java线程模型的实现取决于jvm虚拟机,只要jvm愿意,可以选择类似go使用协程来实现线程。但以市场占有率最大的HotSpot虚拟机举例,一个java线程都是直接映射到操作系统的原生线程来实现的,所有的线程调度都是由操作系统完成的。
原子类 volatile 关键字可以实现变量的可见性,但不提供原子性。 JAVA常见的原子类有 AtomicInteger、AtomicLong、AtomicBoolean、AtomicReference
.set()设置值, .get()获取值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import java.util.concurrent.atomic.AtomicInteger;public class Main { public static void main (String[] args) { AtomicInteger atomicInt = new AtomicInteger (0 ); atomicInt.set(10 ); System.out.println("Value: " + atomicInt.get()); System.out.println("Incremented: " + atomicInt.incrementAndGet()); boolean success = atomicInt.compareAndSet(15 , 100 ); System.out.println("CAS Success: " + success + ", New Value: " + atomicInt.get()); } } Value: 10 Incremented: 11 CAS Success: false , New Value: 11
异常处理 java 的推荐使用异常处理,因此函数调用者需要通过try-catch-finally结构来捕获异常,防止程序崩溃。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 try { FileInputStream file = new FileInputStream ("data.txt" ); } catch (FileNotFoundException e) { System.err.println("文件未找到: " + e.getMessage()); } catch (IOException e) { e.printStackTrace(); } finally { if (file != null ) { try { file.close(); } catch (IOException e) { e.printStackTrace(); } } }
java 函数可以加throws关键字,声明该方法可能抛出的 受检异常(Checked Exceptions),函数调用者必须处理异常,否则编译器报错
1 2 3 4 public void readFile (String path) throws FileNotFoundException, IOException { FileInputStream file = new FileInputStream (path); }
Golang 并发实现 Golang 在语言层面使用协程实现并发,Go内置协程调度器,能自动在协程阻塞时将协程挂起。Golang的线程是无阻塞的,这意味Golang线程不能使用线程阻塞的系统调用。
GMP调度器, Go调度器基于 G(Goroutine)、M(Machine)、P(Processor) 三者的协作:
G(Goroutine):轻量级协程,初始栈仅2KB(可动态扩缩),创建和切换成本极低。
M(Machine):对应操作系统的线程(OS Thread),负责实际执行代码。
P(Processor):逻辑处理器(调度器),管理一组本地队列(存储待运行的Goroutines)。P的数量默认等于CPU核心数(可通过GOMAXPROCS调整)。
Goroutine 的创建与执行
每个协程(G)由Go运行时创建,并被分配到某个P的本地队列。
M需要绑定一个P才能执行Goroutines。
当P的本地队列为空时,会从全局队列或其他P的队列中窃取Goroutines(Work Stealing)。
Golang的线程不会执行任何阻塞的系统调用, 当协程阻塞时,协程主动让出执行权,M会释放绑定的P,P转去服务其他M。系统调用完成后,G尝试获取新的P继续执行(若无可用P,则G进入全局队列)。
Go运行时有专门的网络轮询器(NetPoller),将阻塞的G挂起,待IO就绪后唤醒,避免占用M。
一个Go程序可轻松创建数十万个Goroutines,而同等规模的OS线程会耗尽资源。M的数量由调度器动态管理(通常远少于Goroutines的数量),避免频繁创建/销毁OS线程。
golang提供sync.WaitGroup等待协程运行完, 相当于latch。golang的无缓冲channel 可以用来实现阻塞-等待,有缓冲channel可以用来当任务队列。golang利用channel可以实现顺序性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 package mainimport ( "fmt" "math/rand" "sync" "time" ) func producer (ch chan <- int , wg sync.WaitGroup) { for i := 0 ; i < 10 ; i++ { item := rand.Intn(100 ) fmt.Println("Produced:" , item) ch <- item time.Sleep(time.Second) } close (ch) wg.Done() } func consumer (ch <-chan int , wg sync.WaitGroup) { for item := range ch { fmt.Println("Consumed:" , item) time.Sleep(2 * time.Second) } wg.Done() } func main () { var wg sync.WaitGroup ch := make (chan int , 5 ) for i := 0 ; i < 3 ; i++ { go consumer(ch, wg) wg.Add(1 ) } go producer(ch, wg) wg.Add(1 ) time.Sleep(10 * time.Second) } Produced: 63 Consumed: 63 Produced: 47 Consumed: 47 Produced: 47 Consumed: 47 Produced: 99 Consumed: 99 Produced: 51 Consumed: 51 ...
for range语句可以用来遍历channel的数组,直到channel被close。select 语句可以等待接收多个channel的数据,只要有一个channel写入了数据,select就会执行处理函数并退出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 package mainimport ( "fmt" "time" ) func main () { ch := make (chan int ) go func () { ch <- 42 }() select { case msg := <-ch: fmt.Println("Received:" , msg) case <-time.After(1 * time.Second): fmt.Println("Timeout!" ) } } Received: 42
sync.Mutex,互斥锁。sync.RWMutex 读写锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 package mainimport ( "fmt" "sync" ) var ( counter int mu sync.Mutex ) func increment () { mu.Lock() defer mu.Unlock() counter++ } func main () { var wg sync.WaitGroup for i := 0 ; i < 1000 ; i++ { wg.Add(1 ) go func () { defer wg.Done() increment() }() } wg.Wait() fmt.Println("Final counter:" , counter) } import ( "fmt" "sync" ) var ( data int rwMutex sync.RWMutex ) func read () int { rwMutex.RLock() defer rwMutex.RUnlock() return data } func write (value int ) { rwMutex.Lock() defer rwMutex.Unlock() data = value } func main () { var wg sync.WaitGroup for i := 0 ; i < 5 ; i++ { wg.Add(1 ) go func (i int ) { defer wg.Done() fmt.Printf("Reader %d: data = %d\n" , i, read()) }(i) } wg.Add(1 ) go func () { defer wg.Done() write(42 ) fmt.Println("Writer: data updated" ) }() wg.Wait() }
sync.Once 确保函数只执行一次,无论是多次调用还是多线程多次执行。可用来实现单例模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import "sync" type Singleton struct { } var ( instance *Singleton once sync.Once ) func GetInstance () *Singleton { once.Do(func () { instance = &Singleton{ } }) return instance }
sync.Cond 条件变量,执行条件变量wait前需要持有锁
1 2 3 4 5 6 7 8 9 10 11 12 func (b *Buffer) Produce (item int ) { b.lock.Lock () defer b.lock.Unlock () for len (b.data) == bufferSize { b.cond.Wait () } b.data = append (b.data, item) fmt.Println ("Produced:" , item) b.cond.Signal () }
原子操作 sync/atomic 包提供了多种原子操作函数,
AddInt32, func AddInt32(addr *int32, delta int32) (new int32)
原子地将一个 int32 值加上一个指定的值。返回操作后的 新值
CompareAndSwapInt32, func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
原子地比较并交换 int32 值。如果当前值等于预期值,则将其更改为新值。返回是否更改
LoadInt32, func LoadInt32(addr *int32) (val int32)
原子地读取一个 int32 值。
StoreInt32, func StoreInt32(addr *int32, val int32)
原子地写入一个 int32 值。
例子, 无锁计数器
1 2 3 4 5 6 7 8 9 var counter int64 func increment () { atomic.AddInt64(&counter, 1 ) } func get () int64 { return atomic.LoadInt64(&counter) }
自旋锁
1 2 3 4 5 6 7 8 9 10 11 var lock int32 func acquireLock () { for !atomic.CompareAndSwapInt32(&lock, 0 , 1 ) { } } func releaseLock () { atomic.StoreInt32(&lock, 0 ) }
Context 协程生命周期管理 channel用来协程间通信,而Context主要用来管理协程的生命周期。
异常处理 panic用来抛出异常,recover用来捕获异常。 panic,
运行时错误自动触发:例如数组越界、空指针解引用等。
主动调用 panic(v)
:开发者可手动抛出任意类型的值(通常是 error
或 string
)。
行为
立即终止当前函数的执行。
逐层向上回溯调用栈,执行每个函数的 defer
语句。
若未被 recover
捕获,最终程序崩溃 并打印堆栈信息。
recover:捕获异常
必须在 defer 函数中调用:recover 仅在 defer 上下文中生效。
仅在发生 panic 后生效:若无 panic,recover 返回 nil。
Go 的设计者觉得 try/catch
机制的使用太泛滥了,而且从底层向更高的层级抛异常太耗费资源 。他们给 Go 设计的机制也可以“捕捉”异常,但是更轻量,并且只应该作为(处理错误的)最后的手段。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import "fmt" func riskyOperation () { panic ("something went wrong!" ) } func safeOperation () { defer func () { if r := recover (); r != nil { fmt.Println ("Recovered from panic:" , r) } }() riskyOperation () fmt.Println ("This line will NOT be executed" ) } func main () { safeOperation () fmt.Println ("Program continues normally" ) } Recovered from panic: something went wrong! Program continues normally
golang推荐使用error错误码来处理错误,产生错误的函数会返回两个变量,一个值和一个错误码;如果后者是 nil
就是成功,非 nil
就是发生了错误。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 if value, err := pack1.Func1(param1); err != nil { fmt.Printf("Error %s in pack1.Func1 with parameter %v" , err.Error(), param1) return } else { } type error interface { Error() string } type MyError struct { Code int Message string Details map [string ]interface {} } func (e *MyError) Error() string { return fmt.Sprintf("code=%d, msg=%s" , e.Code, e.Message) } func ProcessRequest () error { return &MyError{ Code: 400 , Message: "invalid request" , Details: map [string ]interface {}{"field" : "username" }, } }
Python 多线程和多进程 Python 的 Cpython有一个全局解释器锁(GIL)。这意味着在任何时刻,只有一个线程可以执行 Python 字节码。这简化了python的多线程管理,GIL单条字节码指令是原子操作。向x = 42, shared_list.append(item) 是线程安全的。但这让Python的多线程只能利用到单核。
虽然Python可以使用多进程,但多进程之间是独立的地址空间,难以访问共享变量,使用比较难受。因此threading 多线程模块是python常用的并发模块
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import threadingimport timedef print_numbers (): for i in range (5 ): print (i) time.sleep(1 ) thread1 = threading.Thread(target=print_numbers) thread2 = threading.Thread(target=print_numbers) process1 = multiprocessing.Process(target=print_numbers) process2 = multiprocessing.Process(target=print_numbers) thread1.start() thread2.start() process1.start() process2.start() thread1.join() thread2.join() process1.join() process2.join()
线程锁和条件变量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import threadingclass Counter : def __init__ (self ): self .value = 0 self .lock = threading.Lock() def increment (self ): with self .lock: self .value += 1 cond = threading.Condition() shared_data = [] def producer (): with cond: shared_data.append(42 ) cond.notify() def consumer (): with cond: while not shared_data: cond.wait() print ("Received:" , shared_data.pop())
线程池,concurrent.futures.ThreadPoolExecutor
1 2 3 4 5 6 7 8 9 10 11 import concurrent.futuresdef print_square (number ): print (f"Square of {number} is {number * number} " ) with concurrent.futures.ThreadPoolExecutor(max_workers=5 ) as executor: executor.submit(print_square, 2 ) executor.submit(print_square, 3 ) executor.submit(print_square, 4 )
生成器和协程 生成器。python生成器是一种无栈协程,可以实现手动切换执行流。普通函数可以 yield 语句返回生成器,普通函数执行到yield后会转向执行接收yield返回值的函数,当再执行next(),线程回到原先的函数继续执行。
使用next() 和 send() 方法可以控制生成器的执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 def simple_gen (): value = yield "Ready" print ("Received:" , value) yield "End" gen = simple_gen() print (next (gen)) print (gen.send(42 )) Ready ('Received:' , 42 ) End
asyncio 是python3提供的无栈协程模块,内部实现了事件循环。
async 用于定义协程函数,await 用于暂停协程并等待另一个协程完成。async定义的协程函数中不能调用可能导致线程阻塞或者等待的函数,包括阻塞读写、线程锁、sleep等。
await 后面跟的是io耗时的操作,表示把线程从当前协程切换走;同时await会注册事件通知,当耗时的操作执行完时,调度器会回来再执行当前协程
await 同时维护了执行的先后顺序,对于async函数A await aysnc函数B,则必须等待B执行完函数A才会继续执行。这个执行逻辑避免了回调地狱,即对于回调函数链路A->B->C,可以直接在A里面写await B, B里面写 await C ,代码更加简洁易懂
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import asyncioasync def task_1 (): print ("Task 1 starts" ) await asyncio.sleep(2 ) print ("Task 1 ends" ) async def task_2 (): print ("Task 2 starts" ) await task_1() print ("Task 2 ends" ) async def main (): await asyncio.gather(task_1(), task_2()) asyncio.run(main()) Task 1 starts Task 2 starts Task 1 starts Task 1 ends Task 1 ends Task 2 ends