123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421 |
- /* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
- #include "nsAsyncStreamCopier.h"
- #include "nsIOService.h"
- #include "nsIEventTarget.h"
- #include "nsStreamUtils.h"
- #include "nsThreadUtils.h"
- #include "nsNetUtil.h"
- #include "nsNetCID.h"
- #include "nsIBufferedStreams.h"
- #include "nsIRequestObserver.h"
- #include "mozilla/Logging.h"
- using namespace mozilla;
- using namespace mozilla::net;
- #undef LOG
- // Logging for this file goes through the "nsStreamCopier" log module, e.g.
- // MOZ_LOG=nsStreamCopier:5
- // LOG(args) below forwards to MOZ_LOG on that module at LogLevel::Debug.
- static LazyLogModule gStreamCopierLog("nsStreamCopier");
- #define LOG(args) MOZ_LOG(gStreamCopierLog, mozilla::LogLevel::Debug, args)
- /**
- * An event used to perform initialization off the main thread.
- */
- class AsyncApplyBufferingPolicyEvent final: public Runnable
- {
- public:
- /**
- * @param aCopier
- * The nsAsyncStreamCopier requesting the information.
- */
- explicit AsyncApplyBufferingPolicyEvent(nsAsyncStreamCopier* aCopier)
- : mCopier(aCopier)
- , mTarget(NS_GetCurrentThread())
- { }
- NS_IMETHOD Run() override
- {
- nsresult rv = mCopier->ApplyBufferingPolicy();
- if (NS_FAILED(rv)) {
- mCopier->Cancel(rv);
- return NS_OK;
- }
- rv = mTarget->Dispatch(NewRunnableMethod(mCopier,
- &nsAsyncStreamCopier::AsyncCopyInternal),
- NS_DISPATCH_NORMAL);
- MOZ_ASSERT(NS_SUCCEEDED(rv));
- if (NS_FAILED(rv)) {
- mCopier->Cancel(rv);
- }
- return NS_OK;
- }
- private:
- RefPtr<nsAsyncStreamCopier> mCopier;
- nsCOMPtr<nsIEventTarget> mTarget;
- };
- //-----------------------------------------------------------------------------
- nsAsyncStreamCopier::nsAsyncStreamCopier()
- : mLock("nsAsyncStreamCopier.mLock")
- , mMode(NS_ASYNCCOPY_VIA_READSEGMENTS)
- , mChunkSize(nsIOService::gDefaultSegmentSize)
- , mStatus(NS_OK)
- , mIsPending(false)
- , mShouldSniffBuffering(false)
- {
- LOG(("Creating nsAsyncStreamCopier @%x\n", this));
- }
- nsAsyncStreamCopier::~nsAsyncStreamCopier()
- {
- LOG(("Destroying nsAsyncStreamCopier @%x\n", this));
- }
- bool
- nsAsyncStreamCopier::IsComplete(nsresult *status)
- {
- MutexAutoLock lock(mLock);
- if (status)
- *status = mStatus;
- return !mIsPending;
- }
- nsIRequest*
- nsAsyncStreamCopier::AsRequest()
- {
- return static_cast<nsIRequest*>(static_cast<nsIAsyncStreamCopier*>(this));
- }
- void
- nsAsyncStreamCopier::Complete(nsresult status)
- {
- LOG(("nsAsyncStreamCopier::Complete [this=%p status=%x]\n", this, status));
- nsCOMPtr<nsIRequestObserver> observer;
- nsCOMPtr<nsISupports> ctx;
- {
- MutexAutoLock lock(mLock);
- mCopierCtx = nullptr;
- if (mIsPending) {
- mIsPending = false;
- mStatus = status;
- // setup OnStopRequest callback and release references...
- observer = mObserver;
- mObserver = nullptr;
- }
- }
- if (observer) {
- LOG((" calling OnStopRequest [status=%x]\n", status));
- observer->OnStopRequest(AsRequest(), ctx, status);
- }
- }
- void
- nsAsyncStreamCopier::OnAsyncCopyComplete(void *closure, nsresult status)
- {
- nsAsyncStreamCopier *self = (nsAsyncStreamCopier *) closure;
- self->Complete(status);
- NS_RELEASE(self); // addref'd in AsyncCopy
- }
- //-----------------------------------------------------------------------------
- // nsISupports
- // NS_IMPL_ISUPPORTSx cannot simply be used here: nsIAsyncStreamCopier and nsIAsyncStreamCopier2 both implement nsIRequest,
- // so the nsIRequest and nsISupports entries below are marked AMBIGUOUS and resolved through nsIAsyncStreamCopier.
- NS_IMPL_ADDREF(nsAsyncStreamCopier)
- NS_IMPL_RELEASE(nsAsyncStreamCopier)
- NS_INTERFACE_TABLE_HEAD(nsAsyncStreamCopier)
- NS_INTERFACE_TABLE_BEGIN
- NS_INTERFACE_TABLE_ENTRY(nsAsyncStreamCopier, nsIAsyncStreamCopier)
- NS_INTERFACE_TABLE_ENTRY(nsAsyncStreamCopier, nsIAsyncStreamCopier2)
- NS_INTERFACE_TABLE_ENTRY_AMBIGUOUS(nsAsyncStreamCopier, nsIRequest, nsIAsyncStreamCopier)
- NS_INTERFACE_TABLE_ENTRY_AMBIGUOUS(nsAsyncStreamCopier, nsISupports, nsIAsyncStreamCopier)
- NS_INTERFACE_TABLE_END
- NS_INTERFACE_TABLE_TAIL
- //-----------------------------------------------------------------------------
- // nsIRequest
- NS_IMETHODIMP
- nsAsyncStreamCopier::GetName(nsACString &name)
- {
- name.Truncate();
- return NS_OK;
- }
- NS_IMETHODIMP
- nsAsyncStreamCopier::IsPending(bool *result)
- {
- *result = !IsComplete();
- return NS_OK;
- }
- NS_IMETHODIMP
- nsAsyncStreamCopier::GetStatus(nsresult *status)
- {
- IsComplete(status);
- return NS_OK;
- }
- NS_IMETHODIMP
- nsAsyncStreamCopier::Cancel(nsresult status)
- {
- nsCOMPtr<nsISupports> copierCtx;
- {
- MutexAutoLock lock(mLock);
- if (!mIsPending)
- return NS_OK;
- copierCtx.swap(mCopierCtx);
- }
- if (NS_SUCCEEDED(status)) {
- NS_WARNING("cancel with non-failure status code");
- status = NS_BASE_STREAM_CLOSED;
- }
- if (copierCtx)
- NS_CancelAsyncCopy(copierCtx, status);
- return NS_OK;
- }
- NS_IMETHODIMP
- nsAsyncStreamCopier::Suspend()
- {
- NS_NOTREACHED("nsAsyncStreamCopier::Suspend");
- return NS_ERROR_NOT_IMPLEMENTED;
- }
- NS_IMETHODIMP
- nsAsyncStreamCopier::Resume()
- {
- NS_NOTREACHED("nsAsyncStreamCopier::Resume");
- return NS_ERROR_NOT_IMPLEMENTED;
- }
- NS_IMETHODIMP
- nsAsyncStreamCopier::GetLoadFlags(nsLoadFlags *aLoadFlags)
- {
- *aLoadFlags = LOAD_NORMAL;
- return NS_OK;
- }
- NS_IMETHODIMP
- nsAsyncStreamCopier::SetLoadFlags(nsLoadFlags aLoadFlags)
- {
- return NS_OK;
- }
- NS_IMETHODIMP
- nsAsyncStreamCopier::GetLoadGroup(nsILoadGroup **aLoadGroup)
- {
- *aLoadGroup = nullptr;
- return NS_OK;
- }
- NS_IMETHODIMP
- nsAsyncStreamCopier::SetLoadGroup(nsILoadGroup *aLoadGroup)
- {
- return NS_OK;
- }
- nsresult
- nsAsyncStreamCopier::InitInternal(nsIInputStream *source,
- nsIOutputStream *sink,
- nsIEventTarget *target,
- uint32_t chunkSize,
- bool closeSource,
- bool closeSink)
- {
- NS_ASSERTION(!mSource && !mSink, "Init() called more than once");
- if (chunkSize == 0) {
- chunkSize = nsIOService::gDefaultSegmentSize;
- }
- mChunkSize = chunkSize;
- mSource = source;
- mSink = sink;
- mCloseSource = closeSource;
- mCloseSink = closeSink;
- if (target) {
- mTarget = target;
- } else {
- nsresult rv;
- mTarget = do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID, &rv);
- if (NS_FAILED(rv)) {
- return rv;
- }
- }
- return NS_OK;
- }
- //-----------------------------------------------------------------------------
- // nsIAsyncStreamCopier
- NS_IMETHODIMP
- nsAsyncStreamCopier::Init(nsIInputStream *source,
- nsIOutputStream *sink,
- nsIEventTarget *target,
- bool sourceBuffered,
- bool sinkBuffered,
- uint32_t chunkSize,
- bool closeSource,
- bool closeSink)
- {
- NS_ASSERTION(sourceBuffered || sinkBuffered, "at least one stream must be buffered");
- mMode = sourceBuffered ? NS_ASYNCCOPY_VIA_READSEGMENTS
- : NS_ASYNCCOPY_VIA_WRITESEGMENTS;
- return InitInternal(source, sink, target, chunkSize, closeSource, closeSink);
- }
- //-----------------------------------------------------------------------------
- // nsIAsyncStreamCopier2
- NS_IMETHODIMP
- nsAsyncStreamCopier::Init(nsIInputStream *source,
- nsIOutputStream *sink,
- nsIEventTarget *target,
- uint32_t chunkSize,
- bool closeSource,
- bool closeSink)
- {
- mShouldSniffBuffering = true;
- return InitInternal(source, sink, target, chunkSize, closeSource, closeSink);
- }
- /**
- * Detect whether the input or the output stream is buffered,
- * bufferize one of them if neither is buffered.
- */
- nsresult
- nsAsyncStreamCopier::ApplyBufferingPolicy()
- {
- // This function causes I/O, it must not be executed on the main
- // thread.
- MOZ_ASSERT(!NS_IsMainThread());
- if (NS_OutputStreamIsBuffered(mSink)) {
- // Sink is buffered, no need to perform additional buffering
- mMode = NS_ASYNCCOPY_VIA_WRITESEGMENTS;
- return NS_OK;
- }
- if (NS_InputStreamIsBuffered(mSource)) {
- // Source is buffered, no need to perform additional buffering
- mMode = NS_ASYNCCOPY_VIA_READSEGMENTS;
- return NS_OK;
- }
- // No buffering, let's buffer the sink
- nsresult rv;
- nsCOMPtr<nsIBufferedOutputStream> sink =
- do_CreateInstance(NS_BUFFEREDOUTPUTSTREAM_CONTRACTID, &rv);
- if (NS_FAILED(rv)) {
- return rv;
- }
- rv = sink->Init(mSink, mChunkSize);
- if (NS_FAILED(rv)) {
- return rv;
- }
- mMode = NS_ASYNCCOPY_VIA_WRITESEGMENTS;
- mSink = sink;
- return NS_OK;
- }
- //-----------------------------------------------------------------------------
- // Both nsIAsyncStreamCopier and nsIAsyncStreamCopier2
- NS_IMETHODIMP
- nsAsyncStreamCopier::AsyncCopy(nsIRequestObserver *observer, nsISupports *ctx)
- {
- LOG(("nsAsyncStreamCopier::AsyncCopy [this=%p observer=%x]\n", this, observer));
- NS_ASSERTION(mSource && mSink, "not initialized");
- nsresult rv;
- if (observer) {
- // build proxy for observer events
- rv = NS_NewRequestObserverProxy(getter_AddRefs(mObserver), observer, ctx);
- if (NS_FAILED(rv)) return rv;
- }
- // from this point forward, AsyncCopy is going to return NS_OK. any errors
- // will be reported via OnStopRequest.
- mIsPending = true;
- if (mObserver) {
- rv = mObserver->OnStartRequest(AsRequest(), nullptr);
- if (NS_FAILED(rv))
- Cancel(rv);
- }
- if (!mShouldSniffBuffering) {
- // No buffer sniffing required, let's proceed
- AsyncCopyInternal();
- return NS_OK;
- }
- if (NS_IsMainThread()) {
- // Don't perform buffer sniffing on the main thread
- nsCOMPtr<nsIRunnable> event = new AsyncApplyBufferingPolicyEvent(this);
- rv = mTarget->Dispatch(event, NS_DISPATCH_NORMAL);
- if (NS_FAILED(rv)) {
- Cancel(rv);
- }
- return NS_OK;
- }
- // We're not going to block the main thread, so let's sniff here
- rv = ApplyBufferingPolicy();
- if (NS_FAILED(rv)) {
- Cancel(rv);
- }
- AsyncCopyInternal();
- return NS_OK;
- }
- // Launch async copy.
- // All errors are reported through the observer.
- void
- nsAsyncStreamCopier::AsyncCopyInternal()
- {
- MOZ_ASSERT(mMode == NS_ASYNCCOPY_VIA_READSEGMENTS
- || mMode == NS_ASYNCCOPY_VIA_WRITESEGMENTS);
- nsresult rv;
- // we want to receive progress notifications; release happens in
- // OnAsyncCopyComplete.
- NS_ADDREF_THIS();
- {
- MutexAutoLock lock(mLock);
- rv = NS_AsyncCopy(mSource, mSink, mTarget, mMode, mChunkSize,
- OnAsyncCopyComplete, this, mCloseSource, mCloseSink,
- getter_AddRefs(mCopierCtx));
- }
- if (NS_FAILED(rv)) {
- NS_RELEASE_THIS();
- Cancel(rv);
- }
- }
|