基础概念 避免多个线程同时访问并修改同一块内存,造成数据冲突。线程同步的结果是,如果多个线程都要访问并修改同一块内存时,依次只让一个进程进行访问,其余进程阻塞,直到所有进程执行完毕。 阻塞:意味着失去CPU时间片
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 #include <stdio.h> #include <unistd.h> #include <pthread.h> #define MAX 50 int number = 0 ;void * ThreadWorkingA (void * arg) { for (int i=0 ; i < MAX; i++){ int cur = number; cur++; usleep(10 ); number = cur; printf ("thread A id = %ld, nummber = %d\n" , pthread_self(), number); } return NULL ; }void * ThreadWorkingB (void * arg) { for (int i=0 ; i < MAX; i++){ int cur = number; cur++; usleep(10 ); number = cur; printf ("thread B id = %ld, nummber = %d\n" , pthread_self(), number); } return NULL ; }int main () { pthread_t p1, p2; pthread_create(&p1, NULL , ThreadWorkingA, NULL ); pthread_create(&p2, NULL , ThreadWorkingB, NULL ); pthread_join(p1, NULL ); pthread_join(p2, NULL ); return 0 ; }
程序运行时,每次都不能运行到number=100,这是因为线程A修改了cur数据之后,CPU被挂起,导致没有及时修改number数据,不能及时更新到物理内存中,线程B此时从物理内存中读取到的数据不是最新状态,只能从旧数据开始累加。
线程同步方式 多个线程访问共享资源的时候,很容易出现问题,因此需要进行线程同步,常用的线程同步方式有四种:
互斥锁
读写锁
条件变量
信号量
共享资源/临界资源:
确定临界资源之后,与临界资源相关的上下文代码块被称为临界区,确定好临界区之后,就可以进行线程同步了。
在临界区代码块起始位置,添加加锁函数,对临界区上锁
在临界区代码块的结束位置,添加解锁函数,对临界区解锁
通过锁机制可以保证临界区代码最多只能同时有一个线程访问,即并行访问变成了串行访问
互斥锁 互斥锁类型
创建的锁对象中保存了这把锁的状态信息:锁定还是打开,如果是锁定状态还记录了给这把锁加锁的线程信息(线程ID)。一个互斥锁变量,只能被一个线程锁定,被锁定之后,其他线程再对互斥锁变量加锁,就会被阻塞,直到这把锁被解锁。一般情况下,一个共享资源对应一把互斥锁,锁的个数和线程个数无关。
互斥锁初始化 1 int pthread_mutex_init (pthread_mutex_t *resrict mutex, const pthread_mutexattr_t *restrict attr) ;
restrict
关键字,用来修饰指针,只有这个关键字修饰的指针可以访问指向的内存空间,其他指针不行
参数:
mutex:互斥锁变量的地址
attr:互斥锁的属性,一般使用默认属性,指定为NULL
返回值
释放互斥锁资源 1 int pthread_mutex_destroy (pthread_mutex_t *mutex) ;
参数:
返回值:
加锁 1 int pthread_mutex_lock (pthread_mutex_t *mutex) ;
这个函数的调用,首先会判断参数mutex互斥锁中的状态是不是锁定状态:
如果这个锁没有被锁定,是打开状态,则这个线程可以加锁成功,并在这个锁中记录是哪个线程加锁成功了
如果这个锁处于被锁定状态,还没有被开锁,那其他线程用这个锁的时候,就会加锁失败,并阻塞在这把锁上
当这把锁被开锁之后,那些阻塞在这把锁上的线程就解除阻塞,并通过竞争的方式抢占这把锁,没抢到的线程继续阻塞
尝试加锁 1 int pthread_mutex_trylock (pthread_mutex_t *mutex) ;
调用这个函数对互斥锁变量加锁有两种情况:
如果这把锁没有被锁定,是打开的,则线程加锁成功
如果锁变量被锁住了,加锁失败,直接返回错误码,并且调用这个函数加锁的线程不会被阻塞
解锁 1 int pthread_mutex_unlock (pthread_mutex_t *mutex) ;
不是所有线程都可以对互斥锁解锁,而是哪个线程加的锁,哪个线程才能解锁成功。
互斥锁使用示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 #include <stdio.h> #include <unistd.h> #include <pthread.h> #include <sys/time.h> #define MAX 50 int number = 0 ;pthread_mutex_t mutex;void * ThreadWorkingA (void * arg) { for (int i=0 ; i < MAX; i++){ pthread_mutex_lock(&mutex); int cur = number; cur++; number = cur; printf ("thread A id = %ld, nummber = %d\n" , pthread_self(), number); pthread_mutex_unlock(&mutex); usleep(5 ); } return NULL ; }void * ThreadWorkingB (void * arg) { for (int i=0 ; i < MAX; i++){ pthread_mutex_lock(&mutex); int cur = number; cur++; usleep(10 ); number = cur; printf ("thread B id = %ld, nummber = %d\n" , pthread_self(), number); pthread_mutex_unlock(&mutex); } return NULL ; }int main () { pthread_t p1, p2; struct timeval start , end ; gettimeofday(&start, NULL ); pthread_mutex_init(&mutex, NULL ); pthread_create(&p1, NULL , ThreadWorkingA, NULL ); pthread_create(&p2, NULL , ThreadWorkingB, NULL ); pthread_join(p1, NULL ); pthread_join(p2, NULL ); pthread_mutex_destroy(&mutex); gettimeofday(&end, NULL ); long seconds = end.tv_sec - start.tv_sec; long micros = seconds * 1000000 + end.tv_usec - start.tv_usec; printf ("Elapsed time: %ld microseconds\n" , micros); return 0 ; }
加锁前:Elapsed time: 5095 microseconds 加锁后:Elapsed time: 10915 microseconds
死锁 死锁:所有线程都被阻塞,并且线程的阻塞无法解开(可以解锁的线程也被阻塞了)。 造成死锁的场景如下:
加锁之后忘记解锁;
重复加锁;
程序中有多个共享资源,互相加锁,导致互相被阻塞
解决方法:
对共享资源访问完之后,一定要解锁,或者在加锁的时候使用trylock()
如果程序中有多把锁,可以控制对锁的访问顺序,或者在对其他互斥锁进行加锁操作之前,先释放当前线程拥有的互斥锁
引入专门用于死锁检测的模块
读写锁 读写锁是互斥锁的升级版,在做读操作的时候可以提高程序的执行效率,如果所有的线程都是读操作,那么读是并行的。
读锁是共享的
写锁是独占的
使用读写锁分别对两个临界区加了读锁和写锁,两个线程要同时访问这两个临界区,访问写锁临界区的线程继续运行,访问读锁的临界区的线程阻塞。因为写锁比读锁的优先级高。
读写锁类型 1 pthread_rwlock_t rwlock;
读写锁中保存了这把锁的状态信息:
锁的状态:锁定/打开
锁定的是什么操作:读操作/写操作,使用读写锁锁定了写操作之后,需要先解锁才能执行写操作,反之亦然。不能使用一把锁同时进行读锁定和写锁定。
哪个线程将这把锁锁上的
读写锁初始化 1 int pthread_rwlock_init (pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr) ;
参数:
rwlock:读写锁的地址,传出参数
attr:读写锁属性,一般使用默认属性,指定为NULL
释放读写锁占用的系统资源 1 int pthread_rwlock_destroy (pthread_rwlock_t *rwlock) ;
参数:
返回值:
读操作加锁 1 int pthread_rwlock_rdlock (pthread_rwlock_t *rwlock) ;
调用该函数的时候,如果读写锁是打开的,那么加锁成功;如果读写锁已经锁定了读操作,调用该函数依然可以加锁成功,因为读锁是共享的;但是如果读写锁已经锁定了写操作,那么调用这个函数的线程就会被阻塞。
写操作加锁 1 int pthread_rwlock_wrlock (pthread_rwlock_t *rwlock) ;
调用这个函数,如果读写锁是打开的,那么加锁成功;如果读写锁已经锁定了读操作或者写操作,调用这个函数的线程就会被阻塞。
尝试读操作加锁 1 int pthread_rwlock_tryrdlock (pthread_rwlock_t *rwlock) ;
这个函数可以有效的避免死锁,如果加读锁失败,不会阻塞当前线程,直接返回错误号。 调用该函数,如果读写锁是打开的,那么加锁成功;如果读写锁已经锁定了读操作,调用这个函数依然可以加锁成功;如果读写锁已经锁定了写操作,调用这个函数加锁失败,对应的线程不会被阻塞,可以在程序中对函数返回值进行判断,添加加锁失败之后的处理动作。
尝试写操作加锁 1 int pthread_rwlock_trywrlock (pthread_rwlock_t *rwlock) ;
调用这个函数,如果读写锁是打开的,那么加锁成功;如果读写锁已经锁定了读操作或者写操作,调用这个函数加锁失败,但是线程不会阻塞,可以在程序中对函数返回值进行判断,添加加锁失败之后的处理动作。
读/写锁解锁 1 int pthread_rwlock_unlock (pthread_rwlock_t *rwlock) ;
读写锁的使用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 #include <stdio.h> #include <unistd.h> #include <pthread.h> #include <stdlib.h> #include <sys/time.h> #define count 50 int number = 0 ;pthread_rwlock_t rwlock;void * WriteThread (void * arg) { for (int i = 0 ; i < count; i++) { pthread_rwlock_wrlock(&rwlock); int cur = number; cur ++; usleep(5 ); number = cur; printf ("++写操作完毕,number = %d, tid = %ld\n" , number, pthread_self()); pthread_rwlock_unlock(&rwlock); } return NULL ; }void * ReadThread (void * arg) { for (int i = 0 ; i < count; i++) { pthread_rwlock_rdlock(&rwlock); printf ("--读操作完毕,number = %d, tid = %ld\n" , number, pthread_self()); pthread_rwlock_unlock(&rwlock); usleep(5 ); } return NULL ; }int main () { struct timeval start , end ; gettimeofday(&start, NULL ); pthread_rwlock_init(&rwlock, NULL ); pthread_t wtid[3 ]; pthread_t rtid[5 ]; for (int i=0 ; i<3 ; i++){ pthread_create(&wtid[i], NULL , WriteThread, NULL ); } for (int i=0 ; i<5 ; i++){ pthread_create(&rtid[i], NULL , ReadThread, NULL ); } for (int i=0 ; i<3 ; i++){ pthread_join(wtid[i], NULL ); } for (int i=0 ; i<5 ; i++){ pthread_join(rtid[i], NULL ); } pthread_rwlock_destroy(&rwlock); gettimeofday(&end, NULL ); long seconds = end.tv_sec - start.tv_sec; long micros = seconds*1000000 + end.tv_usec - start.tv_usec; printf ("Elapsed time: %ld microseconds\n" , micros); return 0 ; }
w=3, r=20 读写锁运行时间:读写锁 - Elapsed time: 42484 microseconds 互斥锁运行时间:互斥锁 - Elapsed time: 44594 microseconds
w=3, r=100 读写锁 - Elapsed time: 123020 microseconds 互斥锁 - Elapsed time: 125475 microseconds
w=0, r=100 读写锁 - Elapsed time: 108057 microseconds 互斥锁 - Elapsed time: 113061 microseconds
w=0, r=1000 读写锁 - Elapsed time: 1006307 microseconds 互斥锁 - Elapsed time: 939342 microseconds
w=0, r=1000(把延时加到锁里面,对比非常明显) 读写锁 - Elapsed time: 977554 microseconds 互斥锁 - Elapsed time: 4813041 microseconds
无论是读写锁还是互斥锁,它们在操作系统层面都有开销。这个开销包括上下文切换、锁的获取和释放等。如果不将延时放入上锁的代码内,系统层面的开销占据了主导地位,使得读写锁的性能优势不明显。
条件变量
条件变量的主要作用不是处理线程同步,而是进行线程的阻塞。
条件变量一般需要配合互斥锁来使用。
假设一共有1-26个线程,共同访问同一把互斥锁,如果其中一个线程加锁成功,那么其余的25个线程访问互斥锁都阻塞,所有线程都只能顺序访问临界区
条件变量只有满足指定条件下才会阻塞线程,如果条件不满足,多个线程可以同时进入临界区,同时读写资源(也会出现共享资源中的数据混乱)
条件变量类型
里面记录了被条件变量阻塞的线程的线程信息,以便在解除阻塞的时候使用。
条件变量初始化 1 int pthread_cond_init (pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr) ;
参数:
cond:条件变量的地址,传出参数
attr:条件变量的属性,一般使用默认属性,指定为NULL
条件变量销毁并释放资源 1 int pthread_cond_destroy (pthread_cond_t *cond) ;
参数:
条件变量的线程阻塞函数 1 int pthread_cond_wait (pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex) ;
参数:
cond:条件变量的地址
mutex:互斥锁的地址
函数在阻塞线程的时候,需要一个互斥锁参数,这个互斥锁的功能主要是进行线程同步,让线程顺序进入临界区,避免出现共享资源的数据混乱,该函数会对互斥锁做以下几件事情:
在阻塞线程的时候,如果线程已经对互斥锁mutex上锁,那么该函数会将这把锁打开,从而避免死锁
当线程解除阻塞的时候,函数内部会帮助这个线程再次将这个mutex互斥锁上锁,继续向下访问临界区
条件变量的线程阻塞函数(定时解除) 1 int pthread_cond_timewait (pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime) ;
将线程阻塞一定的时间,时间到达之后,线程解除阻塞。 其中struct timespec的结构为:
1 2 3 4 5 struct timespec { time_t tv_sec; long tv_nsec; }
timespec参数的赋值方式比较麻烦:
1 2 3 4 time_t mytime = time(NULL ); struct timespec tmsp ; tmsp.tv_sec = time(NULL ) + 100 ; tmsp.tv_nsec = 0 ;
条件变量的唤醒阻塞函数 1 2 3 4 5 int pthread_cond_signal (pthread_cond_t *cond) ;int pthread_cond_broadcast (pthread_cond_t *cond) ;
上述两个唤醒阻塞函数的区别是,第一个是随机唤醒几个被阻塞在条件变量上的线程,第二个是唤醒所有被阻塞在条件变量上的线程。因为条件变量搭配互斥锁使用,很多情况下,即使将阻塞在条件变量上的互斥锁全部唤醒,当其中一个线程抢占到互斥锁之后,其余线程也会立马进行阻塞状态。以及如果,当消费者远远大于生产者的数目,当生产者供不应求的时候,也没必要唤醒全部被条件变量阻塞的线程,因为唤醒之后,又会立马因为供不应求,再次陷入阻塞。
条件变量的使用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 #include <stdio.h> #include <unistd.h> #include <pthread.h> #include <stdlib.h> #include <sys/time.h> #define N 5 pthread_cond_t cond;pthread_mutex_t mutex;int total = 0 ;struct Node { int number; struct Node * next ; };struct Node * HEAD = NULL ;void * producer (void * arg) { while (1 ){ pthread_mutex_lock(&mutex); struct Node *newNode = (struct Node*)malloc (sizeof (struct Node)); newNode -> next = HEAD; newNode ->number = ++total; HEAD = newNode; printf ("+++producer, number = %d, tid = %ld\n" , newNode->number, pthread_self()); pthread_mutex_unlock(&mutex); pthread_cond_broadcast(&cond); sleep(1 ); } return NULL ; }void * consumer (void * arg) { while (1 ) { pthread_mutex_lock(&mutex); while (HEAD == NULL ) { pthread_cond_wait(&cond, &mutex); } struct Node *newNode = HEAD; printf ("--consumer: number: %d, tid = %ld\n" , newNode->number, pthread_self()); HEAD = newNode -> next; free (newNode); pthread_mutex_unlock(&mutex); sleep(1 ); } return NULL ; }int main () { pthread_cond_init(&cond, NULL ); pthread_mutex_init(&mutex, NULL ); pthread_t ptid[N]; for (int i = 0 ; i < N; i++) { pthread_create(&ptid[i], NULL , producer, NULL ); } pthread_t ctid[N]; for (int i = 0 ; i < N; i++) { pthread_create(&ctid[i], NULL , consumer, NULL ); } for (int i = 0 ; i < N; i++) { pthread_join(ptid[i], NULL ); pthread_join(ctid[i], NULL ); } pthread_cond_destroy(&cond); pthread_mutex_destroy(&mutex); return 0 ; }
信号量 对于信号量,我的理解是,他代表一种资源,如果信号量不为0,即代表有剩余资源,线程可以继续运行;如果信号量为0,即代表资源为空,线程阻塞。 信号量的主要作用也是阻塞线程,并不能保证线程安全,如果要保证线程安全,需要信号量和互斥锁一起使用。
信号量的头文件为:<semaphore.h>
信号量的类型为:sem_t sem;
信号量初始化 1 int sem_init (sem_t *sem, int pshared, unsigned int value) ;
参数:
sem:信号量变量地址
pshared:
value:初始化当前信号量拥有的资源数,如果资源数为0,线程就会被阻塞
信号量的资源释放 1 int sem_destroy (sem_t *sem) ;
参数:
信号量资源消耗(-1)函数 1 int sem_wait (sem_t *sem) ;
当sem中的资源数大于0,线程不会阻塞,线程会占用sem中的一个资源,并且资源数减1
当sem中的资源数减为0时,资源被耗尽,线程阻塞
信号量资源增加(+1)函数 1 int sem_post (sem_t *sem) ;
调用该函数,会将sem中的资源数加1
如果有线程在调用sem_wait,sem_trywait,sem_timedwait时因为sem中的资源数为0被阻塞了,这时这些线程会解除阻塞,获取到资源之后继续向下运行。
其他信号量资源消耗函数 1 2 3 4 5 int sem_trywait (sem_t *sem) ;int sem_timewait (sem_t *sem, const struct timespec *abs_timeout) ;
第一个函数:
当sem中的资源数大于0,线程不会阻塞,线程会占用sem中的一个资源,并且资源数减1
当sem中的资源数减为0时,资源被耗尽,但是线程不会阻塞,而是直接返回错误号,可以加判断语句,用于处理获取资源失败的情况
第二个函数:
当sem中的资源数大于0,线程不会阻塞,线程会占用sem中的一个资源,并且资源数减1
当sem中的资源数减为0时,资源被耗尽,线程开始阻塞,当阻塞指定时长之后,线程解除阻塞
获取信号量资源数函数 1 int sem_getvalue (sem_t *sem, int *sval) ;
参数:
sem:信号量变量的地址
sval:传出参数,传出信号量剩余资源
信号量使用方式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <semaphore.h> #include <pthread.h> #define N 5 sem_t psem;sem_t csem;pthread_mutex_t mutex;int total = 0 ;struct Node { int number; struct Node * next ; };struct Node * head = NULL ;void * producer (void * arg) { while (1 ) { sem_wait(&psem); pthread_mutex_lock(&mutex); struct Node * pnew = (struct Node*)malloc (sizeof (struct Node)); pnew->number = ++total; pnew->next = head; head = pnew; printf ("+++producer, number = %d, tid = %ld\n" , pnew->number, pthread_self()); pthread_mutex_unlock(&mutex); sem_post(&csem); sleep(1 ); } return NULL ; }void * consumer (void * arg) { while (1 ) { sem_wait(&csem); pthread_mutex_lock(&mutex); struct Node * pnode = head; printf ("--consumer: number: %d, tid = %ld\n" , pnode->number, pthread_self()); head = pnode->next; free (pnode); pthread_mutex_unlock(&mutex); sem_post(&psem); sleep(1 ); } return NULL ; }int main () { sem_init(&psem, 0 , 5 ); sem_init(&csem, 0 , 0 ); pthread_mutex_init(&mutex, NULL ); pthread_t ptid[N]; pthread_t ctid[N]; for (int i=0 ; i<N; ++i) { pthread_create(&ptid[i], NULL , producer, NULL ); } for (int i=0 ; i<N; ++i) { pthread_create(&ctid[i], NULL , consumer, NULL ); } for (int i=0 ; i<N; ++i) { pthread_join(ptid[i], NULL ); pthread_join(ctid[i], NULL ); } sem_destroy(&psem); sem_destroy(&csem); pthread_mutex_destroy(&mutex); return 0 ; }
如果生产者和消费者使用的信号量总资源数为1,那么不会出现生产者线程和消费者线程同时访问共享资源的情况,不管生产者和消费者线程有多少个,它们都是顺序执行的。