nsAsyncStreamCopier.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
  4. #include "nsAsyncStreamCopier.h"
  5. #include "nsIOService.h"
  6. #include "nsIEventTarget.h"
  7. #include "nsStreamUtils.h"
  8. #include "nsThreadUtils.h"
  9. #include "nsNetUtil.h"
  10. #include "nsNetCID.h"
  11. #include "nsIBufferedStreams.h"
  12. #include "nsIRequestObserver.h"
  13. #include "mozilla/Logging.h"
  14. using namespace mozilla;
  15. using namespace mozilla::net;
  16. #undef LOG
  17. //
  18. // MOZ_LOG=nsStreamCopier:5
  19. //
  20. static LazyLogModule gStreamCopierLog("nsStreamCopier");
  21. #define LOG(args) MOZ_LOG(gStreamCopierLog, mozilla::LogLevel::Debug, args)
  22. /**
  23. * An event used to perform initialization off the main thread.
  24. */
  25. class AsyncApplyBufferingPolicyEvent final: public Runnable
  26. {
  27. public:
  28. /**
  29. * @param aCopier
  30. * The nsAsyncStreamCopier requesting the information.
  31. */
  32. explicit AsyncApplyBufferingPolicyEvent(nsAsyncStreamCopier* aCopier)
  33. : mCopier(aCopier)
  34. , mTarget(NS_GetCurrentThread())
  35. { }
  36. NS_IMETHOD Run() override
  37. {
  38. nsresult rv = mCopier->ApplyBufferingPolicy();
  39. if (NS_FAILED(rv)) {
  40. mCopier->Cancel(rv);
  41. return NS_OK;
  42. }
  43. rv = mTarget->Dispatch(NewRunnableMethod(mCopier,
  44. &nsAsyncStreamCopier::AsyncCopyInternal),
  45. NS_DISPATCH_NORMAL);
  46. MOZ_ASSERT(NS_SUCCEEDED(rv));
  47. if (NS_FAILED(rv)) {
  48. mCopier->Cancel(rv);
  49. }
  50. return NS_OK;
  51. }
  52. private:
  53. RefPtr<nsAsyncStreamCopier> mCopier;
  54. nsCOMPtr<nsIEventTarget> mTarget;
  55. };
  56. //-----------------------------------------------------------------------------
  57. nsAsyncStreamCopier::nsAsyncStreamCopier()
  58. : mLock("nsAsyncStreamCopier.mLock")
  59. , mMode(NS_ASYNCCOPY_VIA_READSEGMENTS)
  60. , mChunkSize(nsIOService::gDefaultSegmentSize)
  61. , mStatus(NS_OK)
  62. , mIsPending(false)
  63. , mShouldSniffBuffering(false)
  64. {
  65. LOG(("Creating nsAsyncStreamCopier @%x\n", this));
  66. }
  67. nsAsyncStreamCopier::~nsAsyncStreamCopier()
  68. {
  69. LOG(("Destroying nsAsyncStreamCopier @%x\n", this));
  70. }
  71. bool
  72. nsAsyncStreamCopier::IsComplete(nsresult *status)
  73. {
  74. MutexAutoLock lock(mLock);
  75. if (status)
  76. *status = mStatus;
  77. return !mIsPending;
  78. }
  79. nsIRequest*
  80. nsAsyncStreamCopier::AsRequest()
  81. {
  82. return static_cast<nsIRequest*>(static_cast<nsIAsyncStreamCopier*>(this));
  83. }
  84. void
  85. nsAsyncStreamCopier::Complete(nsresult status)
  86. {
  87. LOG(("nsAsyncStreamCopier::Complete [this=%p status=%x]\n", this, status));
  88. nsCOMPtr<nsIRequestObserver> observer;
  89. nsCOMPtr<nsISupports> ctx;
  90. {
  91. MutexAutoLock lock(mLock);
  92. mCopierCtx = nullptr;
  93. if (mIsPending) {
  94. mIsPending = false;
  95. mStatus = status;
  96. // setup OnStopRequest callback and release references...
  97. observer = mObserver;
  98. mObserver = nullptr;
  99. }
  100. }
  101. if (observer) {
  102. LOG((" calling OnStopRequest [status=%x]\n", status));
  103. observer->OnStopRequest(AsRequest(), ctx, status);
  104. }
  105. }
  106. void
  107. nsAsyncStreamCopier::OnAsyncCopyComplete(void *closure, nsresult status)
  108. {
  109. nsAsyncStreamCopier *self = (nsAsyncStreamCopier *) closure;
  110. self->Complete(status);
  111. NS_RELEASE(self); // addref'd in AsyncCopy
  112. }
  113. //-----------------------------------------------------------------------------
  114. // nsISupports
  115. // We cannot use simply NS_IMPL_ISUPPORTSx as both
  116. // nsIAsyncStreamCopier and nsIAsyncStreamCopier2 implement nsIRequest
  117. NS_IMPL_ADDREF(nsAsyncStreamCopier)
  118. NS_IMPL_RELEASE(nsAsyncStreamCopier)
  119. NS_INTERFACE_TABLE_HEAD(nsAsyncStreamCopier)
  120. NS_INTERFACE_TABLE_BEGIN
  121. NS_INTERFACE_TABLE_ENTRY(nsAsyncStreamCopier, nsIAsyncStreamCopier)
  122. NS_INTERFACE_TABLE_ENTRY(nsAsyncStreamCopier, nsIAsyncStreamCopier2)
  123. NS_INTERFACE_TABLE_ENTRY_AMBIGUOUS(nsAsyncStreamCopier, nsIRequest, nsIAsyncStreamCopier)
  124. NS_INTERFACE_TABLE_ENTRY_AMBIGUOUS(nsAsyncStreamCopier, nsISupports, nsIAsyncStreamCopier)
  125. NS_INTERFACE_TABLE_END
  126. NS_INTERFACE_TABLE_TAIL
  127. //-----------------------------------------------------------------------------
  128. // nsIRequest
  129. NS_IMETHODIMP
  130. nsAsyncStreamCopier::GetName(nsACString &name)
  131. {
  132. name.Truncate();
  133. return NS_OK;
  134. }
  135. NS_IMETHODIMP
  136. nsAsyncStreamCopier::IsPending(bool *result)
  137. {
  138. *result = !IsComplete();
  139. return NS_OK;
  140. }
  141. NS_IMETHODIMP
  142. nsAsyncStreamCopier::GetStatus(nsresult *status)
  143. {
  144. IsComplete(status);
  145. return NS_OK;
  146. }
  147. NS_IMETHODIMP
  148. nsAsyncStreamCopier::Cancel(nsresult status)
  149. {
  150. nsCOMPtr<nsISupports> copierCtx;
  151. {
  152. MutexAutoLock lock(mLock);
  153. if (!mIsPending)
  154. return NS_OK;
  155. copierCtx.swap(mCopierCtx);
  156. }
  157. if (NS_SUCCEEDED(status)) {
  158. NS_WARNING("cancel with non-failure status code");
  159. status = NS_BASE_STREAM_CLOSED;
  160. }
  161. if (copierCtx)
  162. NS_CancelAsyncCopy(copierCtx, status);
  163. return NS_OK;
  164. }
  165. NS_IMETHODIMP
  166. nsAsyncStreamCopier::Suspend()
  167. {
  168. NS_NOTREACHED("nsAsyncStreamCopier::Suspend");
  169. return NS_ERROR_NOT_IMPLEMENTED;
  170. }
  171. NS_IMETHODIMP
  172. nsAsyncStreamCopier::Resume()
  173. {
  174. NS_NOTREACHED("nsAsyncStreamCopier::Resume");
  175. return NS_ERROR_NOT_IMPLEMENTED;
  176. }
  177. NS_IMETHODIMP
  178. nsAsyncStreamCopier::GetLoadFlags(nsLoadFlags *aLoadFlags)
  179. {
  180. *aLoadFlags = LOAD_NORMAL;
  181. return NS_OK;
  182. }
  183. NS_IMETHODIMP
  184. nsAsyncStreamCopier::SetLoadFlags(nsLoadFlags aLoadFlags)
  185. {
  186. return NS_OK;
  187. }
  188. NS_IMETHODIMP
  189. nsAsyncStreamCopier::GetLoadGroup(nsILoadGroup **aLoadGroup)
  190. {
  191. *aLoadGroup = nullptr;
  192. return NS_OK;
  193. }
  194. NS_IMETHODIMP
  195. nsAsyncStreamCopier::SetLoadGroup(nsILoadGroup *aLoadGroup)
  196. {
  197. return NS_OK;
  198. }
  199. nsresult
  200. nsAsyncStreamCopier::InitInternal(nsIInputStream *source,
  201. nsIOutputStream *sink,
  202. nsIEventTarget *target,
  203. uint32_t chunkSize,
  204. bool closeSource,
  205. bool closeSink)
  206. {
  207. NS_ASSERTION(!mSource && !mSink, "Init() called more than once");
  208. if (chunkSize == 0) {
  209. chunkSize = nsIOService::gDefaultSegmentSize;
  210. }
  211. mChunkSize = chunkSize;
  212. mSource = source;
  213. mSink = sink;
  214. mCloseSource = closeSource;
  215. mCloseSink = closeSink;
  216. if (target) {
  217. mTarget = target;
  218. } else {
  219. nsresult rv;
  220. mTarget = do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID, &rv);
  221. if (NS_FAILED(rv)) {
  222. return rv;
  223. }
  224. }
  225. return NS_OK;
  226. }
  227. //-----------------------------------------------------------------------------
  228. // nsIAsyncStreamCopier
  229. NS_IMETHODIMP
  230. nsAsyncStreamCopier::Init(nsIInputStream *source,
  231. nsIOutputStream *sink,
  232. nsIEventTarget *target,
  233. bool sourceBuffered,
  234. bool sinkBuffered,
  235. uint32_t chunkSize,
  236. bool closeSource,
  237. bool closeSink)
  238. {
  239. NS_ASSERTION(sourceBuffered || sinkBuffered, "at least one stream must be buffered");
  240. mMode = sourceBuffered ? NS_ASYNCCOPY_VIA_READSEGMENTS
  241. : NS_ASYNCCOPY_VIA_WRITESEGMENTS;
  242. return InitInternal(source, sink, target, chunkSize, closeSource, closeSink);
  243. }
  244. //-----------------------------------------------------------------------------
  245. // nsIAsyncStreamCopier2
  246. NS_IMETHODIMP
  247. nsAsyncStreamCopier::Init(nsIInputStream *source,
  248. nsIOutputStream *sink,
  249. nsIEventTarget *target,
  250. uint32_t chunkSize,
  251. bool closeSource,
  252. bool closeSink)
  253. {
  254. mShouldSniffBuffering = true;
  255. return InitInternal(source, sink, target, chunkSize, closeSource, closeSink);
  256. }
  257. /**
  258. * Detect whether the input or the output stream is buffered,
  259. * bufferize one of them if neither is buffered.
  260. */
  261. nsresult
  262. nsAsyncStreamCopier::ApplyBufferingPolicy()
  263. {
  264. // This function causes I/O, it must not be executed on the main
  265. // thread.
  266. MOZ_ASSERT(!NS_IsMainThread());
  267. if (NS_OutputStreamIsBuffered(mSink)) {
  268. // Sink is buffered, no need to perform additional buffering
  269. mMode = NS_ASYNCCOPY_VIA_WRITESEGMENTS;
  270. return NS_OK;
  271. }
  272. if (NS_InputStreamIsBuffered(mSource)) {
  273. // Source is buffered, no need to perform additional buffering
  274. mMode = NS_ASYNCCOPY_VIA_READSEGMENTS;
  275. return NS_OK;
  276. }
  277. // No buffering, let's buffer the sink
  278. nsresult rv;
  279. nsCOMPtr<nsIBufferedOutputStream> sink =
  280. do_CreateInstance(NS_BUFFEREDOUTPUTSTREAM_CONTRACTID, &rv);
  281. if (NS_FAILED(rv)) {
  282. return rv;
  283. }
  284. rv = sink->Init(mSink, mChunkSize);
  285. if (NS_FAILED(rv)) {
  286. return rv;
  287. }
  288. mMode = NS_ASYNCCOPY_VIA_WRITESEGMENTS;
  289. mSink = sink;
  290. return NS_OK;
  291. }
  292. //-----------------------------------------------------------------------------
  293. // Both nsIAsyncStreamCopier and nsIAsyncStreamCopier2
  294. NS_IMETHODIMP
  295. nsAsyncStreamCopier::AsyncCopy(nsIRequestObserver *observer, nsISupports *ctx)
  296. {
  297. LOG(("nsAsyncStreamCopier::AsyncCopy [this=%p observer=%x]\n", this, observer));
  298. NS_ASSERTION(mSource && mSink, "not initialized");
  299. nsresult rv;
  300. if (observer) {
  301. // build proxy for observer events
  302. rv = NS_NewRequestObserverProxy(getter_AddRefs(mObserver), observer, ctx);
  303. if (NS_FAILED(rv)) return rv;
  304. }
  305. // from this point forward, AsyncCopy is going to return NS_OK. any errors
  306. // will be reported via OnStopRequest.
  307. mIsPending = true;
  308. if (mObserver) {
  309. rv = mObserver->OnStartRequest(AsRequest(), nullptr);
  310. if (NS_FAILED(rv))
  311. Cancel(rv);
  312. }
  313. if (!mShouldSniffBuffering) {
  314. // No buffer sniffing required, let's proceed
  315. AsyncCopyInternal();
  316. return NS_OK;
  317. }
  318. if (NS_IsMainThread()) {
  319. // Don't perform buffer sniffing on the main thread
  320. nsCOMPtr<nsIRunnable> event = new AsyncApplyBufferingPolicyEvent(this);
  321. rv = mTarget->Dispatch(event, NS_DISPATCH_NORMAL);
  322. if (NS_FAILED(rv)) {
  323. Cancel(rv);
  324. }
  325. return NS_OK;
  326. }
  327. // We're not going to block the main thread, so let's sniff here
  328. rv = ApplyBufferingPolicy();
  329. if (NS_FAILED(rv)) {
  330. Cancel(rv);
  331. }
  332. AsyncCopyInternal();
  333. return NS_OK;
  334. }
  335. // Launch async copy.
  336. // All errors are reported through the observer.
  337. void
  338. nsAsyncStreamCopier::AsyncCopyInternal()
  339. {
  340. MOZ_ASSERT(mMode == NS_ASYNCCOPY_VIA_READSEGMENTS
  341. || mMode == NS_ASYNCCOPY_VIA_WRITESEGMENTS);
  342. nsresult rv;
  343. // we want to receive progress notifications; release happens in
  344. // OnAsyncCopyComplete.
  345. NS_ADDREF_THIS();
  346. {
  347. MutexAutoLock lock(mLock);
  348. rv = NS_AsyncCopy(mSource, mSink, mTarget, mMode, mChunkSize,
  349. OnAsyncCopyComplete, this, mCloseSource, mCloseSink,
  350. getter_AddRefs(mCopierCtx));
  351. }
  352. if (NS_FAILED(rv)) {
  353. NS_RELEASE_THIS();
  354. Cancel(rv);
  355. }
  356. }