cha30.线程:线程同步

读书笔记

条件变量

一般搭配一个条件和一个互斥量使用

经典的使用方式如下

  • 对于需要在某某条件下运行的线程,先对mutex加锁,以读取condition
    • 若满足,则在预期状态下执行后续操作,结束后对mutex解锁
    • 若不满足,则pthread_cond_wait
      • 此时该函数会先解锁mutex(允许其他进程获取mutex以修改状态)
      • 陷入阻塞,直到其他线程调用pthread_cond_signalpthread_cond_broadcast唤醒
      • 唤醒后,获取mutex锁,以检查条件是否满足,满足则执行后续,不满足继续调用wait阻塞(故此处需要用while
  • 对于可以改变某某条件的线程
    • 获取mutex
    • 改变condition
    • signal/broadcast
    • unlock(unlock与上一步顺序可调换)
1
2
3
4
5
6
7
8
9
10
11
12
// one thread
pthread_mutex_lock(&mutex);
while(!condition)
pthread_cond_wait(&cond, &mutex);
/* condition matched, execute task */
pthread_mutex_unlock(&mutex);

// other thread
pthread_mutex_lock(&mutex);
condition = xxxx; // change condition
pthread_cond_signal(&cond); // or pthread_cond_broadcast(&cond);
pthread_mutex_unlock(&mutex); // 以上两行顺序任意,
  • broadcastsignal的选择
    • 如果所有线程都需要在同一条件下互斥的执行,那么选择signalbroadcast会唤醒所有线程且大家条件都相同,所有只会有一个线程获得mutex
    • 如果条件不同,如一个线程为condition == STAT1,另一个为condition == STAT2,那么适合broadcast
    • 条件变量的作用是通知某某条件的改变

30.1

30-1.修改程序清单30-1 ( thread_incr.c)中的程序,以便线程起始函数在每次循环中都
能输出 glob 的当前值以及能对线程做唯一标识的标识符。可将线程的这一唯一标识指定为创建线程的函数pthread_create()的调用参数。对于这一程序,需要将线程起始函数的参数改为指针,指向包含线程唯一标识和循环次数限制的数据结构。运行该程序,将输出重定向至一文件,查看内核在调度两线程交替执行时glob 的变化情况。

ok,无聊捏

30.2

实现一组线程安全的函数,以更新和搜索一个不平衡二叉树。此函数库应该包含如
下形式的函数(目的明显):

1
2
3
4
initialize(tree);
add(tree,char *key,void *value);
delete(tree,char *key)
Boolean lookup(char *key,void**value)

上述函数原型中,tree是一个指向根节点的结构(为此需要足义一个合P的绐构)。例的每个节点保存有一个键-值对。还需为树中每个节点定义一数据结构,其中应包含互斥量,以
确保同时仅有一个线程可以访问该节点。initialize()、add()和 lookup()函数的实现相对简单。delete()的实现需要较为深入的考虑。

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
//
// Created by root on 6/28/23.
//

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <pthread.h>
#include <stdarg.h>
#include <string.h>
#include <errno.h>

#define CHECK(x, exitline, ...) do { if(!(x)) { \
fprintf(stderr, "CHECK: "); \
fprintf(stderr, __VA_ARGS__); \
fprintf(stderr, "\n"); \
exitline; \
} \
} while(0)

#define CHECKERR(x, exitline) CHECK(x, exitline, "Error: %s\n", strerror(errno))
typedef int K;
typedef int V;
typedef struct ConcurrentTreeStruct{
pthread_mutex_t mutex;
K *key;
V *value;
struct ConcurrentTreeStruct *left;
struct ConcurrentTreeStruct *right;
} *ConcurrentTreeNode;



typedef struct ConcurrentTree{
ConcurrentTreeNode dummy;
int (*compare)(K *key1, K *key2);
void (*destroyKey)(K *key);
void (*destroyValue)(V *value);
} *ConcurrentTree;



ConcurrentTreeNode initCTreeNode(K *key, V *value, ConcurrentTreeNode left, ConcurrentTreeNode right) {
ConcurrentTreeNode tree = (ConcurrentTreeNode)malloc(sizeof(struct ConcurrentTreeStruct));
CHECKERR(tree, return NULL);
tree->left = left;
tree->right = right;
tree->key = key;
tree->value = value;
CHECKERR(pthread_mutex_init(&tree->mutex, NULL) == 0, do {free(tree); return NULL; } while(0));
return tree;
}
ConcurrentTree initCTree(
int (*compare)(K *key1, K *key2),
void (*destroyKey)(K *key),
void (*destroyValue)(V *value)
) {
ConcurrentTree tree = (ConcurrentTree)malloc(sizeof(struct ConcurrentTree));
CHECKERR(tree, return NULL);
tree->dummy = initCTreeNode(NULL, NULL, NULL, NULL);
CHECK(tree->dummy, do { free(tree); return NULL; } while(0), "");
tree->compare = compare;
tree->destroyKey = destroyKey;
tree->destroyValue = destroyValue;
return tree;
}
int add(ConcurrentTree tree, K *key, V *value) {
CHECK(tree, return -1, "");
CHECKERR(pthread_mutex_lock(&tree->dummy->mutex) == 0, return -1);
pthread_mutex_t *old = &tree->dummy->mutex;
ConcurrentTreeNode *treeNode = &tree->dummy->left;
while(*treeNode){
pthread_mutex_t *lock = &(*treeNode)->mutex;
CHECKERR(pthread_mutex_lock(&(*treeNode)->mutex) == 0, exit(5));
int cmp = tree->compare(key, (*treeNode)->key);
if (cmp < 0) { // key小于根
treeNode = &(*treeNode)->left;
} else {
treeNode = &(*treeNode)->right;
}
CHECKERR(pthread_mutex_unlock(old) == 0, exit(5));
old = lock;
}
*treeNode = initCTreeNode(key, value, NULL, NULL);
CHECKERR(pthread_mutex_unlock(old) == 0, exit(5));
return 0;
}

void delCTreeNode(ConcurrentTreeNode tree, void (*destroyKey)(K *key), void (*destroyValue)(V *value)) {
if(!tree) return;
destroyKey(tree->key);
destroyValue(tree->value);
pthread_mutex_destroy(&tree->mutex);
tree->key = NULL;
tree->value = NULL;
}

int del(ConcurrentTree tree, K *key) {
CHECK(tree, return -1, "");
if(!tree->dummy->left) return -1;
CHECKERR(pthread_mutex_lock(&tree->dummy->mutex) == 0, return -1);
CHECKERR(pthread_mutex_lock(&tree->dummy->left->mutex) == 0, do { pthread_mutex_unlock(&tree->dummy->mutex); return -1;} while(0));
pthread_mutex_t *parent_lock = &tree->dummy->mutex;
pthread_mutex_t *child_lock =&tree->dummy->left->mutex;
ConcurrentTreeNode *parentNode = &tree->dummy->left;
ConcurrentTreeNode treeNode = tree->dummy->left;

while(treeNode) {
// printf("cmp %d, %d\n", *key, *treeNode->key);
int cmp = tree->compare(key, treeNode->key);
if(cmp == 0) {
if(treeNode->left) CHECKERR(pthread_mutex_lock(&treeNode->left->mutex) == 0, exit(5));
if(treeNode->right) CHECKERR(pthread_mutex_lock(&treeNode->right->mutex) == 0, exit(5));
ConcurrentTreeNode left = treeNode->left;
ConcurrentTreeNode right = treeNode->right;
if(!left && !right) {
*parentNode = NULL;
} else if(!left) {
*parentNode = right;
} else if(!right) {
*parentNode = left;
} else {
*parentNode = left;
}
CHECKERR(pthread_mutex_unlock(parent_lock) == 0, exit(5));
if(left && right) {
while (left->right) {
ConcurrentTreeNode old = left;
left = left->right;
if(left) CHECKERR(pthread_mutex_lock(&left->mutex) == 0, exit(5));
CHECKERR(pthread_mutex_unlock(&old->mutex) == 0, exit(5));
}
left->right = right;
}
if(left)CHECKERR(pthread_mutex_unlock(&left->mutex) == 0, exit(5));
if(right)CHECKERR(pthread_mutex_unlock(&right->mutex) == 0, exit(5));

CHECKERR(pthread_mutex_unlock(child_lock) == 0, exit(5));
delCTreeNode(treeNode, tree->destroyKey, tree->destroyValue);
free(treeNode);
return 0;
} else if(cmp < 0) {
CHECKERR(pthread_mutex_unlock(parent_lock) == 0, exit(5));
if(treeNode->left) CHECKERR(pthread_mutex_lock(&treeNode->left->mutex) == 0, exit(5));
parent_lock = &treeNode->mutex;
parentNode = &treeNode->left;
treeNode = treeNode->left;
child_lock = treeNode ? &treeNode->mutex : NULL;
} else {
CHECKERR(pthread_mutex_unlock(parent_lock) == 0, exit(5));
if(treeNode->right) CHECKERR(pthread_mutex_lock(&treeNode->right->mutex) == 0, exit(5));
parent_lock = &treeNode->mutex;
parentNode = &treeNode->right;
treeNode = treeNode->right;
child_lock = treeNode ? &treeNode->mutex : NULL;
}
}
CHECKERR(pthread_mutex_unlock(parent_lock) == 0, exit(5));
if(child_lock) CHECKERR(pthread_mutex_unlock(child_lock) == 0, exit(5));
return -1;
}

int lookup(ConcurrentTree tree, K *key, V **value) {
CHECK(tree, return -1, "tree cannot be null");
CHECK(key, return -1, "key cannot be null");
CHECK(value, return -1, "value cannot be null");
CHECKERR(pthread_mutex_lock(&tree->dummy->mutex) == 0, return -1);
if(tree->dummy->left)CHECKERR(pthread_mutex_lock(&tree->dummy->left->mutex) == 0, do { pthread_mutex_unlock(&tree->dummy->mutex); return -1;} while(0));
ConcurrentTreeNode treeNode = tree->dummy->left;
CHECKERR(pthread_mutex_unlock(&tree->dummy->mutex) == 0, exit(5));
while(treeNode) {
int cmp = tree->compare(key, treeNode->key);
if(cmp == 0) {
*value = treeNode->value;
CHECKERR(pthread_mutex_unlock(&treeNode->mutex) == 0, exit(5));
return 0;
} else if(cmp < 0) {
ConcurrentTreeNode old = treeNode;
treeNode = treeNode->left;
if(treeNode) CHECKERR(pthread_mutex_lock(&treeNode->mutex) == 0, do { pthread_mutex_unlock(&old->mutex); return -1;} while(0)); //先获取新锁,再放弃旧锁
CHECKERR(pthread_mutex_unlock(&old->mutex) == 0, exit(5));
} else {
ConcurrentTreeNode old = treeNode;
treeNode = treeNode->right;
if(treeNode) CHECKERR(pthread_mutex_lock(&treeNode->mutex) == 0, do { pthread_mutex_unlock(&old->mutex); return -1;} while(0));
CHECKERR(pthread_mutex_unlock(&old->mutex) == 0, exit(5));
}
}
return -1;
}

void destroyCTreeNode(ConcurrentTreeNode tree, void (*destroyKey)(K *key), void (*destroyValue)(V *value)) {
if(!tree) return;
destroyCTreeNode(tree->left, destroyKey, destroyValue);
destroyCTreeNode(tree->right, destroyKey, destroyValue);
delCTreeNode(tree, destroyKey, destroyValue);
free(tree);
}

void destroyCTree(ConcurrentTree tree) {
CHECK(tree, return, "tree cannot be null\n");
destroyCTreeNode(tree->dummy->left, tree->destroyKey, tree->destroyValue);
free(tree->dummy);
tree->dummy = NULL;
}
struct ThreadRet {
int read;
int add;
int del;
};
struct ThreadArg {
ConcurrentTree tree;
int threadID;
};
int cmpInt(int *a, int *b) {
if(*a == *b) {
return 0;
} else if(*a > *b) {
return 1;
} else {
return -1;
}
}
void destroyInt(int *a) {
if(a) {
free(a);
}
}
int maxKey = 0;
pthread_mutex_t mutex_mk;
void *threadfn(void *args) {
ConcurrentTree tree = ((struct ThreadArg *)args)->tree;
int tid = ((struct ThreadArg *)args)->threadID;
CHECKERR(tree, return NULL);
struct ThreadRet *ret = malloc(sizeof(struct ThreadRet));
CHECKERR(ret, return NULL);
ret->read = ret->add = ret->del = 0;
int opNum = 1000 + rand() % 1001;
int *key = (int *)malloc(sizeof(int));
while (opNum--) {
int *val;
CHECKERR(key, return ret);
switch (rand()%3) {
case 0:
printf("Thread-%d, try add\n", tid);
val = (int *)malloc(sizeof(int));
CHECKERR(val, return ret);
*val = rand()%200000;

CHECKERR(pthread_mutex_lock(&mutex_mk) == 0,return ret);
maxKey++;
*key = maxKey;
CHECKERR(pthread_mutex_unlock(&mutex_mk) == 0,return ret);

add(tree, key, val);
ret->add++;
printf("Thread-%d, add (%d, %d)\n", tid, *key, *val);
key = (int *)malloc(sizeof(int));
break;
case 1:
printf("Thread-%d, try read\n", tid);
CHECKERR(pthread_mutex_lock(&mutex_mk) == 0,return ret);
if(maxKey > 0)*key = rand()%maxKey + 1;
else *key = -1;
CHECKERR(pthread_mutex_unlock(&mutex_mk) == 0,return ret);
// printf("Thread-%d, try read %d\n", tid, *key);
if(lookup(tree, key, &val) == 0) printf("Thread-%d, read (%d, %d)\n", tid, *key, *val);
else printf("Thread-%d, fail to read (%d)\n", tid, *key);
ret->read++;
break;
case 2:
printf("Thread-%d, try del\n", tid);
CHECKERR(pthread_mutex_lock(&mutex_mk) == 0,return ret);
if(maxKey > 0) {
*key = maxKey;
}
else *key = -1;
CHECKERR(pthread_mutex_unlock(&mutex_mk) == 0,return ret);
// printf("Thread-%d, try del %d\n", tid, *key);
if(del(tree, key) == 0) {
printf("Thread-%d, del (%d)\n", tid, *key);
CHECKERR(pthread_mutex_lock(&mutex_mk) == 0,return ret);
maxKey--;
CHECKERR(pthread_mutex_unlock(&mutex_mk) == 0, return ret);
}
else printf("Thread-%d, fail to del (%d)\n", tid, *key);
ret->del++;
break;
}
}
free(key);
free(args);
return ret;
}

int main(int argc, char *argv[]) {
CHECK(argc > 1, return 1, "Usage: %s threadCnt", argv[0]);
int threadCnt = atoi(argv[1]);
pthread_t *thread = malloc(threadCnt * sizeof(pthread_t));
CHECKERR(thread, return 2);
CHECKERR(pthread_mutex_init(&mutex_mk, NULL) == 0,return 3);
ConcurrentTree tree = initCTree(cmpInt, destroyInt, destroyInt);
for(int i = 0; i < threadCnt; i++) {
struct ThreadArg *args = malloc(sizeof(struct ThreadArg));
args->threadID = i+1;
args->tree = tree;
CHECKERR(pthread_create(&thread[i], NULL, threadfn, args) == 0,return 1);
}
for(int i = 0; i < threadCnt; i++) {
struct ThreadRet *ret;
CHECKERR(pthread_join(thread[i], (void **)&ret) == 0,return 1);
if(!ret) {
printf("Summary: Thread-%d occured error, returned NULL\n", i+1);
} else {
printf("Summary: Thread-%d add=%d, read=%d, del=%d\n", i+1, ret->add, ret->read, ret->del);
free(ret);
}
}
CHECKERR(pthread_mutex_destroy(&mutex_mk) == 0,return 3);
destroyCTree(tree);
free(tree);
free(thread);
return 0;
}

用四个线程跑了几次,反正都没有死锁过。还没有仔细看过正确性

valgrind

valgrind跑了很多次,正常情况下全部内存都被free了,没有泄漏

asan

asan跑了几次,每次都有错误。用gdb也看不到调用栈,后来clion很给力,打印出了调用栈

不是实现有问题,是在输出log的时候出错了,原理如下

A线程给树里加入一个节点,然后A打印他的值。但在这之前B又把他删除,他的key,value都被free了,这个时候就不对了。

作者

Meow Meow Liu

发布于

2023-06-29

更新于

2024-04-23

许可协议

评论