GitHub Repository: godotengine/godot
Path: blob/master/thirdparty/astcenc/astcenc_internal_entry.h
// SPDX-License-Identifier: Apache-2.0
// ----------------------------------------------------------------------------
// Copyright 2011-2025 Arm Limited
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy
// of the License at:
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
// ----------------------------------------------------------------------------

/**
 * @brief Functions and data declarations for the outer context.
 *
 * The outer context includes thread-pool management, which is slower to
 * compile due to increased use of C++ stdlib. The inner context used in the
 * majority of the codec library does not include this.
 */

#ifndef ASTCENC_INTERNAL_ENTRY_INCLUDED
#define ASTCENC_INTERNAL_ENTRY_INCLUDED

#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>

#include "astcenc_internal.h"

/* ============================================================================
  Parallel execution control
============================================================================ */

/**
 * @brief A simple counter-based manager for parallel task execution.
 *
 * The task processing execution consists of:
 *
 *  * A single-threaded init stage.
 *  * A multi-threaded processing stage.
 *  * A condition variable so threads can wait for processing completion.
 *
 * The init stage will be executed by the first thread to arrive in the critical section; there is
 * no main thread in the thread pool.
 *
 * The processing stage uses dynamic dispatch to assign task tickets to threads on an on-demand
 * basis. Threads may each therefore execute different numbers of tasks, depending on their
 * processing complexity. The task queue and the task tickets are just counters; the caller must map
 * these integers to an actual processing partition in a specific problem domain.
 *
 * The exit wait condition is needed to ensure processing has finished before a worker thread can
 * progress to the next stage of the pipeline. Specifically a worker may exit the processing stage
 * because there are no new tasks to assign to it while other worker threads are still processing.
 * Calling @c wait() will ensure that all other workers have finished before the thread can proceed.
 *
 * The basic usage model:
 *
 *     // --------- From single-threaded code ---------
 *
 *     // Reset the tracker state
 *     manager->reset()
 *
 *     // --------- From multi-threaded code ---------
 *
 *     // Run the stage init; only the first thread actually runs the lambda
 *     manager->init(<lambda>)
 *
 *     unsigned int task_count;
 *     do
 *     {
 *         // Request a task assignment
 *         unsigned int base_index = manager->get_task_assignment(<granule>, task_count);
 *
 *         // Process any tasks we were given (task_count <= granule size)
 *         if (task_count)
 *         {
 *             // Run the user task processing code for N tasks here
 *             ...
 *
 *             // Flag these tasks as complete
 *             manager->complete_task_assignment(task_count);
 *         }
 *     } while (task_count);
 *
 *     // Wait for all threads to complete tasks before progressing
 *     manager->wait()
 *
 *     // Run the stage term; only the first thread actually runs the lambda
 *     manager->term(<lambda>)
 */
class ParallelManager
{
private:
	/** @brief Lock used for critical section and condition synchronization. */
	std::mutex m_lock;

	/** @brief True if the current operation is cancelled. */
	std::atomic<bool> m_is_cancelled;

	/** @brief True if the stage init() step has been executed. */
	bool m_init_done;

	/** @brief True if the stage term() step has been executed. */
	bool m_term_done;

	/** @brief Condition variable for tracking stage processing completion. */
	std::condition_variable m_complete;

	/** @brief Number of tasks started, but not necessarily finished. */
	std::atomic<unsigned int> m_start_count;

	/** @brief Number of tasks finished. */
	unsigned int m_done_count;

	/** @brief Number of tasks that need to be processed. */
	unsigned int m_task_count;

	/** @brief Progress callback (optional). */
	astcenc_progress_callback m_callback;

	/** @brief Lock used for callback synchronization. */
	std::mutex m_callback_lock;

	/** @brief Minimum progress before making a callback. */
	float m_callback_min_diff;

	/** @brief Last progress callback value. */
	float m_callback_last_value;

public:
	/** @brief Create a new ParallelManager. */
	ParallelManager()
	{
		reset();
	}

	/**
	 * @brief Reset the tracker for a new processing batch.
	 *
	 * This must be called from single-threaded code before starting the multi-threaded processing
	 * operations.
	 */
	void reset()
	{
		m_init_done = false;
		m_term_done = false;
		m_is_cancelled = false;
		m_start_count = 0;
		m_done_count = 0;
		m_task_count = 0;
		m_callback = nullptr;
		m_callback_last_value = 0.0f;
		m_callback_min_diff = 1.0f;
	}

	/**
	 * @brief Clear the tracker and stop new tasks being assigned.
	 *
	 * Note, all in-flight tasks in a worker will still complete normally.
	 */
	void cancel()
	{
		m_is_cancelled = true;
	}

	/**
	 * @brief Trigger the pipeline stage init step.
	 *
	 * This can be called from multi-threaded code. The first thread to hit this will process the
	 * initialization. Other threads will block and wait for it to complete.
	 *
	 * @param init_func   Callable which executes the stage initialization. It must return the
	 *                    total number of tasks in the stage.
	 */
	void init(std::function<unsigned int(void)> init_func)
	{
		std::lock_guard<std::mutex> lck(m_lock);
		if (!m_init_done)
		{
			m_task_count = init_func();
			m_init_done = true;
		}
	}

	/**
	 * @brief Trigger the pipeline stage init step.
	 *
	 * This can be called from multi-threaded code. The first thread to hit this will process the
	 * initialization. Other threads will block and wait for it to complete.
	 *
	 * @param task_count   Total number of tasks needing processing.
	 * @param callback     Function pointer for progress status callbacks.
	 */
	void init(unsigned int task_count, astcenc_progress_callback callback)
	{
		std::lock_guard<std::mutex> lck(m_lock);
		if (!m_init_done)
		{
			m_callback = callback;
			m_task_count = task_count;
			m_init_done = true;

			// Report every 1% or 4096 blocks, whichever is larger, to avoid callback overhead
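			// (For example, with task_count = 1,000,000 the raw threshold is ~0.41% and is
			// clamped up to the 1% floor; with task_count = 100,000 it is ~4.1%. These
			// task_count values are illustrative only.)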
			float min_diff = (4096.0f / static_cast<float>(task_count)) * 100.0f;
			m_callback_min_diff = astc::max(min_diff, 1.0f);
		}
	}

	/**
	 * @brief Request a task assignment.
	 *
	 * Assign up to @c granule tasks to the caller for processing.
	 *
	 * @param      granule   Maximum number of tasks that can be assigned.
	 * @param[out] count     Actual number of tasks assigned, or zero if no tasks were assigned.
	 *
	 * @return Task index of the first assigned task; assigned tasks increment from this.
	 */
	unsigned int get_task_assignment(unsigned int granule, unsigned int& count)
	{
		unsigned int base = m_start_count.fetch_add(granule, std::memory_order_relaxed);
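		// Note: the fetch_add() above may advance m_start_count past m_task_count; the overshoot
		// is harmless because any such caller simply receives a zero-count assignment below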
		if (m_is_cancelled || base >= m_task_count)
		{
			count = 0;
			return 0;
		}

		count = astc::min(m_task_count - base, granule);
		return base;
	}

	/**
	 * @brief Complete a task assignment.
	 *
	 * Mark @c count tasks as complete. This will notify all threads blocked on @c wait() if this
	 * completes the processing of the stage.
	 *
	 * @param count   The number of completed tasks.
	 */
	void complete_task_assignment(unsigned int count)
	{
		// Note: m_done_count cannot use an atomic without the mutex; this has a race between the
		// update here and the wait() for other threads
		unsigned int local_count;
		float local_last_value;
		{
			std::unique_lock<std::mutex> lck(m_lock);
			m_done_count += count;
			local_count = m_done_count;
			local_last_value = m_callback_last_value;
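			// These local copies are read after m_lock is released, so the throttled progress
			// callback test later in this function does not need to hold m_lock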

			// Ensure the progress bar hits 100%
			if (m_callback && m_done_count == m_task_count)
			{
				std::unique_lock<std::mutex> cblck(m_callback_lock);
				m_callback(100.0f);
				m_callback_last_value = 100.0f;
			}

			// Notify if nothing left to do
			if (m_is_cancelled || m_done_count == m_task_count)
			{
				lck.unlock();
				m_complete.notify_all();
			}
		}

		// Process progress callback if we have one
		if (m_callback)
		{
			// Initial lockless test - have we progressed enough to emit?
			float num = static_cast<float>(local_count);
			float den = static_cast<float>(m_task_count);
			float this_value = (num / den) * 100.0f;
			bool report_test = (this_value - local_last_value) > m_callback_min_diff;

			// Recheck under lock, because another thread might report first
			if (report_test)
			{
				std::unique_lock<std::mutex> cblck(m_callback_lock);
				bool report_retest = (this_value - m_callback_last_value) > m_callback_min_diff;
				if (report_retest)
				{
					m_callback(this_value);
					m_callback_last_value = this_value;
				}
			}
		}
	}

	/**
	 * @brief Wait for stage processing to complete.
	 */
	void wait()
	{
		std::unique_lock<std::mutex> lck(m_lock);
		m_complete.wait(lck, [this]{ return m_is_cancelled || m_done_count == m_task_count; });
	}

	/**
	 * @brief Trigger the pipeline stage term step.
	 *
	 * This can be called from multi-threaded code. The first thread to hit this will process the
	 * work pool termination. Caller must have called @c wait() prior to calling this function to
	 * ensure that processing is complete.
	 *
	 * @param term_func   Callable which executes the stage termination.
	 */
	void term(std::function<void(void)> term_func)
	{
		std::lock_guard<std::mutex> lck(m_lock);
		if (!m_term_done)
		{
			term_func();
			m_term_done = true;
		}
	}
};
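
/*
 * Illustrative usage sketch (not part of the library): one possible way a pool of worker threads
 * could drive a ParallelManager, following the usage model documented on the class. It assumes
 * manager.reset() was already called from single-threaded code; the granule size of 16 and the
 * process_block() callable are hypothetical placeholders.
 *
 *     void worker_thread(ParallelManager& manager, unsigned int block_count)
 *     {
 *         // The first thread to arrive runs the init lambda; later threads reuse its result
 *         manager.init([=]() { return block_count; });
 *
 *         unsigned int count;
 *         while (true)
 *         {
 *             // Claim up to 16 task indices; count is zero once the queue is drained
 *             unsigned int base = manager.get_task_assignment(16, count);
 *             if (!count)
 *             {
 *                 break;
 *             }
 *
 *             for (unsigned int i = base; i < base + count; i++)
 *             {
 *                 process_block(i); // hypothetical per-task work
 *             }
 *
 *             manager.complete_task_assignment(count);
 *         }
 *
 *         // Block until all claimed tasks, including those of other workers, are complete
 *         manager.wait();
 *     }
 */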

/**
 * @brief The astcenc compression context.
 */
struct astcenc_context
{
	/** @brief The context internal state. */
	astcenc_contexti context;

#if !defined(ASTCENC_DECOMPRESS_ONLY)
	/** @brief The parallel manager for averages computation. */
	ParallelManager manage_avg;

	/** @brief The parallel manager for compression. */
	ParallelManager manage_compress;
#endif

	/** @brief The parallel manager for decompression. */
	ParallelManager manage_decompress;
};

#endif