Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
att
GitHub Repository: att/ast
Path: blob/master/src/cmd/dsslib/sort/sort.c
1810 views
1
/***********************************************************************
2
* *
3
* This software is part of the ast package *
4
* Copyright (c) 2011-2012 AT&T Intellectual Property *
5
* and is licensed under the *
6
* Eclipse Public License, Version 1.0 *
7
* by AT&T Intellectual Property *
8
* *
9
* A copy of the License is available at *
10
* http://www.eclipse.org/org/documents/epl-v10.html *
11
* (with md5 checksum b35adb5213ca9657e911e9befb180842) *
12
* *
13
* Information and Software Systems Research *
14
* AT&T Research *
15
* Florham Park NJ *
16
* *
17
* Glenn Fowler <[email protected]> *
18
* *
19
***********************************************************************/
20
#pragma prototyped
21
22
/*
 * optget usage text for the sort query
 * sort_beg() appends this to the text of dss_lib_sort.description
 * (starting at its first '[') before parsing the query arguments
 */
static const char sort_usage[] =
"[+PLUGIN?\findex\f]"
"[+DESCRIPTION?The sort query writes the sorted input records to the"
"	standard output. The unsorted record stream is passed to the"
"	next query, if any. The sort keys are the \afield\a operands,"
"	or the raw record data if there are no operands.]"
"[c:count?Prepend an integer count field of the number of records that "
	"compare equal. Records with count less than \athreshold\a are "
	"omitted.]#?[threshold:=1]"
"[r:reverse|invert?Reverse the sense of comparisons.]"
"[u:unique?Keep only the first of multiple records that compare equal on "
	"all keys.]"
"\n"
"\n[ field ... ]\n"
"\n"
"[+CAVEATS?Currently all data is sorted in memory -- spillover to "
	"temporary files not implemented yet.]"
;
40
41
#include <dsslib.h>
42
#include <recsort.h>
43
#include <stk.h>
44
45
struct State_s; typedef struct State_s State_t;
46
47
struct State_s
48
{
49
Rsdisc_t sortdisc;
50
Rsdisc_t uniqdisc;
51
Rskeydisc_t keydisc;
52
Rskey_t* sortkey;
53
Rskey_t* uniqkey;
54
Rs_t* sort;
55
Rs_t* uniq;
56
Cx_t* cx;
57
Sfio_t* op;
58
Sfio_t* sortstack;
59
Sfio_t* uniqstack;
60
Sfio_t* tmp;
61
char* sortbase;
62
char* uniqbase;
63
void* data;
64
Dssfile_t* file;
65
Vmalloc_t* rm;
66
Vmalloc_t* vm;
67
size_t count;
68
};
69
70
/*
 * declared ahead of its definition at the end of the file because
 * sort_beg() reads dss_lib_sort.description
 */
extern Dsslib_t dss_lib_sort;
71
72
static ssize_t
73
key(Rs_t* rs, unsigned char* data, size_t datasize, unsigned char* key, size_t keysize, Rsdisc_t* disc)
74
{
75
State_t* state = (State_t*)disc;
76
Rskeyfield_t* field;
77
Cxoperand_t r;
78
unsigned char* k;
79
unsigned char* e;
80
81
k = key;
82
e = k + keysize;
83
for (field = state->sortkey->head; field; field = field->next)
84
{
85
if (cxcast(state->cx, &r, (Cxvariable_t*)field->user, state->cx->state->type_string, state->data, NiL))
86
return -1;
87
k += field->coder(state->sortkey, field, (unsigned char*)r.value.string.data, r.value.string.size, k, e);
88
}
89
return k - key;
90
}
91
92
static ssize_t
93
rev(Rs_t* rs, unsigned char* data, size_t datasize, unsigned char* key, size_t keysize, Rsdisc_t* disc)
94
{
95
State_t* state = (State_t*)disc;
96
97
return state->sortkey->head->coder(state->sortkey, state->sortkey->head, data, datasize, key, key + keysize);
98
}
99
100
static int
101
count(Rs_t* rs, int op, void* data, void* arg, Rsdisc_t* disc)
102
{
103
State_t* state = (State_t*)disc;
104
Rsobj_t* r;
105
Rsobj_t* q;
106
char* s;
107
ssize_t n;
108
109
switch (op)
110
{
111
case RS_POP:
112
break;
113
case RS_WRITE:
114
r = (Rsobj_t*)data;
115
n = 1;
116
for (q = r->equal; q; q = q->right)
117
n++;
118
if (n >= state->count)
119
{
120
n = sfprintf(state->uniqstack, "%I*u %-.*s", sizeof(n), n, r->datalen, r->data);
121
s = stkfreeze(state->uniqstack, 0);
122
if (rsprocess(state->uniq, s, -n) <= 0)
123
{
124
if (state->cx->disc->errorf)
125
(*state->cx->disc->errorf)(state->cx, disc, ERROR_SYSTEM|2, "uniq record process error");
126
return -1;
127
}
128
}
129
return RS_DELETE;
130
default:
131
return -1;
132
}
133
return 0;
134
}
135
136
static int
137
sort_beg(Cx_t* cx, Cxexpr_t* expr, void* data, Cxdisc_t* disc)
138
{
139
char** argv = (char**)data;
140
int errors = error_info.errors;
141
int n;
142
int uniq;
143
char* s;
144
char* t;
145
char* num;
146
State_t* state;
147
Cxvariable_t* variable;
148
Vmalloc_t* vm;
149
char opt[2];
150
151
if (!(vm = vmopen(Vmdcheap, Vmlast, 0)) || !(state = vmnewof(vm, 0, State_t, 1, 0)))
152
{
153
if (vm)
154
vmclose(vm);
155
if (disc->errorf)
156
(*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "out of space");
157
return -1;
158
}
159
state->vm = vm;
160
state->keydisc.version = RSKEY_VERSION;
161
state->keydisc.errorf = disc->errorf;
162
if (!(state->sortkey = rskeyopen(&state->keydisc, &state->sortdisc)))
163
goto bad;
164
if (!(state->sort = rsnew(state->sortkey->disc)))
165
goto bad;
166
if (!(state->sortstack = stkopen(0)))
167
goto bad;
168
if (!(state->tmp = sfstropen()))
169
goto bad;
170
state->sortbase = stkptr(state->sortstack, 0);
171
if (!(state->file = dssfopen(DSS(cx), "-", state->sortstack, DSS_FILE_WRITE, 0)))
172
goto bad;
173
sfprintf(cx->buf, "%s%s", strchr(dss_lib_sort.description, '['), sort_usage);
174
s = sfstruse(cx->buf);
175
uniq = 0;
176
for (;;)
177
{
178
switch (optget(argv, s))
179
{
180
case 0:
181
break;
182
case 'c':
183
state->count = (size_t)opt_info.number;
184
continue;
185
case 'u':
186
uniq = 1;
187
continue;
188
case '?':
189
if (disc->errorf)
190
{
191
(*disc->errorf)(cx, disc, ERROR_USAGE|4, "%s", opt_info.arg);
192
}
193
else
194
goto bad;
195
continue;
196
case ':':
197
if (disc->errorf)
198
(*disc->errorf)(cx, disc, 2, "%s", opt_info.arg);
199
else
200
goto bad;
201
continue;
202
default:
203
opt[0] = opt_info.option[1];
204
opt[1] = 0;
205
if (rskeyopt(state->sortkey, opt, 1))
206
goto bad;
207
continue;
208
}
209
break;
210
}
211
if (error_info.errors > errors)
212
goto bad;
213
argv += opt_info.index;
214
n = 0;
215
num = state->sortkey->head && state->sortkey->head->rflag ? "nr" : "n";
216
while (s = *argv++)
217
{
218
if (t = strchr(s, '-'))
219
{
220
sfwrite(cx->buf, s, t - s);
221
s = sfstruse(cx->buf);
222
t++;
223
}
224
else
225
t = 0;
226
if (!(variable = cxvariable(cx, s, NiL, disc)))
227
goto bad;
228
if (rskey(state->sortkey, t ? t : cxisnumber(variable->type) ? num : "", 0))
229
goto bad;
230
state->sortkey->tail->user = variable;
231
n = 1;
232
}
233
if (uniq)
234
{
235
state->sortkey->type &= ~RS_DATA;
236
state->sortkey->type |= RS_UNIQ;
237
}
238
if (state->count)
239
{
240
state->sortdisc.events |= RS_WRITE;
241
state->sortdisc.eventf = count;
242
if (!(state->uniqstack = stkopen(0)))
243
goto bad;
244
state->uniqbase = stkptr(state->uniqstack, 0);
245
if (!(state->uniqkey = rskeyopen(&state->keydisc, &state->uniqdisc)))
246
goto bad;
247
if (!(state->uniq = rsnew(state->uniqkey->disc)))
248
goto bad;
249
if (rskey(state->uniqkey, "1n", 0))
250
goto bad;
251
if (rskeyinit(state->uniqkey))
252
{
253
if (disc->errorf)
254
(*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "uniq key error");
255
goto bad;
256
}
257
if (rsinit(state->uniq, state->uniqkey->meth, state->uniqkey->procsize, state->uniqkey->type, state->uniqkey))
258
{
259
if (disc->errorf)
260
(*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "uniq initialization error");
261
goto bad;
262
}
263
}
264
if (rskeyinit(state->sortkey))
265
{
266
if (disc->errorf)
267
(*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "sort key error");
268
goto bad;
269
}
270
if (n)
271
{
272
state->sortdisc.defkeyf = key;
273
state->sortdisc.key = 1;
274
}
275
else if (state->sortkey->head->rflag)
276
{
277
state->sortdisc.defkeyf = rev;
278
state->sortdisc.key = 1;
279
}
280
if (rsinit(state->sort, state->sortkey->meth, state->sortkey->procsize, state->sortkey->type, state->sortkey))
281
{
282
if (disc->errorf)
283
(*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "sort initialization error");
284
goto bad;
285
}
286
state->cx = cx;
287
state->op = expr->op;
288
expr->data = state;
289
return 0;
290
bad:
291
if (state->sort)
292
rsclose(state->sort);
293
if (state->uniq)
294
rsclose(state->uniq);
295
if (state->sortkey)
296
rskeyclose(state->sortkey);
297
if (state->uniqkey)
298
rskeyclose(state->uniqkey);
299
if (state->file)
300
dssfclose(state->file);
301
else if (state->sortstack)
302
stkclose(state->sortstack);
303
if (state->uniqstack)
304
stkclose(state->uniqstack);
305
if (state->tmp)
306
sfstrclose(state->tmp);
307
vmclose(state->vm);
308
return -1;
309
}
310
311
static int
312
sort_act(Cx_t* cx, Cxexpr_t* expr, void* data, Cxdisc_t* disc)
313
{
314
State_t* state = (State_t*)expr->data;
315
Dssrecord_t* record = (Dssrecord_t*)data;
316
char* s;
317
ssize_t n;
318
319
if (dssfwrite(state->file, record))
320
return -1;
321
n = stktell(state->file->io);
322
s = stkfreeze(state->file->io, 0);
323
state->data = data;
324
if (rsprocess(state->sort, s, -n) <= 0)
325
{
326
if (disc->errorf)
327
(*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "sort record process error");
328
return -1;
329
}
330
return 0;
331
}
332
333
static int
334
sort_end(Cx_t* cx, Cxexpr_t* expr, void* data, Cxdisc_t* disc)
335
{
336
State_t* state = (State_t*)expr->data;
337
int r;
338
339
r = 0;
340
if (rswrite(state->sort, expr->op, RS_OTEXT))
341
{
342
if (disc->errorf)
343
(*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "sort output error");
344
r = -1;
345
}
346
while (rsdisc(state->sort, NiL, RS_POP));
347
if (rsclose(state->sort))
348
{
349
if (disc->errorf)
350
(*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "sort close error");
351
r = -1;
352
}
353
if (rskeyclose(state->sortkey))
354
{
355
if (disc->errorf)
356
(*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "sort key error");
357
r = -1;
358
}
359
if (state->uniq)
360
{
361
if (rswrite(state->uniq, expr->op, RS_OTEXT))
362
{
363
if (disc->errorf)
364
(*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "uniq output error");
365
r = -1;
366
}
367
while (rsdisc(state->uniq, NiL, RS_POP));
368
if (rsclose(state->uniq))
369
{
370
if (disc->errorf)
371
(*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "uniq close error");
372
r = -1;
373
}
374
if (rskeyclose(state->uniqkey))
375
{
376
if (disc->errorf)
377
(*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "uniq key error");
378
r = -1;
379
}
380
stkclose(state->uniqstack);
381
}
382
dssfclose(state->file);
383
sfstrclose(state->tmp);
384
vmclose(state->vm);
385
return r;
386
}
387
388
static Cxquery_t queries[] =
389
{
390
{
391
"sort",
392
"sort records to the standard output",
393
CXH,
394
sort_beg,
395
0,
396
sort_act,
397
sort_end
398
},
399
{0}
400
};
401
402
Dsslib_t dss_lib_sort =
403
{
404
"sort",
405
"sort query"
406
"[-1lms5P?\n@(#)$Id: dss sort query (AT&T Research) 2011-10-18 $\n]"
407
USAGE_LICENSE,
408
CXH,
409
0,
410
0,
411
0,
412
0,
413
0,
414
0,
415
&queries[0]
416
};
417
418