DataChannel.cpp 92 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
7227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682
  1. /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
  2. /* This Source Code Form is subject to the terms of the Mozilla Public
  3. * License, v. 2.0. If a copy of the MPL was not distributed with this file,
  4. * You can obtain one at http://mozilla.org/MPL/2.0/. */
  5. #include <algorithm>
  6. #include <stdio.h>
  7. #include <stdlib.h>
  8. #if !defined(__Userspace_os_Windows)
  9. #include <arpa/inet.h>
  10. #endif
  11. // usrsctp.h expects to have errno definitions prior to its inclusion.
  12. #include <errno.h>
  13. #define SCTP_DEBUG 1
  14. #define SCTP_STDINT_INCLUDE <stdint.h>
  15. #ifdef _MSC_VER
  16. // Disable "warning C4200: nonstandard extension used : zero-sized array in
  17. // struct/union"
  18. // ...which the third-party file usrsctp.h runs afoul of.
  19. #pragma warning(push)
  20. #pragma warning(disable:4200)
  21. #endif
  22. #include "usrsctp.h"
  23. #ifdef _MSC_VER
  24. #pragma warning(pop)
  25. #endif
  26. #include "DataChannelLog.h"
  27. #include "nsServiceManagerUtils.h"
  28. #include "nsIObserverService.h"
  29. #include "nsIObserver.h"
  30. #include "mozilla/Services.h"
  31. #include "mozilla/Sprintf.h"
  32. #include "nsProxyRelease.h"
  33. #include "nsThread.h"
  34. #include "nsThreadUtils.h"
  35. #include "nsAutoPtr.h"
  36. #include "nsNetUtil.h"
  37. #include "nsNetCID.h"
  38. #include "mozilla/StaticPtr.h"
  39. #include "mozilla/Unused.h"
  40. #ifdef MOZ_PEERCONNECTION
  41. #include "mtransport/runnable_utils.h"
  42. #endif
  43. #define DATACHANNEL_LOG(args) LOG(args)
  44. #include "DataChannel.h"
  45. #include "DataChannelProtocol.h"
  46. // Let us turn on and off important assertions in non-debug builds
  47. #ifdef DEBUG
  48. #define ASSERT_WEBRTC(x) MOZ_ASSERT((x))
  49. #elif defined(MOZ_WEBRTC_ASSERT_ALWAYS)
  50. #define ASSERT_WEBRTC(x) do { if (!(x)) { MOZ_CRASH(); } } while (0)
  51. #endif
  // Process-wide flag: set to true by DataChannelConnection::Init() once
  // usrsctp_init() has been called, and reset to false by
  // DataChannelShutdown::Observe() after usrsctp_finish().
  static bool sctp_initialized;
  53. namespace mozilla {
  // Log module named "DataChannel"; not file-static, so it has external linkage.
  LazyLogModule gDataChannelLog("DataChannel");
  // Log module named "SCTP"; in this file it gates usrsctp debug output and
  // packet dumps (see the MOZ_LOG_TEST(gSCTPLog, ...) checks).
  static LazyLogModule gSCTPLog("SCTP");
  56. class DataChannelShutdown : public nsIObserver
  57. {
  58. public:
  59. // This needs to be tied to some form object that is guaranteed to be
  60. // around (singleton likely) unless we want to shutdown sctp whenever
  61. // we're not using it (and in which case we'd keep a refcnt'd object
  62. // ref'd by each DataChannelConnection to release the SCTP usrlib via
  63. // sctp_finish). Right now, the single instance of this class is
  64. // owned by the observer service.
  65. NS_DECL_ISUPPORTS
  66. DataChannelShutdown() {}
  67. void Init()
  68. {
  69. nsCOMPtr<nsIObserverService> observerService =
  70. mozilla::services::GetObserverService();
  71. if (!observerService)
  72. return;
  73. nsresult rv = observerService->AddObserver(this,
  74. "xpcom-will-shutdown",
  75. false);
  76. MOZ_ASSERT(rv == NS_OK);
  77. (void) rv;
  78. }
  79. private:
  80. // The only instance of DataChannelShutdown is owned by the observer
  81. // service, so there is no need to call RemoveObserver here.
  82. virtual ~DataChannelShutdown() = default;
  83. public:
  84. NS_IMETHOD Observe(nsISupports* aSubject, const char* aTopic,
  85. const char16_t* aData) override {
  86. if (strcmp(aTopic, "xpcom-will-shutdown") == 0) {
  87. LOG(("Shutting down SCTP"));
  88. if (sctp_initialized) {
  89. usrsctp_finish();
  90. sctp_initialized = false;
  91. }
  92. nsCOMPtr<nsIObserverService> observerService =
  93. mozilla::services::GetObserverService();
  94. if (!observerService)
  95. return NS_ERROR_FAILURE;
  96. nsresult rv = observerService->RemoveObserver(this,
  97. "xpcom-will-shutdown");
  98. MOZ_ASSERT(rv == NS_OK);
  99. (void) rv;
  100. }
  101. return NS_OK;
  102. }
  103. };
  104. NS_IMPL_ISUPPORTS(DataChannelShutdown, nsIObserver);
  105. BufferedMsg::BufferedMsg(struct sctp_sendv_spa &spa, const char *data,
  106. size_t length) : mLength(length)
  107. {
  108. mSpa = new sctp_sendv_spa;
  109. *mSpa = spa;
  110. auto *tmp = new char[length]; // infallible malloc!
  111. memcpy(tmp, data, length);
  112. mData = tmp;
  113. }
  114. BufferedMsg::~BufferedMsg()
  115. {
  116. delete mSpa;
  117. delete mData;
  118. }
  119. static int
  120. receive_cb(struct socket* sock, union sctp_sockstore addr,
  121. void *data, size_t datalen,
  122. struct sctp_rcvinfo rcv, int flags, void *ulp_info)
  123. {
  124. DataChannelConnection *connection = static_cast<DataChannelConnection*>(ulp_info);
  125. return connection->ReceiveCallback(sock, data, datalen, rcv, flags);
  126. }
  127. static
  128. DataChannelConnection *
  129. GetConnectionFromSocket(struct socket* sock)
  130. {
  131. struct sockaddr *addrs = nullptr;
  132. int naddrs = usrsctp_getladdrs(sock, 0, &addrs);
  133. if (naddrs <= 0 || addrs[0].sa_family != AF_CONN) {
  134. return nullptr;
  135. }
  136. // usrsctp_getladdrs() returns the addresses bound to this socket, which
  137. // contains the SctpDataMediaChannel* as sconn_addr. Read the pointer,
  138. // then free the list of addresses once we have the pointer. We only open
  139. // AF_CONN sockets, and they should all have the sconn_addr set to the
  140. // pointer that created them, so [0] is as good as any other.
  141. struct sockaddr_conn *sconn = reinterpret_cast<struct sockaddr_conn *>(&addrs[0]);
  142. DataChannelConnection *connection =
  143. reinterpret_cast<DataChannelConnection *>(sconn->sconn_addr);
  144. usrsctp_freeladdrs(addrs);
  145. return connection;
  146. }
  147. // called when the buffer empties to the threshold value
  148. static int
  149. threshold_event(struct socket* sock, uint32_t sb_free)
  150. {
  151. DataChannelConnection *connection = GetConnectionFromSocket(sock);
  152. if (connection) {
  153. LOG(("SendDeferred()"));
  154. connection->SendDeferredMessages();
  155. } else {
  156. LOG(("Can't find connection for socket %p", sock));
  157. }
  158. return 0;
  159. }
  160. static void
  161. debug_printf(const char *format, ...)
  162. {
  163. va_list ap;
  164. char buffer[1024];
  165. if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
  166. va_start(ap, format);
  167. #ifdef _WIN32
  168. if (vsnprintf_s(buffer, sizeof(buffer), _TRUNCATE, format, ap) > 0) {
  169. #else
  170. if (VsprintfLiteral(buffer, format, ap) > 0) {
  171. #endif
  172. PR_LogPrint("%s", buffer);
  173. }
  174. va_end(ap);
  175. }
  176. }
  177. DataChannelConnection::DataChannelConnection(DataConnectionListener *listener) :
  178. mLock("netwerk::sctp::DataChannelConnection")
  179. {
  180. mState = CLOSED;
  181. mSocket = nullptr;
  182. mMasterSocket = nullptr;
  183. mListener = listener;
  184. mLocalPort = 0;
  185. mRemotePort = 0;
  186. LOG(("Constructor DataChannelConnection=%p, listener=%p", this, mListener.get()));
  187. mInternalIOThread = nullptr;
  188. }
  189. DataChannelConnection::~DataChannelConnection()
  190. {
  191. LOG(("Deleting DataChannelConnection %p", (void *) this));
  192. // This may die on the MainThread, or on the STS thread
  193. ASSERT_WEBRTC(mState == CLOSED);
  194. MOZ_ASSERT(!mMasterSocket);
  195. MOZ_ASSERT(mPending.GetSize() == 0);
  196. // Already disconnected from sigslot/mTransportFlow
  197. // TransportFlows must be released from the STS thread
  198. if (!IsSTSThread()) {
  199. ASSERT_WEBRTC(NS_IsMainThread());
  200. if (mTransportFlow) {
  201. ASSERT_WEBRTC(mSTS);
  202. NS_ProxyRelease(mSTS, mTransportFlow.forget());
  203. }
  204. if (mInternalIOThread) {
  205. // Avoid spinning the event thread from here (which if we're mainthread
  206. // is in the event loop already)
  207. NS_DispatchToMainThread(WrapRunnable(nsCOMPtr<nsIThread>(mInternalIOThread),
  208. &nsIThread::Shutdown),
  209. NS_DISPATCH_NORMAL);
  210. }
  211. } else {
  212. // on STS, safe to call shutdown
  213. if (mInternalIOThread) {
  214. mInternalIOThread->Shutdown();
  215. }
  216. }
  217. }
  // Main-thread teardown entry point: closes all channels, drops pending
  // resets, deregisters from the SCTP stack (DTLS case), clears the listener
  // and hands the sockets to DestroyOnSTS() on the STS thread for closing.
  void
  DataChannelConnection::Destroy()
  {
    // Though it's probably ok to do this and close the sockets;
    // if we really want it to do true clean shutdowns it can
    // create a dependant Internal object that would remain around
    // until the network shut down the association or timed out.
    LOG(("Destroying DataChannelConnection %p", (void *) this));
    ASSERT_WEBRTC(NS_IsMainThread());
    // CloseAll() is called before mLock is taken below.
    CloseAll();
    MutexAutoLock lock(mLock);
    // If we had a pending reset, we aren't waiting for it - clear the list so
    // we can deregister this DataChannelConnection without leaking.
    ClearResets();
    MOZ_ASSERT(mSTS);
    ASSERT_WEBRTC(NS_IsMainThread());
    // Must do this in Destroy() since we may then delete this object.
    // Do this before dispatching to create a consistent ordering of calls to
    // the SCTP stack.
    if (mUsingDtls) {
      usrsctp_deregister_address(static_cast<void *>(this));
      LOG(("Deregistered %p from the SCTP stack.", static_cast<void *>(this)));
    }
    mListener = nullptr;
    // Finish Destroy on STS thread to avoid bug 876167 - once that's fixed,
    // the usrsctp_close() calls can move back here (and just proxy the
    // disconnect_all())
    // NOTE(review): the arguments are passed as (mSocket, mMasterSocket) while
    // DestroyOnSTS() declares (aMasterSocket, aSocket). DestroyOnSTS() closes
    // each distinct non-null socket exactly once, so the swap looks harmless,
    // but confirm it is intentional.
    // The runnable holds a RefPtr to this object.
    RUN_ON_THREAD(mSTS, WrapRunnable(RefPtr<DataChannelConnection>(this),
                                     &DataChannelConnection::DestroyOnSTS,
                                     mSocket, mMasterSocket),
                  NS_DISPATCH_NORMAL);
    // These will be released on STS
    mSocket = nullptr;
    mMasterSocket = nullptr; // also a flag that we've Destroyed this connection
    // We can't get any more new callbacks from the SCTP library
    // All existing callbacks have refs to DataChannelConnection
    // nsDOMDataChannel objects have refs to DataChannels that have refs to us
  }
  256. void DataChannelConnection::DestroyOnSTS(struct socket *aMasterSocket,
  257. struct socket *aSocket)
  258. {
  259. if (aSocket && aSocket != aMasterSocket)
  260. usrsctp_close(aSocket);
  261. if (aMasterSocket)
  262. usrsctp_close(aMasterSocket);
  263. disconnect_all();
  264. }
  // Initializes the usrsctp library (first call only) and creates and
  // configures the master SCTP socket for this connection.
  //
  //   aPort       - UDP encapsulation port; used for usrsctp_init() and
  //                 SCTP_REMOTE_UDP_ENCAPS_PORT only when aUsingDtls is false.
  //   aNumStreams - number of outbound streams requested via SCTP_INITMSG;
  //                 mStreams grows by this many null entries.
  //   aUsingDtls  - true to run SCTP over DTLS (AF_CONN socket, this object
  //                 registered as the usrsctp address); false for AF_INET.
  //
  // Returns true on success. Returns false if the socket cannot be created;
  // on any later failure the socket is closed via the error_cleanup label,
  // mMasterSocket is reset to nullptr and false is returned.
  // Asserts that it is called on the main thread.
  bool
  DataChannelConnection::Init(unsigned short aPort, uint16_t aNumStreams, bool aUsingDtls)
  {
    struct sctp_initmsg initmsg;
    struct sctp_udpencaps encaps;
    struct sctp_assoc_value av;
    struct sctp_event event;
    socklen_t len;
    // Notifications enabled on the socket through SCTP_EVENT further below.
    uint16_t event_types[] = {SCTP_ASSOC_CHANGE,
                              SCTP_PEER_ADDR_CHANGE,
                              SCTP_REMOTE_ERROR,
                              SCTP_SHUTDOWN_EVENT,
                              SCTP_ADAPTATION_INDICATION,
                              SCTP_SEND_FAILED_EVENT,
                              SCTP_STREAM_RESET_EVENT,
                              SCTP_STREAM_CHANGE_EVENT};
    {
      ASSERT_WEBRTC(NS_IsMainThread());
      // MutexAutoLock lock(mLock); Not needed since we're on mainthread always
      // One-time, process-wide library setup guarded by sctp_initialized.
      if (!sctp_initialized) {
        if (aUsingDtls) {
          LOG(("sctp_init(DTLS)"));
#ifdef MOZ_PEERCONNECTION
          // Port 0 with SctpDtlsOutput as the packet output routine.
          usrsctp_init(0,
                       DataChannelConnection::SctpDtlsOutput,
                       debug_printf
                      );
#else
          NS_ASSERTION(!aUsingDtls, "Trying to use SCTP/DTLS without mtransport");
#endif
        } else {
          LOG(("sctp_init(%u)", aPort));
          usrsctp_init(aPort,
                       nullptr,
                       debug_printf
                      );
        }
        // Set logging to SCTP:LogLevel::Debug to get SCTP debugs
        if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
          usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_ALL);
        }
        usrsctp_sysctl_set_sctp_blackhole(2);
        // ECN is currently not supported by the Firefox code
        usrsctp_sysctl_set_sctp_ecn_enable(0);
        // Disabling authentication and dynamic address reconfiguration as neither
        // of them are used for data channel and only result in additional code
        // paths being used.
        usrsctp_sysctl_set_sctp_asconf_enable(0);
        usrsctp_sysctl_set_sctp_auth_enable(0);
        sctp_initialized = true;
        // Register the shutdown observer that later calls usrsctp_finish().
        RefPtr<DataChannelShutdown> shutdown = new DataChannelShutdown();
        shutdown->Init();
      }
    }
    // XXX FIX! make this a global we get once
    // Find the STS thread
    nsresult rv;
    mSTS = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
    MOZ_ASSERT(NS_SUCCEEDED(rv));
    // Open sctp with a callback
    // receive_cb gets `this` as ulp_info; threshold_event is armed at half of
    // the configured send space.
    if ((mMasterSocket = usrsctp_socket(
        aUsingDtls ? AF_CONN : AF_INET,
        SOCK_STREAM, IPPROTO_SCTP, receive_cb, threshold_event,
        usrsctp_sysctl_get_sctp_sendspace() / 2, this)) == nullptr) {
      // Nothing to clean up yet: no socket was created.
      return false;
    }
    // Make non-blocking for bind/connect. SCTP over UDP defaults to non-blocking
    // in associations for normal IO
    if (usrsctp_set_non_blocking(mMasterSocket, 1) < 0) {
      LOG(("Couldn't set non_blocking on SCTP socket"));
      // We can't handle connect() safely if it will block, not that this will
      // even happen.
      goto error_cleanup;
    }
    // Make sure when we close the socket, make sure it doesn't call us back again!
    // This would cause it try to use an invalid DataChannelConnection pointer
    struct linger l;
    l.l_onoff = 1;
    l.l_linger = 0;
    if (usrsctp_setsockopt(mMasterSocket, SOL_SOCKET, SO_LINGER,
                           (const void *)&l, (socklen_t)sizeof(struct linger)) < 0) {
      LOG(("Couldn't set SO_LINGER on SCTP socket"));
      // unsafe to allow it to continue if this fails
      goto error_cleanup;
    }
    // XXX Consider disabling this when we add proper SDP negotiation.
    // We may want to leave enabled for supporting 'cloning' of SDP offers, which
    // implies re-use of the same pseudo-port number, or forcing a renegotiation.
    {
      // Both options are best-effort: a failure is logged but not fatal.
      uint32_t on = 1;
      if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REUSE_PORT,
                             (const void *)&on, (socklen_t)sizeof(on)) < 0) {
        LOG(("Couldn't set SCTP_REUSE_PORT on SCTP socket"));
      }
      if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_NODELAY,
                             (const void *)&on, (socklen_t)sizeof(on)) < 0) {
        LOG(("Couldn't set SCTP_NODELAY on SCTP socket"));
      }
    }
    // Without DTLS, set the remote UDP encapsulation port to aPort.
    if (!aUsingDtls) {
      memset(&encaps, 0, sizeof(encaps));
      encaps.sue_address.ss_family = AF_INET;
      encaps.sue_port = htons(aPort);
      if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REMOTE_UDP_ENCAPS_PORT,
                             (const void*)&encaps,
                             (socklen_t)sizeof(struct sctp_udpencaps)) < 0) {
        LOG(("*** failed encaps errno %d", errno));
        goto error_cleanup;
      }
      LOG(("SCTP encapsulation local port %d", aPort));
    }
    // Allow stream reset and association change requests on all associations.
    av.assoc_id = SCTP_ALL_ASSOC;
    av.assoc_value = SCTP_ENABLE_RESET_STREAM_REQ | SCTP_ENABLE_CHANGE_ASSOC_REQ;
    if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET, &av,
                           (socklen_t)sizeof(struct sctp_assoc_value)) < 0) {
      LOG(("*** failed enable stream reset errno %d", errno));
      goto error_cleanup;
    }
    /* Enable the events of interest. */
    memset(&event, 0, sizeof(event));
    event.se_assoc_id = SCTP_ALL_ASSOC;
    event.se_on = 1;
    for (unsigned short event_type : event_types) {
      event.se_type = event_type;
      if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_EVENT, &event, sizeof(event)) < 0) {
        LOG(("*** failed setsockopt SCTP_EVENT errno %d", errno));
        goto error_cleanup;
      }
    }
    // Update number of streams
    mStreams.AppendElements(aNumStreams);
    for (uint32_t i = 0; i < aNumStreams; ++i) {
      mStreams[i] = nullptr;
    }
    // Read the current INIT parameters, then override the stream counts.
    memset(&initmsg, 0, sizeof(initmsg));
    len = sizeof(initmsg);
    if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, &len) < 0) {
      LOG(("*** failed getsockopt SCTP_INITMSG"));
      goto error_cleanup;
    }
    LOG(("Setting number of SCTP streams to %u, was %u/%u", aNumStreams,
         initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams));
    initmsg.sinit_num_ostreams = aNumStreams;
    initmsg.sinit_max_instreams = MAX_NUM_STREAMS;
    if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
                           (socklen_t)sizeof(initmsg)) < 0) {
      LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno));
      goto error_cleanup;
    }
    mSocket = nullptr;
    // With DTLS, register `this` as the address usrsctp will use for us;
    // Destroy() deregisters it again.
    if (aUsingDtls) {
      mUsingDtls = true;
      usrsctp_register_address(static_cast<void *>(this));
      LOG(("Registered %p within the SCTP stack.", static_cast<void *>(this)));
    } else {
      mUsingDtls = false;
    }
    return true;
  // Shared failure path for every error after the socket was created.
  error_cleanup:
    usrsctp_close(mMasterSocket);
    mMasterSocket = nullptr;
    mUsingDtls = false;
    return false;
  }
  429. #ifdef MOZ_PEERCONNECTION
  430. void
  431. DataChannelConnection::SetEvenOdd()
  432. {
  433. ASSERT_WEBRTC(IsSTSThread());
  434. TransportLayerDtls *dtls = static_cast<TransportLayerDtls *>(
  435. mTransportFlow->GetLayer(TransportLayerDtls::ID()));
  436. MOZ_ASSERT(dtls); // DTLS is mandatory
  437. mAllocateEven = (dtls->role() == TransportLayerDtls::CLIENT);
  438. }
  439. bool
  440. DataChannelConnection::ConnectViaTransportFlow(TransportFlow *aFlow, uint16_t localport, uint16_t remoteport)
  441. {
  442. LOG(("Connect DTLS local %u, remote %u", localport, remoteport));
  443. NS_PRECONDITION(mMasterSocket, "SCTP wasn't initialized before ConnectViaTransportFlow!");
  444. NS_ENSURE_TRUE(aFlow, false);
  445. mTransportFlow = aFlow;
  446. mLocalPort = localport;
  447. mRemotePort = remoteport;
  448. mState = CONNECTING;
  449. RUN_ON_THREAD(mSTS, WrapRunnable(RefPtr<DataChannelConnection>(this),
  450. &DataChannelConnection::SetSignals),
  451. NS_DISPATCH_NORMAL);
  452. return true;
  453. }
  454. void
  455. DataChannelConnection::SetSignals()
  456. {
  457. ASSERT_WEBRTC(IsSTSThread());
  458. ASSERT_WEBRTC(mTransportFlow);
  459. LOG(("Setting transport signals, state: %d", mTransportFlow->state()));
  460. mTransportFlow->SignalPacketReceived.connect(this, &DataChannelConnection::SctpDtlsInput);
  461. // SignalStateChange() doesn't call you with the initial state
  462. mTransportFlow->SignalStateChange.connect(this, &DataChannelConnection::CompleteConnect);
  463. CompleteConnect(mTransportFlow, mTransportFlow->state());
  464. }
  465. void
  466. DataChannelConnection::CompleteConnect(TransportFlow *flow, TransportLayer::State state)
  467. {
  468. LOG(("Data transport state: %d", state));
  469. MutexAutoLock lock(mLock);
  470. ASSERT_WEBRTC(IsSTSThread());
  471. // We should abort connection on TS_ERROR.
  472. // Note however that the association will also fail (perhaps with a delay) and
  473. // notify us in that way
  474. if (state != TransportLayer::TS_OPEN || !mMasterSocket)
  475. return;
  476. struct sockaddr_conn addr;
  477. memset(&addr, 0, sizeof(addr));
  478. addr.sconn_family = AF_CONN;
  479. #if defined(__Userspace_os_Darwin)
  480. addr.sconn_len = sizeof(addr);
  481. #endif
  482. addr.sconn_port = htons(mLocalPort);
  483. addr.sconn_addr = static_cast<void *>(this);
  484. LOG(("Calling usrsctp_bind"));
  485. int r = usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr),
  486. sizeof(addr));
  487. if (r < 0) {
  488. LOG(("usrsctp_bind failed: %d", r));
  489. } else {
  490. // This is the remote addr
  491. addr.sconn_port = htons(mRemotePort);
  492. LOG(("Calling usrsctp_connect"));
  493. r = usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr),
  494. sizeof(addr));
  495. if (r >= 0 || errno == EINPROGRESS) {
  496. struct sctp_paddrparams paddrparams;
  497. socklen_t opt_len;
  498. memset(&paddrparams, 0, sizeof(struct sctp_paddrparams));
  499. memcpy(&paddrparams.spp_address, &addr, sizeof(struct sockaddr_conn));
  500. opt_len = (socklen_t)sizeof(struct sctp_paddrparams);
  501. r = usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS,
  502. &paddrparams, &opt_len);
  503. if (r < 0) {
  504. LOG(("usrsctp_getsockopt failed: %d", r));
  505. } else {
  506. // draft-ietf-rtcweb-data-channel-13 section 5: max initial MTU IPV4 1200, IPV6 1280
  507. paddrparams.spp_pathmtu = 1200; // safe for either
  508. paddrparams.spp_flags &= ~SPP_PMTUD_ENABLE;
  509. paddrparams.spp_flags |= SPP_PMTUD_DISABLE;
  510. opt_len = (socklen_t)sizeof(struct sctp_paddrparams);
  511. r = usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS,
  512. &paddrparams, opt_len);
  513. if (r < 0) {
  514. LOG(("usrsctp_getsockopt failed: %d", r));
  515. } else {
  516. LOG(("usrsctp: PMTUD disabled, MTU set to %u", paddrparams.spp_pathmtu));
  517. }
  518. }
  519. }
  520. if (r < 0) {
  521. if (errno == EINPROGRESS) {
  522. // non-blocking
  523. return;
  524. } else {
  525. LOG(("usrsctp_connect failed: %d", errno));
  526. mState = CLOSED;
  527. }
  528. } else {
  529. // We set Even/Odd and fire ON_CONNECTION via SCTP_COMM_UP when we get that
  530. // This also avoids issues with calling TransportFlow stuff on Mainthread
  531. return;
  532. }
  533. }
  534. // Note: currently this doesn't actually notify the application
  535. NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
  536. DataChannelOnMessageAvailable::ON_CONNECTION,
  537. this)));
  538. return;
  539. }
  540. // Process any pending Opens
  541. void
  542. DataChannelConnection::ProcessQueuedOpens()
  543. {
  544. // The nsDeque holds channels with an AddRef applied. Another reference
  545. // (may) be held by the DOMDataChannel, unless it's been GC'd. No other
  546. // references should exist.
  547. // Can't copy nsDeque's. Move into temp array since any that fail will
  548. // go back to mPending
  549. nsDeque temp;
  550. DataChannel *temp_channel; // really already_AddRefed<>
  551. while (nullptr != (temp_channel = static_cast<DataChannel *>(mPending.PopFront()))) {
  552. temp.Push(static_cast<void *>(temp_channel));
  553. }
  554. RefPtr<DataChannel> channel;
  555. // All these entries have an AddRef(); make that explicit now via the dont_AddRef()
  556. while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(temp.PopFront())))) {
  557. if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
  558. LOG(("Processing queued open for %p (%u)", channel.get(), channel->mStream));
  559. channel->mFlags &= ~DATA_CHANNEL_FLAGS_FINISH_OPEN;
  560. // OpenFinish returns a reference itself, so we need to take it can Release it
  561. channel = OpenFinish(channel.forget()); // may reset the flag and re-push
  562. } else {
  563. NS_ASSERTION(false, "How did a DataChannel get queued without the FINISH_OPEN flag?");
  564. }
  565. }
  566. }
  567. void
  568. DataChannelConnection::SctpDtlsInput(TransportFlow *flow,
  569. const unsigned char *data, size_t len)
  570. {
  571. if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
  572. char *buf;
  573. if ((buf = usrsctp_dumppacket((void *)data, len, SCTP_DUMP_INBOUND)) != nullptr) {
  574. PR_LogPrint("%s", buf);
  575. usrsctp_freedumpbuffer(buf);
  576. }
  577. }
  578. // Pass the data to SCTP
  579. MutexAutoLock lock(mLock);
  580. usrsctp_conninput(static_cast<void *>(this), data, len, 0);
  581. }
  582. int
  583. DataChannelConnection::SendPacket(unsigned char data[], size_t len, bool release)
  584. {
  585. //LOG(("%p: SCTP/DTLS sent %ld bytes", this, len));
  586. int res = 0;
  587. if (mTransportFlow) {
  588. res = mTransportFlow->SendPacket(data, len) < 0 ? 1 : 0;
  589. }
  590. if (release)
  591. delete [] data;
  592. return res;
  593. }
  594. /* static */
  595. int
  596. DataChannelConnection::SctpDtlsOutput(void *addr, void *buffer, size_t length,
  597. uint8_t tos, uint8_t set_df)
  598. {
  599. DataChannelConnection *peer = static_cast<DataChannelConnection *>(addr);
  600. int res;
  601. if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
  602. char *buf;
  603. if ((buf = usrsctp_dumppacket(buffer, length, SCTP_DUMP_OUTBOUND)) != nullptr) {
  604. PR_LogPrint("%s", buf);
  605. usrsctp_freedumpbuffer(buf);
  606. }
  607. }
  608. // We're async proxying even if on the STSThread because this is called
  609. // with internal SCTP locks held in some cases (such as in usrsctp_connect()).
  610. // SCTP has an option for Apple, on IP connections only, to release at least
  611. // one of the locks before calling a packet output routine; with changes to
  612. // the underlying SCTP stack this might remove the need to use an async proxy.
  613. if ((false /*peer->IsSTSThread()*/)) {
  614. res = peer->SendPacket(static_cast<unsigned char *>(buffer), length, false);
  615. } else {
  616. auto *data = new unsigned char[length];
  617. memcpy(data, buffer, length);
  618. // Commented out since we have to Dispatch SendPacket to avoid deadlock"
  619. // res = -1;
  620. // XXX It might be worthwhile to add an assertion against the thread
  621. // somehow getting into the DataChannel/SCTP code again, as
  622. // DISPATCH_SYNC is not fully blocking. This may be tricky, as it
  623. // needs to be a per-thread check, not a global.
  624. peer->mSTS->Dispatch(WrapRunnable(
  625. RefPtr<DataChannelConnection>(peer),
  626. &DataChannelConnection::SendPacket, data, length, true),
  627. NS_DISPATCH_NORMAL);
  628. res = 0; // cheat! Packets can always be dropped later anyways
  629. }
  630. return res;
  631. }
  632. #endif
  633. #ifdef ALLOW_DIRECT_SCTP_LISTEN_CONNECT
  634. // listen for incoming associations
  635. // Blocks! - Don't call this from main thread!
  636. #error This code will not work as-is since SetEvenOdd() runs on Mainthread
  637. bool
  638. DataChannelConnection::Listen(unsigned short port)
  639. {
  640. struct sockaddr_in addr;
  641. socklen_t addr_len;
  642. NS_WARNING_ASSERTION(!NS_IsMainThread(),
  643. "Blocks, do not call from main thread!!!");
  644. /* Acting as the 'server' */
  645. memset((void *)&addr, 0, sizeof(addr));
  646. #ifdef HAVE_SIN_LEN
  647. addr.sin_len = sizeof(struct sockaddr_in);
  648. #endif
  649. addr.sin_family = AF_INET;
  650. addr.sin_port = htons(port);
  651. addr.sin_addr.s_addr = htonl(INADDR_ANY);
  652. LOG(("Waiting for connections on port %u", ntohs(addr.sin_port)));
  653. mState = CONNECTING;
  654. if (usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr), sizeof(struct sockaddr_in)) < 0) {
  655. LOG(("***Failed userspace_bind"));
  656. return false;
  657. }
  658. if (usrsctp_listen(mMasterSocket, 1) < 0) {
  659. LOG(("***Failed userspace_listen"));
  660. return false;
  661. }
  662. LOG(("Accepting connection"));
  663. addr_len = 0;
  664. if ((mSocket = usrsctp_accept(mMasterSocket, nullptr, &addr_len)) == nullptr) {
  665. LOG(("***Failed accept"));
  666. return false;
  667. }
  668. mState = OPEN;
  669. struct linger l;
  670. l.l_onoff = 1;
  671. l.l_linger = 0;
  672. if (usrsctp_setsockopt(mSocket, SOL_SOCKET, SO_LINGER,
  673. (const void *)&l, (socklen_t)sizeof(struct linger)) < 0) {
  674. LOG(("Couldn't set SO_LINGER on SCTP socket"));
  675. }
  676. SetEvenOdd();
  677. // Notify Connection open
  678. // XXX We need to make sure connection sticks around until the message is delivered
  679. LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this));
  680. NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
  681. DataChannelOnMessageAvailable::ON_CONNECTION,
  682. this, (DataChannel *) nullptr)));
  683. return true;
  684. }
  685. // Blocks! - Don't call this from main thread!
  686. bool
  687. DataChannelConnection::Connect(const char *addr, unsigned short port)
  688. {
  689. struct sockaddr_in addr4;
  690. struct sockaddr_in6 addr6;
  691. NS_WARNING_ASSERTION(!NS_IsMainThread(),
  692. "Blocks, do not call from main thread!!!");
  693. /* Acting as the connector */
  694. LOG(("Connecting to %s, port %u", addr, port));
  695. memset((void *)&addr4, 0, sizeof(struct sockaddr_in));
  696. memset((void *)&addr6, 0, sizeof(struct sockaddr_in6));
  697. #ifdef HAVE_SIN_LEN
  698. addr4.sin_len = sizeof(struct sockaddr_in);
  699. #endif
  700. #ifdef HAVE_SIN6_LEN
  701. addr6.sin6_len = sizeof(struct sockaddr_in6);
  702. #endif
  703. addr4.sin_family = AF_INET;
  704. addr6.sin6_family = AF_INET6;
  705. addr4.sin_port = htons(port);
  706. addr6.sin6_port = htons(port);
  707. mState = CONNECTING;
  708. #if !defined(__Userspace_os_Windows)
  709. if (inet_pton(AF_INET6, addr, &addr6.sin6_addr) == 1) {
  710. if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr6), sizeof(struct sockaddr_in6)) < 0) {
  711. LOG(("*** Failed userspace_connect"));
  712. return false;
  713. }
  714. } else if (inet_pton(AF_INET, addr, &addr4.sin_addr) == 1) {
  715. if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr4), sizeof(struct sockaddr_in)) < 0) {
  716. LOG(("*** Failed userspace_connect"));
  717. return false;
  718. }
  719. } else {
  720. LOG(("*** Illegal destination address."));
  721. }
  722. #else
  723. {
  724. struct sockaddr_storage ss;
  725. int sslen = sizeof(ss);
  726. if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET6, nullptr, (struct sockaddr*)&ss, &sslen)) {
  727. addr6.sin6_addr = (reinterpret_cast<struct sockaddr_in6 *>(&ss))->sin6_addr;
  728. if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr6), sizeof(struct sockaddr_in6)) < 0) {
  729. LOG(("*** Failed userspace_connect"));
  730. return false;
  731. }
  732. } else if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET, nullptr, (struct sockaddr*)&ss, &sslen)) {
  733. addr4.sin_addr = (reinterpret_cast<struct sockaddr_in *>(&ss))->sin_addr;
  734. if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr4), sizeof(struct sockaddr_in)) < 0) {
  735. LOG(("*** Failed userspace_connect"));
  736. return false;
  737. }
  738. } else {
  739. LOG(("*** Illegal destination address."));
  740. }
  741. }
  742. #endif
  743. mSocket = mMasterSocket;
  744. LOG(("connect() succeeded! Entering connected mode"));
  745. mState = OPEN;
  746. SetEvenOdd();
  747. // Notify Connection open
  748. // XXX We need to make sure connection sticks around until the message is delivered
  749. LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this));
  750. NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
  751. DataChannelOnMessageAvailable::ON_CONNECTION,
  752. this, (DataChannel *) nullptr)));
  753. return true;
  754. }
  755. #endif
  756. DataChannel *
  757. DataChannelConnection::FindChannelByStream(uint16_t stream)
  758. {
  759. return mStreams.SafeElementAt(stream);
  760. }
  761. uint16_t
  762. DataChannelConnection::FindFreeStream()
  763. {
  764. uint32_t i, j, limit;
  765. limit = mStreams.Length();
  766. if (limit > MAX_NUM_STREAMS)
  767. limit = MAX_NUM_STREAMS;
  768. for (i = (mAllocateEven ? 0 : 1); i < limit; i += 2) {
  769. if (!mStreams[i]) {
  770. // Verify it's not still in the process of closing
  771. for (j = 0; j < mStreamsResetting.Length(); ++j) {
  772. if (mStreamsResetting[j] == i) {
  773. break;
  774. }
  775. }
  776. if (j == mStreamsResetting.Length())
  777. break;
  778. }
  779. }
  780. if (i >= limit) {
  781. return INVALID_STREAM;
  782. }
  783. return i;
  784. }
  785. bool
  786. DataChannelConnection::RequestMoreStreams(int32_t aNeeded)
  787. {
  788. struct sctp_status status;
  789. struct sctp_add_streams sas;
  790. uint32_t outStreamsNeeded;
  791. socklen_t len;
  792. if (aNeeded + mStreams.Length() > MAX_NUM_STREAMS) {
  793. aNeeded = MAX_NUM_STREAMS - mStreams.Length();
  794. }
  795. if (aNeeded <= 0) {
  796. return false;
  797. }
  798. len = (socklen_t)sizeof(struct sctp_status);
  799. if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_STATUS, &status, &len) < 0) {
  800. LOG(("***failed: getsockopt SCTP_STATUS"));
  801. return false;
  802. }
  803. outStreamsNeeded = aNeeded; // number to add
  804. // Note: if multiple channel opens happen when we don't have enough space,
  805. // we'll call RequestMoreStreams() multiple times
  806. memset(&sas, 0, sizeof(sas));
  807. sas.sas_instrms = 0;
  808. sas.sas_outstrms = (uint16_t)outStreamsNeeded; /* XXX error handling */
  809. // Doesn't block, we get an event when it succeeds or fails
  810. if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ADD_STREAMS, &sas,
  811. (socklen_t) sizeof(struct sctp_add_streams)) < 0) {
  812. if (errno == EALREADY) {
  813. LOG(("Already have %u output streams", outStreamsNeeded));
  814. return true;
  815. }
  816. LOG(("***failed: setsockopt ADD errno=%d", errno));
  817. return false;
  818. }
  819. LOG(("Requested %u more streams", outStreamsNeeded));
  820. // We add to mStreams when we get a SCTP_STREAM_CHANGE_EVENT and the
  821. // values are larger than mStreams.Length()
  822. return true;
  823. }
  824. int32_t
  825. DataChannelConnection::SendControlMessage(void *msg, uint32_t len, uint16_t stream)
  826. {
  827. struct sctp_sndinfo sndinfo;
  828. // Note: Main-thread IO, but doesn't block
  829. memset(&sndinfo, 0, sizeof(struct sctp_sndinfo));
  830. sndinfo.snd_sid = stream;
  831. sndinfo.snd_ppid = htonl(DATA_CHANNEL_PPID_CONTROL);
  832. if (usrsctp_sendv(mSocket, msg, len, nullptr, 0,
  833. &sndinfo, (socklen_t)sizeof(struct sctp_sndinfo),
  834. SCTP_SENDV_SNDINFO, 0) < 0) {
  835. //LOG(("***failed: sctp_sendv")); don't log because errno is a return!
  836. return (0);
  837. }
  838. return (1);
  839. }
  840. int32_t
  841. DataChannelConnection::SendOpenAckMessage(uint16_t stream)
  842. {
  843. struct rtcweb_datachannel_ack ack;
  844. memset(&ack, 0, sizeof(struct rtcweb_datachannel_ack));
  845. ack.msg_type = DATA_CHANNEL_ACK;
  846. return SendControlMessage(&ack, sizeof(ack), stream);
  847. }
  848. int32_t
  849. DataChannelConnection::SendOpenRequestMessage(const nsACString& label,
  850. const nsACString& protocol,
  851. uint16_t stream, bool unordered,
  852. uint16_t prPolicy, uint32_t prValue)
  853. {
  854. const int label_len = label.Length(); // not including nul
  855. const int proto_len = protocol.Length(); // not including nul
  856. // careful - request struct include one char for the label
  857. const int req_size = sizeof(struct rtcweb_datachannel_open_request) - 1 +
  858. label_len + proto_len;
  859. struct rtcweb_datachannel_open_request *req =
  860. (struct rtcweb_datachannel_open_request*) moz_xmalloc(req_size);
  861. memset(req, 0, req_size);
  862. req->msg_type = DATA_CHANNEL_OPEN_REQUEST;
  863. switch (prPolicy) {
  864. case SCTP_PR_SCTP_NONE:
  865. req->channel_type = DATA_CHANNEL_RELIABLE;
  866. break;
  867. case SCTP_PR_SCTP_TTL:
  868. req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_TIMED;
  869. break;
  870. case SCTP_PR_SCTP_RTX:
  871. req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT;
  872. break;
  873. default:
  874. // FIX! need to set errno! Or make all these SendXxxx() funcs return 0 or errno!
  875. free(req);
  876. return (0);
  877. }
  878. if (unordered) {
  879. // Per the current types, all differ by 0x80 between ordered and unordered
  880. req->channel_type |= 0x80; // NOTE: be careful if new types are added in the future
  881. }
  882. req->reliability_param = htonl(prValue);
  883. req->priority = htons(0); /* XXX: add support */
  884. req->label_length = htons(label_len);
  885. req->protocol_length = htons(proto_len);
  886. memcpy(&req->label[0], PromiseFlatCString(label).get(), label_len);
  887. memcpy(&req->label[label_len], PromiseFlatCString(protocol).get(), proto_len);
  888. int32_t result = SendControlMessage(req, req_size, stream);
  889. free(req);
  890. return result;
  891. }
  892. // XXX This should use a separate thread (outbound queue) which should
  893. // select() to know when to *try* to send data to the socket again.
  894. // Alternatively, it can use a timeout, but that's guaranteed to be wrong
  895. // (just not sure in what direction). We could re-implement NSPR's
  896. // PR_POLL_WRITE/etc handling... with a lot of work.
  897. // Better yet, use the SCTP stack's notifications on buffer state to avoid
  898. // filling the SCTP's buffers.
  899. // returns if we're still blocked or not
  900. bool
  901. DataChannelConnection::SendDeferredMessages()
  902. {
  903. uint32_t i;
  904. RefPtr<DataChannel> channel; // we may null out the refs to this
  905. bool still_blocked = false;
  906. // This may block while something is modifying channels, but should not block for IO
  907. mLock.AssertCurrentThreadOwns();
  908. // XXX For total fairness, on a still_blocked we'd start next time at the
  909. // same index. Sorry, not going to bother for now.
  910. for (i = 0; i < mStreams.Length(); ++i) {
  911. channel = mStreams[i];
  912. if (!channel)
  913. continue;
  914. // Only one of these should be set....
  915. if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_REQ) {
  916. if (SendOpenRequestMessage(channel->mLabel, channel->mProtocol,
  917. channel->mStream,
  918. channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED,
  919. channel->mPrPolicy, channel->mPrValue)) {
  920. channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_REQ;
  921. channel->mState = OPEN;
  922. channel->mReady = true;
  923. LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
  924. NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
  925. DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
  926. channel)));
  927. } else {
  928. if (errno == EAGAIN || errno == EWOULDBLOCK) {
  929. still_blocked = true;
  930. } else {
  931. // Close the channel, inform the user
  932. mStreams[channel->mStream] = nullptr;
  933. channel->mState = CLOSED;
  934. // Don't need to reset; we didn't open it
  935. NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
  936. DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
  937. channel)));
  938. }
  939. }
  940. }
  941. if (still_blocked)
  942. break;
  943. if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_ACK) {
  944. if (SendOpenAckMessage(channel->mStream)) {
  945. channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_ACK;
  946. } else {
  947. if (errno == EAGAIN || errno == EWOULDBLOCK) {
  948. still_blocked = true;
  949. } else {
  950. // Close the channel, inform the user
  951. CloseInt(channel);
  952. // XXX send error via DataChannelOnMessageAvailable (bug 843625)
  953. }
  954. }
  955. }
  956. if (still_blocked)
  957. break;
  958. if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_DATA) {
  959. bool failed_send = false;
  960. int32_t result;
  961. if (channel->mState == CLOSED || channel->mState == CLOSING) {
  962. channel->mBufferedData.Clear();
  963. }
  964. uint32_t buffered_amount = channel->GetBufferedAmountLocked();
  965. uint32_t threshold = channel->GetBufferedAmountLowThreshold();
  966. bool was_over_threshold = buffered_amount >= threshold;
  967. while (!channel->mBufferedData.IsEmpty() &&
  968. !failed_send) {
  969. struct sctp_sendv_spa *spa = channel->mBufferedData[0]->mSpa;
  970. const char *data = channel->mBufferedData[0]->mData;
  971. size_t len = channel->mBufferedData[0]->mLength;
  972. // SCTP will return EMSGSIZE if the message is bigger than the buffer
  973. // size (or EAGAIN if there isn't space)
  974. if ((result = usrsctp_sendv(mSocket, data, len,
  975. nullptr, 0,
  976. (void *)spa, (socklen_t)sizeof(struct sctp_sendv_spa),
  977. SCTP_SENDV_SPA,
  978. 0)) < 0) {
  979. if (errno == EAGAIN || errno == EWOULDBLOCK) {
  980. // leave queued for resend
  981. failed_send = true;
  982. LOG(("queue full again when resending %d bytes (%d)", len, result));
  983. } else {
  984. LOG(("error %d re-sending string", errno));
  985. failed_send = true;
  986. }
  987. } else {
  988. LOG(("Resent buffer of %d bytes (%d)", len, result));
  989. // In theory this could underflow if >4GB was buffered and re
  990. // truncated in GetBufferedAmount(), but this won't cause any problems.
  991. buffered_amount -= channel->mBufferedData[0]->mLength;
  992. channel->mBufferedData.RemoveElementAt(0);
  993. // can never fire with default threshold of 0
  994. if (was_over_threshold && buffered_amount < threshold) {
  995. LOG(("%s: sending BUFFER_LOW_THRESHOLD for %s/%s: %u", __FUNCTION__,
  996. channel->mLabel.get(), channel->mProtocol.get(), channel->mStream));
  997. NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
  998. DataChannelOnMessageAvailable::BUFFER_LOW_THRESHOLD,
  999. this, channel)));
  1000. was_over_threshold = false;
  1001. }
  1002. if (buffered_amount == 0) {
  1003. // buffered-to-not-buffered transition; tell the DOM code in case this makes it
  1004. // available for GC
  1005. LOG(("%s: sending NO_LONGER_BUFFERED for %s/%s: %u", __FUNCTION__,
  1006. channel->mLabel.get(), channel->mProtocol.get(), channel->mStream));
  1007. NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
  1008. DataChannelOnMessageAvailable::NO_LONGER_BUFFERED,
  1009. this, channel)));
  1010. }
  1011. }
  1012. }
  1013. if (channel->mBufferedData.IsEmpty())
  1014. channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_DATA;
  1015. else
  1016. still_blocked = true;
  1017. }
  1018. if (still_blocked)
  1019. break;
  1020. }
  1021. return still_blocked;
  1022. }
  1023. void
  1024. DataChannelConnection::HandleOpenRequestMessage(const struct rtcweb_datachannel_open_request *req,
  1025. size_t length,
  1026. uint16_t stream)
  1027. {
  1028. RefPtr<DataChannel> channel;
  1029. uint32_t prValue;
  1030. uint16_t prPolicy;
  1031. uint32_t flags;
  1032. mLock.AssertCurrentThreadOwns();
  1033. if (length != (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)) {
  1034. LOG(("%s: Inconsistent length: %u, should be %u", __FUNCTION__, length,
  1035. (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)));
  1036. if (length < (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length))
  1037. return;
  1038. }
  1039. LOG(("%s: length %u, sizeof(*req) = %u", __FUNCTION__, length, sizeof(*req)));
  1040. switch (req->channel_type) {
  1041. case DATA_CHANNEL_RELIABLE:
  1042. case DATA_CHANNEL_RELIABLE_UNORDERED:
  1043. prPolicy = SCTP_PR_SCTP_NONE;
  1044. break;
  1045. case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
  1046. case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT_UNORDERED:
  1047. prPolicy = SCTP_PR_SCTP_RTX;
  1048. break;
  1049. case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
  1050. case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED_UNORDERED:
  1051. prPolicy = SCTP_PR_SCTP_TTL;
  1052. break;
  1053. default:
  1054. LOG(("Unknown channel type", req->channel_type));
  1055. /* XXX error handling */
  1056. return;
  1057. }
  1058. prValue = ntohl(req->reliability_param);
  1059. flags = (req->channel_type & 0x80) ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0;
  1060. if ((channel = FindChannelByStream(stream))) {
  1061. if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) {
  1062. LOG(("ERROR: HandleOpenRequestMessage: channel for stream %u is in state %d instead of CLOSED.",
  1063. stream, channel->mState));
  1064. /* XXX: some error handling */
  1065. } else {
  1066. LOG(("Open for externally negotiated channel %u", stream));
  1067. // XXX should also check protocol, maybe label
  1068. if (prPolicy != channel->mPrPolicy ||
  1069. prValue != channel->mPrValue ||
  1070. flags != (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED))
  1071. {
  1072. LOG(("WARNING: external negotiation mismatch with OpenRequest:"
  1073. "channel %u, policy %u/%u, value %u/%u, flags %x/%x",
  1074. stream, prPolicy, channel->mPrPolicy,
  1075. prValue, channel->mPrValue, flags, channel->mFlags));
  1076. }
  1077. }
  1078. return;
  1079. }
  1080. if (stream >= mStreams.Length()) {
  1081. LOG(("%s: stream %u out of bounds (%u)", __FUNCTION__, stream, mStreams.Length()));
  1082. return;
  1083. }
  1084. nsCString label(nsDependentCSubstring(&req->label[0], ntohs(req->label_length)));
  1085. nsCString protocol(nsDependentCSubstring(&req->label[ntohs(req->label_length)],
  1086. ntohs(req->protocol_length)));
  1087. channel = new DataChannel(this,
  1088. stream,
  1089. DataChannel::CONNECTING,
  1090. label,
  1091. protocol,
  1092. prPolicy, prValue,
  1093. flags,
  1094. nullptr, nullptr);
  1095. mStreams[stream] = channel;
  1096. channel->mState = DataChannel::WAITING_TO_OPEN;
  1097. LOG(("%s: sending ON_CHANNEL_CREATED for %s/%s: %u (state %u)", __FUNCTION__,
  1098. channel->mLabel.get(), channel->mProtocol.get(), stream, channel->mState));
  1099. NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
  1100. DataChannelOnMessageAvailable::ON_CHANNEL_CREATED,
  1101. this, channel)));
  1102. LOG(("%s: deferring sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
  1103. if (!SendOpenAckMessage(stream)) {
  1104. // XXX Only on EAGAIN!? And if not, then close the channel??
  1105. channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_ACK;
  1106. // Note: we're locked, so there's no danger of a race with the
  1107. // buffer-threshold callback
  1108. }
  1109. // Now process any queued data messages for the channel (which will
  1110. // themselves likely get queued until we leave WAITING_TO_OPEN, plus any
  1111. // more that come in before that happens)
  1112. DeliverQueuedData(stream);
  1113. }
  1114. // NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK.
  1115. // That would make this code moot. Keep it for now for backwards compatibility.
  1116. void
  1117. DataChannelConnection::DeliverQueuedData(uint16_t stream)
  1118. {
  1119. mLock.AssertCurrentThreadOwns();
  1120. uint32_t i = 0;
  1121. while (i < mQueuedData.Length()) {
  1122. // Careful! we may modify the array length from within the loop!
  1123. if (mQueuedData[i]->mStream == stream) {
  1124. LOG(("Delivering queued data for stream %u, length %u",
  1125. stream, (unsigned int) mQueuedData[i]->mLength));
  1126. // Deliver the queued data
  1127. HandleDataMessage(mQueuedData[i]->mPpid,
  1128. mQueuedData[i]->mData, mQueuedData[i]->mLength,
  1129. mQueuedData[i]->mStream);
  1130. mQueuedData.RemoveElementAt(i);
  1131. continue; // don't bump index since we removed the element
  1132. }
  1133. i++;
  1134. }
  1135. }
  1136. void
  1137. DataChannelConnection::HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack,
  1138. size_t length, uint16_t stream)
  1139. {
  1140. DataChannel *channel;
  1141. mLock.AssertCurrentThreadOwns();
  1142. channel = FindChannelByStream(stream);
  1143. NS_ENSURE_TRUE_VOID(channel);
  1144. LOG(("OpenAck received for stream %u, waiting=%d", stream,
  1145. (channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK) ? 1 : 0));
  1146. channel->mFlags &= ~DATA_CHANNEL_FLAGS_WAITING_ACK;
  1147. }
  1148. void
  1149. DataChannelConnection::HandleUnknownMessage(uint32_t ppid, size_t length, uint16_t stream)
  1150. {
  1151. /* XXX: Send an error message? */
  1152. LOG(("unknown DataChannel message received: %u, len %ld on stream %lu", ppid, length, stream));
  1153. // XXX Log to JS error console if possible
  1154. }
  1155. void
  1156. DataChannelConnection::HandleDataMessage(uint32_t ppid,
  1157. const void *data, size_t length,
  1158. uint16_t stream)
  1159. {
  1160. DataChannel *channel;
  1161. const char *buffer = (const char *) data;
  1162. mLock.AssertCurrentThreadOwns();
  1163. channel = FindChannelByStream(stream);
  1164. // XXX A closed channel may trip this... check
  1165. // NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK.
  1166. // That would make this code moot. Keep it for now for backwards compatibility.
  1167. if (!channel) {
  1168. // In the updated 0-RTT open case, the sender can send data immediately
  1169. // after Open, and doesn't set the in-order bit (since we don't have a
  1170. // response or ack). Also, with external negotiation, data can come in
  1171. // before we're told about the external negotiation. We need to buffer
  1172. // data until either a) Open comes in, if the ordering get messed up,
  1173. // or b) the app tells us this channel was externally negotiated. When
  1174. // these occur, we deliver the data.
  1175. // Since this is rare and non-performance, keep a single list of queued
  1176. // data messages to deliver once the channel opens.
  1177. LOG(("Queuing data for stream %u, length %u", stream, length));
  1178. // Copies data
  1179. mQueuedData.AppendElement(new QueuedDataMessage(stream, ppid, data, length));
  1180. return;
  1181. }
  1182. // XXX should this be a simple if, no warnings/debugbreaks?
  1183. NS_ENSURE_TRUE_VOID(channel->mState != CLOSED);
  1184. {
  1185. nsAutoCString recvData(buffer, length); // copies (<64) or allocates
  1186. bool is_binary = true;
  1187. if (ppid == DATA_CHANNEL_PPID_DOMSTRING ||
  1188. ppid == DATA_CHANNEL_PPID_DOMSTRING_LAST) {
  1189. is_binary = false;
  1190. }
  1191. if (is_binary != channel->mIsRecvBinary && !channel->mRecvBuffer.IsEmpty()) {
  1192. NS_WARNING("DataChannel message aborted by fragment type change!");
  1193. channel->mRecvBuffer.Truncate(0);
  1194. }
  1195. channel->mIsRecvBinary = is_binary;
  1196. switch (ppid) {
  1197. case DATA_CHANNEL_PPID_DOMSTRING:
  1198. case DATA_CHANNEL_PPID_BINARY:
  1199. channel->mRecvBuffer += recvData;
  1200. LOG(("DataChannel: Partial %s message of length %lu (total %u) on channel id %u",
  1201. is_binary ? "binary" : "string", length, channel->mRecvBuffer.Length(),
  1202. channel->mStream));
  1203. return; // Not ready to notify application
  1204. case DATA_CHANNEL_PPID_DOMSTRING_LAST:
  1205. LOG(("DataChannel: String message received of length %lu on channel %u",
  1206. length, channel->mStream));
  1207. if (!channel->mRecvBuffer.IsEmpty()) {
  1208. channel->mRecvBuffer += recvData;
  1209. LOG(("%s: sending ON_DATA (string fragmented) for %p", __FUNCTION__, channel));
  1210. channel->SendOrQueue(new DataChannelOnMessageAvailable(
  1211. DataChannelOnMessageAvailable::ON_DATA, this,
  1212. channel, channel->mRecvBuffer, -1));
  1213. channel->mRecvBuffer.Truncate(0);
  1214. return;
  1215. }
  1216. // else send using recvData normally
  1217. length = -1; // Flag for DOMString
  1218. // WebSockets checks IsUTF8() here; we can try to deliver it
  1219. break;
  1220. case DATA_CHANNEL_PPID_BINARY_LAST:
  1221. LOG(("DataChannel: Received binary message of length %lu on channel id %u",
  1222. length, channel->mStream));
  1223. if (!channel->mRecvBuffer.IsEmpty()) {
  1224. channel->mRecvBuffer += recvData;
  1225. LOG(("%s: sending ON_DATA (binary fragmented) for %p", __FUNCTION__, channel));
  1226. channel->SendOrQueue(new DataChannelOnMessageAvailable(
  1227. DataChannelOnMessageAvailable::ON_DATA, this,
  1228. channel, channel->mRecvBuffer,
  1229. channel->mRecvBuffer.Length()));
  1230. channel->mRecvBuffer.Truncate(0);
  1231. return;
  1232. }
  1233. // else send using recvData normally
  1234. break;
  1235. default:
  1236. NS_ERROR("Unknown data PPID");
  1237. return;
  1238. }
  1239. /* Notify onmessage */
  1240. LOG(("%s: sending ON_DATA for %p", __FUNCTION__, channel));
  1241. channel->SendOrQueue(new DataChannelOnMessageAvailable(
  1242. DataChannelOnMessageAvailable::ON_DATA, this,
  1243. channel, recvData, length));
  1244. }
  1245. }
  1246. // Called with mLock locked!
  1247. void
  1248. DataChannelConnection::HandleMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t stream)
  1249. {
  1250. const struct rtcweb_datachannel_open_request *req;
  1251. const struct rtcweb_datachannel_ack *ack;
  1252. mLock.AssertCurrentThreadOwns();
  1253. switch (ppid) {
  1254. case DATA_CHANNEL_PPID_CONTROL:
  1255. req = static_cast<const struct rtcweb_datachannel_open_request *>(buffer);
  1256. NS_ENSURE_TRUE_VOID(length >= sizeof(*ack)); // smallest message
  1257. switch (req->msg_type) {
  1258. case DATA_CHANNEL_OPEN_REQUEST:
  1259. // structure includes a possibly-unused char label[1] (in a packed structure)
  1260. NS_ENSURE_TRUE_VOID(length >= sizeof(*req) - 1);
  1261. HandleOpenRequestMessage(req, length, stream);
  1262. break;
  1263. case DATA_CHANNEL_ACK:
  1264. // >= sizeof(*ack) checked above
  1265. ack = static_cast<const struct rtcweb_datachannel_ack *>(buffer);
  1266. HandleOpenAckMessage(ack, length, stream);
  1267. break;
  1268. default:
  1269. HandleUnknownMessage(ppid, length, stream);
  1270. break;
  1271. }
  1272. break;
  1273. case DATA_CHANNEL_PPID_DOMSTRING:
  1274. case DATA_CHANNEL_PPID_DOMSTRING_LAST:
  1275. case DATA_CHANNEL_PPID_BINARY:
  1276. case DATA_CHANNEL_PPID_BINARY_LAST:
  1277. HandleDataMessage(ppid, buffer, length, stream);
  1278. break;
  1279. default:
  1280. LOG(("Message of length %lu, PPID %u on stream %u received.",
  1281. length, ppid, stream));
  1282. break;
  1283. }
  1284. }
  1285. void
  1286. DataChannelConnection::HandleAssociationChangeEvent(const struct sctp_assoc_change *sac)
  1287. {
  1288. uint32_t i, n;
  1289. switch (sac->sac_state) {
  1290. case SCTP_COMM_UP:
  1291. LOG(("Association change: SCTP_COMM_UP"));
  1292. if (mState == CONNECTING) {
  1293. mSocket = mMasterSocket;
  1294. mState = OPEN;
  1295. SetEvenOdd();
  1296. NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
  1297. DataChannelOnMessageAvailable::ON_CONNECTION,
  1298. this)));
  1299. LOG(("DTLS connect() succeeded! Entering connected mode"));
  1300. // Open any streams pending...
  1301. ProcessQueuedOpens();
  1302. } else if (mState == OPEN) {
  1303. LOG(("DataConnection Already OPEN"));
  1304. } else {
  1305. LOG(("Unexpected state: %d", mState));
  1306. }
  1307. break;
  1308. case SCTP_COMM_LOST:
  1309. LOG(("Association change: SCTP_COMM_LOST"));
  1310. // This association is toast, so also close all the channels -- from mainthread!
  1311. NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
  1312. DataChannelOnMessageAvailable::ON_DISCONNECTED,
  1313. this)));
  1314. break;
  1315. case SCTP_RESTART:
  1316. LOG(("Association change: SCTP_RESTART"));
  1317. break;
  1318. case SCTP_SHUTDOWN_COMP:
  1319. LOG(("Association change: SCTP_SHUTDOWN_COMP"));
  1320. NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
  1321. DataChannelOnMessageAvailable::ON_DISCONNECTED,
  1322. this)));
  1323. break;
  1324. case SCTP_CANT_STR_ASSOC:
  1325. LOG(("Association change: SCTP_CANT_STR_ASSOC"));
  1326. break;
  1327. default:
  1328. LOG(("Association change: UNKNOWN"));
  1329. break;
  1330. }
  1331. LOG(("Association change: streams (in/out) = (%u/%u)",
  1332. sac->sac_inbound_streams, sac->sac_outbound_streams));
  1333. NS_ENSURE_TRUE_VOID(sac);
  1334. n = sac->sac_length - sizeof(*sac);
  1335. if (((sac->sac_state == SCTP_COMM_UP) ||
  1336. (sac->sac_state == SCTP_RESTART)) && (n > 0)) {
  1337. for (i = 0; i < n; ++i) {
  1338. switch (sac->sac_info[i]) {
  1339. case SCTP_ASSOC_SUPPORTS_PR:
  1340. LOG(("Supports: PR"));
  1341. break;
  1342. case SCTP_ASSOC_SUPPORTS_AUTH:
  1343. LOG(("Supports: AUTH"));
  1344. break;
  1345. case SCTP_ASSOC_SUPPORTS_ASCONF:
  1346. LOG(("Supports: ASCONF"));
  1347. break;
  1348. case SCTP_ASSOC_SUPPORTS_MULTIBUF:
  1349. LOG(("Supports: MULTIBUF"));
  1350. break;
  1351. case SCTP_ASSOC_SUPPORTS_RE_CONFIG:
  1352. LOG(("Supports: RE-CONFIG"));
  1353. break;
  1354. default:
  1355. LOG(("Supports: UNKNOWN(0x%02x)", sac->sac_info[i]));
  1356. break;
  1357. }
  1358. }
  1359. } else if (((sac->sac_state == SCTP_COMM_LOST) ||
  1360. (sac->sac_state == SCTP_CANT_STR_ASSOC)) && (n > 0)) {
  1361. LOG(("Association: ABORT ="));
  1362. for (i = 0; i < n; ++i) {
  1363. LOG((" 0x%02x", sac->sac_info[i]));
  1364. }
  1365. }
  1366. if ((sac->sac_state == SCTP_CANT_STR_ASSOC) ||
  1367. (sac->sac_state == SCTP_SHUTDOWN_COMP) ||
  1368. (sac->sac_state == SCTP_COMM_LOST)) {
  1369. return;
  1370. }
  1371. }
  1372. void
  1373. DataChannelConnection::HandlePeerAddressChangeEvent(const struct sctp_paddr_change *spc)
  1374. {
  1375. const char *addr = "";
  1376. #if !defined(__Userspace_os_Windows)
  1377. char addr_buf[INET6_ADDRSTRLEN];
  1378. struct sockaddr_in *sin;
  1379. struct sockaddr_in6 *sin6;
  1380. #endif
  1381. switch (spc->spc_aaddr.ss_family) {
  1382. case AF_INET:
  1383. #if !defined(__Userspace_os_Windows)
  1384. sin = (struct sockaddr_in *)&spc->spc_aaddr;
  1385. addr = inet_ntop(AF_INET, &sin->sin_addr, addr_buf, INET6_ADDRSTRLEN);
  1386. #endif
  1387. break;
  1388. case AF_INET6:
  1389. #if !defined(__Userspace_os_Windows)
  1390. sin6 = (struct sockaddr_in6 *)&spc->spc_aaddr;
  1391. addr = inet_ntop(AF_INET6, &sin6->sin6_addr, addr_buf, INET6_ADDRSTRLEN);
  1392. #endif
  1393. break;
  1394. case AF_CONN:
  1395. addr = "DTLS connection";
  1396. break;
  1397. default:
  1398. break;
  1399. }
  1400. LOG(("Peer address %s is now ", addr));
  1401. switch (spc->spc_state) {
  1402. case SCTP_ADDR_AVAILABLE:
  1403. LOG(("SCTP_ADDR_AVAILABLE"));
  1404. break;
  1405. case SCTP_ADDR_UNREACHABLE:
  1406. LOG(("SCTP_ADDR_UNREACHABLE"));
  1407. break;
  1408. case SCTP_ADDR_REMOVED:
  1409. LOG(("SCTP_ADDR_REMOVED"));
  1410. break;
  1411. case SCTP_ADDR_ADDED:
  1412. LOG(("SCTP_ADDR_ADDED"));
  1413. break;
  1414. case SCTP_ADDR_MADE_PRIM:
  1415. LOG(("SCTP_ADDR_MADE_PRIM"));
  1416. break;
  1417. case SCTP_ADDR_CONFIRMED:
  1418. LOG(("SCTP_ADDR_CONFIRMED"));
  1419. break;
  1420. default:
  1421. LOG(("UNKNOWN"));
  1422. break;
  1423. }
  1424. LOG((" (error = 0x%08x).\n", spc->spc_error));
  1425. }
  1426. void
  1427. DataChannelConnection::HandleRemoteErrorEvent(const struct sctp_remote_error *sre)
  1428. {
  1429. size_t i, n;
  1430. n = sre->sre_length - sizeof(struct sctp_remote_error);
  1431. LOG(("Remote Error (error = 0x%04x): ", sre->sre_error));
  1432. for (i = 0; i < n; ++i) {
  1433. LOG((" 0x%02x", sre-> sre_data[i]));
  1434. }
  1435. }
  1436. void
  1437. DataChannelConnection::HandleShutdownEvent(const struct sctp_shutdown_event *sse)
  1438. {
  1439. LOG(("Shutdown event."));
  1440. /* XXX: notify all channels. */
  1441. // Attempts to actually send anything will fail
  1442. }
  1443. void
  1444. DataChannelConnection::HandleAdaptationIndication(const struct sctp_adaptation_event *sai)
  1445. {
  1446. LOG(("Adaptation indication: %x.", sai-> sai_adaptation_ind));
  1447. }
  1448. void
  1449. DataChannelConnection::HandleSendFailedEvent(const struct sctp_send_failed_event *ssfe)
  1450. {
  1451. size_t i, n;
  1452. if (ssfe->ssfe_flags & SCTP_DATA_UNSENT) {
  1453. LOG(("Unsent "));
  1454. }
  1455. if (ssfe->ssfe_flags & SCTP_DATA_SENT) {
  1456. LOG(("Sent "));
  1457. }
  1458. if (ssfe->ssfe_flags & ~(SCTP_DATA_SENT | SCTP_DATA_UNSENT)) {
  1459. LOG(("(flags = %x) ", ssfe->ssfe_flags));
  1460. }
  1461. LOG(("message with PPID = %u, SID = %d, flags: 0x%04x due to error = 0x%08x",
  1462. ntohl(ssfe->ssfe_info.snd_ppid), ssfe->ssfe_info.snd_sid,
  1463. ssfe->ssfe_info.snd_flags, ssfe->ssfe_error));
  1464. n = ssfe->ssfe_length - sizeof(struct sctp_send_failed_event);
  1465. for (i = 0; i < n; ++i) {
  1466. LOG((" 0x%02x", ssfe->ssfe_data[i]));
  1467. }
  1468. }
  1469. void
  1470. DataChannelConnection::ClearResets()
  1471. {
  1472. // Clear all pending resets
  1473. if (!mStreamsResetting.IsEmpty()) {
  1474. LOG(("Clearing resets for %d streams", mStreamsResetting.Length()));
  1475. }
  1476. for (uint32_t i = 0; i < mStreamsResetting.Length(); ++i) {
  1477. RefPtr<DataChannel> channel;
  1478. channel = FindChannelByStream(mStreamsResetting[i]);
  1479. if (channel) {
  1480. LOG(("Forgetting channel %u (%p) with pending reset",channel->mStream, channel.get()));
  1481. mStreams[channel->mStream] = nullptr;
  1482. }
  1483. }
  1484. mStreamsResetting.Clear();
  1485. }
  1486. void
  1487. DataChannelConnection::ResetOutgoingStream(uint16_t stream)
  1488. {
  1489. uint32_t i;
  1490. mLock.AssertCurrentThreadOwns();
  1491. LOG(("Connection %p: Resetting outgoing stream %u",
  1492. (void *) this, stream));
  1493. // Rarely has more than a couple items and only for a short time
  1494. for (i = 0; i < mStreamsResetting.Length(); ++i) {
  1495. if (mStreamsResetting[i] == stream) {
  1496. return;
  1497. }
  1498. }
  1499. mStreamsResetting.AppendElement(stream);
  1500. }
  1501. void
  1502. DataChannelConnection::SendOutgoingStreamReset()
  1503. {
  1504. struct sctp_reset_streams *srs;
  1505. uint32_t i;
  1506. size_t len;
  1507. LOG(("Connection %p: Sending outgoing stream reset for %d streams",
  1508. (void *) this, mStreamsResetting.Length()));
  1509. mLock.AssertCurrentThreadOwns();
  1510. if (mStreamsResetting.IsEmpty()) {
  1511. LOG(("No streams to reset"));
  1512. return;
  1513. }
  1514. len = sizeof(sctp_assoc_t) + (2 + mStreamsResetting.Length()) * sizeof(uint16_t);
  1515. srs = static_cast<struct sctp_reset_streams *> (moz_xmalloc(len)); // infallible malloc
  1516. memset(srs, 0, len);
  1517. srs->srs_flags = SCTP_STREAM_RESET_OUTGOING;
  1518. srs->srs_number_streams = mStreamsResetting.Length();
  1519. for (i = 0; i < mStreamsResetting.Length(); ++i) {
  1520. srs->srs_stream_list[i] = mStreamsResetting[i];
  1521. }
  1522. if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_RESET_STREAMS, srs, (socklen_t)len) < 0) {
  1523. LOG(("***failed: setsockopt RESET, errno %d", errno));
  1524. // if errno == EALREADY, this is normal - we can't send another reset
  1525. // with one pending.
  1526. // When we get an incoming reset (which may be a response to our
  1527. // outstanding one), see if we have any pending outgoing resets and
  1528. // send them
  1529. } else {
  1530. mStreamsResetting.Clear();
  1531. }
  1532. free(srs);
  1533. }
  1534. void
  1535. DataChannelConnection::HandleStreamResetEvent(const struct sctp_stream_reset_event *strrst)
  1536. {
  1537. uint32_t n, i;
  1538. RefPtr<DataChannel> channel; // since we may null out the ref to the channel
  1539. if (!(strrst->strreset_flags & SCTP_STREAM_RESET_DENIED) &&
  1540. !(strrst->strreset_flags & SCTP_STREAM_RESET_FAILED)) {
  1541. n = (strrst->strreset_length - sizeof(struct sctp_stream_reset_event)) / sizeof(uint16_t);
  1542. for (i = 0; i < n; ++i) {
  1543. if (strrst->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) {
  1544. channel = FindChannelByStream(strrst->strreset_stream_list[i]);
  1545. if (channel) {
  1546. // The other side closed the channel
  1547. // We could be in three states:
  1548. // 1. Normal state (input and output streams (OPEN)
  1549. // Notify application, send a RESET in response on our
  1550. // outbound channel. Go to CLOSED
  1551. // 2. We sent our own reset (CLOSING); either they crossed on the
  1552. // wire, or this is a response to our Reset.
  1553. // Go to CLOSED
  1554. // 3. We've sent a open but haven't gotten a response yet (CONNECTING)
  1555. // I believe this is impossible, as we don't have an input stream yet.
  1556. LOG(("Incoming: Channel %u closed, state %d",
  1557. channel->mStream, channel->mState));
  1558. ASSERT_WEBRTC(channel->mState == DataChannel::OPEN ||
  1559. channel->mState == DataChannel::CLOSING ||
  1560. channel->mState == DataChannel::CONNECTING ||
  1561. channel->mState == DataChannel::WAITING_TO_OPEN);
  1562. if (channel->mState == DataChannel::OPEN ||
  1563. channel->mState == DataChannel::WAITING_TO_OPEN) {
  1564. // Mark the stream for reset (the reset is sent below)
  1565. ResetOutgoingStream(channel->mStream);
  1566. }
  1567. mStreams[channel->mStream] = nullptr;
  1568. LOG(("Disconnected DataChannel %p from connection %p",
  1569. (void *) channel.get(), (void *) channel->mConnection.get()));
  1570. // This sends ON_CHANNEL_CLOSED to mainthread
  1571. channel->StreamClosedLocked();
  1572. } else {
  1573. LOG(("Can't find incoming channel %d",i));
  1574. }
  1575. }
  1576. }
  1577. }
  1578. // Process any pending resets now:
  1579. if (!mStreamsResetting.IsEmpty()) {
  1580. LOG(("Sending %d pending resets", mStreamsResetting.Length()));
  1581. SendOutgoingStreamReset();
  1582. }
  1583. }
  1584. void
  1585. DataChannelConnection::HandleStreamChangeEvent(const struct sctp_stream_change_event *strchg)
  1586. {
  1587. uint16_t stream;
  1588. RefPtr<DataChannel> channel;
  1589. if (strchg->strchange_flags == SCTP_STREAM_CHANGE_DENIED) {
  1590. LOG(("*** Failed increasing number of streams from %u (%u/%u)",
  1591. mStreams.Length(),
  1592. strchg->strchange_instrms,
  1593. strchg->strchange_outstrms));
  1594. // XXX FIX! notify pending opens of failure
  1595. return;
  1596. } else {
  1597. if (strchg->strchange_instrms > mStreams.Length()) {
  1598. LOG(("Other side increased streams from %u to %u",
  1599. mStreams.Length(), strchg->strchange_instrms));
  1600. }
  1601. if (strchg->strchange_outstrms > mStreams.Length() ||
  1602. strchg->strchange_instrms > mStreams.Length()) {
  1603. uint16_t old_len = mStreams.Length();
  1604. uint16_t new_len = std::max(strchg->strchange_outstrms,
  1605. strchg->strchange_instrms);
  1606. LOG(("Increasing number of streams from %u to %u - adding %u (in: %u)",
  1607. old_len, new_len, new_len - old_len,
  1608. strchg->strchange_instrms));
  1609. // make sure both are the same length
  1610. mStreams.AppendElements(new_len - old_len);
  1611. LOG(("New length = %d (was %d)", mStreams.Length(), old_len));
  1612. for (size_t i = old_len; i < mStreams.Length(); ++i) {
  1613. mStreams[i] = nullptr;
  1614. }
  1615. // Re-process any channels waiting for streams.
  1616. // Linear search, but we don't increase channels often and
  1617. // the array would only get long in case of an app error normally
  1618. // Make sure we request enough streams if there's a big jump in streams
  1619. // Could make a more complex API for OpenXxxFinish() and avoid this loop
  1620. size_t num_needed = mPending.GetSize();
  1621. LOG(("%d of %d new streams already needed", num_needed,
  1622. new_len - old_len));
  1623. num_needed -= (new_len - old_len); // number we added
  1624. if (num_needed > 0) {
  1625. if (num_needed < 16)
  1626. num_needed = 16;
  1627. LOG(("Not enough new streams, asking for %d more", num_needed));
  1628. RequestMoreStreams(num_needed);
  1629. } else if (strchg->strchange_outstrms < strchg->strchange_instrms) {
  1630. LOG(("Requesting %d output streams to match partner",
  1631. strchg->strchange_instrms - strchg->strchange_outstrms));
  1632. RequestMoreStreams(strchg->strchange_instrms - strchg->strchange_outstrms);
  1633. }
  1634. ProcessQueuedOpens();
  1635. }
  1636. // else probably not a change in # of streams
  1637. }
  1638. for (uint32_t i = 0; i < mStreams.Length(); ++i) {
  1639. channel = mStreams[i];
  1640. if (!channel)
  1641. continue;
  1642. if ((channel->mState == CONNECTING) &&
  1643. (channel->mStream == INVALID_STREAM)) {
  1644. if ((strchg->strchange_flags & SCTP_STREAM_CHANGE_DENIED) ||
  1645. (strchg->strchange_flags & SCTP_STREAM_CHANGE_FAILED)) {
  1646. /* XXX: Signal to the other end. */
  1647. channel->mState = CLOSED;
  1648. NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
  1649. DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
  1650. channel)));
  1651. // maybe fire onError (bug 843625)
  1652. } else {
  1653. stream = FindFreeStream();
  1654. if (stream != INVALID_STREAM) {
  1655. channel->mStream = stream;
  1656. mStreams[stream] = channel;
  1657. channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ;
  1658. // Note: we're locked, so there's no danger of a race with the
  1659. // buffer-threshold callback
  1660. } else {
  1661. /* We will not find more ... */
  1662. break;
  1663. }
  1664. }
  1665. }
  1666. }
  1667. }
  1668. // Called with mLock locked!
  1669. void
  1670. DataChannelConnection::HandleNotification(const union sctp_notification *notif, size_t n)
  1671. {
  1672. mLock.AssertCurrentThreadOwns();
  1673. if (notif->sn_header.sn_length != (uint32_t)n) {
  1674. return;
  1675. }
  1676. switch (notif->sn_header.sn_type) {
  1677. case SCTP_ASSOC_CHANGE:
  1678. HandleAssociationChangeEvent(&(notif->sn_assoc_change));
  1679. break;
  1680. case SCTP_PEER_ADDR_CHANGE:
  1681. HandlePeerAddressChangeEvent(&(notif->sn_paddr_change));
  1682. break;
  1683. case SCTP_REMOTE_ERROR:
  1684. HandleRemoteErrorEvent(&(notif->sn_remote_error));
  1685. break;
  1686. case SCTP_SHUTDOWN_EVENT:
  1687. HandleShutdownEvent(&(notif->sn_shutdown_event));
  1688. break;
  1689. case SCTP_ADAPTATION_INDICATION:
  1690. HandleAdaptationIndication(&(notif->sn_adaptation_event));
  1691. break;
  1692. case SCTP_PARTIAL_DELIVERY_EVENT:
  1693. LOG(("SCTP_PARTIAL_DELIVERY_EVENT"));
  1694. break;
  1695. case SCTP_AUTHENTICATION_EVENT:
  1696. LOG(("SCTP_AUTHENTICATION_EVENT"));
  1697. break;
  1698. case SCTP_SENDER_DRY_EVENT:
  1699. //LOG(("SCTP_SENDER_DRY_EVENT"));
  1700. break;
  1701. case SCTP_NOTIFICATIONS_STOPPED_EVENT:
  1702. LOG(("SCTP_NOTIFICATIONS_STOPPED_EVENT"));
  1703. break;
  1704. case SCTP_SEND_FAILED_EVENT:
  1705. HandleSendFailedEvent(&(notif->sn_send_failed_event));
  1706. break;
  1707. case SCTP_STREAM_RESET_EVENT:
  1708. HandleStreamResetEvent(&(notif->sn_strreset_event));
  1709. break;
  1710. case SCTP_ASSOC_RESET_EVENT:
  1711. LOG(("SCTP_ASSOC_RESET_EVENT"));
  1712. break;
  1713. case SCTP_STREAM_CHANGE_EVENT:
  1714. HandleStreamChangeEvent(&(notif->sn_strchange_event));
  1715. break;
  1716. default:
  1717. LOG(("unknown SCTP event: %u", (uint32_t)notif->sn_header.sn_type));
  1718. break;
  1719. }
  1720. }
  1721. int
  1722. DataChannelConnection::ReceiveCallback(struct socket* sock, void *data, size_t datalen,
  1723. struct sctp_rcvinfo rcv, int32_t flags)
  1724. {
  1725. ASSERT_WEBRTC(!NS_IsMainThread());
  1726. if (!data) {
  1727. usrsctp_close(sock); // SCTP has finished shutting down
  1728. } else {
  1729. bool locked = false;
  1730. if (!IsSTSThread()) {
  1731. mLock.Lock();
  1732. locked = true;
  1733. } else {
  1734. mLock.AssertCurrentThreadOwns();
  1735. }
  1736. if (flags & MSG_NOTIFICATION) {
  1737. HandleNotification(static_cast<union sctp_notification *>(data), datalen);
  1738. } else {
  1739. HandleMessage(data, datalen, ntohl(rcv.rcv_ppid), rcv.rcv_sid);
  1740. }
  1741. if (locked) {
  1742. mLock.Unlock();
  1743. }
  1744. }
  1745. // sctp allocates 'data' with malloc(), and expects the receiver to free
  1746. // it (presumably with free).
  1747. // XXX future optimization: try to deliver messages without an internal
  1748. // alloc/copy, and if so delay the free until later.
  1749. free(data);
  1750. // usrsctp defines the callback as returning an int, but doesn't use it
  1751. return 1;
  1752. }
  1753. already_AddRefed<DataChannel>
  1754. DataChannelConnection::Open(const nsACString& label, const nsACString& protocol,
  1755. Type type, bool inOrder,
  1756. uint32_t prValue, DataChannelListener *aListener,
  1757. nsISupports *aContext, bool aExternalNegotiated,
  1758. uint16_t aStream)
  1759. {
  1760. // aStream == INVALID_STREAM to have the protocol allocate
  1761. uint16_t prPolicy = SCTP_PR_SCTP_NONE;
  1762. uint32_t flags;
  1763. LOG(("DC Open: label %s/%s, type %u, inorder %d, prValue %u, listener %p, context %p, external: %s, stream %u",
  1764. PromiseFlatCString(label).get(), PromiseFlatCString(protocol).get(),
  1765. type, inOrder, prValue, aListener, aContext,
  1766. aExternalNegotiated ? "true" : "false", aStream));
  1767. switch (type) {
  1768. case DATA_CHANNEL_RELIABLE:
  1769. prPolicy = SCTP_PR_SCTP_NONE;
  1770. break;
  1771. case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
  1772. prPolicy = SCTP_PR_SCTP_RTX;
  1773. break;
  1774. case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
  1775. prPolicy = SCTP_PR_SCTP_TTL;
  1776. break;
  1777. }
  1778. if ((prPolicy == SCTP_PR_SCTP_NONE) && (prValue != 0)) {
  1779. return nullptr;
  1780. }
  1781. // Don't look past currently-negotiated streams
  1782. if (aStream != INVALID_STREAM && aStream < mStreams.Length() && mStreams[aStream]) {
  1783. LOG(("ERROR: external negotiation of already-open channel %u", aStream));
  1784. // XXX How do we indicate this up to the application? Probably the
  1785. // caller's job, but we may need to return an error code.
  1786. return nullptr;
  1787. }
  1788. flags = !inOrder ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0;
  1789. RefPtr<DataChannel> channel(new DataChannel(this,
  1790. aStream,
  1791. DataChannel::CONNECTING,
  1792. label, protocol,
  1793. type, prValue,
  1794. flags,
  1795. aListener, aContext));
  1796. if (aExternalNegotiated) {
  1797. channel->mFlags |= DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED;
  1798. }
  1799. MutexAutoLock lock(mLock); // OpenFinish assumes this
  1800. return OpenFinish(channel.forget());
  1801. }
  1802. // Separate routine so we can also call it to finish up from pending opens
  1803. already_AddRefed<DataChannel>
  1804. DataChannelConnection::OpenFinish(already_AddRefed<DataChannel>&& aChannel)
  1805. {
  1806. RefPtr<DataChannel> channel(aChannel); // takes the reference passed in
  1807. // Normally 1 reference if called from ::Open(), or 2 if called from
  1808. // ProcessQueuedOpens() unless the DOMDataChannel was gc'd
  1809. uint16_t stream = channel->mStream;
  1810. bool queue = false;
  1811. mLock.AssertCurrentThreadOwns();
  1812. // Cases we care about:
  1813. // Pre-negotiated:
  1814. // Not Open:
  1815. // Doesn't fit:
  1816. // -> change initial ask or renegotiate after open
  1817. // -> queue open
  1818. // Open:
  1819. // Doesn't fit:
  1820. // -> RequestMoreStreams && queue
  1821. // Does fit:
  1822. // -> open
  1823. // Not negotiated:
  1824. // Not Open:
  1825. // -> queue open
  1826. // Open:
  1827. // -> Try to get a stream
  1828. // Doesn't fit:
  1829. // -> RequestMoreStreams && queue
  1830. // Does fit:
  1831. // -> open
  1832. // So the Open cases are basically the same
  1833. // Not Open cases are simply queue for non-negotiated, and
  1834. // either change the initial ask or possibly renegotiate after open.
  1835. if (mState == OPEN) {
  1836. if (stream == INVALID_STREAM) {
  1837. stream = FindFreeStream(); // may be INVALID_STREAM if we need more
  1838. }
  1839. if (stream == INVALID_STREAM || stream >= mStreams.Length()) {
  1840. // RequestMoreStreams() limits to MAX_NUM_STREAMS -- allocate extra streams
  1841. // to avoid going back immediately for more if the ask to N, N+1, etc
  1842. int32_t more_needed = (stream == INVALID_STREAM) ? 16 :
  1843. (stream-((int32_t)mStreams.Length())) + 16;
  1844. if (!RequestMoreStreams(more_needed)) {
  1845. // Something bad happened... we're done
  1846. goto request_error_cleanup;
  1847. }
  1848. queue = true;
  1849. }
  1850. } else {
  1851. // not OPEN
  1852. if (stream != INVALID_STREAM && stream >= mStreams.Length() &&
  1853. mState == CLOSED) {
  1854. // Update number of streams for init message
  1855. struct sctp_initmsg initmsg;
  1856. socklen_t len = sizeof(initmsg);
  1857. int32_t total_needed = stream+16;
  1858. memset(&initmsg, 0, sizeof(initmsg));
  1859. if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, &len) < 0) {
  1860. LOG(("*** failed getsockopt SCTP_INITMSG"));
  1861. goto request_error_cleanup;
  1862. }
  1863. LOG(("Setting number of SCTP streams to %u, was %u/%u", total_needed,
  1864. initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams));
  1865. initmsg.sinit_num_ostreams = total_needed;
  1866. initmsg.sinit_max_instreams = MAX_NUM_STREAMS;
  1867. if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
  1868. (socklen_t)sizeof(initmsg)) < 0) {
  1869. LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno));
  1870. goto request_error_cleanup;
  1871. }
  1872. int32_t old_len = mStreams.Length();
  1873. mStreams.AppendElements(total_needed - old_len);
  1874. for (int32_t i = old_len; i < total_needed; ++i) {
  1875. mStreams[i] = nullptr;
  1876. }
  1877. }
  1878. // else if state is CONNECTING, we'll just re-negotiate when OpenFinish
  1879. // is called, if needed
  1880. queue = true;
  1881. }
  1882. if (queue) {
  1883. LOG(("Queuing channel %p (%u) to finish open", channel.get(), stream));
  1884. // Also serves to mark we told the app
  1885. channel->mFlags |= DATA_CHANNEL_FLAGS_FINISH_OPEN;
  1886. // we need a ref for the nsDeQue and one to return
  1887. DataChannel* rawChannel = channel;
  1888. rawChannel->AddRef();
  1889. mPending.Push(rawChannel);
  1890. return channel.forget();
  1891. }
  1892. MOZ_ASSERT(stream != INVALID_STREAM);
  1893. // just allocated (& OPEN), or externally negotiated
  1894. mStreams[stream] = channel; // holds a reference
  1895. channel->mStream = stream;
  1896. #ifdef TEST_QUEUED_DATA
  1897. // It's painful to write a test for this...
  1898. channel->mState = OPEN;
  1899. channel->mReady = true;
  1900. SendMsgInternal(channel, "Help me!", 8, DATA_CHANNEL_PPID_DOMSTRING_LAST);
  1901. #endif
  1902. if (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) {
  1903. // Don't send unordered until this gets cleared
  1904. channel->mFlags |= DATA_CHANNEL_FLAGS_WAITING_ACK;
  1905. }
  1906. if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) {
  1907. if (!SendOpenRequestMessage(channel->mLabel, channel->mProtocol,
  1908. stream,
  1909. !!(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED),
  1910. channel->mPrPolicy, channel->mPrValue)) {
  1911. LOG(("SendOpenRequest failed, errno = %d", errno));
  1912. if (errno == EAGAIN || errno == EWOULDBLOCK) {
  1913. channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ;
  1914. // Note: we're locked, so there's no danger of a race with the
  1915. // buffer-threshold callback
  1916. return channel.forget();
  1917. } else {
  1918. if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
  1919. // We already returned the channel to the app.
  1920. NS_ERROR("Failed to send open request");
  1921. NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
  1922. DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
  1923. channel)));
  1924. }
  1925. // If we haven't returned the channel yet, it will get destroyed when we exit
  1926. // this function.
  1927. mStreams[stream] = nullptr;
  1928. channel->mStream = INVALID_STREAM;
  1929. // we'll be destroying the channel
  1930. channel->mState = CLOSED;
  1931. return nullptr;
  1932. }
  1933. /* NOTREACHED */
  1934. }
  1935. }
  1936. // Either externally negotiated or we sent Open
  1937. channel->mState = OPEN;
  1938. channel->mReady = true;
  1939. // FIX? Move into DOMDataChannel? I don't think we can send it yet here
  1940. LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
  1941. NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
  1942. DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
  1943. channel)));
  1944. return channel.forget();
  1945. request_error_cleanup:
  1946. channel->mState = CLOSED;
  1947. if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
  1948. // We already returned the channel to the app.
  1949. NS_ERROR("Failed to request more streams");
  1950. NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
  1951. DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
  1952. channel)));
  1953. return channel.forget();
  1954. }
  1955. // we'll be destroying the channel, but it never really got set up
  1956. // Alternative would be to RUN_ON_THREAD(channel.forget(),::Destroy,...) and
  1957. // Dispatch it to ourselves
  1958. return nullptr;
  1959. }
  1960. int32_t
  1961. DataChannelConnection::SendMsgInternal(DataChannel *channel, const char *data,
  1962. size_t length, uint32_t ppid)
  1963. {
  1964. uint16_t flags;
  1965. struct sctp_sendv_spa spa;
  1966. int32_t result;
  1967. NS_ENSURE_TRUE(channel->mState == OPEN || channel->mState == CONNECTING, 0);
  1968. NS_WARNING_ASSERTION(length > 0, "Length is 0?!");
  1969. // To avoid problems where an in-order OPEN is lost and an
  1970. // out-of-order data message "beats" it, require data to be in-order
  1971. // until we get an ACK.
  1972. if ((channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) &&
  1973. !(channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK)) {
  1974. flags = SCTP_UNORDERED;
  1975. } else {
  1976. flags = 0;
  1977. }
  1978. spa.sendv_sndinfo.snd_ppid = htonl(ppid);
  1979. spa.sendv_sndinfo.snd_sid = channel->mStream;
  1980. spa.sendv_sndinfo.snd_flags = flags;
  1981. spa.sendv_sndinfo.snd_context = 0;
  1982. spa.sendv_sndinfo.snd_assoc_id = 0;
  1983. spa.sendv_flags = SCTP_SEND_SNDINFO_VALID;
  1984. if (channel->mPrPolicy != SCTP_PR_SCTP_NONE) {
  1985. spa.sendv_prinfo.pr_policy = channel->mPrPolicy;
  1986. spa.sendv_prinfo.pr_value = channel->mPrValue;
  1987. spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
  1988. }
  1989. // Note: Main-thread IO, but doesn't block!
  1990. // XXX FIX! to deal with heavy overruns of JS trying to pass data in
  1991. // (more than the buffersize) queue data onto another thread to do the
  1992. // actual sends. See netwerk/protocol/websocket/WebSocketChannel.cpp
  1993. // SCTP will return EMSGSIZE if the message is bigger than the buffer
  1994. // size (or EAGAIN if there isn't space)
  1995. // Avoid a race between buffer-full-failure (where we have to add the
  1996. // packet to the buffered-data queue) and the buffer-now-only-half-full
  1997. // callback, which happens on a different thread. Otherwise we might
  1998. // fail here, then before we add it to the queue get the half-full
  1999. // callback, find nothing to do, then on this thread add it to the
  2000. // queue - which would sit there. Also, if we later send more data, it
  2001. // would arrive ahead of the buffered message, but if the buffer ever
  2002. // got to 1/2 full, the message would get sent - but at a semi-random
  2003. // time, after other data it was supposed to be in front of.
  2004. // Must lock before empty check for similar reasons!
  2005. MutexAutoLock lock(mLock);
  2006. if (channel->mBufferedData.IsEmpty()) {
  2007. result = usrsctp_sendv(mSocket, data, length,
  2008. nullptr, 0,
  2009. (void *)&spa, (socklen_t)sizeof(struct sctp_sendv_spa),
  2010. SCTP_SENDV_SPA, 0);
  2011. LOG(("Sent buffer (len=%u), result=%d", length, result));
  2012. } else {
  2013. // Fake EAGAIN if we're already buffering data
  2014. result = -1;
  2015. errno = EAGAIN;
  2016. }
  2017. if (result < 0) {
  2018. if (errno == EAGAIN || errno == EWOULDBLOCK) {
  2019. // queue data for resend! And queue any further data for the stream until it is...
  2020. auto *buffered = new BufferedMsg(spa, data, length); // infallible malloc
  2021. channel->mBufferedData.AppendElement(buffered); // owned by mBufferedData array
  2022. channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_DATA;
  2023. LOG(("Queued %u buffers (len=%u)", channel->mBufferedData.Length(), length));
  2024. return 0;
  2025. }
  2026. LOG(("error %d sending string", errno));
  2027. }
  2028. return result;
  2029. }
  2030. // Handles fragmenting binary messages
  2031. int32_t
  2032. DataChannelConnection::SendBinary(DataChannel *channel, const char *data,
  2033. size_t len,
  2034. uint32_t ppid_partial, uint32_t ppid_final)
  2035. {
  2036. // Since there's a limit on network buffer size and no limits on message
  2037. // size, and we don't want to use EOR mode (multiple writes for a
  2038. // message, but all other streams are blocked until you finish sending
  2039. // this message), we need to add application-level fragmentation of large
  2040. // messages. On a reliable channel, these can be simply rebuilt into a
  2041. // large message. On an unreliable channel, we can't and don't know how
  2042. // long to wait, and there are no retransmissions, and no easy way to
  2043. // tell the user "this part is missing", so on unreliable channels we
  2044. // need to return an error if sending more bytes than the network buffers
  2045. // can hold, and perhaps a lower number.
  2046. // We *really* don't want to do this from main thread! - and SendMsgInternal
  2047. // avoids blocking.
  2048. // This MUST be reliable and in-order for the reassembly to work
  2049. if (len > DATA_CHANNEL_MAX_BINARY_FRAGMENT &&
  2050. channel->mPrPolicy == DATA_CHANNEL_RELIABLE &&
  2051. !(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED)) {
  2052. int32_t sent=0;
  2053. uint32_t origlen = len;
  2054. LOG(("Sending binary message length %u in chunks", len));
  2055. // XXX check flags for out-of-order, or force in-order for large binary messages
  2056. while (len > 0) {
  2057. size_t sendlen = std::min<size_t>(len, DATA_CHANNEL_MAX_BINARY_FRAGMENT);
  2058. uint32_t ppid;
  2059. len -= sendlen;
  2060. ppid = len > 0 ? ppid_partial : ppid_final;
  2061. LOG(("Send chunk of %u bytes, ppid %u", sendlen, ppid));
  2062. // Note that these might end up being deferred and queued.
  2063. sent += SendMsgInternal(channel, data, sendlen, ppid);
  2064. data += sendlen;
  2065. }
  2066. LOG(("Sent %d buffers for %u bytes, %d sent immediately, %d buffers queued",
  2067. (origlen+DATA_CHANNEL_MAX_BINARY_FRAGMENT-1)/DATA_CHANNEL_MAX_BINARY_FRAGMENT,
  2068. origlen, sent,
  2069. channel->mBufferedData.Length()));
  2070. return sent;
  2071. }
  2072. NS_WARNING_ASSERTION(len <= DATA_CHANNEL_MAX_BINARY_FRAGMENT,
  2073. "Sending too-large data on unreliable channel!");
  2074. // This will fail if the message is too large (default 256K)
  2075. return SendMsgInternal(channel, data, len, ppid_final);
  2076. }
  2077. class ReadBlobRunnable : public Runnable {
  2078. public:
  2079. ReadBlobRunnable(DataChannelConnection* aConnection, uint16_t aStream,
  2080. nsIInputStream* aBlob) :
  2081. mConnection(aConnection),
  2082. mStream(aStream),
  2083. mBlob(aBlob)
  2084. {}
  2085. NS_IMETHOD Run() override {
  2086. // ReadBlob() is responsible to releasing the reference
  2087. DataChannelConnection *self = mConnection;
  2088. self->ReadBlob(mConnection.forget(), mStream, mBlob);
  2089. return NS_OK;
  2090. }
  2091. private:
  2092. // Make sure the Connection doesn't die while there are jobs outstanding.
  2093. // Let it die (if released by PeerConnectionImpl while we're running)
  2094. // when we send our runnable back to MainThread. Then ~DataChannelConnection
  2095. // can send the IOThread to MainThread to die in a runnable, avoiding
  2096. // unsafe event loop recursion. Evil.
  2097. RefPtr<DataChannelConnection> mConnection;
  2098. uint16_t mStream;
  2099. // Use RefCount for preventing the object is deleted when SendBlob returns.
  2100. RefPtr<nsIInputStream> mBlob;
  2101. };
  2102. int32_t
  2103. DataChannelConnection::SendBlob(uint16_t stream, nsIInputStream *aBlob)
  2104. {
  2105. DataChannel *channel = mStreams[stream];
  2106. NS_ENSURE_TRUE(channel, 0);
  2107. // Spawn a thread to send the data
  2108. if (!mInternalIOThread) {
  2109. nsresult res = NS_NewThread(getter_AddRefs(mInternalIOThread));
  2110. if (NS_FAILED(res)) {
  2111. return -1;
  2112. }
  2113. }
  2114. mInternalIOThread->Dispatch(do_AddRef(new ReadBlobRunnable(this, stream, aBlob)), NS_DISPATCH_NORMAL);
  2115. return 0;
  2116. }
  2117. class DataChannelBlobSendRunnable : public Runnable
  2118. {
  2119. public:
  2120. DataChannelBlobSendRunnable(already_AddRefed<DataChannelConnection>& aConnection,
  2121. uint16_t aStream)
  2122. : mConnection(aConnection)
  2123. , mStream(aStream) {}
  2124. ~DataChannelBlobSendRunnable() override
  2125. {
  2126. if (!NS_IsMainThread() && mConnection) {
  2127. MOZ_ASSERT(false);
  2128. // explicitly leak the connection if destroyed off mainthread
  2129. Unused << mConnection.forget().take();
  2130. }
  2131. }
  2132. NS_IMETHOD Run() override
  2133. {
  2134. ASSERT_WEBRTC(NS_IsMainThread());
  2135. mConnection->SendBinaryMsg(mStream, mData);
  2136. mConnection = nullptr;
  2137. return NS_OK;
  2138. }
  2139. // explicitly public so we can avoid allocating twice and copying
  2140. nsCString mData;
  2141. private:
  2142. // Note: we can be destroyed off the target thread, so be careful not to let this
  2143. // get Released()ed on the temp thread!
  2144. RefPtr<DataChannelConnection> mConnection;
  2145. uint16_t mStream;
  2146. };
  2147. void
  2148. DataChannelConnection::ReadBlob(already_AddRefed<DataChannelConnection> aThis,
  2149. uint16_t aStream, nsIInputStream* aBlob)
  2150. {
  2151. // NOTE: 'aThis' has been forgotten by the caller to avoid releasing
  2152. // it off mainthread; if PeerConnectionImpl has released then we want
  2153. // ~DataChannelConnection() to run on MainThread
  2154. // XXX to do this safely, we must enqueue these atomically onto the
  2155. // output socket. We need a sender thread(s?) to enqueue data into the
  2156. // socket and to avoid main-thread IO that might block. Even on a
  2157. // background thread, we may not want to block on one stream's data.
  2158. // I.e. run non-blocking and service multiple channels.
  2159. // For now as a hack, send as a single blast of queued packets which may
  2160. // be deferred until buffer space is available.
  2161. uint64_t len;
  2162. nsCOMPtr<nsIThread> mainThread;
  2163. NS_GetMainThread(getter_AddRefs(mainThread));
  2164. // Must not let Dispatching it cause the DataChannelConnection to get
  2165. // released on the wrong thread. Using WrapRunnable(RefPtr<DataChannelConnection>(aThis),...
  2166. // will occasionally cause aThis to get released on this thread. Also, an explicit Runnable
  2167. // lets us avoid copying the blob data an extra time.
  2168. RefPtr<DataChannelBlobSendRunnable> runnable = new DataChannelBlobSendRunnable(aThis,
  2169. aStream);
  2170. // avoid copying the blob data by passing the mData from the runnable
  2171. if (NS_FAILED(aBlob->Available(&len)) ||
  2172. NS_FAILED(NS_ReadInputStreamToString(aBlob, runnable->mData, len))) {
  2173. // Bug 966602: Doesn't return an error to the caller via onerror.
  2174. // We must release DataChannelConnection on MainThread to avoid issues (bug 876167)
  2175. // aThis is now owned by the runnable; release it there
  2176. NS_ProxyRelease(mainThread, runnable.forget());
  2177. return;
  2178. }
  2179. aBlob->Close();
  2180. NS_DispatchToMainThread(runnable, NS_DISPATCH_NORMAL);
  2181. }
  2182. void
  2183. DataChannelConnection::GetStreamIds(std::vector<uint16_t>* aStreamList)
  2184. {
  2185. ASSERT_WEBRTC(NS_IsMainThread());
  2186. for (uint32_t i = 0; i < mStreams.Length(); ++i) {
  2187. if (mStreams[i]) {
  2188. aStreamList->push_back(mStreams[i]->mStream);
  2189. }
  2190. }
  2191. }
  2192. int32_t
  2193. DataChannelConnection::SendMsgCommon(uint16_t stream, const nsACString &aMsg,
  2194. bool isBinary)
  2195. {
  2196. ASSERT_WEBRTC(NS_IsMainThread());
  2197. // We really could allow this from other threads, so long as we deal with
  2198. // asynchronosity issues with channels closing, in particular access to
  2199. // mStreams, and issues with the association closing (access to mSocket).
  2200. const char *data = aMsg.BeginReading();
  2201. uint32_t len = aMsg.Length();
  2202. DataChannel *channel;
  2203. LOG(("Sending %sto stream %u: %u bytes", isBinary ? "binary " : "", stream, len));
  2204. // XXX if we want more efficiency, translate flags once at open time
  2205. channel = mStreams[stream];
  2206. NS_ENSURE_TRUE(channel, 0);
  2207. if (isBinary)
  2208. return SendBinary(channel, data, len,
  2209. DATA_CHANNEL_PPID_BINARY, DATA_CHANNEL_PPID_BINARY_LAST);
  2210. return SendBinary(channel, data, len,
  2211. DATA_CHANNEL_PPID_DOMSTRING, DATA_CHANNEL_PPID_DOMSTRING_LAST);
  2212. }
  2213. void
  2214. DataChannelConnection::Close(DataChannel *aChannel)
  2215. {
  2216. MutexAutoLock lock(mLock);
  2217. CloseInt(aChannel);
  2218. }
  2219. // So we can call Close() with the lock already held
  2220. // Called from someone who holds a ref via ::Close(), or from ~DataChannel
  2221. void
  2222. DataChannelConnection::CloseInt(DataChannel *aChannel)
  2223. {
  2224. MOZ_ASSERT(aChannel);
  2225. RefPtr<DataChannel> channel(aChannel); // make sure it doesn't go away on us
  2226. mLock.AssertCurrentThreadOwns();
  2227. LOG(("Connection %p/Channel %p: Closing stream %u",
  2228. channel->mConnection.get(), channel.get(), channel->mStream));
  2229. // re-test since it may have closed before the lock was grabbed
  2230. if (aChannel->mState == CLOSED || aChannel->mState == CLOSING) {
  2231. LOG(("Channel already closing/closed (%u)", aChannel->mState));
  2232. if (mState == CLOSED && channel->mStream != INVALID_STREAM) {
  2233. // called from CloseAll()
  2234. // we're not going to hang around waiting any more
  2235. mStreams[channel->mStream] = nullptr;
  2236. }
  2237. return;
  2238. }
  2239. aChannel->mBufferedData.Clear();
  2240. if (channel->mStream != INVALID_STREAM) {
  2241. ResetOutgoingStream(channel->mStream);
  2242. if (mState == CLOSED) { // called from CloseAll()
  2243. // Let resets accumulate then send all at once in CloseAll()
  2244. // we're not going to hang around waiting
  2245. mStreams[channel->mStream] = nullptr;
  2246. } else {
  2247. SendOutgoingStreamReset();
  2248. }
  2249. }
  2250. aChannel->mState = CLOSING;
  2251. if (mState == CLOSED) {
  2252. // we're not going to hang around waiting
  2253. channel->StreamClosedLocked();
  2254. }
  2255. // At this point when we leave here, the object is a zombie held alive only by the DOM object
  2256. }
  2257. void DataChannelConnection::CloseAll()
  2258. {
  2259. LOG(("Closing all channels (connection %p)", (void*) this));
  2260. // Don't need to lock here
  2261. // Make sure no more channels will be opened
  2262. {
  2263. MutexAutoLock lock(mLock);
  2264. mState = CLOSED;
  2265. }
  2266. // Close current channels
  2267. // If there are runnables, they hold a strong ref and keep the channel
  2268. // and/or connection alive (even if in a CLOSED state)
  2269. bool closed_some = false;
  2270. for (uint32_t i = 0; i < mStreams.Length(); ++i) {
  2271. if (mStreams[i]) {
  2272. mStreams[i]->Close();
  2273. closed_some = true;
  2274. }
  2275. }
  2276. // Clean up any pending opens for channels
  2277. RefPtr<DataChannel> channel;
  2278. while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(mPending.PopFront())))) {
  2279. LOG(("closing pending channel %p, stream %u", channel.get(), channel->mStream));
  2280. channel->Close(); // also releases the ref on each iteration
  2281. closed_some = true;
  2282. }
  2283. // It's more efficient to let the Resets queue in shutdown and then
  2284. // SendOutgoingStreamReset() here.
  2285. if (closed_some) {
  2286. MutexAutoLock lock(mLock);
  2287. SendOutgoingStreamReset();
  2288. }
  2289. }
  2290. DataChannel::~DataChannel()
  2291. {
  2292. // NS_ASSERTION since this is more "I think I caught all the cases that
  2293. // can cause this" than a true kill-the-program assertion. If this is
  2294. // wrong, nothing bad happens. A worst it's a leak.
  2295. NS_ASSERTION(mState == CLOSED || mState == CLOSING, "unexpected state in ~DataChannel");
  2296. }
  2297. void
  2298. DataChannel::Close()
  2299. {
  2300. if (mConnection) {
  2301. // ensure we don't get deleted
  2302. RefPtr<DataChannelConnection> connection(mConnection);
  2303. connection->Close(this);
  2304. }
  2305. }
  2306. // Used when disconnecting from the DataChannelConnection
  2307. void
  2308. DataChannel::StreamClosedLocked()
  2309. {
  2310. mConnection->mLock.AssertCurrentThreadOwns();
  2311. ENSURE_DATACONNECTION;
  2312. LOG(("Destroying Data channel %u", mStream));
  2313. MOZ_ASSERT_IF(mStream != INVALID_STREAM,
  2314. !mConnection->FindChannelByStream(mStream));
  2315. mStream = INVALID_STREAM;
  2316. mState = CLOSED;
  2317. NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
  2318. DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED,
  2319. mConnection, this)));
  2320. // We leave mConnection live until the DOM releases us, to avoid races
  2321. }
  2322. void
  2323. DataChannel::ReleaseConnection()
  2324. {
  2325. ASSERT_WEBRTC(NS_IsMainThread());
  2326. mConnection = nullptr;
  2327. }
  2328. void
  2329. DataChannel::SetListener(DataChannelListener *aListener, nsISupports *aContext)
  2330. {
  2331. MutexAutoLock mLock(mListenerLock);
  2332. mContext = aContext;
  2333. mListener = aListener;
  2334. }
  2335. // May be called from another (i.e. Main) thread!
  2336. void
  2337. DataChannel::AppReady()
  2338. {
  2339. ENSURE_DATACONNECTION;
  2340. MutexAutoLock lock(mConnection->mLock);
  2341. mReady = true;
  2342. if (mState == WAITING_TO_OPEN) {
  2343. mState = OPEN;
  2344. NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
  2345. DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, mConnection,
  2346. this)));
  2347. for (uint32_t i = 0; i < mQueuedMessages.Length(); ++i) {
  2348. nsCOMPtr<nsIRunnable> runnable = mQueuedMessages[i];
  2349. MOZ_ASSERT(runnable);
  2350. NS_DispatchToMainThread(runnable);
  2351. }
  2352. } else {
  2353. NS_ASSERTION(mQueuedMessages.IsEmpty(), "Shouldn't have queued messages if not WAITING_TO_OPEN");
  2354. }
  2355. mQueuedMessages.Clear();
  2356. mQueuedMessages.Compact();
  2357. // We never use it again... We could even allocate the array in the odd
  2358. // cases we need it.
  2359. }
  2360. uint32_t
  2361. DataChannel::GetBufferedAmountLocked() const
  2362. {
  2363. size_t buffered = 0;
  2364. for (auto& buffer : mBufferedData) {
  2365. buffered += buffer->mLength;
  2366. }
  2367. // XXX Note: per Michael Tuexen, there's no way to currently get the buffered
  2368. // amount from the SCTP stack for a single stream. It is on their to-do
  2369. // list, and once we import a stack with support for that, we'll need to
  2370. // add it to what we buffer. Also we'll need to ask for notification of a per-
  2371. // stream buffer-low event and merge that into the handling of buffer-low
  2372. // (the equivalent to TCP_NOTSENT_LOWAT on TCP sockets)
  2373. if (buffered > UINT32_MAX) { // paranoia - >4GB buffered is very very unlikely
  2374. buffered = UINT32_MAX;
  2375. }
  2376. return buffered;
  2377. }
  2378. uint32_t
  2379. DataChannel::GetBufferedAmountLowThreshold()
  2380. {
  2381. return mBufferedThreshold;
  2382. }
  2383. // Never fire immediately, as it's defined to fire on transitions, not state
  2384. void
  2385. DataChannel::SetBufferedAmountLowThreshold(uint32_t aThreshold)
  2386. {
  2387. mBufferedThreshold = aThreshold;
  2388. }
  2389. // Called with mLock locked!
  2390. void
  2391. DataChannel::SendOrQueue(DataChannelOnMessageAvailable *aMessage)
  2392. {
  2393. if (!mReady &&
  2394. (mState == CONNECTING || mState == WAITING_TO_OPEN)) {
  2395. mQueuedMessages.AppendElement(aMessage);
  2396. } else {
  2397. NS_DispatchToMainThread(aMessage);
  2398. }
  2399. }
  2400. } // namespace mozilla