123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261 |
- /**************************************************************************/
- /* command_queue_mt.h */
- /**************************************************************************/
- /* This file is part of: */
- /* GODOT ENGINE */
- /* https://godotengine.org */
- /**************************************************************************/
- /* Copyright (c) 2014-present Godot Engine contributors (see AUTHORS.md). */
- /* Copyright (c) 2007-2014 Juan Linietsky, Ariel Manzur. */
- /* */
- /* Permission is hereby granted, free of charge, to any person obtaining */
- /* a copy of this software and associated documentation files (the */
- /* "Software"), to deal in the Software without restriction, including */
- /* without limitation the rights to use, copy, modify, merge, publish, */
- /* distribute, sublicense, and/or sell copies of the Software, and to */
- /* permit persons to whom the Software is furnished to do so, subject to */
- /* the following conditions: */
- /* */
- /* The above copyright notice and this permission notice shall be */
- /* included in all copies or substantial portions of the Software. */
- /* */
- /* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */
- /* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */
- /* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. */
- /* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */
- /* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */
- /* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */
- /* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */
- /**************************************************************************/
- #ifndef COMMAND_QUEUE_MT_H
- #define COMMAND_QUEUE_MT_H
- #include "core/object/worker_thread_pool.h"
- #include "core/os/condition_variable.h"
- #include "core/os/mutex.h"
- #include "core/templates/local_vector.h"
- #include "core/templates/simple_type.h"
- #include "core/templates/tuple.h"
- #include "core/typedefs.h"
- class CommandQueueMT {
- struct CommandBase {
- bool sync = false;
- virtual void call() = 0;
- virtual ~CommandBase() = default;
- CommandBase(bool p_sync) :
- sync(p_sync) {}
- };
- template <typename T, typename M, bool NeedsSync, typename... Args>
- struct Command : public CommandBase {
- T *instance;
- M method;
- Tuple<GetSimpleTypeT<Args>...> args;
- template <typename... FwdArgs>
- _FORCE_INLINE_ Command(T *p_instance, M p_method, FwdArgs &&...p_args) :
- CommandBase(NeedsSync), instance(p_instance), method(p_method), args(std::forward<FwdArgs>(p_args)...) {}
- void call() {
- call_impl(BuildIndexSequence<sizeof...(Args)>{});
- }
- private:
- template <size_t... I>
- _FORCE_INLINE_ void call_impl(IndexSequence<I...>) {
- // Move out of the Tuple, this will be destroyed as soon as the call is complete.
- (instance->*method)(std::move(get<I>())...);
- }
- // This method exists so we can call it in the parameter pack expansion in call_impl.
- template <size_t I>
- _FORCE_INLINE_ auto &get() { return ::tuple_get<I>(args); }
- };
- // Separate class from Command so we can save the space of the ret pointer for commands that don't return.
- template <typename T, typename M, typename R, typename... Args>
- struct CommandRet : public CommandBase {
- T *instance;
- M method;
- R *ret;
- Tuple<GetSimpleTypeT<Args>...> args;
- _FORCE_INLINE_ CommandRet(T *p_instance, M p_method, R *p_ret, GetSimpleTypeT<Args>... p_args) :
- CommandBase(true), instance(p_instance), method(p_method), ret(p_ret), args{ p_args... } {}
- void call() override {
- *ret = call_impl(BuildIndexSequence<sizeof...(Args)>{});
- }
- private:
- template <size_t... I>
- _FORCE_INLINE_ R call_impl(IndexSequence<I...>) {
- // Move out of the Tuple, this will be destroyed as soon as the call is complete.
- return (instance->*method)(std::move(get<I>())...);
- }
- // This method exists so we can call it in the parameter pack expansion in call_impl.
- template <size_t I>
- _FORCE_INLINE_ auto &get() { return ::tuple_get<I>(args); }
- };
- /***** BASE *******/
- static const uint32_t DEFAULT_COMMAND_MEM_SIZE_KB = 64;
- BinaryMutex mutex;
- LocalVector<uint8_t> command_mem;
- ConditionVariable sync_cond_var;
- uint32_t sync_head = 0;
- uint32_t sync_tail = 0;
- uint32_t sync_awaiters = 0;
- WorkerThreadPool::TaskID pump_task_id = WorkerThreadPool::INVALID_TASK_ID;
- uint64_t flush_read_ptr = 0;
- std::atomic<bool> pending;
- template <typename T, typename... Args>
- _FORCE_INLINE_ void create_command(Args &&...p_args) {
- // alloc size is size+T+safeguard
- constexpr uint64_t alloc_size = ((sizeof(T) + 8U - 1U) & ~(8U - 1U));
- static_assert(alloc_size < UINT32_MAX, "Type too large to fit in the command queue.");
- uint64_t size = command_mem.size();
- command_mem.resize(size + alloc_size + sizeof(uint64_t));
- *(uint64_t *)&command_mem[size] = alloc_size;
- void *cmd = &command_mem[size + sizeof(uint64_t)];
- new (cmd) T(std::forward<Args>(p_args)...);
- pending.store(true);
- }
- template <typename T, bool NeedsSync, typename... Args>
- _FORCE_INLINE_ void _push_internal(Args &&...args) {
- MutexLock mlock(mutex);
- create_command<T>(std::forward<Args>(args)...);
- if (pump_task_id != WorkerThreadPool::INVALID_TASK_ID) {
- WorkerThreadPool::get_singleton()->notify_yield_over(pump_task_id);
- }
- if constexpr (NeedsSync) {
- sync_tail++;
- _wait_for_sync(mlock);
- }
- }
- _FORCE_INLINE_ void _prevent_sync_wraparound() {
- bool safe_to_reset = !sync_awaiters;
- bool already_sync_to_latest = sync_head == sync_tail;
- if (safe_to_reset && already_sync_to_latest) {
- sync_head = 0;
- sync_tail = 0;
- }
- }
- void _flush() {
- if (unlikely(flush_read_ptr)) {
- // Re-entrant call.
- return;
- }
- MutexLock lock(mutex);
- while (flush_read_ptr < command_mem.size()) {
- uint64_t size = *(uint64_t *)&command_mem[flush_read_ptr];
- flush_read_ptr += 8;
- CommandBase *cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]);
- uint32_t allowance_id = WorkerThreadPool::thread_enter_unlock_allowance_zone(lock);
- cmd->call();
- WorkerThreadPool::thread_exit_unlock_allowance_zone(allowance_id);
- // Handle potential realloc due to the command and unlock allowance.
- cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]);
- if (unlikely(cmd->sync)) {
- sync_head++;
- lock.~MutexLock(); // Give an opportunity to awaiters right away.
- sync_cond_var.notify_all();
- new (&lock) MutexLock(mutex);
- // Handle potential realloc happened during unlock.
- cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]);
- }
- cmd->~CommandBase();
- flush_read_ptr += size;
- }
- command_mem.clear();
- pending.store(false);
- flush_read_ptr = 0;
- _prevent_sync_wraparound();
- }
- _FORCE_INLINE_ void _wait_for_sync(MutexLock<BinaryMutex> &p_lock) {
- sync_awaiters++;
- uint32_t sync_head_goal = sync_tail;
- do {
- sync_cond_var.wait(p_lock);
- } while (sync_head < sync_head_goal);
- sync_awaiters--;
- _prevent_sync_wraparound();
- }
- void _no_op() {}
- public:
- template <typename T, typename M, typename... Args>
- void push(T *p_instance, M p_method, Args &&...p_args) {
- // Standard command, no sync.
- using CommandType = Command<T, M, false, Args...>;
- _push_internal<CommandType, false>(p_instance, p_method, std::forward<Args>(p_args)...);
- }
- template <typename T, typename M, typename... Args>
- void push_and_sync(T *p_instance, M p_method, Args... p_args) {
- // Standard command, sync.
- using CommandType = Command<T, M, true, Args...>;
- _push_internal<CommandType, true>(p_instance, p_method, std::forward<Args>(p_args)...);
- }
- template <typename T, typename M, typename R, typename... Args>
- void push_and_ret(T *p_instance, M p_method, R *r_ret, Args... p_args) {
- // Command with return value, sync.
- using CommandType = CommandRet<T, M, R, Args...>;
- _push_internal<CommandType, true>(p_instance, p_method, r_ret, std::forward<Args>(p_args)...);
- }
- _FORCE_INLINE_ void flush_if_pending() {
- if (unlikely(pending.load())) {
- _flush();
- }
- }
- void flush_all() {
- _flush();
- }
- void sync() {
- push_and_sync(this, &CommandQueueMT::_no_op);
- }
- void wait_and_flush() {
- ERR_FAIL_COND(pump_task_id == WorkerThreadPool::INVALID_TASK_ID);
- WorkerThreadPool::get_singleton()->wait_for_task_completion(pump_task_id);
- _flush();
- }
- void set_pump_task_id(WorkerThreadPool::TaskID p_task_id) {
- MutexLock lock(mutex);
- pump_task_id = p_task_id;
- }
- CommandQueueMT();
- ~CommandQueueMT();
- };
- #endif // COMMAND_QUEUE_MT_H
|