#include "dttest.h"
#include <vmalloc.h>
#include <sys/mman.h>
#include <sys/time.h>
#ifndef N_PROC
#define N_PROC 48
#endif
#if N_PROC < 8
#undef N_PROC
#define N_PROC 8
#endif
#define N_INSERT (N_PROC/3)
#define N_DELETE (N_PROC/3)
#define N_SEARCH (N_PROC/3)
#define N_WRITER (N_INSERT+N_DELETE)
#define N_PROCESS (N_INSERT+N_DELETE+N_SEARCH)
#define W_EXTENT (2604*N_PROCESS)
#define COLLISION (N_INSERT/2)
#define DTMETHOD Dtrhset
#define LONGGONE (-1000000)
static int Count[N_INSERT*W_EXTENT];
static char *Mapstore, *Shmstore;
#define MEMSIZE (4*N_INSERT*W_EXTENT*(sizeof(Obj_t)+16) + 64*1024*1024)
#define CDT_DATA 1
#define CDT_WRITER 2
#define CDT_SEARCHER 3
#define CDT_FINISHED 4
#define CDT_INSERT 5
#define CDT_DELETE 6
#define CDT_INCOMPLETE 7
typedef struct obj_s
{ Dtlink_t link;
int dval;
char* sval;
int ready;
int refn;
int type;
int opid;
int ins;
int del;
int srch;
int free;
int fpid;
struct obj_s* next;
} Obj_t;
typedef struct _mmdisc_s
{ Dtdisc_t disc;
Vmalloc_t* vm;
ssize_t ndel;
Obj_t* list;
int pid;
} Mmdisc_t;
Void_t* mmmemory(Dt_t* dt, Void_t* data, size_t size, Dtdisc_t* disc)
{
return vmresize(((Mmdisc_t*)disc)->vm, data, size, 0);
}
static int mmevent(Dt_t* dt, int type, Void_t* data, Dtdisc_t* disc)
{
Void_t *ctrl;
Mmdisc_t *mmdc = (Mmdisc_t*)disc;
if(type == DT_OPEN)
{ ctrl = vmmvalue(mmdc->vm, CDT_DATA, (Void_t*)0, VM_MMGET);
if(data)
{ if(!ctrl)
return 0;
else
{ *((Void_t**)data) = ctrl;
return 1;
}
}
else return 0;
}
else if(type == DT_ENDOPEN)
{ ctrl = vmmvalue(mmdc->vm, CDT_DATA, (Void_t*)0, VM_MMGET);
if(!ctrl)
{ ctrl = vmmvalue(mmdc->vm, CDT_DATA, (Void_t*)dt->data, VM_MMSET);
return ctrl == (Void_t*)dt->data ? 0 : -1;
}
else return 0;
}
else if(type == DT_CLOSE)
return 1;
else if(type == DT_ENDCLOSE)
{ vmmrelease(mmdc->vm, 0);
vmclose(mmdc->vm);
mmdc->vm = NIL(Vmalloc_t*);
return 0;
}
else if(type & DT_ANNOUNCE)
{ Obj_t *obj = (Obj_t*)data;
if(obj->refn < 0 )
{ if(obj->refn != LONGGONE)
terror("Process %d: refn != LONGGONE op=%d obj[%d,refn=%d,fpid=%d]",
mmdc->pid, type&~DT_ANNOUNCE, obj->dval, obj->refn, obj->fpid);
if(!(type&DT_DELETE) )
tpause("Process %d: Op=%d != DT_DELETE obj[%d,refn=%d,fpid=%d]",
mmdc->pid, type&~DT_ANNOUNCE, obj->dval, obj->refn, obj->fpid);
}
obj->type = type & ~DT_ANNOUNCE;
obj->opid = mmdc->pid;
if(!(type & DT_DELETE) )
{
if(type & ~(DT_ANNOUNCE|DT_INSERT|DT_ATTACH) )
while(obj->ready == 0 )
asorelax(1);
if(asoincint(&obj->refn) < 0 )
tpause("Process %d: refn<0 on adding op=%d obj[%d,refn=%d,fpid=%d]",
mmdc->pid, type&~DT_ANNOUNCE, obj->dval, obj->refn, obj->fpid);
}
return 0;
}
else return 0;
}
static int mmcompare(Dt_t* dt, Void_t* key1, Void_t* key2, Dtdisc_t* disc)
{
return *((int*)key1) - *((int*)key2);
}
static void mmfree(Dt_t* dt, Void_t* objarg, Dtdisc_t* disc)
{
int refn;
Obj_t *obj = (Obj_t*)objarg;
Mmdisc_t *mmdc = (Mmdisc_t*)dt->disc;
if(obj->free > 0 )
terror("Process %d: multiple deletion? obj[%d,sval=%s,free=%d,pid=%d,refn=%d]",
mmdc->pid, obj->dval, obj->sval, obj->free, obj->fpid, obj->refn );
while(!obj->ready)
asorelax(1);
obj->fpid = mmdc->pid;
obj->free += 1;
while(obj->refn > 0)
asorelax(1);
if((refn = (int)asocasint(&obj->refn, 0, (uint)LONGGONE)) != 0 )
terror("Process %d: refn=%d > 0? obj[%d,sval=%s,free=%d,pid=%d,refn=%d,op=%d]",
mmdc->pid, refn, obj->dval, obj->sval, obj->free, obj->fpid, obj->refn, obj->type );
obj->dval = -obj->dval;
obj->next = mmdc->list;
mmdc->list = obj;
mmdc->ndel += 1;
}
static Dt_t* opendictionary(char* actor, char* type, int num, pid_t pid, char* store)
{
Vmalloc_t *vm;
Dt_t *dt;
ssize_t size;
static Mmdisc_t Mmdc;
if(!(vm = vmmopen(store, store == Mapstore ? -1 : 1, MEMSIZE)) )
terror("%s %s [num=%d,pid=%d]: Couldn't create vmalloc region", actor, type, num, pid);
Mmdc.disc.key = (ssize_t)DTOFFSET(Obj_t,dval);
Mmdc.disc.size = (ssize_t)sizeof(int);
Mmdc.disc.link = (ssize_t)DTOFFSET(Obj_t,link);
Mmdc.disc.makef = (Dtmake_f)0;
Mmdc.disc.freef = mmfree;
Mmdc.disc.comparf = mmcompare;
Mmdc.disc.hashf = (Dthash_f)0;
Mmdc.disc.memoryf = mmmemory;
Mmdc.disc.eventf = mmevent;
Mmdc.vm = vm;
Mmdc.ndel = 0;
Mmdc.list = (Obj_t*)0;
Mmdc.pid = (int)getpid();
if(!(dt = dtopen(&Mmdc.disc, DTMETHOD)) )
terror("%s %s [num=%d,pid=%d]: Can't open dictionary", actor, type, num, pid);
if(dtcustomize(dt, (DT_ANNOUNCE|DT_SHARE), 1) != (DT_ANNOUNCE|DT_SHARE) )
terror("%s %s [num=%d,pid=%d]: Can't customize dictionary", actor, type, num, pid);
return dt;
}
static pid_t makeprocess(char* proc, char* type, char* actor, int num, char* aso)
{
int i;
pid_t pid;
char text[16];
char *argv[9];
if((pid = fork()) < 0 )
terror("%s %s [num=%d]: Could not fork() a subprocess", actor, type, num);
else if(pid > 0 )
return pid;
else
{ sprintf(text, "%d", num);
i = 0;
argv[i++] = proc;
if (aso)
argv[i++] = aso;
argv[i++] = "--child";
argv[i++] = Mapstore;
argv[i++] = Shmstore;
argv[i++] = type;
argv[i++] = actor;
argv[i++] = text;
argv[i++] = 0;
if(execv(proc, argv) < 0 )
terror("%s %s [num=%d]: Could not execv() process %s %s", actor, type, num, proc, type);
}
return -1;
}
static int readwrite(char* type, char* store, char* actor, char* procnum)
{
int i, k, p, num, base, insert, delete, search, unsrch, undel, unins, walk, first, size;
char *sval;
Obj_t obj, *o, *next, *rv;
Dt_t *dt;
pid_t pid;
Mmdisc_t *mmdc;
num = atoi(procnum);
if((pid = getpid()) < 0 )
terror("%s %s [num=%d]: can't get process id", actor, type, num);
if(!(dt = opendictionary(actor, type, num, pid, store)) )
terror("%s %s [num=%d,pid=%d]: can't open dictionary", actor, type, num, pid);
if(!(mmdc = (Mmdisc_t*)dtdisc(dt, NIL(Dtdisc_t*), 0)) )
terror("%s %s [num=%d,pid=%d]: can't get dictionary discipline", actor, type, num, pid);
if(strcmp(actor, "inserter") == 0 || strcmp(actor, "deleter") == 0)
{ p = (int)((long)vmmvalue(mmdc->vm, CDT_WRITER, (Void_t*)1, VM_MMADD));
for(; p < N_WRITER; asorelax(1) )
p = (int)((long)vmmvalue(mmdc->vm, CDT_WRITER, (Void_t*)0, VM_MMGET));
}
else
{ p = (int)((long)vmmvalue(mmdc->vm, CDT_SEARCHER, (Void_t*)1, VM_MMADD));
k = (int)((long)vmmvalue(mmdc->vm, CDT_WRITER, (Void_t*)0, VM_MMGET));
for(; p+k < N_PROCESS; asorelax(1) )
{ p = (int)((long)vmmvalue(mmdc->vm, CDT_SEARCHER, (Void_t*)0, VM_MMGET));
k = (int)((long)vmmvalue(mmdc->vm, CDT_WRITER, (Void_t*)0, VM_MMGET));
}
}
#define BASE(n) ((n)*W_EXTENT + 1)
walk = first = insert = delete = search = unsrch = unins = undel = 0;
if(strcmp(actor, "inserter") == 0 )
{ base = BASE((num/COLLISION)*COLLISION);
tinfo("%s %s [num=%d,pid=%d]: range=[%d,%d) ready to go", actor, type, num, pid, base, base+W_EXTENT);
for(i = 0; i < W_EXTENT; ++i)
{
if(!(o = (Obj_t*)vmalloc(mmdc->vm, sizeof(Obj_t))) )
terror("%s %s [num=%d,pid=%d]: vmalloc failed", actor, type, num, pid);
memset(o, 0, sizeof(Obj_t));
o->dval = i+base;
if(!(rv = dtinsert(dt, o)) )
terror("%s %s [num=%d,pid=%d]: insert failed", actor, type, num, pid);
if(rv == o)
{ insert += 1;
if(!(sval = vmalloc(mmdc->vm, 16)) )
terror("%s %s [num=%d,pid=%d]: vmalloc failed", actor, type, num, pid);
sprintf(sval, "%d", o->dval);
o->sval = sval;
if(o->dval < 0)
terror("%s %s [num=%d,pid=%d]: already freed? obj[dval=%d,pid=%d]",
actor, type, num, pid, o->dval, o->fpid);
if(o->refn != 1 )
terror("%s %s [num=%d,pid=%d]: refn != 1? obj[dval=%d,refn=%d,op=%d,opid=%d]",
actor, type, num, pid, o->dval, o->refn, o->type, o->opid);
o->ready = 1;
asodecint(&o->refn);
}
else
{ unins += 1;
vmfree(mmdc->vm, o);
if(rv->refn <= 0 || rv->dval < 0)
terror("%s %s [num=%d,pid=%d]: refn <= 0? obj[dval=%d,refn=%d,op=%d,opid=%d]",
actor, type, num, pid, o->dval, o->refn, o->type, o->opid);
if(!rv->sval)
vmmvalue(mmdc->vm, CDT_INCOMPLETE, (Void_t*)1, VM_MMADD);
asodecint(&rv->refn);
}
}
tinfo("%s %s [num=%d,pid=%d]: done, base=%d try=%d insert=%d[+%d]",
actor, type, num, pid, base, W_EXTENT, insert, unins);
vmmvalue(mmdc->vm, CDT_INSERT, (Void_t*)((long)insert), VM_MMADD );
}
else if(strcmp(actor, "deleter") == 0 )
{ srandom(num);
size = dtsize(dt);
tinfo("%s %s [num=%d,pid=%d]: dtsize=%d ready to go", actor, type, num, pid, size);
for(o = dtfirst(dt); o; o = next )
{ next = dtnext(dt,o);
walk += 1;
if(o->refn <= 0 )
terror("%s %s [num=%d,pid=%d]: refn <= 0? obj[dval=%d,refn=%d,op=%d,opid=%d]",
actor, type, num, pid, o->dval, o->refn, o->type, o->opid);
if(!o->sval)
vmmvalue(mmdc->vm, CDT_INCOMPLETE, (Void_t*)1, VM_MMADD);
asodecint(&o->refn);
if(!(rv = dtdelete(dt,o)) )
undel += 1;
else delete += 1;
}
tinfo("%s %s [num=%d,pid=%d]: done, size=%d[walk=%d] delete=%d[+%d]",
actor, type, num, pid, size, walk, delete, undel);
size = 0;
for(o = mmdc->list; o; o = next)
{ next = o->next;
size += 1;
if(o->free != 1)
terror("%s %s [num=%d,pid=%d]: multiple delete? obj[dval=%d,free=%d,fpid=%d]",
actor, type, num, mmdc->pid, o->dval, o->free, o->fpid);
if(o->fpid != mmdc->pid)
terror("%s %s [num=%d,pid=%d]: Wrong deleter obj[dval=%d,free=%d,fpid=%d]",
actor, type, num, mmdc->pid, o->dval, o->free, o->fpid);
}
if(size != delete)
terror("%s %s [num=%d,pid=%d]: free=%d delete=%d", actor, type, num, pid, size, delete);
vmmvalue(mmdc->vm, CDT_DELETE, (Void_t*)((long)delete), VM_MMADD );
}
else if(strcmp(actor, "searcher") == 0 )
{ srandom(num);
size = dtsize(dt);
tinfo("%s %s [num=%d,pid=%d]: dtsize=%d ready to go", actor, type, num, pid, size);
size = size > COLLISION*W_EXTENT ? size : COLLISION*W_EXTENT;
if(num%2 == 0 )
{ for(k = 0; k < size; ++k)
{ obj.dval = random()%(N_INSERT*W_EXTENT) + 1;
if(!(rv = dtsearch(dt, &obj)) )
unsrch += 1;
else
{ search += 1;
if(rv->dval < 0)
terror("%s %s [num=%d,pid=%d]: already freed? obj[%d,fpid=%d]",
actor, type, num, pid, rv->dval, rv->fpid);
if(rv->refn <= 0)
terror("%s %s [num=%d,pid=%d]: refn <= 0? obj[%d,refn=%d,op=%d,opid=%d]",
actor, type, num, pid, rv->dval, rv->refn, rv->type, rv->opid);
if(!rv->sval)
vmmvalue(mmdc->vm, CDT_INCOMPLETE, (Void_t*)1, VM_MMADD);
asodecint(&rv->refn);
}
}
size = dtsize(dt);
tinfo("%s %s [num=%d,pid=%d]: done, dtsize=%d search=%d[+%d]",
actor, type, num, pid, size, search, unsrch);
}
else
{ obj.dval = 0;
for(k = 0; k < size; ++k)
{ if((o = dtnext(dt, &obj)) )
walk += 1;
else if((o = dtfirst(dt)) )
first += 1;
else break;
if(o->dval < 0)
terror("%s %s [num=%d,pid=%d]: already freed obj[%d,refn=%d,fpid=%d,op=%d,opid=%d ",
actor, type, num, pid, o->dval, o->refn, o->fpid, o->type, o->opid);
if(o->refn <= 0 )
terror("%s %s [num=%d,pid=%d]: refn<=0? obj[%d,refn=%d,op=%d,opid=%d",
actor, type, num, pid, o->dval, o->refn, o->type, o->opid);
if(!o->sval)
vmmvalue(mmdc->vm, CDT_INCOMPLETE, (Void_t*)1, VM_MMADD);
obj.dval = o->dval;
asodecint(&o->refn);
}
size = dtsize(dt);
tinfo("%s %s [num=%d,pid=%d]: done, dtsize=%d walk=%d first=%d",
actor, type, num, pid, size, walk, first);
}
}
else terror("%s %s [num=%d,pid=%d]: unknown actor", actor, num, pid);
vmmvalue(mmdc->vm, CDT_FINISHED, (Void_t*)1, VM_MMADD);
for(p = 0; p < N_PROCESS; asorelax(1) )
p = (int)((long)vmmvalue(mmdc->vm, CDT_FINISHED, (Void_t*)0, VM_MMGET));
insert = (ssize_t)(unsigned long)vmmvalue(mmdc->vm, CDT_INSERT, 0, VM_MMGET);
delete = (ssize_t)(unsigned long)vmmvalue(mmdc->vm, CDT_DELETE, 0, VM_MMGET);
size = dtsize(dt);
for(walk = 0, o = dtfirst(dt); o; o = dtnext(dt,o) )
walk += 1;
tinfo("%s %s [num=%d,pid=%d]: start walk, dtsize=%d, dtfirst/next=%d ins-del=%d",
actor, type, num, pid, size, walk, insert-delete);
if(insert < delete)
terror("%s %s [num=%d,pid=%d]: insert=%d < delete=%d", actor, type, num, pid, insert, delete);
if(walk != size)
terror("%s %s [num=%d,pid=%d]: dtsize=%d != walk=%d", actor, type, num, pid, size, walk);
i = strcmp(actor, "inserter") == 0 ? 1 : strcmp(actor, "deleter") == 0 ? 2 : 3;
for(k = 0, o = (Obj_t*)dtfirst(dt); o; ++k, o = (Obj_t*)dtnext(dt,o) )
{ if(o->dval < 0)
terror("%s %s [num=%d,pid=%d]: object %d already freed",
actor, type, num, pid, o->dval);
if(i == 1)
{ asoincint(&o->ins);
#ifdef DEBUG
tlog(pid, "%d", o->dval);
#endif
}
else if(i == 2)
{ asoincint(&o->del);
#ifdef DEBUG
tlog(pid, "%d", o->dval);
#endif
}
else
{ asoincint(&o->srch);
#ifdef DEBUG
tlog(pid, "%d", o->dval);
#endif
}
}
dtclose(dt);
return 0;
}
tmain()
{
pid_t wpid[N_INSERT+N_DELETE+N_SEARCH], ppid, pid;
size_t size, walk, i, k, p;
int t;
struct timeval begtm, endtm;
Dt_t *dt;
Obj_t *o;
Mmdisc_t *mmdc;
Vmalloc_t *vm;
char *aso, *a, *cmd, *store, *type;
cmd = *argv;
aso = taso(ASO_PROCESS);
if(k = tchild())
{ Mapstore = argv[k++];
Shmstore = argv[k++];
type = argv[k++];
if(strcmp(type, "map") == 0 )
store = Mapstore;
else if(strcmp(type, "shm") == 0)
store = Shmstore;
else
terror("%s: invalid store type -- { map shm } expected", type);
a = argv[k++];
if(strcmp(a, "inserter") == 0 ||
strcmp(a, "deleter") == 0 ||
strcmp(a, "searcher") == 0 )
return readwrite(type, store, a, argv[k]);
terror("%s: invalid child process operation -- { inserter deleter searcher } expected", a);
}
else if(*++argv)
t = -1;
else
t = 0;
if((ppid = getpid()) < 0 )
terror("Parent: can't get process id");
Mapstore = tstfile("map", -1);
Shmstore = tstfile("shm", -1);
(void)unlink(Mapstore);
(void)unlink(Shmstore);
for(; t < 2; t++)
{ switch (t)
{
case -1:
type = *argv++;
if(strcmp(type, "map") == 0 )
store = Mapstore;
else if(strcmp(type, "shm") == 0)
store = Shmstore;
else
terror("%s: invalid store type -- { map shm } expected", type);
t = *argv ? -2 : 2;
break;
case 0:
type = "map";
store = Mapstore;
#if __sun
twarn("Skipping %s test on __sun because high mmap() memory sync frequency makes it crawl", type);
continue;
#else
break;
#endif
case 1:
type = "shm";
store = Shmstore;
break;
}
tinfo("parent %s [pid=%d]: Testing %s concurrent accesses", type, ppid, type);
gettimeofday(&begtm, 0);
tinfo("parent %s [pid=%d]: initializing dictionary", type, ppid);
if(!(dt = opendictionary("parent", type, 0, ppid, store)) )
terror("parent %s [pid=%d]: Can't open %s dictionary", type, ppid, type);
if(!(mmdc = (Mmdisc_t*)dtdisc(dt, (Dtdisc_t*)0, 0)) )
terror("parent %s [pid=%d]: Can't get dictionary discipline", type, ppid);
tinfo("parent %s [pid=%d]: share dictionary created", type, ppid);
for(p = i = 0; i < N_INSERT; ++i)
if((wpid[p++] = makeprocess(cmd, type, "inserter", i, aso)) < 0 )
terror("parent %s [pid=%d]: Could not make inserter process %d", type, ppid, i);
for(i = 0; i < N_DELETE; ++i)
if((wpid[p++] = makeprocess(cmd, type, "deleter", i, aso)) < 0 )
terror("parent %s [pid=%d]: Could not make deleter process %d", type, ppid, i);
for(i = 0; i < N_SEARCH; ++i)
if((wpid[p++] = makeprocess(cmd, type, "searcher", i, aso)) < 0 )
terror("parent %s [pid=%d]: Could not make searcher process %d", type, ppid, i);
if (twait(wpid, p))
terror("workload subprocess error");
walk = 0;
for(o = (Obj_t*)dtfirst(dt); o; o = (Obj_t*)dtnext(dt,o) )
{ if(o->dval < 0 || o->dval > N_INSERT*W_EXTENT)
terror("parent %s [pid=%d]: bad object", type, ppid);
if(o->ins != N_INSERT)
terror("parent %s [pid=%d]: object %d has wrong insert count %d",
type, ppid, o->dval, o->ins);
if(o->srch != N_SEARCH)
terror("parent %s [pid=%d]: object %d has wrong search count %d",
type, ppid, o->dval, o->srch);
if(o->del != N_DELETE)
terror("parent %s [pid=%d]: object %d has wrong delete count %d",
type, ppid, o->dval, o->del);
#ifdef DEBUG
tlog(ppid, "%d", o->dval);
#endif
Count[o->dval] += 1;
walk += 1;
}
for(k = 0; k < N_INSERT*W_EXTENT; ++k)
if(Count[k] > 1)
terror("parent %s [pid=%d]: Count[%d] = %d > 1", type, ppid, k, Count[k]);
size = (ssize_t)dtsize(dt);
i = (ssize_t)(unsigned long)vmmvalue(mmdc->vm, CDT_INCOMPLETE, (Void_t*)0, VM_MMGET);
tinfo("parent %s [pid=%d]: dtfirst/dtnext=%d dtsize=%d, incomplete=%d, no error.",
type, ppid, walk, size, i);
if(size != walk)
terror("parent %s [pid=%d]: counts mismatched", type, ppid);
tinfo("Storage type=%s, #insertions=%d #inserters=%d #deleters=%d #searchers=%d",
type, N_INSERT*W_EXTENT, N_INSERT, N_DELETE, N_SEARCH);
gettimeofday(&endtm, 0);
if(begtm.tv_usec > endtm.tv_usec)
{ endtm.tv_sec -= 1;
endtm.tv_usec += 1000000;
}
tinfo("Storage method %s: running time = %ld.%lds",
DTMETHOD->name, endtm.tv_sec - begtm.tv_sec, endtm.tv_usec - begtm.tv_usec);
vmmrelease(mmdc->vm, 1);
dtclose(dt);
}
texit(0);
}