/**************************************************************************/
/* command_queue_mt.h */
/**************************************************************************/
/* This file is part of: */
/* GODOT ENGINE */
/* https://godotengine.org */
/**************************************************************************/
/* Copyright (c) 2014-present Godot Engine contributors (see AUTHORS.md). */
/* Copyright (c) 2007-2014 Juan Linietsky, Ariel Manzur. */
/* */
/* Permission is hereby granted, free of charge, to any person obtaining */
/* a copy of this software and associated documentation files (the */
/* "Software"), to deal in the Software without restriction, including */
/* without limitation the rights to use, copy, modify, merge, publish, */
/* distribute, sublicense, and/or sell copies of the Software, and to */
/* permit persons to whom the Software is furnished to do so, subject to */
/* the following conditions: */
/* */
/* The above copyright notice and this permission notice shall be */
/* included in all copies or substantial portions of the Software. */
/* */
/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */
/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */
/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. */
/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */
/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */
/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */
/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */
/**************************************************************************/

#ifndef COMMAND_QUEUE_MT_H
#define COMMAND_QUEUE_MT_H

#include "core/object/worker_thread_pool.h"
#include "core/os/condition_variable.h"
#include "core/os/mutex.h"
#include "core/templates/local_vector.h"
#include "core/templates/simple_type.h"
#include "core/templates/tuple.h"
#include "core/typedefs.h"

#include <atomic> // For std::atomic<bool> pending.

class CommandQueueMT {
	struct CommandBase {
		bool sync = false;
		virtual void call() = 0;
		virtual ~CommandBase() = default;

		CommandBase(bool p_sync) :
				sync(p_sync) {}
	};
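
	// Deferred call to a member function. Arguments are captured into a Tuple
	// of their plain value types (GetSimpleTypeT), so the command owns copies
	// of its data until it is executed on the flushing thread.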
	template <typename T, typename M, bool NeedsSync, typename... Args>
	struct Command : public CommandBase {
		T *instance;
		M method;
		Tuple<GetSimpleTypeT<Args>...> args;

		template <typename... FwdArgs>
		_FORCE_INLINE_ Command(T *p_instance, M p_method, FwdArgs &&...p_args) :
				CommandBase(NeedsSync), instance(p_instance), method(p_method), args(std::forward<FwdArgs>(p_args)...) {}

		void call() override {
			call_impl(BuildIndexSequence<sizeof...(Args)>{});
		}

	private:
		template <size_t... I>
		_FORCE_INLINE_ void call_impl(IndexSequence<I...>) {
			// Move out of the Tuple, this will be destroyed as soon as the call is complete.
			(instance->*method)(std::move(get<I>())...);
		}

		// This method exists so we can call it in the parameter pack expansion in call_impl.
		template <size_t I>
		_FORCE_INLINE_ auto &get() { return ::tuple_get<I>(args); }
	};

	// Separate class from Command so we can save the space of the ret pointer for commands that don't return.
	template <typename T, typename M, typename R, typename... Args>
	struct CommandRet : public CommandBase {
		T *instance;
		M method;
		R *ret;
		Tuple<GetSimpleTypeT<Args>...> args;

		_FORCE_INLINE_ CommandRet(T *p_instance, M p_method, R *p_ret, GetSimpleTypeT<Args>... p_args) :
				CommandBase(true), instance(p_instance), method(p_method), ret(p_ret), args{ p_args... } {}

		void call() override {
			*ret = call_impl(BuildIndexSequence<sizeof...(Args)>{});
		}

	private:
		template <size_t... I>
		_FORCE_INLINE_ R call_impl(IndexSequence<I...>) {
			// Move out of the Tuple, this will be destroyed as soon as the call is complete.
			return (instance->*method)(std::move(get<I>())...);
		}

		// This method exists so we can call it in the parameter pack expansion in call_impl.
		template <size_t I>
		_FORCE_INLINE_ auto &get() { return ::tuple_get<I>(args); }
	};

	/***** BASE *******/

	static const uint32_t DEFAULT_COMMAND_MEM_SIZE_KB = 64;

	BinaryMutex mutex;
	LocalVector<uint8_t> command_mem;
	ConditionVariable sync_cond_var;
	uint32_t sync_head = 0;
	uint32_t sync_tail = 0;
	uint32_t sync_awaiters = 0;
	WorkerThreadPool::TaskID pump_task_id = WorkerThreadPool::INVALID_TASK_ID;
	uint64_t flush_read_ptr = 0;
	std::atomic<bool> pending;
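
	// Appends one record to command_mem. Each record is a uint64_t payload
	// size followed by the command object itself, placement-new'd into the
	// buffer; the payload is rounded up to 8 bytes so records stay aligned.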
	template <typename T, typename... Args>
	_FORCE_INLINE_ void create_command(Args &&...p_args) {
		// alloc size is size+T+safeguard
		constexpr uint64_t alloc_size = ((sizeof(T) + 8U - 1U) & ~(8U - 1U));
		static_assert(alloc_size < UINT32_MAX, "Type too large to fit in the command queue.");

		uint64_t size = command_mem.size();
		command_mem.resize(size + alloc_size + sizeof(uint64_t));
		*(uint64_t *)&command_mem[size] = alloc_size;
		void *cmd = &command_mem[size + sizeof(uint64_t)];
		new (cmd) T(std::forward<Args>(p_args)...);
		pending.store(true);
	}
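
	// Creates the command under the mutex, nudges the pump task (if one is
	// registered) so it can service the queue, and, for synchronous commands,
	// blocks the caller until the flusher reports the command executed.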
	template <typename T, bool NeedsSync, typename... Args>
	_FORCE_INLINE_ void _push_internal(Args &&...args) {
		MutexLock mlock(mutex);

		create_command<T>(std::forward<Args>(args)...);

		if (pump_task_id != WorkerThreadPool::INVALID_TASK_ID) {
			WorkerThreadPool::get_singleton()->notify_yield_over(pump_task_id);
		}

		if constexpr (NeedsSync) {
			sync_tail++;
			_wait_for_sync(mlock);
		}
	}
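
	// sync_head and sync_tail only ever grow. Once no thread is waiting and
	// every sync command so far has been serviced, both counters can safely
	// be reset to zero so they never overflow.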
	_FORCE_INLINE_ void _prevent_sync_wraparound() {
		bool safe_to_reset = !sync_awaiters;
		bool already_sync_to_latest = sync_head == sync_tail;
		if (safe_to_reset && already_sync_to_latest) {
			sync_head = 0;
			sync_tail = 0;
		}
	}
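
	// Drains the queue on the calling thread. The unlock allowance zone lets
	// the WorkerThreadPool release the mutex while a command waits, and a
	// command may push further commands, so command_mem can reallocate
	// mid-loop; that is why `cmd` is re-fetched after every point where the
	// lock may have been dropped.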
	void _flush() {
		if (unlikely(flush_read_ptr)) {
			// Re-entrant call.
			return;
		}

		MutexLock lock(mutex);

		while (flush_read_ptr < command_mem.size()) {
			uint64_t size = *(uint64_t *)&command_mem[flush_read_ptr];
			flush_read_ptr += 8;
			CommandBase *cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]);
			uint32_t allowance_id = WorkerThreadPool::thread_enter_unlock_allowance_zone(lock);
			cmd->call();
			WorkerThreadPool::thread_exit_unlock_allowance_zone(allowance_id);

			// Handle potential realloc due to the command and unlock allowance.
			cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]);

			if (unlikely(cmd->sync)) {
				sync_head++;
				lock.~MutexLock(); // Give an opportunity to awaiters right away.
				sync_cond_var.notify_all();
				new (&lock) MutexLock(mutex);
				// Handle a potential realloc that happened during the unlock.
				cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]);
			}

			cmd->~CommandBase();

			flush_read_ptr += size;
		}

		command_mem.clear();
		pending.store(false);
		flush_read_ptr = 0;

		_prevent_sync_wraparound();
	}

	_FORCE_INLINE_ void _wait_for_sync(MutexLock<BinaryMutex> &p_lock) {
		sync_awaiters++;
		uint32_t sync_head_goal = sync_tail;
		do {
			sync_cond_var.wait(p_lock);
		} while (sync_head < sync_head_goal);
		sync_awaiters--;
		_prevent_sync_wraparound();
	}

	void _no_op() {}

public:
	template <typename T, typename M, typename... Args>
	void push(T *p_instance, M p_method, Args &&...p_args) {
		// Standard command, no sync.
		using CommandType = Command<T, M, false, Args...>;
		_push_internal<CommandType, false>(p_instance, p_method, std::forward<Args>(p_args)...);
	}

	template <typename T, typename M, typename... Args>
	void push_and_sync(T *p_instance, M p_method, Args... p_args) {
		// Standard command, sync.
		using CommandType = Command<T, M, true, Args...>;
		_push_internal<CommandType, true>(p_instance, p_method, std::forward<Args>(p_args)...);
	}

	template <typename T, typename M, typename R, typename... Args>
	void push_and_ret(T *p_instance, M p_method, R *r_ret, Args... p_args) {
		// Command with return value, sync.
		using CommandType = CommandRet<T, M, R, Args...>;
		_push_internal<CommandType, true>(p_instance, p_method, r_ret, std::forward<Args>(p_args)...);
	}

	_FORCE_INLINE_ void flush_if_pending() {
		if (unlikely(pending.load())) {
			_flush();
		}
	}

	void flush_all() {
		_flush();
	}
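
	// Blocks until every command queued before this call has been executed.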
	void sync() {
		push_and_sync(this, &CommandQueueMT::_no_op);
	}
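
	// Waits for the registered pump task to complete, then drains whatever is
	// left in the queue on the calling thread.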
	void wait_and_flush() {
		ERR_FAIL_COND(pump_task_id == WorkerThreadPool::INVALID_TASK_ID);
		WorkerThreadPool::get_singleton()->wait_for_task_completion(pump_task_id);
		_flush();
	}

	void set_pump_task_id(WorkerThreadPool::TaskID p_task_id) {
		MutexLock lock(mutex);
		pump_task_id = p_task_id;
	}

	CommandQueueMT();
	~CommandQueueMT();
};
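
// Typical usage (illustrative sketch; `Renderer`, its methods, and
// `frame_index` are hypothetical stand-ins for a real consumer such as a
// rendering server that owns a queue like this):
//
//     Renderer renderer;
//     CommandQueueMT queue;
//
//     // Producer thread: fire-and-forget.
//     queue.push(&renderer, &Renderer::draw_frame, frame_index);
//
//     // Producer thread: block until the consumer has executed the command.
//     queue.push_and_sync(&renderer, &Renderer::resize, 1280, 720);
//
//     // Producer thread: block and read a result back.
//     uint64_t frame = 0;
//     queue.push_and_ret(&renderer, &Renderer::get_frame_number, &frame);
//
//     // Consumer thread: drain whatever has accumulated.
//     queue.flush_if_pending();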

#endif // COMMAND_QUEUE_MT_H