throttle.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418
  1. /* -*- indent-tabs-mode: nil; js-indent-level: 2 -*- */
  2. /* This Source Code Form is subject to the terms of the Mozilla Public
  3. * License, v. 2.0. If a copy of the MPL was not distributed with this
  4. * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
  5. "use strict";
  6. const {CC, Ci, Cu, Cc} = require("chrome");
  7. const ArrayBufferInputStream = CC("@mozilla.org/io/arraybuffer-input-stream;1",
  8. "nsIArrayBufferInputStream");
  9. const BinaryInputStream = CC("@mozilla.org/binaryinputstream;1",
  10. "nsIBinaryInputStream", "setInputStream");
  11. loader.lazyServiceGetter(this, "gActivityDistributor",
  12. "@mozilla.org/network/http-activity-distributor;1",
  13. "nsIHttpActivityDistributor");
  14. const {XPCOMUtils} = require("resource://gre/modules/XPCOMUtils.jsm");
  15. const {setTimeout} = Cu.import("resource://gre/modules/Timer.jsm", {});
  16. /**
  17. * Construct a new nsIStreamListener that buffers data and provides a
  18. * method to notify another listener when data is available. This is
  19. * used to throttle network data on a per-channel basis.
  20. *
  21. * After construction, @see setOriginalListener must be called on the
  22. * new object.
  23. *
  24. * @param {NetworkThrottleQueue} queue the NetworkThrottleQueue to
  25. * which status changes should be reported
  26. */
  27. function NetworkThrottleListener(queue) {
  28. this.queue = queue;
  29. this.pendingData = [];
  30. this.pendingException = null;
  31. this.offset = 0;
  32. this.responseStarted = false;
  33. this.activities = {};
  34. }
  35. NetworkThrottleListener.prototype = {
  36. QueryInterface:
  37. XPCOMUtils.generateQI([Ci.nsIStreamListener, Ci.nsIInterfaceRequestor,
  38. Ci.nsISupports]),
  39. /**
  40. * Set the original listener for this object. The original listener
  41. * will receive requests from this object when the queue allows data
  42. * through.
  43. *
  44. * @param {nsIStreamListener} originalListener the original listener
  45. * for the channel, to which all requests will be sent
  46. */
  47. setOriginalListener: function (originalListener) {
  48. this.originalListener = originalListener;
  49. },
  50. /**
  51. * @see nsIStreamListener.onStartRequest.
  52. */
  53. onStartRequest: function (request, context) {
  54. this.originalListener.onStartRequest(request, context);
  55. this.queue.start(this);
  56. },
  57. /**
  58. * @see nsIStreamListener.onStopRequest.
  59. */
  60. onStopRequest: function (request, context, statusCode) {
  61. this.pendingData.push({request, context, statusCode});
  62. this.queue.dataAvailable(this);
  63. },
  64. /**
  65. * @see nsIStreamListener.onDataAvailable.
  66. */
  67. onDataAvailable: function (request, context, inputStream, offset, count) {
  68. if (this.pendingException) {
  69. throw this.pendingException;
  70. }
  71. const bin = new BinaryInputStream(inputStream);
  72. const bytes = new ArrayBuffer(count);
  73. bin.readArrayBuffer(count, bytes);
  74. const stream = new ArrayBufferInputStream();
  75. stream.setData(bytes, 0, count);
  76. this.pendingData.push({request, context, stream, count});
  77. this.queue.dataAvailable(this);
  78. },
  79. /**
  80. * Allow some buffered data from this object to be forwarded to this
  81. * object's originalListener.
  82. *
  83. * @param {Number} bytesPermitted The maximum number of bytes
  84. * permitted to be sent.
  85. * @return {Object} an object of the form {length, done}, where
  86. * |length| is the number of bytes actually forwarded, and
  87. * |done| is a boolean indicating whether this particular
  88. * request has been completed. (A NetworkThrottleListener
  89. * may be queued multiple times, so this does not mean that
  90. * all available data has been sent.)
  91. */
  92. sendSomeData: function (bytesPermitted) {
  93. if (this.pendingData.length === 0) {
  94. // Shouldn't happen.
  95. return {length: 0, done: true};
  96. }
  97. const {request, context, stream, count, statusCode} = this.pendingData[0];
  98. if (statusCode !== undefined) {
  99. this.pendingData.shift();
  100. this.originalListener.onStopRequest(request, context, statusCode);
  101. return {length: 0, done: true};
  102. }
  103. if (bytesPermitted > count) {
  104. bytesPermitted = count;
  105. }
  106. try {
  107. this.originalListener.onDataAvailable(request, context, stream,
  108. this.offset, bytesPermitted);
  109. } catch (e) {
  110. this.pendingException = e;
  111. }
  112. let done = false;
  113. if (bytesPermitted === count) {
  114. this.pendingData.shift();
  115. done = true;
  116. } else {
  117. this.pendingData[0].count -= bytesPermitted;
  118. }
  119. this.offset += bytesPermitted;
  120. // Maybe our state has changed enough to emit an event.
  121. this.maybeEmitEvents();
  122. return {length: bytesPermitted, done};
  123. },
  124. /**
  125. * Return the number of pending data requests available for this
  126. * listener.
  127. */
  128. pendingCount: function () {
  129. return this.pendingData.length;
  130. },
  131. /**
  132. * This is called when an http activity event is delivered. This
  133. * object delays the event until the appropriate moment.
  134. */
  135. addActivityCallback: function (callback, httpActivity, channel, activityType,
  136. activitySubtype, timestamp, extraSizeData,
  137. extraStringData) {
  138. let datum = {callback, httpActivity, channel, activityType,
  139. activitySubtype, extraSizeData,
  140. extraStringData};
  141. this.activities[activitySubtype] = datum;
  142. if (activitySubtype ===
  143. gActivityDistributor.ACTIVITY_SUBTYPE_RESPONSE_COMPLETE) {
  144. this.totalSize = extraSizeData;
  145. }
  146. this.maybeEmitEvents();
  147. },
  148. /**
  149. * This is called for a download throttler when the latency timeout
  150. * has ended.
  151. */
  152. responseStart: function () {
  153. this.responseStarted = true;
  154. this.maybeEmitEvents();
  155. },
  156. /**
  157. * Check our internal state and emit any http activity events as
  158. * needed. Note that we wait until both our internal state has
  159. * changed and we've received the real http activity event from
  160. * platform. This approach ensures we can both pass on the correct
  161. * data from the original event, and update the reported time to be
  162. * consistent with the delay we're introducing.
  163. */
  164. maybeEmitEvents: function () {
  165. if (this.responseStarted) {
  166. this.maybeEmit(gActivityDistributor.ACTIVITY_SUBTYPE_RESPONSE_START);
  167. this.maybeEmit(gActivityDistributor.ACTIVITY_SUBTYPE_RESPONSE_HEADER);
  168. }
  169. if (this.totalSize !== undefined && this.offset >= this.totalSize) {
  170. this.maybeEmit(gActivityDistributor.ACTIVITY_SUBTYPE_RESPONSE_COMPLETE);
  171. this.maybeEmit(gActivityDistributor.ACTIVITY_SUBTYPE_TRANSACTION_CLOSE);
  172. }
  173. },
  174. /**
  175. * Emit an event for |code|, if the appropriate entry in
  176. * |activities| is defined.
  177. */
  178. maybeEmit: function (code) {
  179. if (this.activities[code] !== undefined) {
  180. let {callback, httpActivity, channel, activityType,
  181. activitySubtype, extraSizeData,
  182. extraStringData} = this.activities[code];
  183. let now = Date.now() * 1000;
  184. callback(httpActivity, channel, activityType, activitySubtype,
  185. now, extraSizeData, extraStringData);
  186. this.activities[code] = undefined;
  187. }
  188. },
  189. };
  190. /**
  191. * Construct a new queue that can be used to throttle the network for
  192. * a group of related network requests.
  193. *
  194. * meanBPS {Number} Mean bytes per second.
  195. * maxBPS {Number} Maximum bytes per second.
  196. * roundTripTimeMean {Number} Mean round trip time in milliseconds.
  197. * roundTripTimeMax {Number} Maximum round trip time in milliseconds.
  198. */
  199. function NetworkThrottleQueue(meanBPS, maxBPS,
  200. roundTripTimeMean, roundTripTimeMax) {
  201. this.meanBPS = meanBPS;
  202. this.maxBPS = maxBPS;
  203. this.roundTripTimeMean = roundTripTimeMean;
  204. this.roundTripTimeMax = roundTripTimeMax;
  205. this.pendingRequests = new Set();
  206. this.downloadQueue = [];
  207. this.previousReads = [];
  208. this.pumping = false;
  209. }
  210. NetworkThrottleQueue.prototype = {
  211. /**
  212. * A helper function that, given a mean and a maximum, returns a
  213. * random integer between (mean - (max - mean)) and max.
  214. */
  215. random: function (mean, max) {
  216. return mean - (max - mean) + Math.floor(2 * (max - mean) * Math.random());
  217. },
  218. /**
  219. * A helper function that lets the indicating listener start sending
  220. * data. This is called after the initial round trip time for the
  221. * listener has elapsed.
  222. */
  223. allowDataFrom: function (throttleListener) {
  224. throttleListener.responseStart();
  225. this.pendingRequests.delete(throttleListener);
  226. const count = throttleListener.pendingCount();
  227. for (let i = 0; i < count; ++i) {
  228. this.downloadQueue.push(throttleListener);
  229. }
  230. this.pump();
  231. },
  232. /**
  233. * Notice a new listener object. This is called by the
  234. * NetworkThrottleListener when the request has started. Initially
  235. * a new listener object is put into a "pending" state, until the
  236. * round-trip time has elapsed. This is used to simulate latency.
  237. *
  238. * @param {NetworkThrottleListener} throttleListener the new listener
  239. */
  240. start: function (throttleListener) {
  241. this.pendingRequests.add(throttleListener);
  242. let delay = this.random(this.roundTripTimeMean, this.roundTripTimeMax);
  243. if (delay > 0) {
  244. setTimeout(() => this.allowDataFrom(throttleListener), delay);
  245. } else {
  246. this.allowDataFrom(throttleListener);
  247. }
  248. },
  249. /**
  250. * Note that new data is available for a given listener. Each time
  251. * data is available, the listener will be re-queued.
  252. *
  253. * @param {NetworkThrottleListener} throttleListener the listener
  254. * which has data available.
  255. */
  256. dataAvailable: function (throttleListener) {
  257. if (!this.pendingRequests.has(throttleListener)) {
  258. this.downloadQueue.push(throttleListener);
  259. this.pump();
  260. }
  261. },
  262. /**
  263. * An internal function that permits individual listeners to send
  264. * data.
  265. */
  266. pump: function () {
  267. // A redirect will cause two NetworkThrottleListeners to be on a
  268. // listener chain. In this case, we might recursively call into
  269. // this method. Avoid infinite recursion here.
  270. if (this.pumping) {
  271. return;
  272. }
  273. this.pumping = true;
  274. const now = Date.now();
  275. const oneSecondAgo = now - 1000;
  276. while (this.previousReads.length &&
  277. this.previousReads[0].when < oneSecondAgo) {
  278. this.previousReads.shift();
  279. }
  280. const totalBytes = this.previousReads.reduce((sum, elt) => {
  281. return sum + elt.numBytes;
  282. }, 0);
  283. let thisSliceBytes = this.random(this.meanBPS, this.maxBPS);
  284. if (totalBytes < thisSliceBytes) {
  285. thisSliceBytes -= totalBytes;
  286. let readThisTime = 0;
  287. while (thisSliceBytes > 0 && this.downloadQueue.length) {
  288. let {length, done} = this.downloadQueue[0].sendSomeData(thisSliceBytes);
  289. thisSliceBytes -= length;
  290. readThisTime += length;
  291. if (done) {
  292. this.downloadQueue.shift();
  293. }
  294. }
  295. this.previousReads.push({when: now, numBytes: readThisTime});
  296. }
  297. // If there is more data to download, then schedule ourselves for
  298. // one second after the oldest previous read.
  299. if (this.downloadQueue.length) {
  300. const when = this.previousReads[0].when + 1000;
  301. setTimeout(this.pump.bind(this), when - now);
  302. }
  303. this.pumping = false;
  304. },
  305. };
  306. /**
  307. * Construct a new object that can be used to throttle the network for
  308. * a group of related network requests.
  309. *
  310. * @param {Object} An object with the following attributes:
  311. * roundTripTimeMean {Number} Mean round trip time in milliseconds.
  312. * roundTripTimeMax {Number} Maximum round trip time in milliseconds.
  313. * downloadBPSMean {Number} Mean bytes per second for downloads.
  314. * downloadBPSMax {Number} Maximum bytes per second for downloads.
  315. * uploadBPSMean {Number} Mean bytes per second for uploads.
  316. * uploadBPSMax {Number} Maximum bytes per second for uploads.
  317. *
  318. * Download throttling will not be done if downloadBPSMean and
  319. * downloadBPSMax are <= 0. Upload throttling will not be done if
  320. * uploadBPSMean and uploadBPSMax are <= 0.
  321. */
  322. function NetworkThrottleManager({roundTripTimeMean, roundTripTimeMax,
  323. downloadBPSMean, downloadBPSMax,
  324. uploadBPSMean, uploadBPSMax}) {
  325. if (downloadBPSMax <= 0 && downloadBPSMean <= 0) {
  326. this.downloadQueue = null;
  327. } else {
  328. this.downloadQueue =
  329. new NetworkThrottleQueue(downloadBPSMean, downloadBPSMax,
  330. roundTripTimeMean, roundTripTimeMax);
  331. }
  332. if (uploadBPSMax <= 0 && uploadBPSMean <= 0) {
  333. this.uploadQueue = null;
  334. } else {
  335. this.uploadQueue = Cc["@mozilla.org/network/throttlequeue;1"]
  336. .createInstance(Ci.nsIInputChannelThrottleQueue);
  337. this.uploadQueue.init(uploadBPSMean, uploadBPSMax);
  338. }
  339. }
  340. exports.NetworkThrottleManager = NetworkThrottleManager;
  341. NetworkThrottleManager.prototype = {
  342. /**
  343. * Create a new NetworkThrottleListener for a given channel and
  344. * install it using |setNewListener|.
  345. *
  346. * @param {nsITraceableChannel} channel the channel to manage
  347. * @return {NetworkThrottleListener} the new listener, or null if
  348. * download throttling is not being done.
  349. */
  350. manage: function (channel) {
  351. if (this.downloadQueue) {
  352. let listener = new NetworkThrottleListener(this.downloadQueue);
  353. let originalListener = channel.setNewListener(listener);
  354. listener.setOriginalListener(originalListener);
  355. return listener;
  356. }
  357. return null;
  358. },
  359. /**
  360. * Throttle uploads taking place on the given channel.
  361. *
  362. * @param {nsITraceableChannel} channel the channel to manage
  363. */
  364. manageUpload: function (channel) {
  365. if (this.uploadQueue) {
  366. channel = channel.QueryInterface(Ci.nsIThrottledInputChannel);
  367. channel.throttleQueue = this.uploadQueue;
  368. }
  369. },
  370. };