Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
att
GitHub Repository: att/ast
Path: blob/master/src/lib/librecsort/rsmerge.c
1808 views
1
/***********************************************************************
2
* *
3
* This software is part of the ast package *
4
* Copyright (c) 1996-2011 AT&T Intellectual Property *
5
* and is licensed under the *
6
* Eclipse Public License, Version 1.0 *
7
* by AT&T Intellectual Property *
8
* *
9
* A copy of the License is available at *
10
* http://www.eclipse.org/org/documents/epl-v10.html *
11
* (with md5 checksum b35adb5213ca9657e911e9befb180842) *
12
* *
13
* Information and Software Systems Research *
14
* AT&T Research *
15
* Florham Park NJ *
16
* *
17
* Phong Vo <[email protected]> *
18
* Glenn Fowler <[email protected]> *
19
* *
20
***********************************************************************/
21
#include "rshdr.h"
22
23
/* Merging streams of sorted records.
** Strategy:
** 1. Each stream is represented by its current least record.
**    A cache of read-ahead records is kept for each stream.
** 2. Streams are sorted by their representative records, with ties
**    broken by stream position for stability.
**
** Written by Kiem-Phong Vo (07/08/96)
*/
32
33
/* Number of records read ahead and cached per input stream. */
#define MG_CACHE 1024 /* maximum # records in cache */
34
35
/* State of one input stream being merged. The cached records in obj[]
** point into the region of f reserved with sfreserve() ([rsrv,endrsrv))
** and, when the discipline generates keys, into vm. Both are recycled
** by the next refresh of this stream, so cached records stay valid
** only until then.
*/
typedef struct _merge_s
36
{ Rsobj_t obj[MG_CACHE]; /* read-ahead records */
37
int cpos; /* index in obj[] of the current (least) record */
38
int cend; /* number of valid records in obj[] */
39
ssize_t match; /* binary input: records of the current group not yet cached (<0: run of singletons, >0: rest of an equivalence class) */
40
Sfio_t* f; /* input stream */
41
int pos; /* stream position, breaks ties between equal records */
42
int eof; /* set by MGSETEOF once input is exhausted or unreadable */
43
int flags; /* flags of f at mgopen(); mgclose() turns SF_WRITE/SF_SHARE/SF_PUBLIC back on from these */
44
uchar* rsrv; /* start of the region reserved from f */
45
uchar* cur; /* next unconsumed byte in the reserved region */
46
uchar* endrsrv; /* end of the reserved region */
47
Vmalloc_t* vm; /* space for keys made by disc->defkeyf, NIL otherwise */
48
struct _merge_s *equi; /* next stream whose current record is equal to this one's, in pos order */
49
} Merge_t;
50
51
/* Append record obj to the output list rs->sorted, using t as scratch.
** The list is linked through ->right, and the head's ->left points at
** the tail. The tail's ->right is NOT cleared, so the list must be
** terminated (rs->sorted->left->right = NIL) before walking it.
*/
#define APPEND(rs,obj,t) \
52
{ if((t = rs->sorted)) \
53
{ t->left->right = (obj); } \
54
else { rs->sorted = t = (obj); } \
55
t->left = (obj); \
56
}
57
58
#define MGSETEOF(mg) (mg->eof = 1)
59
#define MGCLREOF(mg) (mg->eof = 0)
60
#define MGISEOF(mg) (mg->eof)
61
/* Ensure that at least r bytes of input are reserved on mg->f starting at cur.
** If they are not, the bytes before cur are consumed (sfread on the
** reserved pointer releases the SF_LOCKR reservation) and a new
** reservation is taken. Its size is RS_RESERVE, or r rounded up to the
** next 1024-byte step if r is larger. If that much is not available,
** the reservation is retried with the size sfvalue() reports, but never
** less than r. rsrv, endrsrv and cur are updated in place.
** On failure the eof flag is set and action runs. The three pointers
** are NIL at that point, except when releasing the old reservation failed.
** The body is a bare { } block, not do{}while(0), so that an action of
** "break" leaves the caller's loop.
*/
#define MGRESERVE(mg,rsrv,endrsrv,cur,r,action) \
62
{ reg ssize_t rr; \
63
if((cur+r) > endrsrv) \
64
{ if(rsrv && sfread(mg->f,rsrv,cur-rsrv) != cur-rsrv) { MGSETEOF(mg); action;} \
65
rsrv = endrsrv = cur = NIL(uchar*); \
66
rr = r <= RS_RESERVE ? RS_RESERVE : ((r/1024)+1)*1024; \
67
if(!(rsrv = (uchar*)sfreserve(mg->f,rr,SF_LOCKR)) ) \
68
{ if((rr = sfvalue(mg->f)) < r) { if (rr <= 0) { MGSETEOF(mg); action;} rr = r;} \
69
if(!(rsrv = (uchar*)sfreserve(mg->f,rr,SF_LOCKR)) ) { MGSETEOF(mg); action;} \
70
} \
71
endrsrv = (cur = rsrv) + rr; \
72
} \
73
}
74
75
/* Output-side counterpart of MGRESERVE: ensure w bytes of room are
** reserved on rs->f at cur. Before a new reservation is taken, the
** cur-rsrv bytes already filled are committed with sfwrite. On failure
** action runs; no eof flag is involved. The body is wrapped in
** do{}while(0), so a "break" action would only leave the macro. All
** uses in this file pass "goto".
*/
#define RSRESERVE(rs,rsrv,endrsrv,cur,w,action) \
76
do \
77
{ reg ssize_t rw; \
78
if((endrsrv-cur) < w) \
79
{ if(rsrv && sfwrite(rs->f,rsrv,cur-rsrv) != cur-rsrv) { action;} \
80
rsrv = endrsrv = cur = NIL(uchar*); \
81
rw = w <= RS_RESERVE ? RS_RESERVE : ((w/1024)+1)*1024; \
82
if(!(rsrv = (uchar*)sfreserve(rs->f,rw,SF_LOCKR)) ) \
83
{ if((rw = sfvalue(rs->f)) < w) rw = w; \
84
if(!(rsrv = (uchar*)sfreserve(rs->f,rw,SF_LOCKR)) ) { action;} \
85
} \
86
endrsrv = (cur = rsrv) + rw; \
87
} \
88
} while (0)
89
90
/* Commit the bytes filled so far in rs's output reservation and forget
** the reservation. The sfwrite status is not checked.
*/
#define RSSYNC(rs) \
91
{ if(rs->rsrv) \
92
{ sfwrite(rs->f,rs->rsrv,rs->cur-rs->rsrv); \
93
rs->rsrv = rs->cur = rs->endrsrv = NIL(uchar*); \
94
} \
95
}
96
97
/* write out any pending records */
98
#if __STD_C
99
static int mgflush(reg Rs_t* rs)
100
#else
101
static int mgflush(rs)
102
reg Rs_t* rs;
103
#endif
104
{
105
reg Rsobj_t* r;
106
reg ssize_t n;
107
108
if((r = rs->sorted) )
109
{ r->left->right = NIL(Rsobj_t*);
110
if(!(rs->type&RS_OTEXT) ) /* need to write the count */
111
{ for(n = -1, r = r->right; r; r = r->right)
112
n -= 1;
113
rs->sorted->order = n;
114
}
115
116
if(RSWRITE(rs,rs->f,rs->type&RS_TEXT) < 0)
117
return -1;
118
119
rs->sorted = NIL(Rsobj_t*);
120
}
121
122
return 0;
123
}
124
125
/* Read new records from stream mg */
126
#if __STD_C
127
static int mgrefresh(Rs_t* rs, Merge_t* mg)
128
#else
129
static int mgrefresh(rs, mg)
130
Rs_t* rs;
131
Merge_t* mg;
132
#endif
133
{
134
ssize_t datalen, rsc;
135
reg Rsobj_t *obj, *endobj;
136
reg uchar *t, *cur, *rsrv, *endrsrv;
137
reg int n, type = rs->type;
138
reg ssize_t key = rs->disc->key;
139
reg ssize_t keylen = rs->disc->keylen;
140
reg Rsdefkey_f defkeyf = rs->disc->defkeyf;
141
reg uchar *m_key, *c_key;
142
reg ssize_t s_key, s, o, x;
143
144
if(MGISEOF(mg))
145
return -1;
146
147
/* release key memory */
148
if(defkeyf)
149
{ vmclear(mg->vm);
150
m_key = c_key = NIL(uchar*);
151
s_key = 0;
152
}
153
154
mg->cend = 0;
155
156
if(mgflush(rs) < 0)
157
return -1;
158
159
cur = mg->cur; rsrv = mg->rsrv; endrsrv = mg->endrsrv;
160
161
datalen = rsc = rs->disc->data;
162
163
if(type&RS_ITEXT)
164
{ if(type&RS_DSAMELEN)
165
{ MGRESERVE(mg,rsrv,endrsrv,cur,datalen, return -1);
166
}
167
else for(s = RS_RESERVE, o = 0;;) /* make sure we have at least 1 record */
168
{ MGRESERVE(mg,rsrv,endrsrv,cur,s, goto last_chunk);
169
x = endrsrv-cur;
170
#if _PACKAGE_ast
171
if (rsc & ~0xff) /* Recfmt_t record descriptor */
172
{ if ((datalen = reclen(rsc, cur, x)) < 0)
173
return -1;
174
if (datalen <= x)
175
break;
176
}
177
else
178
#endif
179
if((t = (uchar*)memchr(cur,rsc,x)) )
180
{ datalen = (t-cur)+1;
181
break;
182
}
183
if(MGISEOF(mg))
184
return -1;
185
else if(o == x)
186
{ datalen = x;
187
break;
188
}
189
else
190
{ o = x;
191
s += RS_RESERVE;
192
continue;
193
}
194
last_chunk:
195
if((s = sfvalue(mg->f)) <= 0)
196
return -1;
197
MGCLREOF(mg);
198
}
199
}
200
else
201
{ if(mg->match == 0) /* get group size */
202
{ MGRESERVE(mg,rsrv,endrsrv,cur,sizeof(ssize_t), return -1);
203
t = (uchar*)(&mg->match); MEMCPY(t,cur,sizeof(ssize_t));
204
if(mg->match == 0)
205
{ MGSETEOF(mg);
206
return -1;
207
}
208
}
209
210
/* define length of next record */
211
if(!(type&RS_DSAMELEN) )
212
{ MGRESERVE(mg,rsrv,endrsrv,cur,sizeof(ssize_t), return -1);
213
#if _PACKAGE_ast
214
if (rsc & ~0xff) /* Recfmt_t record descriptor */
215
datalen = reclen(rsc, cur, sizeof(ssize_t));
216
else
217
#endif
218
{ t = (uchar*)(&datalen);
219
MEMCPY(t,cur,sizeof(ssize_t));
220
}
221
if(datalen < 0)
222
{ MGSETEOF(mg);
223
return -1;
224
}
225
}
226
227
/* get data for at least 1 record */
228
MGRESERVE(mg,rsrv,endrsrv,cur,datalen, return -1);
229
}
230
231
endobj = (obj = mg->obj)+MG_CACHE;
232
n = mg->match < 0 ? 1 : -1;
233
234
/* fast loop for a common case */
235
if(!defkeyf && (type&RS_DSAMELEN) && !(type&RS_ITEXT))
236
{ if(keylen <= 0)
237
keylen += datalen-key;
238
for(;;)
239
{ obj->equal = NIL(Rsobj_t*);
240
obj->data = cur;
241
obj->datalen = datalen;
242
obj->key = cur+key;
243
obj->keylen = keylen;
244
cur += datalen;
245
246
OBJHEAD(obj);
247
obj += 1;
248
249
if((mg->match += n) >= 0 ||
250
obj >= endobj || (cur+datalen) > endrsrv )
251
goto done;
252
}
253
}
254
255
for(;; )
256
{ obj->equal = NIL(Rsobj_t*);
257
obj->data = cur;
258
cur += datalen;
259
obj->datalen = datalen;
260
261
if(defkeyf)
262
{ if((s = key*datalen) > s_key )
263
{ s = ((s + RS_RESERVE-1)/RS_RESERVE)*RS_RESERVE;
264
if(m_key && !vmresize(mg->vm,m_key,(c_key-m_key)+s,0) )
265
{ vmresize(mg->vm,m_key,c_key-m_key,0);
266
m_key = c_key = NIL(uchar*);
267
s_key = 0;
268
}
269
if(!m_key)
270
{ if(!(m_key = (uchar*)vmalloc(mg->vm,s)) )
271
{ MGSETEOF(mg);
272
return -1;
273
}
274
c_key = m_key;
275
}
276
s_key = s;
277
}
278
279
s = (*defkeyf)(rs,obj->data,datalen,c_key,s_key,rs->disc);
280
if(s < 0)
281
{ MGSETEOF(mg);
282
return -1;
283
}
284
285
obj->key = c_key;
286
obj->keylen = s;
287
c_key += s;
288
s_key -= s;
289
}
290
else
291
{ obj->key = obj->data + key;
292
if((obj->keylen = keylen) <= 0)
293
obj->keylen += datalen - key;
294
}
295
296
OBJHEAD(obj); /* set up obj->order for quick comparison */
297
obj += 1;
298
299
if(type&RS_ITEXT)
300
{ if(obj >= endobj)
301
goto done;
302
if(type&RS_DSAMELEN)
303
{ if((cur+datalen) > endrsrv)
304
goto done;
305
}
306
else
307
{
308
#if _PACKAGE_ast
309
if (rsc & ~0xff) /* Recfmt_t record descriptor */
310
{ if ((datalen = reclen(rsc, cur, endrsrv-cur)) < 0 || datalen > (endrsrv-cur))
311
goto done;
312
}
313
else
314
#endif
315
if(!(t = (uchar*)memchr(cur,rsc,endrsrv-cur)) )
316
goto done;
317
else
318
datalen = (t-cur)+1;
319
}
320
}
321
else
322
{ if((mg->match += n) >= 0 || obj >= endobj)
323
goto done;
324
325
if(type&RS_DSAMELEN)
326
{ if((cur+datalen) > endrsrv)
327
goto done;
328
}
329
else
330
{ if(cur+sizeof(ssize_t) > endrsrv)
331
goto done;
332
#if _PACKAGE_ast
333
if (rsc & ~0xff) /* Recfmt_t record descriptor */
334
datalen = reclen(rsc, cur, sizeof(ssize_t));
335
else
336
#endif
337
{ t = (uchar*)(&datalen);
338
MEMCPY(t,cur,sizeof(ssize_t));
339
}
340
if(datalen < 0)
341
{ MGSETEOF(mg);
342
return -1;
343
}
344
if((cur+datalen) > endrsrv)
345
{
346
#if _PACKAGE_ast
347
if (!(rsc & ~0xff))
348
#endif
349
cur -= sizeof(ssize_t);
350
goto done;
351
}
352
}
353
}
354
}
355
356
done:
357
mg->cpos = 0;
358
mg->cend = obj-mg->obj;
359
mg->rsrv = rsrv; mg->endrsrv = endrsrv; mg->cur = cur;
360
361
return 0;
362
}
363
364
#if __STD_C
365
static int mgclose(Rs_t* rs, Merge_t* mg)
366
#else
367
static int mgclose(rs, mg)
368
Rs_t* rs;
369
Merge_t* mg;
370
#endif
371
{
372
int ret;
373
374
ret = mgflush(rs);
375
376
if(mg->rsrv)
377
sfread(mg->f,mg->rsrv,mg->cur-mg->rsrv);
378
379
sfset(mg->f,(mg->flags&(SF_WRITE|SF_SHARE|SF_PUBLIC)),1);
380
381
if(rs->disc->defkeyf && mg->vm)
382
vmclose(mg->vm);
383
384
vmfree(Vmheap,mg);
385
386
return ret;
387
}
388
389
#if __STD_C
390
static Merge_t* mgopen(Rs_t* rs, Sfio_t* f, int pos)
391
#else
392
static Merge_t* mgopen(rs, f, pos)
393
Rs_t* rs; /* sorting context */
394
Sfio_t* f; /* input stream */
395
int pos; /* stream position for resolving equal records */
396
#endif
397
{
398
reg Merge_t* mg;
399
static Vmdisc_t vmdisc;
400
401
if(!vmdisc.memoryf)
402
{ vmdisc.memoryf = Vmdcheap->memoryf;
403
vmdisc.exceptf = Vmdcheap->exceptf;
404
vmdisc.round = RS_RESERVE;
405
}
406
407
if(!(mg = (Merge_t*)vmresize(Vmheap,NIL(Void_t*),sizeof(Merge_t),VM_RSZERO)) )
408
return NIL(Merge_t*);
409
410
mg->vm = NIL(Vmalloc_t*);
411
if(rs->disc->defkeyf && !(mg->vm = vmopen(&vmdisc,Vmlast,0)) )
412
{ vmfree(Vmheap,mg);
413
return NIL(Merge_t*);
414
}
415
416
mg->cpos = mg->cend = 0;
417
mg->match = 0;
418
mg->f = f;
419
mg->pos = pos;
420
mg->eof = 0;
421
mg->flags = sfset(f,0,1); /* original stream flags */
422
mg->rsrv = mg->endrsrv = mg->cur = NIL(uchar*);
423
mg->equi = NIL(Merge_t*);
424
425
/* make sure that Sfio will use mmap if appropriate */
426
sfset(f,(SF_WRITE|SF_SHARE|SF_PUBLIC),0);
427
428
/* get a decent size buffer to work with */
429
if((mg->flags&SF_MALLOC) && !(mg->flags&SF_STRING) )
430
{ ssize_t round;
431
if((round = rs->c_max) > 0)
432
round /= 4;
433
sfsetbuf(f,NIL(Void_t*),round < RS_RESERVE ? RS_RESERVE : round);
434
}
435
436
/* fill first cache */
437
if(mgrefresh(rs,mg) < 0 )
438
{ mgclose(rs,mg);
439
return NIL(Merge_t*);
440
}
441
442
return mg;
443
}
444
445
/* Three-way compare of records one and two. The ->order prefixes set up
** by OBJHEAD are compared first, and mgcompare() is called only when
** they tie. The result's sign is inverted when reverse is non-zero, so
** RS_REVERSE is handled here.
*/
446
#define MGCOMPARE(rs,one,two,reverse) \
447
((one)->order == (two)->order ? mgcompare(rs,one,two,reverse) : \
448
(one)->order < (two)->order ? (reverse ? 1 : -1) : (reverse ? -1 : 1) )
449
/* Compare len bytes at o1 and o2, advancing both pointers and
** consuming len. Each loop pass handles up to 8 bytes through a
** fall-through switch. On the first differing byte this RETURNS from
** the enclosing function with the byte difference (negated if reverse).
** It falls through only when all len bytes match.
*/
#define MGMEMCMP(o1,o2,len,cmp,reverse) \
450
{ for(; len > 0; len -= 8) \
451
{ switch(len) \
452
{ default : if((cmp = *o1++ - *o2++) ) return reverse ? -cmp : cmp; \
453
case 7 : if((cmp = *o1++ - *o2++) ) return reverse ? -cmp : cmp; \
454
case 6 : if((cmp = *o1++ - *o2++) ) return reverse ? -cmp : cmp; \
455
case 5 : if((cmp = *o1++ - *o2++) ) return reverse ? -cmp : cmp; \
456
case 4 : if((cmp = *o1++ - *o2++) ) return reverse ? -cmp : cmp; \
457
case 3 : if((cmp = *o1++ - *o2++) ) return reverse ? -cmp : cmp; \
458
case 2 : if((cmp = *o1++ - *o2++) ) return reverse ? -cmp : cmp; \
459
case 1 : if((cmp = *o1++ - *o2++) ) return reverse ? -cmp : cmp; \
460
} \
461
} \
462
}
463
464
#if __STD_C
465
static int mgcompare(Rs_t* rs, Rsobj_t* one, Rsobj_t* two, int reverse)
466
#else
467
static int mgcompare(rs, one, two, reverse)
468
Rs_t* rs;
469
reg Rsobj_t* one;
470
reg Rsobj_t* two;
471
int reverse;
472
#endif
473
{
474
reg uchar *o, *t;
475
reg int c;
476
reg ssize_t l, d;
477
478
o = one->key+SIZEOF_LONG; t = two->key+SIZEOF_LONG;
479
if((d = (l = one->keylen) - two->keylen) > 0)
480
l -= d;
481
l -= SIZEOF_LONG;
482
MGMEMCMP(o,t,l,c,reverse);
483
484
if(d != 0)
485
return reverse ? -d : d;
486
else if(rs->type&RS_DATA) /* compare by data */
487
{ o = one->data; t = two->data;
488
if((d = (l = one->datalen) - two->datalen) > 0)
489
l -= d;
490
MGMEMCMP(o,t,l,c,reverse);
491
492
return reverse ? -d : d;
493
}
494
else return 0;
495
}
496
497
/* The stream list is kept in reverse order to ease data movement.
498
** Ties are broken by stream positions to preserve stability.
499
*/
500
#if __STD_C
501
static int mginsert(Rs_t* rs, Merge_t** list, int n, Merge_t* mg)
502
#else
503
static int mginsert(rs, list, n, mg)
504
Rs_t* rs;
505
Merge_t** list;
506
int n;
507
Merge_t* mg;
508
#endif
509
{
510
reg Rsobj_t *obj, *o;
511
reg Merge_t **l, **r, **m, *p, *h;
512
reg int cmp;
513
int reverse = rs->type&RS_REVERSE;
514
515
obj = mg->obj+mg->cpos;
516
r = (l = list) + n;
517
518
if(n > 4)
519
{ while(l != r)
520
{ m = l + (r-l)/2;
521
o = (*m)->obj+(*m)->cpos;
522
if((cmp = MGCOMPARE(rs,o,obj,reverse)) == 0)
523
l = r = m;
524
else if(cmp > 0)
525
l = l == m ? r : m;
526
else r = m;
527
}
528
}
529
else
530
{ for(r -= 1, cmp = 1; r >= l; --r)
531
{ o = (*r)->obj+(*r)->cpos;
532
if((cmp = MGCOMPARE(rs,o,obj,reverse)) > 0)
533
{ l = r+1; break; }
534
else if(cmp == 0)
535
{ l = r; break; }
536
}
537
}
538
539
if(cmp == 0)
540
{ for(p = NIL(Merge_t*), h = *l;; )
541
if(mg->pos < h->pos || !(p=h, h=h->equi) )
542
break;
543
mg->equi = h;
544
if(p) p->equi = mg;
545
else *l = mg;
546
}
547
else
548
{ for(r = list+n; r > l; --r)
549
*r = *(r-1);
550
*l = mg; mg->equi = NIL(Merge_t*);
551
n += 1;
552
}
553
554
return n;
555
}
556
557
/* move data from stream mg->f to output stream rs->f */
558
#if __STD_C
559
static int mgmove(reg Rs_t* rs, reg Merge_t* mg, ssize_t n)
560
#else
561
static int mgmove(rs, mg, n)
562
reg Rs_t* rs;
563
reg Merge_t* mg;
564
ssize_t n;
565
#endif
566
{
567
ssize_t w, r, len, n_obj;
568
reg uchar *d, *cur, *mgcur;
569
reg uchar *rsrv, *endrsrv, *mgrsrv, *mgendrsrv;
570
int ret = -1;
571
int notify, c, rsc;
572
Rsobj_t obj, out;
573
574
#if 0
575
static const char* event[] = { "TERMINATE", "ACCEPT", "INSERT", "DELETE", "DONE", "[5]", "[6]", "[7]" };
576
#endif
577
578
mgflush(rs);
579
580
rsrv = rs->rsrv; endrsrv = rs->endrsrv; cur = rs->cur;
581
mgrsrv = mg->rsrv; mgendrsrv = mg->endrsrv; mgcur = mg->cur;
582
notify = (rs->events & RS_WRITE) && (rs->type & RS_OTEXT);
583
rsc = rs->disc->data;
584
585
/* easy case, just copy everything over, let Sfio worry about it */
586
if(n < 0 && (rs->type&RS_ITEXT) && !notify)
587
{ if(rsrv)
588
{ sfwrite(rs->f, rsrv, cur-rsrv);
589
rs->rsrv = NIL(uchar*);
590
}
591
if(mgrsrv)
592
{ sfread(mg->f, mgrsrv, mgcur-mgrsrv);
593
mg->rsrv = NIL(uchar*);
594
}
595
return sfmove(mg->f,rs->f,-1,-1) < 0 ? -1 : 0;
596
}
597
598
for(n_obj = n < 0 ? 0 : n;; )
599
{ if(n_obj == 0)
600
{ if(MGISEOF(mg))
601
break;
602
if(rs->type&RS_ITEXT)
603
n_obj = 1;
604
else
605
{ MGRESERVE(mg,mgrsrv,mgendrsrv,mgcur,sizeof(ssize_t),break);
606
d = (uchar*)(&n_obj); MEMCPY(d,mgcur,sizeof(ssize_t));
607
if(n_obj == 0)
608
{ MGSETEOF(mg);
609
break;
610
}
611
}
612
if(!(rs->type&RS_OTEXT))
613
{ RSRESERVE(rs,rsrv,endrsrv,cur,sizeof(ssize_t),goto done);
614
d = (uchar*)(&n_obj); MEMCPY(cur,d,sizeof(ssize_t));
615
}
616
}
617
618
if(n_obj < 0)
619
n_obj = -n_obj;
620
621
if(rs->type&RS_DSAMELEN)
622
{ len = rs->disc->data;
623
if(notify)
624
{ for(; n_obj > 0; --n_obj)
625
{ MGRESERVE(mg,mgrsrv,mgendrsrv,mgcur,len,break);
626
RSRESERVE(rs,rsrv,endrsrv,cur,len, goto done);
627
obj.data = mgcur;
628
mgcur += len;
629
obj.datalen = len;
630
do
631
{ for (;;)
632
{ out.data = cur;
633
out.datalen = w = endrsrv - cur;
634
if ((c = rsnotify(rs, RS_WRITE, &obj, &out, rs->disc)) < 0)
635
goto done;
636
if (c == RS_DELETE)
637
{ out.datalen = 0;
638
break;
639
}
640
if (w >= out.datalen)
641
break;
642
RSRESERVE(rs,rsrv,endrsrv,cur,out.datalen, goto done);
643
}
644
cur += out.datalen;
645
} while (c == RS_INSERT);
646
}
647
}
648
else
649
{
650
len *= n_obj;
651
for(;;)
652
{ if((r = mgendrsrv-mgcur) > 0)
653
w = len > r ? r : len;
654
else
655
{ w = len > RS_RESERVE ? RS_RESERVE : len;
656
MGRESERVE(mg,mgrsrv,mgendrsrv,mgcur,w,break);
657
}
658
RSRESERVE(rs,rsrv,endrsrv,cur,w, goto done);
659
MEMCPY(cur,mgcur,w);
660
if((len -= w) == 0)
661
break;
662
}
663
}
664
n_obj = 0;
665
}
666
else if(rs->type&RS_ITEXT)
667
{ for(; n_obj > 0; --n_obj)
668
{ uchar *t;
669
ssize_t s, o, x;
670
for(s = RS_RESERVE, o = 0;;) /* make sure we have at least 1 record */
671
{ MGRESERVE(mg,mgrsrv,mgendrsrv,mgcur,s,goto last_chunk);
672
x = mgendrsrv-mgcur;
673
#if _PACKAGE_ast
674
if (rsc & ~0xff) /* Recfmt_t record descriptor */
675
{ if ((len = reclen(rsc, mgcur, x)) < 0)
676
goto done;
677
if (len <= x)
678
break;
679
}
680
else
681
#endif
682
if((t = (uchar*)memchr(mgcur,rsc,x)) )
683
{ len = (t-cur)+1;
684
break;
685
}
686
else if(o == x)
687
{ len = x;
688
break;
689
}
690
else
691
{ o = x;
692
s += RS_RESERVE;
693
continue;
694
}
695
last_chunk:
696
if((s = sfvalue(mg->f)) <= 0)
697
{ if(!s)
698
ret = 0;
699
MGSETEOF(mg);
700
goto done;
701
}
702
MGCLREOF(mg);
703
}
704
if(len <= 0)
705
{ ret = 0;
706
MGSETEOF(mg);
707
goto done;
708
}
709
MGRESERVE(mg,mgrsrv,mgendrsrv,mgcur,len,break);
710
RSRESERVE(rs,rsrv,endrsrv,cur,len, goto done);
711
if(notify)
712
{
713
obj.data = mgcur;
714
mgcur += len;
715
obj.datalen = len;
716
do
717
{ for (;;)
718
{ out.data = cur;
719
out.datalen = w = endrsrv - cur;
720
if ((c = rsnotify(rs, RS_WRITE, &obj, &out, rs->disc)) < 0)
721
goto done;
722
if (c == RS_DELETE)
723
{ out.datalen = 0;
724
break;
725
}
726
if (w >= out.datalen)
727
break;
728
RSRESERVE(rs,rsrv,endrsrv,cur,out.datalen, goto done);
729
}
730
cur += out.datalen;
731
} while (c == RS_INSERT);
732
}
733
else
734
MEMCPY(cur,mgcur,len);
735
}
736
}
737
#if _PACKAGE_ast
738
else if (rsc & ~0xff)
739
{ for(; n_obj > 0; --n_obj)
740
{ MGRESERVE(mg,mgrsrv,mgendrsrv,mgcur,sizeof(ssize_t),break);
741
if ((len = reclen(rsc, mgcur, sizeof(ssize_t))) < 0)
742
{ MGSETEOF(mg);
743
goto done;
744
}
745
MGRESERVE(mg,mgrsrv,mgendrsrv,mgcur,len,break);
746
RSRESERVE(rs,rsrv,endrsrv,cur,len, goto done);
747
if (notify)
748
{ obj.data = mgcur;
749
mgcur += len;
750
obj.datalen = len;
751
do
752
{ for (;;)
753
{ out.data = cur;
754
out.datalen = w = endrsrv - cur;
755
if ((c = rsnotify(rs, RS_WRITE, &obj, &out, rs->disc)) < 0)
756
goto done;
757
if (c == RS_DELETE)
758
{ out.datalen = 0;
759
break;
760
}
761
if (w >= out.datalen)
762
break;
763
RSRESERVE(rs,rsrv,endrsrv,cur,out.datalen, goto done);
764
}
765
cur += out.datalen;
766
} while (c == RS_INSERT);
767
}
768
else
769
MEMCPY(cur,mgcur,len);
770
}
771
}
772
#endif
773
else
774
{ for(; n_obj > 0; --n_obj)
775
{ MGRESERVE(mg,mgrsrv,mgendrsrv,mgcur,sizeof(ssize_t),break);
776
d = (uchar*)(&len); MEMCPY(d,mgcur,sizeof(ssize_t));
777
MGRESERVE(mg,mgrsrv,mgendrsrv,mgcur,len,break);
778
779
if(rs->type&RS_OTEXT)
780
RSRESERVE(rs,rsrv,endrsrv,cur,len, goto done);
781
else
782
{ w = len + sizeof(ssize_t);
783
RSRESERVE(rs,rsrv,endrsrv,cur,w, goto done);
784
d = (uchar*)(&len); MEMCPY(cur,d,sizeof(ssize_t));
785
}
786
787
if (notify)
788
{ obj.data = mgcur;
789
mgcur += len;
790
obj.datalen = len;
791
do
792
{ for (;;)
793
{ out.data = cur;
794
out.datalen = w = endrsrv - cur;
795
if ((c = rsnotify(rs, RS_WRITE, &obj, &out, rs->disc)) < 0)
796
goto done;
797
if (c == RS_DELETE)
798
{ out.datalen = 0;
799
break;
800
}
801
if (w >= out.datalen)
802
break;
803
RSRESERVE(rs,rsrv,endrsrv,cur,out.datalen, goto done);
804
}
805
cur += out.datalen;
806
} while (c == RS_INSERT);
807
}
808
else
809
MEMCPY(cur,mgcur,len);
810
}
811
}
812
813
if(n > 0)
814
break;
815
}
816
ret = 0;
817
818
done:
819
if(!(rs->rsrv = rsrv) )
820
rs->endrsrv = rs->cur = NIL(uchar*);
821
else
822
{ rs->endrsrv = endrsrv;
823
rs->cur = cur;
824
}
825
if(!(mg->rsrv = mgrsrv) )
826
mg->endrsrv = mg->cur = NIL(uchar*);
827
else
828
{ mg->endrsrv = mgendrsrv;
829
mg->cur = mgcur;
830
}
831
832
return ret;
833
}
834
835
/* write out a bunch of records from stream f */
836
#if __STD_C
837
static int mgwrite(reg Rs_t* rs, reg Merge_t* mg, reg int n)
838
#else
839
static int mgwrite(rs, mg, n)
840
reg Rs_t* rs;
841
reg Merge_t* mg; /* stream being output */
842
reg int n; /* total in equivalence class */
843
#endif
844
{
845
reg Rsobj_t *obj;
846
847
if(rs->type&RS_ITEXT) /* output entire equivalence class */
848
{ reg int reverse = rs->type&RS_REVERSE;
849
Rsobj_t first, *o, *t, *endobj;
850
851
obj = mg->obj+mg->cpos; o = &first;
852
o->data = (uchar*)vmalloc(Vmheap,obj->datalen+obj->keylen);
853
o->key = o->data + obj->datalen;
854
memcpy(o->data,obj->data,obj->datalen); o->datalen = obj->datalen;
855
memcpy(o->key,obj->key,obj->keylen); o->keylen = obj->keylen;
856
o->order = obj->order;
857
for(endobj = mg->obj+mg->cend;; )
858
{ APPEND(rs,obj,t);
859
if((obj += 1) >= endobj)
860
{ mg->cpos = mg->cend;
861
if(mgrefresh(rs,mg) < 0)
862
break;
863
else endobj = (obj = mg->obj)+mg->cend;
864
}
865
if(MGCOMPARE(rs,o,obj,reverse) != 0)
866
break;
867
}
868
mg->cpos = obj-mg->obj;
869
vmfree(Vmheap,o->data);
870
}
871
else
872
{ if(rs->sorted)
873
mgflush(rs);
874
if(mg->cpos < mg->cend)
875
{ /* write out head object with count */
876
obj = mg->obj + mg->cpos;
877
obj->order = n;
878
obj->right = NIL(Rsobj_t*);
879
rs->sorted = obj;
880
RSWRITE(rs,rs->f,rs->type&RS_TEXT);
881
rs->sorted = NIL(Rsobj_t*);
882
mg->cpos += 1;
883
}
884
if(mg->match > 0) /* output the rest of the equi-class */
885
{ if(mgmove(rs,mg,mg->match) < 0)
886
return -1;
887
mg->match = 0;
888
}
889
}
890
891
return 0;
892
}
893
894
#if __STD_C
895
static int mgerror(Rs_t* rs, Merge_t** list, int n)
896
#else
897
static int mgerror(rs, list, n)
898
Rs_t* rs;
899
Merge_t** list;
900
int n;
901
#endif
902
{
903
reg int k;
904
reg Merge_t *mg, *e;
905
906
for(k = 0; k <= n; ++k)
907
{ for(mg = list[k]; mg; mg = e)
908
{ e = mg->equi;
909
mgclose(rs, mg);
910
}
911
}
912
913
rsclear(rs);
914
vmfree(Vmheap, list);
915
916
return -1;
917
}
918
919
/* merging streams of sorted records */
920
#if __STD_C
921
int rsmerge(Rs_t* rs, Sfio_t* f, Sfio_t** files, int n, int type)
922
#else
923
int rsmerge(rs, f, files, n, type)
924
Rs_t* rs; /* sorting context */
925
Sfio_t* f; /* output stream */
926
Sfio_t** files; /* streams to be merged */
927
int n; /* number of such streams */
928
int type; /* RS_ITEXT|RS_OTEXT */
929
#endif
930
{
931
reg Rsobj_t *obj, *o, *t, *endobj;
932
reg Merge_t *mg, **list;
933
reg Merge_t *p, *m;
934
reg ssize_t k, r, n_list;
935
reg int uniq = rs->type&RS_UNIQ;
936
reg int reverse = rs->type&RS_REVERSE;
937
reg int flags;
938
939
if(n <= 0)
940
return 0;
941
942
/* make sure f is writable */
943
flags = sfset(f,0,0);
944
if(!(flags&SF_WRITE))
945
return -1;
946
sfset(f,(SF_READ|SF_SHARE|SF_PUBLIC),0);
947
948
rsclear(rs);
949
950
if(!(list = (Merge_t**)vmalloc(Vmheap,n*sizeof(Merge_t*))) )
951
return -1;
952
953
rs->f = f;
954
rs->rsrv = rs->endrsrv = rs->cur = NIL(uchar*);
955
rs->type = (rs->type&~RS_TEXT) | (type&RS_TEXT);
956
957
/* construct a list of streams sorted in reverse order */
958
for(n_list = 0, k = 0; k < n; ++k)
959
if((mg = mgopen(rs,files[k],k)) )
960
n_list = mginsert(rs,list,n_list,mg);
961
962
while(n_list > 0)
963
{ mg = list[n_list -= 1];
964
if(mg->equi) /* hitting an equi-class across streams */
965
{ if(uniq)
966
{ /* we assume here that mg->f is RS_UNIQ */
967
obj = mg->obj+mg->cpos; mg->cpos += 1;
968
if(rs->events & RS_SUMMARY)
969
{ for(m = mg->equi; m; m = m->equi)
970
{ o = m->obj+m->cpos;
971
EQUAL(obj,o,t);
972
}
973
obj->equal->left->right = NIL(Rsobj_t*);
974
}
975
APPEND(rs,obj,t);
976
for(;;)
977
{ m = mg->equi;
978
if(mg->cpos >= mg->cend && mgrefresh(rs,mg) < 0)
979
{ if (mgclose(rs,mg) < 0)
980
return mgerror(rs,list,n_list-1);
981
}
982
else n_list = mginsert(rs,list,n_list,mg);
983
if(!(mg = m) )
984
break;
985
else mg->cpos += 1;
986
}
987
}
988
else /* write out the union of the equi-class */
989
{ for(k = 0, m = mg; m; m = m->equi)
990
k += m->match > 0 ? m->match+1 : 1;
991
if(mgwrite(rs,mg,k) < 0)
992
return mgerror(rs,list,n_list);
993
for(;;)
994
{ m = mg->equi;
995
if(mg->cpos >= mg->cend && mgrefresh(rs,mg) < 0)
996
{ if (mgclose(rs,mg) < 0)
997
return mgerror(rs,list,n_list-1);
998
}
999
else n_list = mginsert(rs,list,n_list,mg);
1000
if(!(mg = m))
1001
break;
1002
else if(mgwrite(rs,mg,0) < 0)
1003
return mgerror(rs,list,n_list);
1004
}
1005
}
1006
}
1007
else if((k = n_list-1) >= 0)
1008
{ o = list[k]->obj + list[k]->cpos;
1009
obj = mg->obj+mg->cpos;
1010
for(;;)
1011
{ if(mg->match > 0)
1012
{ if(mgwrite(rs,mg,mg->match+1) < 0)
1013
return mgerror(rs,list,n_list);
1014
}
1015
else
1016
{ for(endobj = mg->obj+mg->cend;; )
1017
{ APPEND(rs,obj,t);
1018
if((obj += 1) >= endobj)
1019
{ mg->cpos = mg->cend;
1020
break;
1021
}
1022
else if((r = MGCOMPARE(rs,obj,o,reverse))
1023
>= 0 )
1024
{ mg->cpos = obj - mg->obj;
1025
goto move_stream;
1026
}
1027
}
1028
}
1029
1030
if(mgrefresh(rs,mg) < 0)
1031
{ if (mgclose(rs,mg) < 0)
1032
return mgerror(rs,list,n_list-1);
1033
break;
1034
}
1035
else
1036
{ obj = mg->obj + mg->cpos;
1037
if((r = MGCOMPARE(rs,obj,o,reverse)) < 0)
1038
continue;
1039
}
1040
1041
move_stream:
1042
if(r == 0) /* new equi-class */
1043
{ for(p = NIL(Merge_t*), m = list[k];; )
1044
if(mg->pos < m->pos || !(p=m, m=m->equi))
1045
break;
1046
mg->equi = m;
1047
if(p) p->equi = mg;
1048
else list[k] = mg;
1049
}
1050
else /* new least element */
1051
{ list[n_list] = list[k];
1052
if(k == 0)
1053
{ n_list = 2;
1054
list[0] = mg;
1055
}
1056
else if(mginsert(rs,list,k,mg) == k)
1057
list[k] = list[n_list];
1058
else n_list += 1;
1059
}
1060
break;
1061
}
1062
}
1063
else /* if(!mg->equi && n_list == 0) */
1064
{ if(mg->match > 0)
1065
{ if(mgwrite(rs,mg,mg->match+1) < 0)
1066
return mgerror(rs,list,n_list);
1067
}
1068
else if(mg->match < 0 || mg->cpos < mg->cend )
1069
{ if(mg->cpos >= mg->cend && mgrefresh(rs,mg) < 0)
1070
return mgerror(rs,list,n_list);
1071
1072
/* count all pending objects */
1073
for(obj = rs->sorted, k = 0; obj; obj = obj->right)
1074
k += 1;
1075
k += mg->cend - mg->cpos;
1076
1077
/* add cached objects to output list */
1078
obj = mg->obj + mg->cpos; endobj = mg->obj + mg->cend;
1079
for(; obj < endobj; ++obj)
1080
APPEND(rs,obj,t);
1081
mg->cpos = mg->cend;
1082
1083
/* write pending objects out with the "right count" */
1084
rs->sorted->order = mg->match-k;
1085
rs->sorted->left->right = NIL(Rsobj_t*);
1086
if(RSWRITE(rs,rs->f,rs->type&RS_TEXT) < 0)
1087
return mgerror(rs,list,n_list);
1088
rs->sorted = NIL(Rsobj_t*);
1089
1090
if(mg->match < 0 && mgmove(rs,mg,-mg->match) < 0)
1091
return mgerror(rs,list,n_list);
1092
}
1093
1094
/* now do the remainder */
1095
if (mgmove(rs,mg,-1) < 0)
1096
return mgerror(rs,list,n_list);
1097
if (mgclose(rs,mg) < 0)
1098
return mgerror(rs,list,n_list);
1099
}
1100
}
1101
1102
RSSYNC(rs); /* finish up any partial write buffer */
1103
1104
rs->f = NIL(Sfio_t*);
1105
rs->type &= RS_TYPES;
1106
1107
rsclear(rs);
1108
vmfree(Vmheap,list);
1109
1110
sfset(f,(flags&(SF_READ|SF_SHARE|SF_PUBLIC)),1);
1111
1112
return 0;
1113
}
1114
1115