123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289 |
- /* -*- Mode: C++; tab-width: 20; indent-tabs-mode: nil; c-basic-offset: 2 -*-
- * This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
- #include "JobScheduler.h"
- #include "Logging.h"
- namespace mozilla {
- namespace gfx {
- JobScheduler* JobScheduler::sSingleton = nullptr;
- bool JobScheduler::Init(uint32_t aNumThreads, uint32_t aNumQueues)
- {
- MOZ_ASSERT(!sSingleton);
- MOZ_ASSERT(aNumThreads >= aNumQueues);
- sSingleton = new JobScheduler();
- sSingleton->mNextQueue = 0;
- for (uint32_t i = 0; i < aNumQueues; ++i) {
- sSingleton->mDrawingQueues.push_back(new MultiThreadedJobQueue());
- }
- for (uint32_t i = 0; i < aNumThreads; ++i) {
- sSingleton->mWorkerThreads.push_back(WorkerThread::Create(sSingleton->mDrawingQueues[i%aNumQueues]));
- }
- return true;
- }
- void JobScheduler::ShutDown()
- {
- MOZ_ASSERT(IsEnabled());
- if (!IsEnabled()) {
- return;
- }
- for (auto queue : sSingleton->mDrawingQueues) {
- queue->ShutDown();
- delete queue;
- }
- for (WorkerThread* thread : sSingleton->mWorkerThreads) {
- // this will block until the thread is joined.
- delete thread;
- }
- sSingleton->mWorkerThreads.clear();
- delete sSingleton;
- sSingleton = nullptr;
- }
- JobStatus
- JobScheduler::ProcessJob(Job* aJob)
- {
- MOZ_ASSERT(aJob);
- auto status = aJob->Run();
- if (status == JobStatus::Error || status == JobStatus::Complete) {
- delete aJob;
- }
- return status;
- }
- void
- JobScheduler::SubmitJob(Job* aJob)
- {
- MOZ_ASSERT(aJob);
- RefPtr<SyncObject> start = aJob->GetStartSync();
- if (start && start->Register(aJob)) {
- // The Job buffer starts with a non-signaled sync object, it
- // is now registered in the list of task buffers waiting on the
- // sync object, so we should not place it in the queue.
- return;
- }
- GetQueueForJob(aJob)->SubmitJob(aJob);
- }
- void
- JobScheduler::Join(SyncObject* aCompletion)
- {
- RefPtr<EventObject> waitForCompletion = new EventObject();
- JobScheduler::SubmitJob(new SetEventJob(waitForCompletion, aCompletion));
- waitForCompletion->Wait();
- }
- MultiThreadedJobQueue*
- JobScheduler::GetQueueForJob(Job* aJob)
- {
- return aJob->IsPinnedToAThread() ? aJob->GetWorkerThread()->GetJobQueue()
- : GetDrawingQueue();
- }
- Job::Job(SyncObject* aStart, SyncObject* aCompletion, WorkerThread* aThread)
- : mNextWaitingJob(nullptr)
- , mStartSync(aStart)
- , mCompletionSync(aCompletion)
- , mPinToThread(aThread)
- {
- if (mStartSync) {
- mStartSync->AddSubsequent(this);
- }
- if (mCompletionSync) {
- mCompletionSync->AddPrerequisite(this);
- }
- }
- Job::~Job()
- {
- if (mCompletionSync) {
- //printf(" -- Job %p dtor completion %p\n", this, mCompletionSync);
- mCompletionSync->Signal();
- mCompletionSync = nullptr;
- }
- }
- JobStatus
- SetEventJob::Run()
- {
- mEvent->Set();
- return JobStatus::Complete;
- }
- SetEventJob::SetEventJob(EventObject* aEvent,
- SyncObject* aStart, SyncObject* aCompletion,
- WorkerThread* aWorker)
- : Job(aStart, aCompletion, aWorker)
- , mEvent(aEvent)
- {}
- SetEventJob::~SetEventJob()
- {}
- SyncObject::SyncObject(uint32_t aNumPrerequisites)
- : mSignals(aNumPrerequisites)
- , mFirstWaitingJob(nullptr)
- #ifdef DEBUG
- , mNumPrerequisites(aNumPrerequisites)
- , mAddedPrerequisites(0)
- #endif
- {}
- SyncObject::~SyncObject()
- {
- MOZ_ASSERT(mFirstWaitingJob == nullptr);
- }
- bool
- SyncObject::Register(Job* aJob)
- {
- MOZ_ASSERT(aJob);
- // For now, ensure that when we schedule the first subsequent, we have already
- // created all of the prerequisites. This is an arbitrary restriction because
- // we specify the number of prerequisites in the constructor, but in the typical
- // scenario, if the assertion FreezePrerequisite blows up here it probably means
- // we got the initial nmber of prerequisites wrong. We can decide to remove
- // this restriction if needed.
- FreezePrerequisites();
- int32_t signals = mSignals;
- if (signals > 0) {
- AddWaitingJob(aJob);
- // Since Register and Signal can be called concurrently, it can happen that
- // reading mSignals in Register happens before decrementing mSignals in Signal,
- // but SubmitWaitingJobs happens before AddWaitingJob. This ordering means
- // the SyncObject ends up in the signaled state with a task sitting in the
- // waiting list. To prevent that we check mSignals a second time and submit
- // again if signals reached zero in the mean time.
- // We do this instead of holding a mutex around mSignals+mJobs to reduce
- // lock contention.
- int32_t signals2 = mSignals;
- if (signals2 == 0) {
- SubmitWaitingJobs();
- }
- return true;
- }
- return false;
- }
- void
- SyncObject::Signal()
- {
- int32_t signals = --mSignals;
- MOZ_ASSERT(signals >= 0);
- if (signals == 0) {
- SubmitWaitingJobs();
- }
- }
- void
- SyncObject::AddWaitingJob(Job* aJob)
- {
- // Push (using atomics) the task into the list of waiting tasks.
- for (;;) {
- Job* first = mFirstWaitingJob;
- aJob->mNextWaitingJob = first;
- if (mFirstWaitingJob.compareExchange(first, aJob)) {
- break;
- }
- }
- }
- void SyncObject::SubmitWaitingJobs()
- {
- // Scheduling the tasks can cause code that modifies <this>'s reference
- // count to run concurrently, and cause the caller of this function to
- // be owned by another thread. We need to make sure the reference count
- // does not reach 0 on another thread before the end of this method, so
- // hold a strong ref to prevent that!
- RefPtr<SyncObject> kungFuDeathGrip(this);
- // First atomically swap mFirstWaitingJob and waitingJobs...
- Job* waitingJobs = nullptr;
- for (;;) {
- waitingJobs = mFirstWaitingJob;
- if (mFirstWaitingJob.compareExchange(waitingJobs, nullptr)) {
- break;
- }
- }
- // ... and submit all of the waiting tasks in waitingJob now that they belong
- // to this thread.
- while (waitingJobs) {
- Job* next = waitingJobs->mNextWaitingJob;
- waitingJobs->mNextWaitingJob = nullptr;
- JobScheduler::GetQueueForJob(waitingJobs)->SubmitJob(waitingJobs);
- waitingJobs = next;
- }
- }
- bool
- SyncObject::IsSignaled()
- {
- return mSignals == 0;
- }
- void
- SyncObject::FreezePrerequisites()
- {
- MOZ_ASSERT(mAddedPrerequisites == mNumPrerequisites);
- }
- void
- SyncObject::AddPrerequisite(Job* aJob)
- {
- MOZ_ASSERT(++mAddedPrerequisites <= mNumPrerequisites);
- }
- void
- SyncObject::AddSubsequent(Job* aJob)
- {
- }
- WorkerThread::WorkerThread(MultiThreadedJobQueue* aJobQueue)
- : mQueue(aJobQueue)
- {
- aJobQueue->RegisterThread();
- }
- void
- WorkerThread::Run()
- {
- SetName("gfx worker");
- for (;;) {
- Job* commands = nullptr;
- if (!mQueue->WaitForJob(commands)) {
- mQueue->UnregisterThread();
- return;
- }
- JobStatus status = JobScheduler::ProcessJob(commands);
- if (status == JobStatus::Error) {
- // Don't try to handle errors for now, but that's open to discussions.
- // I expect errors to be mostly OOM issues.
- gfxDevCrash(LogReason::JobStatusError) << "Invalid job status " << (int)status;
- }
- }
- }
- } //namespace
- } //namespace
|