// SPDX-License-Identifier: Apache-2.0
// ----------------------------------------------------------------------------
// Copyright 2011-2022 Arm Limited
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy
// of the License at:
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
// ----------------------------------------------------------------------------
/**
 * @brief Functions and data declarations for the outer context.
 *
 * The outer context includes thread-pool management, which is slower to
 * compile due to increased use of the C++ stdlib. The inner context used in
 * the majority of the codec library does not include this.
 */
- #ifndef ASTCENC_INTERNAL_ENTRY_INCLUDED
- #define ASTCENC_INTERNAL_ENTRY_INCLUDED
- #include <atomic>
- #include <condition_variable>
- #include <functional>
- #include <mutex>
- #include "astcenc_internal.h"
/* ============================================================================
  Parallel execution control
============================================================================ */
- /**
- * @brief A simple counter-based manager for parallel task execution.
- *
- * The task processing execution consists of:
- *
- * * A single-threaded init stage.
- * * A multi-threaded processing stage.
- * * A condition variable so threads can wait for processing completion.
- *
- * The init stage will be executed by the first thread to arrive in the critical section, there is
- * no main thread in the thread pool.
- *
- * The processing stage uses dynamic dispatch to assign task tickets to threads on an on-demand
- * basis. Threads may each therefore executed different numbers of tasks, depending on their
- * processing complexity. The task queue and the task tickets are just counters; the caller must map
- * these integers to an actual processing partition in a specific problem domain.
- *
- * The exit wait condition is needed to ensure processing has finished before a worker thread can
- * progress to the next stage of the pipeline. Specifically a worker may exit the processing stage
- * because there are no new tasks to assign to it while other worker threads are still processing.
- * Calling @c wait() will ensure that all other worker have finished before the thread can proceed.
- *
- * The basic usage model:
- *
- * // --------- From single-threaded code ---------
- *
- * // Reset the tracker state
- * manager->reset()
- *
- * // --------- From multi-threaded code ---------
- *
- * // Run the stage init; only first thread actually runs the lambda
- * manager->init(<lambda>)
- *
- * do
- * {
- * // Request a task assignment
- * uint task_count;
- * uint base_index = manager->get_tasks(<granule>, task_count);
- *
- * // Process any tasks we were given (task_count <= granule size)
- * if (task_count)
- * {
- * // Run the user task processing code for N tasks here
- * ...
- *
- * // Flag these tasks as complete
- * manager->complete_tasks(task_count);
- * }
- * } while (task_count);
- *
- * // Wait for all threads to complete tasks before progressing
- * manager->wait()
- *
- * // Run the stage term; only first thread actually runs the lambda
- * manager->term(<lambda>)
- */
- class ParallelManager
- {
- private:
- /** @brief Lock used for critical section and condition synchronization. */
- std::mutex m_lock;
- /** @brief True if the stage init() step has been executed. */
- bool m_init_done;
- /** @brief True if the stage term() step has been executed. */
- bool m_term_done;
- /** @brief Condition variable for tracking stage processing completion. */
- std::condition_variable m_complete;
- /** @brief Number of tasks started, but not necessarily finished. */
- std::atomic<unsigned int> m_start_count;
- /** @brief Number of tasks finished. */
- unsigned int m_done_count;
- /** @brief Number of tasks that need to be processed. */
- unsigned int m_task_count;
- public:
- /** @brief Create a new ParallelManager. */
- ParallelManager()
- {
- reset();
- }
- /**
- * @brief Reset the tracker for a new processing batch.
- *
- * This must be called from single-threaded code before starting the multi-threaded processing
- * operations.
- */
- void reset()
- {
- m_init_done = false;
- m_term_done = false;
- m_start_count = 0;
- m_done_count = 0;
- m_task_count = 0;
- }
- /**
- * @brief Trigger the pipeline stage init step.
- *
- * This can be called from multi-threaded code. The first thread to hit this will process the
- * initialization. Other threads will block and wait for it to complete.
- *
- * @param init_func Callable which executes the stage initialization. It must return the
- * total number of tasks in the stage.
- */
- void init(std::function<unsigned int(void)> init_func)
- {
- std::lock_guard<std::mutex> lck(m_lock);
- if (!m_init_done)
- {
- m_task_count = init_func();
- m_init_done = true;
- }
- }
- /**
- * @brief Trigger the pipeline stage init step.
- *
- * This can be called from multi-threaded code. The first thread to hit this will process the
- * initialization. Other threads will block and wait for it to complete.
- *
- * @param task_count Total number of tasks needing processing.
- */
- void init(unsigned int task_count)
- {
- std::lock_guard<std::mutex> lck(m_lock);
- if (!m_init_done)
- {
- m_task_count = task_count;
- m_init_done = true;
- }
- }
- /**
- * @brief Request a task assignment.
- *
- * Assign up to @c granule tasks to the caller for processing.
- *
- * @param granule Maximum number of tasks that can be assigned.
- * @param[out] count Actual number of tasks assigned, or zero if no tasks were assigned.
- *
- * @return Task index of the first assigned task; assigned tasks increment from this.
- */
- unsigned int get_task_assignment(unsigned int granule, unsigned int& count)
- {
- unsigned int base = m_start_count.fetch_add(granule, std::memory_order_relaxed);
- if (base >= m_task_count)
- {
- count = 0;
- return 0;
- }
- count = astc::min(m_task_count - base, granule);
- return base;
- }
- /**
- * @brief Complete a task assignment.
- *
- * Mark @c count tasks as complete. This will notify all threads blocked on @c wait() if this
- * completes the processing of the stage.
- *
- * @param count The number of completed tasks.
- */
- void complete_task_assignment(unsigned int count)
- {
- // Note: m_done_count cannot use an atomic without the mutex; this has a race between the
- // update here and the wait() for other threads
- std::unique_lock<std::mutex> lck(m_lock);
- this->m_done_count += count;
- if (m_done_count == m_task_count)
- {
- lck.unlock();
- m_complete.notify_all();
- }
- }
- /**
- * @brief Wait for stage processing to complete.
- */
- void wait()
- {
- std::unique_lock<std::mutex> lck(m_lock);
- m_complete.wait(lck, [this]{ return m_done_count == m_task_count; });
- }
- /**
- * @brief Trigger the pipeline stage term step.
- *
- * This can be called from multi-threaded code. The first thread to hit this will process the
- * work pool termination. Caller must have called @c wait() prior to calling this function to
- * ensure that processing is complete.
- *
- * @param term_func Callable which executes the stage termination.
- */
- void term(std::function<void(void)> term_func)
- {
- std::lock_guard<std::mutex> lck(m_lock);
- if (!m_term_done)
- {
- term_func();
- m_term_done = true;
- }
- }
- };
- /**
- * @brief The astcenc compression context.
- */
- struct astcenc_context
- {
- /** @brief The context internal state. */
- astcenc_contexti context;
- #if !defined(ASTCENC_DECOMPRESS_ONLY)
- /** @brief The parallel manager for averages computation. */
- ParallelManager manage_avg;
- /** @brief The parallel manager for compression. */
- ParallelManager manage_compress;
- #endif
- /** @brief The parallel manager for decompression. */
- ParallelManager manage_decompress;
- };
- #endif
|