Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
freebsd
GitHub Repository: freebsd/freebsd-src
Path: blob/main/sys/contrib/zstd/programs/fileio_asyncio.c
289024 views
1
/*
2
* Copyright (c) Meta Platforms, Inc. and affiliates.
3
* All rights reserved.
4
*
5
* This source code is licensed under both the BSD-style license (found in the
6
* LICENSE file in the root directory of this source tree) and the GPLv2 (found
7
* in the COPYING file in the root directory of this source tree).
8
* You may select, at your option, one of the above-listed licenses.
9
*/
10
11
#include "platform.h"
12
#include <stdio.h> /* fprintf, open, fdopen, fread, _fileno, stdin, stdout */
13
#include <stdlib.h> /* malloc, free */
14
#include <assert.h>
15
#include <errno.h> /* errno */
16
17
#if defined (_MSC_VER)
18
# include <sys/stat.h>
19
# include <io.h>
20
#endif
21
22
#include "fileio_asyncio.h"
23
#include "fileio_common.h"
24
25
/* **********************************************************************
26
* Sparse write
27
************************************************************************/
28
29
/** AIO_fwriteSparse() :
30
* @return : storedSkips,
31
* argument for next call to AIO_fwriteSparse() or AIO_fwriteSparseEnd() */
32
static unsigned
33
AIO_fwriteSparse(FILE* file,
34
const void* buffer, size_t bufferSize,
35
const FIO_prefs_t* const prefs,
36
unsigned storedSkips)
37
{
38
const size_t* const bufferT = (const size_t*)buffer; /* Buffer is supposed malloc'ed, hence aligned on size_t */
39
size_t bufferSizeT = bufferSize / sizeof(size_t);
40
const size_t* const bufferTEnd = bufferT + bufferSizeT;
41
const size_t* ptrT = bufferT;
42
static const size_t segmentSizeT = (32 KB) / sizeof(size_t); /* check every 32 KB */
43
44
if (prefs->testMode) return 0; /* do not output anything in test mode */
45
46
if (!prefs->sparseFileSupport) { /* normal write */
47
size_t const sizeCheck = fwrite(buffer, 1, bufferSize, file);
48
if (sizeCheck != bufferSize)
49
EXM_THROW(70, "Write error : cannot write block : %s",
50
strerror(errno));
51
return 0;
52
}
53
54
/* avoid int overflow */
55
if (storedSkips > 1 GB) {
56
if (LONG_SEEK(file, 1 GB, SEEK_CUR) != 0)
57
EXM_THROW(91, "1 GB skip error (sparse file support)");
58
storedSkips -= 1 GB;
59
}
60
61
while (ptrT < bufferTEnd) {
62
size_t nb0T;
63
64
/* adjust last segment if < 32 KB */
65
size_t seg0SizeT = segmentSizeT;
66
if (seg0SizeT > bufferSizeT) seg0SizeT = bufferSizeT;
67
bufferSizeT -= seg0SizeT;
68
69
/* count leading zeroes */
70
for (nb0T=0; (nb0T < seg0SizeT) && (ptrT[nb0T] == 0); nb0T++) ;
71
storedSkips += (unsigned)(nb0T * sizeof(size_t));
72
73
if (nb0T != seg0SizeT) { /* not all 0s */
74
size_t const nbNon0ST = seg0SizeT - nb0T;
75
/* skip leading zeros */
76
if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)
77
EXM_THROW(92, "Sparse skip error ; try --no-sparse");
78
storedSkips = 0;
79
/* write the rest */
80
if (fwrite(ptrT + nb0T, sizeof(size_t), nbNon0ST, file) != nbNon0ST)
81
EXM_THROW(93, "Write error : cannot write block : %s",
82
strerror(errno));
83
}
84
ptrT += seg0SizeT;
85
}
86
87
{ static size_t const maskT = sizeof(size_t)-1;
88
if (bufferSize & maskT) {
89
/* size not multiple of sizeof(size_t) : implies end of block */
90
const char* const restStart = (const char*)bufferTEnd;
91
const char* restPtr = restStart;
92
const char* const restEnd = (const char*)buffer + bufferSize;
93
assert(restEnd > restStart && restEnd < restStart + sizeof(size_t));
94
for ( ; (restPtr < restEnd) && (*restPtr == 0); restPtr++) ;
95
storedSkips += (unsigned) (restPtr - restStart);
96
if (restPtr != restEnd) {
97
/* not all remaining bytes are 0 */
98
size_t const restSize = (size_t)(restEnd - restPtr);
99
if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)
100
EXM_THROW(92, "Sparse skip error ; try --no-sparse");
101
if (fwrite(restPtr, 1, restSize, file) != restSize)
102
EXM_THROW(95, "Write error : cannot write end of decoded block : %s",
103
strerror(errno));
104
storedSkips = 0;
105
} } }
106
107
return storedSkips;
108
}
109
110
static void
111
AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips)
112
{
113
if (prefs->testMode) assert(storedSkips == 0);
114
if (storedSkips>0) {
115
assert(prefs->sparseFileSupport > 0); /* storedSkips>0 implies sparse support is enabled */
116
(void)prefs; /* assert can be disabled, in which case prefs becomes unused */
117
if (LONG_SEEK(file, storedSkips-1, SEEK_CUR) != 0)
118
EXM_THROW(69, "Final skip error (sparse file support)");
119
/* last zero must be explicitly written,
120
* so that skipped ones get implicitly translated as zero by FS */
121
{ const char lastZeroByte[1] = { 0 };
122
if (fwrite(lastZeroByte, 1, 1, file) != 1)
123
EXM_THROW(69, "Write error : cannot write last zero : %s", strerror(errno));
124
} }
125
}
126
127
128
/* **********************************************************************
129
* AsyncIO functionality
130
************************************************************************/
131
132
/* AIO_supported:
133
* Returns 1 if AsyncIO is supported on the system, 0 otherwise. */
134
int AIO_supported(void) {
135
#ifdef ZSTD_MULTITHREAD
136
return 1;
137
#else
138
return 0;
139
#endif
140
}
141
142
/* ***********************************
143
* Generic IoPool implementation
144
*************************************/
145
146
static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) {
147
IOJob_t* const job = (IOJob_t*) malloc(sizeof(IOJob_t));
148
void* const buffer = malloc(bufferSize);
149
if(!job || !buffer)
150
EXM_THROW(101, "Allocation error : not enough memory");
151
job->buffer = buffer;
152
job->bufferSize = bufferSize;
153
job->usedBufferSize = 0;
154
job->file = NULL;
155
job->ctx = ctx;
156
job->offset = 0;
157
return job;
158
}
159
160
161
/* AIO_IOPool_createThreadPool:
162
* Creates a thread pool and a mutex for threaded IO pool.
163
* Displays warning if asyncio is requested but MT isn't available. */
164
static void AIO_IOPool_createThreadPool(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs) {
165
ctx->threadPool = NULL;
166
ctx->threadPoolActive = 0;
167
if(prefs->asyncIO) {
168
if (ZSTD_pthread_mutex_init(&ctx->ioJobsMutex, NULL))
169
EXM_THROW(102,"Failed creating ioJobsMutex mutex");
170
/* We want MAX_IO_JOBS-2 queue items because we need to always have 1 free buffer to
171
* decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */
172
assert(MAX_IO_JOBS >= 2);
173
ctx->threadPool = POOL_create(1, MAX_IO_JOBS - 2);
174
ctx->threadPoolActive = 1;
175
if (!ctx->threadPool)
176
EXM_THROW(104, "Failed creating I/O thread pool");
177
}
178
}
179
180
/* AIO_IOPool_init:
181
* Allocates and sets and a new I/O thread pool including its included availableJobs. */
182
static void AIO_IOPool_init(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs, POOL_function poolFunction, size_t bufferSize) {
183
int i;
184
AIO_IOPool_createThreadPool(ctx, prefs);
185
ctx->prefs = prefs;
186
ctx->poolFunction = poolFunction;
187
ctx->totalIoJobs = ctx->threadPool ? MAX_IO_JOBS : 2;
188
ctx->availableJobsCount = ctx->totalIoJobs;
189
for(i=0; i < ctx->availableJobsCount; i++) {
190
ctx->availableJobs[i] = AIO_IOPool_createIoJob(ctx, bufferSize);
191
}
192
ctx->jobBufferSize = bufferSize;
193
ctx->file = NULL;
194
}
195
196
197
/* AIO_IOPool_threadPoolActive:
198
* Check if current operation uses thread pool.
199
* Note that in some cases we have a thread pool initialized but choose not to use it. */
200
static int AIO_IOPool_threadPoolActive(IOPoolCtx_t* ctx) {
201
return ctx->threadPool && ctx->threadPoolActive;
202
}
203
204
205
/* AIO_IOPool_lockJobsMutex:
206
* Locks the IO jobs mutex if threading is active */
207
static void AIO_IOPool_lockJobsMutex(IOPoolCtx_t* ctx) {
208
if(AIO_IOPool_threadPoolActive(ctx))
209
ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
210
}
211
212
/* AIO_IOPool_unlockJobsMutex:
213
* Unlocks the IO jobs mutex if threading is active */
214
static void AIO_IOPool_unlockJobsMutex(IOPoolCtx_t* ctx) {
215
if(AIO_IOPool_threadPoolActive(ctx))
216
ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
217
}
218
219
/* AIO_IOPool_releaseIoJob:
220
* Releases an acquired job back to the pool. Doesn't execute the job. */
221
static void AIO_IOPool_releaseIoJob(IOJob_t* job) {
222
IOPoolCtx_t* const ctx = (IOPoolCtx_t *) job->ctx;
223
AIO_IOPool_lockJobsMutex(ctx);
224
assert(ctx->availableJobsCount < ctx->totalIoJobs);
225
ctx->availableJobs[ctx->availableJobsCount++] = job;
226
AIO_IOPool_unlockJobsMutex(ctx);
227
}
228
229
/* AIO_IOPool_join:
230
* Waits for all tasks in the pool to finish executing. */
231
static void AIO_IOPool_join(IOPoolCtx_t* ctx) {
232
if(AIO_IOPool_threadPoolActive(ctx))
233
POOL_joinJobs(ctx->threadPool);
234
}
235
236
/* AIO_IOPool_setThreaded:
237
* Allows (de)activating threaded mode, to be used when the expected overhead
238
* of threading costs more than the expected gains. */
239
static void AIO_IOPool_setThreaded(IOPoolCtx_t* ctx, int threaded) {
240
assert(threaded == 0 || threaded == 1);
241
assert(ctx != NULL);
242
if(ctx->threadPoolActive != threaded) {
243
AIO_IOPool_join(ctx);
244
ctx->threadPoolActive = threaded;
245
}
246
}
247
248
/* AIO_IOPool_free:
249
* Release a previously allocated IO thread pool. Makes sure all tasks are done and released. */
250
static void AIO_IOPool_destroy(IOPoolCtx_t* ctx) {
251
int i;
252
if(ctx->threadPool) {
253
/* Make sure we finish all tasks and then free the resources */
254
AIO_IOPool_join(ctx);
255
/* Make sure we are not leaking availableJobs */
256
assert(ctx->availableJobsCount == ctx->totalIoJobs);
257
POOL_free(ctx->threadPool);
258
ZSTD_pthread_mutex_destroy(&ctx->ioJobsMutex);
259
}
260
assert(ctx->file == NULL);
261
for(i=0; i<ctx->availableJobsCount; i++) {
262
IOJob_t* job = (IOJob_t*) ctx->availableJobs[i];
263
free(job->buffer);
264
free(job);
265
}
266
}
267
268
/* AIO_IOPool_acquireJob:
269
* Returns an available io job to be used for a future io. */
270
static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t* ctx) {
271
IOJob_t* job;
272
assert(ctx->file != NULL || ctx->prefs->testMode);
273
AIO_IOPool_lockJobsMutex(ctx);
274
assert(ctx->availableJobsCount > 0);
275
job = (IOJob_t*) ctx->availableJobs[--ctx->availableJobsCount];
276
AIO_IOPool_unlockJobsMutex(ctx);
277
job->usedBufferSize = 0;
278
job->file = ctx->file;
279
job->offset = 0;
280
return job;
281
}
282
283
284
/* AIO_IOPool_setFile:
285
* Sets the destination file for future files in the pool.
286
* Requires completion of all queued jobs and release of all otherwise acquired jobs. */
287
static void AIO_IOPool_setFile(IOPoolCtx_t* ctx, FILE* file) {
288
assert(ctx!=NULL);
289
AIO_IOPool_join(ctx);
290
assert(ctx->availableJobsCount == ctx->totalIoJobs);
291
ctx->file = file;
292
}
293
294
static FILE* AIO_IOPool_getFile(const IOPoolCtx_t* ctx) {
295
return ctx->file;
296
}
297
298
/* AIO_IOPool_enqueueJob:
299
* Enqueues an io job for execution.
300
* The queued job shouldn't be used directly after queueing it. */
301
static void AIO_IOPool_enqueueJob(IOJob_t* job) {
302
IOPoolCtx_t* const ctx = (IOPoolCtx_t *)job->ctx;
303
if(AIO_IOPool_threadPoolActive(ctx))
304
POOL_add(ctx->threadPool, ctx->poolFunction, job);
305
else
306
ctx->poolFunction(job);
307
}
308
309
/* ***********************************
310
* WritePool implementation
311
*************************************/
312
313
/* AIO_WritePool_acquireJob:
314
* Returns an available write job to be used for a future write. */
315
IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t* ctx) {
316
return AIO_IOPool_acquireJob(&ctx->base);
317
}
318
319
/* AIO_WritePool_enqueueAndReacquireWriteJob:
320
* Queues a write job for execution and acquires a new one.
321
* After execution `job`'s pointed value would change to the newly acquired job.
322
* Make sure to set `usedBufferSize` to the wanted length before call.
323
* The queued job shouldn't be used directly after queueing it. */
324
void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job) {
325
AIO_IOPool_enqueueJob(*job);
326
*job = AIO_IOPool_acquireJob((IOPoolCtx_t *)(*job)->ctx);
327
}
328
329
/* AIO_WritePool_sparseWriteEnd:
330
* Ends sparse writes to the current file.
331
* Blocks on completion of all current write jobs before executing. */
332
void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t* ctx) {
333
assert(ctx != NULL);
334
AIO_IOPool_join(&ctx->base);
335
AIO_fwriteSparseEnd(ctx->base.prefs, ctx->base.file, ctx->storedSkips);
336
ctx->storedSkips = 0;
337
}
338
339
/* AIO_WritePool_setFile:
340
* Sets the destination file for future writes in the pool.
341
* Requires completion of all queues write jobs and release of all otherwise acquired jobs.
342
* Also requires ending of sparse write if a previous file was used in sparse mode. */
343
void AIO_WritePool_setFile(WritePoolCtx_t* ctx, FILE* file) {
344
AIO_IOPool_setFile(&ctx->base, file);
345
assert(ctx->storedSkips == 0);
346
}
347
348
/* AIO_WritePool_getFile:
349
* Returns the file the writePool is currently set to write to. */
350
FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx) {
351
return AIO_IOPool_getFile(&ctx->base);
352
}
353
354
/* AIO_WritePool_releaseIoJob:
355
* Releases an acquired job back to the pool. Doesn't execute the job. */
356
void AIO_WritePool_releaseIoJob(IOJob_t* job) {
357
AIO_IOPool_releaseIoJob(job);
358
}
359
360
/* AIO_WritePool_closeFile:
361
* Ends sparse write and closes the writePool's current file and sets the file to NULL.
362
* Requires completion of all queues write jobs and release of all otherwise acquired jobs. */
363
int AIO_WritePool_closeFile(WritePoolCtx_t* ctx) {
364
FILE* const dstFile = ctx->base.file;
365
assert(dstFile!=NULL || ctx->base.prefs->testMode!=0);
366
AIO_WritePool_sparseWriteEnd(ctx);
367
AIO_IOPool_setFile(&ctx->base, NULL);
368
return fclose(dstFile);
369
}
370
371
/* AIO_WritePool_executeWriteJob:
372
* Executes a write job synchronously. Can be used as a function for a thread pool. */
373
static void AIO_WritePool_executeWriteJob(void* opaque){
374
IOJob_t* const job = (IOJob_t*) opaque;
375
WritePoolCtx_t* const ctx = (WritePoolCtx_t*) job->ctx;
376
ctx->storedSkips = AIO_fwriteSparse(job->file, job->buffer, job->usedBufferSize, ctx->base.prefs, ctx->storedSkips);
377
AIO_IOPool_releaseIoJob(job);
378
}
379
380
/* AIO_WritePool_create:
381
* Allocates and sets and a new write pool including its included jobs. */
382
WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize) {
383
WritePoolCtx_t* const ctx = (WritePoolCtx_t*) malloc(sizeof(WritePoolCtx_t));
384
if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
385
AIO_IOPool_init(&ctx->base, prefs, AIO_WritePool_executeWriteJob, bufferSize);
386
ctx->storedSkips = 0;
387
return ctx;
388
}
389
390
/* AIO_WritePool_free:
391
* Frees and releases a writePool and its resources. Closes destination file if needs to. */
392
void AIO_WritePool_free(WritePoolCtx_t* ctx) {
393
/* Make sure we finish all tasks and then free the resources */
394
if(AIO_WritePool_getFile(ctx))
395
AIO_WritePool_closeFile(ctx);
396
AIO_IOPool_destroy(&ctx->base);
397
assert(ctx->storedSkips==0);
398
free(ctx);
399
}
400
401
/* AIO_WritePool_setAsync:
402
* Allows (de)activating async mode, to be used when the expected overhead
403
* of asyncio costs more than the expected gains. */
404
void AIO_WritePool_setAsync(WritePoolCtx_t* ctx, int async) {
405
AIO_IOPool_setThreaded(&ctx->base, async);
406
}
407
408
409
/* ***********************************
410
* ReadPool implementation
411
*************************************/
412
static void AIO_ReadPool_releaseAllCompletedJobs(ReadPoolCtx_t* ctx) {
413
int i;
414
for(i=0; i<ctx->completedJobsCount; i++) {
415
IOJob_t* job = (IOJob_t*) ctx->completedJobs[i];
416
AIO_IOPool_releaseIoJob(job);
417
}
418
ctx->completedJobsCount = 0;
419
}
420
421
static void AIO_ReadPool_addJobToCompleted(IOJob_t* job) {
422
ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
423
AIO_IOPool_lockJobsMutex(&ctx->base);
424
assert(ctx->completedJobsCount < MAX_IO_JOBS);
425
ctx->completedJobs[ctx->completedJobsCount++] = job;
426
if(AIO_IOPool_threadPoolActive(&ctx->base)) {
427
ZSTD_pthread_cond_signal(&ctx->jobCompletedCond);
428
}
429
AIO_IOPool_unlockJobsMutex(&ctx->base);
430
}
431
432
/* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked:
433
* Looks through the completed jobs for a job matching the waitingOnOffset and returns it,
434
* if job wasn't found returns NULL.
435
* IMPORTANT: assumes ioJobsMutex is locked. */
436
static IOJob_t* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ReadPoolCtx_t* ctx) {
437
IOJob_t *job = NULL;
438
int i;
439
/* This implementation goes through all completed jobs and looks for the one matching the next offset.
440
* While not strictly needed for a single threaded reader implementation (as in such a case we could expect
441
* reads to be completed in order) this implementation was chosen as it better fits other asyncio
442
* interfaces (such as io_uring) that do not provide promises regarding order of completion. */
443
for (i=0; i<ctx->completedJobsCount; i++) {
444
job = (IOJob_t *) ctx->completedJobs[i];
445
if (job->offset == ctx->waitingOnOffset) {
446
ctx->completedJobs[i] = ctx->completedJobs[--ctx->completedJobsCount];
447
return job;
448
}
449
}
450
return NULL;
451
}
452
453
/* AIO_ReadPool_numReadsInFlight:
454
* Returns the number of IO read jobs currently in flight. */
455
static size_t AIO_ReadPool_numReadsInFlight(ReadPoolCtx_t* ctx) {
456
const int jobsHeld = (ctx->currentJobHeld==NULL ? 0 : 1);
457
return (size_t)(ctx->base.totalIoJobs - (ctx->base.availableJobsCount + ctx->completedJobsCount + jobsHeld));
458
}
459
460
/* AIO_ReadPool_getNextCompletedJob:
461
* Returns a completed IOJob_t for the next read in line based on waitingOnOffset and advances waitingOnOffset.
462
* Would block. */
463
static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t* ctx) {
464
IOJob_t *job = NULL;
465
AIO_IOPool_lockJobsMutex(&ctx->base);
466
467
job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);
468
469
/* As long as we didn't find the job matching the next read, and we have some reads in flight continue waiting */
470
while (!job && (AIO_ReadPool_numReadsInFlight(ctx) > 0)) {
471
assert(ctx->base.threadPool != NULL); /* we shouldn't be here if we work in sync mode */
472
ZSTD_pthread_cond_wait(&ctx->jobCompletedCond, &ctx->base.ioJobsMutex);
473
job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);
474
}
475
476
if(job) {
477
assert(job->offset == ctx->waitingOnOffset);
478
ctx->waitingOnOffset += job->usedBufferSize;
479
}
480
481
AIO_IOPool_unlockJobsMutex(&ctx->base);
482
return job;
483
}
484
485
486
/* AIO_ReadPool_executeReadJob:
487
* Executes a read job synchronously. Can be used as a function for a thread pool. */
488
static void AIO_ReadPool_executeReadJob(void* opaque){
489
IOJob_t* const job = (IOJob_t*) opaque;
490
ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
491
if(ctx->reachedEof) {
492
job->usedBufferSize = 0;
493
AIO_ReadPool_addJobToCompleted(job);
494
return;
495
}
496
job->usedBufferSize = fread(job->buffer, 1, job->bufferSize, job->file);
497
if(job->usedBufferSize < job->bufferSize) {
498
if(ferror(job->file)) {
499
EXM_THROW(37, "Read error");
500
} else if(feof(job->file)) {
501
ctx->reachedEof = 1;
502
} else {
503
EXM_THROW(37, "Unexpected short read");
504
}
505
}
506
AIO_ReadPool_addJobToCompleted(job);
507
}
508
509
static void AIO_ReadPool_enqueueRead(ReadPoolCtx_t* ctx) {
510
IOJob_t* const job = AIO_IOPool_acquireJob(&ctx->base);
511
job->offset = ctx->nextReadOffset;
512
ctx->nextReadOffset += job->bufferSize;
513
AIO_IOPool_enqueueJob(job);
514
}
515
516
static void AIO_ReadPool_startReading(ReadPoolCtx_t* ctx) {
517
while(ctx->base.availableJobsCount) {
518
AIO_ReadPool_enqueueRead(ctx);
519
}
520
}
521
522
/* AIO_ReadPool_setFile:
523
* Sets the source file for future read in the pool. Initiates reading immediately if file is not NULL.
524
* Waits for all current enqueued tasks to complete if a previous file was set. */
525
void AIO_ReadPool_setFile(ReadPoolCtx_t* ctx, FILE* file) {
526
assert(ctx!=NULL);
527
AIO_IOPool_join(&ctx->base);
528
AIO_ReadPool_releaseAllCompletedJobs(ctx);
529
if (ctx->currentJobHeld) {
530
AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld);
531
ctx->currentJobHeld = NULL;
532
}
533
AIO_IOPool_setFile(&ctx->base, file);
534
ctx->nextReadOffset = 0;
535
ctx->waitingOnOffset = 0;
536
ctx->srcBuffer = ctx->coalesceBuffer;
537
ctx->srcBufferLoaded = 0;
538
ctx->reachedEof = 0;
539
if(file != NULL)
540
AIO_ReadPool_startReading(ctx);
541
}
542
543
/* AIO_ReadPool_create:
544
* Allocates and sets and a new readPool including its included jobs.
545
* bufferSize should be set to the maximal buffer we want to read at a time, will also be used
546
* as our basic read size. */
547
ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize) {
548
ReadPoolCtx_t* const ctx = (ReadPoolCtx_t*) malloc(sizeof(ReadPoolCtx_t));
549
if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
550
AIO_IOPool_init(&ctx->base, prefs, AIO_ReadPool_executeReadJob, bufferSize);
551
552
ctx->coalesceBuffer = (U8*) malloc(bufferSize * 2);
553
if(!ctx->coalesceBuffer) EXM_THROW(100, "Allocation error : not enough memory");
554
ctx->srcBuffer = ctx->coalesceBuffer;
555
ctx->srcBufferLoaded = 0;
556
ctx->completedJobsCount = 0;
557
ctx->currentJobHeld = NULL;
558
559
if(ctx->base.threadPool)
560
if (ZSTD_pthread_cond_init(&ctx->jobCompletedCond, NULL))
561
EXM_THROW(103,"Failed creating jobCompletedCond cond");
562
563
return ctx;
564
}
565
566
/* AIO_ReadPool_free:
567
* Frees and releases a readPool and its resources. Closes source file. */
568
void AIO_ReadPool_free(ReadPoolCtx_t* ctx) {
569
if(AIO_ReadPool_getFile(ctx))
570
AIO_ReadPool_closeFile(ctx);
571
if(ctx->base.threadPool)
572
ZSTD_pthread_cond_destroy(&ctx->jobCompletedCond);
573
AIO_IOPool_destroy(&ctx->base);
574
free(ctx->coalesceBuffer);
575
free(ctx);
576
}
577
578
/* AIO_ReadPool_consumeBytes:
579
* Consumes byes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */
580
void AIO_ReadPool_consumeBytes(ReadPoolCtx_t* ctx, size_t n) {
581
assert(n <= ctx->srcBufferLoaded);
582
ctx->srcBufferLoaded -= n;
583
ctx->srcBuffer += n;
584
}
585
586
/* AIO_ReadPool_releaseCurrentlyHeldAndGetNext:
587
* Release the current held job and get the next one, returns NULL if no next job available. */
588
static IOJob_t* AIO_ReadPool_releaseCurrentHeldAndGetNext(ReadPoolCtx_t* ctx) {
589
if (ctx->currentJobHeld) {
590
AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld);
591
ctx->currentJobHeld = NULL;
592
AIO_ReadPool_enqueueRead(ctx);
593
}
594
ctx->currentJobHeld = AIO_ReadPool_getNextCompletedJob(ctx);
595
return (IOJob_t*) ctx->currentJobHeld;
596
}
597
598
/* AIO_ReadPool_fillBuffer:
599
* Tries to fill the buffer with at least n or jobBufferSize bytes (whichever is smaller).
600
* Returns if srcBuffer has at least the expected number of bytes loaded or if we've reached the end of the file.
601
* Return value is the number of bytes added to the buffer.
602
* Note that srcBuffer might have up to 2 times jobBufferSize bytes. */
603
size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t* ctx, size_t n) {
604
IOJob_t *job;
605
int useCoalesce = 0;
606
if(n > ctx->base.jobBufferSize)
607
n = ctx->base.jobBufferSize;
608
609
/* We are good, don't read anything */
610
if (ctx->srcBufferLoaded >= n)
611
return 0;
612
613
/* We still have bytes loaded, but not enough to satisfy caller. We need to get the next job
614
* and coalesce the remaining bytes with the next job's buffer */
615
if (ctx->srcBufferLoaded > 0) {
616
useCoalesce = 1;
617
memcpy(ctx->coalesceBuffer, ctx->srcBuffer, ctx->srcBufferLoaded);
618
ctx->srcBuffer = ctx->coalesceBuffer;
619
}
620
621
/* Read the next chunk */
622
job = AIO_ReadPool_releaseCurrentHeldAndGetNext(ctx);
623
if(!job)
624
return 0;
625
if(useCoalesce) {
626
assert(ctx->srcBufferLoaded + job->usedBufferSize <= 2*ctx->base.jobBufferSize);
627
memcpy(ctx->coalesceBuffer + ctx->srcBufferLoaded, job->buffer, job->usedBufferSize);
628
ctx->srcBufferLoaded += job->usedBufferSize;
629
}
630
else {
631
ctx->srcBuffer = (U8 *) job->buffer;
632
ctx->srcBufferLoaded = job->usedBufferSize;
633
}
634
return job->usedBufferSize;
635
}
636
637
/* AIO_ReadPool_consumeAndRefill:
638
* Consumes the current buffer and refills it with bufferSize bytes. */
639
size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t* ctx) {
640
AIO_ReadPool_consumeBytes(ctx, ctx->srcBufferLoaded);
641
return AIO_ReadPool_fillBuffer(ctx, ctx->base.jobBufferSize);
642
}
643
644
/* AIO_ReadPool_getFile:
645
* Returns the current file set for the read pool. */
646
FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t* ctx) {
647
return AIO_IOPool_getFile(&ctx->base);
648
}
649
650
/* AIO_ReadPool_closeFile:
651
* Closes the current set file. Waits for all current enqueued tasks to complete and resets state. */
652
int AIO_ReadPool_closeFile(ReadPoolCtx_t* ctx) {
653
FILE* const file = AIO_ReadPool_getFile(ctx);
654
AIO_ReadPool_setFile(ctx, NULL);
655
return fclose(file);
656
}
657
658
/* AIO_ReadPool_setAsync:
659
* Allows (de)activating async mode, to be used when the expected overhead
660
* of asyncio costs more than the expected gains. */
661
void AIO_ReadPool_setAsync(ReadPoolCtx_t* ctx, int async) {
662
AIO_IOPool_setThreaded(&ctx->base, async);
663
}
664
665