123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578 |
- /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
- /* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
- #include "mozilla/dom/cache/ReadStream.h"
- #include "mozilla/Unused.h"
- #include "mozilla/dom/cache/CacheStreamControlChild.h"
- #include "mozilla/dom/cache/CacheStreamControlParent.h"
- #include "mozilla/dom/cache/CacheTypes.h"
- #include "mozilla/ipc/IPCStreamUtils.h"
- #include "mozilla/SnappyUncompressInputStream.h"
- #include "nsIAsyncInputStream.h"
- #include "nsTArray.h"
- namespace mozilla {
- namespace dom {
- namespace cache {
- using mozilla::Unused;
- using mozilla::ipc::AutoIPCStream;
- using mozilla::ipc::IPCStream;
- // ----------------------------------------------------------------------------
- // The inner stream class. This is where all of the real work is done. As
- // an invariant Inner::Close() must be called before ~Inner(). This is
- // guaranteed by our outer ReadStream class.
- class ReadStream::Inner final : public ReadStream::Controllable
- {
- public:
- Inner(StreamControl* aControl, const nsID& aId,
- nsIInputStream* aStream);
- void
- Serialize(CacheReadStreamOrVoid* aReadStreamOut,
- nsTArray<UniquePtr<AutoIPCStream>>& aStreamCleanupList,
- ErrorResult& aRv);
- void
- Serialize(CacheReadStream* aReadStreamOut,
- nsTArray<UniquePtr<AutoIPCStream>>& aStreamCleanupList,
- ErrorResult& aRv);
- // ReadStream::Controllable methods
- virtual void
- CloseStream() override;
- virtual void
- CloseStreamWithoutReporting() override;
- virtual bool
- MatchId(const nsID& aId) const override;
- virtual bool
- HasEverBeenRead() const override;
- // Simulate nsIInputStream methods, but we don't actually inherit from it
- nsresult
- Close();
- nsresult
- Available(uint64_t *aNumAvailableOut);
- nsresult
- Read(char *aBuf, uint32_t aCount, uint32_t *aNumReadOut);
- nsresult
- ReadSegments(nsWriteSegmentFun aWriter, void *aClosure, uint32_t aCount,
- uint32_t *aNumReadOut);
- nsresult
- IsNonBlocking(bool *aNonBlockingOut);
- private:
- class NoteClosedRunnable;
- class ForgetRunnable;
- ~Inner();
- void
- NoteClosed();
- void
- Forget();
- void
- NoteClosedOnOwningThread();
- void
- ForgetOnOwningThread();
- // Weak ref to the stream control actor. The actor will always call either
- // CloseStream() or CloseStreamWithoutReporting() before it's destroyed. The
- // weak ref is cleared in the resulting NoteClosedOnOwningThread() or
- // ForgetOnOwningThread() method call.
- StreamControl* mControl;
- const nsID mId;
- nsCOMPtr<nsIThread> mOwningThread;
- enum State
- {
- Open,
- Closed,
- NumStates
- };
- Atomic<State> mState;
- Atomic<bool> mHasEverBeenRead;
- // The wrapped stream objects may not be threadsafe. We need to be able
- // to close a stream on our owning thread while an IO thread is simultaneously
- // reading the same stream. Therefore, protect all access to these stream
- // objects with a mutex.
- Mutex mMutex;
- nsCOMPtr<nsIInputStream> mStream;
- nsCOMPtr<nsIInputStream> mSnappyStream;
- NS_INLINE_DECL_THREADSAFE_REFCOUNTING(cache::ReadStream::Inner, override)
- };
- // ----------------------------------------------------------------------------
- // Runnable to notify actors that the ReadStream has closed. This must
- // be done on the thread associated with the PBackground actor. Must be
- // cancelable to execute on Worker threads (which can occur when the
- // ReadStream is constructed on a child process Worker thread).
- class ReadStream::Inner::NoteClosedRunnable final : public CancelableRunnable
- {
- public:
- explicit NoteClosedRunnable(ReadStream::Inner* aStream)
- : mStream(aStream)
- { }
- NS_IMETHOD Run() override
- {
- mStream->NoteClosedOnOwningThread();
- mStream = nullptr;
- return NS_OK;
- }
- // Note, we must proceed with the Run() method since our actor will not
- // clean itself up until we note that the stream is closed.
- nsresult Cancel() override
- {
- Run();
- return NS_OK;
- }
- private:
- ~NoteClosedRunnable() { }
- RefPtr<ReadStream::Inner> mStream;
- };
- // ----------------------------------------------------------------------------
- // Runnable to clear actors without reporting that the ReadStream has
- // closed. Since this can trigger actor destruction, we need to do
- // it on the thread associated with the PBackground actor. Must be
- // cancelable to execute on Worker threads (which can occur when the
- // ReadStream is constructed on a child process Worker thread).
- class ReadStream::Inner::ForgetRunnable final : public CancelableRunnable
- {
- public:
- explicit ForgetRunnable(ReadStream::Inner* aStream)
- : mStream(aStream)
- { }
- NS_IMETHOD Run() override
- {
- mStream->ForgetOnOwningThread();
- mStream = nullptr;
- return NS_OK;
- }
- // Note, we must proceed with the Run() method so that we properly
- // call RemoveListener on the actor.
- nsresult Cancel() override
- {
- Run();
- return NS_OK;
- }
- private:
- ~ForgetRunnable() { }
- RefPtr<ReadStream::Inner> mStream;
- };
- // ----------------------------------------------------------------------------
- ReadStream::Inner::Inner(StreamControl* aControl, const nsID& aId,
- nsIInputStream* aStream)
- : mControl(aControl)
- , mId(aId)
- , mOwningThread(NS_GetCurrentThread())
- , mState(Open)
- , mHasEverBeenRead(false)
- , mMutex("dom::cache::ReadStream")
- , mStream(aStream)
- , mSnappyStream(new SnappyUncompressInputStream(aStream))
- {
- MOZ_DIAGNOSTIC_ASSERT(mStream);
- MOZ_DIAGNOSTIC_ASSERT(mControl);
- mControl->AddReadStream(this);
- }
- void
- ReadStream::Inner::Serialize(CacheReadStreamOrVoid* aReadStreamOut,
- nsTArray<UniquePtr<AutoIPCStream>>& aStreamCleanupList,
- ErrorResult& aRv)
- {
- MOZ_ASSERT(NS_GetCurrentThread() == mOwningThread);
- MOZ_DIAGNOSTIC_ASSERT(aReadStreamOut);
- *aReadStreamOut = CacheReadStream();
- Serialize(&aReadStreamOut->get_CacheReadStream(), aStreamCleanupList, aRv);
- }
- void
- ReadStream::Inner::Serialize(CacheReadStream* aReadStreamOut,
- nsTArray<UniquePtr<AutoIPCStream>>& aStreamCleanupList,
- ErrorResult& aRv)
- {
- MOZ_ASSERT(NS_GetCurrentThread() == mOwningThread);
- MOZ_DIAGNOSTIC_ASSERT(aReadStreamOut);
- if (mState != Open) {
- aRv.ThrowTypeError<MSG_CACHE_STREAM_CLOSED>();
- return;
- }
- MOZ_DIAGNOSTIC_ASSERT(mControl);
- aReadStreamOut->id() = mId;
- mControl->SerializeControl(aReadStreamOut);
- {
- MutexAutoLock lock(mMutex);
- mControl->SerializeStream(aReadStreamOut, mStream, aStreamCleanupList);
- }
- MOZ_DIAGNOSTIC_ASSERT(aReadStreamOut->stream().type() ==
- IPCStream::TInputStreamParamsWithFds);
- // We're passing ownership across the IPC barrier with the control, so
- // do not signal that the stream is closed here.
- Forget();
- }
- void
- ReadStream::Inner::CloseStream()
- {
- MOZ_ASSERT(NS_GetCurrentThread() == mOwningThread);
- Close();
- }
- void
- ReadStream::Inner::CloseStreamWithoutReporting()
- {
- MOZ_ASSERT(NS_GetCurrentThread() == mOwningThread);
- Forget();
- }
- bool
- ReadStream::Inner::MatchId(const nsID& aId) const
- {
- MOZ_ASSERT(NS_GetCurrentThread() == mOwningThread);
- return mId.Equals(aId);
- }
- bool
- ReadStream::Inner::HasEverBeenRead() const
- {
- MOZ_ASSERT(NS_GetCurrentThread() == mOwningThread);
- return mHasEverBeenRead;
- }
- nsresult
- ReadStream::Inner::Close()
- {
- // stream ops can happen on any thread
- nsresult rv = NS_OK;
- {
- MutexAutoLock lock(mMutex);
- rv = mSnappyStream->Close();
- }
- NoteClosed();
- return rv;
- }
- nsresult
- ReadStream::Inner::Available(uint64_t* aNumAvailableOut)
- {
- // stream ops can happen on any thread
- nsresult rv = NS_OK;
- {
- MutexAutoLock lock(mMutex);
- rv = mSnappyStream->Available(aNumAvailableOut);
- }
- if (NS_FAILED(rv)) {
- Close();
- }
- return rv;
- }
- nsresult
- ReadStream::Inner::Read(char* aBuf, uint32_t aCount, uint32_t* aNumReadOut)
- {
- // stream ops can happen on any thread
- MOZ_DIAGNOSTIC_ASSERT(aNumReadOut);
- nsresult rv = NS_OK;
- {
- MutexAutoLock lock(mMutex);
- rv = mSnappyStream->Read(aBuf, aCount, aNumReadOut);
- }
- if ((NS_FAILED(rv) && rv != NS_BASE_STREAM_WOULD_BLOCK) ||
- *aNumReadOut == 0) {
- Close();
- }
- mHasEverBeenRead = true;
- return rv;
- }
- nsresult
- ReadStream::Inner::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
- uint32_t aCount, uint32_t* aNumReadOut)
- {
- // stream ops can happen on any thread
- MOZ_DIAGNOSTIC_ASSERT(aNumReadOut);
- if (aCount) {
- mHasEverBeenRead = true;
- }
- nsresult rv = NS_OK;
- {
- MutexAutoLock lock(mMutex);
- rv = mSnappyStream->ReadSegments(aWriter, aClosure, aCount, aNumReadOut);
- }
- if ((NS_FAILED(rv) && rv != NS_BASE_STREAM_WOULD_BLOCK &&
- rv != NS_ERROR_NOT_IMPLEMENTED) || *aNumReadOut == 0) {
- Close();
- }
- // Verify bytes were actually read before marking as being ever read. For
- // example, code can test if the stream supports ReadSegments() by calling
- // this method with a dummy callback which doesn't read anything. We don't
- // want to trigger on that.
- if (*aNumReadOut) {
- mHasEverBeenRead = true;
- }
- return rv;
- }
- nsresult
- ReadStream::Inner::IsNonBlocking(bool* aNonBlockingOut)
- {
- // stream ops can happen on any thread
- MutexAutoLock lock(mMutex);
- return mSnappyStream->IsNonBlocking(aNonBlockingOut);
- }
- ReadStream::Inner::~Inner()
- {
- // Any thread
- MOZ_DIAGNOSTIC_ASSERT(mState == Closed);
- MOZ_DIAGNOSTIC_ASSERT(!mControl);
- }
- void
- ReadStream::Inner::NoteClosed()
- {
- // Any thread
- if (mState == Closed) {
- return;
- }
- if (NS_GetCurrentThread() == mOwningThread) {
- NoteClosedOnOwningThread();
- return;
- }
- nsCOMPtr<nsIRunnable> runnable = new NoteClosedRunnable(this);
- MOZ_ALWAYS_SUCCEEDS(
- mOwningThread->Dispatch(runnable, nsIThread::DISPATCH_NORMAL));
- }
- void
- ReadStream::Inner::Forget()
- {
- // Any thread
- if (mState == Closed) {
- return;
- }
- if (NS_GetCurrentThread() == mOwningThread) {
- ForgetOnOwningThread();
- return;
- }
- nsCOMPtr<nsIRunnable> runnable = new ForgetRunnable(this);
- MOZ_ALWAYS_SUCCEEDS(
- mOwningThread->Dispatch(runnable, nsIThread::DISPATCH_NORMAL));
- }
- void
- ReadStream::Inner::NoteClosedOnOwningThread()
- {
- MOZ_ASSERT(NS_GetCurrentThread() == mOwningThread);
- // Mark closed and do nothing if we were already closed
- if (!mState.compareExchange(Open, Closed)) {
- return;
- }
- MOZ_DIAGNOSTIC_ASSERT(mControl);
- mControl->NoteClosed(this, mId);
- mControl = nullptr;
- }
- void
- ReadStream::Inner::ForgetOnOwningThread()
- {
- MOZ_ASSERT(NS_GetCurrentThread() == mOwningThread);
- // Mark closed and do nothing if we were already closed
- if (!mState.compareExchange(Open, Closed)) {
- return;
- }
- MOZ_DIAGNOSTIC_ASSERT(mControl);
- mControl->ForgetReadStream(this);
- mControl = nullptr;
- }
- // ----------------------------------------------------------------------------
- NS_IMPL_ISUPPORTS(cache::ReadStream, nsIInputStream, ReadStream);
- // static
- already_AddRefed<ReadStream>
- ReadStream::Create(const CacheReadStreamOrVoid& aReadStreamOrVoid)
- {
- if (aReadStreamOrVoid.type() == CacheReadStreamOrVoid::Tvoid_t) {
- return nullptr;
- }
- return Create(aReadStreamOrVoid.get_CacheReadStream());
- }
- // static
- already_AddRefed<ReadStream>
- ReadStream::Create(const CacheReadStream& aReadStream)
- {
- // The parameter may or may not be for a Cache created stream. The way we
- // tell is by looking at the stream control actor. If the actor exists,
- // then we know the Cache created it.
- if (!aReadStream.controlChild() && !aReadStream.controlParent()) {
- return nullptr;
- }
- MOZ_DIAGNOSTIC_ASSERT(aReadStream.stream().type() ==
- IPCStream::TInputStreamParamsWithFds);
- // Control is guaranteed to survive this method as ActorDestroy() cannot
- // run on this thread until we complete.
- StreamControl* control;
- if (aReadStream.controlChild()) {
- auto actor = static_cast<CacheStreamControlChild*>(aReadStream.controlChild());
- control = actor;
- } else {
- auto actor = static_cast<CacheStreamControlParent*>(aReadStream.controlParent());
- control = actor;
- }
- MOZ_DIAGNOSTIC_ASSERT(control);
- nsCOMPtr<nsIInputStream> stream = DeserializeIPCStream(aReadStream.stream());
- MOZ_DIAGNOSTIC_ASSERT(stream);
- // Currently we expect all cache read streams to be blocking file streams.
- #if !defined(RELEASE_OR_BETA)
- nsCOMPtr<nsIAsyncInputStream> asyncStream = do_QueryInterface(stream);
- MOZ_DIAGNOSTIC_ASSERT(!asyncStream);
- #endif
- RefPtr<Inner> inner = new Inner(control, aReadStream.id(), stream);
- RefPtr<ReadStream> ref = new ReadStream(inner);
- return ref.forget();
- }
- // static
- already_AddRefed<ReadStream>
- ReadStream::Create(PCacheStreamControlParent* aControl, const nsID& aId,
- nsIInputStream* aStream)
- {
- MOZ_DIAGNOSTIC_ASSERT(aControl);
- auto actor = static_cast<CacheStreamControlParent*>(aControl);
- RefPtr<Inner> inner = new Inner(actor, aId, aStream);
- RefPtr<ReadStream> ref = new ReadStream(inner);
- return ref.forget();
- }
- void
- ReadStream::Serialize(CacheReadStreamOrVoid* aReadStreamOut,
- nsTArray<UniquePtr<AutoIPCStream>>& aStreamCleanupList,
- ErrorResult& aRv)
- {
- mInner->Serialize(aReadStreamOut, aStreamCleanupList, aRv);
- }
- void
- ReadStream::Serialize(CacheReadStream* aReadStreamOut,
- nsTArray<UniquePtr<AutoIPCStream>>& aStreamCleanupList,
- ErrorResult& aRv)
- {
- mInner->Serialize(aReadStreamOut, aStreamCleanupList, aRv);
- }
- ReadStream::ReadStream(ReadStream::Inner* aInner)
- : mInner(aInner)
- {
- MOZ_DIAGNOSTIC_ASSERT(mInner);
- }
- ReadStream::~ReadStream()
- {
- // Explicitly close the inner stream so that it does not have to
- // deal with implicitly closing at destruction time.
- mInner->Close();
- }
- NS_IMETHODIMP
- ReadStream::Close()
- {
- return mInner->Close();
- }
- NS_IMETHODIMP
- ReadStream::Available(uint64_t* aNumAvailableOut)
- {
- return mInner->Available(aNumAvailableOut);
- }
- NS_IMETHODIMP
- ReadStream::Read(char* aBuf, uint32_t aCount, uint32_t* aNumReadOut)
- {
- return mInner->Read(aBuf, aCount, aNumReadOut);
- }
- NS_IMETHODIMP
- ReadStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
- uint32_t aCount, uint32_t* aNumReadOut)
- {
- return mInner->ReadSegments(aWriter, aClosure, aCount, aNumReadOut);
- }
- NS_IMETHODIMP
- ReadStream::IsNonBlocking(bool* aNonBlockingOut)
- {
- return mInner->IsNonBlocking(aNonBlockingOut);
- }
- } // namespace cache
- } // namespace dom
- } // namespace mozilla
|