线程同步

基础概念

避免多个线程同时访问并修改同一块内存,造成数据冲突。线程同步的结果是,如果多个线程都要访问并修改同一块内存时,依次只让一个进程进行访问,其余进程阻塞,直到所有进程执行完毕。
阻塞:意味着失去CPU时间片

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>

#define MAX 50
int number = 0;

void* ThreadWorkingA(void* arg){
for(int i=0; i < MAX; i++){
int cur = number;
cur++;
usleep(10);
number = cur;
printf("thread A id = %ld, nummber = %d\n", pthread_self(), number);
}
return NULL;
}

void* ThreadWorkingB(void* arg){
for(int i=0; i < MAX; i++){
int cur = number;
cur++;
usleep(10);
number = cur;
printf("thread B id = %ld, nummber = %d\n", pthread_self(), number);
}
return NULL;
}

int main(){
pthread_t p1, p2;
pthread_create(&p1, NULL, ThreadWorkingA, NULL);
pthread_create(&p2, NULL, ThreadWorkingB, NULL);
pthread_join(p1, NULL);
pthread_join(p2, NULL);
return 0;
}

程序运行时,每次都不能运行到number=100,这是因为线程A修改了cur数据之后,CPU被挂起,导致没有及时修改number数据,不能及时更新到物理内存中,线程B此时从物理内存中读取到的数据不是最新状态,只能从旧数据开始累加。

线程同步方式

多个线程访问共享资源的时候,很容易出现问题,因此需要进行线程同步,常用的线程同步方式有四种:

  1. 互斥锁
  2. 读写锁
  3. 条件变量
  4. 信号量

共享资源/临界资源:

  • 全局数据区
  • 堆区变量

确定临界资源之后,与临界资源相关的上下文代码块被称为临界区,确定好临界区之后,就可以进行线程同步了。

  1. 在临界区代码块起始位置,添加加锁函数,对临界区上锁
  2. 在临界区代码块的结束位置,添加解锁函数,对临界区解锁
  3. 通过锁机制可以保证临界区代码最多只能同时有一个线程访问,即并行访问变成了串行访问

互斥锁

互斥锁类型

1
pthread_mutex_t mutex;

创建的锁对象中保存了这把锁的状态信息:锁定还是打开,如果是锁定状态还记录了给这把锁加锁的线程信息(线程ID)。一个互斥锁变量,只能被一个线程锁定,被锁定之后,其他线程再对互斥锁变量加锁,就会被阻塞,直到这把锁被解锁。一般情况下,一个共享资源对应一把互斥锁,锁的个数和线程个数无关。

互斥锁初始化

1
int pthread_mutex_init(pthread_mutex_t *resrict mutex, const pthread_mutexattr_t *restrict attr);
  1. restrict
    • 关键字,用来修饰指针,只有这个关键字修饰的指针可以访问指向的内存空间,其他指针不行
  2. 参数:
    • mutex:互斥锁变量的地址
    • attr:互斥锁的属性,一般使用默认属性,指定为NULL
  3. 返回值
    • 函数调用成功返回0,调用失败返回对应的错误号

释放互斥锁资源

1
int pthread_mutex_destroy(pthread_mutex_t *mutex);

参数:

  • mutex:互斥锁的地址

返回值:

  • 如果函数调用成功返回0,否则返回错误号

加锁

1
int pthread_mutex_lock(pthread_mutex_t *mutex);

这个函数的调用,首先会判断参数mutex互斥锁中的状态是不是锁定状态:

  • 如果这个锁没有被锁定,是打开状态,则这个线程可以加锁成功,并在这个锁中记录是哪个线程加锁成功了
  • 如果这个锁处于被锁定状态,还没有被开锁,那其他线程用这个锁的时候,就会加锁失败,并阻塞在这把锁上
  • 当这把锁被开锁之后,那些阻塞在这把锁上的线程就解除阻塞,并通过竞争的方式抢占这把锁,没抢到的线程继续阻塞

尝试加锁

1
int pthread_mutex_trylock(pthread_mutex_t *mutex);

调用这个函数对互斥锁变量加锁有两种情况:

  1. 如果这把锁没有被锁定,是打开的,则线程加锁成功
  2. 如果锁变量被锁住了,加锁失败,直接返回错误码,并且调用这个函数加锁的线程不会被阻塞

解锁

1
int pthread_mutex_unlock(pthread_mutex_t *mutex);

不是所有线程都可以对互斥锁解锁,而是哪个线程加的锁,哪个线程才能解锁成功。

互斥锁使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/time.h>

#define MAX 50
int number = 0;
pthread_mutex_t mutex;

void* ThreadWorkingA(void* arg){
for(int i=0; i < MAX; i++){
pthread_mutex_lock(&mutex);
int cur = number;
cur++;
number = cur;
printf("thread A id = %ld, nummber = %d\n", pthread_self(), number);
pthread_mutex_unlock(&mutex);
usleep(5);
}
return NULL;
}

void* ThreadWorkingB(void* arg){
for(int i=0; i < MAX; i++){
pthread_mutex_lock(&mutex);
int cur = number;
cur++;
usleep(10);
number = cur;
printf("thread B id = %ld, nummber = %d\n", pthread_self(), number);
pthread_mutex_unlock(&mutex);
}
return NULL;
}

int main(){
pthread_t p1, p2;
struct timeval start, end;
gettimeofday(&start, NULL);

pthread_mutex_init(&mutex, NULL);
pthread_create(&p1, NULL, ThreadWorkingA, NULL);
pthread_create(&p2, NULL, ThreadWorkingB, NULL);
pthread_join(p1, NULL);
pthread_join(p2, NULL);
pthread_mutex_destroy(&mutex);

gettimeofday(&end, NULL);
long seconds = end.tv_sec - start.tv_sec;
long micros = seconds * 1000000 + end.tv_usec - start.tv_usec;
printf("Elapsed time: %ld microseconds\n", micros);
return 0;
}

加锁前:Elapsed time: 5095 microseconds
加锁后:Elapsed time: 10915 microseconds

死锁

死锁:所有线程都被阻塞,并且线程的阻塞无法解开(可以解锁的线程也被阻塞了)。
造成死锁的场景如下:

  1. 加锁之后忘记解锁;
  2. 重复加锁;
  3. 程序中有多个共享资源,互相加锁,导致互相被阻塞

解决方法:

  1. 对共享资源访问完之后,一定要解锁,或者在加锁的时候使用trylock()
  2. 如果程序中有多把锁,可以控制对锁的访问顺序,或者在对其他互斥锁进行加锁操作之前,先释放当前线程拥有的互斥锁
  3. 引入专门用于死锁检测的模块

读写锁

读写锁是互斥锁的升级版,在做读操作的时候可以提高程序的执行效率,如果所有的线程都是读操作,那么读是并行的。

  1. 读锁是共享的
  2. 写锁是独占的
  3. 使用读写锁分别对两个临界区加了读锁和写锁,两个线程要同时访问这两个临界区,访问写锁临界区的线程继续运行,访问读锁的临界区的线程阻塞。因为写锁比读锁的优先级高。

读写锁类型

1
pthread_rwlock_t rwlock;

读写锁中保存了这把锁的状态信息:

  • 锁的状态:锁定/打开
  • 锁定的是什么操作:读操作/写操作,使用读写锁锁定了写操作之后,需要先解锁才能执行写操作,反之亦然。不能使用一把锁同时进行读锁定和写锁定。
  • 哪个线程将这把锁锁上的

读写锁初始化

1
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);

参数:

  • rwlock:读写锁的地址,传出参数
  • attr:读写锁属性,一般使用默认属性,指定为NULL

释放读写锁占用的系统资源

1
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

参数:

  • rwlock:读写锁的地址

返回值:

  • 如果函数调用成功返回0,否则返回错误号

读操作加锁

1
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);

调用该函数的时候,如果读写锁是打开的,那么加锁成功;如果读写锁已经锁定了读操作,调用该函数依然可以加锁成功,因为读锁是共享的;但是如果读写锁已经锁定了写操作,那么调用这个函数的线程就会被阻塞。

写操作加锁

1
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);

调用这个函数,如果读写锁是打开的,那么加锁成功;如果读写锁已经锁定了读操作或者写操作,调用这个函数的线程就会被阻塞。

尝试读操作加锁

1
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);

这个函数可以有效的避免死锁,如果加读锁失败,不会阻塞当前线程,直接返回错误号。
调用该函数,如果读写锁是打开的,那么加锁成功;如果读写锁已经锁定了读操作,调用这个函数依然可以加锁成功;如果读写锁已经锁定了写操作,调用这个函数加锁失败,对应的线程不会被阻塞,可以在程序中对函数返回值进行判断,添加加锁失败之后的处理动作。

尝试写操作加锁

1
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);

调用这个函数,如果读写锁是打开的,那么加锁成功;如果读写锁已经锁定了读操作或者写操作,调用这个函数加锁失败,但是线程不会阻塞,可以在程序中对函数返回值进行判断,添加加锁失败之后的处理动作。

读/写锁解锁

1
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);

读写锁的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <stdlib.h>
#include <sys/time.h>

#define count 50
int number = 0;
pthread_rwlock_t rwlock;
// pthread_mutex_t mutex;

void* WriteThread(void* arg){
for (int i = 0; i < count; i++)
{
pthread_rwlock_wrlock(&rwlock);
// pthread_mutex_lock(&mutex);
int cur = number;
cur ++;
usleep(5);
number = cur;
printf("++写操作完毕,number = %d, tid = %ld\n", number, pthread_self());
// pthread_mutex_unlock(&mutex);
pthread_rwlock_unlock(&rwlock);
}
return NULL;
}

void* ReadThread(void* arg){
for (int i = 0; i < count; i++)
{
pthread_rwlock_rdlock(&rwlock);
// pthread_mutex_lock(&mutex);
printf("--读操作完毕,number = %d, tid = %ld\n", number, pthread_self());
// pthread_mutex_unlock(&mutex);
pthread_rwlock_unlock(&rwlock);
usleep(5);
}
return NULL;
}

int main(){
struct timeval start, end;
gettimeofday(&start, NULL);
pthread_rwlock_init(&rwlock, NULL);
// pthread_mutex_init(&mutex, NULL);
pthread_t wtid[3];
pthread_t rtid[5];
for(int i=0; i<3; i++){
pthread_create(&wtid[i], NULL, WriteThread, NULL);
}

for(int i=0; i<5; i++){
pthread_create(&rtid[i], NULL, ReadThread, NULL);
}

for(int i=0; i<3; i++){
pthread_join(wtid[i], NULL);
}

for(int i=0; i<5; i++){
pthread_join(rtid[i], NULL);
}

// pthread_mutex_destroy(&mutex);
pthread_rwlock_destroy(&rwlock);

gettimeofday(&end, NULL);
long seconds = end.tv_sec - start.tv_sec;
long micros = seconds*1000000 + end.tv_usec - start.tv_usec;
printf("Elapsed time: %ld microseconds\n", micros);
return 0;
}

w=3, r=20
读写锁运行时间:读写锁 - Elapsed time: 42484 microseconds
互斥锁运行时间:互斥锁 - Elapsed time: 44594 microseconds

w=3, r=100
读写锁 - Elapsed time: 123020 microseconds
互斥锁 - Elapsed time: 125475 microseconds

w=0, r=100
读写锁 - Elapsed time: 108057 microseconds
互斥锁 - Elapsed time: 113061 microseconds

w=0, r=1000
读写锁 - Elapsed time: 1006307 microseconds
互斥锁 - Elapsed time: 939342 microseconds

w=0, r=1000(把延时加到锁里面,对比非常明显)
读写锁 - Elapsed time: 977554 microseconds
互斥锁 - Elapsed time: 4813041 microseconds

无论是读写锁还是互斥锁,它们在操作系统层面都有开销。这个开销包括上下文切换、锁的获取和释放等。如果不将延时放入上锁的代码内,系统层面的开销占据了主导地位,使得读写锁的性能优势不明显。

条件变量

  1. 条件变量的主要作用不是处理线程同步,而是进行线程的阻塞。
  2. 条件变量一般需要配合互斥锁来使用。
  3. 假设一共有1-26个线程,共同访问同一把互斥锁,如果其中一个线程加锁成功,那么其余的25个线程访问互斥锁都阻塞,所有线程都只能顺序访问临界区
  4. 条件变量只有满足指定条件下才会阻塞线程,如果条件不满足,多个线程可以同时进入临界区,同时读写资源(也会出现共享资源中的数据混乱)

条件变量类型

1
pthread_cond_t cond;

里面记录了被条件变量阻塞的线程的线程信息,以便在解除阻塞的时候使用。

条件变量初始化

1
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);

参数:

  • cond:条件变量的地址,传出参数
  • attr:条件变量的属性,一般使用默认属性,指定为NULL

条件变量销毁并释放资源

1
int pthread_cond_destroy(pthread_cond_t *cond);

参数:

  • cond:条件变量的地址

条件变量的线程阻塞函数

1
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);

参数:

  • cond:条件变量的地址
  • mutex:互斥锁的地址

函数在阻塞线程的时候,需要一个互斥锁参数,这个互斥锁的功能主要是进行线程同步,让线程顺序进入临界区,避免出现共享资源的数据混乱,该函数会对互斥锁做以下几件事情:

  1. 在阻塞线程的时候,如果线程已经对互斥锁mutex上锁,那么该函数会将这把锁打开,从而避免死锁
  2. 当线程解除阻塞的时候,函数内部会帮助这个线程再次将这个mutex互斥锁上锁,继续向下访问临界区

条件变量的线程阻塞函数(定时解除)

1
int pthread_cond_timewait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);

将线程阻塞一定的时间,时间到达之后,线程解除阻塞。
其中struct timespec的结构为:

1
2
3
4
5
// 表示的时间为从1971.1.1到某个时间点的时间
struct timespec {
time_t tv_sec; // 秒
long tv_nsec; // 纳秒
}

timespec参数的赋值方式比较麻烦:

1
2
3
4
time_t mytime = time(NULL); // 1971.1.1 0:0:0到当前的总秒数
struct timespec tmsp;
tmsp.tv_sec = time(NULL) + 100; // 线程阻塞100s
tmsp.tv_nsec = 0;

条件变量的唤醒阻塞函数

1
2
3
4
5
// 唤醒阻塞在条件变量上的线程(至少一个,总数不定,相当于随机唤醒几个被阻塞在条件变量上的线程)
int pthread_cond_signal(pthread_cond_t *cond);

// 唤醒被阻塞在条件变量上的线程,被阻塞的线程全部解除阻塞
int pthread_cond_broadcast(pthread_cond_t *cond);

上述两个唤醒阻塞函数的区别是,第一个是随机唤醒几个被阻塞在条件变量上的线程,第二个是唤醒所有被阻塞在条件变量上的线程。因为条件变量搭配互斥锁使用,很多情况下,即使将阻塞在条件变量上的互斥锁全部唤醒,当其中一个线程抢占到互斥锁之后,其余线程也会立马进行阻塞状态。以及如果,当消费者远远大于生产者的数目,当生产者供不应求的时候,也没必要唤醒全部被条件变量阻塞的线程,因为唤醒之后,又会立马因为供不应求,再次陷入阻塞。

条件变量的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <stdlib.h>
#include <sys/time.h>

#define N 5
pthread_cond_t cond;
pthread_mutex_t mutex;
int total = 0;

// 链表(仓库)
struct Node
{
int number;
struct Node* next;
};

// 指向头节点的指针
struct Node* HEAD = NULL;

// 生产者
void* producer(void* arg){
while(1){ // 一直生产
pthread_mutex_lock(&mutex);
// 创建新节点(生产)
struct Node *newNode = (struct Node*)malloc(sizeof(struct Node));
newNode -> next = HEAD;
newNode ->number = ++total;
HEAD = newNode;
printf("+++producer, number = %d, tid = %ld\n", newNode->number, pthread_self());
pthread_mutex_unlock(&mutex);
pthread_cond_broadcast(&cond);
sleep(1);
}
return NULL;
}

// 消费者
void* consumer(void* arg){
while (1)
{
pthread_mutex_lock(&mutex);

// 这里能不能改成if判断???
// 不能。有一个消费者线程通过 pthread_cond_wait()加锁成功, 其余没有加锁成功的线程继续阻塞
// 加锁成功的线程向下运行,并成功删除一个节点,然后解锁
// 没有加锁成功的线程解除阻塞继续抢这把锁,另外一个子线程加锁成功
// 但是这个线程删除链表节点的时候链表已经为空了, 后边访问这个空节点的时候就会出现段错误。
while (HEAD == NULL)
{
pthread_cond_wait(&cond, &mutex);
}
struct Node *newNode = HEAD;
printf("--consumer: number: %d, tid = %ld\n", newNode->number, pthread_self());
HEAD = newNode -> next;
free(newNode);
pthread_mutex_unlock(&mutex);
sleep(1);
}
return NULL;
}

int main(){
pthread_cond_init(&cond, NULL);
pthread_mutex_init(&mutex, NULL);
// 五个生产者
pthread_t ptid[N];
for (int i = 0; i < N; i++)
{
pthread_create(&ptid[i], NULL, producer, NULL);
}
// 五个消费者
pthread_t ctid[N];
for (int i = 0; i < N; i++)
{
pthread_create(&ctid[i], NULL, consumer, NULL);
}
// 释放线程资源
for (int i = 0; i < N; i++)
{
pthread_join(ptid[i], NULL);
pthread_join(ctid[i], NULL);
}
// 释放条件变量和锁的资源
pthread_cond_destroy(&cond);
pthread_mutex_destroy(&mutex);
return 0;
}

信号量

对于信号量,我的理解是,他代表一种资源,如果信号量不为0,即代表有剩余资源,线程可以继续运行;如果信号量为0,即代表资源为空,线程阻塞。
信号量的主要作用也是阻塞线程,并不能保证线程安全,如果要保证线程安全,需要信号量和互斥锁一起使用。

  1. 信号量的头文件为:<semaphore.h>
  2. 信号量的类型为:sem_t sem;

信号量初始化

1
int sem_init(sem_t *sem, int pshared, unsigned int value);

参数:

  • sem:信号量变量地址
  • pshared:
    • 0:线程同步
    • 非0:进程同步
  • value:初始化当前信号量拥有的资源数,如果资源数为0,线程就会被阻塞

信号量的资源释放

1
int sem_destroy(sem_t *sem);

参数:

  • 信号量变量的地址

信号量资源消耗(-1)函数

1
int sem_wait(sem_t *sem);
  • 当sem中的资源数大于0,线程不会阻塞,线程会占用sem中的一个资源,并且资源数减1
  • 当sem中的资源数减为0时,资源被耗尽,线程阻塞

信号量资源增加(+1)函数

1
int sem_post(sem_t *sem);
  • 调用该函数,会将sem中的资源数加1
  • 如果有线程在调用sem_wait,sem_trywait,sem_timedwait时因为sem中的资源数为0被阻塞了,这时这些线程会解除阻塞,获取到资源之后继续向下运行。

其他信号量资源消耗函数

1
2
3
4
5
// 尝试获取资源
int sem_trywait(sem_t *sem);

// 定时阻塞获取资源
int sem_timewait(sem_t *sem, const struct timespec *abs_timeout);

第一个函数:

  • 当sem中的资源数大于0,线程不会阻塞,线程会占用sem中的一个资源,并且资源数减1
  • 当sem中的资源数减为0时,资源被耗尽,但是线程不会阻塞,而是直接返回错误号,可以加判断语句,用于处理获取资源失败的情况

第二个函数:

  • 当sem中的资源数大于0,线程不会阻塞,线程会占用sem中的一个资源,并且资源数减1
  • 当sem中的资源数减为0时,资源被耗尽,线程开始阻塞,当阻塞指定时长之后,线程解除阻塞

获取信号量资源数函数

1
int sem_getvalue(sem_t *sem, int *sval);

参数:

  • sem:信号量变量的地址
  • sval:传出参数,传出信号量剩余资源

信号量使用方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <semaphore.h>
#include <pthread.h>

#define N 5
sem_t psem;
sem_t csem;
pthread_mutex_t mutex;
int total = 0;

struct Node
{
int number;
struct Node* next;
};
struct Node * head = NULL;

void* producer(void* arg)
{
while(1)
{
sem_wait(&psem);
pthread_mutex_lock(&mutex);
struct Node* pnew = (struct Node*)malloc(sizeof(struct Node));
pnew->number = ++total;
pnew->next = head;
head = pnew;
printf("+++producer, number = %d, tid = %ld\n", pnew->number, pthread_self());
pthread_mutex_unlock(&mutex);
sem_post(&csem);
sleep(1);
}
return NULL;
}

void* consumer(void* arg)
{
while(1)
{
sem_wait(&csem);
pthread_mutex_lock(&mutex);
struct Node* pnode = head;
printf("--consumer: number: %d, tid = %ld\n", pnode->number, pthread_self());
head = pnode->next;
free(pnode);
pthread_mutex_unlock(&mutex);
sem_post(&psem);
sleep(1);
}
return NULL;
}

int main()
{
sem_init(&psem, 0, 5);
sem_init(&csem, 0, 0);
pthread_mutex_init(&mutex, NULL);
pthread_t ptid[N];
pthread_t ctid[N];

for(int i=0; i<N; ++i)
{
pthread_create(&ptid[i], NULL, producer, NULL);
}

for(int i=0; i<N; ++i)
{
pthread_create(&ctid[i], NULL, consumer, NULL);
}


for(int i=0; i<N; ++i)
{
pthread_join(ptid[i], NULL);
pthread_join(ctid[i], NULL);
}

sem_destroy(&psem);
sem_destroy(&csem);
pthread_mutex_destroy(&mutex);

return 0;
}

如果生产者和消费者使用的信号量总资源数为1,那么不会出现生产者线程和消费者线程同时访问共享资源的情况,不管生产者和消费者线程有多少个,它们都是顺序执行的。


线程同步
http://example.com/2025/02/22/OperatingSystem/线程同步/
作者
ZhangHangming
发布于
2025年2月22日
许可协议