// ThrottleQueue.cpp (8.9 KB)
  1. /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
  2. /* This Source Code Form is subject to the terms of the Mozilla Public
  3. * License, v. 2.0. If a copy of the MPL was not distributed with this
  4. * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
  5. #include "ThrottleQueue.h"
  6. #include "nsISeekableStream.h"
  7. #include "nsIEventTarget.h"
  8. #include "nsIAsyncInputStream.h"
  9. #include "nsSocketTransportService2.h"
  10. #include "nsStreamUtils.h"
  11. #include "nsNetUtil.h"
  12. namespace mozilla {
  13. namespace net {
  14. //-----------------------------------------------------------------------------
  15. class ThrottleInputStream final
  16. : public nsIAsyncInputStream
  17. , public nsISeekableStream
  18. {
  19. public:
  20. ThrottleInputStream(nsIInputStream* aStream, ThrottleQueue* aQueue);
  21. NS_DECL_THREADSAFE_ISUPPORTS
  22. NS_DECL_NSIINPUTSTREAM
  23. NS_DECL_NSISEEKABLESTREAM
  24. NS_DECL_NSIASYNCINPUTSTREAM
  25. void AllowInput();
  26. private:
  27. ~ThrottleInputStream();
  28. nsCOMPtr<nsIInputStream> mStream;
  29. RefPtr<ThrottleQueue> mQueue;
  30. nsresult mClosedStatus;
  31. nsCOMPtr<nsIInputStreamCallback> mCallback;
  32. nsCOMPtr<nsIEventTarget> mEventTarget;
  33. };
  34. NS_IMPL_ISUPPORTS(ThrottleInputStream, nsIAsyncInputStream, nsIInputStream, nsISeekableStream)
  35. ThrottleInputStream::ThrottleInputStream(nsIInputStream *aStream, ThrottleQueue* aQueue)
  36. : mStream(aStream)
  37. , mQueue(aQueue)
  38. , mClosedStatus(NS_OK)
  39. {
  40. MOZ_ASSERT(aQueue != nullptr);
  41. }
  42. ThrottleInputStream::~ThrottleInputStream()
  43. {
  44. Close();
  45. }
  46. NS_IMETHODIMP
  47. ThrottleInputStream::Close()
  48. {
  49. if (NS_FAILED(mClosedStatus)) {
  50. return mClosedStatus;
  51. }
  52. if (mQueue) {
  53. mQueue->DequeueStream(this);
  54. mQueue = nullptr;
  55. mClosedStatus = NS_BASE_STREAM_CLOSED;
  56. }
  57. return mStream->Close();
  58. }
  59. NS_IMETHODIMP
  60. ThrottleInputStream::Available(uint64_t* aResult)
  61. {
  62. if (NS_FAILED(mClosedStatus)) {
  63. return mClosedStatus;
  64. }
  65. return mStream->Available(aResult);
  66. }
  67. NS_IMETHODIMP
  68. ThrottleInputStream::Read(char* aBuf, uint32_t aCount, uint32_t* aResult)
  69. {
  70. if (NS_FAILED(mClosedStatus)) {
  71. return mClosedStatus;
  72. }
  73. uint32_t realCount;
  74. nsresult rv = mQueue->Available(aCount, &realCount);
  75. if (NS_FAILED(rv)) {
  76. return rv;
  77. }
  78. if (realCount == 0) {
  79. return NS_BASE_STREAM_WOULD_BLOCK;
  80. }
  81. rv = mStream->Read(aBuf, realCount, aResult);
  82. if (NS_SUCCEEDED(rv) && *aResult > 0) {
  83. mQueue->RecordRead(*aResult);
  84. }
  85. return rv;
  86. }
  87. NS_IMETHODIMP
  88. ThrottleInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
  89. uint32_t aCount, uint32_t* aResult)
  90. {
  91. if (NS_FAILED(mClosedStatus)) {
  92. return mClosedStatus;
  93. }
  94. uint32_t realCount;
  95. nsresult rv = mQueue->Available(aCount, &realCount);
  96. if (NS_FAILED(rv)) {
  97. return rv;
  98. }
  99. if (realCount == 0) {
  100. return NS_BASE_STREAM_WOULD_BLOCK;
  101. }
  102. rv = mStream->ReadSegments(aWriter, aClosure, realCount, aResult);
  103. if (NS_SUCCEEDED(rv) && *aResult > 0) {
  104. mQueue->RecordRead(*aResult);
  105. }
  106. return rv;
  107. }
  108. NS_IMETHODIMP
  109. ThrottleInputStream::IsNonBlocking(bool* aNonBlocking)
  110. {
  111. *aNonBlocking = true;
  112. return NS_OK;
  113. }
  114. NS_IMETHODIMP
  115. ThrottleInputStream::Seek(int32_t aWhence, int64_t aOffset)
  116. {
  117. if (NS_FAILED(mClosedStatus)) {
  118. return mClosedStatus;
  119. }
  120. nsCOMPtr<nsISeekableStream> sstream = do_QueryInterface(mStream);
  121. if (!sstream) {
  122. return NS_ERROR_FAILURE;
  123. }
  124. return sstream->Seek(aWhence, aOffset);
  125. }
  126. NS_IMETHODIMP
  127. ThrottleInputStream::Tell(int64_t* aResult)
  128. {
  129. if (NS_FAILED(mClosedStatus)) {
  130. return mClosedStatus;
  131. }
  132. nsCOMPtr<nsISeekableStream> sstream = do_QueryInterface(mStream);
  133. if (!sstream) {
  134. return NS_ERROR_FAILURE;
  135. }
  136. return sstream->Tell(aResult);
  137. }
  138. NS_IMETHODIMP
  139. ThrottleInputStream::SetEOF()
  140. {
  141. if (NS_FAILED(mClosedStatus)) {
  142. return mClosedStatus;
  143. }
  144. nsCOMPtr<nsISeekableStream> sstream = do_QueryInterface(mStream);
  145. if (!sstream) {
  146. return NS_ERROR_FAILURE;
  147. }
  148. return sstream->SetEOF();
  149. }
  150. NS_IMETHODIMP
  151. ThrottleInputStream::CloseWithStatus(nsresult aStatus)
  152. {
  153. if (NS_FAILED(mClosedStatus)) {
  154. // Already closed, ignore.
  155. return NS_OK;
  156. }
  157. if (NS_SUCCEEDED(aStatus)) {
  158. aStatus = NS_BASE_STREAM_CLOSED;
  159. }
  160. mClosedStatus = Close();
  161. if (NS_SUCCEEDED(mClosedStatus)) {
  162. mClosedStatus = aStatus;
  163. }
  164. return NS_OK;
  165. }
  166. NS_IMETHODIMP
  167. ThrottleInputStream::AsyncWait(nsIInputStreamCallback *aCallback,
  168. uint32_t aFlags,
  169. uint32_t aRequestedCount,
  170. nsIEventTarget *aEventTarget)
  171. {
  172. if (aFlags != 0) {
  173. return NS_ERROR_ILLEGAL_VALUE;
  174. }
  175. mCallback = aCallback;
  176. mEventTarget = aEventTarget;
  177. if (mCallback) {
  178. mQueue->QueueStream(this);
  179. } else {
  180. mQueue->DequeueStream(this);
  181. }
  182. return NS_OK;
  183. }
  184. void
  185. ThrottleInputStream::AllowInput()
  186. {
  187. MOZ_ASSERT(mCallback);
  188. nsCOMPtr<nsIInputStreamCallback> callbackEvent =
  189. NS_NewInputStreamReadyEvent(mCallback, mEventTarget);
  190. mCallback = nullptr;
  191. mEventTarget = nullptr;
  192. callbackEvent->OnInputStreamReady(this);
  193. }
  194. //-----------------------------------------------------------------------------
  195. NS_IMPL_ISUPPORTS(ThrottleQueue, nsIInputChannelThrottleQueue, nsITimerCallback)
  196. ThrottleQueue::ThrottleQueue()
  197. : mMeanBytesPerSecond(0)
  198. , mMaxBytesPerSecond(0)
  199. , mBytesProcessed(0)
  200. , mTimerArmed(false)
  201. {
  202. nsresult rv;
  203. nsCOMPtr<nsIEventTarget> sts;
  204. nsCOMPtr<nsIIOService> ioService = do_GetIOService(&rv);
  205. if (NS_SUCCEEDED(rv))
  206. sts = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
  207. if (NS_SUCCEEDED(rv))
  208. mTimer = do_CreateInstance("@mozilla.org/timer;1");
  209. if (mTimer)
  210. mTimer->SetTarget(sts);
  211. }
  212. ThrottleQueue::~ThrottleQueue()
  213. {
  214. if (mTimer && mTimerArmed) {
  215. mTimer->Cancel();
  216. }
  217. mTimer = nullptr;
  218. }
  219. NS_IMETHODIMP
  220. ThrottleQueue::RecordRead(uint32_t aBytesRead)
  221. {
  222. MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
  223. ThrottleEntry entry;
  224. entry.mTime = TimeStamp::Now();
  225. entry.mBytesRead = aBytesRead;
  226. mReadEvents.AppendElement(entry);
  227. mBytesProcessed += aBytesRead;
  228. return NS_OK;
  229. }
  230. NS_IMETHODIMP
  231. ThrottleQueue::Available(uint32_t aRemaining, uint32_t* aAvailable)
  232. {
  233. MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
  234. TimeStamp now = TimeStamp::Now();
  235. TimeStamp oneSecondAgo = now - TimeDuration::FromSeconds(1);
  236. size_t i;
  237. // Remove all stale events.
  238. for (i = 0; i < mReadEvents.Length(); ++i) {
  239. if (mReadEvents[i].mTime >= oneSecondAgo) {
  240. break;
  241. }
  242. }
  243. mReadEvents.RemoveElementsAt(0, i);
  244. uint32_t totalBytes = 0;
  245. for (i = 0; i < mReadEvents.Length(); ++i) {
  246. totalBytes += mReadEvents[i].mBytesRead;
  247. }
  248. uint32_t spread = mMaxBytesPerSecond - mMeanBytesPerSecond;
  249. double prob = static_cast<double>(rand()) / RAND_MAX;
  250. uint32_t thisSliceBytes = mMeanBytesPerSecond - spread +
  251. static_cast<uint32_t>(2 * spread * prob);
  252. if (totalBytes >= thisSliceBytes) {
  253. *aAvailable = 0;
  254. } else {
  255. *aAvailable = thisSliceBytes;
  256. }
  257. return NS_OK;
  258. }
  259. NS_IMETHODIMP
  260. ThrottleQueue::Init(uint32_t aMeanBytesPerSecond, uint32_t aMaxBytesPerSecond)
  261. {
  262. // Can be called on any thread.
  263. if (aMeanBytesPerSecond == 0 || aMaxBytesPerSecond == 0 || aMaxBytesPerSecond < aMeanBytesPerSecond) {
  264. return NS_ERROR_ILLEGAL_VALUE;
  265. }
  266. mMeanBytesPerSecond = aMeanBytesPerSecond;
  267. mMaxBytesPerSecond = aMaxBytesPerSecond;
  268. return NS_OK;
  269. }
  270. NS_IMETHODIMP
  271. ThrottleQueue::BytesProcessed(uint64_t* aResult)
  272. {
  273. *aResult = mBytesProcessed;
  274. return NS_OK;
  275. }
  276. NS_IMETHODIMP
  277. ThrottleQueue::WrapStream(nsIInputStream* aInputStream, nsIAsyncInputStream** aResult)
  278. {
  279. nsCOMPtr<nsIAsyncInputStream> result = new ThrottleInputStream(aInputStream, this);
  280. result.forget(aResult);
  281. return NS_OK;
  282. }
  283. NS_IMETHODIMP
  284. ThrottleQueue::Notify(nsITimer* aTimer)
  285. {
  286. MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
  287. // A notified reader may need to push itself back on the queue.
  288. // Swap out the list of readers so that this works properly.
  289. nsTArray<RefPtr<ThrottleInputStream>> events;
  290. events.SwapElements(mAsyncEvents);
  291. // Optimistically notify all the waiting readers, and then let them
  292. // requeue if there isn't enough bandwidth.
  293. for (size_t i = 0; i < events.Length(); ++i) {
  294. events[i]->AllowInput();
  295. }
  296. mTimerArmed = false;
  297. return NS_OK;
  298. }
  299. void
  300. ThrottleQueue::QueueStream(ThrottleInputStream* aStream)
  301. {
  302. MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
  303. if (mAsyncEvents.IndexOf(aStream) == mAsyncEvents.NoIndex) {
  304. mAsyncEvents.AppendElement(aStream);
  305. if (!mTimerArmed) {
  306. uint32_t ms = 1000;
  307. if (mReadEvents.Length() > 0) {
  308. TimeStamp t = mReadEvents[0].mTime + TimeDuration::FromSeconds(1);
  309. TimeStamp now = TimeStamp::Now();
  310. if (t > now) {
  311. ms = static_cast<uint32_t>((t - now).ToMilliseconds());
  312. } else {
  313. ms = 1;
  314. }
  315. }
  316. if (NS_SUCCEEDED(mTimer->InitWithCallback(this, ms, nsITimer::TYPE_ONE_SHOT))) {
  317. mTimerArmed = true;
  318. }
  319. }
  320. }
  321. }
  322. void
  323. ThrottleQueue::DequeueStream(ThrottleInputStream* aStream)
  324. {
  325. MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
  326. mAsyncEvents.RemoveElement(aStream);
  327. }
  328. }
  329. }