Path: blob/master/thirdparty/astcenc/astcenc_internal_entry.h
// SPDX-License-Identifier: Apache-2.0
// ----------------------------------------------------------------------------
// Copyright 2011-2025 Arm Limited
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy
// of the License at:
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
// ----------------------------------------------------------------------------

/**
 * @brief Functions and data declarations for the outer context.
 *
 * The outer context includes thread-pool management, which is slower to
 * compile due to increased use of C++ stdlib. The inner context used in the
 * majority of the codec library does not include this.
 */

#ifndef ASTCENC_INTERNAL_ENTRY_INCLUDED
#define ASTCENC_INTERNAL_ENTRY_INCLUDED

#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>

#include "astcenc_internal.h"

/* ============================================================================
   Parallel execution control
============================================================================ */

/**
 * @brief A simple counter-based manager for parallel task execution.
 *
 * Task processing consists of:
 *
 *     * A single-threaded init stage.
 *     * A multi-threaded processing stage.
 *     * A condition variable so threads can wait for processing completion.
 *
 * The init stage will be executed by the first thread to arrive in the critical section; there is
 * no main thread in the thread pool.
 *
 * The processing stage uses dynamic dispatch to assign task tickets to threads on an on-demand
 * basis. Threads may therefore each execute different numbers of tasks, depending on their
 * processing complexity. The task queue and the task tickets are just counters; the caller must map
 * these integers to an actual processing partition in a specific problem domain.
 *
 * The exit wait condition is needed to ensure processing has finished before a worker thread can
 * progress to the next stage of the pipeline. Specifically, a worker may exit the processing stage
 * because there are no new tasks to assign to it while other worker threads are still processing.
 * Calling @c wait() will ensure that all other workers have finished before the thread can proceed.
 *
 * The basic usage model:
 *
 *     // --------- From single-threaded code ---------
 *
 *     // Reset the tracker state
 *     manager->reset()
 *
 *     // --------- From multi-threaded code ---------
 *
 *     // Run the stage init; only the first thread actually runs the lambda
 *     manager->init(<lambda>)
 *
 *     do
 *     {
 *         // Request a task assignment
 *         uint task_count;
 *         uint base_index = manager->get_task_assignment(<granule>, task_count);
 *
 *         // Process any tasks we were given (task_count <= granule size)
 *         if (task_count)
 *         {
 *             // Run the user task processing code for N tasks here
 *             ...
 *
 *             // Flag these tasks as complete
 *             manager->complete_task_assignment(task_count);
 *         }
 *     } while (task_count);
 *
 *     // Wait for all threads to complete tasks before progressing
 *     manager->wait()
 *
 *     // Run the stage term; only the first thread actually runs the lambda
 *     manager->term(<lambda>)
 */
class ParallelManager
{
private:
	/** @brief Lock used for critical section and condition synchronization. */
	std::mutex m_lock;

	/** @brief True if the current operation is cancelled. */
	std::atomic<bool> m_is_cancelled;

	/** @brief True if the stage init() step has been executed. */
	bool m_init_done;

	/** @brief True if the stage term() step has been executed. */
	bool m_term_done;

	/** @brief Condition variable for tracking stage processing completion. */
	std::condition_variable m_complete;

	/** @brief Number of tasks started, but not necessarily finished. */
	std::atomic<unsigned int> m_start_count;

	/** @brief Number of tasks finished. */
	unsigned int m_done_count;

	/** @brief Number of tasks that need to be processed. */
	unsigned int m_task_count;

	/** @brief Progress callback (optional). */
	astcenc_progress_callback m_callback;

	/** @brief Lock used for callback synchronization. */
	std::mutex m_callback_lock;

	/** @brief Minimum progress before making a callback. */
	float m_callback_min_diff;

	/** @brief Last progress callback value. */
	float m_callback_last_value;

public:
	/** @brief Create a new ParallelManager. */
	ParallelManager()
	{
		reset();
	}

	/**
	 * @brief Reset the tracker for a new processing batch.
	 *
	 * This must be called from single-threaded code before starting the multi-threaded processing
	 * operations.
	 */
	void reset()
	{
		m_init_done = false;
		m_term_done = false;
		m_is_cancelled = false;
		m_start_count = 0;
		m_done_count = 0;
		m_task_count = 0;
		m_callback = nullptr;
		m_callback_last_value = 0.0f;
		m_callback_min_diff = 1.0f;
	}

	/**
	 * @brief Clear the tracker and stop new tasks being assigned.
	 *
	 * Note that all in-flight tasks in a worker will still complete normally.
	 */
	void cancel()
	{
		m_is_cancelled = true;
	}

	/**
	 * @brief Trigger the pipeline stage init step.
	 *
	 * This can be called from multi-threaded code. The first thread to hit this will process the
	 * initialization. Other threads will block and wait for it to complete.
	 *
	 * @param init_func   Callable which executes the stage initialization. It must return the
	 *                    total number of tasks in the stage.
	 */
	void init(std::function<unsigned int(void)> init_func)
	{
		std::lock_guard<std::mutex> lck(m_lock);
		if (!m_init_done)
		{
			m_task_count = init_func();
			m_init_done = true;
		}
	}

	/**
	 * @brief Trigger the pipeline stage init step.
	 *
	 * This can be called from multi-threaded code. The first thread to hit this will process the
	 * initialization. Other threads will block and wait for it to complete.
	 *
	 * @param task_count   Total number of tasks needing processing.
	 * @param callback     Function pointer for progress status callbacks.
	 */
	void init(unsigned int task_count, astcenc_progress_callback callback)
	{
		std::lock_guard<std::mutex> lck(m_lock);
		if (!m_init_done)
		{
			m_callback = callback;
			m_task_count = task_count;
			m_init_done = true;

			// Report every 1% or 4096 blocks, whichever is larger, to avoid callback overhead
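			// (worked example, illustrative numbers only: a 65536 block image gives
			// min_diff = (4096 / 65536) * 100 = 6.25, so progress is reported in steps of
			// roughly 6.25%; the 1.0f floor below only takes effect above 409600 blocks)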
			float min_diff = (4096.0f / static_cast<float>(task_count)) * 100.0f;
			m_callback_min_diff = astc::max(min_diff, 1.0f);
		}
	}

	/**
	 * @brief Request a task assignment.
	 *
	 * Assign up to @c granule tasks to the caller for processing.
	 *
	 * @param      granule   Maximum number of tasks that can be assigned.
	 * @param[out] count     Actual number of tasks assigned, or zero if no tasks were assigned.
	 *
	 * @return Task index of the first assigned task; assigned tasks increment from this.
	 */
	unsigned int get_task_assignment(unsigned int granule, unsigned int& count)
	{
		unsigned int base = m_start_count.fetch_add(granule, std::memory_order_relaxed);
		if (m_is_cancelled || base >= m_task_count)
		{
			count = 0;
			return 0;
		}

		count = astc::min(m_task_count - base, granule);
		return base;
	}

	/**
	 * @brief Complete a task assignment.
	 *
	 * Mark @c count tasks as complete. This will notify all threads blocked on @c wait() if this
	 * completes the processing of the stage.
	 *
	 * @param count   The number of completed tasks.
	 */
	void complete_task_assignment(unsigned int count)
	{
		// Note: m_done_count cannot use an atomic without the mutex; this has a race between the
		// update here and the wait() for other threads
		unsigned int local_count;
		float local_last_value;
		{
			std::unique_lock<std::mutex> lck(m_lock);
			m_done_count += count;
			local_count = m_done_count;
			local_last_value = m_callback_last_value;

			// Ensure the progress bar hits 100%
			if (m_callback && m_done_count == m_task_count)
			{
				std::unique_lock<std::mutex> cblck(m_callback_lock);
				m_callback(100.0f);
				m_callback_last_value = 100.0f;
			}

			// Notify if nothing left to do
			if (m_is_cancelled || m_done_count == m_task_count)
			{
				lck.unlock();
				m_complete.notify_all();
			}
		}

		// Process progress callback if we have one
		if (m_callback)
		{
			// Initial lockless test - have we progressed enough to emit?
			float num = static_cast<float>(local_count);
			float den = static_cast<float>(m_task_count);
			float this_value = (num / den) * 100.0f;
			bool report_test = (this_value - local_last_value) > m_callback_min_diff;

			// Recheck under lock, because another thread might report first
			if (report_test)
			{
				std::unique_lock<std::mutex> cblck(m_callback_lock);
				bool report_retest = (this_value - m_callback_last_value) > m_callback_min_diff;
				if (report_retest)
				{
					m_callback(this_value);
					m_callback_last_value = this_value;
				}
			}
		}
	}

	/**
	 * @brief Wait for stage processing to complete.
	 */
	void wait()
	{
		std::unique_lock<std::mutex> lck(m_lock);
		m_complete.wait(lck, [this]{ return m_is_cancelled || m_done_count == m_task_count; });
	}

	/**
	 * @brief Trigger the pipeline stage term step.
	 *
	 * This can be called from multi-threaded code. The first thread to hit this will process the
	 * work pool termination. Caller must have called @c wait() prior to calling this function to
	 * ensure that processing is complete.
	 *
	 * @param term_func   Callable which executes the stage termination.
	 */
	void term(std::function<void(void)> term_func)
	{
		std::lock_guard<std::mutex> lck(m_lock);
		if (!m_term_done)
		{
			term_func();
			m_term_done = true;
		}
	}
};

/**
 * @brief The astcenc compression context.
 */
struct astcenc_context
{
	/** @brief The context internal state. */
	astcenc_contexti context;

#if !defined(ASTCENC_DECOMPRESS_ONLY)
	/** @brief The parallel manager for averages computation. */
	ParallelManager manage_avg;

	/** @brief The parallel manager for compression. */
	ParallelManager manage_compress;
#endif

	/** @brief The parallel manager for decompression. */
	ParallelManager manage_decompress;
};

#endif
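For reference, a minimal sketch of the usage model documented in the ParallelManager class comment: one function run by every thread in the pool, driving a shared manager for a single stage. The example_worker() and process_block() names, the granule size of 16, and the surrounding translation unit are illustrative assumptions, not part of the astcenc API.

#include "astcenc_internal_entry.h"

// Hypothetical per-task work; stands in for real block processing
void process_block(unsigned int block_index);

void example_worker(ParallelManager& manager, unsigned int total_blocks)
{
	// First thread to arrive runs the lambda and publishes the task count
	manager.init([total_blocks] { return total_blocks; });

	unsigned int count = 0;
	do
	{
		// Claim up to 16 task tickets; count may come back smaller, or zero
		unsigned int base = manager.get_task_assignment(16, count);
		for (unsigned int i = 0; i < count; i++)
		{
			process_block(base + i);
		}

		// Flag the claimed tickets as done so wait() can make progress
		if (count)
		{
			manager.complete_task_assignment(count);
		}
	} while (count);

	// Block until every worker has drained its in-flight tasks
	manager.wait();

	// First thread to arrive runs the stage termination
	manager.term([] { /* stage cleanup */ });
}

The do/while structure mirrors the dynamic dispatch described in the class comment: each thread keeps claiming granules until get_task_assignment() hands back a zero count, and wait() then holds it until the slower workers have flagged their last tickets complete.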