Path: blob/main/sys/contrib/zstd/programs/fileio_asyncio.c
289024 views
/*1* Copyright (c) Meta Platforms, Inc. and affiliates.2* All rights reserved.3*4* This source code is licensed under both the BSD-style license (found in the5* LICENSE file in the root directory of this source tree) and the GPLv2 (found6* in the COPYING file in the root directory of this source tree).7* You may select, at your option, one of the above-listed licenses.8*/910#include "platform.h"11#include <stdio.h> /* fprintf, open, fdopen, fread, _fileno, stdin, stdout */12#include <stdlib.h> /* malloc, free */13#include <assert.h>14#include <errno.h> /* errno */1516#if defined (_MSC_VER)17# include <sys/stat.h>18# include <io.h>19#endif2021#include "fileio_asyncio.h"22#include "fileio_common.h"2324/* **********************************************************************25* Sparse write26************************************************************************/2728/** AIO_fwriteSparse() :29* @return : storedSkips,30* argument for next call to AIO_fwriteSparse() or AIO_fwriteSparseEnd() */31static unsigned32AIO_fwriteSparse(FILE* file,33const void* buffer, size_t bufferSize,34const FIO_prefs_t* const prefs,35unsigned storedSkips)36{37const size_t* const bufferT = (const size_t*)buffer; /* Buffer is supposed malloc'ed, hence aligned on size_t */38size_t bufferSizeT = bufferSize / sizeof(size_t);39const size_t* const bufferTEnd = bufferT + bufferSizeT;40const size_t* ptrT = bufferT;41static const size_t segmentSizeT = (32 KB) / sizeof(size_t); /* check every 32 KB */4243if (prefs->testMode) return 0; /* do not output anything in test mode */4445if (!prefs->sparseFileSupport) { /* normal write */46size_t const sizeCheck = fwrite(buffer, 1, bufferSize, file);47if (sizeCheck != bufferSize)48EXM_THROW(70, "Write error : cannot write block : %s",49strerror(errno));50return 0;51}5253/* avoid int overflow */54if (storedSkips > 1 GB) {55if (LONG_SEEK(file, 1 GB, SEEK_CUR) != 0)56EXM_THROW(91, "1 GB skip error (sparse file support)");57storedSkips -= 1 GB;58}5960while (ptrT < bufferTEnd) {61size_t nb0T;6263/* adjust last segment if < 32 KB */64size_t seg0SizeT = segmentSizeT;65if (seg0SizeT > bufferSizeT) seg0SizeT = bufferSizeT;66bufferSizeT -= seg0SizeT;6768/* count leading zeroes */69for (nb0T=0; (nb0T < seg0SizeT) && (ptrT[nb0T] == 0); nb0T++) ;70storedSkips += (unsigned)(nb0T * sizeof(size_t));7172if (nb0T != seg0SizeT) { /* not all 0s */73size_t const nbNon0ST = seg0SizeT - nb0T;74/* skip leading zeros */75if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)76EXM_THROW(92, "Sparse skip error ; try --no-sparse");77storedSkips = 0;78/* write the rest */79if (fwrite(ptrT + nb0T, sizeof(size_t), nbNon0ST, file) != nbNon0ST)80EXM_THROW(93, "Write error : cannot write block : %s",81strerror(errno));82}83ptrT += seg0SizeT;84}8586{ static size_t const maskT = sizeof(size_t)-1;87if (bufferSize & maskT) {88/* size not multiple of sizeof(size_t) : implies end of block */89const char* const restStart = (const char*)bufferTEnd;90const char* restPtr = restStart;91const char* const restEnd = (const char*)buffer + bufferSize;92assert(restEnd > restStart && restEnd < restStart + sizeof(size_t));93for ( ; (restPtr < restEnd) && (*restPtr == 0); restPtr++) ;94storedSkips += (unsigned) (restPtr - restStart);95if (restPtr != restEnd) {96/* not all remaining bytes are 0 */97size_t const restSize = (size_t)(restEnd - restPtr);98if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)99EXM_THROW(92, "Sparse skip error ; try --no-sparse");100if (fwrite(restPtr, 1, restSize, file) != restSize)101EXM_THROW(95, "Write error : cannot write end of decoded block : %s",102strerror(errno));103storedSkips = 0;104} } }105106return storedSkips;107}108109static void110AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips)111{112if (prefs->testMode) assert(storedSkips == 0);113if (storedSkips>0) {114assert(prefs->sparseFileSupport > 0); /* storedSkips>0 implies sparse support is enabled */115(void)prefs; /* assert can be disabled, in which case prefs becomes unused */116if (LONG_SEEK(file, storedSkips-1, SEEK_CUR) != 0)117EXM_THROW(69, "Final skip error (sparse file support)");118/* last zero must be explicitly written,119* so that skipped ones get implicitly translated as zero by FS */120{ const char lastZeroByte[1] = { 0 };121if (fwrite(lastZeroByte, 1, 1, file) != 1)122EXM_THROW(69, "Write error : cannot write last zero : %s", strerror(errno));123} }124}125126127/* **********************************************************************128* AsyncIO functionality129************************************************************************/130131/* AIO_supported:132* Returns 1 if AsyncIO is supported on the system, 0 otherwise. */133int AIO_supported(void) {134#ifdef ZSTD_MULTITHREAD135return 1;136#else137return 0;138#endif139}140141/* ***********************************142* Generic IoPool implementation143*************************************/144145static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) {146IOJob_t* const job = (IOJob_t*) malloc(sizeof(IOJob_t));147void* const buffer = malloc(bufferSize);148if(!job || !buffer)149EXM_THROW(101, "Allocation error : not enough memory");150job->buffer = buffer;151job->bufferSize = bufferSize;152job->usedBufferSize = 0;153job->file = NULL;154job->ctx = ctx;155job->offset = 0;156return job;157}158159160/* AIO_IOPool_createThreadPool:161* Creates a thread pool and a mutex for threaded IO pool.162* Displays warning if asyncio is requested but MT isn't available. */163static void AIO_IOPool_createThreadPool(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs) {164ctx->threadPool = NULL;165ctx->threadPoolActive = 0;166if(prefs->asyncIO) {167if (ZSTD_pthread_mutex_init(&ctx->ioJobsMutex, NULL))168EXM_THROW(102,"Failed creating ioJobsMutex mutex");169/* We want MAX_IO_JOBS-2 queue items because we need to always have 1 free buffer to170* decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */171assert(MAX_IO_JOBS >= 2);172ctx->threadPool = POOL_create(1, MAX_IO_JOBS - 2);173ctx->threadPoolActive = 1;174if (!ctx->threadPool)175EXM_THROW(104, "Failed creating I/O thread pool");176}177}178179/* AIO_IOPool_init:180* Allocates and sets and a new I/O thread pool including its included availableJobs. */181static void AIO_IOPool_init(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs, POOL_function poolFunction, size_t bufferSize) {182int i;183AIO_IOPool_createThreadPool(ctx, prefs);184ctx->prefs = prefs;185ctx->poolFunction = poolFunction;186ctx->totalIoJobs = ctx->threadPool ? MAX_IO_JOBS : 2;187ctx->availableJobsCount = ctx->totalIoJobs;188for(i=0; i < ctx->availableJobsCount; i++) {189ctx->availableJobs[i] = AIO_IOPool_createIoJob(ctx, bufferSize);190}191ctx->jobBufferSize = bufferSize;192ctx->file = NULL;193}194195196/* AIO_IOPool_threadPoolActive:197* Check if current operation uses thread pool.198* Note that in some cases we have a thread pool initialized but choose not to use it. */199static int AIO_IOPool_threadPoolActive(IOPoolCtx_t* ctx) {200return ctx->threadPool && ctx->threadPoolActive;201}202203204/* AIO_IOPool_lockJobsMutex:205* Locks the IO jobs mutex if threading is active */206static void AIO_IOPool_lockJobsMutex(IOPoolCtx_t* ctx) {207if(AIO_IOPool_threadPoolActive(ctx))208ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);209}210211/* AIO_IOPool_unlockJobsMutex:212* Unlocks the IO jobs mutex if threading is active */213static void AIO_IOPool_unlockJobsMutex(IOPoolCtx_t* ctx) {214if(AIO_IOPool_threadPoolActive(ctx))215ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);216}217218/* AIO_IOPool_releaseIoJob:219* Releases an acquired job back to the pool. Doesn't execute the job. */220static void AIO_IOPool_releaseIoJob(IOJob_t* job) {221IOPoolCtx_t* const ctx = (IOPoolCtx_t *) job->ctx;222AIO_IOPool_lockJobsMutex(ctx);223assert(ctx->availableJobsCount < ctx->totalIoJobs);224ctx->availableJobs[ctx->availableJobsCount++] = job;225AIO_IOPool_unlockJobsMutex(ctx);226}227228/* AIO_IOPool_join:229* Waits for all tasks in the pool to finish executing. */230static void AIO_IOPool_join(IOPoolCtx_t* ctx) {231if(AIO_IOPool_threadPoolActive(ctx))232POOL_joinJobs(ctx->threadPool);233}234235/* AIO_IOPool_setThreaded:236* Allows (de)activating threaded mode, to be used when the expected overhead237* of threading costs more than the expected gains. */238static void AIO_IOPool_setThreaded(IOPoolCtx_t* ctx, int threaded) {239assert(threaded == 0 || threaded == 1);240assert(ctx != NULL);241if(ctx->threadPoolActive != threaded) {242AIO_IOPool_join(ctx);243ctx->threadPoolActive = threaded;244}245}246247/* AIO_IOPool_free:248* Release a previously allocated IO thread pool. Makes sure all tasks are done and released. */249static void AIO_IOPool_destroy(IOPoolCtx_t* ctx) {250int i;251if(ctx->threadPool) {252/* Make sure we finish all tasks and then free the resources */253AIO_IOPool_join(ctx);254/* Make sure we are not leaking availableJobs */255assert(ctx->availableJobsCount == ctx->totalIoJobs);256POOL_free(ctx->threadPool);257ZSTD_pthread_mutex_destroy(&ctx->ioJobsMutex);258}259assert(ctx->file == NULL);260for(i=0; i<ctx->availableJobsCount; i++) {261IOJob_t* job = (IOJob_t*) ctx->availableJobs[i];262free(job->buffer);263free(job);264}265}266267/* AIO_IOPool_acquireJob:268* Returns an available io job to be used for a future io. */269static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t* ctx) {270IOJob_t* job;271assert(ctx->file != NULL || ctx->prefs->testMode);272AIO_IOPool_lockJobsMutex(ctx);273assert(ctx->availableJobsCount > 0);274job = (IOJob_t*) ctx->availableJobs[--ctx->availableJobsCount];275AIO_IOPool_unlockJobsMutex(ctx);276job->usedBufferSize = 0;277job->file = ctx->file;278job->offset = 0;279return job;280}281282283/* AIO_IOPool_setFile:284* Sets the destination file for future files in the pool.285* Requires completion of all queued jobs and release of all otherwise acquired jobs. */286static void AIO_IOPool_setFile(IOPoolCtx_t* ctx, FILE* file) {287assert(ctx!=NULL);288AIO_IOPool_join(ctx);289assert(ctx->availableJobsCount == ctx->totalIoJobs);290ctx->file = file;291}292293static FILE* AIO_IOPool_getFile(const IOPoolCtx_t* ctx) {294return ctx->file;295}296297/* AIO_IOPool_enqueueJob:298* Enqueues an io job for execution.299* The queued job shouldn't be used directly after queueing it. */300static void AIO_IOPool_enqueueJob(IOJob_t* job) {301IOPoolCtx_t* const ctx = (IOPoolCtx_t *)job->ctx;302if(AIO_IOPool_threadPoolActive(ctx))303POOL_add(ctx->threadPool, ctx->poolFunction, job);304else305ctx->poolFunction(job);306}307308/* ***********************************309* WritePool implementation310*************************************/311312/* AIO_WritePool_acquireJob:313* Returns an available write job to be used for a future write. */314IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t* ctx) {315return AIO_IOPool_acquireJob(&ctx->base);316}317318/* AIO_WritePool_enqueueAndReacquireWriteJob:319* Queues a write job for execution and acquires a new one.320* After execution `job`'s pointed value would change to the newly acquired job.321* Make sure to set `usedBufferSize` to the wanted length before call.322* The queued job shouldn't be used directly after queueing it. */323void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job) {324AIO_IOPool_enqueueJob(*job);325*job = AIO_IOPool_acquireJob((IOPoolCtx_t *)(*job)->ctx);326}327328/* AIO_WritePool_sparseWriteEnd:329* Ends sparse writes to the current file.330* Blocks on completion of all current write jobs before executing. */331void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t* ctx) {332assert(ctx != NULL);333AIO_IOPool_join(&ctx->base);334AIO_fwriteSparseEnd(ctx->base.prefs, ctx->base.file, ctx->storedSkips);335ctx->storedSkips = 0;336}337338/* AIO_WritePool_setFile:339* Sets the destination file for future writes in the pool.340* Requires completion of all queues write jobs and release of all otherwise acquired jobs.341* Also requires ending of sparse write if a previous file was used in sparse mode. */342void AIO_WritePool_setFile(WritePoolCtx_t* ctx, FILE* file) {343AIO_IOPool_setFile(&ctx->base, file);344assert(ctx->storedSkips == 0);345}346347/* AIO_WritePool_getFile:348* Returns the file the writePool is currently set to write to. */349FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx) {350return AIO_IOPool_getFile(&ctx->base);351}352353/* AIO_WritePool_releaseIoJob:354* Releases an acquired job back to the pool. Doesn't execute the job. */355void AIO_WritePool_releaseIoJob(IOJob_t* job) {356AIO_IOPool_releaseIoJob(job);357}358359/* AIO_WritePool_closeFile:360* Ends sparse write and closes the writePool's current file and sets the file to NULL.361* Requires completion of all queues write jobs and release of all otherwise acquired jobs. */362int AIO_WritePool_closeFile(WritePoolCtx_t* ctx) {363FILE* const dstFile = ctx->base.file;364assert(dstFile!=NULL || ctx->base.prefs->testMode!=0);365AIO_WritePool_sparseWriteEnd(ctx);366AIO_IOPool_setFile(&ctx->base, NULL);367return fclose(dstFile);368}369370/* AIO_WritePool_executeWriteJob:371* Executes a write job synchronously. Can be used as a function for a thread pool. */372static void AIO_WritePool_executeWriteJob(void* opaque){373IOJob_t* const job = (IOJob_t*) opaque;374WritePoolCtx_t* const ctx = (WritePoolCtx_t*) job->ctx;375ctx->storedSkips = AIO_fwriteSparse(job->file, job->buffer, job->usedBufferSize, ctx->base.prefs, ctx->storedSkips);376AIO_IOPool_releaseIoJob(job);377}378379/* AIO_WritePool_create:380* Allocates and sets and a new write pool including its included jobs. */381WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize) {382WritePoolCtx_t* const ctx = (WritePoolCtx_t*) malloc(sizeof(WritePoolCtx_t));383if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");384AIO_IOPool_init(&ctx->base, prefs, AIO_WritePool_executeWriteJob, bufferSize);385ctx->storedSkips = 0;386return ctx;387}388389/* AIO_WritePool_free:390* Frees and releases a writePool and its resources. Closes destination file if needs to. */391void AIO_WritePool_free(WritePoolCtx_t* ctx) {392/* Make sure we finish all tasks and then free the resources */393if(AIO_WritePool_getFile(ctx))394AIO_WritePool_closeFile(ctx);395AIO_IOPool_destroy(&ctx->base);396assert(ctx->storedSkips==0);397free(ctx);398}399400/* AIO_WritePool_setAsync:401* Allows (de)activating async mode, to be used when the expected overhead402* of asyncio costs more than the expected gains. */403void AIO_WritePool_setAsync(WritePoolCtx_t* ctx, int async) {404AIO_IOPool_setThreaded(&ctx->base, async);405}406407408/* ***********************************409* ReadPool implementation410*************************************/411static void AIO_ReadPool_releaseAllCompletedJobs(ReadPoolCtx_t* ctx) {412int i;413for(i=0; i<ctx->completedJobsCount; i++) {414IOJob_t* job = (IOJob_t*) ctx->completedJobs[i];415AIO_IOPool_releaseIoJob(job);416}417ctx->completedJobsCount = 0;418}419420static void AIO_ReadPool_addJobToCompleted(IOJob_t* job) {421ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;422AIO_IOPool_lockJobsMutex(&ctx->base);423assert(ctx->completedJobsCount < MAX_IO_JOBS);424ctx->completedJobs[ctx->completedJobsCount++] = job;425if(AIO_IOPool_threadPoolActive(&ctx->base)) {426ZSTD_pthread_cond_signal(&ctx->jobCompletedCond);427}428AIO_IOPool_unlockJobsMutex(&ctx->base);429}430431/* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked:432* Looks through the completed jobs for a job matching the waitingOnOffset and returns it,433* if job wasn't found returns NULL.434* IMPORTANT: assumes ioJobsMutex is locked. */435static IOJob_t* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ReadPoolCtx_t* ctx) {436IOJob_t *job = NULL;437int i;438/* This implementation goes through all completed jobs and looks for the one matching the next offset.439* While not strictly needed for a single threaded reader implementation (as in such a case we could expect440* reads to be completed in order) this implementation was chosen as it better fits other asyncio441* interfaces (such as io_uring) that do not provide promises regarding order of completion. */442for (i=0; i<ctx->completedJobsCount; i++) {443job = (IOJob_t *) ctx->completedJobs[i];444if (job->offset == ctx->waitingOnOffset) {445ctx->completedJobs[i] = ctx->completedJobs[--ctx->completedJobsCount];446return job;447}448}449return NULL;450}451452/* AIO_ReadPool_numReadsInFlight:453* Returns the number of IO read jobs currently in flight. */454static size_t AIO_ReadPool_numReadsInFlight(ReadPoolCtx_t* ctx) {455const int jobsHeld = (ctx->currentJobHeld==NULL ? 0 : 1);456return (size_t)(ctx->base.totalIoJobs - (ctx->base.availableJobsCount + ctx->completedJobsCount + jobsHeld));457}458459/* AIO_ReadPool_getNextCompletedJob:460* Returns a completed IOJob_t for the next read in line based on waitingOnOffset and advances waitingOnOffset.461* Would block. */462static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t* ctx) {463IOJob_t *job = NULL;464AIO_IOPool_lockJobsMutex(&ctx->base);465466job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);467468/* As long as we didn't find the job matching the next read, and we have some reads in flight continue waiting */469while (!job && (AIO_ReadPool_numReadsInFlight(ctx) > 0)) {470assert(ctx->base.threadPool != NULL); /* we shouldn't be here if we work in sync mode */471ZSTD_pthread_cond_wait(&ctx->jobCompletedCond, &ctx->base.ioJobsMutex);472job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);473}474475if(job) {476assert(job->offset == ctx->waitingOnOffset);477ctx->waitingOnOffset += job->usedBufferSize;478}479480AIO_IOPool_unlockJobsMutex(&ctx->base);481return job;482}483484485/* AIO_ReadPool_executeReadJob:486* Executes a read job synchronously. Can be used as a function for a thread pool. */487static void AIO_ReadPool_executeReadJob(void* opaque){488IOJob_t* const job = (IOJob_t*) opaque;489ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;490if(ctx->reachedEof) {491job->usedBufferSize = 0;492AIO_ReadPool_addJobToCompleted(job);493return;494}495job->usedBufferSize = fread(job->buffer, 1, job->bufferSize, job->file);496if(job->usedBufferSize < job->bufferSize) {497if(ferror(job->file)) {498EXM_THROW(37, "Read error");499} else if(feof(job->file)) {500ctx->reachedEof = 1;501} else {502EXM_THROW(37, "Unexpected short read");503}504}505AIO_ReadPool_addJobToCompleted(job);506}507508static void AIO_ReadPool_enqueueRead(ReadPoolCtx_t* ctx) {509IOJob_t* const job = AIO_IOPool_acquireJob(&ctx->base);510job->offset = ctx->nextReadOffset;511ctx->nextReadOffset += job->bufferSize;512AIO_IOPool_enqueueJob(job);513}514515static void AIO_ReadPool_startReading(ReadPoolCtx_t* ctx) {516while(ctx->base.availableJobsCount) {517AIO_ReadPool_enqueueRead(ctx);518}519}520521/* AIO_ReadPool_setFile:522* Sets the source file for future read in the pool. Initiates reading immediately if file is not NULL.523* Waits for all current enqueued tasks to complete if a previous file was set. */524void AIO_ReadPool_setFile(ReadPoolCtx_t* ctx, FILE* file) {525assert(ctx!=NULL);526AIO_IOPool_join(&ctx->base);527AIO_ReadPool_releaseAllCompletedJobs(ctx);528if (ctx->currentJobHeld) {529AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld);530ctx->currentJobHeld = NULL;531}532AIO_IOPool_setFile(&ctx->base, file);533ctx->nextReadOffset = 0;534ctx->waitingOnOffset = 0;535ctx->srcBuffer = ctx->coalesceBuffer;536ctx->srcBufferLoaded = 0;537ctx->reachedEof = 0;538if(file != NULL)539AIO_ReadPool_startReading(ctx);540}541542/* AIO_ReadPool_create:543* Allocates and sets and a new readPool including its included jobs.544* bufferSize should be set to the maximal buffer we want to read at a time, will also be used545* as our basic read size. */546ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize) {547ReadPoolCtx_t* const ctx = (ReadPoolCtx_t*) malloc(sizeof(ReadPoolCtx_t));548if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");549AIO_IOPool_init(&ctx->base, prefs, AIO_ReadPool_executeReadJob, bufferSize);550551ctx->coalesceBuffer = (U8*) malloc(bufferSize * 2);552if(!ctx->coalesceBuffer) EXM_THROW(100, "Allocation error : not enough memory");553ctx->srcBuffer = ctx->coalesceBuffer;554ctx->srcBufferLoaded = 0;555ctx->completedJobsCount = 0;556ctx->currentJobHeld = NULL;557558if(ctx->base.threadPool)559if (ZSTD_pthread_cond_init(&ctx->jobCompletedCond, NULL))560EXM_THROW(103,"Failed creating jobCompletedCond cond");561562return ctx;563}564565/* AIO_ReadPool_free:566* Frees and releases a readPool and its resources. Closes source file. */567void AIO_ReadPool_free(ReadPoolCtx_t* ctx) {568if(AIO_ReadPool_getFile(ctx))569AIO_ReadPool_closeFile(ctx);570if(ctx->base.threadPool)571ZSTD_pthread_cond_destroy(&ctx->jobCompletedCond);572AIO_IOPool_destroy(&ctx->base);573free(ctx->coalesceBuffer);574free(ctx);575}576577/* AIO_ReadPool_consumeBytes:578* Consumes byes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */579void AIO_ReadPool_consumeBytes(ReadPoolCtx_t* ctx, size_t n) {580assert(n <= ctx->srcBufferLoaded);581ctx->srcBufferLoaded -= n;582ctx->srcBuffer += n;583}584585/* AIO_ReadPool_releaseCurrentlyHeldAndGetNext:586* Release the current held job and get the next one, returns NULL if no next job available. */587static IOJob_t* AIO_ReadPool_releaseCurrentHeldAndGetNext(ReadPoolCtx_t* ctx) {588if (ctx->currentJobHeld) {589AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld);590ctx->currentJobHeld = NULL;591AIO_ReadPool_enqueueRead(ctx);592}593ctx->currentJobHeld = AIO_ReadPool_getNextCompletedJob(ctx);594return (IOJob_t*) ctx->currentJobHeld;595}596597/* AIO_ReadPool_fillBuffer:598* Tries to fill the buffer with at least n or jobBufferSize bytes (whichever is smaller).599* Returns if srcBuffer has at least the expected number of bytes loaded or if we've reached the end of the file.600* Return value is the number of bytes added to the buffer.601* Note that srcBuffer might have up to 2 times jobBufferSize bytes. */602size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t* ctx, size_t n) {603IOJob_t *job;604int useCoalesce = 0;605if(n > ctx->base.jobBufferSize)606n = ctx->base.jobBufferSize;607608/* We are good, don't read anything */609if (ctx->srcBufferLoaded >= n)610return 0;611612/* We still have bytes loaded, but not enough to satisfy caller. We need to get the next job613* and coalesce the remaining bytes with the next job's buffer */614if (ctx->srcBufferLoaded > 0) {615useCoalesce = 1;616memcpy(ctx->coalesceBuffer, ctx->srcBuffer, ctx->srcBufferLoaded);617ctx->srcBuffer = ctx->coalesceBuffer;618}619620/* Read the next chunk */621job = AIO_ReadPool_releaseCurrentHeldAndGetNext(ctx);622if(!job)623return 0;624if(useCoalesce) {625assert(ctx->srcBufferLoaded + job->usedBufferSize <= 2*ctx->base.jobBufferSize);626memcpy(ctx->coalesceBuffer + ctx->srcBufferLoaded, job->buffer, job->usedBufferSize);627ctx->srcBufferLoaded += job->usedBufferSize;628}629else {630ctx->srcBuffer = (U8 *) job->buffer;631ctx->srcBufferLoaded = job->usedBufferSize;632}633return job->usedBufferSize;634}635636/* AIO_ReadPool_consumeAndRefill:637* Consumes the current buffer and refills it with bufferSize bytes. */638size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t* ctx) {639AIO_ReadPool_consumeBytes(ctx, ctx->srcBufferLoaded);640return AIO_ReadPool_fillBuffer(ctx, ctx->base.jobBufferSize);641}642643/* AIO_ReadPool_getFile:644* Returns the current file set for the read pool. */645FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t* ctx) {646return AIO_IOPool_getFile(&ctx->base);647}648649/* AIO_ReadPool_closeFile:650* Closes the current set file. Waits for all current enqueued tasks to complete and resets state. */651int AIO_ReadPool_closeFile(ReadPoolCtx_t* ctx) {652FILE* const file = AIO_ReadPool_getFile(ctx);653AIO_ReadPool_setFile(ctx, NULL);654return fclose(file);655}656657/* AIO_ReadPool_setAsync:658* Allows (de)activating async mode, to be used when the expected overhead659* of asyncio costs more than the expected gains. */660void AIO_ReadPool_setAsync(ReadPoolCtx_t* ctx, int async) {661AIO_IOPool_setThreaded(&ctx->base, async);662}663664665