Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
att
GitHub Repository: att/ast
Path: blob/master/src/cmd/coshell/job.c
1808 views
1
/***********************************************************************
2
* *
3
* This software is part of the ast package *
4
* Copyright (c) 1990-2011 AT&T Intellectual Property *
5
* and is licensed under the *
6
* Eclipse Public License, Version 1.0 *
7
* by AT&T Intellectual Property *
8
* *
9
* A copy of the License is available at *
10
* http://www.eclipse.org/org/documents/epl-v10.html *
11
* (with md5 checksum b35adb5213ca9657e911e9befb180842) *
12
* *
13
* Information and Software Systems Research *
14
* AT&T Research *
15
* Florham Park NJ *
16
* *
17
* Glenn Fowler <[email protected]> *
18
* *
19
***********************************************************************/
20
#pragma prototyped
21
/*
22
* Glenn Fowler
23
* AT&T Bell Laboratories
24
*
25
* remote coshell server job and connection support
26
*/
27
28
#include "service.h"
29
30
/*
31
* drop a connection
32
*/
33
34
void
35
drop(register int fd)
36
{
37
register int i;
38
register int n;
39
Coshell_t* sp;
40
Cojob_t* jp;
41
Sfio_t* tp;
42
43
switch (state.con[fd].type)
44
{
45
case USER:
46
if (state.con[fd].info.user.running)
47
for (jp = state.job; jp <= state.jobmax; jp++)
48
if (jp->fd == fd && jp->pid)
49
{
50
jp->fd = 0;
51
jobkill(jp, SIGKILL);
52
}
53
/*FALLTHROUGH*/
54
case INIT:
55
for (i = 0; i < elementsof(state.con[fd].info.user.fds); i++)
56
if ((n = state.con[fd].info.user.fds[i]) != fd && n >= 0 && state.con[n].type)
57
{
58
close(n);
59
state.con[n].type = 0;
60
}
61
if (sp = state.con[fd].info.user.home)
62
sp->home--;
63
if (state.con[fd].info.user.pump)
64
free(state.con[fd].info.user.pump);
65
if (state.con[fd].info.user.expr)
66
free(state.con[fd].info.user.expr);
67
break;
68
case PASS:
69
if (tp = state.con[fd].info.pass.serialize)
70
{
71
state.con[fd].info.pass.serialize = 0;
72
cswrite(state.con[fd].info.pass.fd, sfstrbase(tp), sfstrtell(tp));
73
sfstrclose(tp);
74
}
75
if ((jp = state.con[fd].info.pass.job) && jp->pid)
76
{
77
if (--jp->ref <= 0)
78
jobdone(jp);
79
else if (!jp->lost)
80
jp->lost = cs.time + UPDATE;
81
}
82
break;
83
case POLL:
84
error(ERROR_SYSTEM|3, "lost connect stream");
85
break;
86
case SHELL:
87
sp = state.con[fd].info.shell;
88
sp->fd = -1;
89
shellclose(sp, -1);
90
break;
91
}
92
state.con[fd].type = 0;
93
state.con[fd].error = 0;
94
csfd(fd, CS_POLL_CLOSE);
95
}
96
97
/*
98
* check the job table for hogs running on busy shells
99
* or jobs queued on hung shells
100
* if only!=0 then only that shell is checked
101
*/
102
103
void
104
jobcheck(Coshell_t* only)
105
{
106
register Cojob_t* jp;
107
register Coshell_t* sp;
108
char* s;
109
110
for (jp = state.job; jp <= state.jobmax; jp++)
111
if (jp->pid && ((sp = jp->shell) == only || !only))
112
{
113
if (jp->pid > 0)
114
{
115
if (sp->update <= cs.time && sp->errors < ERRORS) update(sp);
116
if (jp->lost && jp->lost < cs.time)
117
{
118
message((-4, "jobcheck: %s: job %d pid %d lost", sp->name, jp->rid, jp->pid));
119
jp->sig = SIGKILL;
120
jobdone(jp);
121
continue;
122
}
123
if (sp->idle)
124
{
125
if (sp->stat.idle < state.busy && (!sp->bypass || !miscmatch(sp, sp->bypass)))
126
{
127
if (!jp->busy && state.grace) jp->busy = cs.time + state.grace;
128
else if (jp->busy < cs.time)
129
{
130
if (state.migrate)
131
{
132
int n;
133
134
error(ERROR_OUTPUT|2, state.con[jp->fd].info.user.fds[2], "%s: job=%d pid=%d %s", sp->name, jp - state.job, jp->pid, state.migrate);
135
n = sfprintf(state.string, "job=%d; pid=%d; host=%s; type=%s; %s\n", jp - state.job, jp->pid, sp->name, sp->type, state.migrate);
136
if (s = sfstruse(state.string))
137
cswrite(jp->shell->fd, s, n);
138
else
139
error(ERROR_OUTPUT|2, state.con[jp->fd].info.user.fds[2], "out of space");
140
jp->sig = SIGKILL;
141
jobdone(jp);
142
}
143
else
144
{
145
#ifdef SIGSTOP
146
error(ERROR_OUTPUT|2, state.con[jp->fd].info.user.fds[2], "%s: job=%d pid=%d stopped", sp->name, jp - state.job, jp->pid);
147
jobkill(jp, SIGSTOP);
148
#else
149
error(ERROR_OUTPUT|2, state.con[jp->fd].info.user.fds[2], "%s: job=%d pid=%d is a hog", sp->name, jp - state.job, jp->pid);
150
#endif
151
jp->busy = HOG;
152
}
153
}
154
continue;
155
}
156
}
157
if (jp->busy == HOG)
158
{
159
#ifdef SIGCONT
160
error(ERROR_OUTPUT|2, state.con[jp->fd].info.user.fds[2], "job %d pid %d restarted on %s", jp - state.job, jp->pid, sp->name);
161
jobkill(jp, SIGCONT);
162
#else
163
error(ERROR_OUTPUT|2, state.con[jp->fd].info.user.fds[2], "job %d pid %d is no longer hogging %s", jp - state.job, jp->pid, sp->name);
164
jp->busy = 0;
165
#endif
166
}
167
}
168
else if (jp->cmd)
169
{
170
if (jp->pid == QUEUE && (sp->fd > 0 || sp == &state.wait))
171
{
172
if (state.running < (state.perserver + state.jobwait) && state.con[jp->fd].info.user.running < (state.peruser + 1) && (sp == &state.wait || sp->running < ((state.perhost ? state.perhost : sp->cpu * state.percpu) + 1)))
173
shellexec(jp, jp->cmd, jp->fd);
174
}
175
else if (cs.time > jp->start + LOST)
176
{
177
message((-4, "jobcheck: %s: possibly hung %s", sp->name, fmtelapsed(cs.time - sp->start, 1)));
178
shellclose(sp, -1);
179
}
180
}
181
}
182
if (state.jobwait || state.shellwait) cswakeup(state.wakeup = UPDATE * 1000L);
183
else if (!state.busy || !state.running) cswakeup(state.wakeup = 0L);
184
else if (!only) cswakeup(state.wakeup = UPDATE * 1000L);
185
}
186
187
/*
188
* kill job with sig
189
*/
190
191
void
192
jobkill(Cojob_t* jp, int sig)
193
{
194
int n;
195
char buf[128];
196
197
if (jp->pid)
198
{
199
jp->sig = sig;
200
jp->busy = 0;
201
if (jp->pid > 0 && jp->shell->fd > 0)
202
{
203
n = sfsprintf(buf, sizeof(buf), "kill -%s -%d\n", fmtsignal(-sig), jp->pid);
204
cswrite(jp->shell->fd, buf, n);
205
message((-2, "killpg -%s %s.%d", fmtsignal(-sig), jp->shell->name, jp->pid));
206
}
207
if (sig == SIGKILL) jobdone(jp);
208
}
209
}
210
211
/*
212
* job jp is done
213
*/
214
215
void
216
jobdone(register Cojob_t* jp)
217
{
218
register int n;
219
char buf[64];
220
221
jp->pid = 0;
222
jp->ref = 0;
223
if (jp->cmd)
224
{
225
free(jp->cmd);
226
jp->cmd = 0;
227
state.jobwait--;
228
}
229
if (jp->shell == &state.wait) state.joblimit--;
230
else if (jp->shell->running)
231
{
232
jp->shell->running--;
233
if (!--state.running)
234
{
235
state.real += cs.time - state.clock;
236
cswakeup(state.wakeup = 0L);
237
}
238
}
239
if (jp->fd > 0)
240
{
241
jp->shell->user += jp->user;
242
jp->shell->sys += jp->sys;
243
state.con[jp->fd].info.user.running--;
244
state.user += jp->user;
245
state.sys += jp->sys;
246
if (state.disable && (jp->sig == SIGKILL || jp->status > 128 && (jp->status % 128) == SIGKILL))
247
{
248
jp->shell->mode |= SHELL_DISABLE;
249
jp->shell->update = cs.time + state.disable;
250
}
251
#ifdef SIGCONT
252
if (jp->sig && jp->sig != SIGCONT)
253
#else
254
if (jp->sig)
255
#endif
256
jp->status = jp->sig + 128;
257
n = sfsprintf(buf, sizeof(buf), "x %d %d %s %s\n", jp->rid, jp->status, fmtelapsed(jp->user, CO_QUANT), fmtelapsed(jp->sys, CO_QUANT));
258
if (cswrite(state.con[jp->fd].info.user.fds[0], buf, n) != n)
259
drop(jp->fd);
260
}
261
if (state.joblimit) jobcheck(NiL);
262
}
263
264