Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
att
GitHub Repository: att/ast
Path: blob/master/src/lib/libcoshell/cowait.c
1808 views
1
/***********************************************************************
2
* *
3
* This software is part of the ast package *
4
* Copyright (c) 1990-2011 AT&T Intellectual Property *
5
* and is licensed under the *
6
* Eclipse Public License, Version 1.0 *
7
* by AT&T Intellectual Property *
8
* *
9
* A copy of the License is available at *
10
* http://www.eclipse.org/org/documents/epl-v10.html *
11
* (with md5 checksum b35adb5213ca9657e911e9befb180842) *
12
* *
13
* Information and Software Systems Research *
14
* AT&T Research *
15
* Florham Park NJ *
16
* *
17
* Glenn Fowler <[email protected]> *
18
* *
19
***********************************************************************/
20
#pragma prototyped
21
/*
22
* Glenn Fowler
23
* AT&T Research
24
*
25
* wait for and return status of job or the next coshell job that completes
26
* job==co for non-blocking wait
27
*/
28
29
#include "colib.h"
30
31
#include <ctype.h>
32
33
/*
34
* cat and remove fd {1,2} serialized output
35
*/
36
37
static void
38
cat(Cojob_t* job, char** path, Sfio_t* op)
39
{
40
Sfio_t* sp;
41
42
if (sp = sfopen(NiL, *path, "r"))
43
{
44
sfmove(sp, op, SF_UNBOUND, -1);
45
sfclose(sp);
46
}
47
else
48
errormsg(state.lib, ERROR_LIBRARY|2, "%s: cannot open job %d serialized output", *path, job->id);
49
remove(*path);
50
free(*path);
51
*path = 0;
52
}
53
54
/*
55
* the number of running+zombie jobs
56
* these would count against --jobs or NPROC
57
*/
58
59
int
60
cojobs(Coshell_t* co)
61
{
62
int any;
63
int n;
64
65
if (co)
66
any = 0;
67
else if (!(co = state.coshells))
68
return -1;
69
else
70
any = 1;
71
n = 0;
72
do
73
{
74
n += co->outstanding;
75
} while (any && (co = co->next));
76
return n;
77
}
78
79
/*
80
* the number of pending cowait()'s
81
*/
82
83
int
84
copending(Coshell_t* co)
85
{
86
int any;
87
int n;
88
89
if (co)
90
any = 0;
91
else if (!(co = state.coshells))
92
return -1;
93
else
94
any = 1;
95
n = 0;
96
do
97
{
98
n += co->outstanding + co->svc_outstanding;
99
} while (any && (co = co->next));
100
return n;
101
}
102
103
/*
104
* the number of completed jobs not cowait()'d for
105
* cowait() always reaps the zombies first
106
*/
107
108
int
109
cozombie(Coshell_t* co)
110
{
111
int any;
112
int n;
113
114
if (co)
115
any = 0;
116
else if (!(co = state.coshells))
117
return -1;
118
else
119
any = 1;
120
n = 0;
121
do
122
{
123
n += (co->outstanding + co->svc_outstanding) - (co->running + co->svc_running);
124
} while (any && (co = co->next));
125
return n;
126
}
127
128
Cojob_t*
129
cowait(register Coshell_t* co, Cojob_t* job, int timeout)
130
{
131
register char* s;
132
register Cojob_t* cj;
133
register Coservice_t* cs;
134
register ssize_t n;
135
char* b;
136
char* e;
137
unsigned long user;
138
unsigned long sys;
139
int active;
140
int any;
141
int id;
142
int loop;
143
int to;
144
int type;
145
char buf[128];
146
147
static unsigned long serial = 0;
148
149
serial++;
150
if (co || job && (co = job->coshell))
151
any = 0;
152
else if (!(co = state.coshells))
153
goto echild;
154
else
155
any = 1;
156
157
/*
158
* first drain the zombies
159
*/
160
161
active = 0;
162
to = timeout >= 0 ? timeout : 60 * 1000;
163
zombies:
164
do
165
{
166
#if 0
167
errormsg(state.lib, 2, "coshell %d zombie wait %lu timeout=%d outstanding=<%d,%d> running=<%d,%d>", co->index, serial, timeout, co->outstanding, co->svc_outstanding, co->running, co->svc_running);
168
#endif
169
if ((co->outstanding + co->svc_outstanding) > (co->running + co->svc_running))
170
for (cj = co->jobs; cj; cj = cj->next)
171
if (cj->pid == CO_PID_ZOMBIE && (!job || cj == job))
172
{
173
cj->pid = CO_PID_FREE;
174
if (cj->service)
175
co->svc_outstanding--;
176
else
177
co->outstanding--;
178
#if 0
179
errormsg(state.lib, 2, "coshell %d zombie wait %lu timeout=%d outstanding=<%d,%d> running=<%d,%d> reap job %d", co->index, serial, timeout, co->outstanding, co->svc_outstanding, co->running, co->svc_running, cj->id);
180
#endif
181
return cj;
182
}
183
else if (cj->service && !cj->service->pid)
184
{
185
cj->pid = CO_PID_ZOMBIE;
186
cj->status = 2;
187
cj->service = 0;
188
co->svc_running--;
189
}
190
if (co->running > 0)
191
active = 1;
192
else if (co->svc_running > 0)
193
{
194
n = 0;
195
for (cs = co->service; cs; cs = cs->next)
196
if (cs->pid && kill(cs->pid, 0))
197
{
198
cs->pid = 0;
199
close(cs->fd);
200
cs->fd = -1;
201
n = 1;
202
}
203
if (n)
204
goto zombies;
205
active = 1;
206
}
207
} while (any && (co = co->next));
208
209
/*
210
* reap the active jobs
211
*/
212
213
if (!active)
214
goto echild;
215
if (any)
216
co = state.coshells;
217
do
218
{
219
loop = 0;
220
for (;;)
221
{
222
if (co->flags & CO_DEBUG)
223
{
224
loop++;
225
errormsg(state.lib, 2, "coshell %d wait %lu.%d timeout=%d outstanding=<%d,%d> running=<%d,%d>", co->index, serial, loop, timeout, co->outstanding, co->svc_outstanding, co->running, co->svc_running);
226
for (cj = co->jobs; cj; cj = cj->next)
227
if (cj->pid != CO_PID_FREE)
228
errormsg(state.lib, 2, "\tjob %d pid=%d status=%d", cj->id, cj->pid, cj->status);
229
}
230
if (co->running <= 0)
231
break;
232
while ((n = sfpoll(&co->msgfp, 1, to)) < 1)
233
{
234
if (n < 0)
235
{
236
if (errno == EINTR)
237
return 0;
238
break;
239
}
240
if (timeout >= 0)
241
break;
242
243
/*
244
* check for a killed job with no status
245
*/
246
247
for (cj = co->jobs; cj; cj = cj->next)
248
if (cj->pid > 0)
249
{
250
n = sfsprintf(buf, sizeof(buf), "kill -0 %d 2>/dev/null || echo k %d `wait %d 2>/dev/null; echo $?` >&$%s\n", cj->pid, cj->id, cj->pid, CO_ENV_MSGFD);
251
write(co->cmdfd, buf, n);
252
break;
253
}
254
}
255
256
/*
257
* get one coshell message
258
*/
259
260
if (!(s = b = sfgetr(co->msgfp, '\n', 1)))
261
break;
262
#if 0
263
errormsg(state.lib, 2, "coshell %d active wait %lu timeout=%d outstanding=<%d,%d> running=<%d,%d>", co->index, serial, timeout, co->outstanding, co->svc_outstanding, co->running, co->svc_running);
264
#endif
265
266
/*
267
* read and parse a coshell message packet of the form
268
*
269
* <type> <id> <args> <newline>
270
* %c %d %s %c
271
*/
272
273
while (isspace(*s))
274
s++;
275
if (!(type = *s) || type != 'a' && type != 'j' && type != 'k' && type != 'x')
276
goto invalid;
277
while (*++s && !isspace(*s));
278
id = strtol(s, &e, 10);
279
if (*e && !isspace(*e))
280
goto invalid;
281
for (s = e; isspace(*s); s++);
282
283
/*
284
* locate id in the job list
285
*/
286
287
for (cj = co->jobs; cj; cj = cj->next)
288
if (id == cj->id)
289
break;
290
if ((co->flags | (cj ? cj->flags : 0)) & CO_DEBUG)
291
errormsg(state.lib, 2, "coshell %d message \"%c %d %s\"", co->index, type, id, s);
292
if (!cj)
293
{
294
if (type == 'k')
295
continue;
296
errormsg(state.lib, 2, "coshell %d job id %d not found [%s]", co->index, id, b);
297
errno = ESRCH;
298
return 0;
299
}
300
301
/*
302
* now interpret the message
303
*/
304
305
switch (type)
306
{
307
308
case 'a':
309
/*
310
* coexec() ack
311
*/
312
313
if (cj == job)
314
return cj;
315
break;
316
317
case 'j':
318
/*
319
* <s> is the job pid
320
*/
321
322
n = cj->pid;
323
cj->pid = strtol(s, NiL, 10);
324
if (n == CO_PID_WARPED)
325
goto nuke;
326
break;
327
328
case 'k':
329
/*
330
* <s> is a synthesized killed status
331
*/
332
333
if (cj->pid < 0)
334
continue;
335
/*FALLTHROUGH*/
336
337
case 'x':
338
/*
339
* <s> is the job exit code and user,sys times
340
*/
341
342
cj->status = strtol(s, &e, 10);
343
user = sys = 0;
344
for (;;)
345
{
346
if (e <= s)
347
break;
348
for (s = e; isalpha(*s) || isspace(*s); s++);
349
user += strelapsed(s, &e, CO_QUANT);
350
if (e <= s)
351
break;
352
for (s = e; isalpha(*s) || isspace(*s); s++);
353
sys += strelapsed(s, &e, CO_QUANT);
354
}
355
cj->user += user;
356
cj->sys += sys;
357
co->user += user;
358
co->sys += sys;
359
if (cj->out)
360
cat(cj, &cj->out, sfstdout);
361
if (cj->err)
362
cat(cj, &cj->err, sfstderr);
363
if (cj->pid > 0 || cj->service || (co->flags & (CO_INIT|CO_SERVER)))
364
{
365
nuke:
366
if (cj->pid > 0 && type != 'k')
367
{
368
/*
369
* nuke the zombies
370
*/
371
372
n = sfsprintf(buf, sizeof(buf), "wait %d\n", cj->pid);
373
write(co->cmdfd, buf, n);
374
}
375
if (cj->service)
376
co->svc_running--;
377
else
378
co->running--;
379
if (!job || cj == job)
380
{
381
cj->pid = CO_PID_FREE;
382
if (cj->service)
383
co->svc_outstanding--;
384
else
385
co->outstanding--;
386
#if 0
387
errormsg(state.lib, 2, "coshell %d active wait %lu timeout=%d outstanding=<%d,%d> running=<%d,%d> reap job %d", co->index, serial, timeout, co->outstanding, co->svc_outstanding, co->running, co->svc_running, cj->id);
388
#endif
389
return cj;
390
}
391
cj->pid = CO_PID_ZOMBIE;
392
}
393
else
394
cj->pid = CO_PID_WARPED;
395
break;
396
397
}
398
}
399
} while (any && (co = co->next));
400
return 0;
401
echild:
402
#if 0
403
errormsg(state.lib, 2, "coshell wait ECHILD");
404
#endif
405
errno = ECHILD;
406
return 0;
407
invalid:
408
errormsg(state.lib, 2, "coshell %d invalid message \"%-.*s>>>%s<<<\"", co->index, s - b, b, s);
409
errno = EINVAL;
410
return 0;
411
}
412
413