Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
att
GitHub Repository: att/ast
Path: blob/master/src/cmd/coshell/event.c
1808 views
1
/***********************************************************************
2
* *
3
* This software is part of the ast package *
4
* Copyright (c) 1990-2011 AT&T Intellectual Property *
5
* and is licensed under the *
6
* Eclipse Public License, Version 1.0 *
7
* by AT&T Intellectual Property *
8
* *
9
* A copy of the License is available at *
10
* http://www.eclipse.org/org/documents/epl-v10.html *
11
* (with md5 checksum b35adb5213ca9657e911e9befb180842) *
12
* *
13
* Information and Software Systems Research *
14
* AT&T Research *
15
* Florham Park NJ *
16
* *
17
* Glenn Fowler <[email protected]> *
18
* *
19
***********************************************************************/
20
#pragma prototyped
21
22
/*
23
* Glenn Fowler
24
* AT&T Research
25
*
26
* shared event daemon
27
*/
28
29
#define EVENT_MAJOR 1
30
#define EVENT_MINOR 0
31
32
static const char usage[] =
33
"[-?\n@(#)$Id: event (AT&T Research) 2007-06-05 $\n]"
34
USAGE_LICENSE
35
"[+NAME?event - shared event client and server]"
36
"[+DESCRIPTION?\bevent\b is a shared event client and server. Events are "
37
"stored in a persistent database named by the \aname\a operand. Each "
38
"event has a name, an expiration, and a binary status \braised\b or "
39
"\bnot-raised\b. A non-existent event is \bnot-raised\b. Events may be "
40
"raised, deleted, cleared, tested and waited for. If no \brequest\b "
41
"operands are specified then requests are prompted for, with an "
42
"\bEVENT>\b prompt, and read from the standard input. Multiple command "
43
"line requests must be separated by \b:\b. In the following events "
44
"operands are matched by \bksh\b(1) patterns. The client requests are:]"
45
"{"
46
"[+all \aconnection\a?Raise all pending events for the "
47
"\aconnection\a. Mainly for debugging.]"
48
"[+clear \aevent\a ...?Mark \aevent\a not-raised but do not "
49
"delete from the database. This allows the events to be matched "
50
"by patterns.]"
51
"[+delete \aevent\a ...?Delete \aevent\a.]"
52
"[+exit?Close the client connection.]"
53
"[+hold [ \aevent\a ...]]?If \aevent\a operands are specified "
54
"then clients are not notified about the those events until they "
55
"are explicitly released by \brelease\b \aevent\a ... If no "
56
"events are specified then all current and future events will be "
57
"unconditionally held until a \brelease\b with no event "
58
"operands.]"
59
"[+info?List the server status pending events by client "
60
"connection. The list is terminated by a \bdone\b message.]"
61
"[+list [ \apattern\a ]]?Start an event dictionary scan and list "
62
"the first event. If \apattern\a is specified then only events "
63
"matching \apattern\a are listed.]"
64
"[+next?List the next event in the \blist\b event scan. The list "
65
"is terminated by a \bdone\b message.]"
66
"[+quit?Equivalent to exit.]"
67
"[+raise \aevent\a ...?Raise \aevent\a ...]"
68
"[+release [ \aevent\a ...]]?If \aevent\a operands are specified "
69
"then they are released from a previous \bhold\b \aevent\a ... "
70
"If no \aevent\a operands are specified then any previous "
71
"unconditional \bhold\b is turned off.]"
72
"[+set \aoption\a ...?]"
73
"[+stop?Terminate the server. Persistent data is preserved.]"
74
"[+test \aevent\a?Determine the \aevent\a status.]"
75
"[+wait \aevent\a?Wait for \aevent\a status to be \braised\b.]"
76
"}"
77
"[+?The \b--cs\b, \b--expire\b, \b--initialize\b, and \b--log\b options "
78
"apply to the initial service command, and the \b--expire\b, \b--log\b, "
79
"\b--newer\b, \b--older\b, and \b--quiet\b options apply to client "
80
"requests.]"
81
"[c:cs|connect-stream?Use \aconnect-stream\a instead of the default.]"
82
":[connect-stream:=/dev/tcp/local/event]"
83
"[e:expire?Set the current event expiration to the \bdate\b(1) or "
84
"\bcron\b(1) expression \adate-expression\a.]:[date-expression]"
85
"[i:initialize?Initialize the service if it is not already running.]"
86
"[l!:log?Log server activity to \astate-name\a.log, where \astate-name\a "
87
"is the state path name sans suffix.]"
88
"[n:newer?Match events newer than \adate\a. If \b--older\b is also "
89
"specified then only event times > newer and < older match.]:[date]"
90
"[o:older?Match events older than \adate\a. If \b--newer\b is also "
91
"specified then only event times > newer and < older match.]:[date]"
92
"[q:quiet?Suppress request confirmation messages.]"
93
"\n"
94
"\nname [ request [ options ] [ arg ... ] ] [ : request ... ]\n"
95
"\n"
96
"[+CAVEATS?Expirations, logging and the \bset\b request are not "
97
"implemented yet.]"
98
"[+SEE ALSO?\bcoshell\b(1), \bcs\b(1), \bnmake\b(1), \bdbm\b(3), "
99
"\bndbm\b(3), \bgdbm\b(3)]"
100
;
101
102
static const char command[] = "event";
103
104
static const char ident_key[] = "'//\t<(IDENT)>\t\\\\'";
105
static const char ident_name[] = "EVEN";
106
107
#define IDENT_SWAP 0x01020304
108
#define IDENT_VERSION ((EVENT_MAJOR<<16)|(EVENT_MINOR))
109
110
#define EVENT(s) (*((char*)(s))!=ident_key[0])
111
#define log _log /* gnu builtin? you've got to be kidding */
112
113
#include <ast.h>
114
#include <cdt.h>
115
#include <css.h>
116
#include <ctype.h>
117
#include <debug.h>
118
#include <error.h>
119
#include <namval.h>
120
#include <ls.h>
121
#include <regex.h>
122
#include <stdarg.h>
123
#include <swap.h>
124
#include <tok.h>
125
#include <tm.h>
126
#include <ast_ndbm.h>
127
128
#if !_use_ndbm
129
130
/*
 * entry point used when no ndbm library is available (!_use_ndbm):
 * report the missing library and return a failure status
 */

int
main(int argc, char** argv)
{
	NoP(argv);
	NoP(argc);
	error(3, "<ndbm.h> library required");
	return 1;
}
138
139
#else
140
141
#define KEY_MAX 64 /* max key length + 1 */
142
143
struct Key_s; typedef struct Key_s Key_t;
144
145
#define DATA_clear 0x00000001 /* explicit clear */
146
#define DATA_hold 0x00000002 /* explicit hold */
147
148
typedef uint32_t Number_t;
149
150
typedef struct Data_s /* event data */
151
{
152
Number_t expire; /* expiration seconds since epoch */
153
Number_t time; /* last raise time */
154
Number_t raise; /* total # raise requests */
155
Number_t flags; /* DATA_* flags */
156
} Data_t;
157
158
typedef struct Event_s /* event bucket */
159
{
160
Dtlink_t link; /* dictionary link */
161
unsigned int waiting; /* # clients waiting */
162
Data_t data; /* event persistent data */
163
char name[256]; /* event name */
164
} Event_t;
165
166
typedef struct Waiting_s /* pending event bucket */
167
{
168
Dtlink_t link; /* dictionary link */
169
int id; /* wait id */
170
Event_t* event; /* event bucket pointer */
171
} Waiting_t;
172
173
typedef struct Connection_s /* client connection state */
174
{
175
Dtlink_t link; /* list link */
176
Csid_t id; /* connection id */
177
Dt_t* waiting; /* pending events */
178
datum list; /* list finger */
179
int fd; /* connection fd */
180
int all; /* list all vs. list match */
181
int code; /* request exit code */
182
int quiet; /* suppress response log messages */
183
unsigned long newer; /* list --newer time */
184
unsigned long older; /* list --older time */
185
regex_t re; /* list request pattern */
186
} Connection_t;
187
188
/*
 * static description of one client request; see the requests[] table
 */

typedef struct Request_s
{
	/* request name */
	const char*	name;
	/* REQ_* index */
	int		index;
	/* min #args */
	int		min;
	/* max #args */
	int		max;
} Request_t;
195
196
typedef struct State_s /* program state */
197
{
198
Cssdisc_t disc; /* css discipline */
199
Dtdisc_t condisc; /* connection dictionary discipline */
200
Dtdisc_t eventdisc; /* event dictionary discipline */
201
Dtdisc_t waitdisc; /* pending events dictionary discipline */
202
unsigned int active; /* number of active clients */
203
int hold; /* hold announcements */
204
int log; /* log activity */
205
int major; /* db major version */
206
int minor; /* db major version */
207
int swap; /* datum <=> native int32_ swap */
208
unsigned long expire; /* expiration in seconds */
209
DBM* dbm; /* dbm handle */
210
Dt_t* connections; /* active connection list */
211
Dt_t* events; /* outstanding events dictionary */
212
char* service; /* service connect stream path */
213
char* path; /* event db path */
214
Sfio_t* logf; /* log buffer stream */
215
Sfio_t* usrf; /* usr buffer stream */
216
Sfio_t* tmp; /* tmp buffer stream */
217
char* cmd[1024]; /* request command argv */
218
char req[8 * 1024]; /* request buffer */
219
} State_t;
220
221
#define REQ_all 1
222
#define REQ_clear 2
223
#define REQ_delete 3
224
#define REQ_exit 4
225
#define REQ_hold 5
226
#define REQ_info 6
227
#define REQ_list 7
228
#define REQ_next 8
229
#define REQ_raise 9
230
#define REQ_release 10
231
#define REQ_set 11
232
#define REQ_stop 12
233
#define REQ_test 13
234
#define REQ_wait 14
235
236
static const Request_t requests[] =
237
{
238
"a*ll", REQ_all, 1, 1,
239
"c*lear", REQ_clear, 1, -1,
240
"d*elete", REQ_delete, 1, -1,
241
"e*xit", REQ_exit, 0, 0,
242
"hold", REQ_hold, 0, -1,
243
"i*nfo", REQ_info, 0, 0,
244
"l*ist", REQ_list, 0, 1,
245
"n*ext", REQ_next, 0, 0,
246
"q*uit", REQ_exit, 0, 0,
247
"r*aise", REQ_raise, 1, -1,
248
"release", REQ_release, 0, -1,
249
"s*et", REQ_set, 0, 0,
250
"stop", REQ_stop, 0, 0,
251
"t*est", REQ_test, 1, 1,
252
"w*ait", REQ_wait, 1, 1,
253
};
254
255
/*
256
* generate a user response and log message
257
*/
258
259
static void
260
log(State_t* state, Connection_t* con, int type, const char* format, ...)
261
{
262
va_list ap;
263
char* s;
264
265
va_start(ap, format);
266
if (format)
267
sfvprintf(state->tmp, format, ap);
268
va_end(ap);
269
if (type)
270
{
271
if (!(s = sfstruse(state->tmp)))
272
error(ERROR_SYSTEM|3, "out of space");
273
if (type != 'I' && state->log && state->logf)
274
sfprintf(state->logf, "%s (%03d) %c %s\n", fmttime("%K", time(NiL)), con ? con->fd : 0, type, s);
275
if (con && type != 'R' && type != 'S')
276
{
277
if (type != 'L' || !con->quiet)
278
sfprintf(state->usrf, "%c %s\n", type, s);
279
if (type == 'W')
280
con->code |= 1;
281
else if (type == 'E')
282
con->code |= 2;
283
}
284
}
285
}
286
287
/*
288
* accept a new connection
289
*/
290
291
static int
292
acceptf(Css_t* css, Cssfd_t* fp, Csid_t* ip, char** av, Cssdisc_t* disc)
293
{
294
register State_t* state = (State_t*)disc;
295
register Connection_t* con;
296
297
if (!(con = newof(0, Connection_t, 1, 0)))
298
return -1;
299
fp->data = con;
300
con->id = *ip;
301
con->waiting = 0;
302
con->fd = fp->fd;
303
state->active++;
304
dtinsert(state->connections, con);
305
log(state, con, 'S', "accept connection -- %d active", state->active);
306
return fp->fd;
307
}
308
309
/*
310
* notify connections waiting on ep
311
*/
312
313
static int
314
notify(State_t* state, Event_t* ep)
315
{
316
Connection_t* cp;
317
Waiting_t* wp;
318
char* s;
319
size_t n;
320
321
for (cp = (Connection_t*)dtfirst(state->connections); cp; cp = (Connection_t*)dtnext(state->connections, cp))
322
if (cp->waiting && (wp = (Waiting_t*)dtmatch(cp->waiting, &ep)))
323
{
324
if (wp->id >= 0)
325
{
326
log(state, cp, 'x', "%d 0", wp->id);
327
n = sfstrtell(state->usrf);
328
if (!(s = sfstruse(state->usrf)))
329
error(ERROR_SYSTEM|3, "out of space");
330
write(cp->fd, s, n);
331
}
332
else if (!cp->quiet)
333
log(state, cp, 'L', "%s raised", ep->name);
334
n = ep->waiting == 1;
335
dtdelete(cp->waiting, wp);
336
if (n)
337
break;
338
}
339
return 0;
340
}
341
342
/*
343
* post pending event name for connection
344
*/
345
346
static int
347
post(State_t* state, Connection_t* con, const char* name, int id)
348
{
349
Event_t* ep;
350
Waiting_t* wp;
351
352
if (!con->waiting && !(con->waiting = dtopen(&state->waitdisc, Dtset)))
353
{
354
error(ERROR_SYSTEM|3, "out of space [waiting]");
355
return -1;
356
}
357
if (ep = dtmatch(state->events, name))
358
{
359
if (dtmatch(con->waiting, &ep))
360
return 0;
361
}
362
else if (!(ep = newof(0, Event_t, 1, 0)))
363
{
364
error(ERROR_SYSTEM|3, "out of space [event]");
365
return -1;
366
}
367
else
368
{
369
strcpy(ep->name, name);
370
dtinsert(state->events, ep);
371
}
372
if (!(wp = newof(0, Waiting_t, 1, 0)))
373
{
374
error(ERROR_SYSTEM|3, "out of space [waiting]");
375
return -1;
376
}
377
ep->waiting++;
378
wp->id = id;
379
wp->event = ep;
380
dtinsert(con->waiting, wp);
381
return 0;
382
}
383
384
/*
385
* list server info/state
386
*/
387
388
static int
389
info(State_t* state, Connection_t* con, Css_t* css)
390
{
391
Connection_t* cp;
392
Waiting_t* wp;
393
int n;
394
395
log(state, con, 'I', "info server='%s' version=%d.%d host=%s pid=%d uid=%d gid=%d", fmtident(usage), EVENT_MAJOR, EVENT_MINOR, csname(css->state, 0), getpid(), geteuid(), getegid());
396
log(state, con, 'I', "info active=%d", state->active);
397
for (cp = (Connection_t*)dtfirst(state->connections); cp; cp = (Connection_t*)dtnext(state->connections, cp))
398
if (cp->waiting && (n = dtsize(cp->waiting)) > 0)
399
{
400
log(state, con, 0, "waiting connection=%d count=%d", cp->fd, n);
401
for (wp = (Waiting_t*)dtfirst(cp->waiting); wp; wp = (Waiting_t*)dtnext(cp->waiting, wp))
402
log(state, con, 0, " %s", wp->event->name);
403
log(state, con, 'I', 0);
404
}
405
log(state, con, 'I', "done");
406
return 0;
407
}
408
409
static int request(State_t*, Connection_t*, int, int, char**, unsigned long, unsigned long);
410
411
/*
412
* apply request r to one key
413
*/
414
415
static int
416
apply(State_t* state, Connection_t* con, int id, int index, datum key, datum val, Data_t* dat)
417
{
418
Event_t* e;
419
int n;
420
421
switch (index)
422
{
423
case REQ_clear:
424
dat->flags |= DATA_clear;
425
dat->time = time(NiL);
426
val.dptr = (void*)dat;
427
val.dsize = sizeof(*dat);
428
if (!(n = dbm_store(state->dbm, key, val, DBM_INSERT)) || n > 0 && !dbm_store(state->dbm, key, val, DBM_REPLACE))
429
log(state, con, 'L', "%s cleared", key.dptr);
430
else if (!dbm_error(state->dbm))
431
log(state, con, 'W', "%s unchanged", key.dptr);
432
else
433
{
434
dbm_clearerr(state->dbm);
435
log(state, con, 'E', "%s io error", key.dptr);
436
}
437
break;
438
case REQ_delete:
439
if (!dbm_delete(state->dbm, key))
440
{
441
log(state, con, 'L', "%s deleted", key.dptr);
442
return 1;
443
}
444
else if (!dbm_error(state->dbm))
445
log(state, con, 'W', "%s not in db", key.dptr);
446
else
447
{
448
dbm_clearerr(state->dbm);
449
log(state, con, 'E', "%s io error", key.dptr);
450
}
451
break;
452
case REQ_hold:
453
dat->flags |= DATA_hold;
454
dat->time = time(NiL);
455
val.dptr = (void*)dat;
456
val.dsize = sizeof(*dat);
457
if (!(n = dbm_store(state->dbm, key, val, DBM_INSERT)) || n > 0 && !dbm_store(state->dbm, key, val, DBM_REPLACE))
458
log(state, con, 'L', "%s held", key.dptr);
459
else if (!dbm_error(state->dbm))
460
log(state, con, 'W', "%s unchanged", key.dptr);
461
else
462
{
463
dbm_clearerr(state->dbm);
464
log(state, con, 'E', "%s io error", key.dptr);
465
}
466
break;
467
case REQ_raise:
468
dat->flags &= ~DATA_clear;
469
dat->time = time(NiL);
470
dat->raise++;
471
val.dptr = (void*)dat;
472
val.dsize = sizeof(*dat);
473
if (!(n = dbm_store(state->dbm, key, val, DBM_INSERT)) || n > 0 && !dbm_store(state->dbm, key, val, DBM_REPLACE))
474
{
475
if (!state->hold && (e = (Event_t*)dtmatch(state->events, key.dptr)))
476
notify(state, e);
477
log(state, con, 'L', "%s raised", key.dptr);
478
}
479
else if (!dbm_error(state->dbm))
480
log(state, con, 'W', "%s unchanged", key.dptr);
481
else
482
{
483
dbm_clearerr(state->dbm);
484
log(state, con, 'E', "%s io error", key.dptr);
485
}
486
break;
487
case REQ_release:
488
if (dat->flags & DATA_hold)
489
{
490
dat->flags &= ~DATA_hold;
491
if (dat->raise)
492
{
493
val.dptr = (void*)dat;
494
val.dsize = sizeof(*dat);
495
if (!(n = dbm_store(state->dbm, key, val, DBM_INSERT)) || n > 0 && !dbm_store(state->dbm, key, val, DBM_REPLACE))
496
log(state, con, 'L', "%s released", key.dptr);
497
else if (!dbm_error(state->dbm))
498
log(state, con, 'W', "%s unchanged", key.dptr);
499
else
500
{
501
dbm_clearerr(state->dbm);
502
log(state, con, 'E', "%s io error", key.dptr);
503
}
504
if (e = (Event_t*)dtmatch(state->events, key.dptr))
505
notify(state, e);
506
}
507
else if (!dbm_delete(state->dbm, key))
508
log(state, con, 'L', "%s deleted", key.dptr);
509
else if (!dbm_error(state->dbm))
510
log(state, con, 'W', "%s not in db", key.dptr);
511
else
512
{
513
dbm_clearerr(state->dbm);
514
log(state, con, 'E', "%s io error", key.dptr);
515
}
516
}
517
break;
518
case REQ_test:
519
if (val.dptr && !(dat->flags & DATA_clear))
520
{
521
if (state->hold)
522
log(state, con, 'W', "%s global hold", key.dptr);
523
else if (dat->flags & DATA_hold)
524
log(state, con, 'W', "%s explicit hold", key.dptr);
525
else
526
log(state, con, 'I', "%s raised", key.dptr);
527
}
528
else
529
log(state, con, 'I', "%s not-raised", key.dptr);
530
break;
531
case REQ_wait:
532
if (val.dptr && !state->hold && !(dat->flags & (DATA_clear|DATA_hold)))
533
log(state, con, 'I', "%s raised", key.dptr);
534
else if (post(state, con, key.dptr, id))
535
return -1;
536
break;
537
}
538
return 0;
539
}
540
541
/*
542
* apply request r to args a
543
*/
544
545
static int
546
request(State_t* state, Connection_t* con, int id, int index, char** a, unsigned long older, unsigned long newer)
547
{
548
char* s;
549
int i;
550
Event_t* e;
551
datum key;
552
datum val;
553
Data_t dat;
554
regex_t re;
555
char buf[64];
556
557
while (s = *a++)
558
if (i = regcomp(&re, s, REG_SHELL|REG_AUGMENTED|REG_LEFT|REG_RIGHT))
559
{
560
regerror(i, &re, buf, sizeof(buf));
561
log(state, con, 'E', "%s: %s", s, buf);
562
}
563
else if (regstat(&re)->re_info & REG_LITERAL)
564
{
565
if (!EVENT(s))
566
{
567
log(state, con, 'E', "%s invalid event name", s);
568
return -1;
569
}
570
key.dptr = (void*)s;
571
key.dsize = strlen(s) + 1;
572
if (key.dsize >= sizeof(e->name))
573
s[(key.dsize = sizeof(e->name)) - 1] = 0;
574
val = dbm_fetch(state->dbm, key);
575
if (val.dptr)
576
{
577
if (val.dsize > sizeof(dat))
578
val.dsize = sizeof(dat);
579
swapmem(state->swap, val.dptr, &dat, sizeof(dat));
580
}
581
else
582
memset(&dat, 0, sizeof(dat));
583
if (apply(state, con, id, index, key, val, &dat))
584
return -1;
585
}
586
else
587
{
588
rescan:
589
for (key = dbm_firstkey(state->dbm); key.dptr; key = dbm_nextkey(state->dbm))
590
if (EVENT(key.dptr) && !regexec(&re, key.dptr, 0, NiL, 0))
591
{
592
val = dbm_fetch(state->dbm, key);
593
if (val.dsize > sizeof(dat))
594
val.dsize = sizeof(dat);
595
swapmem(state->swap, val.dptr, &dat, val.dsize);
596
if ((!older || dat.time < older) && (!newer || dat.time > newer))
597
{
598
if ((i = apply(state, con, id, index, key, val, &dat)) < 0)
599
return -1;
600
if (i > 0)
601
goto rescan;
602
}
603
}
604
}
605
return 0;
606
}
607
608
/*
609
* convert s to a date/time
610
*/
611
612
static unsigned long
613
date(State_t* state, Connection_t* con, const char* s)
614
{
615
unsigned long t;
616
char* e;
617
datum key;
618
datum val;
619
Data_t dat;
620
621
key.dptr = (void*)s;
622
key.dsize = strlen(s) + 1;
623
val = dbm_fetch(state->dbm, key);
624
if (val.dptr)
625
{
626
swapmem(state->swap, val.dptr, &dat, val.dsize);
627
t = dat.time;
628
}
629
else
630
{
631
t = tmdate(s, &e, NiL);
632
if (*e)
633
{
634
log(state, con, 'E', "%s: invalid date/time", s);
635
t = 0;
636
}
637
}
638
return t;
639
}
640
641
/*
642
* service a request
643
*/
644
645
static int
646
actionf(register Css_t* css, register Cssfd_t* fp, Cssdisc_t* disc)
647
{
648
register State_t* state = (State_t*)disc;
649
register Connection_t* con;
650
char* s;
651
char* t;
652
char** a;
653
char** q;
654
Cssfd_t* f;
655
Request_t* r;
656
Event_t* e;
657
Waiting_t* w;
658
Connection_t* x;
659
int n;
660
int err;
661
int id;
662
unsigned long older;
663
unsigned long newer;
664
datum key;
665
datum val;
666
Data_t data;
667
char buf[64];
668
669
switch (fp->status)
670
{
671
case CS_POLL_CLOSE:
672
if (con = (Connection_t*)fp->data)
673
dtdelete(state->connections, con);
674
return 0;
675
case CS_POLL_READ:
676
con = (Connection_t*)fp->data;
677
if ((n = csread(css->state, fp->fd, state->req, sizeof(state->req), CS_LINE)) <= 0)
678
return -1;
679
state->req[--n] = 0;
680
log(state, con, 'R', "%s", state->req);
681
con->code = 0;
682
if (tokscan(state->req, NiL, " %v ", state->cmd, elementsof(state->cmd) - 1) > 0)
683
{
684
id = -1;
685
for (q = state->cmd; (s = *q) && (isalpha(*s) || *s == '_'); q++)
686
{
687
while (isalnum(*++s));
688
if (*s != '=')
689
break;
690
if ((s - *q) == 2 && *(s - 1) == 'd' && *(s - 2) == 'i')
691
id = (int)strtol(s + 1, NiL, 0);
692
}
693
s = *(a = q);
694
if (!(r = (Request_t*)strpsearch(requests, elementsof(requests), sizeof(requests[0]), s, NiL)))
695
log(state, con, 'E', "%s: unknown request", s);
696
else
697
{
698
opt_info.index = 0;
699
newer = older = 0;
700
err = 0;
701
sfstrseek(state->usrf, 0, SEEK_SET);
702
for (;;)
703
{
704
switch (optget(a, usage))
705
{
706
case 'e':
707
if (r->index == REQ_set)
708
{
709
state->expire = strelapsed(opt_info.arg, &t, 1);
710
if (*t)
711
{
712
log(state, con, 'E', "%s: invalid elapsed time expression", opt_info.arg);
713
err = 1;
714
break;
715
}
716
}
717
continue;
718
case 'l':
719
if (r->index == REQ_set)
720
state->log = opt_info.num;
721
continue;
722
case 'n':
723
newer = date(state, con, opt_info.arg);
724
continue;
725
case 'o':
726
older = date(state, con, opt_info.arg);
727
continue;
728
case 'q':
729
con->quiet = opt_info.num;
730
continue;
731
case '?':
732
case ':':
733
log(state, con, 'E', "%s: %s", s, opt_info.arg);
734
err = 1;
735
break;
736
}
737
break;
738
}
739
if (!err)
740
{
741
if (!*(a += opt_info.index))
742
{
743
if (newer || older)
744
{
745
a[0] = "*";
746
a[1] = 0;
747
n = 1;
748
}
749
else
750
n = 0;
751
}
752
else
753
n = a[1] ? 2 : 1;
754
if (r->min && n < r->min)
755
sfprintf(state->usrf, "E %s: at least %d argument%s expected\n", s, r->min, r->min == 1 ? "" : "s");
756
else if (r->max > 0 && n > r->max)
757
log(state, con, 'E', "%s: at most %d argument%s expected", s, r->max, r->max == 1 ? "" : "s");
758
else if (r->min == r->max && n != r->max)
759
log(state, con, 'E', "%s: %d argument%s expected", s, r->max, r->max == 1 ? "" : "s");
760
else
761
switch (r->index)
762
{
763
case REQ_all:
764
n = (int)strtol(a[0], &t, 0);
765
if (*t)
766
{
767
log(state, con, 'E', "%s: invalid numeric value", a[0]);
768
break;
769
}
770
else if (!(f = cssfd(css, n, 0)) || !(x = (Connection_t*)f->data))
771
{
772
log(state, con, 'E', "%d: invalid connection index", n);
773
break;
774
}
775
if (x->waiting)
776
{
777
n = x->quiet;
778
x->quiet = 1;
779
a = state->cmd;
780
for (w = (Waiting_t*)dtfirst(x->waiting); w; w = (Waiting_t*)dtnext(x->waiting, w))
781
{
782
if (a >= &state->cmd[elementsof(state->cmd)-1])
783
{
784
*a = 0;
785
if (request(state, x, -1, REQ_raise, a = state->cmd, older, newer))
786
break;
787
}
788
*a++ = w->event->name;
789
log(state, con, 'R', "%s %s", s, w->event->name);
790
}
791
if (a > state->cmd)
792
{
793
*a = 0;
794
request(state, x, -1, REQ_raise, state->cmd, older, newer);
795
}
796
x->quiet = n;
797
}
798
log(state, con, 'I', "done");
799
break;
800
case REQ_clear:
801
case REQ_delete:
802
case REQ_raise:
803
case REQ_test:
804
case REQ_wait:
805
if (request(state, con, id, r->index, a, older, newer))
806
return -1;
807
break;
808
case REQ_exit:
809
cssfd(css, fp->fd, CS_POLL_CLOSE);
810
break;
811
case REQ_info:
812
info(state, con, css);
813
break;
814
case REQ_hold:
815
if (!*a)
816
{
817
state->hold = 1;
818
sfprintf(state->usrf, "I holding\n");
819
}
820
else
821
if (request(state, con, id, r->index, a, older, newer))
822
return -1;
823
break;
824
case REQ_list:
825
con->all = 1;
826
if (s = *a)
827
{
828
if (n = regcomp(&con->re, s, REG_SHELL|REG_AUGMENTED|REG_LEFT|REG_RIGHT))
829
{
830
regerror(n, &con->re, buf, sizeof(buf));
831
log(state, con, 'E', "%s: %s", s, buf);
832
break;
833
}
834
con->all = 0;
835
}
836
con->list = dbm_firstkey(state->dbm);
837
if (!con->list.dptr)
838
{
839
log(state, con, 'I', "empty");
840
break;
841
}
842
con->newer = newer;
843
con->older = older;
844
goto list;
845
case REQ_next:
846
if (!con->list.dptr)
847
{
848
log(state, con, 'W', "next: must execute list first");
849
break;
850
}
851
for (;;)
852
{
853
con->list = dbm_nextkey(state->dbm);
854
if (!con->list.dptr)
855
{
856
log(state, con, 'I', "done");
857
break;
858
}
859
list:
860
if (EVENT(con->list.dptr) && (con->all || !regexec(&con->re, con->list.dptr, 0, NiL, 0)))
861
{
862
val = dbm_fetch(state->dbm, con->list);
863
if (val.dsize > sizeof(data))
864
val.dsize = sizeof(data);
865
swapmem(state->swap, val.dptr, &data, val.dsize);
866
if ((!con->older || data.time < con->older) && (!con->newer || data.time > con->newer))
867
{
868
log(state, con, 'I', "event %s %s %d%s%s", con->list.dptr, fmttime("%K", data.time), data.raise, (data.flags & DATA_clear) ? " CLEAR" : "", (data.flags & DATA_hold) ? " HOLD" : "");
869
break;
870
}
871
}
872
}
873
break;
874
case REQ_release:
875
if (!*a)
876
{
877
state->hold = 0;
878
sfprintf(state->usrf, "I released\n");
879
key = dbm_firstkey(state->dbm);
880
while (key.dptr)
881
{
882
val = dbm_fetch(state->dbm, key);
883
if (val.dsize > sizeof(data))
884
val.dsize = sizeof(data);
885
swapmem(state->swap, val.dptr, &data, val.dsize);
886
if (!(data.flags & (DATA_clear|DATA_hold)) && (e = (Event_t*)dtmatch(state->events, key.dptr)))
887
notify(state, e);
888
key = dbm_nextkey(state->dbm);
889
}
890
}
891
else
892
if (request(state, con, id, r->index, a, older, newer))
893
return -1;
894
break;
895
case REQ_set:
896
break;
897
case REQ_stop:
898
exit(0);
899
break;
900
}
901
}
902
}
903
if (sfstrtell(state->usrf))
904
{
905
if (id >= 0)
906
{
907
sfstrseek(state->usrf, 0, SEEK_SET);
908
log(state, con, 'x', "%d %d", id, con->code);
909
}
910
n = sfstrtell(state->usrf);
911
if (!(s = sfstruse(state->usrf)))
912
error(ERROR_SYSTEM|3, "out of space");
913
if (cswrite(css->state, fp->fd, s, n) != n)
914
return -1;
915
}
916
}
917
return 1;
918
}
919
return 0;
920
}
921
922
/*
923
* handle exceptions
924
*/
925
926
static int
927
exceptf(Css_t* css, unsigned long op, unsigned long arg, Cssdisc_t* disc)
928
{
929
register State_t* state = (State_t*)disc;
930
931
switch (op)
932
{
933
case CSS_CLOSE:
934
if (state->dbm)
935
{
936
dbm_close(state->dbm);
937
state->dbm = 0;
938
}
939
return 0;
940
case CSS_INTERRUPT:
941
error(ERROR_SYSTEM|3, "%s: interrupt exit", fmtsignal(arg));
942
return 0;
943
case CSS_WAKEUP:
944
error(2, "wakeup");
945
return 0;
946
}
947
error(ERROR_SYSTEM|3, "poll error op=0x%08x arg=0x%08x", op, arg);
948
return -1;
949
}
950
951
/*
952
* free connection
953
*/
954
955
static void
956
confree(Dt_t* dt, void* obj, Dtdisc_t* disc)
957
{
958
State_t* state = (State_t*)((char*)disc - offsetof(State_t, condisc));
959
Connection_t* con = (Connection_t*)obj;
960
961
NoP(dt);
962
NoP(disc);
963
state->active--;
964
if (con->waiting)
965
dtclose(con->waiting);
966
log(state, con, 'S', "drop connection -- %d active", state->active);
967
free(obj);
968
}
969
970
/*
971
* free event
972
*/
973
974
static void
975
eventfree(Dt_t* dt, void* obj, Dtdisc_t* disc)
976
{
977
NoP(dt);
978
NoP(disc);
979
free(obj);
980
}
981
982
/*
983
* free connection pending event
984
*/
985
986
static void
987
waitfree(Dt_t* dt, void* obj, Dtdisc_t* disc)
988
{
989
State_t* state = (State_t*)((char*)disc - offsetof(State_t, waitdisc));
990
Waiting_t* p = (Waiting_t*)obj;
991
992
NoP(dt);
993
if (--p->event->waiting == 0)
994
dtdelete(state->events, p->event);
995
free(obj);
996
}
997
998
/*
999
* open and verify event db
1000
*/
1001
1002
static void
1003
db(register State_t* state)
1004
{
1005
datum key;
1006
datum val;
1007
Data_t data;
1008
uint32_t u4;
1009
1010
if (!(state->dbm = dbm_open(state->path, O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH)))
1011
error(ERROR_SYSTEM|3, "%s: cannot open database for read/write", state->path);
1012
key.dptr = (void*)ident_key;
1013
key.dsize = sizeof(ident_key);
1014
val = dbm_fetch(state->dbm, key);
1015
if (val.dptr)
1016
{
1017
if (val.dsize != sizeof(data))
1018
error(3, "%s: invalid db -- data size %d, expected %d", state->path, val.dsize, sizeof(data));
1019
memcpy(&data, val.dptr, val.dsize);
1020
if (memcmp(&data.expire, ident_name, sizeof(data.expire)))
1021
error(3, "%s: %s: invalid db -- ident mismatch, expected %s", state->path, ident_key, ident_name);
1022
u4 = IDENT_SWAP;
1023
if (state->swap = swapop(&u4, &data.time, 4))
1024
swapmem(state->swap, &data, &data, sizeof(data));
1025
}
1026
else
1027
{
1028
val = dbm_firstkey(state->dbm);
1029
if (val.dptr)
1030
error(3, "%s: %s: invalid db -- ident entry expected", state->path, ident_key);
1031
memset(&data, 0, sizeof(data));
1032
memcpy(&data.expire, ident_name, sizeof(data.expire));
1033
data.time = IDENT_SWAP;
1034
data.raise = IDENT_VERSION;
1035
val.dptr = (void*)&data;
1036
val.dsize = sizeof(data);
1037
if (dbm_store(state->dbm, key, val, DBM_INSERT))
1038
{
1039
dbm_clearerr(state->dbm);
1040
error(3, "%s: %s: db initial ident entry store failed", state->path, ident_key);
1041
}
1042
}
1043
state->major = (data.raise >> 16) & 0xffff;
1044
state->minor = data.raise & 0xffff;
1045
}
1046
1047
/*
1048
* client/server main
1049
*/
1050
1051
int
1052
main(int argc, char** argv)
1053
{
1054
char* s;
1055
Css_t* css;
1056
1057
char* p = 0;
1058
int server = 0;
1059
1060
static State_t state;
1061
1062
NoP(argc);
1063
setlocale(LC_ALL, "");
1064
opt_info.argv = argv;
1065
state.log = 1;
1066
error_info.id = (char*)command;
1067
if (!(state.usrf = sfstropen()) || !(state.tmp = sfstropen()))
1068
error(3, "out of space [buf]");
1069
1070
/*
1071
* check the options
1072
*/
1073
1074
for (;;)
1075
{
1076
switch (optget(argv, usage))
1077
{
1078
case 'c':
1079
p = opt_info.arg;
1080
continue;
1081
case 'e':
1082
state.expire = strelapsed(opt_info.arg, &s, 1);
1083
if (*s)
1084
error(2, "%s: invalid elapsed time expression", opt_info.arg);
1085
continue;
1086
case 'i':
1087
server = 1;
1088
continue;
1089
case 'l':
1090
state.log = opt_info.num;
1091
continue;
1092
case '?':
1093
error(ERROR_USAGE|4, "%s", opt_info.arg);
1094
continue;
1095
case ':':
1096
error(2, "%s", opt_info.arg);
1097
continue;
1098
}
1099
break;
1100
}
1101
if (error_info.errors || !(state.path = *(argv += opt_info.index)))
1102
error(ERROR_USAGE|4, "%s", optusage(NiL));
1103
1104
/*
1105
* get the connect stream path
1106
*/
1107
1108
if (s = strrchr(state.path, '/'))
1109
s++;
1110
else
1111
s = state.path;
1112
if (p)
1113
sfprintf(state.usrf, "%s/%s", p, s);
1114
else
1115
sfprintf(state.usrf, "/dev/tcp/local/%s/%s", error_info.id, s);
1116
if (!(state.service = strdup(sfstruse(state.usrf))))
1117
error(3, "out of space [service]");
1118
1119
/*
1120
* either server or client at this point
1121
*/
1122
1123
if (server)
1124
{
1125
umask(S_IWOTH);
1126
db(&state);
1127
state.condisc.link = offsetof(Event_t, link);
1128
state.condisc.freef = confree;
1129
if (!(state.connections = dtopen(&state.condisc, Dtlist)))
1130
error(ERROR_SYSTEM|3, "out of space [connection dictionary]");
1131
state.eventdisc.link = offsetof(Event_t, link);
1132
state.eventdisc.key = offsetof(Event_t, name);
1133
state.eventdisc.freef = eventfree;
1134
if (!(state.events = dtopen(&state.eventdisc, Dtoset)))
1135
error(ERROR_SYSTEM|3, "out of space [event dictionary]");
1136
state.waitdisc.link = offsetof(Waiting_t, link);
1137
state.waitdisc.key = offsetof(Waiting_t, event);
1138
state.waitdisc.size = sizeof(Event_t*);
1139
state.waitdisc.freef = waitfree;
1140
state.disc.version = CSS_VERSION;
1141
state.disc.flags = CSS_DAEMON|CSS_LOG|CSS_ERROR|CSS_INTERRUPT|CSS_WAKEUP|CSS_PRESERVE;
1142
state.disc.timeout = 60 * 60 * 1000L;
1143
state.disc.acceptf = acceptf;
1144
state.disc.actionf = actionf;
1145
state.disc.errorf = errorf;
1146
state.disc.exceptf = exceptf;
1147
if (!(css = cssopen(state.service, &state.disc)))
1148
return 1;
1149
umask(S_IWOTH);
1150
error_info.id = css->id;
1151
if (state.log)
1152
{
1153
sfprintf(state.tmp, "%s.log", state.path);
1154
if (!(s = sfstruse(state.tmp)))
1155
error(ERROR_SYSTEM|3, "out of space");
1156
if (state.logf = sfopen(NiL, s, "a"))
1157
sfset(state.logf, SF_LINE, 1);
1158
else
1159
error(ERROR_SYSTEM|2, "%s: cannot append log file", s);
1160
}
1161
log(&state, 0, 'S', "start service %s", fmtident(usage));
1162
csspoll(CS_NEVER, 0);
1163
log(&state, 0, 'S', "stop service");
1164
return 1;
1165
}
1166
return csclient(&cs, -1, state.service, "event> ", argv + 1, CS_CLIENT_ARGV|CS_CLIENT_SEP);
1167
}
1168
1169
#endif
1170
1171