cha47.System V 信号量

47.2 47.3 47.4

  • 使用信号量,实现进程间的同步
  • 验证SEM_UNDO是否会改变sempid
  • 实现P V操作,实现testP(在程序清单47-10给出的代码(binary_sems.c)中增加一个reserveSemNB()函数来使用IPC_NOWAIT标记实现有条件的预留操作。)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <signal.h>
#include <sys/sem.h>
#include <sys/ipc.h>
#include <stdbool.h>
#include <stddef.h>
#include <syslog.h>
#include <wait.h>


/* Verbosity threshold: logger() writes a message to logfile only when the
 * message's syslog-style level is numerically <= DEBUG_LEVEL. */
#define DEBUG_LEVEL LOG_DEBUG

/* Fallback threshold; not taken here because DEBUG_LEVEL is defined above. */
#ifndef DEBUG_LEVEL
#define DEBUG_LEVEL LOG_INFO
#endif
/* Stream logger() prints to. It starts out NULL and must be assigned before
 * the first logger() call. */
FILE *logfile = NULL;
/* When true, logger() additionally forwards every message to syslog(). */
bool syslog_enable = false;

/*
 * alloc_sprintf(str, fmt, ...): printf-style formatting into a freshly
 * malloc'ed buffer. The length is measured with snprintf(NULL, 0, ...),
 * len+1 bytes are allocated and the text is formatted into them.
 * `str` is left NULL when malloc fails. The format arguments are expanded
 * twice, so they should be free of side effects. The caller frees `str`.
 */
#define alloc_sprintf(__alloc_sprintf_str, __format...) do { \
int __alloc_sprintf_len = snprintf(NULL, 0, __format); \
(__alloc_sprintf_str) = malloc(__alloc_sprintf_len+1); \
if(__alloc_sprintf_str != NULL) \
snprintf(__alloc_sprintf_str, __alloc_sprintf_len+1, __format); \
} while(0)

/*
 * logger(level, fmt, ...): print the message plus a newline to logfile when
 * level <= DEBUG_LEVEL; independently of that threshold, send it to syslog()
 * when syslog_enable is set.
 * NOTE(review): in the syslog branch `data` stays NULL if the allocation
 * fails and is still passed to "%s" - confirm whether that needs a guard.
 */
#define logger(level, msg...) do { \
if(level <= DEBUG_LEVEL) { \
fprintf(logfile, msg); \
fprintf(logfile, "\n"); \
} \
if(syslog_enable) { \
char *data; \
alloc_sprintf(data, msg); \
syslog(level, "%s", data); \
safe_free(data); \
} \
} while(0)

/*
 * COND_RET(x, ret, fmt, ...): assertion-style check. When `x` is false it
 * logs file:line, strerror(errno) if errno is non-zero, the stringified
 * condition and the caller's message, then executes the statement(s) `ret`.
 * Because the condition text is stringified, rewriting the expression passed
 * as `x` changes the logged output.
 */
#define COND_RET(x, ret, msg...) \
do { \
if(!(x)) { \
if(errno == 0)logger(LOG_ERR, "%s:%d\nunmet condition:\"%s\"\n", __FILE__, __LINE__, #x); \
else logger(LOG_ERR, "%s:%d\nerror: %s\nunmet condition:\"%s\"\n", __FILE__, __LINE__,strerror(errno), #x); \
logger(LOG_ERR, msg); \
logger(LOG_ERR, "\n"); \
ret \
} \
} while(0)

/* CHECK: log and `return -1` from the enclosing function.
 * CHECK_EXIT: log and terminate the process with EXIT_FAILURE.
 * CHECK_LOG: log only and continue. */
#define CHECK(x, msg...) COND_RET(x, return -1;, msg)
#define CHECK_EXIT(x, msg...) COND_RET(x, exit(EXIT_FAILURE);, msg)
#define CHECK_LOG(x, msg...) COND_RET(x, ;, msg)

/* Free a pointer if it is non-NULL and reset it to NULL. The argument must
 * be an lvalue because it is assigned to. */
#define safe_free(__safe_free_ptr) do { if(__safe_free_ptr) { free(__safe_free_ptr); (__safe_free_ptr) = NULL;} } while(0)

/* Generic buffer size constant. */
#define BUFFER_SIZE 4096


/* IPC key (filled in by init() via ftok) and identifier of the semaphore set
 * that every helper below operates on. */
key_t sem_key = 0;
int sem_id = 0;
/* Remove the semaphore set identified by sem_id. A failing semctl() is only
 * logged (CHECK_LOG), so this is usable as an atexit() handler. */
void rmsem() {
CHECK_LOG(semctl(sem_id, 0, IPC_RMID) != -1, "");
}

/* Argument type for the optional fourth parameter of semctl(); this program
 * declares it itself. Which member is used depends on the semctl command:
 * val for SETVAL, buf for IPC_STAT, array for SETALL. */
union semun {
int val;
struct semid_ds *buf;
unsigned short *array;
#if defined(__linux__)
struct seminfo *__buf;
#endif
};

/* Return value of fork() stored by init(): 0 in the child, the child's PID
 * in the parent. main() uses it to pick the role of each process. */
pid_t taskid = 0;

/*
 * Convert a string consisting only of decimal digits to an int.
 *
 * The process is terminated through CHECK_EXIT when `num` is NULL, empty, or
 * contains any character other than '0'..'9' (so signs and whitespace are
 * rejected as well). Previously a NULL pointer was handed to atoi() and an
 * empty string was silently converted to 0.
 *
 * num: NUL-terminated candidate string.
 * Returns the value as computed by atoi().
 */
int safe_atoi(const char *num) {
    CHECK_EXIT(num != NULL && *num != '\0', "");
    for (const char *ptr = num; *ptr; ptr++) {
        CHECK_EXIT(*ptr >= '0' && *ptr <= '9', "");
    }
    return atoi(num);
}

/*
 * Perform a single semop() on semaphore `semnum` of the global set sem_id.
 *
 * semnum: index of the semaphore inside the set.
 * incr:   value for sem_op (positive adds, negative subtracts, 0 waits for zero).
 * flg:    value for sem_flg (e.g. IPC_NOWAIT, SEM_UNDO, or 0).
 * Returns whatever semop() returns: 0 on success, -1 with errno set on failure.
 */
int incrsem(int semnum, short incr, short flg) {
    struct sembuf op;

    op.sem_num = semnum;
    op.sem_op = incr;
    op.sem_flg = flg;
    return semop(sem_id, &op, 1);
}
/* SIGINT handler installed by init(): removes the semaphore set via rmsem().
 * The signal number itself is not used. */
void siginthandler(int sig) {
    (void)sig;
    rmsem();
}

/*
 * Initialise semaphore `semnum` of set sem_id as an *available* binary
 * semaphore (value 1).
 *
 * The value is first forced to 0 with SETVAL and then raised to 1 with a real
 * semop(). The semop() is what sets sem_otime, which init() polls to learn
 * that initialisation has happened. The previous version did SETVAL 1
 * followed by +1, leaving the semaphore at 2, so two callers could reserve it
 * at the same time.
 *
 * Returns 0 on success, -1 with errno set on failure.
 */
int initAvai(int semnum) {
    union semun arg;

    arg.val = 0;
    if (semctl(sem_id, semnum, SETVAL, arg) == -1) {
        return -1;
    }
    return incrsem(semnum, 1, 0);
}


int initInUse(int semnum) {
union semun arg;
arg.val = 0;
if(semctl(sem_id, semnum, SETVAL, arg) == -1) return -1;
return incrsem(semnum, 0, 0);
}


/*
 * Reserve (decrement by 1) semaphore `semnum`, blocking until that is
 * possible. The operation is restarted when it is interrupted by a signal
 * (EINTR).
 *
 * Returns 0 on success, -1 with errno set for any error other than EINTR.
 */
int P(int semnum) {
    for (;;) {
        int ret = incrsem(semnum, -1, 0);
        if (ret != -1 || errno != EINTR) {
            return ret;
        }
    }
}

/*
 * Conditional reserve: try to decrement semaphore `semnum` without blocking
 * (IPC_NOWAIT).
 *
 * Returns 0 when the semaphore was reserved, 1 when reserving would have
 * blocked (errno == EAGAIN), and -1 on any other error.
 */
int testP(int semnum) {
    int ret = incrsem(semnum, -1, IPC_NOWAIT);

    if (ret != -1) {
        return 0;
    }
    return errno == EAGAIN ? 1 : ret;
}

/* Release (increment by 1) semaphore `semnum` of set sem_id.
 * Returns 0 on success, -1 with errno set on failure. */
int V(int semnum) {
    const short release = 1;

    return incrsem(semnum, release, 0);
}

/*
 * Fork into parent and child and attach both to a shared set of three
 * System V semaphores.
 *
 * Whichever process wins the IPC_CREAT|IPC_EXCL race initialises the set to
 * {1, 0, 0}; the other one attaches to the existing set. Both then wait until
 * sem_otime becomes non-zero, which signals that initialisation is complete.
 *
 * Changes compared with the previous version:
 *  - ftok() and the second semget() are checked instead of continuing with -1.
 *  - All initial values are installed with one SETALL *before* the first
 *    semop(). sem_otime is therefore only published once every semaphore has
 *    its final value (previously the first semop() ran while semaphore 0 was
 *    still uninitialised).
 *  - Only the parent registers the atexit()/SIGINT cleanup. Previously the
 *    creator did, so a child that won the creation race removed the set on
 *    exit while the parent still needed it for its final printsem().
 *
 * argc is unused; argv[0] is the path handed to ftok().
 * Any failure terminates the calling process via CHECK_EXIT.
 */
void init(int argc, char *argv[]) {
    (void)argc;
    logfile = stderr;

    sem_key = ftok(argv[0], 'x');
    CHECK_EXIT(sem_key != -1, "");

    taskid = fork();
    CHECK_EXIT(taskid != -1, "");
    logger(LOG_INFO, "PID = %ld, taskID = %d", (long)getpid(), (int)taskid);

    union semun arg;
    sem_id = semget(sem_key, 3, IPC_CREAT | IPC_EXCL | 0666);
    if (sem_id != -1) {
        /* Creator: sem[0] = 1 (available), sem[1] = sem[2] = 0. */
        arg.array = (unsigned short[]){1, 0, 0};
        CHECK_EXIT(semctl(sem_id, 0, SETALL, arg) != -1, "");
        /* Wait-for-zero on sem[1] returns at once and sets sem_otime. */
        CHECK_EXIT(incrsem(1, 0, 0) != -1, "");
    } else {
        CHECK_EXIT(errno == EEXIST, "");
        sem_id = semget(sem_key, 3, 0666);
        CHECK_EXIT(sem_id != -1, "");
    }

    if (taskid != 0) {
        /* The parent outlives the child (it wait()s for it), so it owns
         * removal of the set. */
        atexit(rmsem);
        signal(SIGINT, siginthandler);
    }

    struct semid_ds ds;
    arg.buf = &ds;
    CHECK_EXIT(semctl(sem_id, 0, IPC_STAT, arg) != -1, "");
    while (ds.sem_otime == 0) {
        logger(LOG_INFO, "waiting for sem init");
        sleep(1);
        CHECK_EXIT(semctl(sem_id, 0, IPC_STAT, arg) != -1, "");
    }
    logger(LOG_INFO, "finish init. PID = %ld, taskID = %d", (long)getpid(), (int)taskid);
}

/*
 * Query (semctl GETPID) and log the PID recorded for semaphore `semnum` of
 * set sem_id.
 *
 * Returns that PID. If semctl() fails the process exits via CHECK_EXIT.
 */
pid_t printsem(int semnum) {
pid_t pid;
CHECK_EXIT((pid = semctl(sem_id, semnum, GETPID)) != -1, "");
logger(LOG_INFO, "semnum=%d, last semop pid=%ld", semnum, (long) pid);
return pid;
}


/*
 * Experiment: does the SEM_UNDO adjustment applied when a process exits
 * change the PID that semctl(GETPID) reports for a semaphore?
 *
 * Semaphore 0 is the one under observation; semaphores 2 and 1 are used as
 * parent->child and child->parent hand-shake signals respectively.
 */
int main(int argc, char *argv[]) {
init(argc, argv);
switch (taskid) {
case 0:
/* Child: wait for the parent's go-ahead, decrement sem 0 with SEM_UNDO,
 * tell the parent, then wait for permission to exit. */
CHECK_EXIT(P(2) != -1, "");
CHECK_EXIT(incrsem(0, -1, SEM_UNDO) != -1, "");
CHECK_EXIT(V(1) != -1, "");
CHECK_EXIT(P(2) != -1, "");
break;
default:
/* Parent: log the recorded PID before the child's operation, after it,
 * and after the parent's own operation on sem 0. */
printsem(0);
CHECK_EXIT(V(2) != -1, "");
CHECK_EXIT(P(1) != -1, "");
printsem(0);
CHECK_EXIT(incrsem(0, 1, 0) != -1, "");
printsem(0);
/* Let the child exit, then reap it. */
CHECK_EXIT(V(2) != -1, "");
wait(NULL);
/* In the parent taskid holds the child's PID, so this compares the recorded
 * PID against the child and then against the parent itself. */
pid_t lastpid = printsem(0);
if(lastpid == taskid) {
logger(LOG_INFO, "IPC_UNDO will change last pid");
} else if (lastpid == getpid()) {
logger(LOG_INFO, "IPC_UNDO will not change last pid");
} else {
logger(LOG_INFO, "other process change sem");
}

break;
}
}

47.6

使用命名管道实现一个二元信号量协议。提供函数来预留、释放以及有条件地预留信号量。

需要共享fd

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
//
// Created by root on 9/13/23.
//

#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <stdlib.h>
#include <fcntl.h>
#include <pthread.h>
#include <unistd.h>
#include <signal.h>
#include <stdbool.h>
#include <stddef.h>
#include <syslog.h>
#include <wait.h>
#include <sys/stat.h>


/* Verbosity threshold: logger() writes a message to logfile only when the
 * message's syslog-style level is numerically <= DEBUG_LEVEL. */
#define DEBUG_LEVEL LOG_DEBUG

/* Fallback threshold; not taken here because DEBUG_LEVEL is defined above. */
#ifndef DEBUG_LEVEL
#define DEBUG_LEVEL LOG_INFO
#endif
/* Stream logger() prints to; pointed at stderr by the constructor below. */
FILE *logfile = NULL;
/* When true, logger() additionally forwards every message to syslog(). */
bool syslog_enable = false;
/* Marked with the GCC constructor attribute so it runs before main(). */
void __attribute__ ((constructor)) init() {
logfile = stderr;
}

/*
 * alloc_sprintf(str, fmt, ...): printf-style formatting into a freshly
 * malloc'ed buffer of exactly the required size. `str` is left NULL when
 * malloc fails. The format arguments are expanded twice. The caller frees
 * `str`.
 */
#define alloc_sprintf(__alloc_sprintf_str, __format...) do { \
int __alloc_sprintf_len = snprintf(NULL, 0, __format); \
(__alloc_sprintf_str) = malloc(__alloc_sprintf_len+1); \
if(__alloc_sprintf_str != NULL) \
snprintf(__alloc_sprintf_str, __alloc_sprintf_len+1, __format); \
} while(0)

/*
 * logger(level, fmt, ...): print the message plus a newline to logfile when
 * level <= DEBUG_LEVEL; independently of that threshold, send it to syslog()
 * when syslog_enable is set.
 */
#define logger(level, msg...) do { \
if(level <= DEBUG_LEVEL) { \
fprintf(logfile, msg); \
fprintf(logfile, "\n"); \
} \
if(syslog_enable) { \
char *data; \
alloc_sprintf(data, msg); \
syslog(level, "%s", data); \
safe_free(data); \
} \
} while(0)

/*
 * COND_RET(x, ret, fmt, ...): when `x` is false, log file:line,
 * strerror(errno) if errno is non-zero, the stringified condition and the
 * caller's message, then execute the statement(s) `ret`.
 */
#define COND_RET(x, ret, msg...) \
do { \
if(!(x)) { \
if(errno == 0)logger(LOG_ERR, "%s:%d\nunmet condition:\"%s\"\n", __FILE__, __LINE__, #x); \
else logger(LOG_ERR, "%s:%d\nerror: %s\nunmet condition:\"%s\"\n", __FILE__, __LINE__,strerror(errno), #x); \
logger(LOG_ERR, msg); \
logger(LOG_ERR, "\n"); \
ret \
} \
} while(0)

/* CHECK: log and `return -1`. CHECK_EXIT: log and exit(EXIT_FAILURE).
 * CHECK_LOG: log only. */
#define CHECK(x, msg...) COND_RET(x, return -1;, msg)
#define CHECK_EXIT(x, msg...) COND_RET(x, exit(EXIT_FAILURE);, msg)
#define CHECK_LOG(x, msg...) COND_RET(x, ;, msg)

/* Free a pointer if it is non-NULL and reset it to NULL (argument must be an
 * lvalue). */
#define safe_free(__safe_free_ptr) do { if(__safe_free_ptr) { free(__safe_free_ptr); (__safe_free_ptr) = NULL;} } while(0)

/* Generic buffer size constant. */
#define BUFFER_SIZE 4096


/* NOTE(review): none of the code visible in this FIFO-based program reads
 * these three variables; they look like leftovers from the System V
 * semaphore example - confirm before removing. */
key_t sem_key = 0;
int sem_id = 0;

pid_t taskid = 0;

/*
 * Convert a string consisting only of decimal digits to an int.
 *
 * The process is terminated through CHECK_EXIT when `num` is NULL, empty, or
 * contains any character other than '0'..'9'. Previously a NULL pointer was
 * handed to atoi() and an empty string was silently converted to 0.
 *
 * num: NUL-terminated candidate string.
 * Returns the value as computed by atoi().
 */
int safe_atoi(const char *num) {
    CHECK_EXIT(num != NULL && *num != '\0', "");
    for (const char *ptr = num; *ptr; ptr++) {
        CHECK_EXIT(*ptr >= '0' && *ptr <= '9', "");
    }
    return atoi(num);
}
/*
 * A binary semaphore emulated with a named pipe.
 *   id  - path of the FIFO on the file system.
 *   rfd - read-end descriptor, or -1; V() opens it.
 *   wfd - write-end descriptor, or -1; used transiently by P().
 */
typedef struct {
char id[L_tmpnam];
int rfd, wfd;
}PvFifo_t;
#include <time.h>
/*
 * Create a new FIFO-backed semaphore in the "in use" state.
 *
 * A FIFO named "PvFifo<rand()>" is created in the current directory and both
 * descriptors start out as -1.
 *
 * Fixes over the previous version: the malloc() result is checked, the name
 * is formatted straight into the bounded id field (the temporary heap string
 * used to be leaked and copied with an unbounded strcpy), and the structure
 * is freed when mkfifo() fails.
 *
 * Returns a heap-allocated PvFifo_t that the caller owns, or NULL on failure
 * (the reason is logged).
 */
PvFifo_t *newPvFifo() {
    PvFifo_t *pvFifo = malloc(sizeof *pvFifo);
    if (pvFifo == NULL) {
        CHECK_LOG(false, "");
        return NULL;
    }

    int len = snprintf(pvFifo->id, sizeof pvFifo->id, "PvFifo%d", rand());
    if (len < 0 || (size_t)len >= sizeof pvFifo->id) {
        CHECK_LOG(false, "");
        free(pvFifo);
        return NULL;
    }
    pvFifo->rfd = pvFifo->wfd = -1;

    if (mkfifo(pvFifo->id, 0666) == -1) {
        CHECK_LOG(false, ""); /* log first: free() may disturb errno */
        free(pvFifo);
        return NULL;
    }
    logger(LOG_INFO, "new fifo: %s", pvFifo->id);
    return pvFifo;
}

/*
 * Reserve the semaphore.
 *
 * "Available" is represented by an open read end of the FIFO (see V()).
 * Opening the write end blocks until such a reader exists. Once the open
 * succeeds, both ends are closed so that the next P() blocks again.
 *
 * Fixes over the previous version: a failed open() is reported instead of
 * being ignored, and rfd is reset to -1 after it is closed. The stale
 * descriptor number used to be closed a second time by destroy().
 *
 * NOTE(review): if two threads pass the open() before either closes rfd,
 * both would proceed - confirm whether callers rely on strict exclusion.
 *
 * Returns 0 on success, -1 on failure.
 */
int P(PvFifo_t *pvFifo) {
    CHECK(pvFifo != NULL, "pvFifo is NULL");

    int fd = open(pvFifo->id, O_WRONLY);
    CHECK(fd != -1, "");
    close(fd);

    if (pvFifo->rfd != -1) {
        close(pvFifo->rfd);
        pvFifo->rfd = -1;
    }
    pvFifo->wfd = -1;
    return 0;
}


/*
 * Release the semaphore by opening the FIFO's read end (non-blocking, so the
 * open itself never waits for a writer). The open read end is what lets a
 * blocked P() complete.
 *
 * Fix over the previous version: a failed open() now returns -1 instead of
 * being reported as success with rfd == -1.
 *
 * Returns 0 on success, -1 on failure.
 */
int V(PvFifo_t *pvFifo) {
    CHECK(pvFifo != NULL, "pvFifo is NULL");

    int fd = open(pvFifo->id, O_RDONLY | O_NONBLOCK);
    CHECK(fd != -1, "");
    pvFifo->rfd = fd;
    return 0;
}
/*
 * Create a FIFO-backed semaphore in the "available" state (newPvFifo()
 * followed by V()).
 *
 * Fix over the previous version: both failure paths used to fall through to
 * the final log statement and dereference a NULL pointer. They now return
 * NULL, and the FIFO that was just created is unlinked when V() fails.
 *
 * Returns the new semaphore (owned by the caller) or NULL on failure.
 */
PvFifo_t *initAvai() {
    PvFifo_t *pvFifo = newPvFifo();
    if (!pvFifo) {
        CHECK_LOG(false, "");
        return NULL;
    }
    if (V(pvFifo) != 0) {
        CHECK_LOG(false, "");
        unlink(pvFifo->id);
        free(pvFifo);
        return NULL;
    }
    logger(LOG_INFO, "init :%s", pvFifo->id);
    return pvFifo;
}


/* Create a FIFO-backed semaphore in the "in use" state, which is exactly the
 * state newPvFifo() produces. Returns NULL on failure. */
PvFifo_t *initInUse() {
    PvFifo_t *pvFifo = newPvFifo();

    return pvFifo;
}

/*
 * Close whichever descriptors of the semaphore are open (read end first) and
 * remove the FIFO from the file system. The structure itself is not freed.
 *
 * Returns the result of unlink(): 0 on success, -1 on failure.
 */
int destroy(PvFifo_t *id) {
    int fds[2] = { id->rfd, id->wfd };

    for (int i = 0; i < 2; i++) {
        if (fds[i] != -1) {
            close(fds[i]);
        }
    }
    return unlink(id->id);
}
/* Capacity of the shared stack used by producer() and consumer(). */
#define CRITICAL_LEN 1024
/* Index of the top element of criticalArea; -1 means the stack is empty. */
int critial_cnt = -1;
/* Shared stack of produced values, accessed between P(id) and V(id). */
int criticalArea[CRITICAL_LEN];
/* The FIFO-backed semaphore guarding the two variables above; set in main(). */
PvFifo_t *id = NULL;
/* atexit() handler: tear down the global semaphore `id` if one was created. */
void rmPvFifo() {
    if (id == NULL) {
        return;
    }
    destroy(id);
}

pthread_t producer_t, consumer_t;
/*
 * Handler for SIGHUP/SIGTERM/SIGALRM: terminate the program, with status 0
 * for SIGHUP and 1 for anything else.
 *
 * The previous version first sent SIGKILL to both worker threads with
 * pthread_kill(). SIGKILL's action is process-wide, so that killed the whole
 * process on the spot; exit() was never reached and the atexit() handler
 * that unlinks the FIFO never ran. exit() already ends every thread, so the
 * calls are simply dropped.
 *
 * NOTE(review): exit() is not async-signal-safe; kept because the program
 * relies on the atexit() cleanup.
 */
void exithandler(int sig) {
    exit(sig == SIGHUP ? 0 : 1);
}

/*
 * Consumer thread body: forever pop the top value off criticalArea while
 * holding semaphore `id`, and log it after releasing the semaphore.
 * When the stack is empty it sleeps one second (outside the critical
 * section) before retrying. Never returns; a P()/V() failure terminates the
 * process via CHECK_EXIT. `args` is unused.
 */
void *consumer(void *args) {
while(1) {
int n;
bool ok = false;
CHECK_EXIT(P(id) != -1, "");
if(critial_cnt >= 0) {
n = criticalArea[critial_cnt--];
ok = true;
}
CHECK_EXIT(V(id) != -1, "");
/* n is only read when ok is true, i.e. after it has been assigned above. */
if(ok) logger(LOG_INFO, "consume: %d", n);
else sleep(1);
}
}

/*
 * Producer thread body: registers rmPvFifo() as an atexit() handler, then
 * forever pushes a value onto criticalArea while holding semaphore `id`.
 * The pushed value equals the index it is stored at. When the stack is full
 * it sleeps one second (outside the critical section) before retrying.
 * Never returns; a P()/V() failure terminates the process via CHECK_EXIT.
 * `arg` is unused.
 */
void *producer(void *arg) {
atexit(rmPvFifo);
while(1) {
int n;
bool ok = false;
CHECK_EXIT(P(id) != -1, "");
if(critial_cnt < CRITICAL_LEN - 1) {
critial_cnt++;
n = critial_cnt;
criticalArea[critial_cnt] = n;
ok = true;
}
CHECK_EXIT(V(id) != -1, "");
/* n is only read when ok is true, i.e. after it has been assigned above. */
if(ok) logger(LOG_INFO, "produce: %d", n);
else sleep(1);
}
}

/*
 * Demo driver: create an available FIFO semaphore, start one producer and
 * one consumer thread that share it, and let them run until SIGALRM fires
 * after 10 seconds (or SIGHUP/SIGTERM arrives).
 *
 * Fix over the previous version: the results of pthread_create() are
 * checked. pthread_create() returns the error number instead of setting
 * errno, so it is copied into errno for the CHECK_EXIT log message.
 */
int main(int argc, char **argv) {
    (void)argc;
    (void)argv;

    srand(time(NULL));
    logfile = stderr;
    id = initAvai();
    CHECK_EXIT(id != NULL, "");

    signal(SIGHUP, exithandler);
    signal(SIGTERM, exithandler);
    signal(SIGALRM, exithandler);
    alarm(10);

    int rc = pthread_create(&producer_t, NULL, producer, NULL);
    if (rc != 0) {
        errno = rc;
        CHECK_EXIT(false, "");
    }
    rc = pthread_create(&consumer_t, NULL, consumer, NULL);
    if (rc != 0) {
        errno = rc;
        CHECK_EXIT(false, "");
    }

    /* Neither worker returns; the joins keep main() alive until a signal
     * handler ends the process. */
    pthread_join(producer_t, NULL);
    pthread_join(consumer_t, NULL);
    return 0;
}

不需要共享fd

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
//
// Created by root on 9/13/23.
//

#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <stdlib.h>
#include <fcntl.h>
#include <pthread.h>
#include <unistd.h>
#include <signal.h>
#include <stdbool.h>
#include <stddef.h>
#include <syslog.h>
#include <wait.h>
#include <sys/stat.h>


/* Verbosity threshold: logger() writes a message to logfile only when the
 * message's syslog-style level is numerically <= DEBUG_LEVEL. */
#define DEBUG_LEVEL LOG_DEBUG

/* Fallback threshold; not taken here because DEBUG_LEVEL is defined above. */
#ifndef DEBUG_LEVEL
#define DEBUG_LEVEL LOG_INFO
#endif
/* Stream logger() prints to; pointed at stderr by the constructor below. */
FILE *logfile = NULL;
/* When true, logger() additionally forwards every message to syslog(). */
bool syslog_enable = false;
/* Marked with the GCC constructor attribute so it runs before main(). */
void __attribute__ ((constructor)) init() {
logfile = stderr;
}

/*
 * alloc_sprintf(str, fmt, ...): printf-style formatting into a freshly
 * malloc'ed buffer of exactly the required size. `str` is left NULL when
 * malloc fails. The format arguments are expanded twice. The caller frees
 * `str`.
 */
#define alloc_sprintf(__alloc_sprintf_str, __format...) do { \
int __alloc_sprintf_len = snprintf(NULL, 0, __format); \
(__alloc_sprintf_str) = malloc(__alloc_sprintf_len+1); \
if(__alloc_sprintf_str != NULL) \
snprintf(__alloc_sprintf_str, __alloc_sprintf_len+1, __format); \
} while(0)

/*
 * logger(level, fmt, ...): print the message plus a newline to logfile when
 * level <= DEBUG_LEVEL; independently of that threshold, send it to syslog()
 * when syslog_enable is set. The temporary is named __data so that it does
 * not shadow a caller variable called `data` used in the arguments.
 */
#define logger(level, msg...) do { \
if(level <= DEBUG_LEVEL) { \
fprintf(logfile, msg); \
fprintf(logfile, "\n"); \
} \
if(syslog_enable) { \
char *__data; \
alloc_sprintf(__data, msg); \
syslog(level, "%s", __data); \
safe_free(__data); \
} \
} while(0)

/*
 * COND_RET(x, ret, fmt, ...): when `x` is false, log file:line,
 * strerror(errno) if errno is non-zero, the stringified condition and the
 * caller's message, then execute the statement(s) `ret`.
 */
#define COND_RET(x, ret, msg...) \
do { \
if(!(x)) { \
if(errno == 0)logger(LOG_ERR, "%s:%d\nunmet condition:\"%s\"\n", __FILE__, __LINE__, #x); \
else logger(LOG_ERR, "%s:%d\nerror: %s\nunmet condition:\"%s\"\n", __FILE__, __LINE__,strerror(errno), #x); \
logger(LOG_ERR, msg); \
logger(LOG_ERR, "\n"); \
ret \
} \
} while(0)

/* CHECK: log and `return -1`. CHECK_EXIT: log and exit(EXIT_FAILURE).
 * CHECK_LOG: log only. */
#define CHECK(x, msg...) COND_RET(x, return -1;, msg)
#define CHECK_EXIT(x, msg...) COND_RET(x, exit(EXIT_FAILURE);, msg)
#define CHECK_LOG(x, msg...) COND_RET(x, ;, msg)

/* Free a pointer if it is non-NULL and reset it to NULL (argument must be an
 * lvalue). */
#define safe_free(__safe_free_ptr) do { if(__safe_free_ptr) { free(__safe_free_ptr); (__safe_free_ptr) = NULL;} } while(0)

/* Generic buffer size constant. */
#define BUFFER_SIZE 4096


/* NOTE(review): none of the code visible in this FIFO-based program reads
 * these three variables; they look like leftovers from the System V
 * semaphore example - confirm before removing. */
key_t sem_key = 0;
int sem_id = 0;

pid_t taskid = 0;

/*
 * Convert a string consisting only of decimal digits to an int.
 *
 * The process is terminated through CHECK_EXIT when `num` is NULL, empty, or
 * contains any character other than '0'..'9'. Previously a NULL pointer was
 * handed to atoi() and an empty string was silently converted to 0.
 *
 * num: NUL-terminated candidate string.
 * Returns the value as computed by atoi().
 */
int safe_atoi(const char *num) {
    CHECK_EXIT(num != NULL && *num != '\0', "");
    for (const char *ptr = num; *ptr; ptr++) {
        CHECK_EXIT(*ptr >= '0' && *ptr <= '9', "");
    }
    return atoi(num);
}
/*
 * A binary semaphore emulated with a named pipe; one byte in the pipe stands
 * for "available".
 *   id  - path of the FIFO on the file system.
 *   rfd - read-end descriptor, opened by newPvFifo() and read by P().
 *   wfd - write-end descriptor, opened and closed again inside V().
 */
typedef struct {
char id[L_tmpnam];
int rfd, wfd;
}PvFifo_t;

/*
 * Open the FIFO `id` with access mode `rw` (O_RDONLY or O_WRONLY) without
 * blocking in open(), then switch the descriptor back to blocking mode so
 * that later read()/write() calls wait as usual.
 *
 * Returns the descriptor, or -1 on failure (logged; the descriptor is closed
 * if it had already been opened).
 */
int openPvfifo(char *id, int rw) {
    int fd = open(id, rw | O_NONBLOCK);
    CHECK(fd != -1, "");

    int flag = fcntl(fd, F_GETFL);
    if (flag != -1 && fcntl(fd, F_SETFL, flag & ~O_NONBLOCK) != -1) {
        return fd;
    }

    /* Either fcntl() call failed: report it and give the descriptor back. */
    CHECK_LOG(false, "");
    close(fd);
    return -1;
}
/*
 * Attach to (creating it if necessary) the FIFO-backed semaphore named `id`
 * and open its read end. No token is written, so a freshly created
 * semaphore starts out "in use".
 *
 * Fixes over the previous version: `id` is validated and copied with a
 * bounded snprintf() (the unbounded strcpy() could overflow the id field),
 * the malloc() result is checked, the structure is freed on every failure
 * path, and a failure to open the read end is reported instead of returning
 * an object with rfd == -1.
 *
 * Returns a heap-allocated PvFifo_t owned by the caller, or NULL on failure.
 */
PvFifo_t *newPvFifo(char *id) {
    if (id == NULL) {
        CHECK_LOG(false, "");
        return NULL;
    }

    PvFifo_t *pvFifo = malloc(sizeof *pvFifo);
    if (pvFifo == NULL) {
        CHECK_LOG(false, "");
        return NULL;
    }

    int len = snprintf(pvFifo->id, sizeof pvFifo->id, "%s", id);
    if (len < 0 || (size_t)len >= sizeof pvFifo->id) {
        errno = ENAMETOOLONG;
        CHECK_LOG(false, "");
        free(pvFifo);
        return NULL;
    }

    /* An already existing FIFO is fine: another process created it first. */
    if (mkfifo(id, 0666) == -1 && errno != EEXIST) {
        CHECK_LOG(false, "");
        free(pvFifo);
        return NULL;
    }

    pvFifo->wfd = -1;
    pvFifo->rfd = openPvfifo(id, O_RDONLY);
    if (pvFifo->rfd == -1) {
        free(pvFifo);
        return NULL;
    }
    logger(LOG_INFO, "new fifo: %s", id);
    return pvFifo;
}

/*
 * Reserve the semaphore by consuming one token byte from the FIFO, blocking
 * until a token is available.
 *
 * Fixes over the previous version:
 *  - Operator precedence: `ret = read(...) != sizeof(bool)` stored the result
 *    of the comparison, so a failed read returned 1. Every caller tests
 *    `!= -1`, so the failure was treated as a successful reservation.
 *    Failure now returns -1.
 *  - A blocking read() on a FIFO that has no writer returns 0 (end of file)
 *    instead of waiting. A write end is therefore held open for the duration
 *    of the read so that an empty FIFO really blocks. The non-blocking open
 *    succeeds because this process holds the read end.
 *  - A read interrupted by a signal (EINTR) is retried.
 *
 * Returns 0 on success, -1 on failure.
 */
int P(PvFifo_t *pvFifo) {
    CHECK(pvFifo != NULL, "pvFifo is NULL");

    int keepalive = open(pvFifo->id, O_WRONLY | O_NONBLOCK);
    CHECK(keepalive != -1, "");

    bool buf;
    ssize_t got;
    do {
        got = read(pvFifo->rfd, &buf, sizeof(bool));
    } while (got == -1 && errno == EINTR);

    int ret = 0;
    if (got != (ssize_t)sizeof(bool)) {
        CHECK_LOG(false, ""); /* log before close() can disturb errno */
        ret = -1;
    }
    close(keepalive);
    return ret;
}


/*
 * Release the semaphore by writing one token byte into the FIFO. The write
 * end is opened only for the duration of the call.
 *
 * Fixes over the previous version:
 *  - Operator precedence: `ret = write(...) != sizeof(bool)` made a failed
 *    write return 1, which callers testing `!= -1` accepted as success.
 *    Failure now returns -1.
 *  - pvFifo->wfd is reset to -1 after the descriptor is closed. The stale
 *    number used to be closed a second time by destroy().
 *
 * Returns 0 on success, -1 on failure.
 */
int V(PvFifo_t *pvFifo) {
    CHECK(pvFifo != NULL, "pvFifo is NULL");

    int fd = openPvfifo(pvFifo->id, O_WRONLY);
    CHECK(fd != -1, "");

    int ret = 0;
    if (write(fd, &(bool){true}, sizeof(bool)) != (ssize_t)sizeof(bool)) {
        CHECK_LOG(false, ""); /* log before close() can disturb errno */
        ret = -1;
    }
    close(fd);
    pvFifo->wfd = -1;
    return ret;
}
PvFifo_t *initAvai(char *id) {
PvFifo_t *pvFifo = newPvFifo(id);
if(!pvFifo) {
CHECK_LOG(false, "");
} else if(V(pvFifo) != 0) {
CHECK_LOG(false, "");
safe_free(pvFifo);
}
logger(LOG_INFO, "init :%s", pvFifo->id);
return pvFifo;
}


/* Attach to the FIFO-backed semaphore `id` without depositing a token.
 * Returns NULL on failure. */
PvFifo_t *initInUse(char *id) {
    PvFifo_t *pvFifo = newPvFifo(id);

    return pvFifo;
}

/*
 * Close whichever descriptors of the semaphore are open (read end first) and
 * remove the FIFO from the file system. The structure itself is not freed.
 *
 * Returns the result of unlink(): 0 on success, -1 on failure.
 */
int destroy(PvFifo_t *id) {
    int fds[2] = { id->rfd, id->wfd };

    for (int i = 0; i < 2; i++) {
        if (fds[i] != -1) {
            close(fds[i]);
        }
    }
    return unlink(id->id);
}
/* The process-wide semaphore, assigned in main(). */
PvFifo_t *id = NULL;

/* atexit() handler: tear down the global semaphore `id` if one was created. */
void rmPvFifo() {
    if (id == NULL) {
        return;
    }
    destroy(id);
}

/* Path of the shared data file; allocated in main(). */
char *fn = NULL;

/*
 * Signal handler: remove the shared data file (if its name is known) and
 * terminate, with status 0 for SIGHUP and 1 for every other signal.
 */
void exithandler(int sig) {
    if (fn != NULL) {
        unlink(fn);
    }
    exit(sig == SIGHUP ? 0 : 1);
}
/* Descriptor of the data file shared by the producer and consumer processes.
 * The int at file offset 0 is the index of the next free slot; the values
 * themselves are stored as ints from index 1 onwards. */
int comm_file;
/* Upper bound for the next-free-slot index: producer() stops at MAX. */
#define MAX 65536
/*
 * Consumer loop: while holding semaphore `id`, read the next-free-slot index
 * from the start of the shared file. If at least one value is stored
 * (index > 1), read and log the most recent one and write back the
 * decremented index. Never returns; any I/O or semaphore failure terminates
 * the process via CHECK_EXIT. `args` is unused.
 */
void *consumer(void *args) {
while(1) {
CHECK_EXIT(P(id) != -1, "");
int offset;
CHECK_EXIT(pread(comm_file, &offset, sizeof(int), 0) == sizeof(int), "");
if(offset > 1) {
int data;
/* The newest value sits in the slot just below the next-free index. */
CHECK_EXIT(pread(comm_file, &data, sizeof(int), (offset - 1) * sizeof(int)) == sizeof(int), "");
logger(LOG_INFO, "consume: %d", data);
offset--;
CHECK_EXIT(pwrite(comm_file, &offset, sizeof(int), 0) == sizeof(int), "");
/* Nothing to consume: this sleep happens while the semaphore is still held. */
} else sleep(1);
CHECK_EXIT(V(id) != -1, "");
}
}

/*
 * Producer loop: registers rmPvFifo() as an atexit() handler, then, while
 * holding semaphore `id`, reads the next-free-slot index from the start of
 * the shared file. If it is below MAX, the index itself is stored as the
 * value in that slot and the incremented index is written back. Never
 * returns; any I/O or semaphore failure terminates the process via
 * CHECK_EXIT. `arg` is unused.
 */
void *producer(void *arg) {
atexit(rmPvFifo);
while(1) {
int offset;
CHECK_EXIT(P(id) != -1, "");
CHECK_EXIT(pread(comm_file, &offset, sizeof(int), 0) == sizeof(int), "");
if(offset < MAX) {
int data = offset;
CHECK_EXIT(pwrite(comm_file, &data, sizeof(int), offset * sizeof(int)) == sizeof(int), "");
offset++;
CHECK_EXIT(pwrite(comm_file, &offset, sizeof(int), 0) == sizeof(int), "");
logger(LOG_INFO, "produce: %d", data);
/* File is full: this sleep happens while the semaphore is still held. */
} else sleep(1);
CHECK_EXIT(V(id) != -1, "");
}
}

/*
 * Usage: <prog> producer | <anything else>
 *
 * Attaches to the FIFO semaphore "meoww", opens (or creates and
 * initialises) the shared data file "<argv[0]>-comm", then runs the producer
 * loop when argv[1] is "producer" and the consumer loop otherwise.
 *
 * Fixes over the previous version: a failed allocation of the file name and
 * any failure to open the shared file are now fatal. Previously a failed
 * open was only logged, or not checked at all in the EEXIST branch, and the
 * worker loop then ran with descriptor -1.
 */
int main(int argc, char **argv) {
    CHECK_EXIT(argc > 1, "");
    id = initAvai("meoww");
    CHECK_EXIT(id != NULL, "");

    signal(SIGHUP, exithandler);
    signal(SIGTERM, exithandler);
    signal(SIGINT, exithandler);
    signal(SIGALRM, exithandler);

    alloc_sprintf(fn, "%s-comm", argv[0]);
    CHECK_EXIT(fn != NULL, "");

    comm_file = open(fn, O_CREAT | O_EXCL | O_RDWR, 0666);
    if (comm_file != -1) {
        /* We created the file: slot 0 holds the next-free index, initially 1. */
        CHECK_EXIT(pwrite(comm_file, &(int[]){1}, sizeof(int), 0) == sizeof(int), "");
    } else {
        CHECK_EXIT(errno == EEXIST, "");
        comm_file = open(fn, O_RDWR);
        CHECK_EXIT(comm_file != -1, "");
    }

    if (!strcmp(argv[1], "producer")) {
        producer(NULL);
    } else {
        consumer(NULL);
    }
    return 0; /* not reached: both loops run until the process is signalled */
}

47.5

在VMS操作系统上,Digital提供了一种类似于二元信号量的同步方法,它被称为事件标记(event flag)。一个事件标记可以取两个值clear和set,并且在其之上可以执行下面4种操作:setEventFlag来设置标记;clearEventFlag来清除标记;waitForEventFlag阻塞直到标记被设置;getFlagState获取标记的当前状态。使用System V信号量为事件标记设计一种实现。这个实现要求上面每个函数都接收两个参数:一个是信号量标识符,一个是信号量序号。(在考虑waitForEventFlag操作时将会发现为clear和set状态取值不是一件容易的事情。)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
//
// Created by root on 9/13/23.
//

#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <stdlib.h>
#include <fcntl.h>
#include <pthread.h>
#include <unistd.h>
#include <signal.h>
#include <stdbool.h>
#include <stddef.h>
#include <syslog.h>
#include <wait.h>
#include <sys/stat.h>
#include <sys/ipc.h>
#include <sys/sem.h>


/* Verbosity threshold: logger() writes a message to logfile only when the
 * message's syslog-style level is numerically <= DEBUG_LEVEL. */
#define DEBUG_LEVEL LOG_DEBUG

/* Fallback threshold; not taken here because DEBUG_LEVEL is defined above. */
#ifndef DEBUG_LEVEL
#define DEBUG_LEVEL LOG_INFO
#endif
/* Placeholder message passed to the CHECK macros throughout this program. */
#define STRING_MSG "MSG"
/* Stream logger() prints to; pointed at stderr by the constructor below. */
FILE *logfile = NULL;
/* When true, logger() additionally forwards every message to syslog(). */
bool syslog_enable = false;
/* Marked with the GCC constructor attribute so it runs before main(). */
void __attribute__ ((constructor)) init() {
syslog_enable = false;
logfile = stderr;
}

/*
 * alloc_sprintf(str, fmt, ...): printf-style formatting into a freshly
 * malloc'ed buffer of exactly the required size. `str` is left NULL when
 * malloc fails. The format arguments are expanded twice. The caller frees
 * `str`.
 */
#define alloc_sprintf(__alloc_sprintf_str, __format...) do { \
int __alloc_sprintf_len = snprintf(NULL, 0, __format); \
(__alloc_sprintf_str) = malloc(__alloc_sprintf_len+1); \
if(__alloc_sprintf_str != NULL) \
snprintf(__alloc_sprintf_str, __alloc_sprintf_len+1, __format); \
} while(0)

/*
 * logger(level, fmt, ...): when level <= DEBUG_LEVEL, print "[pid] ", the
 * message and a newline to logfile; independently of that threshold, send
 * the message to syslog() when syslog_enable is set.
 */
#define logger(level, msg...) do { \
if(level <= DEBUG_LEVEL) { \
fprintf(logfile, "[%ld] ", (long) getpid()); \
fprintf(logfile, msg); \
fprintf(logfile, "\n"); \
} \
if(syslog_enable) { \
char *data; \
alloc_sprintf(data, msg); \
syslog(level, "%s", data); \
safe_free(data); \
} \
} while(0)

/*
 * COND_RET(x, ret, fmt, ...): when `x` is false, log strerror(errno) (if
 * errno is non-zero), file:line, the stringified condition and the caller's
 * message, then execute `ret;`. When `x` holds, or `ret` falls through,
 * errno is reset to 0 so that a later failure is not reported with a stale
 * error code.
 */
#define COND_RET(x, ret, msg...) \
do { \
if(!(x)) { \
if(errno != 0) logger(LOG_ERR, "Error(%d): %s", errno, strerror(errno)); \
logger(LOG_ERR, "%s:%d", __FILE__, __LINE__); \
logger(LOG_ERR, "unmet condition:\"%s\"", #x); \
logger(LOG_ERR, msg); \
ret; \
} \
errno = 0; \
} while(0)

/* CHECK: log and `return -1`. CHECK_EXIT: log and exit(EXIT_FAILURE).
 * CHECK_LOG: log only. */
#define CHECK(x, msg...) COND_RET(x, return -1;, msg)
#define CHECK_EXIT(x, msg...) COND_RET(x, exit(EXIT_FAILURE);, msg)
#define CHECK_LOG(x, msg...) COND_RET(x, ;, msg)

/* Free a pointer if it is non-NULL and reset it to NULL (argument must be an
 * lvalue). */
#define safe_free(__safe_free_ptr) do { if(__safe_free_ptr) { free(__safe_free_ptr); (__safe_free_ptr) = NULL;} } while(0)

/* Generic buffer size constant. */
#define BUFFER_SIZE 4096

/* Argument type for the optional fourth parameter of semctl(); this program
 * declares it itself. Which member is used depends on the semctl command:
 * val for SETVAL, buf for IPC_STAT, array for SETALL, __buf for SEM_INFO. */
union semun {
int val;
struct semid_ds *buf;
unsigned short *array;
#if defined(__linux__)
struct seminfo *__buf;
#endif
};

/* Handle of an event-flag group: the identifier of the four-semaphore set
 * created by newEventFlag(). */
typedef struct {
int semid;
}EventFlag_t;

/*
 * Perform a single semop() on one semaphore.
 *
 * sem_id: semaphore set identifier.
 * semnum: index of the semaphore inside the set.
 * incr:   value for sem_op (positive adds, negative subtracts, 0 waits for zero).
 * flg:    value for sem_flg (e.g. IPC_NOWAIT, SEM_UNDO, or 0).
 * Returns whatever semop() returns: 0 on success, -1 with errno set on failure.
 */
int incrsem(int sem_id, int semnum, short incr, short flg) {
    struct sembuf op;

    op.sem_num = semnum;
    op.sem_op = incr;
    op.sem_flg = flg;
    return semop(sem_id, &op, 1);
}

/* Lock the mutex of an event-flag group: decrement semaphore 0 of `sem_id`,
 * blocking while it is 0. Returns 0 on success, -1 on failure. */
int P(int sem_id) {
    const short take = -1;

    return incrsem(sem_id, 0, take, 0);
}

/* Unlock the mutex of an event-flag group: increment semaphore 0 of
 * `sem_id`. Returns 0 on success, -1 on failure. */
int V(int sem_id) {
    const short give = 1;

    return incrsem(sem_id, 0, give, 0);
}

/* Wait until semaphore 1 (the notifier) of `sem_id` is zero, by issuing a
 * semop() with sem_op == 0. Returns 0 on success, -1 on failure. */
int waitFor(int sem_id) {
    const int notifier = 1;

    return incrsem(sem_id, notifier, 0, 0);
}

/*
 * Read the current value of one semaphore.
 *
 * sem_id: semaphore set identifier.
 * semnum: index of the semaphore inside the set.
 * n:      receives the value on success; may be NULL if only the return
 *         value is wanted.
 * Returns the semaphore value (>= 0) on success, -1 with errno set on failure.
 *
 * Changes over the previous version: GETVAL takes no fourth argument, so the
 * uninitialised union is no longer passed to semctl(); *n is left untouched
 * on failure instead of being overwritten with -1; a NULL `n` is tolerated.
 */
int getsem(int sem_id, int semnum, short *n) {
    int ret = semctl(sem_id, semnum, GETVAL);

    if (ret != -1 && n != NULL) {
        *n = (short)ret;
    }
    return ret;
}

int setsem(int sem_id, int semnum, short n) {
union semun arg;
arg.val = n;
return semctl(sem_id, semnum, SETVAL, arg);
}

/* Set semaphore 1 (the notifier) of `sem_id` to 0, which satisfies every
 * process blocked in waitFor(). Returns 0 on success, -1 on failure. */
int notifyAll(int sem_id) {
    const int notifier = 1;

    return setsem(sem_id, notifier, 0);
}
/*
 * Create, or attach to, the event-flag group identified by ftok(file, x).
 *
 * The group is a set of four semaphores: sem[0] is a mutex, sem[1] a
 * notifier, sem[2] holds the low and sem[3] the high flag bits. The creator
 * initialises all four to 0 and then raises the mutex to 1 with a semop().
 * A process that finds the set already present waits until sem_otime is
 * non-zero, i.e. until the creator has performed that semop().
 *
 * Fixes over the previous version: `file` and the ftok() result are
 * validated, and the result structure is only filled in when malloc()
 * succeeded (it used to be memcpy'ed into an unchecked pointer).
 *
 * Returns a heap-allocated handle owned by the caller (release it with
 * destroyEventFlag()), or NULL on failure.
 */
EventFlag_t *newEventFlag(const char *file, char x) {
    EventFlag_t eventFlag;
    union semun arg;

    COND_RET(file != NULL, return NULL, STRING_MSG);
    key_t key = ftok(file, x);
    COND_RET(key != -1, return NULL, STRING_MSG);

    eventFlag.semid = semget(key, 4, IPC_CREAT | IPC_EXCL | 0666); // sem[0] as mutex, sem[1] as notifier, sem[2]|sem[3] as flag
    if (eventFlag.semid == -1) {
        if (errno != EEXIST) {
            CHECK_LOG(false, STRING_MSG);
            return NULL;
        }
        eventFlag.semid = semget(key, 4, 0666);
        COND_RET(eventFlag.semid != -1, return NULL, STRING_MSG);

        struct semid_ds ds;
        arg.buf = &ds;
        COND_RET(semctl(eventFlag.semid, 0, IPC_STAT, arg) != -1, return NULL, STRING_MSG);
        while (ds.sem_otime == 0) {
            logger(LOG_INFO, "waiting for sem init");
            sleep(1);
            COND_RET(semctl(eventFlag.semid, 0, IPC_STAT, arg) != -1, return NULL, STRING_MSG);
        }
        logger(LOG_INFO, "semget get old");
    } else {
        /* All zero first; the semop() below leaves sem[0] = 1 and sets sem_otime. */
        arg.array = (unsigned short[]){0, 0, 0, 0};
        if (semctl(eventFlag.semid, 0, SETALL, arg) == -1) {
            CHECK_LOG(false, STRING_MSG);
            CHECK_LOG(semctl(eventFlag.semid, 0, IPC_RMID) != -1, STRING_MSG);
            return NULL;
        }
        if (incrsem(eventFlag.semid, 0, 1, 0) == -1) {
            CHECK_LOG(false, STRING_MSG);
            CHECK_LOG(semctl(eventFlag.semid, 0, IPC_RMID) != -1, STRING_MSG);
            return NULL;
        }
        logger(LOG_INFO, "semget create new");
    }

    EventFlag_t *result = malloc(sizeof *result);
    COND_RET(result != NULL, return NULL, STRING_MSG);
    *result = eventFlag;
    return result;
}

/*
 * Set (OR in) the bits of `flag` in the event-flag group and wake all
 * waiters.
 *
 * Bits 0-15 of `flag` are applied to sem[2] and bits 16-31 to sem[3].
 * NOTE(review): a semaphore value has an upper limit (32767 on Linux), so a
 * request containing bit 15 or bit 31 is presumably rejected by semctl();
 * main() filters those bits out - confirm.
 *
 * Fix over the previous version: every step after the mutex has been taken
 * now releases it on failure. Previously an error returned with the mutex
 * still held, blocking every other user of the group.
 *
 * Returns 0 on success, -1 on failure.
 */
int setEventFlag(EventFlag_t *eventFlag, int flag) {
    int rc = -1;
    short currentFlag;

    CHECK(eventFlag != NULL, STRING_MSG);
    CHECK(P(eventFlag->semid) != -1, STRING_MSG);

    COND_RET(getsem(eventFlag->semid, 2, &currentFlag) != -1, goto unlock, STRING_MSG);
    COND_RET(setsem(eventFlag->semid, 2, currentFlag | (flag & 0xffff)) != -1, goto unlock, STRING_MSG);

    COND_RET(getsem(eventFlag->semid, 3, &currentFlag) != -1, goto unlock, STRING_MSG);
    COND_RET(setsem(eventFlag->semid, 3, currentFlag | ((flag >> 16) & 0xffff)) != -1, goto unlock, STRING_MSG);

    COND_RET(notifyAll(eventFlag->semid) != -1, goto unlock, STRING_MSG);
    rc = 0;

unlock:
    CHECK(V(eventFlag->semid) != -1, STRING_MSG);
    return rc;
}

/*
 * Clear the bits of `flag` in the event-flag group. Waiters are not
 * notified.
 *
 * Bits 0-15 of `flag` are cleared in sem[2] and bits 16-31 in sem[3].
 *
 * Fix over the previous version: every step after the mutex has been taken
 * now releases it on failure. Previously an error returned with the mutex
 * still held, blocking every other user of the group.
 *
 * Returns 0 on success, -1 on failure.
 */
int clearEventFlag(EventFlag_t *eventFlag, int flag) {
    int rc = -1;
    short currentFlag;

    CHECK(eventFlag != NULL, STRING_MSG);
    CHECK(P(eventFlag->semid) != -1, STRING_MSG);

    COND_RET(getsem(eventFlag->semid, 2, &currentFlag) != -1, goto unlock, STRING_MSG);
    COND_RET(setsem(eventFlag->semid, 2, currentFlag & ~(flag & 0xffff)) != -1, goto unlock, STRING_MSG);

    COND_RET(getsem(eventFlag->semid, 3, &currentFlag) != -1, goto unlock, STRING_MSG);
    COND_RET(setsem(eventFlag->semid, 3, currentFlag & ~((flag >> 16) & 0xffff)) != -1, goto unlock, STRING_MSG);
    rc = 0;

unlock:
    CHECK(V(eventFlag->semid) != -1, STRING_MSG);
    return rc;
}

/*
 * Read the 32-bit flag word of the group without taking the mutex (callers
 * are expected to hold it): sem[2] supplies the low half, sem[3] the high
 * half.
 *
 * flag: receives the combined value; must not be NULL.
 * Returns 0 on success, -1 when `flag` is NULL or a semaphore read fails.
 */
int __getEventFlag(EventFlag_t *eventFlag, int *flag) {
    if (!flag) {
        return -1;
    }

    short currentFlag = 0;
    CHECK(getsem(eventFlag->semid, 2, &currentFlag) != -1, STRING_MSG);
    int ret = currentFlag;
    CHECK(getsem(eventFlag->semid, 3, &currentFlag) != -1, STRING_MSG);
    *flag = ret | (currentFlag << 16);
    return 0;
}

/*
 * Read the current flag word of the group under the mutex.
 *
 * flag: receives the value; must not be NULL.
 * Returns 0 on success, -1 on failure.
 *
 * Fix over the previous version: a failed read was only logged and the
 * function still returned 0, leaving *flag unchanged while reporting
 * success. The failure is now propagated after the mutex has been released.
 */
int getEventFlag(EventFlag_t *eventFlag, int *flag) {
    CHECK(eventFlag != NULL, STRING_MSG);
    CHECK(P(eventFlag->semid) != -1, STRING_MSG);

    int rc = __getEventFlag(eventFlag, flag);
    CHECK_LOG(rc != -1, STRING_MSG);

    CHECK(V(eventFlag->semid) != -1, STRING_MSG);
    return rc == -1 ? -1 : 0;
}
/*
 * waitForMethod(eventFlag, flag, method): body shared by waitForAny() and
 * waitForAll(). Under the mutex it reads the flag word into a local int
 * named `currentFlag`; while the expression `method` (which refers to
 * `currentFlag` and `flag`) is true, it releases the mutex, calls waitFor(),
 * re-takes the mutex and reads the flags again.
 *
 * Because it uses CHECK, the macro executes `return -1` in the enclosing
 * function when a semaphore operation fails; it must therefore only be used
 * inside functions returning int. The result of __getEventFlag() is not
 * checked.
 *
 * NOTE(review): in the code visible here semaphore 1 is only ever written
 * with 0 (SETALL in newEventFlag(), setsem(..., 1, 0) in notifyAll()), so
 * waitFor() appears to return immediately and this loop polls instead of
 * sleeping - confirm whether the notifier was meant to be armed before V().
 */
#define waitForMethod(eventFlag, flag, method) do { \
int currentFlag;\
CHECK(P(eventFlag->semid) != -1, STRING_MSG);\
__getEventFlag(eventFlag, &currentFlag); \
while (method) {\
CHECK(V(eventFlag->semid) != -1, STRING_MSG);\
CHECK(waitFor(eventFlag->semid) != -1, STRING_MSG);\
CHECK(P(eventFlag->semid) != -1, STRING_MSG);\
__getEventFlag(eventFlag, &currentFlag); \
}\
CHECK(V(eventFlag->semid) != -1, STRING_MSG);\
} while(0)

/*
 * Wait until at least one of the bits in `flag` is set in the group.
 * Returns 0 once the condition holds, -1 if a semaphore operation fails
 * (via the CHECK inside waitForMethod). With flag == 0 the loop condition
 * is always true.
 */
int waitForAny(EventFlag_t *eventFlag, int flag) {
waitForMethod(eventFlag, flag, (currentFlag & flag) == 0);
return 0;
}
/*
 * Wait until every bit in `flag` is set in the group.
 * Returns 0 once the condition holds, -1 if a semaphore operation fails
 * (via the CHECK inside waitForMethod).
 */
int waitForAll(EventFlag_t *eventFlag, int flag) {
waitForMethod(eventFlag, flag, (currentFlag & flag) != flag);
return 0;
}
/*
 * Remove the semaphore set behind *eventFlag, free the handle and reset
 * *eventFlag to NULL. A NULL `eventFlag` or a NULL *eventFlag is a no-op.
 * The result of the IPC_RMID call is not inspected.
 */
void destroyEventFlag(EventFlag_t **eventFlag) {
    if (eventFlag == NULL) {
        return;
    }
    if (*eventFlag != NULL) {
        semctl((*eventFlag)->semid, 0, IPC_RMID);
    }
    safe_free(*eventFlag);
}

/* The event-flag group used by main() and wrapPrintFlag(). */
EventFlag_t *eventFlag = NULL;

/*
 * Convert a string consisting only of decimal digits to an int.
 *
 * A NULL pointer, an empty string, or any character outside '0'..'9' is
 * logged and the process aborts itself with SIGABRT. The empty-string case
 * is new: it used to be converted to 0 silently.
 *
 * str: NUL-terminated candidate string.
 * Returns the value as computed by atoi().
 */
int safe_atoi(const char *str) {
    if (!str) {
        CHECK_LOG(false, "safe atoi, str is null");
        fflush(NULL);
        raise(SIGABRT);
    }
    if (*str == '\0') {
        CHECK_LOG(false, "safe atoi, str is empty");
        fflush(NULL);
        raise(SIGABRT);
    }
    for (const char *p = str; *p; p++) {
        if (*p > '9' || *p < '0') {
            CHECK_LOG(false, "safe atoi, not valid char: %c", *p);
            fflush(NULL);
            raise(SIGABRT);
        }
    }
    return atoi(str);
}


/*
 * Log the header "printFlag" followed by the index (0..31) of every bit that
 * is set in `flag`, one index per log line, in ascending order.
 *
 * Fix over the previous version: the mask was a signed int shifted left up
 * to bit 31, which overflows a signed int (undefined behaviour). The bits
 * are now tested on an unsigned copy.
 */
void printFlag(int flag) {
    logger(LOG_INFO, "printFlag");

    unsigned int bits = (unsigned int)flag;
    for (int cnt = 0; cnt < 32; cnt++) {
        if (bits & (1u << cnt)) {
            logger(LOG_INFO, "%d", cnt);
        }
    }
}

/* Log helpers for printInfo(): each prints "(<struct>).<field>: <value>". */
/* printINT: the field is an int. */
#define printINT(info, x) logger(LOG_INFO, "("#info")."#x": %d", (info).x)
/* printUINT: the field is an unsigned integer. It is converted to unsigned
 * long and printed with %lu (previously %ld was used for an unsigned field
 * whose exact width differs between systems). */
#define printUINT(info, x) logger(LOG_INFO, "("#info")."#x": %lu", (unsigned long)(info).x)
/* printTIME: the field is a time_t; ctime()'s text already ends in '\n'. */
#define printTIME(ds, x) logger(LOG_INFO, "("#ds")."#x": %s", ctime(&(ds).x))

/*
 * Log diagnostic information about semaphore set `id`: first every field of
 * the struct seminfo returned by semctl(SEM_INFO), then the number of
 * semaphores and the last-operation / last-change times from IPC_STAT.
 *
 * Returns 0 on success, -1 if either semctl() call fails (via CHECK).
 * NOTE(review): SEM_INFO and the __buf member are only declared on Linux
 * here (see union semun) - confirm that no other platform is targeted.
 */
int printInfo(int id) {
union semun arg;
struct seminfo info;
struct semid_ds ds;
arg.__buf = &info;
CHECK(semctl(id, 0, SEM_INFO, arg) != -1, STRING_MSG);
printINT(info, semmap);
printINT(info, semmni);
printINT(info, semmns);
printINT(info, semmnu);
printINT(info, semmsl);
printINT(info, semopm);
printINT(info, semume);
printINT(info, semusz);
printINT(info, semvmx);
printINT(info, semaem);
arg.buf = &ds;
CHECK(semctl(id, 0, IPC_STAT, arg) != -1, STRING_MSG);
printUINT(ds, sem_nsems);
printTIME(ds, sem_otime);
printTIME(ds, sem_ctime);
return 0;
}

/*
 * Run one event-flag operation on the global group and log the flag word
 * before and after it.
 *
 * flag: argument handed to `func`.
 * func: the operation (setEventFlag, clearEventFlag, waitForAny, waitForAll).
 * Always returns 0; failures of the individual steps are only logged
 * (CHECK_LOG).
 */
int wrapPrintFlag(int flag, int (*func)(EventFlag_t *, int)) {
int currentFlag = 0;
CHECK_LOG(getEventFlag(eventFlag, &currentFlag) != -1, STRING_MSG);
printFlag(currentFlag);
CHECK_LOG((*func)(eventFlag, flag) != -1, STRING_MSG);
CHECK_LOG(getEventFlag(eventFlag, &currentFlag) != -1, STRING_MSG);
printFlag(currentFlag);
return 0;
}

/* True for flag numbers 0..14 and 16..30; 15 and 31 (the top bit of each
 * 16-bit half) and anything outside 0..31 are rejected. */
#define FLAG_RANGE(flg) ((16 <= (flg) && (flg) < 31) || (0 <= (flg) && (flg) < 15))
/* Stringification helpers. The extra levels force the argument to be
 * macro-expanded before `#` is applied. */
#define XXSTRING(s) XSTRING(s) // how far the argument is expanded depends on the number of levels
#define XSTRING(s) STRING(s)
#define STRING(s) #s
/*
 * Command-line front end for the event-flag group keyed on argv[0].
 *
 *   argv[1]  - operation: destroy | waitAny | waitAll | add | clear | get
 *   argv[2+] - flag numbers; each accepted number n contributes bit (1 << n)
 *
 * The group is created or attached to before the operation is dispatched,
 * including for "destroy". Failures of the operation itself are only
 * logged; an unknown operation or a missing argv[1] exits with
 * EXIT_FAILURE.
 */
int main(int argc, char **argv) {
CHECK_EXIT(argc > 1, "Usage: %s destroy|waitAny|waitAll|add|clear|get [flag]", argv[0]);
eventFlag = newEventFlag(argv[0], 'x');
CHECK_EXIT(eventFlag != NULL, STRING_MSG);
int flag = 0;
/* Build the bit mask; flag numbers rejected by FLAG_RANGE are logged and skipped. */
for(int i = 2; i < argc; i++) {
int flg = safe_atoi(argv[i]);
if(!FLAG_RANGE(flg)){
CHECK_LOG(false, "Usage: %s", XXSTRING(FLAG_RANGE(flg)));
continue;
}
flag |= (1 << flg);
}
printInfo(eventFlag->semid);
logger(LOG_INFO, "start %s", argv[1]);
if(!strcmp(argv[1], "waitAny")) {
CHECK_LOG(wrapPrintFlag(flag, waitForAny) != -1, STRING_MSG);
} else if(!strcmp(argv[1], "waitAll")) {
CHECK_LOG(wrapPrintFlag(flag, waitForAll) != -1, STRING_MSG);
} else if(!strcmp(argv[1], "add")) {
CHECK_LOG(wrapPrintFlag(flag, setEventFlag) != -1, STRING_MSG);
} else if(!strcmp(argv[1], "clear")) {
CHECK_LOG(wrapPrintFlag(flag, clearEventFlag) != -1, STRING_MSG);
} else if(!strcmp(argv[1], "get")) {
/* "get" ignores the flags given on the command line and overwrites `flag`. */
CHECK_LOG(getEventFlag(eventFlag, &flag) != -1, STRING_MSG);
printFlag(flag);
} else if(!strcmp(argv[1], "destroy")) {
destroyEventFlag(&eventFlag);
} else {
CHECK_EXIT(false, "Usage: %s waitAny|waitAll|add|clear|get [flag]", argv[0]);
}
logger(LOG_INFO, "end %s", argv[1]);
}
作者

Meow Meow Liu

发布于

2023-09-13

更新于

2024-04-23

许可协议

评论