Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
att
GitHub Repository: att/ast
Path: blob/master/src/cmd/sortlib/sync/sync.c
1810 views
1
/***********************************************************************
2
* *
3
* This software is part of the ast package *
4
* Copyright (c) 2003-2011 AT&T Intellectual Property *
5
* and is licensed under the *
6
* Eclipse Public License, Version 1.0 *
7
* by AT&T Intellectual Property *
8
* *
9
* A copy of the License is available at *
10
* http://www.eclipse.org/org/documents/epl-v10.html *
11
* (with md5 checksum b35adb5213ca9657e911e9befb180842) *
12
* *
13
* Information and Software Systems Research *
14
* AT&T Research *
15
* Florham Park NJ *
16
* *
17
* Glenn Fowler <[email protected]> *
18
* *
19
***********************************************************************/
20
#pragma prototyped
21
22
/*
23
 * IBM DFSORT discipline for sort(1): applies a DFSORT control file to the input data
24
*/
25
26
static const char usage[] =
27
"[-1lp0s5P?\n@(#)$Id: dfsort (AT&T Research) 2007-01-25 $\n]"
28
USAGE_LICENSE
29
"[+PLUGIN?sync - IBM dfsort discipline]"
30
"[+DESCRIPTION?The \bsync\b \bsort\b(1) discipline applies an IBM \bDFSORT\b"
31
" control file to the input data. Command line keys are overidden"
32
" by the control file. Auxilliary output files must be named by"
33
" \bout\b\aid\a=\apath\a options.]"
34
"[+?User callout functions (\aexits\a in IBM parlance) must be linked in"
35
" DLLs or shared libraries. If the control file library is not found"
36
" then the name is treated as an environment variable and searched for"
37
" again. If the function \brs_intercept\b exists then it is used as a"
38
" wrapper for the callouts:"
39
" rs_intercept(\acallout\a,Rsobj_t*rec,Rsobj_t*dup,void**state),"
40
" otherwise the callout is called directly:"
41
" \acallout\a(Rsobj_t*rec,Rsobj_t*dup,void**state)."
42
" \bRsobj_t\b and callout return values are defined in \b<recsort.h>\b"
43
" and described in \brecsort\b(3). The callout arguments are:]{"
44
" [+Rsobj_t* rec?The current record.]"
45
" [+Rsobj_t* dup?The record comparing equal to \arec\a just before"
46
" it is discarded.]"
47
" [+void** state?User defined state, initialized to 0 before the first"
48
" callout. The same \astate\a is passed to all callouts.]"
49
"}"
50
"[+?The callout return values are:]{"
51
" [+RS_TERMINATE?Terminate the sort and exit with non-zero exit status.]"
52
" [+RS_DELETE?Delete \arec\a.]"
53
" [+RS_ACCEPT?Accept the possibly modified \arec\a.]"
54
" [+RS_INSERT?Insert a new record pointed to by \arec\a.]"
55
"}"
56
"[C:codeset?The data codeset is \acodeset\a. The codesets"
57
" are:]:[codeset]{\fcodesets\f}"
58
"[c:control?Specifies the control file path name. Control file details may be"
59
" found in the IBM \bDFSORT\b documentation. The control file is read"
60
" as an 80 column punched deck. If no control file is specified then"
61
" the standard input is read.]:[path]"
62
"[d:duplicates?Print a message to the standard error containing the number"
63
" of records with duplicate keys.]"
64
"[j:junk?Print to \afile\a the number of non-SUM field byte differences"
65
" between retained and discarded duplicate records. Each line in the"
66
" report is a field byte offset followed by the number of differences"
67
" for that offset.]:[file]"
68
"[l:list?List control file information on the standard output and exit.]"
69
"[o:out*?\bout\b\aid\a=\apath\a assigns \apath\a to the auxiliary output"
70
" file \aid\a. A leading \b-\b or \b_\b in \aid\a is ignored. File"
71
" paths may also be assigned by exporting \bSORTOF\b\aid\a=\apath\a;"
72
" \b--out\b takes precedence. Unassigned auxiliary output files are"
73
" silently ignored.]:[path]"
74
"[R:reclen|lrecl?Sets the fixed record length to \areclen\a.]#[reclen]"
75
"[+EXAMPLES]{"
76
" [+sort -lsync,control=xyz.ss,out02=out.2?Sorts using the"
77
" control file \bxyz.ss\b and places auxiliary file"
78
" \b02\b in \bout.2\b.]"
79
"}"
80
"[+SEE ALSO?\bsort\b(1), \bDFSORT\b(IBM), \brecsort\b(3)]"
81
"\n\n--library=sync[,option[=value]...]\n\n"
82
;
83
84
#include <ast.h>
85
#include <ctype.h>
86
#include <ccode.h>
87
#include <dirent.h>
88
#include <error.h>
89
#include <recsort.h>
90
#include <ss.h>
91
92
/*
 * call user exit f on record r and duplicate d, passing the address of
 * s->exitstate as the shared state argument
 * if s->intercept is set the call goes through it, with f as its first argument
 * s is evaluated more than once; every argument is parenthesized so that
 * non-trivial expressions (e.g. a conditional selecting the exit) expand correctly
 */
#define CALLOUT(s,f,r,d)	((s)->intercept ? (*(s)->intercept)((f), (r), (d), &(s)->exitstate) : (*(f))((r), (d), &(s)->exitstate))
93
94
typedef struct State_s
95
{
96
Rsdisc_t disc;
97
Ss_t* ss;
98
Sfio_t* junk;
99
Sfulong_t* junkcount;
100
size_t junksize;
101
Ssfile_t in;
102
Sfulong_t dupcount;
103
int dups;
104
char tmp[1];
105
} State_t;
106
107
/*
108
* record/report junk dup bytes
109
*/
110
111
static void
112
junk(State_t* state, Rsobj_t* r)
113
{
114
register size_t i;
115
register size_t k;
116
register size_t n;
117
register unsigned char* b;
118
register unsigned char* s;
119
register unsigned char* t;
120
register Ssfield_t* f;
121
register Sfulong_t* z;
122
123
n = state->junksize;
124
z = state->junkcount;
125
if (r)
126
{
127
b = r->data;
128
if (n > r->datalen)
129
n = r->datalen;
130
for (r = r->equal; r; r = r->right)
131
{
132
s = b;
133
t = r->data;
134
for (i = 0; i < n; i++)
135
if (s[i] != t[i])
136
z[i]++;
137
}
138
}
139
else
140
{
141
for (f = state->ss->sum; f; f = f->next)
142
for (i = f->offset - 1, k = f->offset + f->size; i < k; i++)
143
z[i] = 0;
144
for (i = 0; i < n; i++)
145
if (z[i])
146
sfprintf(state->junk, "%4u %8I*u\n", i + 1, sizeof(z[i]), z[i]);
147
sfclose(state->junk);
148
}
149
}
150
151
static int
152
dfsort(Rs_t* rs, int op, Void_t* data, Void_t* arg, Rsdisc_t* disc)
153
{
154
State_t* state = (State_t*)disc;
155
Ss_t* ss = state->ss;
156
Ssfile_t* fp;
157
Ssfile_t* save;
158
Rsobj_t* rp;
159
Rsobj_t* ep;
160
ssize_t size;
161
int hit;
162
int c;
163
164
switch (op)
165
{
166
case RS_OPEN:
167
if ((rs->type & RS_IGNORE) && (disc->events & (RS_SUMMARY|RS_WRITE)))
168
rs->type &= ~RS_IGNORE;
169
if (ssannounce(ss, rs))
170
return -1;
171
return ss->initexit ? CALLOUT(ss, ss->initexit, NiL, NiL) : 0;
172
case RS_POP:
173
if (state->junk)
174
junk(state, NiL);
175
if (state->dups && state->dupcount && ss->disc->errorf)
176
(*ss->disc->errorf)(NiL, ss->disc, 0, "%I*u duplicate key%s", sizeof(state->dupcount), state->dupcount, state->dupcount == 1 ? "" : "s");
177
if (!ss->doneexit)
178
c = 0;
179
else
180
c = CALLOUT(ss, ss->doneexit, NiL, NiL);
181
return ssclose(ss) ? -1 : c;
182
case RS_READ:
183
if (ss->skip)
184
{
185
ss->skip--;
186
return RS_DELETE;
187
}
188
if (ss->stop == 1)
189
return RS_DELETE;
190
rp = (Rsobj_t*)data;
191
if (ss->expr && (sseval(ss, ss->expr, (char*)rp->data, rp->datalen) > 0) == ss->omit)
192
return RS_DELETE;
193
fp = ss->file;
194
if (!ss->readexit)
195
c = (ss->copy || !fp->next && (rs->type & RS_IGNORE)) ? RS_DELETE : RS_ACCEPT;
196
else if ((c = CALLOUT(ss, ss->readexit, rp, NiL)) < 0 || c == RS_DELETE)
197
return c;
198
if (ss->stop)
199
ss->stop--;
200
if (ss->in)
201
{
202
if ((size = sscopy(ss, &state->in, (char*)rp->data, rp->datalen, state->tmp, ss->insize)) < 0)
203
return -1;
204
rp->data = (unsigned char*)state->tmp;
205
rp->datalen = size;
206
}
207
if (ss->copy)
208
{
209
if (fp->group && fp->group->io && sswrite(ss, fp, (char*)rp->data, rp->datalen) < 0)
210
return -1;
211
break;
212
}
213
return c;
214
case RS_SUMMARY:
215
rp = (Rsobj_t*)data;
216
state->dupcount++;
217
if (!ss->summaryexit)
218
c = RS_ACCEPT;
219
else if ((c = CALLOUT(ss, ss->summaryexit, rp, rp->equal)) < 0 || c == RS_DELETE)
220
return c;
221
if (ss->sum)
222
for (ep = rp->equal; ep; ep = ep->right)
223
if (sssum(ss, ss->sum, (char*)ep->data, rp->datalen, (char*)rp->data))
224
return -1;
225
if (state->junk)
226
junk(state, rp);
227
return c;
228
case RS_WRITE:
229
ep = (Rsobj_t*)data;
230
if (!ss->writeexit)
231
{
232
if (state->dups && (rp = ep->equal))
233
do state->dupcount++; while (rp = rp->right);
234
c = RS_ACCEPT;
235
}
236
else if ((c = CALLOUT(ss, ss->writeexit, ep, NiL)) < 0 || c == RS_DELETE)
237
return c;
238
rp = (Rsobj_t*)arg;
239
fp = ss->file;
240
if (!fp->group->io)
241
size = 0;
242
else if ((size = sscopy(ss, fp, (char*)ep->data, ep->datalen, (char*)rp->data, rp->datalen)) < 0)
243
return -1;
244
if (size > rp->datalen || ss->copy || !fp->next)
245
{
246
rp->datalen = size;
247
return c;
248
}
249
rp->datalen = size;
250
break;
251
default:
252
return -1;
253
}
254
hit = 0;
255
save = 0;
256
while (fp = fp->next)
257
if (fp->group)
258
{
259
if (!fp->expr || (sseval(ss, fp->expr, (char*)rp->data, rp->datalen) > 0) != fp->omit)
260
{
261
hit = 1;
262
if (sswrite(ss, fp, (char*)rp->data, rp->datalen) < 0)
263
return -1;
264
}
265
else if (fp->save)
266
save = fp;
267
}
268
if (save && !hit && sswrite(ss, save, (char*)rp->data, rp->datalen) < 0)
269
return -1;
270
return c;
271
}
272
273
/*
 * per input path scratch entry used by checkmark()
 */
struct Suf_s
{
	char*		base;	/* start of the file name part of the path, 0 when the entry is done or skipped */
	char*		suff;	/* last '.' in base, 0 if base has no suffix */
};

typedef struct Suf_s Suf_t;
278
279
static int
280
checkmark(Ss_t* ss, char** v, Ssdisc_t* ssdisc)
281
{
282
char** b;
283
char* s;
284
char* t;
285
char* z;
286
size_t i;
287
size_t j;
288
size_t k;
289
size_t m;
290
size_t n;
291
DIR* dp;
292
struct dirent* ep;
293
Suf_t* sp;
294
295
for (b = v; *b; b++);
296
if (n = b - v)
297
{
298
if (!(sp = newof(0, Suf_t, n, 0)))
299
goto bad;
300
for (i = 0; i < n; i++)
301
{
302
if (s = strrchr(v[i], '/'))
303
s++;
304
else
305
s = v[i];
306
if (!strchr(s, '%'))
307
{
308
sp[i].base = s;
309
sp[i].suff = strrchr(s, '.');
310
}
311
}
312
i = 0;
313
for (;;)
314
{
315
while (i < n && !sp[i].base)
316
i++;
317
if (i >= n)
318
break;
319
if (sp[i].base == v[i])
320
dp = opendir(".");
321
else if (sp[i].base == v[i] + 1)
322
dp = opendir("/");
323
else
324
{
325
*(sp[i].base - 1) = 0;
326
dp = opendir(v[i]);
327
*(sp[i].base - 1) = '/';
328
}
329
k = sp[i].base - v[i];
330
if (dp)
331
while (ep = readdir(dp))
332
if (s = strchr(ep->d_name, '%'))
333
{
334
m = s - ep->d_name;
335
z = strrchr(s, '.');
336
for (j = i; j < n; j++)
337
{
338
if (sp[j].base && (sp[j].base - v[j]) == k && (!k || !memcmp(v[i], v[j], k)) && !memcmp(ep->d_name, sp[j].base, m) && (!sp[j].suff || (sp[j].suff - sp[j].base) < m || z && !strcmp(z, sp[j].suff)))
339
{
340
if (v[j] == sp[j].base)
341
t = ep->d_name;
342
else
343
t = sfprints("%-.*s%s", sp[j].base - v[j], v[j], ep->d_name);
344
if (!(t = strdup(t)))
345
{
346
closedir(dp);
347
goto bad;
348
}
349
v[j] = t;
350
sp[j].base = 0;
351
}
352
}
353
}
354
for (j = i; j < n; j++)
355
if (sp[j].base && (sp[j].base - v[j]) == k && (!k || !memcmp(v[i], v[j], k)))
356
sp[j].base = 0;
357
if (dp)
358
closedir(dp);
359
}
360
free(sp);
361
}
362
return 0;
363
bad:
364
if (sp)
365
free(sp);
366
if (ssdisc->errorf)
367
(*ssdisc->errorf)(NiL, ssdisc, 2, "out of space");
368
return -1;
369
}
370
371
Rsdisc_t*
372
rs_disc(Rskey_t* key, const char* options)
373
{
374
State_t* state;
375
Ss_t* ss;
376
Ssfield_t* dp;
377
char* s;
378
char* t;
379
char* u;
380
char* p;
381
char* junk;
382
char** v;
383
int n;
384
int m;
385
int list;
386
int dups;
387
Recfmt_t f;
388
unsigned long events;
389
Ssdisc_t* ssdisc;
390
391
if (!(ssdisc = newof(0, Ssdisc_t, 1, 0)))
392
{
393
if (key->keydisc->errorf)
394
(*key->keydisc->errorf)(NiL, key->keydisc, ERROR_SYSTEM|2, "out of space");
395
return 0;
396
}
397
ssinit(ssdisc, key->keydisc->errorf);
398
ssdisc->code = key->code;
399
events = RS_OPEN|RS_POP;
400
dups = 0;
401
junk = 0;
402
list = 0;
403
ss = 0;
404
if (options)
405
{
406
for (;;)
407
{
408
switch (optstr(options, usage))
409
{
410
case 0:
411
break;
412
case 'C':
413
if ((ssdisc->code = ccmapid(opt_info.arg)) < 0)
414
{
415
if (ssdisc->errorf)
416
(*ssdisc->errorf)(NiL, ssdisc, 2, "%s: invalid codeset name", opt_info.arg);
417
goto drop;
418
}
419
continue;
420
case 'c':
421
if (!(ss = ssopen(opt_info.arg, ssdisc)))
422
goto drop;
423
continue;
424
case 'd':
425
dups = 1;
426
continue;
427
case 'j':
428
junk = opt_info.arg;
429
continue;
430
case 'l':
431
list = 1;
432
continue;
433
case 'o':
434
if (ssdd(opt_info.name, opt_info.arg, ssdisc))
435
goto drop;
436
continue;
437
case 'R':
438
if (opt_info.num != key->fixed && key->fixed)
439
{
440
if (ssdisc->errorf)
441
(*ssdisc->errorf)(NiL, ssdisc, 2, "%d: fixed record length mismatch -- %d expected", (int)opt_info.num, key->fixed);
442
goto drop;
443
}
444
key->fixed = opt_info.num;
445
continue;
446
case '?':
447
error(ERROR_USAGE|4, "%s", opt_info.arg);
448
goto drop;
449
case ':':
450
error(2, "%s", opt_info.arg);
451
goto drop;
452
}
453
break;
454
}
455
}
456
if (!ss && !(ss = ssopen(NiL, ssdisc)))
457
goto drop;
458
if (ss->merge)
459
{
460
key->merge = 1;
461
if (!key->input[0] && (u = getenv("DDIN")))
462
{
463
s = u;
464
n = 1;
465
m = 0;
466
for (;;)
467
{
468
while (*s == ' ')
469
s++;
470
if (!(t = strchr(s, ' ')))
471
t = s + strlen(s);
472
if (strneq(s, SS_DD_IN, sizeof(SS_DD_IN) - 1) && (p = getenv(sfprints("%-.*s", t - s, s))))
473
{
474
n++;
475
m += strlen(p) + 1;
476
}
477
if (!*t)
478
break;
479
s = t + 1;
480
}
481
if (!(v = vmnewof(ss->vm, 0, char*, n, m)))
482
{
483
if (ssdisc->errorf)
484
(*ssdisc->errorf)(NiL, ssdisc, ERROR_SYSTEM|2, "out of space");
485
goto drop;
486
}
487
s = u;
488
u = (char*)(v + n);
489
n = 0;
490
for (;;)
491
{
492
while (*s == ' ')
493
s++;
494
if (!(t = strchr(s, ' ')))
495
t = s + strlen(s);
496
if (strneq(s, SS_DD_IN, sizeof(SS_DD_IN) - 1) && (p = getenv(sfprints("%-.*s", t - s, s))))
497
{
498
v[n++] = u;
499
u = strcopy(u, p) + 1;
500
}
501
if (!*t)
502
break;
503
s = t + 1;
504
}
505
v[n] = 0;
506
key->input = v;
507
}
508
}
509
if (checkmark(ss, key->input, ssdisc))
510
goto drop;
511
if (key->input[0] && strmatch(key->input[0], SS_MARKED))
512
{
513
ss->mark = 1;
514
if ((s = strrchr(key->input[0], '%')) && (s = strchr(s, '.')))
515
ss->suffix = s;
516
}
517
if (ss->size)
518
ss->format = REC_F_TYPE(ss->size);
519
else if (key->fixed)
520
{
521
ss->size = key->fixed;
522
ss->format = REC_F_TYPE(ss->size);
523
}
524
else
525
{
526
p = 0;
527
ss->format = REC_N_TYPE();
528
for (v = key->input; s = *v; v++)
529
if ((t = strrchr(s, '%')) && !strchr(t, '/'))
530
{
531
ss->mark = 1;
532
f = recstr(t + 1, &u);
533
if (f != ss->format && p && (RECTYPE(f) != REC_variable || RECTYPE(ss->format) != REC_variable || REC_V_ATTRIBUTES(f) != REC_V_ATTRIBUTES(ss->format)))
534
{
535
if (ssdisc->errorf)
536
(*ssdisc->errorf)(NiL, ssdisc, 2, "%s: format %s incompatible with %s format %s", s, fmtrec(f, 0), p, fmtrec(ss->format, 0));
537
goto drop;
538
}
539
p = s;
540
if (RECTYPE(f) != REC_variable || RECTYPE(ss->format) != REC_variable || REC_V_SIZE(ss->format) < REC_V_SIZE(f))
541
ss->format = f;
542
if (!ss->suffix && *u == '.')
543
ss->suffix = u;
544
}
545
}
546
if (!ss->suffix)
547
ss->suffix = "";
548
if ((n = ssio(ss, list)) < 0)
549
goto drop;
550
if (ss->mark)
551
{
552
if (ss->file->format == REC_N_TYPE())
553
ss->file->format = ss->format;
554
if (s = key->output)
555
{
556
if ((t = strrchr(s, '%')) && !strchr(t, '/'))
557
{
558
f = recstr(t + 1, NiL);
559
if (ss->file->format != REC_N_TYPE() && f != ss->file->format && !ss->in && !ss->file->out && (RECTYPE(f) != REC_variable || RECTYPE(ss->file->format) != REC_variable || REC_V_ATTRIBUTES(f) != REC_V_ATTRIBUTES(ss->file->format)))
560
{
561
if (ssdisc->errorf)
562
(*ssdisc->errorf)(NiL, ssdisc, 2, "%s: format %s incompatible with %s format %s", s, fmtrec(f, 0), p, fmtrec(ss->file->format, 0));
563
goto drop;
564
}
565
}
566
else if (ss->file->format != REC_N_TYPE() && !strmatch(s, "/dev/*"))
567
{
568
if ((t = strrchr(s, '.')) && strmatch(t, SS_SUFFIX))
569
s = sfprints("%-.*s%%%s%s", t - s, s, fmtrec(ss->format, 1), t);
570
else
571
s = sfprints("%s%%%s%s", s, fmtrec(ss->format, 1), ss->suffix);
572
if (!(key->output = vmstrdup(ss->vm, s)))
573
{
574
if (ss->disc->errorf)
575
(*ss->disc->errorf)(NiL, ss->disc, ERROR_SYSTEM|2, "out of space");
576
goto drop;
577
}
578
}
579
}
580
}
581
if (ss->copy && !ss->expr && !ss->file->out && !ss->file->next)
582
{
583
key->type |= RS_CAT;
584
key->merge = 0;
585
key->meth = Rscopy;
586
}
587
if (ss->initexit)
588
events |= RS_OPEN;
589
if (ss->expr || ss->copy || ss->readexit || ss->in || ss->skip || ss->stop)
590
events |= RS_READ;
591
if (ss->file->out || n && !ss->copy || ss->writeexit || ss->file->format != ss->format)
592
events |= RS_WRITE;
593
if (list)
594
{
595
sslist(ss, sfstdout);
596
exit(0);
597
}
598
if (!(state = vmnewof(ss->vm, 0, State_t, 1, ss->insize)))
599
{
600
if (ssdisc->errorf)
601
(*ssdisc->errorf)(NiL, ssdisc, ERROR_SYSTEM|2, "out of space");
602
goto drop;
603
}
604
if (ss->in)
605
{
606
state->in.out = ss->in;
607
state->in.size = ss->insize;
608
}
609
if (ss->sum || ss->uniq)
610
{
611
key->type |= RS_UNIQ;
612
if (ss->sum)
613
events |= RS_SUMMARY;
614
}
615
else if (state->dups = dups)
616
events |= RS_WRITE;
617
switch (ss->stable)
618
{
619
case 'N':
620
key->type |= RS_DATA;
621
break;
622
default:
623
key->type &= ~RS_DATA;
624
break;
625
}
626
if (!ss->type)
627
ss->type = 'F';
628
switch (ss->type)
629
{
630
case 'D':
631
break;
632
case 'F':
633
if ((s = sskey(ss, NiL)) && rskey(key, s, 0))
634
goto drop;
635
break;
636
case 'V':
637
case 'B':
638
key->disc->data = recstr("v", NiL);
639
break;
640
}
641
for (dp = ss->sort; dp; dp = dp->next)
642
if ((s = sskey(ss, dp)) && rskey(key, s, 0))
643
goto drop;
644
if (junk)
645
{
646
if (!(state->junksize = ss->size))
647
state->junksize = 64;
648
if (!(state->junkcount = vmnewof(ss->vm, 0, Sfulong_t, state->junksize, 0)))
649
{
650
if (ssdisc->errorf)
651
(*ssdisc->errorf)(NiL, ssdisc, ERROR_SYSTEM|2, "out of space");
652
goto drop;
653
}
654
if (!(state->junk = sfopen(NiL, junk, "w")))
655
{
656
if (ssdisc->errorf)
657
(*ssdisc->errorf)(NiL, ssdisc, ERROR_SYSTEM|2, "%s: cannot write", junk);
658
goto drop;
659
}
660
}
661
state->ss = ss;
662
state->disc.eventf = dfsort;
663
state->disc.events = events;
664
return &state->disc;
665
drop:
666
if (ss)
667
ssclose(ss);
668
if (ssdisc)
669
free(ssdisc);
670
return 0;
671
}
672
673
/* NOTE(review): SORTLIB is not defined in this file; presumably it emits the plugin identification that lets sort(1) load this library by the name "sync" -- confirm against its definition in the included headers */
SORTLIB(sync)
674
675