123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564 |
- /* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
- #include "nsStreamTransportService.h"
- #include "nsXPCOMCIDInternal.h"
- #include "nsNetSegmentUtils.h"
- #include "nsTransportUtils.h"
- #include "nsStreamUtils.h"
- #include "nsError.h"
- #include "nsNetCID.h"
- #include "nsIAsyncInputStream.h"
- #include "nsIAsyncOutputStream.h"
- #include "nsISeekableStream.h"
- #include "nsIPipe.h"
- #include "nsITransport.h"
- #include "nsIObserverService.h"
- #include "nsIThreadPool.h"
- #include "mozilla/Services.h"
- namespace mozilla {
- namespace net {
- //-----------------------------------------------------------------------------
- // nsInputStreamTransport
- //
- // Implements nsIInputStream as a wrapper around the real input stream. This
- // allows the transport to support seeking, range-limiting, progress reporting,
- // and close-when-done semantics while utilizing NS_AsyncCopy.
- //-----------------------------------------------------------------------------
- class nsInputStreamTransport : public nsITransport
- , public nsIInputStream
- {
- public:
- NS_DECL_THREADSAFE_ISUPPORTS
- NS_DECL_NSITRANSPORT
- NS_DECL_NSIINPUTSTREAM
- nsInputStreamTransport(nsIInputStream *source,
- uint64_t offset,
- uint64_t limit,
- bool closeWhenDone)
- : mSource(source)
- , mOffset(offset)
- , mLimit(limit)
- , mCloseWhenDone(closeWhenDone)
- , mFirstTime(true)
- , mInProgress(false)
- {
- }
- private:
- virtual ~nsInputStreamTransport()
- {
- }
- nsCOMPtr<nsIAsyncInputStream> mPipeIn;
- // while the copy is active, these members may only be accessed from the
- // nsIInputStream implementation.
- nsCOMPtr<nsITransportEventSink> mEventSink;
- nsCOMPtr<nsIInputStream> mSource;
- int64_t mOffset;
- int64_t mLimit;
- bool mCloseWhenDone;
- bool mFirstTime;
- // this variable serves as a lock to prevent the state of the transport
- // from being modified once the copy is in progress.
- bool mInProgress;
- };
- NS_IMPL_ISUPPORTS(nsInputStreamTransport,
- nsITransport,
- nsIInputStream)
- /** nsITransport **/
- NS_IMETHODIMP
- nsInputStreamTransport::OpenInputStream(uint32_t flags,
- uint32_t segsize,
- uint32_t segcount,
- nsIInputStream **result)
- {
- NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS);
- nsresult rv;
- nsCOMPtr<nsIEventTarget> target =
- do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID, &rv);
- if (NS_FAILED(rv)) return rv;
- // XXX if the caller requests an unbuffered stream, then perhaps
- // we'd want to simply return mSource; however, then we would
- // not be reading mSource on a background thread. is this ok?
-
- bool nonblocking = !(flags & OPEN_BLOCKING);
- net_ResolveSegmentParams(segsize, segcount);
- nsCOMPtr<nsIAsyncOutputStream> pipeOut;
- rv = NS_NewPipe2(getter_AddRefs(mPipeIn),
- getter_AddRefs(pipeOut),
- nonblocking, true,
- segsize, segcount);
- if (NS_FAILED(rv)) return rv;
- mInProgress = true;
- // startup async copy process...
- rv = NS_AsyncCopy(this, pipeOut, target,
- NS_ASYNCCOPY_VIA_WRITESEGMENTS, segsize);
- if (NS_SUCCEEDED(rv))
- NS_ADDREF(*result = mPipeIn);
- return rv;
- }
- NS_IMETHODIMP
- nsInputStreamTransport::OpenOutputStream(uint32_t flags,
- uint32_t segsize,
- uint32_t segcount,
- nsIOutputStream **result)
- {
- // this transport only supports reading!
- NS_NOTREACHED("nsInputStreamTransport::OpenOutputStream");
- return NS_ERROR_UNEXPECTED;
- }
- NS_IMETHODIMP
- nsInputStreamTransport::Close(nsresult reason)
- {
- if (NS_SUCCEEDED(reason))
- reason = NS_BASE_STREAM_CLOSED;
- return mPipeIn->CloseWithStatus(reason);
- }
- NS_IMETHODIMP
- nsInputStreamTransport::SetEventSink(nsITransportEventSink *sink,
- nsIEventTarget *target)
- {
- NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS);
- if (target)
- return net_NewTransportEventSinkProxy(getter_AddRefs(mEventSink),
- sink, target);
- mEventSink = sink;
- return NS_OK;
- }
- /** nsIInputStream **/
- NS_IMETHODIMP
- nsInputStreamTransport::Close()
- {
- if (mCloseWhenDone)
- mSource->Close();
- // make additional reads return early...
- mOffset = mLimit = 0;
- return NS_OK;
- }
- NS_IMETHODIMP
- nsInputStreamTransport::Available(uint64_t *result)
- {
- return NS_ERROR_NOT_IMPLEMENTED;
- }
- NS_IMETHODIMP
- nsInputStreamTransport::Read(char *buf, uint32_t count, uint32_t *result)
- {
- if (mFirstTime) {
- mFirstTime = false;
- if (mOffset != 0) {
- // read from current position if offset equal to max
- if (mOffset != -1) {
- nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(mSource);
- if (seekable)
- seekable->Seek(nsISeekableStream::NS_SEEK_SET, mOffset);
- }
- // reset offset to zero so we can use it to enforce limit
- mOffset = 0;
- }
- }
- // limit amount read
- uint64_t max = count;
- if (mLimit != -1) {
- max = mLimit - mOffset;
- if (max == 0) {
- *result = 0;
- return NS_OK;
- }
- }
- if (count > max)
- count = static_cast<uint32_t>(max);
- nsresult rv = mSource->Read(buf, count, result);
- if (NS_SUCCEEDED(rv)) {
- mOffset += *result;
- if (mEventSink)
- mEventSink->OnTransportStatus(this, NS_NET_STATUS_READING, mOffset,
- mLimit);
- }
- return rv;
- }
- NS_IMETHODIMP
- nsInputStreamTransport::ReadSegments(nsWriteSegmentFun writer, void *closure,
- uint32_t count, uint32_t *result)
- {
- return NS_ERROR_NOT_IMPLEMENTED;
- }
- NS_IMETHODIMP
- nsInputStreamTransport::IsNonBlocking(bool *result)
- {
- *result = false;
- return NS_OK;
- }
- //-----------------------------------------------------------------------------
- // nsOutputStreamTransport
- //
- // Implements nsIOutputStream as a wrapper around the real output stream. This
- // allows the transport to support seeking, range-limiting, progress reporting,
- // and close-when-done semantics while utilizing NS_AsyncCopy.
- //-----------------------------------------------------------------------------
- class nsOutputStreamTransport : public nsITransport
- , public nsIOutputStream
- {
- public:
- NS_DECL_THREADSAFE_ISUPPORTS
- NS_DECL_NSITRANSPORT
- NS_DECL_NSIOUTPUTSTREAM
- nsOutputStreamTransport(nsIOutputStream *sink,
- int64_t offset,
- int64_t limit,
- bool closeWhenDone)
- : mSink(sink)
- , mOffset(offset)
- , mLimit(limit)
- , mCloseWhenDone(closeWhenDone)
- , mFirstTime(true)
- , mInProgress(false)
- {
- }
- private:
- virtual ~nsOutputStreamTransport()
- {
- }
- nsCOMPtr<nsIAsyncOutputStream> mPipeOut;
-
- // while the copy is active, these members may only be accessed from the
- // nsIOutputStream implementation.
- nsCOMPtr<nsITransportEventSink> mEventSink;
- nsCOMPtr<nsIOutputStream> mSink;
- int64_t mOffset;
- int64_t mLimit;
- bool mCloseWhenDone;
- bool mFirstTime;
- // this variable serves as a lock to prevent the state of the transport
- // from being modified once the copy is in progress.
- bool mInProgress;
- };
- NS_IMPL_ISUPPORTS(nsOutputStreamTransport,
- nsITransport,
- nsIOutputStream)
- /** nsITransport **/
- NS_IMETHODIMP
- nsOutputStreamTransport::OpenInputStream(uint32_t flags,
- uint32_t segsize,
- uint32_t segcount,
- nsIInputStream **result)
- {
- // this transport only supports writing!
- NS_NOTREACHED("nsOutputStreamTransport::OpenInputStream");
- return NS_ERROR_UNEXPECTED;
- }
- NS_IMETHODIMP
- nsOutputStreamTransport::OpenOutputStream(uint32_t flags,
- uint32_t segsize,
- uint32_t segcount,
- nsIOutputStream **result)
- {
- NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS);
- nsresult rv;
- nsCOMPtr<nsIEventTarget> target =
- do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID, &rv);
- if (NS_FAILED(rv)) return rv;
- // XXX if the caller requests an unbuffered stream, then perhaps
- // we'd want to simply return mSink; however, then we would
- // not be writing to mSink on a background thread. is this ok?
-
- bool nonblocking = !(flags & OPEN_BLOCKING);
- net_ResolveSegmentParams(segsize, segcount);
- nsCOMPtr<nsIAsyncInputStream> pipeIn;
- rv = NS_NewPipe2(getter_AddRefs(pipeIn),
- getter_AddRefs(mPipeOut),
- true, nonblocking,
- segsize, segcount);
- if (NS_FAILED(rv)) return rv;
- mInProgress = true;
- // startup async copy process...
- rv = NS_AsyncCopy(pipeIn, this, target,
- NS_ASYNCCOPY_VIA_READSEGMENTS, segsize);
- if (NS_SUCCEEDED(rv))
- NS_ADDREF(*result = mPipeOut);
- return rv;
- }
- NS_IMETHODIMP
- nsOutputStreamTransport::Close(nsresult reason)
- {
- if (NS_SUCCEEDED(reason))
- reason = NS_BASE_STREAM_CLOSED;
- return mPipeOut->CloseWithStatus(reason);
- }
- NS_IMETHODIMP
- nsOutputStreamTransport::SetEventSink(nsITransportEventSink *sink,
- nsIEventTarget *target)
- {
- NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS);
- if (target)
- return net_NewTransportEventSinkProxy(getter_AddRefs(mEventSink),
- sink, target);
- mEventSink = sink;
- return NS_OK;
- }
- /** nsIOutputStream **/
- NS_IMETHODIMP
- nsOutputStreamTransport::Close()
- {
- if (mCloseWhenDone)
- mSink->Close();
- // make additional writes return early...
- mOffset = mLimit = 0;
- return NS_OK;
- }
- NS_IMETHODIMP
- nsOutputStreamTransport::Flush()
- {
- return NS_OK;
- }
- NS_IMETHODIMP
- nsOutputStreamTransport::Write(const char *buf, uint32_t count, uint32_t *result)
- {
- if (mFirstTime) {
- mFirstTime = false;
- if (mOffset != 0) {
- // write to current position if offset equal to max
- if (mOffset != -1) {
- nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(mSink);
- if (seekable)
- seekable->Seek(nsISeekableStream::NS_SEEK_SET, mOffset);
- }
- // reset offset to zero so we can use it to enforce limit
- mOffset = 0;
- }
- }
- // limit amount written
- uint64_t max = count;
- if (mLimit != -1) {
- max = mLimit - mOffset;
- if (max == 0) {
- *result = 0;
- return NS_OK;
- }
- }
- if (count > max)
- count = static_cast<uint32_t>(max);
- nsresult rv = mSink->Write(buf, count, result);
- if (NS_SUCCEEDED(rv)) {
- mOffset += *result;
- if (mEventSink)
- mEventSink->OnTransportStatus(this, NS_NET_STATUS_WRITING, mOffset,
- mLimit);
- }
- return rv;
- }
- NS_IMETHODIMP
- nsOutputStreamTransport::WriteSegments(nsReadSegmentFun reader, void *closure,
- uint32_t count, uint32_t *result)
- {
- return NS_ERROR_NOT_IMPLEMENTED;
- }
- NS_IMETHODIMP
- nsOutputStreamTransport::WriteFrom(nsIInputStream *in, uint32_t count, uint32_t *result)
- {
- return NS_ERROR_NOT_IMPLEMENTED;
- }
- NS_IMETHODIMP
- nsOutputStreamTransport::IsNonBlocking(bool *result)
- {
- *result = false;
- return NS_OK;
- }
- //-----------------------------------------------------------------------------
- // nsStreamTransportService
- //-----------------------------------------------------------------------------
- nsStreamTransportService::~nsStreamTransportService()
- {
- NS_ASSERTION(!mPool, "thread pool wasn't shutdown");
- }
- nsresult
- nsStreamTransportService::Init()
- {
- mPool = do_CreateInstance(NS_THREADPOOL_CONTRACTID);
- NS_ENSURE_STATE(mPool);
- // Configure the pool
- mPool->SetName(NS_LITERAL_CSTRING("StreamTrans"));
- mPool->SetThreadLimit(25);
- mPool->SetIdleThreadLimit(1);
- mPool->SetIdleThreadTimeout(PR_SecondsToInterval(30));
- nsCOMPtr<nsIObserverService> obsSvc =
- mozilla::services::GetObserverService();
- if (obsSvc)
- obsSvc->AddObserver(this, "xpcom-shutdown-threads", false);
- return NS_OK;
- }
- NS_IMPL_ISUPPORTS(nsStreamTransportService,
- nsIStreamTransportService,
- nsIEventTarget,
- nsIObserver)
- NS_IMETHODIMP
- nsStreamTransportService::DispatchFromScript(nsIRunnable *task, uint32_t flags)
- {
- nsCOMPtr<nsIRunnable> event(task);
- return Dispatch(event.forget(), flags);
- }
- NS_IMETHODIMP
- nsStreamTransportService::Dispatch(already_AddRefed<nsIRunnable> task, uint32_t flags)
- {
- nsCOMPtr<nsIRunnable> event(task); // so it gets released on failure paths
- nsCOMPtr<nsIThreadPool> pool;
- {
- mozilla::MutexAutoLock lock(mShutdownLock);
- if (mIsShutdown) {
- return NS_ERROR_NOT_INITIALIZED;
- }
- pool = mPool;
- }
- NS_ENSURE_TRUE(pool, NS_ERROR_NOT_INITIALIZED);
- return pool->Dispatch(event.forget(), flags);
- }
- NS_IMETHODIMP
- nsStreamTransportService::DelayedDispatch(already_AddRefed<nsIRunnable>, uint32_t)
- {
- return NS_ERROR_NOT_IMPLEMENTED;
- }
- NS_IMETHODIMP
- nsStreamTransportService::IsOnCurrentThread(bool *result)
- {
- nsCOMPtr<nsIThreadPool> pool;
- {
- mozilla::MutexAutoLock lock(mShutdownLock);
- if (mIsShutdown) {
- return NS_ERROR_NOT_INITIALIZED;
- }
- pool = mPool;
- }
- NS_ENSURE_TRUE(pool, NS_ERROR_NOT_INITIALIZED);
- return pool->IsOnCurrentThread(result);
- }
- NS_IMETHODIMP
- nsStreamTransportService::CreateInputTransport(nsIInputStream *stream,
- int64_t offset,
- int64_t limit,
- bool closeWhenDone,
- nsITransport **result)
- {
- nsInputStreamTransport *trans =
- new nsInputStreamTransport(stream, offset, limit, closeWhenDone);
- if (!trans)
- return NS_ERROR_OUT_OF_MEMORY;
- NS_ADDREF(*result = trans);
- return NS_OK;
- }
- NS_IMETHODIMP
- nsStreamTransportService::CreateOutputTransport(nsIOutputStream *stream,
- int64_t offset,
- int64_t limit,
- bool closeWhenDone,
- nsITransport **result)
- {
- nsOutputStreamTransport *trans =
- new nsOutputStreamTransport(stream, offset, limit, closeWhenDone);
- if (!trans)
- return NS_ERROR_OUT_OF_MEMORY;
- NS_ADDREF(*result = trans);
- return NS_OK;
- }
- NS_IMETHODIMP
- nsStreamTransportService::Observe(nsISupports *subject, const char *topic,
- const char16_t *data)
- {
- NS_ASSERTION(strcmp(topic, "xpcom-shutdown-threads") == 0, "oops");
- {
- mozilla::MutexAutoLock lock(mShutdownLock);
- mIsShutdown = true;
- }
- if (mPool) {
- mPool->Shutdown();
- mPool = nullptr;
- }
- return NS_OK;
- }
- } // namespace net
- } // namespace mozilla
|