transportflow.cpp 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
  2. /* This Source Code Form is subject to the terms of the Mozilla Public
  3. * License, v. 2.0. If a copy of the MPL was not distributed with this file,
  4. * You can obtain one at http://mozilla.org/MPL/2.0/. */
  5. // Original author: ekr@rtfm.com
  6. #include <deque>
  7. #include "logging.h"
  8. #include "runnable_utils.h"
  9. #include "transportflow.h"
  10. #include "transportlayer.h"
  11. namespace mozilla {
  12. MOZ_MTLOG_MODULE("mtransport")
  13. NS_IMPL_ISUPPORTS0(TransportFlow)
  14. // There are some hacks here to allow destruction off of
  15. // the main thread.
  16. TransportFlow::~TransportFlow() {
  17. // Make sure that if we are off the right thread, we have
  18. // no more attached signals.
  19. if (!CheckThreadInt()) {
  20. MOZ_ASSERT(SignalStateChange.is_empty());
  21. MOZ_ASSERT(SignalPacketReceived.is_empty());
  22. }
  23. // Push the destruction onto the STS thread. Note that there
  24. // is still some possibility that someone is accessing this
  25. // object simultaneously, but as long as smart pointer discipline
  26. // is maintained, it shouldn't be possible to access and
  27. // destroy it simultaneously. The conversion to an nsAutoPtr
  28. // ensures automatic destruction of the queue at exit of
  29. // DestroyFinal.
  30. nsAutoPtr<std::deque<TransportLayer*>> layers_tmp(layers_.release());
  31. RUN_ON_THREAD(target_,
  32. WrapRunnableNM(&TransportFlow::DestroyFinal, layers_tmp),
  33. NS_DISPATCH_NORMAL);
  34. }
  35. void TransportFlow::DestroyFinal(nsAutoPtr<std::deque<TransportLayer *> > layers) {
  36. ClearLayers(layers.get());
  37. }
  38. void TransportFlow::ClearLayers(std::queue<TransportLayer *>* layers) {
  39. while (!layers->empty()) {
  40. delete layers->front();
  41. layers->pop();
  42. }
  43. }
  44. void TransportFlow::ClearLayers(std::deque<TransportLayer *>* layers) {
  45. while (!layers->empty()) {
  46. delete layers->front();
  47. layers->pop_front();
  48. }
  49. }
  50. nsresult TransportFlow::PushLayer(TransportLayer *layer) {
  51. CheckThread();
  52. UniquePtr<TransportLayer> layer_tmp(layer); // Destroy on failure.
  53. // Don't allow pushes once we are in error state.
  54. if (state_ == TransportLayer::TS_ERROR) {
  55. MOZ_MTLOG(ML_ERROR, id_ + ": Can't call PushLayer in error state for flow");
  56. return NS_ERROR_FAILURE;
  57. }
  58. nsresult rv = layer->Init();
  59. if (!NS_SUCCEEDED(rv)) {
  60. // Destroy the rest of the flow, because it's no longer in an acceptable
  61. // state.
  62. ClearLayers(layers_.get());
  63. // Set ourselves to have failed.
  64. MOZ_MTLOG(ML_ERROR, id_ << ": Layer initialization failed; invalidating");
  65. StateChangeInt(TransportLayer::TS_ERROR);
  66. return rv;
  67. }
  68. EnsureSameThread(layer);
  69. TransportLayer *old_layer = layers_->empty() ? nullptr : layers_->front();
  70. // Re-target my signals to the new layer
  71. if (old_layer) {
  72. old_layer->SignalStateChange.disconnect(this);
  73. old_layer->SignalPacketReceived.disconnect(this);
  74. }
  75. layers_->push_front(layer_tmp.release());
  76. layer->Inserted(this, old_layer);
  77. layer->SignalStateChange.connect(this, &TransportFlow::StateChange);
  78. layer->SignalPacketReceived.connect(this, &TransportFlow::PacketReceived);
  79. StateChangeInt(layer->state());
  80. return NS_OK;
  81. }
  82. // This is all-or-nothing.
  83. nsresult TransportFlow::PushLayers(nsAutoPtr<std::queue<TransportLayer *> > layers) {
  84. CheckThread();
  85. MOZ_ASSERT(!layers->empty());
  86. if (layers->empty()) {
  87. MOZ_MTLOG(ML_ERROR, id_ << ": Can't call PushLayers with empty layers");
  88. return NS_ERROR_INVALID_ARG;
  89. }
  90. // Don't allow pushes once we are in error state.
  91. if (state_ == TransportLayer::TS_ERROR) {
  92. MOZ_MTLOG(ML_ERROR,
  93. id_ << ": Can't call PushLayers in error state for flow ");
  94. ClearLayers(layers.get());
  95. return NS_ERROR_FAILURE;
  96. }
  97. nsresult rv = NS_OK;
  98. // Disconnect all the old signals.
  99. disconnect_all();
  100. TransportLayer *layer;
  101. while (!layers->empty()) {
  102. TransportLayer *old_layer = layers_->empty() ? nullptr : layers_->front();
  103. layer = layers->front();
  104. rv = layer->Init();
  105. if (NS_FAILED(rv)) {
  106. MOZ_MTLOG(ML_ERROR,
  107. id_ << ": Layer initialization failed; invalidating flow ");
  108. break;
  109. }
  110. EnsureSameThread(layer);
  111. // Push the layer onto the queue.
  112. layers_->push_front(layer);
  113. layers->pop();
  114. layer->Inserted(this, old_layer);
  115. }
  116. if (NS_FAILED(rv)) {
  117. // Destroy any layers we could not push.
  118. ClearLayers(layers.get());
  119. // Now destroy the rest of the flow, because it's no longer
  120. // in an acceptable state.
  121. ClearLayers(layers_.get());
  122. // Set ourselves to have failed.
  123. StateChangeInt(TransportLayer::TS_ERROR);
  124. // Return failure.
  125. return rv;
  126. }
  127. // Finally, attach ourselves to the top layer.
  128. layer->SignalStateChange.connect(this, &TransportFlow::StateChange);
  129. layer->SignalPacketReceived.connect(this, &TransportFlow::PacketReceived);
  130. StateChangeInt(layer->state()); // Signals if the state changes.
  131. return NS_OK;
  132. }
  133. TransportLayer *TransportFlow::top() const {
  134. CheckThread();
  135. return layers_->empty() ? nullptr : layers_->front();
  136. }
  137. TransportLayer *TransportFlow::GetLayer(const std::string& id) const {
  138. CheckThread();
  139. for (std::deque<TransportLayer *>::const_iterator it = layers_->begin();
  140. it != layers_->end(); ++it) {
  141. if ((*it)->id() == id)
  142. return *it;
  143. }
  144. return nullptr;
  145. }
  146. TransportLayer::State TransportFlow::state() {
  147. CheckThread();
  148. return state_;
  149. }
  150. TransportResult TransportFlow::SendPacket(const unsigned char *data,
  151. size_t len) {
  152. CheckThread();
  153. if (state_ != TransportLayer::TS_OPEN) {
  154. return TE_ERROR;
  155. }
  156. return top() ? top()->SendPacket(data, len) : TE_ERROR;
  157. }
  158. bool TransportFlow::Contains(TransportLayer *layer) const {
  159. if (layers_) {
  160. for (auto l = layers_->begin(); l != layers_->end(); ++l) {
  161. if (*l == layer) {
  162. return true;
  163. }
  164. }
  165. }
  166. return false;
  167. }
  168. void TransportFlow::EnsureSameThread(TransportLayer *layer) {
  169. // Enforce that if any of the layers have a thread binding,
  170. // they all have the same binding.
  171. if (target_) {
  172. const nsCOMPtr<nsIEventTarget>& lthread = layer->GetThread();
  173. if (lthread && (lthread != target_))
  174. MOZ_CRASH();
  175. }
  176. else {
  177. target_ = layer->GetThread();
  178. }
  179. }
  180. void TransportFlow::StateChangeInt(TransportLayer::State state) {
  181. CheckThread();
  182. if (state == state_) {
  183. return;
  184. }
  185. state_ = state;
  186. SignalStateChange(this, state_);
  187. }
  188. void TransportFlow::StateChange(TransportLayer *layer,
  189. TransportLayer::State state) {
  190. CheckThread();
  191. StateChangeInt(state);
  192. }
  193. void TransportFlow::PacketReceived(TransportLayer* layer,
  194. const unsigned char *data,
  195. size_t len) {
  196. CheckThread();
  197. SignalPacketReceived(this, data, len);
  198. }
  199. } // close namespace