Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
att
GitHub Repository: att/ast
Path: blob/master/src/cmd/coshell/schedule.c
1808 views
1
/***********************************************************************
2
* *
3
* This software is part of the ast package *
4
* Copyright (c) 1990-2011 AT&T Intellectual Property *
5
* and is licensed under the *
6
* Eclipse Public License, Version 1.0 *
7
* by AT&T Intellectual Property *
8
* *
9
* A copy of the License is available at *
10
* http://www.eclipse.org/org/documents/epl-v10.html *
11
* (with md5 checksum b35adb5213ca9657e911e9befb180842) *
12
* *
13
* Information and Software Systems Research *
14
* AT&T Research *
15
* Florham Park NJ *
16
* *
17
* Glenn Fowler <[email protected]> *
18
* *
19
***********************************************************************/
20
#pragma prototyped
21
/*
22
* Glenn Fowler
23
* AT&T Bell Laboratories
24
*
25
* remote coshell server scheduling support
26
*/
27
28
#include "service.h"
29
30
#define W_CPU 80 /* additional cpu weight % */
31
#define R_IDLE 4 /* idle range factor */
32
#define W_IDLE (X_RANK/15) /* idle weight */
33
#define W_JOB 2 /* job weight */
34
#define T_LOAD 10 /* load truncation */
35
#define W_LOAD (X_RANK/75) /* load weight */
36
#define C_RANK 90 /* compressed after this % */
37
#define H_RANK 85 /* rank hysterisis % */
38
#define X_RANK RANK /* maximum rank w/o toss */
39
#define H_TEMP 50 /* temp hysterisis % */
40
#define W_TEMP 4 /* temp window bit size */
41
#define R_USER (3*60*60) /* user activity range */
42
#define W_USER (X_RANK/600) /* user activity weight */
43
44
#define CPU(n,v) ((v)*100/(10+((n)-1)*W_CPU/10)/10)
45
#define PCT(n,p) (((n)/100)*(p))
46
#define RNK(p,a) (((p)->mode&SHELL_DISABLE)?INT_MAX:((p)->flags&IGN)?(INT_MAX/2):((((p)->stat.load/(p)->scale)+((p)->running*BIAS+CPU((p)->cpu,(a)->bias)))*(p)->bias*(((p)==state.home&&(p)->cpu==1)?400:100)/(p)->rating))
47
48
#define IDLE(m,i) ((m)>=((i)<=(state.maxidle)?(i):(state.maxidle)))
49
50
/*
51
* allocate and copy string v into s
52
*/
53
54
static char*
55
dupstring(char* s, const char* v)
56
{
57
if (v)
58
{
59
if (!(s = newof(s, char, strlen(v) + 1, 0)))
60
error(3, "out of space [dupstring]");
61
strcpy(s, v);
62
}
63
else if (s)
64
{
65
free(s);
66
s = 0;
67
}
68
return(s);
69
}
70
71
/*
72
* search for name in the shell table
73
* op is a combination of {DEF,GET,JOB,NEW,SET}
74
* a points to optional attribute return value
75
* d points to optional default attributes
76
*/
77
78
Coshell_t*
79
search(int op, char* name, register Coattr_t* a, Coattr_t* d)
80
{
81
register Coshell_t* sp;
82
register Coshell_t* ap;
83
Coshell_t* cp;
84
Coshell_t* mp;
85
Coshell_t* xp;
86
int bypass;
87
int matched;
88
int xm;
89
int n;
90
int nopen;
91
int noverride;
92
unsigned long sv;
93
unsigned long v;
94
unsigned long addr;
95
Coattr_t attr;
96
97
static unsigned long dt;
98
static Coshell_t* dp;
99
static unsigned long scan;
100
static time_t tt;
101
102
sp = state.shell;
103
104
/*
105
* extract the attributes
106
*/
107
108
if (!a) a = &attr;
109
attributes(name, a, d);
110
if (!(op & JOB))
111
{
112
if (a->global.set)
113
{
114
if (op & DEF) a->global.set &= ~state.set;
115
if (op & (NEW|SET)) state.set |= a->global.set;
116
if (a->global.set & SETBUSY)
117
state.busy = a->global.busy;
118
if (a->global.set & SETDISABLE)
119
state.disable = a->global.disable;
120
if (a->global.set & SETGRACE)
121
state.grace = a->global.grace;
122
if (a->global.set & SETIDENTIFY)
123
state.identify = dupstring(state.identify, a->global.identify);
124
if (a->global.set & SETMAXIDLE)
125
state.maxidle = a->global.maxidle;
126
if (a->global.set & SETMAXLOAD)
127
state.maxload = a->global.maxload;
128
if (a->global.set & SETMIGRATE)
129
state.migrate = dupstring(state.migrate, a->global.migrate);
130
if (a->global.set & SETPERCPU)
131
state.percpu = a->global.percpu;
132
if (a->global.set & SETPERHOST)
133
state.perhost = a->global.perhost;
134
if (a->global.set & SETPERSERVER)
135
state.perserver = a->global.perserver;
136
if (a->global.set & SETPERUSER)
137
state.peruser = a->global.peruser;
138
if (a->global.set & SETPOOL)
139
state.pool = a->global.pool;
140
if (a->global.set & SETPROFILE)
141
state.profile = dupstring(state.profile, a->global.profile);
142
if (a->global.set & SETREMOTE)
143
{
144
pathrepl(a->global.remote, 0, state.home->type, "%s");
145
state.remote = dupstring(state.remote, a->global.remote);
146
}
147
if (a->global.set & SETSCHEDULE)
148
{
149
name = a->global.schedule;
150
if (state.scheduler.fd > 0)
151
close(state.scheduler.fd);
152
if ((state.scheduler.fd = csopen(name, 0)) < 0 && *name != '/')
153
{
154
sfprintf(state.string, "/dev/tcp/share/%s/trust", name);
155
if (!(name = sfstruse(state.string)))
156
error(3, "out of space");
157
state.scheduler.fd = csopen(name, 0);
158
}
159
if (state.scheduler.fd > 0) state.scheduler.name = dupstring(state.scheduler.name, name);
160
else error(2, "%s: cannot open scheduler", name);
161
}
162
if (a->global.set & SETSHELL)
163
{
164
pathrepl(a->global.shell, 0, state.home->type, "%s");
165
state.sh = dupstring(state.sh, a->global.shell);
166
}
167
if (a->global.set & SETFILE)
168
return(info(op, a->global.file));
169
if (a->global.set & (SETBUSY|SETMAXIDLE|SETPERCPU|SETPERHOST|SETPERSERVER|SETPERUSER))
170
jobcheck(NiL);
171
}
172
if ((op & (DEF|NEW)) && a->set && !(a->set & SETNAME))
173
return(info(op, NiL));
174
}
175
176
/*
177
* check previous entries
178
*/
179
180
if (streq(CS_HOST_LOCAL, a->name) || (op & (DEF|JOB)) == (DEF|JOB))
181
{
182
if (!sp) goto empty;
183
strcpy(a->name, state.home->name);
184
a->set |= SETNAME;
185
}
186
else if (streq("server", a->name))
187
{
188
if (!sp) goto empty;
189
strcpy(a->name, sp->name);
190
a->set |= SETNAME;
191
}
192
if (op & JOB)
193
{
194
if (!(a->set & SETBIAS)) a->bias = BIAS;
195
if (state.scheduler.fd > 0)
196
{
197
return(&state.wait);
198
}
199
cp = mp = sp = xp = 0;
200
xm = 0;
201
sv = ~0;
202
nopen = noverride = 0;
203
ap = state.shellnext;
204
scan++;
205
if (cs.time - (unsigned long)tt > LOST)
206
{
207
tt = cs.time;
208
state.tm = tmmake(&tt);
209
if (!state.tm->tm_wday)
210
state.tm->tm_wday = 7;
211
}
212
213
/*
214
* shell scheduling
215
*
216
* cp head of close list
217
* dp worst open from last time [close after dt]
218
* mp best closed match
219
* sp best open match [value is sv]
220
* xp worst open
221
*
222
* non-busy shells failing idle criteria are marked for close
223
*/
224
225
do
226
{
227
if ((matched = match(ap, a, op)) && ap->access && !ap->home && !miscmatch(ap, ap->access))
228
{
229
matched = 0;
230
ap->mode |= SHELL_DENIED;
231
}
232
else ap->mode &= ~SHELL_DENIED;
233
message((-6, "search: %s name=%s misc=%s matched=%d", ap->name, (a->set & SETNAME) ? a->name : "*", ((a->set | op) & (SETMISC|DEF|NEW|SET)) == SETMISC ? a->misc : "*", matched));
234
if (!(scan & ((1<<W_TEMP) - 1))) ap->temp >>= W_TEMP;
235
if (ap->fd)
236
{
237
if (matched) nopen += ap->cpu;
238
if (ap->fd > 0)
239
{
240
if (!ap->home && (!xp || ap->temp < PCT(xp->temp, H_TEMP) || (ap->mode & SHELL_DENIED) || matched && xm && PCT(ap->temp, H_TEMP) < xp->temp && ap->rank > xp->rank))
241
{
242
xp = ap;
243
xm = matched;
244
}
245
}
246
else if (cs.time > ap->start + LOST)
247
{
248
shellclose(ap, -1);
249
ap->stat.up = -LOST;
250
ap->update = cs.time + 2 * LOST;
251
continue;
252
}
253
if (ap->update <= cs.time && ap->errors < ERRORS) update(ap);
254
if (ap != state.shell && (ap->override || !IDLE(ap->stat.idle, ap->idle) && (!ap->bypass || !(bypass = miscmatch(ap, ap->bypass)))))
255
{
256
if (matched) noverride++;
257
if (cs.time > ap->override)
258
{
259
if (!ap->running && !ap->home)
260
{
261
ap->mode |= SHELL_CLOSE;
262
if (!cp) cp = ap;
263
}
264
else if (!ap->override)
265
{
266
ap->override = cs.time - 1;
267
state.override++;
268
}
269
}
270
if (ap->home) ap->override = cs.time + HOME;
271
if (sp) continue;
272
}
273
}
274
if (matched)
275
{
276
if (ap->update <= cs.time && ap->errors < ERRORS) update(ap);
277
ap->temp += (((unsigned long)1)<<(CHAR_BIT * sizeof(ap->temp) - W_TEMP));
278
if (ap->fd > 0)
279
{
280
v = RNK(ap, a);
281
if (v < sv && ap->running < (state.perhost ? state.perhost : ap->cpu * state.percpu) && (!state.maxload || (ap->stat.load / ap->scale) < state.maxload) && (!sp || bypass || IDLE(ap->stat.idle, ap->idle) || !IDLE(sp->stat.idle, sp->idle)))
282
{
283
sv = v;
284
sp = ap;
285
}
286
}
287
else if (!ap->fd && (!mp || ap->rank < mp->rank) && (IDLE(ap->stat.idle, ap->idle) || ap->home || ap->bypass && miscmatch(ap, ap->bypass) || ((a->set | op) & (SETMISC|DEF|NEW|SET)) == SETMISC)) mp = ap;
288
}
289
} while ((ap = ap->next) != state.shellnext);
290
if (mp && (!sp || nopen < state.pool + noverride && RNK(mp, a) < PCT(sv, H_RANK))) sp = mp;
291
if (!sp && (op & DEF)) sp = state.home;
292
message((-4, "open=%d override=%d sp=%s mp=%s xp=%s dp=%s dt=%s", nopen, noverride, sp ? sp->name : "*", mp ? mp->name : "*", xp ? xp->name : "*", dp ? dp->name : "*", dp && dt > cs.time ? fmtelapsed(dt - cs.time, 1) : "*"));
293
if (xp && xp != sp)
294
{
295
if (!xp->running && (nopen - xp->cpu >= state.pool + noverride || state.open - xp->cpu >= state.fdtotal / 2 || xp == dp && cs.time > dt && (!xm || !mp || mp->rank < PCT(xp->rank, H_RANK))))
296
{
297
dp = 0;
298
xp->mode &= ~SHELL_CLOSE;
299
shellclose(xp, -1);
300
}
301
else if (xp != dp)
302
{
303
dp = xp;
304
dt = cs.time + 2 * UPDATE;
305
}
306
}
307
if (cp) do if (cp->mode & SHELL_CLOSE)
308
{
309
cp->mode &= ~SHELL_CLOSE;
310
if (cp != sp) shellclose(cp, -1);
311
} while ((cp = cp->next) != state.shellnext);
312
if (sp)
313
{
314
if (dp == sp) dp = 0;
315
if (sp->override || !IDLE(sp->stat.idle, sp->idle)) sp->mode |= SHELL_OVERRIDE;
316
else sp->mode &= ~SHELL_OVERRIDE;
317
state.shellnext = sp->next;
318
goto found;
319
}
320
if (nopen) return(&state.wait);
321
}
322
else if (sp) do
323
{
324
if (match(sp, a, op)) goto found;
325
} while ((sp = sp->next) != state.shell);
326
327
/*
328
* a->name may be an alias
329
*/
330
331
if (!(a->set & SETNAME) || !(addr = csaddr(a->name))) return(0);
332
if (sp = state.shell) do
333
{
334
if (sp->addr == addr) goto found;
335
} while ((sp = sp->next) != state.shell);
336
empty:
337
if (!(op & NEW) && (op & (DEF|GET))) return(0);
338
if (state.check.host && !strmatch(a->name, state.check.host) && !strmatch(csntoa(addr), state.check.host)) return(0);
339
340
/*
341
* add a new entry
342
*/
343
344
if (++state.shelln > state.shellc)
345
{
346
if (state.shellv) free(state.shellv);
347
state.shellc = roundof(state.shelln + 1, 32);
348
if (!(state.shellv = newof(0, Coshell_t*, state.shellc, 0)))
349
error(3, "out of space [shellv]");
350
}
351
if (!(sp = newof(0, Coshell_t, 1, 0)))
352
error(3, "out of space [%s]", a->name);
353
strcpy(sp->name, a->name);
354
if (state.shell)
355
{
356
n = 0;
357
ap = state.shell;
358
for (;;)
359
{
360
n++;
361
sp->rating += ap->rating;
362
if (ap->next == state.shell) break;
363
ap = ap->next;
364
}
365
sp->rating /= n;
366
ap->next = sp;
367
sp->next = state.shell;
368
}
369
else
370
{
371
sp->next = sp;
372
sp->rating = RATING;
373
sp->home++;
374
}
375
sp->flags = op;
376
sp->type[0] = '*';
377
sp->bias = BIAS;
378
sp->scale = sp->cpu = 1;
379
sp->addr = addr;
380
found:
381
switch (op & (DEF|JOB|NEW|SET))
382
{
383
case DEF:
384
a->set &= ~sp->flags;
385
break;
386
case DEF|JOB:
387
case JOB:
388
if (sp != state.shell && (sp->override || !IDLE(sp->stat.idle, sp->idle)))
389
{
390
if (!sp->override) state.override++;
391
sp->override = cs.time + (sp->home ? HOME : OVERRIDE);
392
}
393
/*FALLTHROUGH*/
394
default:
395
return(sp);
396
case NEW:
397
if (!(a->set & SETIDLE))
398
{
399
a->set |= SETIDLE;
400
a->idle = 0;
401
sp->idle_override = sp->idle;
402
}
403
sp->flags &= ~DEF;
404
/*FALLTHROUGH*/
405
case SET:
406
sp->flags |= a->set;
407
if (a->global.set & (SETIDLE|SETLOAD|SETUPDATE|SETUSERS))
408
{
409
if (a->global.set & SETIDLE) sp->stat.idle = a->stat.idle;
410
if (a->global.set & SETLOAD) sp->stat.load = a->stat.load;
411
if (a->global.set & SETUPDATE) sp->update = a->stat.up;
412
if (a->global.set & SETUSERS) sp->stat.users = a->stat.users;
413
}
414
break;
415
case DEF|NEW:
416
a->set &= ~sp->flags;
417
sp->flags |= a->set;
418
break;
419
}
420
if (a->set & SETACCESS) sp->access = dupstring(sp->access, a->access);
421
if (a->set & SETBYPASS) sp->bypass = dupstring(sp->bypass, a->bypass);
422
if (a->set & SETBIAS) sp->bias = a->bias;
423
if (a->set & SETIGNORE)
424
{
425
if (a->ignore) sp->flags |= IGN;
426
else sp->flags &= ~IGN;
427
}
428
if (a->set & SETMISC)
429
{
430
sp->flags &= ~SETMISC;
431
miscadd(sp, a->misc);
432
}
433
if (a->set & SETRATING) sp->rating = a->rating;
434
if (a->set & SETREMOTE) strcpy(sp->remote, a->remote);
435
if (a->set & SETSCALE) sp->scale = a->scale;
436
if (a->set & SETSHELL) strcpy(sp->shell, a->shell);
437
if (a->set & SETTYPE) strcpy(sp->type, a->type);
438
if (a->set & SETCPU)
439
{
440
if (sp->fd) state.open += a->cpu - sp->cpu;
441
sp->cpu = a->cpu;
442
if (!(sp->flags & SETSCALE)) sp->scale = sp->cpu;
443
}
444
if (a->set & SETIDLE)
445
{
446
if (!(sp->idle = a->idle))
447
{
448
if (sp->override)
449
{
450
sp->override = 0;
451
state.override--;
452
sp->update = 0;
453
}
454
else if (sp->update > cs.time + UPDATE) sp->update = 0;
455
}
456
if (sp->running) jobcheck(sp);
457
}
458
if (!sp->update && !(op & GET) || (a->set & (SETBIAS|SETCPU|SETIDLE|SETIGNORE|SETRATING|SETSCALE)) && sp->update <= cs.time + UPDATE) update(sp);
459
return(sp);
460
}
461
462
/*
463
* update shell status
464
*/
465
466
void
467
update(register Coshell_t* sp)
468
{
469
register long n;
470
471
sp->mode &= ~SHELL_DISABLE;
472
if (csstat(sp->name, &sp->stat))
473
{
474
sp->stat.up = 0;
475
sp->stat.idle = -1;
476
sp->stat.load = LOAD;
477
sp->stat.users = 0;
478
sp->errors++;
479
}
480
else
481
{
482
if (sp->stat.up < 0)
483
{
484
n = X_RANK;
485
if (sp == state.shell)
486
{
487
sp->stat.up = 0;
488
n -= W_USER;
489
}
490
else if (sp->fd) shellclose(sp, -1);
491
}
492
else
493
{
494
n = 0;
495
if (!sp->scale)
496
sp->scale = 1;
497
if (!sp->rating)
498
sp->rating = 1;
499
n += W_LOAD * (((sp->stat.load / sp->scale) + CPU(sp->cpu, W_JOB * LOAD)) / T_LOAD) * sp->bias / sp->rating;
500
if (state.maxidle)
501
{
502
if (sp->idle)
503
{
504
if (sp->stat.idle < sp->idle) n += W_IDLE;
505
else if (sp->stat.idle < R_IDLE * sp->idle) n += W_IDLE * (R_IDLE * sp->idle - sp->stat.idle) / ((R_IDLE - 1) * sp->idle);
506
}
507
else if (sp->stat.users && sp->stat.idle < R_USER)
508
n += W_USER * (R_USER - sp->stat.idle) / R_USER;
509
}
510
if (n > PCT(X_RANK, C_RANK))
511
{
512
if (n >= 2 * X_RANK) n = X_RANK - 1;
513
else n = PCT(X_RANK, C_RANK) + (n - PCT(X_RANK, C_RANK)) / (2 * X_RANK - PCT(X_RANK, C_RANK) - 1) * (X_RANK - PCT(X_RANK, C_RANK) - 1);
514
}
515
}
516
sp->rank = n * 100 + (TOSS>>7) % 100;
517
sp->errors = 0;
518
if (sp->stat.up < 0) sp->update = cs.time + CS_STAT_DOWN;
519
else
520
{
521
sp->update = cs.time + UPDATE;
522
if (sp->fd > 0 && (2 * sp->stat.up) < (cs.time - sp->start)) shellclose(sp, -1);
523
if (!sp->fd && !IDLE(sp->stat.idle, sp->idle)) sp->update += sp->idle - sp->stat.idle;
524
}
525
message((-4, "%s: %s=%s idle=%s load=%s users=%d rank=%s", sp->name, sp->stat.up < 0 ? "down" : "up", fmtelapsed(sp->stat.up < 0 ? -sp->stat.up : sp->stat.up, 1), fmtelapsed(sp->stat.idle, 1), fmtfloat(sp->stat.load), sp->stat.users, fmtfloat(sp->rank)));
526
}
527
}
528
529
/*
530
* gather host info from file
531
*/
532
533
Coshell_t*
534
info(int op, char* file)
535
{
536
register Sfio_t* fp;
537
register char* s;
538
struct stat st;
539
540
static char* apath;
541
static time_t atime;
542
543
if (!file && op == SET)
544
{
545
if (!(file = apath))
546
{
547
if (!(file = sfstrrsrv(state.string, PATH_MAX)) || !pathaccess(csvar(CS_VAR_TRUST, 1), csvar(CS_VAR_SHARE, 0), CS_SVC_ACCESS, PATH_REGULAR, file, PATH_MAX) || !(file = strdup(file)))
548
{
549
state.access = cs.time + ACCESS_SEARCH;
550
return(0);
551
}
552
apath = file;
553
}
554
if (!(fp = sfopen(NiL, file, "r")))
555
{
556
free(apath);
557
apath = 0;
558
state.access = cs.time + ACCESS_SEARCH;
559
return(0);
560
}
561
state.access = cs.time + ACCESS_UPDATE;
562
if (fstat(sffileno(fp), &st) || atime == st.st_mtime)
563
{
564
sfclose(fp);
565
return(0);
566
}
567
message((-2, "%sscanning access info file %s", atime ? "re" : "", file));
568
atime = st.st_mtime;
569
}
570
else if (!(fp = csinfo(file, NiL)))
571
{
572
error(ERROR_SYSTEM|2, "%s: not found", file ? file : "<local host info>");
573
return(0);
574
}
575
if ((op & NEW) && (sfset(fp, 0, 0) & SF_STRING))
576
op &= ~DEF;
577
while (s = sfgetr(fp, '\n', 1))
578
search(op, s, NiL, NiL);
579
sfclose(fp);
580
return(state.shell);
581
}
582
583
/*
584
* compare shells by name
585
*/
586
587
int
588
byname(const char* a, const char* b)
589
{
590
return(strcoll(((Coshell_t*)a)->name, ((Coshell_t*)b)->name));
591
}
592
593
/*
594
* compare shells by rank from best to worst
595
*/
596
597
int
598
byrank(const char* a, const char* b)
599
{
600
if (((Coshell_t*)a)->rank < ((Coshell_t*)b)->rank) return(-1);
601
if (((Coshell_t*)a)->rank > ((Coshell_t*)b)->rank) return(1);
602
return(0);
603
}
604
605
/*
606
* compare shells by temperature from hottest to coolest
607
*/
608
609
int
610
bytemp(const char* a, const char* b)
611
{
612
if (((Coshell_t*)a)->temp > ((Coshell_t*)b)->temp) return(-1);
613
if (((Coshell_t*)a)->temp < ((Coshell_t*)b)->temp) return(1);
614
return(0);
615
}
616
617