并发编程是提高程序处理能力的重要手段,也是编程语言必备的能力。并发的单位是函数,函数作为任务被多个并发计算对象执行。并发执行的函数任务可分为一次性任务和定时任务两种。任务可以由主线程直接分发给多线程,也可以借助线程安全的任务/消息队列分发任务。
C语言 pthread C语言中的多线程编程一般使用pthread库, pthread在linux系统调用上二次开发,实现了posix标准的线程接口。由于linux不区分线程和进程,线程只是一种共享内存、fd等资源的特殊进程,因此pthread本身也是多进程编程接口。
pthread支持的接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 int pthread_create (pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg) ;void pthread_exit (void *retval) ;int pthread_join (pthread_t thread, void **retval) ;pthread_t pthread_self (void ) ;int pthread_mutex_init (pthread_mutex_t *mutex, const pthread_mutexattr_t *attr) ;int pthread_mutex_lock (pthread_mutex_t *mutex) ;int pthread_mutex_unlock (pthread_mutex_t *mutex) ;int pthread_mutex_destroy (pthread_mutex_t *mutex) ;int pthread_cond_init (pthread_cond_t *cond, const pthread_condattr_t *attr) ;int pthread_cond_wait (pthread_cond_t *cond, pthread_mutex_t *mutex) ;int pthread_cond_signal (pthread_cond_t *cond) ;int pthread_cond_broadcast (pthread_cond_t *cond) ;int pthread_cond_destroy (pthread_cond_t *cond) ;int pthread_attr_init (pthread_attr_t *attr) ;int pthread_attr_setdetachstate (pthread_attr_t *attr, int detachstate) ;int pthread_attr_destroy (pthread_attr_t *attr) ;int pthread_setaffinity_np (pthread_t thread, size_t cpusetsize, const cpu_set_t *cpuset) ;int pthread_getaffinity_np (pthread_t thread, size_t cpusetsize, cpu_set_t *cpuset) ;int pthread_setschedparam (pthread_t thread, int policy, const struct sched_param *param) ;int pthread_getschedparam (pthread_t thread, int *policy, struct sched_param *param) ;
pthread利用锁和条件变量实现生产者消费者模型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 #include <pthread.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #define BUFFER_SIZE 10 int buffer[BUFFER_SIZE];int in = 0 ; int out = 0 ; pthread_mutex_t mutex; pthread_cond_t empty; pthread_cond_t full; void * producer (void * arg) { int item; while (1 ) { item = rand () % 100 ; pthread_mutex_lock (&mutex); while ((in + 1 ) % BUFFER_SIZE == out) { pthread_cond_wait (&empty, &mutex); } buffer[in] = item; printf ("Producer produced: %d\n" , item); in = (in + 1 ) % BUFFER_SIZE; pthread_cond_signal (&full); pthread_mutex_unlock (&mutex); sleep (1 ); } return NULL ; } void * consumer (void * arg) { int item; while (1 ) { pthread_mutex_lock (&mutex); while (in == out) { pthread_cond_wait (&full, &mutex); } item = buffer[out]; printf ("Consumer consumed: %d\n" , item); out = (out + 1 ) % BUFFER_SIZE; pthread_cond_signal (&empty); pthread_mutex_unlock (&mutex); sleep (2 ); } return NULL ; } int main () { pthread_t producer_thread, consumer_thread; pthread_mutex_init (&mutex, NULL ); pthread_cond_init (&empty, NULL ); pthread_cond_init (&full, NULL ); pthread_create (&producer_thread, NULL , producer, NULL ); pthread_create (&consumer_thread, NULL , consumer, NULL ); pthread_join (producer_thread, NULL ); pthread_join (consumer_thread, NULL ); pthread_mutex_destroy (&mutex); pthread_cond_destroy (&empty); pthread_cond_destroy (&full); return 0 ; } Producer produced: 83 Consumer consumed: 83 Producer produced: 86 Consumer consumed: 86 Producer produced: 77 Producer produced: 15 Consumer consumed: 77 Producer produced: 93 ...
线程cpu绑核,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 #include <pthread.h> #include <stdio.h> #include <stdlib.h> #include <sched.h> #include <unistd.h> void * thread_function (void * arg) { cpu_set_t mask; pthread_t thread = pthread_self (); int result = pthread_getaffinity_np (thread, sizeof (cpu_set_t ), &mask); if (result != 0 ) { perror ("pthread_getaffinity_np" ); exit (EXIT_FAILURE); } for (int i = 0 ; i < CPU_SETSIZE; i++) { if (CPU_ISSET (i, &mask)) { printf ("Thread can run on CPU core: %d\n" , i); } } return NULL ; } int main () { pthread_t thread; cpu_set_t cpuset; pthread_create (&thread, NULL , thread_function, NULL ); CPU_ZERO (&cpuset); CPU_SET (0 , &cpuset); CPU_SET (1 , &cpuset); int result = pthread_setaffinity_np (thread, sizeof (cpu_set_t ), &cpuset); if (result != 0 ) { perror ("pthread_setaffinity_np" ); exit (EXIT_FAILURE); } pthread_join (thread, NULL ); return 0 ; } Thread can run on CPU core: 0 Thread can run on CPU core: 1
线程调度策略, SCHED_FIFO:先来先服务(First-In-First-Out)实时调度, SCHED_RR:轮转调度(Round-Robin)实时调度,SCHED_OTHER默认的操作系统决定调度线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 #include <pthread.h> #include <stdio.h> #include <stdlib.h> #include <sched.h> void * thread_function (void * arg) { printf ("Thread running\n" ); return NULL ; } int main () { pthread_t thread; struct sched_param param; int policy; pthread_create (&thread, NULL , thread_function, NULL ); param.sched_priority = 10 ; pthread_setschedparam (thread, SCHED_FIFO, ¶m); pthread_getschedparam (thread, &policy, ¶m); printf ("Thread policy: %d, Priority: %d\n" , policy, param.sched_priority); pthread_join (thread, NULL ); return 0 ; } Thread policy: 1 , Priority: 10 Thread running
原子类型 c语言可以直接使用gcc提供的内建原子操作函数, 这些函数保证操作的原子性、顺序性和可见性。
__sync_fetch_and_add:原子地将一个值加到目标变量,并返回加之前的值。
__sync_fetch_and_sub:原子地从目标变量减去一个值,并返回减之前的值。
__sync_lock_test_and_set:原子地设置一个值,并返回设置之前的值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 #include <stdio.h> #include <pthread.h> int counter = 0 ; void * increment (void * arg) { for (int i = 0 ; i < 1000 ; i++) { __sync_fetch_and_add(&counter, 1 ); } return NULL ; } int main () { pthread_t threads[10 ]; for (int i = 0 ; i < 10 ; i++) { pthread_create (&threads[i], NULL , increment, NULL ); } for (int i = 0 ; i < 10 ; i++) { pthread_join (threads[i], NULL ); } printf ("Counter value: %d\n" , counter); return 0 ; }
__thread, 是gcc提供的线程局部存储,只能用于全局和静态变量
C++ C++11的并发类 std::thread线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 #include <iostream> #include <thread> void print_thread_id () { std::thread::id this_id = std::this_thread::get_id (); std::cout << "Thread ID: " << this_id << std::endl; } int main () { std::cout << "Main thread ID: " << std::this_thread::get_id () << std::endl; std::thread t (print_thread_id) ; t.join (); return 0 ; } Main thread ID: 140509241898816 Thread ID: 140509241894656
线程同步
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 #include <iostream> #include <thread> #include <queue> #include <mutex> #include <condition_variable> #include <chrono> std::queue<int > buffer; std::mutex mtx; std::condition_variable cv; const int MAX_BUFFER_SIZE = 5 ; void producer () { for (int i = 0 ; i < 10 ; ++i) { std::this_thread::sleep_for (std::chrono::milliseconds (100 )); std::unique_lock<std::mutex> lock (mtx) ; cv.wait (lock, []() { return buffer.size () < MAX_BUFFER_SIZE; }); buffer.push (i); std::cout << "Produced: " << i << std::endl; cv.notify_all (); } } void consumer () { for (int i = 0 ; i < 10 ; ++i) { std::this_thread::sleep_for (std::chrono::milliseconds (150 )); std::unique_lock<std::mutex> lock (mtx) ; cv.wait (lock, []() { return !buffer.empty (); }); int item = buffer.front (); buffer.pop (); std::cout << "Consumed: " << item << std::endl; cv.notify_all (); } } int main () { std::thread producer_thread (producer) ; std::thread consumer_thread (consumer) ; producer_thread.join (); consumer_thread.join (); return 0 ; } Produced: 0 Consumed: 0 Produced: 1 Consumed: 1 Produced: 2 Produced: 3 Consumed: 2 Produced: 4 ...
异步任务
std::future和std::promise联用,用于线程间传递数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 #include <iostream> #include <thread> #include <future> void set_value (std::promise<int >& p) { std::this_thread::sleep_for (std::chrono::seconds (2 )); p.set_value (42 ); } int main () { std::promise<int > p; std::future<int > f = p.get_future (); std::thread t (set_value, std::ref(p)) ; std::cout << "Doing some work in the main thread...\n" ; std::this_thread::sleep_for (std::chrono::seconds (1 )); int result = f.get (); std::cout << "The result from promise is: " << result << std::endl; t.join (); return 0 ; } Doing some work in the main thread... The result from promise is: 42
std::future和std::async联用,获取异步任务执行完成后的返回值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 #include <iostream> #include <thread> #include <future> #include <chrono> int calculate_square (int x) { std::this_thread::sleep_for (std::chrono::seconds (2 )); return x * x; } int main () { std::future<int > result = std::async (std::launch::async, calculate_square, 5 ); std::cout << "Doing other work in main thread...\n" ; std::this_thread::sleep_for (std::chrono::seconds (1 )); int square = result.get (); std::cout << "The square of 5 is: " << square << std::endl; return 0 ; }
C++线程局部变量 全局变量,静态变量,局部变量只要是可能被多线程同时访问的,都是线程共享变量。thread_local 是 C++11 引入的关键字, 可以使用thread_local声明线程局部变量。相比__thread, thread_local可用于局部变量、全局变量、类成员变量等
线程局部变量没有数据竞争,其他线程对变量的修改也不会影响当前线程的变量值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 #include <stdio.h> #include <pthread.h> thread_local int thread_local_var = 0 ; void * thread_function (void * arg) { thread_local_var++; printf ("Thread %ld: thread_local_var = %d\n" , (long )arg, thread_local_var); return NULL ; } int main () { pthread_t threads[3 ]; for (long i = 0 ; i < 3 ; i++) { pthread_create (&threads[i], NULL , thread_function, (void *)i); } for (int i = 0 ; i < 3 ; i++) { pthread_join (threads[i], NULL ); } return 0 ; } Thread 0 : thread_local_var = 1 Thread 1 : thread_local_var = 1 Thread 2 : thread_local_var = 1
C++11 的原子类 std::atomic提供了线程安全的原子操作。std::atomic 是一个模板类,支持不同的数据类型(如整数、指针)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 #include <iostream> #include <atomic> #include <thread> std::atomic<int > counter (0 ) ;void increment (int id) { for (int i = 0 ; i < 10 ; ++i) { int expected = counter.load (); while (!counter.compare_exchange_strong (expected, expected + 1 )) { std::this_thread::yield (); } std::cout << "Thread " << id << " incremented counter to " << counter.load () << std::endl; } } int main () { std::thread t1 (increment, 1 ) ; std::thread t2 (increment, 2 ) ; t1. join (); t2. join (); std::cout << "Final counter value: " << counter.load () << std::endl; return 0 ; } Thread 1 incremented counter to 2 Thread 1 incremented counter to 3 Thread 1 incremented counter to 4 Thread 1 incremented counter to 5 Thread 1 incremented counter to 6 Thread 1 incremented counter to 7
内存序。内存序是为了保证多线程在多核CPU的可见性,是一种内存屏障。作用和JAVA的volatile类似。既然是可见性,就肯定是有的线程写,有的线程读。 如果只有写线程,没必要保证可见性。
memory_order_seq_cst 是默认的内存顺序,相当于JAVA的volatile,对原子类的修改操作会立刻刷到内存。
memory_order_acquire和load连用,memory_order_release和store连用。这个保证,如果某线程执行了store+memory_order_release的操作,在其他线程执行load+memory_order_acquire时,是可见的1 2 3 4 5 6 7 memory_order_relaxed:不保证任何顺序, 只提供原子性,只有写没有读可以使用 memory_order_consume:保证在该原子操作之后的所有依赖操作的顺序。 # 下面四个常用 memory_order_acquire:保证该操作之前的所有操作在该操作后执行。 memory_order_release:保证该操作之后的所有操作在该操作前执行。 memory_order_acq_rel:结合了 acquire 和 release 的效果。 memory_order_seq_cst:保证严格的顺序一致性,即原子类变量多线程并发执行和单线程循环执行顺序完全一致
内存序本身不提供“线程必须等待其他线程写了数据,才可以读”这样的同步语义,需要使用一个load循环,直到load数据。它能保证的是,只要某个线程store了数据,其他线程立刻可以load到。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 #include <iostream> #include <atomic> #include <thread> std::atomic<bool > ready (false ) ;std::atomic<int > data (0 ) ;void producer () { data.store (42 , std::memory_order_relaxed); ready.store (true , std::memory_order_release); } void consumer () { while (!ready.load (std::memory_order_acquire)) { std::this_thread::yield (); } std::cout << "Consumer read data: " << data.load (std::memory_order_relaxed) << std::endl; } int main () { std::thread t1 (producer) ; std::thread t2 (consumer) ; t1. join (); t2. join (); return 0 ; }
C++的lambda表达式和函数类型 JAVA JAVA 并发类 Thread类,继承 Thread 类并重写其 run() 方法,实现多线程。不需要import 库
1 2 3 4 5 6 7 8 9 10 11 12 13 class MyThread extends Thread { public void run () { System.out.println("Thread is running..." ); } } public class Main { public static void main (String[] args) { MyThread t1 = new MyThread (); t1.start(); } }
实现 Runnable 接口并重写 run() 方法, 实现多线程。不需要import 库
1 2 3 4 5 6 7 8 9 10 11 12 class MyRunnable implements Runnable { public void run () { System.out.println ("Runnable thread is running..." ); } } public class Main { public static void main (String[] args) { Thread t1 = new Thread (new MyRunnable ()); t1. start (); } }
Callable和FutureTask接口,与 Runnable 类似,但可以返回结果和抛出异常。需要import java.util.concurrent.Callable
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import java.util.concurrent.Callable;import java.util.concurrent.FutureTask;class MyCallable implements Callable <Integer> { public Integer call () throws Exception { return 42 ; } } public class Main { public static void main (String[] args) throws Exception { FutureTask<Integer> futureTask = new FutureTask <>(new MyCallable ()); Thread t1 = new Thread (futureTask); t1.start(); System.out.println("Result: " + futureTask.get()); } }
线程池类, Executor 和 ExecutorService
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class Main { public static void main (String[] args) { ExecutorService executor = Executors.newFixedThreadPool(2 ); executor.execute(() -> System.out.println("Task 1" )); executor.execute(() -> System.out.println("Task 2" )); executor.shutdown(); } } Task 1 Task 2
CountDownLatch, 倒计时计数器,用于等待其他线程完成。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import java.util.concurrent.CountDownLatch;public class Main { public static void main (String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch (3 ); Runnable task = () -> { System.out.println(Thread.currentThread().getName() + " finished task" ); latch.countDown(); }; new Thread (task).start(); new Thread (task).start(); new Thread (task).start(); latch.await(); System.out.println("All tasks finished" ); } }
ThreadLocal,线程独立变量
1 2 3 4 5 6 7 8 9 10 11 12 13 public class Main { public static void main (String[] args) { ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 0 ); Runnable task = () -> { threadLocal.set(threadLocal.get() + 1 ); System.out.println(Thread.currentThread().getName() + " value: " + threadLocal.get()); }; new Thread (task).start(); new Thread (task).start(); } }
synchronized 锁, Java 内置的关键字,可用于方法或代码块, 用于线程同步。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 public synchronized void method () { System.out.println(Thread.currentThread().getName() + " is executing" ); } public void method () { synchronized (this ) { System.out.println(Thread.currentThread().getName() + " is executing" ); } } class Counter { private int count = 0 ; public synchronized void increment () { count++; } public synchronized int getCount () { return count; } } public class Main { public static void main (String[] args) throws InterruptedException { Counter counter = new Counter (); Thread t1 = new Thread (() -> { for (int i = 0 ; i < 1000 ; i++) counter.increment(); }); Thread t2 = new Thread (() -> { for (int i = 0 ; i < 1000 ; i++) counter.increment(); }); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println("Final count: " + counter.getCount()); } }
并发安全结构 JAVA提供了一些并发安全的数据结构,位于java.util.concurrent包中
ConcurrentHashMap 线程安全的哈希表实现,支持高效的并发读写。它将内部数据分割为多个段,以减少锁竞争。
CopyOnWriteArrayList 线程安全的 List 实现
BlockingQueue 线程安全的队列接口,通常用于生产者-消费者模型。
java.util中的HashMap,ArrayList,LinkedList等都是非并发安全的
JAVA多线程和虚拟机 相比C/C++ 多线程通过pthread直接在linux操作系统中起进程,JAVA的多线程需要经过JAVA虚拟机这一层。
java虚拟机维护了JavaThread结构,JVM是C++实现的。所以JavaThread是C++定义的类。JavaThread维护了线程的状态,一个指针指向java.lang.Thread创建的对象(oop),另一个指针指向对应的操作系统创建的OSThread
java线程模型的实现取决于jvm虚拟机,只要jvm愿意,可以选择类似go使用协程来实现线程。但以市场占有率最大的HotSpot虚拟机举例,一个java线程都是直接映射到操作系统的原生线程来实现的,所有的线程调度都是由操作系统完成的。
原子类 volatile 关键字可以实现变量的可见性,但不提供原子性。
JAVA常见的原子类有 AtomicInteger、AtomicLong、AtomicBoolean、AtomicReference
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import java.util.concurrent.atomic.AtomicInteger;public class Main { public static void main (String[] args) { AtomicInteger atomicInt = new AtomicInteger (0 ); atomicInt.set(10 ); System.out.println("Value: " + atomicInt.get()); System.out.println("Incremented: " + atomicInt.incrementAndGet()); boolean success = atomicInt.compareAndSet(15 , 100 ); System.out.println("CAS Success: " + success + ", New Value: " + atomicInt.get()); } } Value: 10 Incremented: 11 CAS Success: false , New Value: 11
Golang 并发实现 Golang 在语言层面使用协程实现并发,Go内置协程调度器,能自动在协程阻塞时将协程挂起。Golang的线程是无阻塞的,这意味Golang线程不能使用线程阻塞的系统调用。
go 关键字自动启动一个协程运行函数任务。golang提供sync.WaitGroup等待协程运行完。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 package mainimport ( "fmt" "math/rand" "sync" "time" ) func producer (ch chan <- int , wg sync.WaitGroup) { for i := 0 ; i < 10 ; i++ { item := rand.Intn(100 ) fmt.Println("Produced:" , item) ch <- item time.Sleep(time.Second) } close (ch) wg.Done() } func consumer (ch <-chan int , wg sync.WaitGroup) { for item := range ch { fmt.Println("Consumed:" , item) time.Sleep(2 * time.Second) } wg.Done() } func main () { var wg sync.WaitGroup ch := make (chan int , 5 ) for i := 0 ; i < 3 ; i++ { go consumer(ch, wg) wg.Add(1 ) } go producer(ch, wg) wg.Add(1 ) time.Sleep(10 * time.Second) } Produced: 63 Consumed: 63 Produced: 47 Consumed: 47 Produced: 47 Consumed: 47 Produced: 99 Consumed: 99 Produced: 51 Consumed: 51 ...
select 语句可以等待接收多个channel的数据,只要有一个channel写入了数据,select就会执行处理函数并退出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 package mainimport ( "fmt" "time" ) func main () { ch := make (chan int ) go func () { ch <- 42 }() select { case msg := <-ch: fmt.Println("Received:" , msg) case <-time.After(1 * time.Second): fmt.Println("Timeout!" ) } } Received: 42
sync.Mutex,互斥锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 package mainimport ( "fmt" "sync" ) var ( counter int mu sync.Mutex ) func increment () { mu.Lock() defer mu.Unlock() counter++ } func main () { var wg sync.WaitGroup for i := 0 ; i < 1000 ; i++ { wg.Add(1 ) go func () { defer wg.Done() increment() }() } wg.Wait() fmt.Println("Final counter:" , counter) }
sync.RWMutex 读写锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 package mainimport ( "fmt" "sync" ) var ( data int rwMutex sync.RWMutex ) func read () int { rwMutex.RLock() defer rwMutex.RUnlock() return data } func write (value int ) { rwMutex.Lock() defer rwMutex.Unlock() data = value } func main () { var wg sync.WaitGroup for i := 0 ; i < 5 ; i++ { wg.Add(1 ) go func (i int ) { defer wg.Done() fmt.Printf("Reader %d: data = %d\n" , i, read()) }(i) } wg.Add(1 ) go func () { defer wg.Done() write(42 ) fmt.Println("Writer: data updated" ) }() wg.Wait() }
sync.Once 是用来确保函数只执行一次,无论是多次调用还是多线程多次执行
sync.Cond 包提供了条件变量,执行条件变量wait前需要持有锁
1 2 3 4 5 6 7 8 9 10 11 12 func (b *Buffer) Produce (item int ) { b.lock.Lock () defer b.lock.Unlock () for len (b.data) == bufferSize { b.cond.Wait () } b.data = append (b.data, item) fmt.Println ("Produced:" , item) b.cond.Signal () }
原子操作 sync/atomic 包提供了多种原子操作函数,
AddInt32:原子地将一个 int32 值加上一个指定的值。
CompareAndSwapInt32:原子地比较并交换 int32 值。如果当前值等于预期值,则将其更改为新值。
LoadInt32:原子地加载一个 int32 值。
StoreInt32:原子地存储一个 int32 值。
Context 信息传递 Python 多线程和多进程 Python 的 Cpython有一个全局解释器锁(GIL)。这意味着在任何时刻,只有一个线程可以执行 Python 字节码。Python的多线程实际是单核的并行。
multiprocessing 和threading 模块使用。Python多进程的问题是,多进程之间是独立的地址空间,难以共享变量。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import threadingimport timedef print_numbers (): for i in range (5 ): print (i) time.sleep(1 ) thread1 = threading.Thread(target=print_numbers) thread2 = threading.Thread(target=print_numbers) process1 = multiprocessing.Process(target=print_numbers) process2 = multiprocessing.Process(target=print_numbers) thread1.start() thread2.start() process1.start() process2.start() thread1.join() thread2.join() process1.join() process2.join()
线程池
1 2 3 4 5 6 7 8 9 10 11 12 import concurrent.futuresdef print_square (number ): print (f"Square of {number} is {number * number} " ) with concurrent.futures.ThreadPoolExecutor(max_workers=5 ) as executor: executor.submit(print_square, 2 ) executor.submit(print_square, 3 ) executor.submit(print_square, 4 )
线程锁,lock = threading.Lock()
生成器和协程 生成器。python生成器是一种无栈协程,可以实现手动切换执行流。普通函数可以 yield 语句返回生成器,普通函数执行到yield后会转向执行接收yield返回值的函数,当再执行next(),线程回到原先的函数继续执行。
使用next() 和 send() 方法可以控制生成器的执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 def simple_gen (): value = yield "Ready" print ("Received:" , value) yield "End" gen = simple_gen() print (next (gen)) print (gen.send(42 )) Ready ('Received:' , 42 ) End
asyncio 是python3提供的无栈协程模块,async 用于定义协程函数,await 用于暂停协程并等待另一个协程完成。async定义的协程函数中不能调用可能导致线程阻塞或者等待的函数,包括阻塞读写、线程锁、sleep等。
await 后面跟的是io耗时的操作,表示把线程从当前协程切换走;同时await会注册事件通知,当耗时的操作执行完时,调度器会回来再执行当前协程
await 同时维护了执行的先后顺序,对于async函数A await aysnc函数B,则必须等待B执行完函数A才会继续执行。这个执行逻辑避免了回调地狱,即对于回调函数链路A->B->C,可以直接在A里面写await B, B里面写 await C ,代码更加简洁易懂
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import asyncioasync def task_1 (): print ("Task 1 starts" ) await asyncio.sleep(2 ) print ("Task 1 ends" ) async def task_2 (): print ("Task 2 starts" ) await task_1() print ("Task 2 ends" ) async def main (): await asyncio.gather(task_1(), task_2()) asyncio.run(main()) Task 1 starts Task 2 starts Task 1 starts Task 1 ends Task 1 ends Task 2 ends
golang协程是有栈协程,同时不需要显示指定await进行协程切换,调度器会自动发现io阻塞的调用并自动切换协程。golang执行回调函数只需要对无脑的执行go func()
,缺点是降低了灵活性。