EventTokenBucket.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463
  1. /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
  2. /* vim:set ts=2 sw=2 sts=2 et cindent: */
  3. /* This Source Code Form is subject to the terms of the Mozilla Public
  4. * License, v. 2.0. If a copy of the MPL was not distributed with this
  5. * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
  6. #include "EventTokenBucket.h"
  7. #include "nsICancelable.h"
  8. #include "nsIIOService.h"
  9. #include "nsNetCID.h"
  10. #include "nsNetUtil.h"
  11. #include "nsServiceManagerUtils.h"
  12. #include "nsSocketTransportService2.h"
  13. #ifdef DEBUG
  14. #include "MainThreadUtils.h"
  15. #endif
  16. #ifdef XP_WIN
  17. #include <windows.h>
  18. #include <mmsystem.h>
  19. #endif
  20. namespace mozilla {
  21. namespace net {
  22. ////////////////////////////////////////////
  23. // EventTokenBucketCancelable
  24. ////////////////////////////////////////////
  25. class TokenBucketCancelable : public nsICancelable
  26. {
  27. public:
  28. NS_DECL_THREADSAFE_ISUPPORTS
  29. NS_DECL_NSICANCELABLE
  30. explicit TokenBucketCancelable(class ATokenBucketEvent *event);
  31. void Fire();
  32. private:
  33. virtual ~TokenBucketCancelable() {}
  34. friend class EventTokenBucket;
  35. ATokenBucketEvent *mEvent;
  36. };
  37. NS_IMPL_ISUPPORTS(TokenBucketCancelable, nsICancelable)
  38. TokenBucketCancelable::TokenBucketCancelable(ATokenBucketEvent *event)
  39. : mEvent(event)
  40. {
  41. }
  42. NS_IMETHODIMP
  43. TokenBucketCancelable::Cancel(nsresult reason)
  44. {
  45. MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
  46. mEvent = nullptr;
  47. return NS_OK;
  48. }
  49. void
  50. TokenBucketCancelable::Fire()
  51. {
  52. if (!mEvent)
  53. return;
  54. ATokenBucketEvent *event = mEvent;
  55. mEvent = nullptr;
  56. event->OnTokenBucketAdmitted();
  57. }
  58. ////////////////////////////////////////////
  59. // EventTokenBucket
  60. ////////////////////////////////////////////
  61. NS_IMPL_ISUPPORTS(EventTokenBucket, nsITimerCallback)
  62. // by default 1hz with no burst
  63. EventTokenBucket::EventTokenBucket(uint32_t eventsPerSecond,
  64. uint32_t burstSize)
  65. : mUnitCost(kUsecPerSec)
  66. , mMaxCredit(kUsecPerSec)
  67. , mCredit(kUsecPerSec)
  68. , mPaused(false)
  69. , mStopped(false)
  70. , mTimerArmed(false)
  71. #ifdef XP_WIN
  72. , mFineGrainTimerInUse(false)
  73. , mFineGrainResetTimerArmed(false)
  74. #endif
  75. {
  76. mLastUpdate = TimeStamp::Now();
  77. MOZ_ASSERT(NS_IsMainThread());
  78. nsresult rv;
  79. nsCOMPtr<nsIEventTarget> sts;
  80. nsCOMPtr<nsIIOService> ioService = do_GetIOService(&rv);
  81. if (NS_SUCCEEDED(rv))
  82. sts = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
  83. if (NS_SUCCEEDED(rv))
  84. mTimer = do_CreateInstance("@mozilla.org/timer;1");
  85. if (mTimer)
  86. mTimer->SetTarget(sts);
  87. SetRate(eventsPerSecond, burstSize);
  88. }
  89. EventTokenBucket::~EventTokenBucket()
  90. {
  91. SOCKET_LOG(("EventTokenBucket::dtor %p events=%d\n",
  92. this, mEvents.GetSize()));
  93. CleanupTimers();
  94. // Complete any queued events to prevent hangs
  95. while (mEvents.GetSize()) {
  96. RefPtr<TokenBucketCancelable> cancelable =
  97. dont_AddRef(static_cast<TokenBucketCancelable *>(mEvents.PopFront()));
  98. cancelable->Fire();
  99. }
  100. }
  101. void
  102. EventTokenBucket::CleanupTimers()
  103. {
  104. if (mTimer && mTimerArmed) {
  105. mTimer->Cancel();
  106. }
  107. mTimer = nullptr;
  108. mTimerArmed = false;
  109. #ifdef XP_WIN
  110. NormalTimers();
  111. if (mFineGrainResetTimer && mFineGrainResetTimerArmed) {
  112. mFineGrainResetTimer->Cancel();
  113. }
  114. mFineGrainResetTimer = nullptr;
  115. mFineGrainResetTimerArmed = false;
  116. #endif
  117. }
  118. void
  119. EventTokenBucket::SetRate(uint32_t eventsPerSecond,
  120. uint32_t burstSize)
  121. {
  122. SOCKET_LOG(("EventTokenBucket::SetRate %p %u %u\n",
  123. this, eventsPerSecond, burstSize));
  124. if (eventsPerSecond > kMaxHz) {
  125. eventsPerSecond = kMaxHz;
  126. SOCKET_LOG((" eventsPerSecond out of range\n"));
  127. }
  128. if (!eventsPerSecond) {
  129. eventsPerSecond = 1;
  130. SOCKET_LOG((" eventsPerSecond out of range\n"));
  131. }
  132. mUnitCost = kUsecPerSec / eventsPerSecond;
  133. mMaxCredit = mUnitCost * burstSize;
  134. if (mMaxCredit > kUsecPerSec * 60 * 15) {
  135. SOCKET_LOG((" burstSize out of range\n"));
  136. mMaxCredit = kUsecPerSec * 60 * 15;
  137. }
  138. mCredit = mMaxCredit;
  139. mLastUpdate = TimeStamp::Now();
  140. }
  141. void
  142. EventTokenBucket::ClearCredits()
  143. {
  144. MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
  145. SOCKET_LOG(("EventTokenBucket::ClearCredits %p\n", this));
  146. mCredit = 0;
  147. }
  148. uint32_t
  149. EventTokenBucket::BurstEventsAvailable()
  150. {
  151. MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
  152. return static_cast<uint32_t>(mCredit / mUnitCost);
  153. }
  154. uint32_t
  155. EventTokenBucket::QueuedEvents()
  156. {
  157. MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
  158. return mEvents.GetSize();
  159. }
  160. void
  161. EventTokenBucket::Pause()
  162. {
  163. MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
  164. SOCKET_LOG(("EventTokenBucket::Pause %p\n", this));
  165. if (mPaused || mStopped)
  166. return;
  167. mPaused = true;
  168. if (mTimerArmed) {
  169. mTimer->Cancel();
  170. mTimerArmed = false;
  171. }
  172. }
  173. void
  174. EventTokenBucket::UnPause()
  175. {
  176. MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
  177. SOCKET_LOG(("EventTokenBucket::UnPause %p\n", this));
  178. if (!mPaused || mStopped)
  179. return;
  180. mPaused = false;
  181. DispatchEvents();
  182. UpdateTimer();
  183. }
  184. void
  185. EventTokenBucket::Stop()
  186. {
  187. MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
  188. SOCKET_LOG(("EventTokenBucket::Stop %p armed=%d\n", this, mTimerArmed));
  189. mStopped = true;
  190. CleanupTimers();
  191. // Complete any queued events to prevent hangs
  192. while (mEvents.GetSize()) {
  193. RefPtr<TokenBucketCancelable> cancelable =
  194. dont_AddRef(static_cast<TokenBucketCancelable *>(mEvents.PopFront()));
  195. cancelable->Fire();
  196. }
  197. }
  198. nsresult
  199. EventTokenBucket::SubmitEvent(ATokenBucketEvent *event, nsICancelable **cancelable)
  200. {
  201. MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
  202. SOCKET_LOG(("EventTokenBucket::SubmitEvent %p\n", this));
  203. if (mStopped || !mTimer)
  204. return NS_ERROR_FAILURE;
  205. UpdateCredits();
  206. RefPtr<TokenBucketCancelable> cancelEvent = new TokenBucketCancelable(event);
  207. // When this function exits the cancelEvent needs 2 references, one for the
  208. // mEvents queue and one for the caller of SubmitEvent()
  209. NS_ADDREF(*cancelable = cancelEvent.get());
  210. if (mPaused || !TryImmediateDispatch(cancelEvent.get())) {
  211. // queue it
  212. SOCKET_LOG((" queued\n"));
  213. mEvents.Push(cancelEvent.forget().take());
  214. UpdateTimer();
  215. }
  216. else {
  217. SOCKET_LOG((" dispatched synchronously\n"));
  218. }
  219. return NS_OK;
  220. }
  221. bool
  222. EventTokenBucket::TryImmediateDispatch(TokenBucketCancelable *cancelable)
  223. {
  224. if (mCredit < mUnitCost)
  225. return false;
  226. mCredit -= mUnitCost;
  227. cancelable->Fire();
  228. return true;
  229. }
  230. void
  231. EventTokenBucket::DispatchEvents()
  232. {
  233. MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
  234. SOCKET_LOG(("EventTokenBucket::DispatchEvents %p %d\n", this, mPaused));
  235. if (mPaused || mStopped)
  236. return;
  237. while (mEvents.GetSize() && mUnitCost <= mCredit) {
  238. RefPtr<TokenBucketCancelable> cancelable =
  239. dont_AddRef(static_cast<TokenBucketCancelable *>(mEvents.PopFront()));
  240. if (cancelable->mEvent) {
  241. SOCKET_LOG(("EventTokenBucket::DispachEvents [%p] "
  242. "Dispatching queue token bucket event cost=%lu credit=%lu\n",
  243. this, mUnitCost, mCredit));
  244. mCredit -= mUnitCost;
  245. cancelable->Fire();
  246. }
  247. }
  248. #ifdef XP_WIN
  249. if (!mEvents.GetSize())
  250. WantNormalTimers();
  251. #endif
  252. }
  253. void
  254. EventTokenBucket::UpdateTimer()
  255. {
  256. MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
  257. if (mTimerArmed || mPaused || mStopped || !mEvents.GetSize() || !mTimer)
  258. return;
  259. if (mCredit >= mUnitCost)
  260. return;
  261. // determine the time needed to wait to accumulate enough credits to admit
  262. // one more event and set the timer for that point. Always round it
  263. // up because firing early doesn't help.
  264. //
  265. uint64_t deficit = mUnitCost - mCredit;
  266. uint64_t msecWait = (deficit + (kUsecPerMsec - 1)) / kUsecPerMsec;
  267. if (msecWait < 4) // minimum wait
  268. msecWait = 4;
  269. else if (msecWait > 60000) // maximum wait
  270. msecWait = 60000;
  271. #ifdef XP_WIN
  272. FineGrainTimers();
  273. #endif
  274. SOCKET_LOG(("EventTokenBucket::UpdateTimer %p for %dms\n",
  275. this, msecWait));
  276. nsresult rv = mTimer->InitWithCallback(this, static_cast<uint32_t>(msecWait),
  277. nsITimer::TYPE_ONE_SHOT);
  278. mTimerArmed = NS_SUCCEEDED(rv);
  279. }
  280. NS_IMETHODIMP
  281. EventTokenBucket::Notify(nsITimer *timer)
  282. {
  283. MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
  284. #ifdef XP_WIN
  285. if (timer == mFineGrainResetTimer) {
  286. FineGrainResetTimerNotify();
  287. return NS_OK;
  288. }
  289. #endif
  290. SOCKET_LOG(("EventTokenBucket::Notify() %p\n", this));
  291. mTimerArmed = false;
  292. if (mStopped)
  293. return NS_OK;
  294. UpdateCredits();
  295. DispatchEvents();
  296. UpdateTimer();
  297. return NS_OK;
  298. }
  299. void
  300. EventTokenBucket::UpdateCredits()
  301. {
  302. MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
  303. TimeStamp now = TimeStamp::Now();
  304. TimeDuration elapsed = now - mLastUpdate;
  305. mLastUpdate = now;
  306. mCredit += static_cast<uint64_t>(elapsed.ToMicroseconds());
  307. if (mCredit > mMaxCredit)
  308. mCredit = mMaxCredit;
  309. SOCKET_LOG(("EventTokenBucket::UpdateCredits %p to %lu (%lu each.. %3.2f)\n",
  310. this, mCredit, mUnitCost, (double)mCredit / mUnitCost));
  311. }
  312. #ifdef XP_WIN
  313. void
  314. EventTokenBucket::FineGrainTimers()
  315. {
  316. SOCKET_LOG(("EventTokenBucket::FineGrainTimers %p mFineGrainTimerInUse=%d\n",
  317. this, mFineGrainTimerInUse));
  318. mLastFineGrainTimerUse = TimeStamp::Now();
  319. if (mFineGrainTimerInUse)
  320. return;
  321. if (mUnitCost > kCostFineGrainThreshold)
  322. return;
  323. SOCKET_LOG(("EventTokenBucket::FineGrainTimers %p timeBeginPeriod()\n",
  324. this));
  325. mFineGrainTimerInUse = true;
  326. timeBeginPeriod(1);
  327. }
  328. void
  329. EventTokenBucket::NormalTimers()
  330. {
  331. if (!mFineGrainTimerInUse)
  332. return;
  333. mFineGrainTimerInUse = false;
  334. SOCKET_LOG(("EventTokenBucket::NormalTimers %p timeEndPeriod()\n", this));
  335. timeEndPeriod(1);
  336. }
  337. void
  338. EventTokenBucket::WantNormalTimers()
  339. {
  340. if (!mFineGrainTimerInUse)
  341. return;
  342. if (mFineGrainResetTimerArmed)
  343. return;
  344. TimeDuration elapsed(TimeStamp::Now() - mLastFineGrainTimerUse);
  345. static const TimeDuration fiveSeconds = TimeDuration::FromSeconds(5);
  346. if (elapsed >= fiveSeconds) {
  347. NormalTimers();
  348. return;
  349. }
  350. if (!mFineGrainResetTimer)
  351. mFineGrainResetTimer = do_CreateInstance("@mozilla.org/timer;1");
  352. // if we can't delay the reset, just do it now
  353. if (!mFineGrainResetTimer) {
  354. NormalTimers();
  355. return;
  356. }
  357. // pad the callback out 100ms to avoid having to round trip this again if the
  358. // timer calls back just a tad early.
  359. SOCKET_LOG(("EventTokenBucket::WantNormalTimers %p "
  360. "Will reset timer granularity after delay", this));
  361. mFineGrainResetTimer->InitWithCallback(
  362. this,
  363. static_cast<uint32_t>((fiveSeconds - elapsed).ToMilliseconds()) + 100,
  364. nsITimer::TYPE_ONE_SHOT);
  365. mFineGrainResetTimerArmed = true;
  366. }
  367. void
  368. EventTokenBucket::FineGrainResetTimerNotify()
  369. {
  370. SOCKET_LOG(("EventTokenBucket::FineGrainResetTimerNotify() events = %d\n",
  371. this, mEvents.GetSize()));
  372. mFineGrainResetTimerArmed = false;
  373. // If we are currently processing events then wait for the queue to drain
  374. // before trying to reset back to normal timers again
  375. if (!mEvents.GetSize())
  376. WantNormalTimers();
  377. }
  378. #endif
  379. } // namespace net
  380. } // namespace mozilla