Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
freebsd
GitHub Repository: freebsd/freebsd-src
Path: blob/main/sys/contrib/zstd/programs/fileio_asyncio.h
289024 views
1
/*
2
* Copyright (c) Meta Platforms, Inc. and affiliates.
3
* All rights reserved.
4
*
5
* This source code is licensed under both the BSD-style license (found in the
6
* LICENSE file in the root directory of this source tree) and the GPLv2 (found
7
* in the COPYING file in the root directory of this source tree).
8
* You may select, at your option, one of the above-listed licenses.
9
*/
10
11
/*
12
* FileIO AsyncIO exposes read/write IO pools that allow doing IO asynchronously.
13
* Current implementation relies on having one thread that reads and one that
14
* writes.
15
* Each IO pool supports up to `MAX_IO_JOBS` that can be enqueued for work, but
16
* are performed serially by the appropriate worker thread.
17
* Most systems exposes better primitives to perform asynchronous IO, such as
18
* io_uring on newer linux systems. The API is built in such a way that in the
19
* future we could replace the threads with better solutions when available.
20
*/
21
22
#ifndef ZSTD_FILEIO_ASYNCIO_H
23
#define ZSTD_FILEIO_ASYNCIO_H
24
25
#include "../lib/common/mem.h" /* U32, U64 */
26
#include "fileio_types.h"
27
#include "platform.h"
28
#include "util.h"
29
#include "../lib/common/pool.h"
30
#include "../lib/common/threading.h"
31
32
#define MAX_IO_JOBS (10)
33
34
typedef struct {
35
/* These struct fields should be set only on creation and not changed afterwards */
36
POOL_ctx* threadPool;
37
int threadPoolActive;
38
int totalIoJobs;
39
const FIO_prefs_t* prefs;
40
POOL_function poolFunction;
41
42
/* Controls the file we currently write to, make changes only by using provided utility functions */
43
FILE* file;
44
45
/* The jobs and availableJobsCount fields are accessed by both the main and worker threads and should
46
* only be mutated after locking the mutex */
47
ZSTD_pthread_mutex_t ioJobsMutex;
48
void* availableJobs[MAX_IO_JOBS];
49
int availableJobsCount;
50
size_t jobBufferSize;
51
} IOPoolCtx_t;
52
53
typedef struct {
54
IOPoolCtx_t base;
55
56
/* State regarding the currently read file */
57
int reachedEof;
58
U64 nextReadOffset;
59
U64 waitingOnOffset;
60
61
/* We may hold an IOJob object as needed if we actively expose its buffer. */
62
void *currentJobHeld;
63
64
/* Coalesce buffer is used to join two buffers in case where we need to read more bytes than left in
65
* the first of them. Shouldn't be accessed from outside ot utility functions. */
66
U8 *coalesceBuffer;
67
68
/* Read buffer can be used by consumer code, take care when copying this pointer aside as it might
69
* change when consuming / refilling buffer. */
70
U8 *srcBuffer;
71
size_t srcBufferLoaded;
72
73
/* We need to know what tasks completed so we can use their buffers when their time comes.
74
* Should only be accessed after locking base.ioJobsMutex . */
75
void* completedJobs[MAX_IO_JOBS];
76
int completedJobsCount;
77
ZSTD_pthread_cond_t jobCompletedCond;
78
} ReadPoolCtx_t;
79
80
typedef struct {
81
IOPoolCtx_t base;
82
unsigned storedSkips;
83
} WritePoolCtx_t;
84
85
typedef struct {
86
/* These fields are automatically set and shouldn't be changed by non WritePool code. */
87
void *ctx;
88
FILE* file;
89
void *buffer;
90
size_t bufferSize;
91
92
/* This field should be changed before a job is queued for execution and should contain the number
93
* of bytes to write from the buffer. */
94
size_t usedBufferSize;
95
U64 offset;
96
} IOJob_t;
97
98
/* AIO_supported:
99
* Returns 1 if AsyncIO is supported on the system, 0 otherwise. */
100
int AIO_supported(void);
101
102
103
/* AIO_WritePool_releaseIoJob:
104
* Releases an acquired job back to the pool. Doesn't execute the job. */
105
void AIO_WritePool_releaseIoJob(IOJob_t *job);
106
107
/* AIO_WritePool_acquireJob:
108
* Returns an available write job to be used for a future write. */
109
IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t *ctx);
110
111
/* AIO_WritePool_enqueueAndReacquireWriteJob:
112
* Enqueues a write job for execution and acquires a new one.
113
* After execution `job`'s pointed value would change to the newly acquired job.
114
* Make sure to set `usedBufferSize` to the wanted length before call.
115
* The queued job shouldn't be used directly after queueing it. */
116
void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job);
117
118
/* AIO_WritePool_sparseWriteEnd:
119
* Ends sparse writes to the current file.
120
* Blocks on completion of all current write jobs before executing. */
121
void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t *ctx);
122
123
/* AIO_WritePool_setFile:
124
* Sets the destination file for future writes in the pool.
125
* Requires completion of all queues write jobs and release of all otherwise acquired jobs.
126
* Also requires ending of sparse write if a previous file was used in sparse mode. */
127
void AIO_WritePool_setFile(WritePoolCtx_t *ctx, FILE* file);
128
129
/* AIO_WritePool_getFile:
130
* Returns the file the writePool is currently set to write to. */
131
FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx);
132
133
/* AIO_WritePool_closeFile:
134
* Ends sparse write and closes the writePool's current file and sets the file to NULL.
135
* Requires completion of all queues write jobs and release of all otherwise acquired jobs. */
136
int AIO_WritePool_closeFile(WritePoolCtx_t *ctx);
137
138
/* AIO_WritePool_create:
139
* Allocates and sets and a new write pool including its included jobs.
140
* bufferSize should be set to the maximal buffer we want to write to at a time. */
141
WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize);
142
143
/* AIO_WritePool_free:
144
* Frees and releases a writePool and its resources. Closes destination file. */
145
void AIO_WritePool_free(WritePoolCtx_t* ctx);
146
147
/* AIO_WritePool_setAsync:
148
* Allows (de)activating async mode, to be used when the expected overhead
149
* of asyncio costs more than the expected gains. */
150
void AIO_WritePool_setAsync(WritePoolCtx_t* ctx, int async);
151
152
/* AIO_ReadPool_create:
153
* Allocates and sets and a new readPool including its included jobs.
154
* bufferSize should be set to the maximal buffer we want to read at a time, will also be used
155
* as our basic read size. */
156
ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize);
157
158
/* AIO_ReadPool_free:
159
* Frees and releases a readPool and its resources. Closes source file. */
160
void AIO_ReadPool_free(ReadPoolCtx_t* ctx);
161
162
/* AIO_ReadPool_setAsync:
163
* Allows (de)activating async mode, to be used when the expected overhead
164
* of asyncio costs more than the expected gains. */
165
void AIO_ReadPool_setAsync(ReadPoolCtx_t* ctx, int async);
166
167
/* AIO_ReadPool_consumeBytes:
168
* Consumes byes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */
169
void AIO_ReadPool_consumeBytes(ReadPoolCtx_t *ctx, size_t n);
170
171
/* AIO_ReadPool_fillBuffer:
172
* Makes sure buffer has at least n bytes loaded (as long as n is not bigger than the initialized bufferSize).
173
* Returns if srcBuffer has at least n bytes loaded or if we've reached the end of the file.
174
* Return value is the number of bytes added to the buffer.
175
* Note that srcBuffer might have up to 2 times bufferSize bytes. */
176
size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t *ctx, size_t n);
177
178
/* AIO_ReadPool_consumeAndRefill:
179
* Consumes the current buffer and refills it with bufferSize bytes. */
180
size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t *ctx);
181
182
/* AIO_ReadPool_setFile:
183
* Sets the source file for future read in the pool. Initiates reading immediately if file is not NULL.
184
* Waits for all current enqueued tasks to complete if a previous file was set. */
185
void AIO_ReadPool_setFile(ReadPoolCtx_t *ctx, FILE* file);
186
187
/* AIO_ReadPool_getFile:
188
* Returns the current file set for the read pool. */
189
FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t *ctx);
190
191
/* AIO_ReadPool_closeFile:
192
* Closes the current set file. Waits for all current enqueued tasks to complete and resets state. */
193
int AIO_ReadPool_closeFile(ReadPoolCtx_t *ctx);
194
195
#endif /* ZSTD_FILEIO_ASYNCIO_H */
196
197