JobScheduler.cpp 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. /* -*- Mode: C++; tab-width: 20; indent-tabs-mode: nil; c-basic-offset: 2 -*-
  2. * This Source Code Form is subject to the terms of the Mozilla Public
  3. * License, v. 2.0. If a copy of the MPL was not distributed with this
  4. * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
  5. #include "JobScheduler.h"
  6. #include "Logging.h"
  7. namespace mozilla {
  8. namespace gfx {
  9. JobScheduler* JobScheduler::sSingleton = nullptr;
  10. bool JobScheduler::Init(uint32_t aNumThreads, uint32_t aNumQueues)
  11. {
  12. MOZ_ASSERT(!sSingleton);
  13. MOZ_ASSERT(aNumThreads >= aNumQueues);
  14. sSingleton = new JobScheduler();
  15. sSingleton->mNextQueue = 0;
  16. for (uint32_t i = 0; i < aNumQueues; ++i) {
  17. sSingleton->mDrawingQueues.push_back(new MultiThreadedJobQueue());
  18. }
  19. for (uint32_t i = 0; i < aNumThreads; ++i) {
  20. sSingleton->mWorkerThreads.push_back(WorkerThread::Create(sSingleton->mDrawingQueues[i%aNumQueues]));
  21. }
  22. return true;
  23. }
  24. void JobScheduler::ShutDown()
  25. {
  26. MOZ_ASSERT(IsEnabled());
  27. if (!IsEnabled()) {
  28. return;
  29. }
  30. for (auto queue : sSingleton->mDrawingQueues) {
  31. queue->ShutDown();
  32. delete queue;
  33. }
  34. for (WorkerThread* thread : sSingleton->mWorkerThreads) {
  35. // this will block until the thread is joined.
  36. delete thread;
  37. }
  38. sSingleton->mWorkerThreads.clear();
  39. delete sSingleton;
  40. sSingleton = nullptr;
  41. }
  42. JobStatus
  43. JobScheduler::ProcessJob(Job* aJob)
  44. {
  45. MOZ_ASSERT(aJob);
  46. auto status = aJob->Run();
  47. if (status == JobStatus::Error || status == JobStatus::Complete) {
  48. delete aJob;
  49. }
  50. return status;
  51. }
  52. void
  53. JobScheduler::SubmitJob(Job* aJob)
  54. {
  55. MOZ_ASSERT(aJob);
  56. RefPtr<SyncObject> start = aJob->GetStartSync();
  57. if (start && start->Register(aJob)) {
  58. // The Job buffer starts with a non-signaled sync object, it
  59. // is now registered in the list of task buffers waiting on the
  60. // sync object, so we should not place it in the queue.
  61. return;
  62. }
  63. GetQueueForJob(aJob)->SubmitJob(aJob);
  64. }
  65. void
  66. JobScheduler::Join(SyncObject* aCompletion)
  67. {
  68. RefPtr<EventObject> waitForCompletion = new EventObject();
  69. JobScheduler::SubmitJob(new SetEventJob(waitForCompletion, aCompletion));
  70. waitForCompletion->Wait();
  71. }
  72. MultiThreadedJobQueue*
  73. JobScheduler::GetQueueForJob(Job* aJob)
  74. {
  75. return aJob->IsPinnedToAThread() ? aJob->GetWorkerThread()->GetJobQueue()
  76. : GetDrawingQueue();
  77. }
  78. Job::Job(SyncObject* aStart, SyncObject* aCompletion, WorkerThread* aThread)
  79. : mNextWaitingJob(nullptr)
  80. , mStartSync(aStart)
  81. , mCompletionSync(aCompletion)
  82. , mPinToThread(aThread)
  83. {
  84. if (mStartSync) {
  85. mStartSync->AddSubsequent(this);
  86. }
  87. if (mCompletionSync) {
  88. mCompletionSync->AddPrerequisite(this);
  89. }
  90. }
  91. Job::~Job()
  92. {
  93. if (mCompletionSync) {
  94. //printf(" -- Job %p dtor completion %p\n", this, mCompletionSync);
  95. mCompletionSync->Signal();
  96. mCompletionSync = nullptr;
  97. }
  98. }
  99. JobStatus
  100. SetEventJob::Run()
  101. {
  102. mEvent->Set();
  103. return JobStatus::Complete;
  104. }
  105. SetEventJob::SetEventJob(EventObject* aEvent,
  106. SyncObject* aStart, SyncObject* aCompletion,
  107. WorkerThread* aWorker)
  108. : Job(aStart, aCompletion, aWorker)
  109. , mEvent(aEvent)
  110. {}
  111. SetEventJob::~SetEventJob()
  112. {}
  113. SyncObject::SyncObject(uint32_t aNumPrerequisites)
  114. : mSignals(aNumPrerequisites)
  115. , mFirstWaitingJob(nullptr)
  116. #ifdef DEBUG
  117. , mNumPrerequisites(aNumPrerequisites)
  118. , mAddedPrerequisites(0)
  119. #endif
  120. {}
  121. SyncObject::~SyncObject()
  122. {
  123. MOZ_ASSERT(mFirstWaitingJob == nullptr);
  124. }
  125. bool
  126. SyncObject::Register(Job* aJob)
  127. {
  128. MOZ_ASSERT(aJob);
  129. // For now, ensure that when we schedule the first subsequent, we have already
  130. // created all of the prerequisites. This is an arbitrary restriction because
  131. // we specify the number of prerequisites in the constructor, but in the typical
  132. // scenario, if the assertion FreezePrerequisite blows up here it probably means
  133. // we got the initial nmber of prerequisites wrong. We can decide to remove
  134. // this restriction if needed.
  135. FreezePrerequisites();
  136. int32_t signals = mSignals;
  137. if (signals > 0) {
  138. AddWaitingJob(aJob);
  139. // Since Register and Signal can be called concurrently, it can happen that
  140. // reading mSignals in Register happens before decrementing mSignals in Signal,
  141. // but SubmitWaitingJobs happens before AddWaitingJob. This ordering means
  142. // the SyncObject ends up in the signaled state with a task sitting in the
  143. // waiting list. To prevent that we check mSignals a second time and submit
  144. // again if signals reached zero in the mean time.
  145. // We do this instead of holding a mutex around mSignals+mJobs to reduce
  146. // lock contention.
  147. int32_t signals2 = mSignals;
  148. if (signals2 == 0) {
  149. SubmitWaitingJobs();
  150. }
  151. return true;
  152. }
  153. return false;
  154. }
  155. void
  156. SyncObject::Signal()
  157. {
  158. int32_t signals = --mSignals;
  159. MOZ_ASSERT(signals >= 0);
  160. if (signals == 0) {
  161. SubmitWaitingJobs();
  162. }
  163. }
  164. void
  165. SyncObject::AddWaitingJob(Job* aJob)
  166. {
  167. // Push (using atomics) the task into the list of waiting tasks.
  168. for (;;) {
  169. Job* first = mFirstWaitingJob;
  170. aJob->mNextWaitingJob = first;
  171. if (mFirstWaitingJob.compareExchange(first, aJob)) {
  172. break;
  173. }
  174. }
  175. }
  176. void SyncObject::SubmitWaitingJobs()
  177. {
  178. // Scheduling the tasks can cause code that modifies <this>'s reference
  179. // count to run concurrently, and cause the caller of this function to
  180. // be owned by another thread. We need to make sure the reference count
  181. // does not reach 0 on another thread before the end of this method, so
  182. // hold a strong ref to prevent that!
  183. RefPtr<SyncObject> kungFuDeathGrip(this);
  184. // First atomically swap mFirstWaitingJob and waitingJobs...
  185. Job* waitingJobs = nullptr;
  186. for (;;) {
  187. waitingJobs = mFirstWaitingJob;
  188. if (mFirstWaitingJob.compareExchange(waitingJobs, nullptr)) {
  189. break;
  190. }
  191. }
  192. // ... and submit all of the waiting tasks in waitingJob now that they belong
  193. // to this thread.
  194. while (waitingJobs) {
  195. Job* next = waitingJobs->mNextWaitingJob;
  196. waitingJobs->mNextWaitingJob = nullptr;
  197. JobScheduler::GetQueueForJob(waitingJobs)->SubmitJob(waitingJobs);
  198. waitingJobs = next;
  199. }
  200. }
  201. bool
  202. SyncObject::IsSignaled()
  203. {
  204. return mSignals == 0;
  205. }
  206. void
  207. SyncObject::FreezePrerequisites()
  208. {
  209. MOZ_ASSERT(mAddedPrerequisites == mNumPrerequisites);
  210. }
  211. void
  212. SyncObject::AddPrerequisite(Job* aJob)
  213. {
  214. MOZ_ASSERT(++mAddedPrerequisites <= mNumPrerequisites);
  215. }
  216. void
  217. SyncObject::AddSubsequent(Job* aJob)
  218. {
  219. }
  220. WorkerThread::WorkerThread(MultiThreadedJobQueue* aJobQueue)
  221. : mQueue(aJobQueue)
  222. {
  223. aJobQueue->RegisterThread();
  224. }
  225. void
  226. WorkerThread::Run()
  227. {
  228. SetName("gfx worker");
  229. for (;;) {
  230. Job* commands = nullptr;
  231. if (!mQueue->WaitForJob(commands)) {
  232. mQueue->UnregisterThread();
  233. return;
  234. }
  235. JobStatus status = JobScheduler::ProcessJob(commands);
  236. if (status == JobStatus::Error) {
  237. // Don't try to handle errors for now, but that's open to discussions.
  238. // I expect errors to be mostly OOM issues.
  239. gfxDevCrash(LogReason::JobStatusError) << "Invalid job status " << (int)status;
  240. }
  241. }
  242. }
  243. } //namespace
  244. } //namespace