Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
att
GitHub Repository: att/ast
Path: blob/master/src/cmd/cs/mbb.c
1808 views
1
/***********************************************************************
2
* *
3
* This software is part of the ast package *
4
* Copyright (c) 1990-2011 AT&T Intellectual Property *
5
* and is licensed under the *
6
* Eclipse Public License, Version 1.0 *
7
* by AT&T Intellectual Property *
8
* *
9
* A copy of the License is available at *
10
* http://www.eclipse.org/org/documents/epl-v10.html *
11
* (with md5 checksum b35adb5213ca9657e911e9befb180842) *
12
* *
13
* Information and Software Systems Research *
14
* AT&T Research *
15
* Florham Park NJ *
16
* *
17
* Glenn Fowler <[email protected]> *
18
* *
19
***********************************************************************/
20
#pragma prototyped
21
/*
22
* Glenn Fowler
23
* AT&T Research
24
*/
25
26
static const char usage[] =
27
"[-?\n@(#)$Id: mbb (AT&T Research) 2000-05-09 $\n]"
28
USAGE_LICENSE
29
"[+NAME?mbb - message bulletin board server]"
30
"[+DESCRIPTION?\bmbb\b is a message bulletin board server. Each \bmbb\b"
31
" instance is a session. Each session supports up to 64 message"
32
" channels, labeled from 0 to 63. A client connects to \bmbb\b and"
33
" provides a mask of channels that it is interested in. All subsequent"
34
" messages in the mask are sent to the client. Channel 0 is reserved"
35
" for service control messages.]"
36
"[+?A message is a newline terminated line with length 8K bytes max"
37
" that does not contain the ASCII NUL character. A message must"
38
" be prefixed by its channel number. The server changes this to"
39
" \achannel\a.\aid\a where \aid\a is the server assigned client"
40
" identification number. Messages with invalid or missing channel"
41
" numbers are silently rejected.]"
42
43
"[b:backlog?Every \an\ath client message with length > 1 will be treated as"
44
" if only half the data were sent. This should kick in the message"
45
" backlog logic.]#[n]"
46
"[d:debug?Set the debug trace level to \alevel\a. Higher levels produce more"
47
" output.]#[level]"
48
"[t:timeout?The service will exit after a \atime\a period of client"
49
" inactivity. The default is to run until the system crashes.]:[time]"
50
51
"\n"
52
"\nconnect-stream\n"
53
"\n"
54
55
"[+PROTOCOL?Channel 0 is for service control messages. The server and clients"
56
" may send messages on channel 0, with the exception that client"
57
" messages on channel 0 are not sent to the other clients. The control"
58
" messages are:]{"
59
" [+0 listen \amask\a?[client]] The client is interested in"
60
" channel numbers in the bitmask \amask\a. The default"
61
" \amask\a is \b0xfffffffffffffffe\b; i.e., all but"
62
" channel 0.]"
63
" [+0 join \aid\a?[server]] Client with server assigned id"
64
" number \aid\a has joined the session.]"
65
" [+0 drop \aid\a?[server]] Client with server assigned id"
66
" number \aid\a has dropped from the session.]"
67
" }"
68
69
"[+SEE ALSO?\bcoshell\b(1), \bcs\b(1), \bss\b(1), \bcs\b(3)]"
70
;
71
72
#include <css.h>
73
#include <ctype.h>
74
#include <debug.h>
75
#include <error.h>
76
#include <ls.h>
77
#include <stdarg.h>
78
79
typedef struct Log_s
80
{
81
char name[2]; /* file name */
82
Sfio_t* sp; /* r/w stream */
83
Sfoff_t offset; /* next write offset */
84
size_t blocked; /* blocked connections */
85
} Log_t;
86
87
typedef struct Connection_s
88
{
89
struct Connection_s* next;
90
Cssfd_t* fp;
91
Sfulong_t mask;
92
Sfoff_t blocked[2];
93
} Connection_t;
94
95
typedef struct State_s
96
{
97
Cssdisc_t disc;
98
Connection_t* all;
99
Sfio_t* tmp;
100
Log_t logs[2];
101
int backlog;
102
int count;
103
int log;
104
int logged;
105
} State_t;
106
107
#define ALL (-1)
108
#define CHUNK (4 * 1024)
109
#define HOG (4 * 1024 * 1024)
110
111
#define CHAN_DEFAULT (((Sfulong_t)~0)^1)
112
#define CHAN_VALID(c) ((c)>=0&&(c)<=63)
113
#define CHAN_MASK(c) (((Sfulong_t)1)<<(c))
114
115
static char buf[8 * 1024];
116
117
static ssize_t
118
data(register State_t* state, register Connection_t* to, char* s, size_t n, int force)
119
{
120
if (!force && n > 1 && state->backlog && ++state->count >= state->backlog)
121
{
122
message((-1, "[%d] %d backlog", __LINE__, to->fp->fd));
123
state->count = 0;
124
n /= 2;
125
}
126
return write(to->fp->fd, s, n);
127
}
128
129
static int
130
note(Css_t* css, register Connection_t* to, int log, char* s, size_t n, int force, Cssdisc_t* disc)
131
{
132
register State_t* state = (State_t*)disc;
133
ssize_t z;
134
135
if ((force || to->blocked[log] < 0) && (z = data(state, to, s, n, force)) != n)
136
{
137
if (!force && !state->logged)
138
{
139
state->logged = 1;
140
if (!state->logs[log].sp)
141
{
142
state->logs[log].name[0] = '0' + log;
143
remove(state->logs[log].name);
144
if (!(state->logs[log].sp = sfopen(NiL, state->logs[log].name, "r+")))
145
error(ERROR_SYSTEM|3, "%s: cannot create message log", state->logs[log].name);
146
message((-1, "[%d] %s: create log", __LINE__, state->logs[log].name));
147
}
148
message((-1, "[%d] %s: %d log", __LINE__, state->logs[log].name, to->fp->fd));
149
if (sfwrite(state->logs[log].sp, s, n) != n)
150
error(ERROR_SYSTEM|3, "%s: log file write error", state->logs[log].name);
151
if ((state->logs[log].offset += n) >= HOG && !state->logs[!log].sp)
152
state->log = !log;
153
}
154
if (to->blocked[log] < 0)
155
{
156
message((-1, "[%d] %s: block", __LINE__, state->logs[log].name));
157
state->logs[log].blocked++;
158
}
159
to->blocked[log] = state->logs[log].offset - n + z;
160
message((-1, "[%d] %s: %d offset %I*d", __LINE__, state->logs[log].name, to->fp->fd, sizeof(to->blocked[log]), to->blocked[log]));
161
cssfd(css, to->fp->fd, CS_POLL_READ|CS_POLL_WRITE);
162
return 0;
163
}
164
if (to->blocked[log] >= 0)
165
{
166
message((-1, "[%d] %s: %d unblock", __LINE__, state->logs[log].name, to->fp->fd));
167
to->blocked[log] = -1;
168
if (!--state->logs[log].blocked)
169
{
170
sfclose(state->logs[log].sp);
171
state->logs[log].sp = 0;
172
state->logs[log].offset = 0;
173
remove(state->logs[log].name);
174
message((-1, "[%d] %s: clear", __LINE__, state->logs[log].name));
175
}
176
}
177
return 1;
178
}
179
180
static int
181
dump(Css_t* css, register Connection_t* con, int log, Cssdisc_t* disc)
182
{
183
register State_t* state = (State_t*)disc;
184
char* s;
185
size_t n;
186
int r;
187
188
n = state->logs[log].offset - con->blocked[log];
189
if (n > CHUNK)
190
n = CHUNK;
191
if (sfseek(state->logs[log].sp, con->blocked[log], SEEK_SET) != con->blocked[log])
192
error(ERROR_SYSTEM|3, "%s: cannot seek to %I*d", state->logs[log].name, sizeof(con->blocked[log]), con->blocked[log]);
193
message((-1, "[%d] %s reserve n %I*d offset %I*d", __LINE__, state->logs[log].name, sizeof(n), n, sizeof(con->blocked[log]), con->blocked[log]));
194
if (!(s = sfreserve(state->logs[log].sp, n, 0)))
195
error(ERROR_SYSTEM|3, "%s: cannot reserve %d at %I*d", state->logs[log].name, sizeof(n), n, sizeof(con->blocked[log]), con->blocked[log]);
196
r = note(css, con, log, s, n, 1, disc);
197
if (state->logs[log].sp && sfseek(state->logs[log].sp, state->logs[log].offset, SEEK_SET) != state->logs[log].offset)
198
error(ERROR_SYSTEM|3, "%s: cannot seek to %I*d", state->logs[log].name, sizeof(state->logs[log].offset), state->logs[log].offset);
199
return r;
200
}
201
202
static int
203
post(Css_t* css, Cssdisc_t* disc, Connection_t* from, register Connection_t* to, int channel, const char* format, ...)
204
{
205
State_t* state = (State_t*)disc;
206
char* s;
207
ssize_t n;
208
Sfulong_t m;
209
va_list ap;
210
211
sfprintf(state->tmp, "%d", channel);
212
if (from)
213
sfprintf(state->tmp, ".%d", from->fp->fd);
214
sfputc(state->tmp, ' ');
215
va_start(ap, format);
216
sfvprintf(state->tmp, format, ap);
217
va_end(ap);
218
sfputc(state->tmp, '\n');
219
n = sfstrtell(state->tmp);
220
if (!(s = sfstruse(state->tmp)))
221
error(ERROR_SYSTEM|3, "out of space");
222
m = CHAN_MASK(channel);
223
state->logged = 0;
224
if (!to)
225
{
226
for (to = state->all; to; to = to->next)
227
if ((to->mask & m) && to != from)
228
note(css, to, state->log, s, n, 0, disc);
229
}
230
else if (to->mask & m)
231
note(css, to, state->log, s, n, 0, disc);
232
return 0;
233
}
234
235
static void
236
drop(Css_t* css, Connection_t* con, Cssdisc_t* disc)
237
{
238
register State_t* state = (State_t*)disc;
239
register Connection_t* cp;
240
register Connection_t* pp;
241
242
pp = 0;
243
for (cp = state->all; cp; pp = cp, cp = cp->next)
244
if (cp == con)
245
{
246
if (pp)
247
pp->next = cp->next;
248
else
249
state->all = cp->next;
250
cp->fp->data = 0;
251
free(cp);
252
post(css, disc, cp, NiL, 0, "drop");
253
break;
254
}
255
}
256
257
static int
258
acceptf(Css_t* css, Cssfd_t* fp, Csid_t* ip, char** av, Cssdisc_t* disc)
259
{
260
register State_t* state = (State_t*)disc;
261
register Connection_t* con;
262
int i;
263
264
NoP(ip);
265
NoP(av);
266
if (!(con = newof(0, Connection_t, 1, 0)))
267
return -1;
268
fp->data = con;
269
con->fp = fp;
270
con->mask = CHAN_DEFAULT;
271
for (i = 0; i < elementsof(state->logs); i++)
272
con->blocked[i] = -1;
273
con->next = state->all;
274
state->all = con;
275
post(css, disc, con, NiL, 0, "join");
276
return fp->fd;
277
}
278
279
static int
280
actionf(register Css_t* css, register Cssfd_t* fp, Cssdisc_t* disc)
281
{
282
register State_t* state = (State_t*)disc;
283
register Connection_t* con;
284
register char* s;
285
char* e;
286
int n;
287
int c;
288
Sfulong_t m;
289
Sfulong_t o;
290
291
switch (fp->status)
292
{
293
case CS_POLL_CLOSE:
294
if (!(con = (Connection_t*)fp->data))
295
return -1;
296
drop(css, con, disc);
297
return 0;
298
case CS_POLL_READ:
299
if (!(con = (Connection_t*)fp->data))
300
return -1;
301
if ((n = csread(css->state, fp->fd, buf, sizeof(buf) - 1, CS_LINE)) <= 0)
302
{
303
drop(css, con, disc);
304
return -1;
305
}
306
if (n > 0 && buf[n - 1] == '\n')
307
n--;
308
buf[n] = 0;
309
for (s = buf; isspace(*s); s++);
310
c = (int)strtol(s, &e, 0);
311
if (CHAN_VALID(c) && e > s)
312
{
313
s = e;
314
if (*s == '.')
315
while (isdigit(*++s));
316
for (; isspace(*s); s++);
317
if (c == 0)
318
{
319
for (e = s; *s && !isspace(*s); s++);
320
if (*s)
321
for (*s++ = 0; isspace(*s); s++);
322
switch (*e)
323
{
324
case 'm':
325
if (!strcmp(e, "mask"))
326
{
327
o = con->mask;
328
if (*s)
329
{
330
m = strtoull(s, &e, 0);
331
if (e > s)
332
con->mask = m;
333
}
334
post(css, disc, con, con, 0, "mask 0x%I*x 0x%I*x", sizeof(con->mask), con->mask, sizeof(o), o);
335
}
336
break;
337
case 'q':
338
/* might want privilege check here */
339
if (!strcmp(e, "quit"))
340
exit(0);
341
break;
342
}
343
}
344
else
345
post(css, disc, con, NiL, c, "%s", s);
346
}
347
return 1;
348
case CS_POLL_WRITE:
349
if (!(con = (Connection_t*)fp->data))
350
return -1;
351
if ((con->blocked[!state->log] < 0 || dump(css, con, !state->log, disc)) && con->blocked[state->log] >= 0)
352
dump(css, con, state->log, disc);
353
return 1;
354
}
355
return 0;
356
}
357
358
static int
359
exceptf(Css_t* css, unsigned long op, unsigned long arg, Cssdisc_t* disc)
360
{
361
switch (op)
362
{
363
case CSS_INTERRUPT:
364
error(ERROR_SYSTEM|3, "%s: interrupt exit", fmtsignal(arg));
365
return 0;
366
case CSS_DORMANT:
367
error(2, "service dormant exit");
368
exit(0);
369
}
370
error(ERROR_SYSTEM|3, "poll error op=0x%08x arg=0x%08x", op, arg);
371
return -1;
372
}
373
374
int
375
main(int argc, char** argv)
376
{
377
char* e;
378
State_t state;
379
380
NoP(argc);
381
error_info.id = "mbb";
382
memset(&state, 0, sizeof(state));
383
state.disc.version = CSS_VERSION;
384
state.disc.flags = CSS_DAEMON|CSS_ERROR|CSS_INTERRUPT|CSS_LOG;
385
state.disc.acceptf = acceptf;
386
state.disc.actionf = actionf;
387
state.disc.errorf = errorf;
388
state.disc.exceptf = exceptf;
389
for (;;)
390
{
391
switch (optget(argv, usage))
392
{
393
case 'b':
394
state.backlog = opt_info.num;
395
continue;
396
case 'd':
397
error_info.trace = -opt_info.num;
398
continue;
399
case 't':
400
state.disc.timeout = strelapsed(opt_info.arg, &e, 1);
401
if (*e)
402
error(3, "%s: invalid timeout value", opt_info.arg);
403
state.disc.flags |= CSS_DORMANT;
404
continue;
405
case '?':
406
error(ERROR_USAGE|4, "%s", opt_info.arg);
407
continue;
408
case ':':
409
error(2, "%s", opt_info.arg);
410
continue;
411
}
412
break;
413
}
414
argv += opt_info.index;
415
if (!argv[0] || argv[1])
416
error(ERROR_USAGE|4, "%s", optusage(NiL));
417
if (!(state.tmp = sfstropen()))
418
error(ERROR_SYSTEM|3, "out of space [tmp stream]");
419
if (!cssopen(argv[0], &state.disc))
420
return 1;
421
umask(S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
422
csspoll(CS_NEVER, 0);
423
return 1;
424
}
425
426