Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
att
GitHub Repository: att/ast
Path: blob/master/src/cmd/sort/main.c
1808 views
1
/***********************************************************************
2
* *
3
* This software is part of the ast package *
4
* Copyright (c) 1996-2011 AT&T Intellectual Property *
5
* and is licensed under the *
6
* Eclipse Public License, Version 1.0 *
7
* by AT&T Intellectual Property *
8
* *
9
* A copy of the License is available at *
10
* http://www.eclipse.org/org/documents/epl-v10.html *
11
* (with md5 checksum b35adb5213ca9657e911e9befb180842) *
12
* *
13
* Information and Software Systems Research *
14
* AT&T Research *
15
* Florham Park NJ *
16
* *
17
* Glenn Fowler <[email protected]> *
18
* Phong Vo <[email protected]> *
19
* Doug McIlroy <[email protected]> *
20
* *
21
***********************************************************************/
22
#pragma prototyped
23
24
/*
25
* sort main
26
*
27
* algorithm and interface
28
*
29
* Glenn Fowler
30
* Phong Vo
31
* AT&T Research
32
*
33
* key coders
34
*
35
* Doug McIlroy
36
* Bell Laboratories
37
*/
38
39
static const char usage[] =
40
"[-n?\n@(#)$Id: sort (AT&T Research) 2010-08-11 $\n]"
41
USAGE_LICENSE
42
"[+NAME?sort - sort and/or merge files]"
43
"[+DESCRIPTION?\bsort\b sorts lines of all the \afiles\a together and "
44
"writes the result on the standard output. The file name \b-\b means the "
45
"standard input. If no files are named, the standard input is sorted.]"
46
"[+?The default sort key is an entire line. Default ordering is "
47
"lexicographic by bytes in machine collating sequence. The ordering is "
48
"affected globally by the locale and/or the following options, one or "
49
"more of which may appear. See \brecsort\b(3) for details.]"
50
"[+?For backwards compatibility the \b-o\b option is allowed in any file "
51
"operand position when neither the \b-c\b nor the \b--\b options are "
52
"specified.]"
53
"[k:key?Restrict the sort key to a string beginning at \apos1\a and "
54
"ending at \apos2\a. \apos1\a and \apos2\a each have the form \am.n\a, "
55
"counting from 1, optionally followed by one or more of the flags "
56
"\bCMbdfginprZ\b; \bm\b counts fields from the beginning of the line and "
57
"\bn\b counts characters from the beginning of the field. If any flags "
58
"are present they override all the global ordering options for this key. "
59
"If \a.n\a is missing from \apos1\a, it is taken to be 1; if missing "
60
"from \apos2\a, it is taken to be the end of the field. If \apos2\a is "
61
"missing, it is taken to be end of line. The second form specifies a "
62
"fixed record length \areclen\a, and the last form specifies a fixed "
63
"field at byte position \aposition\a (counting from 1) of \alength\a "
64
"bytes. The obsolescent \breclen:fieldlen:offset\b (byte offset from 0) "
65
"is also accepted.]:[pos1[,pos2]]|.reclen|.position.length]]]]]"
66
"[K:oldkey?Specified in pairs: \b-K\b \apos1\a \b-K\b \apos2\a, where "
67
"positions count from 0.]# [pos]"
68
"[R:record|recfmt?Sets the record format to \aformat\a; newlines will be "
69
"treated as normal characters. The formats are:]:[format]"
70
"{"
71
"[+d[\aterminator\a]]?Variable length with record \aterminator\a "
72
"character, \b\\n\b by default.]"
73
"[+[f]]\areclen\a?Fixed record length \areclen\a.]"
74
"[+v[op...]]?Variable length. \bh4o0z2bi\b (4 byte IBM V format "
75
"descriptor) if \aop\a are omitted. \aop\a may be a combination "
76
"of:]"
77
"{"
78
"[+h\an\a?Header size is \an\a bytes (default 4).]"
79
"[+o\an\a?Size offset in header is \an\a bytes (default "
80
"0).]"
81
"[+z\an\a?Size length is \an\a bytes (default "
82
"min(\bh\b-\bo\b,2)).]"
83
"[+b?Size is big-endian (default).]"
84
"[+l?Size is little-endian (default \bb\b).]"
85
"[+i?Record length includes header (default).]"
86
"[+n?Record length does not include header (default "
87
"\bi\b).]"
88
"}"
89
"[+%?If the record format is not otherwise specified, and the "
90
"any input file name, from left to right, ends with "
91
"\b%\b\aformat\a or \b%\b\aformat\a\b.\b* then the record format "
92
"is set to \aformat\a. In addition, the \b-o\b path, if "
93
"specified and if it does not contain \b%\b and if it names a "
94
"regular file, is renamed to contain the input \b%\b\aformat\a.]"
95
"[+-?The first block of the first input file is sampled to check "
96
"for \bv\b variable length and \bf\b fixed length format "
97
"records. Not all formats are detected. \bsort\b exits with an "
98
"error diagnostic if the record format cannot be determined from "
99
"the sample.]"
100
"}"
101
"[b:ignorespace|ignore-leading-blanks?Ignore leading white space (spaces "
102
"and tabs) in field comparisons.]"
103
"[d:dictionary?`Phone directory' order: only letters, digits and white "
104
"space are significant in string comparisons.]"
105
"[E:codeset|convert?The field data codeset is \acodeset\a or the field "
106
"data must be converted from the \afrom\a codeset to the \ato\a codeset. "
107
"The codesets are:]:[codeset|from::to]"
108
"{\fcodesets\f}"
109
"[f:fold|ignorecase?Fold lower case letters onto upper case.]"
110
"[h:scaled|human-readable?Compare numbers scaled with IEEE 1541-2002 "
111
"suffixes.]"
112
"[i:ignorecontrol?Ignore characters outside the ASCII range 040-0176 in "
113
"string comparisons.]"
114
"[J:shuffle|jumble?Do a random shuffle of the sort keys. \aseed\a "
115
"specifies a pseudo random number generator seed. A \aseed\a of 0 "
116
"generates a seed based on time and pid.]#[seed]"
117
"[n:numeric?An initial numeric string, consisting of optional white "
118
"space, optional sign, and a nonempty string of digits with optional "
119
"decimal point, is sorted by value.]"
120
"[g:floating?Numeric, like \b-n\b, with \be\b-style exponents allowed.]"
121
"[p:bcd|packed-decimal?Compare packed decimal (bcd) numbers with "
122
"trailing sign.]"
123
"[M:months?Compare as month names. The first three characters after "
124
"optional white space are folded to lower case and compared. Invalid "
125
"fields compare low to \bjan\b.]"
126
"[r:reverse|invert?Reverse the sense of comparisons.]"
127
"[t:tabs?`Tab character' separating fields is \achar\a.]:[tab-char]"
128
"[c:check?Check that the single input file is sorted according to the "
129
"ordering rules; give no output on the standard output. If the input "
130
"is out of sort then write one diagnostic line on the standard error "
131
"and exit with code \b1\b.]"
132
"[C:silent-check?Like \b--check\b except no diagnostic is written.]"
133
"[j:processes|nproc|jobs?Use up to \ajobs\a separate processes to sort "
134
"the input. The current implementation still uses one process for the "
135
"final merge phase; improvements are planned.]#[processes]"
136
"[m:merge?Merge; the input files are already sorted.]"
137
"[u:unique?Unique. Keep only the first of multiple records that compare "
138
"equal on all keys. Implies \b-s\b.]"
139
"[s:stable?Stable sort. When all keys compare equal, preserve input "
140
"order. The default is \b--nostable\b (\aunstable\a sort): when all "
141
"keys compare equal, break the tie by using the entire record, ignoring "
142
"all but the \b-r\b option.]"
143
"[o:output?Place output in the designated \afile\a instead of on the "
144
"standard output. This file may be the same as one of the inputs. The "
145
"\afile\a \b-\b names the standard output. The option may appear among "
146
"the file arguments, except after \b--\b.]:[output]"
147
"[l:library?Load the external sort discipline \alibrary\a with optional "
148
"comma separated \aname=value\a arguments. Libraries are loaded, in left "
149
"to right order, after the sort method has been "
150
"initialized.]:[library[,name=value...]]]"
151
"[T:tempdir?Put temporary files in \atempdir\a.]:[tempdir:=/usr/tmp]"
152
"[L:list?List the available sort methods. See the \b-x\b option.]"
153
"[x:method?Specify the sort method to apply:]:[method:=rasp]"
154
"{\fmethods\f} [v:verbose?Trace the sort progress on the standard "
155
"error.]"
156
"[P:plugins?List plugin information for each \afile\a operand in "
157
"--\astyle\a on the standard error. If no \afile\a operands are "
158
"specified then the first instance of each \bsort\b plugin installed on "
159
"\b$PATH\b or a sibling dir on \b$PATH\b is listed. The special "
160
"\astyle\a \blist\b lists a line on the standard output for each plugin "
161
"with the name, a tab character, and plugin specific command line "
162
"options parameterized by \b${style}\b (suitable for \beval\b'ing in "
163
"\bsh\b(1).)]:[style:=list|man|html|nroff|usage]"
164
"[Z:zd|zoned-decimal?Compare zoned decimal (ZD) numbers with embedded "
165
"trailing sign.]"
166
"[z:size|zip?Suggest using the specified number of bytes of internal "
167
"store to tune performance. Power of 2 and power of 10 size suffixes are "
168
"accepted. Type is a single character and may be one of:]:[type[size]]]"
169
"{"
170
"[+a?Buffer alignment.]"
171
"[+b?Input reserve buffer size.]"
172
"[+c?Input chunk size; sort chunks of this size and disable "
173
"merge.]"
174
"[+i?Input buffer size.]"
175
"[+m?Maximum number of intermediate merge files.]"
176
"[+p?Input sort size; sort chunks of this size before merge.]"
177
"[+o?Output buffer size.]"
178
"[+r?Maximum record size.]"
179
"[+I?Decompress the input if it is compressed.]"
180
"[+O?\bgzip\b(1) compress the output.]"
181
"}"
182
"[X:test?Enables implementation defined test code. Some or all of these "
183
"may be disabled.]:[test]"
184
"{"
185
"[+dump?List detailed information on the option settings.]"
186
"[+io?List io file paths.]"
187
"[+keys?List the canonical key for each record.]"
188
"[+read?Force input file read by disabling memory mapping.]"
189
"[+show?Show setup information and exit before sorting.]"
190
"[+test?Immediatly exit with status 0; used to verify this "
191
"implementation]"
192
"}"
193
"[D:debug?Sets the debug trace level. Higher levels produce more "
194
"output.]# [level]"
195
"[S|y?Equivalent to \b-zp\b\asize\a; if \asize\a has no suffix then \bki\b "
196
"is assumed.]:[size]"
197
198
"\n"
199
"\n[ file ... ]\n"
200
"\n"
201
202
"[+?+\apos1\a -\apos2\a is the classical alternative to \b-k\b, with "
203
"counting from 0 instead of 1, and pos2 designating next-after-last "
204
"instead of last character of the key. A missing character count in "
205
"\apos2\a means 0, which in turn excludes any \b-t\b tab character from "
206
"the end of the key. Thus +1 -1.3 is the same as \b-k\b 2,2.3 and +1r -3 "
207
"is the same as \b-k\b 2r,3.]"
208
"[+?Under option \b-t\b\ax\a fields are strings separated by \ax\a; "
209
"otherwise fields are non-empty strings separated by white space. White "
210
"space before a field is part of the field, except under option \b-b\b. "
211
"A \bb\b flag may be attached independently to \apos1\a and \apos2\a.]"
212
"[+?When there are multiple sort keys, later keys are compared only "
213
"after all earlier keys compare equal. Except under option \b-s\b, lines "
214
"with all keys equal are ordered with all bytes significant. \b-S\b "
215
"turns off \b-s\b, the last occurrence, left-to-right, takes affect.]"
216
"[+?Sorting is done by a method determined by the \b-x\b option. \b-L\b "
217
"lists the available methods. rasp (radix+splay-tree) is the default and "
218
"current all-around best.]"
219
"[+?Single-letter options may be combined into a single string, such as "
220
"\b-cnrt:\b. The option combination \b-di\b and the combination of "
221
"\b-n\b with any of \b-diM\b are improper. Posix argument conventions "
222
"are supported.]"
223
"[+?Options \b-b\b, \b-c\b, \b-d\b, \b-f\b, \b-i\b, \b-k\b, \b-m\b, "
224
"\b-n\b, \b-o\b, \b-r\b, \b-t\b, and \b-u\b are in the Posix and/or "
225
"X/Open standards.]"
226
227
"[+DIAGNOSTICS?\asort\a comments and exits with non-zero status for "
228
"various trouble conditions and for disorder discovered under option "
229
"\b-c\b.]"
230
"[+SEE ALSO?\bcomm\b(1), \bjoin\b(1), \buniq\b(1), \brecsort\b(3)]"
231
"[+CAVEATS?The never-documented default \apos1\a=0 for cases such as "
232
"\bsort -1\b has been abolished. An input file overwritten by \b-o\b is "
233
"not replaced until the entire output file is generated in the same "
234
"directory as the input, at which point the input is renamed.]"
235
;
236
237
#include <sfio_t.h>
238
#include <ast.h>
239
#include <error.h>
240
#include <ctype.h>
241
#include <fs3d.h>
242
#include <ls.h>
243
#include <option.h>
244
#include <recsort.h>
245
#include <recfmt.h>
246
#include <sfdcgzip.h>
247
#include <vmalloc.h>
248
#include <wait.h>
249
#include <iconv.h>
250
#include <dlldefs.h>
251
252
/*
 * input buffer dimensions, in bytes
 */

#define INMIN		(1024)		/* smallest input buffer	*/
#define INBRK		(64*INMIN)	/* heap growth increment	*/
#define INMAX		(1024*INBRK)	/* largest input buffer		*/
#define INREC		(16*INMIN)	/* record begin scan chunk	*/

/*
 * -X test bits kept in Sort_t.test
 */

#define TEST_dump	0x80000000	/* dump the state before sort	*/
#define TEST_io		0x40000000	/* dump io files		*/
#define TEST_keys	0x20000000	/* dump keys			*/
#define TEST_read	0x10000000	/* force sfread()		*/
#define TEST_show	0x08000000	/* show but don't do		*/
#define TEST_reserve	0x04000000	/* force sfreserve()		*/

/*
 * true if path s (or a null s) names the standard input / standard output
 */

#define pathstdin(s)	(!(s)||streq(s,"-")||streq(s,"/dev/stdin")||streq(s,"/dev/fd/0"))
#define pathstdout(s)	(!(s)||streq(s,"-")||streq(s,"/dev/stdout")||streq(s,"/dev/fd/1"))
266
267
typedef struct Part_s
268
{
269
Sfdisc_t disc; /* sfio discipline */
270
off_t offset; /* file offset */
271
off_t size; /* total size at offset */
272
off_t remain; /* read size remaining */
273
} Part_t;
274
275
/*
 * one entry of the multi-process job table (Sort_t.jobs)
 */

typedef struct Job_s
{
	off_t		offset;		/* start of this job's file part	*/
	off_t		size;		/* byte size of this job's file part	*/
	size_t		chunk;		/* chunk size within the part		*/
	int		intermediates;	/* intermediate file count		*/
} Job_t;
282
283
typedef struct Sort_s
284
{
285
Rskeydisc_t disc; /* rskey discipline */
286
Rs_t* rec; /* rsopen() context */
287
Rskey_t* key; /* rskeyopen() context */
288
Rsdefkey_f defkeyf; /* real defkeyf if TEST_keys */
289
Sfio_t* tp; /* TEST_keys tmp stream */
290
Sfio_t* op; /* output stream */
291
Job_t* jobs; /* multi-proc job table */
292
char* overwrite; /* -o input overwrite tmp file */
293
char* buf; /* input buffer */
294
Sfio_t* opened; /* fileopen() peek stream */
295
size_t cur; /* input buffer index */
296
size_t hit; /* input buffer index overflow */
297
size_t end; /* max input buffer index */
298
size_t bufsize; /* input reserve buffer size */
299
off_t total; /* total size of single file */
300
unsigned long test; /* test bit mask */
301
int child; /* in child process */
302
int chunk; /* chunk the input (no merge) */
303
int hadstdin; /* already has - on input */
304
int map; /* sfreserve() input */
305
int mfiles; /* multi-stage files[] count */
306
int nfiles; /* files[] count */
307
int xfiles; /* max files[] count */
308
int preserve; /* rename() tmp output to input */
309
int single; /* one input file */
310
int verbose; /* trace main actions */
311
int zip; /* sfdcgzip SF_* flags */
312
Sfio_t* files[OPEN_MAX > 68 ? 64 : (OPEN_MAX-4)];
313
} Sort_t;
314
315
/*
316
* optget() info discipline function
317
*/
318
319
static int
320
optinfo(Opt_t* op, Sfio_t* sp, const char* s, Optdisc_t* dp)
321
{
322
register iconv_list_t* ic;
323
register int n;
324
325
if (streq(s, "codesets"))
326
{
327
n = 0;
328
for (ic = iconv_list(NiL); ic; ic = iconv_list(ic))
329
if (ic->ccode >= 0)
330
n += sfprintf(sp, "[%c:%s?%s]", ic->match[ic->match[0] == '('], ic->name, ic->desc);
331
return n;
332
}
333
else if (streq(s, "methods"))
334
return rskeylist(NiL, sp, 1);
335
return 0;
336
}
337
338
/*
339
* handle RS_VERIFY event
340
*/
341
342
static int
343
verify(Rs_t* rs, int op, Void_t* data, Void_t* arg, Rsdisc_t* disc)
344
{
345
if (op == RS_VERIFY)
346
error(3, "disorder at record %lld", (Sflong_t)((Rsobj_t*)data)->order);
347
return 0;
348
}
349
350
static int
351
verify_silent(Rs_t* rs, int op, Void_t* data, Void_t* arg, Rsdisc_t* disc)
352
{
353
if (op == RS_VERIFY)
354
exit(1);
355
return 0;
356
}
357
358
/*
359
* return read stream for path
360
*/
361
362
static Sfio_t*
363
fileopen(register Sort_t* sp, const char* path)
364
{
365
Sfio_t* fp;
366
367
if (fp = sp->opened)
368
sp->opened = 0;
369
else
370
{
371
if (pathstdin(path))
372
{
373
if (sp->hadstdin)
374
error(3, "%s: can only read once", path);
375
sp->hadstdin = 1;
376
fp = sfstdin;
377
}
378
else if (!(fp = sfopen(NiL, path, "r")))
379
error(ERROR_SYSTEM|3, "%s: cannot open", path);
380
if (rsfileread(sp->rec, fp, path))
381
{
382
if (fp != sfstdin)
383
sfclose(fp);
384
return 0;
385
}
386
sfset(fp, SF_SHARE, 0);
387
if (sp->zip & SF_READ)
388
sfdcgzip(fp, 0);
389
}
390
return fp;
391
}
392
393
/*
 * prevent ERROR_USAGE|4 messages from exiting
 *
 * showplugins() installs this as error_info.exit; it does nothing
 */

static void
noexit(int code)
{
	(void)code;
}
401
402
/*
403
* list info for one plugin on the standard error
404
*/
405
406
static void
407
showlib(Sort_t* sp, Rskey_t* kp, const char* name, const char* style)
408
{
409
char* args;
410
char buf[128];
411
412
if (style)
413
sfsprintf(args = buf, sizeof(buf), "%s,%s", name, style);
414
else
415
args = (char*)name;
416
if (!rslib(sp->rec, kp, args, RS_IGNORE) && !style)
417
sfprintf(sfstdout, "%s\t--library=%s,${style}\n", name, name);
418
}
419
420
/*
421
* list info for all [selected] plugins on the standard error
422
*/
423
424
static int
425
showplugins(Sort_t* sp, Rskey_t* kp, const char* style)
426
{
427
Dllscan_t* dls;
428
Dllent_t* dle;
429
char* name;
430
void (*oexit)(int);
431
432
if (streq(style, "list"))
433
style = 0;
434
else
435
{
436
oexit = error_info.exit;
437
error_info.exit = noexit;
438
}
439
if (*kp->input)
440
while (name = *kp->input++)
441
showlib(sp, kp, name, style);
442
else if (dls = dllsopen("sort", NiL, NiL))
443
{
444
while (dle = dllsread(dls))
445
showlib(sp, kp, dle->name, style);
446
dllsclose(dls);
447
}
448
if (style)
449
error_info.exit = oexit;
450
return 0;
451
}
452
453
/*
 * singly linked list node for one -l library argument; parse()
 * collects them in command line order
 */

typedef struct Lib_s Lib_t;

struct Lib_s
{
	Lib_t*		next;		/* next node, 0 at the end	*/
	char*		name;		/* -l option argument		*/
};
460
461
/*
462
* process argv as in sort(1)
463
*/
464
465
static int
466
parse(register Sort_t* sp, char** argv)
467
{
468
register Rskey_t* key = sp->key;
469
register int n;
470
register char* s;
471
char* e;
472
char* p;
473
char** a;
474
char** v;
475
size_t z;
476
int i;
477
int map;
478
char* plugins = 0;
479
Lib_t* firstlib = 0;
480
Lib_t* lastlib = 0;
481
Lib_t* lib;
482
Recfmt_t r;
483
int obsolescent = 1;
484
char opt[64];
485
Optdisc_t optdisc;
486
struct stat st;
487
488
optinit(&optdisc, optinfo);
489
for (;;)
490
{
491
switch (optget(argv, usage))
492
{
493
case 0:
494
break;
495
case 'c':
496
case 'C':
497
obsolescent = 0;
498
key->meth = Rsverify;
499
key->disc->events = RS_VERIFY;
500
key->disc->eventf = opt_info.option[1] == 'C' ? verify_silent : verify;
501
continue;
502
case 'E':
503
case 'J':
504
sfsprintf(opt, sizeof(opt), "%c%s", opt_info.option[1], opt_info.arg);
505
if (rskeyopt(key, opt, 1))
506
return 0;
507
continue;
508
case 'j':
509
key->nproc = opt_info.num;
510
continue;
511
case 'k':
512
if (rskey(key, opt_info.arg, 0))
513
return -1;
514
continue;
515
case 'l':
516
if (!(lib = newof(0, Lib_t, 1, 0)))
517
error(ERROR_SYSTEM|3, "out of space");
518
lib->name = opt_info.arg;
519
if (lastlib)
520
lastlib = lastlib->next = lib;
521
else
522
firstlib = lastlib = lib;
523
continue;
524
case 'm':
525
key->merge = !!opt_info.num;
526
continue;
527
case 'o':
528
key->output = opt_info.arg;
529
continue;
530
case 's':
531
if (opt_info.num)
532
key->type &= ~RS_DATA;
533
else
534
key->type |= RS_DATA;
535
continue;
536
case 't':
537
if (key->tab[0])
538
error(1, "%s: %s conflicts with %s", opt_info.option, *opt_info.arg, key->tab);
539
if ((n = mbsize(opt_info.arg)) < 1)
540
{
541
error(1, "%s: %s: invalid tab character", opt_info.option, opt_info.arg);
542
n = 0;
543
}
544
if (*(opt_info.arg + n) || n >= sizeof(key->tab))
545
error(1, "%s: %s: single character expected", opt_info.option, opt_info.arg);
546
memcpy(key->tab, opt_info.arg, n);
547
key->tab[n] = 0;
548
continue;
549
case 'u':
550
key->type &= ~RS_DATA;
551
key->type |= RS_UNIQ;
552
continue;
553
case 'v':
554
key->verbose = !!opt_info.num;
555
sp->verbose = key->verbose || (key->test & TEST_show);
556
continue;
557
case 'x':
558
if (!(key->meth = rskeymeth(key, opt_info.arg)))
559
error(2, "%s: unknown method", opt_info.arg);
560
continue;
561
case 'z':
562
if (isalpha(n = *(s = opt_info.arg)))
563
s++;
564
else
565
n = 'r';
566
size:
567
z = strton(s, &e, NiL, 1);
568
if (*e == '%')
569
{
570
error(2, "%s %c%s: %% not supported -- do you really want that much memory?", opt_info.option, n, s);
571
return -1;
572
}
573
if (*e || z < ((n == 'm' || n == 'o' || n == 'r' || isupper(n)) ? 0 : 512))
574
{
575
error(2, "%s %c%s: invalid size", opt_info.option, n, s);
576
return -1;
577
}
578
switch (n)
579
{
580
case 'a':
581
key->alignsize = z;
582
break;
583
case 'b':
584
sp->bufsize = z;
585
break;
586
case 'c':
587
sp->chunk = 1;
588
key->alignsize = key->insize = z;
589
break;
590
case 'i':
591
key->insize = z;
592
break;
593
case 'm':
594
if (z <= 0 || z > elementsof(sp->files))
595
z = elementsof(sp->files);
596
sp->xfiles = z;
597
break;
598
case 'p':
599
key->procsize = z;
600
break;
601
case 'o':
602
key->outsize = z;
603
break;
604
case 'r':
605
key->recsize = z;
606
break;
607
case 'I':
608
sp->zip |= SF_READ;
609
break;
610
case 'O':
611
sp->zip |= SF_WRITE;
612
break;
613
}
614
continue;
615
case 'D':
616
error_info.trace = -opt_info.num;
617
continue;
618
case 'K':
619
if (opt_info.offset)
620
{
621
opt_info.offset = 0;
622
opt_info.index++;
623
}
624
if (rskey(key, opt_info.arg, *opt_info.option))
625
return -1;
626
continue;
627
case 'L':
628
rskeylist(key, sfstdout, 0);
629
exit(0);
630
case 'P':
631
plugins = opt_info.arg;
632
continue;
633
case 'R':
634
key->disc->data = recstr(opt_info.arg, &e);
635
if (*e)
636
{
637
error(2, "%s: invalid record format", opt_info.arg);
638
return -1;
639
}
640
continue;
641
case 'S':
642
case 'y':
643
n = 'p';
644
s = opt_info.arg;
645
if (*s && *(e = s + strlen(s) - 1) != '%' && !isalpha(*e))
646
{
647
sfsprintf(opt, sizeof(opt), "%ski", s);
648
s = opt;
649
}
650
goto size;
651
case 'T':
652
pathtemp(NiL, 0, opt_info.arg, "/TMPPATH", NiL);
653
continue;
654
case 'X':
655
s = opt_info.arg;
656
opt_info.num = strton(s, &e, NiL, 1);
657
if (*e)
658
{
659
if (streq(s, "dump"))
660
opt_info.num = TEST_dump;
661
else if (streq(s, "io"))
662
opt_info.num = TEST_io;
663
else if (streq(s, "keys"))
664
opt_info.num = TEST_keys;
665
else if (streq(s, "read"))
666
opt_info.num = TEST_read;
667
else if (streq(s, "reserve"))
668
opt_info.num = TEST_reserve;
669
else if (streq(s, "show"))
670
opt_info.num = TEST_show;
671
else if (streq(s, "test"))
672
{
673
sfprintf(sfstdout, "ok\n");
674
exit(0);
675
}
676
else
677
error(1, "%s: unknown test", s);
678
}
679
if (*opt_info.option == '+')
680
key->test &= ~opt_info.num;
681
else
682
key->test |= opt_info.num;
683
sp->test = key->test;
684
sp->verbose = key->verbose || (key->test & TEST_show);
685
continue;
686
case '?':
687
error(ERROR_USAGE|4, "%s", opt_info.arg);
688
return 1;
689
case ':':
690
error(2, "%s", opt_info.arg);
691
return -1;
692
default:
693
opt[0] = opt_info.option[1];
694
opt[1] = 0;
695
if (rskeyopt(key, opt, 1))
696
return 0;
697
continue;
698
}
699
break;
700
}
701
argv += opt_info.index;
702
if (obsolescent && (opt_info.index <= 1 || !streq(*(argv - 1), "--")))
703
{
704
/*
705
* check for obsolescent `-o output' after first file operand
706
*/
707
708
a = v = argv;
709
while (s = *a++)
710
{
711
if (*s == '-' && *(s + 1) == 'o')
712
{
713
if (!*(s += 2) && !(s = *a++))
714
{
715
error(2, "-o: output argument expected");
716
break;
717
}
718
key->output = s;
719
}
720
else
721
*v++ = s;
722
}
723
*v = 0;
724
}
725
key->input = argv;
726
727
/*
728
* disciplines have the opportunity to modify key info
729
*/
730
731
while (lib = firstlib)
732
{
733
if (rslib(sp->rec, key, lib->name, 0))
734
return 1;
735
firstlib = firstlib->next;
736
free(lib);
737
}
738
739
/*
740
* plugins list bails early
741
*/
742
743
if (plugins)
744
exit(showplugins(sp, key, plugins));
745
746
/*
747
* record format chicanery
748
*/
749
750
if (map = RECTYPE(key->disc->data) == REC_method && REC_M_INDEX(key->disc->data) == REC_M_path)
751
for (n = 0, i = -1; p = key->input[n]; n++)
752
if (s = strrchr(p, '%'))
753
{
754
r = recstr(s + 1, &e);
755
if (!*e || *e == '.' && e > (s + 1))
756
{
757
if (r != key->disc->data && i >= 0 && (RECTYPE(r) != REC_variable || RECTYPE(key->disc->data) != REC_variable || REC_V_ATTRIBUTES(r) != REC_V_ATTRIBUTES(key->disc->data)))
758
{
759
error(2, "%s: format %s incompatible with %s format %s", p, fmtrec(r, 0), key->input[i], fmtrec(key->disc->data, 0));
760
return 1;
761
}
762
if (RECTYPE(r) != REC_variable || RECTYPE(key->disc->data) != REC_variable || REC_V_SIZE(key->disc->data) < REC_V_SIZE(r))
763
key->disc->data = r;
764
i = n;
765
}
766
}
767
if (RECTYPE(key->disc->data) == REC_method && ((n = REC_M_INDEX(key->disc->data)) == REC_M_path || n == REC_M_data))
768
{
769
if ((sp->opened = fileopen(sp, key->input[0])) && (s = sfreserve(sp->opened, SF_UNBOUND, SF_LOCKR)))
770
{
771
struct stat st;
772
773
z = sfvalue(sp->opened);
774
if (fstat(sffileno(sp->opened), &st) || st.st_size < z)
775
st.st_size = 0;
776
key->disc->data = recfmt(s, z, st.st_size);
777
sfread(sp->opened, s, 0);
778
}
779
else
780
{
781
z = sp->opened ? sfvalue(sp->opened) : -1;
782
key->disc->data = REC_N_TYPE();
783
}
784
if (z && key->disc->data == REC_N_TYPE())
785
error(3, "%s: record format cannot be determined from data sample", key->input[0]);
786
}
787
if (RECTYPE(key->disc->data) == REC_fixed)
788
key->fixed = REC_F_SIZE(key->disc->data);
789
if (map && key->output && key->disc->data != REC_N_TYPE() && (stat(key->output, &st) || S_ISREG(st.st_mode)))
790
{
791
if (p = strrchr(key->output, '/'))
792
s = p + 1;
793
else
794
s = key->output;
795
if (!strchr(s, '%'))
796
{
797
p = key->output;
798
if (!(e = strrchr(s, '.')))
799
e = s + strlen(s);
800
if (RECTYPE(key->disc->data) == REC_variable && !REC_V_SIZE(key->disc->data))
801
key->disc->data |= ((1<<15)-1);
802
if (!(s = strdup(sfprints("%-*.*s%%%s%s", e - p, e - p, p, fmtrec(key->disc->data, 1), e))))
803
error(ERROR_SYSTEM|3, "out of space");
804
key->output = s;
805
if (sp->verbose)
806
error(0, "%s rename output %s => %s", error_info.id, p, s);
807
}
808
}
809
if (sp->verbose)
810
error(0, "%s %s record format", error_info.id, fmtrec(key->disc->data, 0));
811
return error_info.errors != 0;
812
}
813
814
/*
815
* dump keys to stderr
816
*/
817
818
static ssize_t
819
dumpkey(Rs_t* rs, unsigned char* dat, size_t datlen, unsigned char* key, size_t keylen, Rsdisc_t* disc)
820
{
821
Sort_t* sp = (Sort_t*)RSKEYDISC(disc);
822
ssize_t n;
823
int i;
824
char buf[2];
825
826
if ((n = (*sp->defkeyf)(rs, dat, datlen, key, keylen, disc)) > 0)
827
{
828
buf[1] = 0;
829
for (i = 0; i < n; i++)
830
{
831
buf[0] = key[i];
832
sfputr(sp->tp, fmtesc(buf), -1);
833
}
834
sfprintf(sfstderr, "key: %s\n", sfstruse(sp->tp));
835
}
836
return n;
837
}
838
839
/*
840
* initialize sp from argv
841
*/
842
843
static int
844
init(register Sort_t* sp, Rskeydisc_t* dp, char** argv)
845
{
846
register Rskey_t* key;
847
register char* s;
848
register char** p;
849
char* t;
850
int n;
851
unsigned long x;
852
unsigned long z;
853
size_t fixed;
854
struct stat is;
855
struct stat os;
856
857
memset(sp, 0, sizeof(*sp));
858
sp->xfiles = elementsof(sp->files);
859
sfset(sfstdout, SF_SHARE, 0);
860
sfset(sfstderr, SF_SHARE, 0);
861
Vmdcsbrk->round = INBRK;
862
dp->version = RSKEY_VERSION;
863
dp->flags = 0;
864
dp->errorf = errorf;
865
if (!(sp->key = key = rskeyopen(dp, NiL)) || !(sp->rec = rsnew(key->disc)))
866
return -1;
867
z = key->insize = 2 * INMAX;
868
#if 0
869
if (conformance(0, 0))
870
#endif
871
key->type |= RS_DATA;
872
if ((n = strtol(astconf("PAGESIZE", NiL, NiL), &t, 0)) > 0 && !*t)
873
key->alignsize = n;
874
if ((n = parse(sp, argv)) || rskeyinit(key))
875
{
876
rskeyclose(key);
877
if (n < 0)
878
error(ERROR_USAGE|4, "%s", optusage(NiL));
879
return -1;
880
}
881
882
/*
883
* finalize the buffer dimensions
884
*/
885
886
if ((x = key->insize) != z)
887
z = 0;
888
if (x > INMAX)
889
x = INMAX;
890
else if (x < INMIN && !sp->chunk)
891
x = INMIN;
892
if (sp->single = !key->input[1])
893
{
894
if (!(sp->opened = fileopen(sp, key->input[0])))
895
error(ERROR_SYSTEM|3, "%s: cannot open", key->input[0]);
896
if (fstat(sffileno(sp->opened), &is))
897
error(ERROR_SYSTEM|3, "%s: cannot stat", key->input[0]);
898
if (!S_ISREG(is.st_mode) || sp->opened->disc) /* XXX: need sfio call to test if any disc pushed */
899
{
900
sp->total = 0;
901
sp->test |= TEST_read;
902
}
903
else if (x > (sp->total = is.st_size))
904
x = sp->total;
905
}
906
else
907
sp->test |= TEST_read;
908
if (sp->zip & SF_READ)
909
sp->test |= TEST_read;
910
fixed = key->fixed;
911
if ((sp->test & TEST_reserve) || !(sp->test & TEST_read))
912
{
913
sp->map = 1;
914
if (z)
915
x = key->procsize;
916
if (fixed)
917
x += fixed - x % fixed;
918
}
919
else
920
{
921
if (z)
922
x = key->procsize;
923
for (;;)
924
{
925
if (fixed)
926
x += fixed - x % fixed;
927
if (sp->buf = (char*)vmalign(Vmheap, x, key->alignsize))
928
break;
929
if ((x >>= 1) < INMIN)
930
error(ERROR_SYSTEM|3, "out of space");
931
}
932
sp->hit = x - key->alignsize;
933
}
934
if (sp->test & TEST_keys)
935
{
936
if (!key->disc->defkeyf)
937
error(2, "no key function to intercept");
938
else if (!(sp->tp = sfstropen()))
939
error(ERROR_SYSTEM|3, "out of space");
940
else
941
{
942
sp->defkeyf = key->disc->defkeyf;
943
key->disc->defkeyf = dumpkey;
944
}
945
}
946
if (key->nproc > 1)
947
{
948
off_t offset;
949
off_t total;
950
off_t size;
951
size_t chunk;
952
int i;
953
Job_t* jp;
954
955
if (!sp->map || pathstdin(key->input[0]))
956
{
957
uno:
958
key->nproc = 1;
959
}
960
else if ((n = (sp->total + key->procsize - 1) / (key->procsize)) <= 1)
961
goto uno;
962
else
963
{
964
if (n < key->nproc)
965
key->nproc = n;
966
else
967
n = key->nproc;
968
if (!(sp->jobs = vmnewof(Vmheap, 0, Job_t, n, 0)))
969
goto uno;
970
size = (sp->total + n - 1) / n;
971
if (fixed)
972
{
973
if (size % fixed)
974
size += fixed - size % fixed;
975
i = (size + x - 1) / x;
976
if (i * n > sp->xfiles)
977
{
978
error(1, "multi-process multi-stage not implemented; falling back to one processor");
979
goto uno;
980
}
981
chunk = size / i;
982
if (chunk % fixed)
983
chunk += fixed - chunk % fixed;
984
size = chunk * i;
985
offset = 0;
986
total = sp->total;
987
for (jp = sp->jobs; jp < sp->jobs + n; jp++)
988
{
989
jp->offset = offset;
990
if (size > total)
991
size = total;
992
total -= (jp->size = size);
993
jp->chunk = chunk;
994
jp->intermediates = i;
995
offset += size;
996
}
997
if (key->procsize > chunk)
998
key->procsize = chunk;
999
else
1000
{
1001
size = key->procsize;
1002
i = (chunk + size - 1) / size;
1003
size = chunk / i;
1004
if (size % fixed)
1005
size += fixed - size % fixed;
1006
key->procsize = size;
1007
}
1008
}
1009
else
1010
{
1011
register char* s;
1012
register char* t;
1013
register char* b;
1014
off_t ideal;
1015
off_t scan;
1016
char* file;
1017
Sfio_t* ip;
1018
1019
i = (size + x - 1) / x;
1020
if (i * n > sp->xfiles)
1021
{
1022
error(1, "multi-process multi-stage not implemented; falling back to one processor");
1023
goto uno;
1024
}
1025
chunk = (size + i - 1) / i;
1026
size = ideal = chunk * i;
1027
if ((scan = INREC) >= ideal)
1028
scan = (ideal / 32) * 4;
1029
offset = 0;
1030
total = sp->total;
1031
file = key->input[0];
1032
if (!(ip = fileopen(sp, file)))
1033
error(ERROR_SYSTEM|3, "%s: cannot read", file);
1034
for (jp = sp->jobs; jp < sp->jobs + n; jp++)
1035
{
1036
jp->offset = offset;
1037
if (((size = ideal) + scan) >= total)
1038
size = total;
1039
else
1040
{
1041
/*UNDENT...*/
1042
1043
/*
1044
* snoop around for the closest record boundary
1045
*/
1046
1047
size -= scan / 2;
1048
if (sfseek(ip, offset + size, SEEK_SET) != (offset + size))
1049
error(ERROR_SYSTEM|3, "%s: record boundary seek error at offset %lld", file, (Sflong_t)offset + size);
1050
if (!(b = (char*)sfreserve(ip, scan, 0)))
1051
error(ERROR_SYSTEM|3, "%s: record boundary read error at offset %lld", file, (Sflong_t)offset + size);
1052
s = t = b + scan / 2 - 1;
1053
while (*s++ != '\n')
1054
{
1055
if (t < b)
1056
{
1057
bigger:
1058
if (((size += scan) + offset) >= (total - scan))
1059
error(3, "%s: monster record at offset %lld", (Sflong_t)offset);
1060
if (sfseek(ip, offset + size, SEEK_SET) != (offset + size))
1061
error(ERROR_SYSTEM|3, "%s: record boundary input seek error at %lld", file, (Sflong_t)offset + size);
1062
if (!(b = (char*)sfreserve(ip, scan, 0)))
1063
error(ERROR_SYSTEM|3, "%s: record boundary read error at %lld", file, (Sflong_t)offset + size);
1064
t = (s = b) + scan;
1065
do
1066
{
1067
if (s >= t)
1068
goto bigger;
1069
} while (*s++ != '\n');
1070
break;
1071
}
1072
if (*t-- == '\n')
1073
{
1074
s = t + 2;
1075
break;
1076
}
1077
}
1078
size += (s - b);
1079
1080
/*...INDENT*/
1081
}
1082
total -= (jp->size = size);
1083
jp->chunk = (size + i - 1) / i;
1084
if (jp->chunk > chunk)
1085
chunk = jp->chunk;
1086
jp->intermediates = i;
1087
offset += size;
1088
}
1089
if (rsfileclose(sp->rec, ip))
1090
return -1;
1091
sfclose(ip);
1092
key->procsize = (key->procsize > chunk) ? chunk : chunk / ((chunk + key->procsize - 1) / key->procsize);
1093
}
1094
}
1095
}
1096
key->insize = sp->end = x;
1097
1098
/*
1099
* check the output file for clash with the input files
1100
*/
1101
1102
n = stat("/dev/null", &is);
1103
if (pathstdout(key->output))
1104
{
1105
key->output = "/dev/stdout";
1106
sp->op = sfstdout;
1107
if (!n && !fstat(sffileno(sp->op), &os) && os.st_dev == is.st_dev && os.st_ino == is.st_ino)
1108
key->type |= RS_IGNORE;
1109
}
1110
else if (key->input)
1111
{
1112
if (!stat(key->output, &os))
1113
{
1114
if (!n && os.st_dev == is.st_dev && os.st_ino == is.st_ino)
1115
key->type |= RS_IGNORE;
1116
else if (eaccess(key->output, W_OK))
1117
error(ERROR_SYSTEM|3, "%s: cannot write", key->output);
1118
else if (!fs3d(FS3D_TEST) || !iview(&os))
1119
{
1120
p = key->input;
1121
while (s = *p++)
1122
if (!pathstdin(s))
1123
{
1124
if (stat(s, &is))
1125
error(ERROR_SYSTEM|2, "%s: not found", s);
1126
else if (os.st_dev == is.st_dev && os.st_ino == is.st_ino)
1127
{
1128
if (t = strrchr(key->output, '/'))
1129
{
1130
s = key->output;
1131
*t = 0;
1132
}
1133
else
1134
s = ".";
1135
if (sp->overwrite = pathtemp(NiL, 0, s, error_info.id, &n))
1136
sp->op = sfnew(sfstdout, NiL, SF_UNBOUND, n, SF_WRITE);
1137
if (t) *t = '/';
1138
if (!sp->op || fstat(n, &is))
1139
error(ERROR_SYSTEM|3, "%s: cannot create overwrite file %s", key->output, sp->overwrite);
1140
if (os.st_uid != is.st_uid || os.st_gid != is.st_gid)
1141
sp->preserve = 1;
1142
break;
1143
}
1144
}
1145
}
1146
}
1147
if (!sp->overwrite && !(sp->op = sfopen(sfstdout, key->output, "w")))
1148
error(ERROR_SYSTEM|3, "%s: cannot write", key->output);
1149
}
1150
if (rsfilewrite(sp->rec, sp->op, key->output))
1151
return -1;
1152
if (key->outsize > 0)
1153
sfsetbuf(sp->op, NiL, key->outsize);
1154
if (sp->zip & SF_WRITE)
1155
sfdcgzip(sp->op, 0);
1156
1157
/*
1158
* finally ready for recsort now
1159
*/
1160
1161
if (rsinit(sp->rec, key->meth, key->procsize, key->type, key))
1162
{
1163
error(ERROR_SYSTEM|2, "sort library initialization error");
1164
rskeyclose(key);
1165
return -1;
1166
}
1167
if (sp->rec->meth->type == RS_MTCOPY)
1168
sp->chunk = 1;
1169
if (sp->test & TEST_io)
1170
{
1171
for (n = 0; s = key->input[n]; n++)
1172
error(0, "%s input[%d]\t\"%s\"", error_info.id, n, s);
1173
if (s = key->output)
1174
error(0, "%s output\t\"%s\"", error_info.id, s);
1175
}
1176
return 0;
1177
}
1178
1179
/*
1180
* close sp->files and push fp if not 0
1181
*/
1182
1183
static void
1184
clear(register Sort_t* sp, Sfio_t* fp)
1185
{
1186
register int i;
1187
1188
for (i = fp ? sp->mfiles : 0; i < sp->nfiles; i++)
1189
{
1190
rstempclose(sp->rec, sp->files[i]);
1191
sp->files[i] = 0;
1192
}
1193
if (fp)
1194
{
1195
sp->files[sp->mfiles++] = fp;
1196
sp->nfiles = sp->mfiles;
1197
if (sp->mfiles >= (sp->xfiles - 1))
1198
sp->mfiles = 0;
1199
}
1200
else
1201
sp->nfiles = sp->mfiles = 0;
1202
}
1203
1204
/*
1205
* flush the intermediate data
1206
* r is the partial record offset
1207
* updated r is returned
1208
*/
1209
1210
static ssize_t
1211
flush(register Sort_t* sp, register size_t r)
1212
{
1213
register Sfio_t* fp;
1214
register size_t n;
1215
register size_t m;
1216
register size_t b;
1217
1218
if (sp->chunk)
1219
{
1220
/*
1221
* skip merge and output sorted chunk
1222
*/
1223
1224
if (rswrite(sp->rec, sp->op, RS_OTEXT))
1225
{
1226
if (!error_info.errors)
1227
error(ERROR_SYSTEM|2, "%s: write error", sp->key->output);
1228
return -1;
1229
}
1230
}
1231
else if (sp->rec->meth->type != RS_MTVERIFY)
1232
{
1233
/*
1234
* write to an intermediate file and rewind for rsmerge
1235
*/
1236
1237
if (!(fp = sp->files[sp->nfiles]))
1238
{
1239
if (sp->child || !(fp = rstempwrite(sp->rec, (Sfio_t*)0)))
1240
error(ERROR_SYSTEM|3, "cannot create intermediate sort file %d", sp->nfiles);
1241
sp->files[sp->nfiles] = fp;
1242
}
1243
sp->nfiles++;
1244
if (sp->verbose)
1245
error(0, "%s write intermediate", error_info.id);
1246
if (rswrite(sp->rec, fp, 0))
1247
{
1248
error(ERROR_SYSTEM|2, "intermediate sort file write error");
1249
return -1;
1250
}
1251
if (rstempread(sp->rec, fp))
1252
{
1253
error(ERROR_SYSTEM|2, "intermediate sort file rewind error");
1254
return -1;
1255
}
1256
1257
/*
1258
* multi-stage merge when open file limit exceeded
1259
*/
1260
1261
if (sp->nfiles >= sp->xfiles)
1262
{
1263
if (sp->child || !(fp = rstempwrite(sp->rec, (Sfio_t*)0)))
1264
error(ERROR_SYSTEM|3, "cannot create intermediate merge file");
1265
if (sp->verbose)
1266
error(0, "%s merge multi-stage intermediate", error_info.id);
1267
if (rsmerge(sp->rec, fp, sp->files + sp->mfiles, sp->nfiles - sp->mfiles, 0))
1268
{
1269
error(ERROR_SYSTEM|2, "intermediate merge file write error");
1270
return -1;
1271
}
1272
if (rstempread(sp->rec, fp))
1273
{
1274
error(ERROR_SYSTEM|2, "intermediate merge file rewind error");
1275
return -1;
1276
}
1277
clear(sp, fp);
1278
}
1279
}
1280
1281
/*
1282
* slide over partial record data so the next read is aligned
1283
*/
1284
1285
if (!sp->map && (m = sp->cur - r))
1286
{
1287
n = roundof(m, sp->key->alignsize) - m;
1288
if (n < r)
1289
{
1290
m = n;
1291
while (r < sp->cur)
1292
sp->buf[n++] = sp->buf[r++];
1293
sp->cur = n;
1294
}
1295
else
1296
{
1297
b = r;
1298
r += m;
1299
n += m;
1300
sp->cur = n;
1301
while (r > b)
1302
sp->buf[--n] = sp->buf[--r];
1303
m = n;
1304
}
1305
}
1306
else
1307
m = sp->cur = 0;
1308
return m;
1309
}
1310
1311
/*
1312
* input the records for file ip
1313
*/
1314
1315
static int
1316
input(register Sort_t* sp, Sfio_t* ip, const char* name)
1317
{
1318
register ssize_t n;
1319
register ssize_t p;
1320
register ssize_t m;
1321
register ssize_t r;
1322
size_t z;
1323
char* b;
1324
int c;
1325
char del[2];
1326
1327
/*
1328
* align the read buffer and
1329
* loop on insize chunks
1330
*/
1331
1332
error_info.file = ip == sfstdin ? (char*)0 : (char*)name;
1333
m = -1;
1334
z = 0;
1335
if (sp->bufsize)
1336
sfsetbuf(ip, NiL, sp->bufsize);
1337
else if (sfsize(ip) > SF_BUFSIZE)
1338
{
1339
if (sp->map)
1340
sfsetbuf(ip, NiL, z = sp->end);
1341
else
1342
sfsetbuf(ip, NiL, 0);
1343
}
1344
r = sp->cur = roundof(sp->cur, sp->key->alignsize);
1345
p = 0;
1346
for (;;)
1347
{
1348
if (sp->cur > sp->hit)
1349
{
1350
if (sp->single && !sp->nfiles && sp->total == (sp->map ? 0 : p))
1351
break;
1352
if ((r = flush(sp, r)) < 0)
1353
return -1;
1354
}
1355
if (!sp->map)
1356
n = sfeof(ip) ? 0 : sfread(ip, sp->buf + sp->cur, sp->end - sp->cur);
1357
else
1358
{
1359
sp->buf = (char*)sfreserve(ip, m, SF_LOCKR);
1360
n = sfvalue(ip);
1361
if (!sp->buf)
1362
{
1363
if (m < 0 && n < -m && z == sp->end)
1364
{
1365
sfsetbuf(ip, NiL, z = 2 * sp->end);
1366
sp->buf = (char*)sfreserve(ip, m, SF_LOCKR);
1367
n = sfvalue(ip);
1368
if (sp->verbose && n)
1369
error(0, "%s buffer boundary expand to %I*d", error_info.id, sizeof(n), n);
1370
}
1371
if (!sp->buf && n > 0 && !(sp->buf = sfreserve(ip, n, SF_LOCKR)))
1372
n = -1;
1373
}
1374
}
1375
if (n <= 0)
1376
{
1377
if (n < 0)
1378
error(ERROR_SYSTEM|3, "read error");
1379
if (sp->cur <= r)
1380
break;
1381
if (sp->key->fixed)
1382
{
1383
error(1, "incomplete record length=%lld", (Sflong_t)(sp->cur - r));
1384
break;
1385
}
1386
if (RECTYPE(sp->key->disc->data) == REC_delimited && sp->buf[sp->cur - 1] != (c = REC_D_DELIMITER(sp->key->disc->data)))
1387
{
1388
sp->buf[sp->cur++] = c;
1389
if (c == '\n')
1390
error(1, "newline appended");
1391
else
1392
{
1393
del[0] = c;
1394
del[1] = 0;
1395
error(1, "%s appended", fmtquote(del, "'", NiL, 1, 0));
1396
}
1397
}
1398
}
1399
sp->cur += n;
1400
process:
1401
if (sp->verbose && !sp->child)
1402
error(ERROR_PROMPT, "%s process %lld ->", error_info.id, (Sflong_t)(sp->cur - r));
1403
if ((p = rsprocess(sp->rec, sp->buf + r, sp->cur - r)) < 0)
1404
error(ERROR_SYSTEM|3, "sort error");
1405
if (sp->verbose)
1406
{
1407
if (sp->child)
1408
error(0, "%s process %lld -> %lld", error_info.id, (Sflong_t)(sp->cur - r), (Sflong_t)p);
1409
else
1410
error(0, " %lld", (Sflong_t)p);
1411
}
1412
if (sp->map)
1413
{
1414
if (sp->map > 2)
1415
break;
1416
sfread(ip, sp->buf, p);
1417
if (p)
1418
{
1419
m = -(n - p + 1);
1420
if (((sp->total -= p) / 3) < (sp->end / 2) && sp->total > sp->end)
1421
{
1422
if ((r = flush(sp, r)) < 0)
1423
return -1;
1424
sfsetbuf(ip, NiL, sp->total);
1425
}
1426
}
1427
else if (sp->map == 1)
1428
{
1429
sp->map++;
1430
m = -(n + 1);
1431
}
1432
else if (n > sp->end)
1433
{
1434
error(2, "monster record", n, sp->end, sp->cur, p);
1435
break;
1436
}
1437
else if (sp->key->fixed)
1438
{
1439
error(1, "incomplete record length=%ld", n - p);
1440
break;
1441
}
1442
else
1443
{
1444
sp->cur = n - p;
1445
if (!(b = vmnewof(Vmheap, 0, char, sp->cur, 1)))
1446
error(ERROR_SYSTEM|3, "out of space");
1447
memcpy(b, sp->buf + p, sp->cur);
1448
if (RECTYPE(sp->key->disc->data) == REC_delimited && b[sp->cur - 1] != (c = REC_D_DELIMITER(sp->key->disc->data)))
1449
{
1450
b[sp->cur++] = '\n';
1451
if (c == '\n')
1452
error(1, "newline appended");
1453
else
1454
{
1455
del[0] = c;
1456
del[1] = 0;
1457
error(1, "%s appended", fmtquote(del, "'", "'", 1, 0));
1458
}
1459
}
1460
sp->buf = b;
1461
sp->map++;
1462
goto process;
1463
}
1464
}
1465
else
1466
r += p;
1467
}
1468
error_info.file = 0;
1469
return 0;
1470
}
1471
1472
/*
1473
* sfio part discipline read
1474
*/
1475
1476
static ssize_t
1477
partread(Sfio_t* fp, Void_t* buf, size_t size, Sfdisc_t* dp)
1478
{
1479
register Part_t* pp = (Part_t*)dp;
1480
1481
if (pp->remain <= 0)
1482
return 0;
1483
if (size > pp->remain)
1484
size = pp->remain;
1485
pp->remain -= size;
1486
return sfrd(fp, buf, size, dp);
1487
}
1488
1489
/*
1490
* sfio part discipline seek
1491
*/
1492
1493
static Sfoff_t
1494
partseek(Sfio_t* fp, Sfoff_t lloffset, int op, Sfdisc_t* dp)
1495
{
1496
register Part_t* pp = (Part_t*)dp;
1497
off_t offset = lloffset;
1498
1499
switch (op)
1500
{
1501
case SEEK_SET:
1502
offset += pp->offset;
1503
break;
1504
case SEEK_CUR:
1505
offset += pp->offset;
1506
break;
1507
case SEEK_END:
1508
offset = pp->offset + pp->size - offset;
1509
op = SEEK_SET;
1510
break;
1511
}
1512
if ((offset = sfsk(fp, offset, op, dp)) >= 0)
1513
{
1514
offset -= pp->offset;
1515
pp->remain = pp->size - offset;
1516
}
1517
return offset;
1518
}
1519
1520
/*
1521
* job control
1522
* requires single named input file
1523
* no multi-stage merge
1524
*/
1525
1526
static void
1527
jobs(register Sort_t* sp)
1528
{
1529
register Job_t* jp;
1530
register Job_t* xp;
1531
register int i;
1532
register int j;
1533
register int f;
1534
int status;
1535
char* file;
1536
Sfio_t* ip;
1537
Part_t part;
1538
char id[32];
1539
1540
sp->single = 0;
1541
if (sp->verbose)
1542
error(0, "%s %d processes %lld total", error_info.id, sp->key->nproc, (Sflong_t)sp->total);
1543
xp = sp->jobs + sp->key->nproc;
1544
if (sp->test & TEST_show)
1545
{
1546
for (jp = sp->jobs; jp < xp; jp++)
1547
error(0, "%s#%d pos %12lld : len %10lld : buf %10lld : num %2d", error_info.id, jp - sp->jobs + 1, (Sflong_t)jp->offset, (Sflong_t)jp->size, (Sflong_t)jp->chunk, jp->intermediates);
1548
exit(0);
1549
}
1550
f = 0;
1551
for (jp = sp->jobs; jp < xp; jp++)
1552
for (i = 0; i < jp->intermediates; i++)
1553
if (!(sp->files[f++] = rstempwrite(sp->rec, (Sfio_t*)0)))
1554
error(ERROR_SYSTEM|3, "cannot create intermediate file %d", i);
1555
part.disc.readf = partread;
1556
part.disc.writef = 0;
1557
part.disc.seekf = partseek;
1558
part.disc.exceptf = 0;
1559
part.disc.disc = 0;
1560
file = sp->key->input[0];
1561
j = 0;
1562
for (jp = sp->jobs; jp < xp; jp++)
1563
{
1564
ip = fileopen(sp, file);
1565
switch (fork())
1566
{
1567
case -1:
1568
error(ERROR_SYSTEM|3, "not enough child processes");
1569
case 0:
1570
sp->child = 1;
1571
sfsprintf(id, sizeof(id), "%s#%d", error_info.id, jp - sp->jobs + 1);
1572
error_info.id = id;
1573
sp->end = jp->chunk;
1574
part.offset = jp->offset;
1575
sp->total = part.size = part.remain = jp->size;
1576
sfdisc(ip, &part.disc);
1577
for (i = 0; i < jp->intermediates; i++)
1578
sp->files[i] = sp->files[j++];
1579
while (i < f)
1580
sp->files[i++] = 0;
1581
if (sp->verbose)
1582
error(0, "%s pos %12lld : len %10lld : buf %10lld : num %2d", error_info.id, (Sflong_t)jp->offset, (Sflong_t)jp->size, (Sflong_t)jp->chunk, jp->intermediates);
1583
exit(input(sp, ip, file) < 0);
1584
}
1585
if (rsfileclose(sp->rec, ip))
1586
exit(1);
1587
sfclose(ip);
1588
j += jp->intermediates;
1589
}
1590
sp->nfiles = f;
1591
i = 0;
1592
j = sp->key->nproc;
1593
while (j > 0)
1594
{
1595
if (wait(&status) != -1)
1596
{
1597
if (status)
1598
i++;
1599
j--;
1600
}
1601
else if (errno != EINTR)
1602
{
1603
error(ERROR_SYSTEM|3, "%d process%s did not complete", j, j == 1 ? "" : "es");
1604
break;
1605
}
1606
}
1607
if (i)
1608
error(3, "%d child process%s failed", i, i == 1 ? "" : "es");
1609
}
1610
1611
/*
1612
* all done
1613
*/
1614
1615
static void
1616
done(register Sort_t* sp)
1617
{
1618
while (rsdisc(sp->rec, NiL, RS_POP));
1619
if ((sfsync(sp->op) || sp->op != sfstdout && rsfileclose(sp->rec, sp->op)) && !error_info.errors)
1620
error(ERROR_SYSTEM|2, "%s: write error", sp->key->output);
1621
if (rsclose(sp->rec))
1622
error(2, "sort error");
1623
if (sp->map > 2)
1624
vmfree(Vmheap, sp->buf);
1625
1626
/*
1627
* if the output would have overwritten an input
1628
* file now is the time to commit to it
1629
*/
1630
1631
if (sp->overwrite)
1632
{
1633
if (error_info.errors)
1634
remove(sp->overwrite);
1635
else if (sp->preserve)
1636
{
1637
Sfio_t* ip;
1638
Sfio_t* op;
1639
1640
if (ip = sfopen(NiL, sp->overwrite, "r"))
1641
{
1642
if (op = sfopen(NiL, sp->key->output, "w"))
1643
{
1644
if ((sfmove(ip, op, SF_UNBOUND, -1) < 0 || sfclose(op) || !sfeof(ip)) && !error_info.errors)
1645
error(ERROR_SYSTEM|2, "%s: write error", sp->key->output);
1646
sfclose(op);
1647
}
1648
else
1649
error(ERROR_SYSTEM|2, "%s: cannot write", sp->key->output);
1650
sfclose(ip);
1651
remove(sp->overwrite);
1652
}
1653
else
1654
error(ERROR_SYSTEM|2, "%s: cannot read", sp->overwrite);
1655
sp->preserve = 0;
1656
}
1657
else if (remove(sp->key->output) || rename(sp->overwrite, sp->key->output))
1658
error(ERROR_SYSTEM|2, "%s: cannot overwrite", sp->key->output);
1659
free(sp->overwrite);
1660
sp->overwrite = 0;
1661
}
1662
if (rskeyclose(sp->key))
1663
error(2, "sort key error");
1664
}
1665
1666
int
1667
main(int argc, char** argv)
1668
{
1669
register char* s;
1670
register Sfio_t* fp;
1671
char** merge;
1672
Sort_t sort;
1673
1674
error_info.id = "sort";
1675
if (init(&sort, &sort.disc, argv))
1676
exit(1);
1677
if (sort.test & TEST_dump)
1678
{
1679
sfprintf(sfstderr, "main\n\tintermediates=%d\n", sort.xfiles);
1680
rskeydump(sort.key, sfstderr);
1681
}
1682
if (sort.key->type & RS_CAT)
1683
{
1684
while (s = *sort.key->input++)
1685
{
1686
fp = fileopen(&sort, s);
1687
if (sfmove(fp, sfstdout, SF_UNBOUND, -1) < 0 || !sfeof(fp))
1688
error(ERROR_SYSTEM|2, "%s: read error", s);
1689
if (sferror(sfstdout) || rsfileclose(sort.rec, fp))
1690
break;
1691
}
1692
}
1693
else
1694
{
1695
merge = sort.key->merge && sort.key->input[0] && sort.key->input[1] ? sort.key->input : (char**)0;
1696
fp = 0;
1697
if (sort.jobs)
1698
jobs(&sort);
1699
else if (sort.test & TEST_show)
1700
exit(0);
1701
else
1702
while (s = *sort.key->input++)
1703
{
1704
fp = fileopen(&sort, s);
1705
if (merge)
1706
{
1707
if (sort.nfiles >= sort.xfiles)
1708
{
1709
clear(&sort, NiL);
1710
sort.key->input = merge;
1711
merge = 0;
1712
if (fp != sfstdin)
1713
sfclose(fp);
1714
fp = 0;
1715
continue;
1716
}
1717
sort.files[sort.nfiles++] = fp;
1718
}
1719
else if (input(&sort, fp, s) < 0)
1720
break;
1721
else if (fp != sfstdin && !sort.map)
1722
{
1723
sfclose(fp);
1724
fp = 0;
1725
}
1726
}
1727
if (sort.nfiles)
1728
{
1729
if (sort.cur && flush(&sort, sort.cur) < 0)
1730
return 1;
1731
if (sort.verbose)
1732
error(0, "%s merge text", error_info.id);
1733
if (rsmerge(sort.rec, sort.op, sort.files, sort.nfiles, merge ? RS_TEXT : RS_OTEXT))
1734
error(ERROR_SYSTEM|3, "merge error");
1735
clear(&sort, NiL);
1736
}
1737
else
1738
{
1739
if (sort.verbose)
1740
error(0, "%s write text", error_info.id);
1741
if (rswrite(sort.rec, sort.op, RS_OTEXT) && !error_info.errors)
1742
error(ERROR_SYSTEM|3, "%s: write error", sort.key->output);
1743
if (fp && fp != sfstdin)
1744
sfclose(fp);
1745
}
1746
}
1747
done(&sort);
1748
exit(error_info.errors != 0);
1749
}
1750
1751