12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
7227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682 |
- /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
- /* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this file,
- * You can obtain one at http://mozilla.org/MPL/2.0/. */
- #include <algorithm>
- #include <stdio.h>
- #include <stdlib.h>
- #if !defined(__Userspace_os_Windows)
- #include <arpa/inet.h>
- #endif
- // usrsctp.h expects to have errno definitions prior to its inclusion.
- #include <errno.h>
- #define SCTP_DEBUG 1
- #define SCTP_STDINT_INCLUDE <stdint.h>
- #ifdef _MSC_VER
- // Disable "warning C4200: nonstandard extension used : zero-sized array in
- // struct/union"
- // ...which the third-party file usrsctp.h runs afoul of.
- #pragma warning(push)
- #pragma warning(disable:4200)
- #endif
- #include "usrsctp.h"
- #ifdef _MSC_VER
- #pragma warning(pop)
- #endif
- #include "DataChannelLog.h"
- #include "nsServiceManagerUtils.h"
- #include "nsIObserverService.h"
- #include "nsIObserver.h"
- #include "mozilla/Services.h"
- #include "mozilla/Sprintf.h"
- #include "nsProxyRelease.h"
- #include "nsThread.h"
- #include "nsThreadUtils.h"
- #include "nsAutoPtr.h"
- #include "nsNetUtil.h"
- #include "nsNetCID.h"
- #include "mozilla/StaticPtr.h"
- #include "mozilla/Unused.h"
- #ifdef MOZ_PEERCONNECTION
- #include "mtransport/runnable_utils.h"
- #endif
- #define DATACHANNEL_LOG(args) LOG(args)
- #include "DataChannel.h"
- #include "DataChannelProtocol.h"
- // Let us turn on and off important assertions in non-debug builds
- #ifdef DEBUG
- #define ASSERT_WEBRTC(x) MOZ_ASSERT((x))
- #elif defined(MOZ_WEBRTC_ASSERT_ALWAYS)
- #define ASSERT_WEBRTC(x) do { if (!(x)) { MOZ_CRASH(); } } while (0)
- #endif
- // True once usrsctp_init() has been run for this process; cleared again by
- // DataChannelShutdown when usrsctp_finish() is called.
- static bool sctp_initialized = false;
- namespace mozilla {
- LazyLogModule gDataChannelLog("DataChannel");
- static LazyLogModule gSCTPLog("SCTP");
- // Observer that shuts the usrsctp library down when XPCOM announces
- // "xpcom-will-shutdown".
- //
- // This has to be tied to some object that is guaranteed to stay around
- // (most likely a singleton), unless we decide to shut SCTP down whenever it
- // is not in use -- in which case every DataChannelConnection would hold a
- // ref to a refcounted object that releases the usrsctp library. For now the
- // single instance of this class is owned by the observer service.
- class DataChannelShutdown : public nsIObserver
- {
- public:
-   NS_DECL_ISUPPORTS
-   DataChannelShutdown() {}
-   // Registers this object for the "xpcom-will-shutdown" topic. Does nothing
-   // when the observer service cannot be obtained.
-   void Init()
-   {
-     nsCOMPtr<nsIObserverService> obs = mozilla::services::GetObserverService();
-     if (obs) {
-       nsresult rv = obs->AddObserver(this, "xpcom-will-shutdown", false);
-       MOZ_ASSERT(rv == NS_OK);
-       (void) rv;
-     }
-   }
-   // nsIObserver: on "xpcom-will-shutdown" finish usrsctp (if it was
-   // initialized) and unregister from the observer service. Any other topic
-   // is ignored.
-   NS_IMETHOD Observe(nsISupports* aSubject, const char* aTopic,
-                      const char16_t* aData) override
-   {
-     if (strcmp(aTopic, "xpcom-will-shutdown") != 0) {
-       return NS_OK;
-     }
-     LOG(("Shutting down SCTP"));
-     if (sctp_initialized) {
-       usrsctp_finish();
-       sctp_initialized = false;
-     }
-     nsCOMPtr<nsIObserverService> obs = mozilla::services::GetObserverService();
-     if (!obs) {
-       return NS_ERROR_FAILURE;
-     }
-     nsresult rv = obs->RemoveObserver(this, "xpcom-will-shutdown");
-     MOZ_ASSERT(rv == NS_OK);
-     (void) rv;
-     return NS_OK;
-   }
- private:
-   // The only instance of DataChannelShutdown is owned by the observer
-   // service, so the destructor has no need to call RemoveObserver.
-   virtual ~DataChannelShutdown() = default;
- };
- NS_IMPL_ISUPPORTS(DataChannelShutdown, nsIObserver);
- // Takes private copies of both the send parameters and the payload so the
- // message can be kept after the caller's buffers are gone.
- //   spa    - send parameters, copied into a newly allocated sctp_sendv_spa
- //   data   - payload bytes; `length` bytes are copied
- //   length - payload size in bytes, also stored in mLength
- BufferedMsg::BufferedMsg(struct sctp_sendv_spa &spa, const char *data,
-                          size_t length)
-   : mLength(length)
- {
-   mSpa = new sctp_sendv_spa(spa);
-   char *payload = new char[length]; // infallible malloc!
-   memcpy(payload, data, length);
-   mData = payload;
- }
- // Releases the copies made by the constructor.
- BufferedMsg::~BufferedMsg()
- {
-   delete mSpa;
-   // mData is allocated with new char[] in the constructor, so it has to be
-   // released with the array form; a scalar delete here is undefined behaviour.
-   delete [] mData;
- }
- // Receive callback registered with usrsctp_socket(). ulp_info is the
- // DataChannelConnection that was passed when the socket was created; the
- // call is forwarded to its ReceiveCallback() (addr is not used).
- static int
- receive_cb(struct socket* sock, union sctp_sockstore addr,
-            void *data, size_t datalen,
-            struct sctp_rcvinfo rcv, int flags, void *ulp_info)
- {
-   auto *self = static_cast<DataChannelConnection*>(ulp_info);
-   return self->ReceiveCallback(sock, data, datalen, rcv, flags);
- }
- // Maps a usrsctp socket back to the DataChannelConnection that owns it.
- //
- // usrsctp_getladdrs() returns the addresses bound to this socket. We only
- // open AF_CONN sockets, and CompleteConnect() binds them with sconn_addr set
- // to the owning DataChannelConnection, so [0] is as good as any other entry.
- //
- // Returns nullptr when no address is reported or the first address is not
- // AF_CONN.
- static
- DataChannelConnection *
- GetConnectionFromSocket(struct socket* sock)
- {
-   struct sockaddr *addrs = nullptr;
-   const int naddrs = usrsctp_getladdrs(sock, 0, &addrs);
-   if (naddrs <= 0) {
-     // No address list was handed back, so there is nothing to free.
-     return nullptr;
-   }
-   DataChannelConnection *connection = nullptr;
-   if (addrs[0].sa_family == AF_CONN) {
-     struct sockaddr_conn *sconn =
-       reinterpret_cast<struct sockaddr_conn *>(&addrs[0]);
-     connection = reinterpret_cast<DataChannelConnection *>(sconn->sconn_addr);
-   }
-   // Free the list on every path that received one; previously it leaked when
-   // the first address had an unexpected family.
-   usrsctp_freeladdrs(addrs);
-   return connection;
- }
- // Send-threshold callback registered with usrsctp_socket(): invoked when the
- // send buffer empties to the threshold value. Flushes the deferred messages
- // of the connection that owns the socket. Always returns 0.
- static int
- threshold_event(struct socket* sock, uint32_t sb_free)
- {
-   DataChannelConnection *owner = GetConnectionFromSocket(sock);
-   if (!owner) {
-     LOG(("Can't find connection for socket %p", sock));
-     return 0;
-   }
-   LOG(("SendDeferred()"));
-   owner->SendDeferredMessages();
-   return 0;
- }
- // printf-style debug hook passed to usrsctp_init(). The message is formatted
- // into a 1024-byte stack buffer and forwarded to PR_LogPrint, but only while
- // the SCTP log module is enabled at Debug level.
- static void
- debug_printf(const char *format, ...)
- {
-   if (!MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
-     return;
-   }
-   char text[1024];
-   va_list args;
-   va_start(args, format);
- #ifdef _WIN32
-   const int written = vsnprintf_s(text, sizeof(text), _TRUNCATE, format, args);
- #else
-   const int written = VsprintfLiteral(text, format, args);
- #endif
-   if (written > 0) {
-     PR_LogPrint("%s", text);
-   }
-   va_end(args);
- }
- // Creates a connection in the CLOSED state with no sockets, no ports and no
- // internal IO thread; `listener` is stored in mListener.
- DataChannelConnection::DataChannelConnection(DataConnectionListener *listener)
-   : mLock("netwerk::sctp::DataChannelConnection")
- {
-   mListener = listener;
-   mState = CLOSED;
-   mMasterSocket = nullptr;
-   mSocket = nullptr;
-   mRemotePort = 0;
-   mLocalPort = 0;
-   mInternalIOThread = nullptr;
-   LOG(("Constructor DataChannelConnection=%p, listener=%p", this, mListener.get()));
- }
- // May run on the main thread or on the STS thread. By now the connection is
- // expected to be CLOSED, already disconnected from sigslot/mTransportFlow,
- // with no master socket and no pending channels.
- DataChannelConnection::~DataChannelConnection()
- {
-   LOG(("Deleting DataChannelConnection %p", (void *) this));
-   ASSERT_WEBRTC(mState == CLOSED);
-   MOZ_ASSERT(!mMasterSocket);
-   MOZ_ASSERT(mPending.GetSize() == 0);
-   if (IsSTSThread()) {
-     // On STS it is safe to shut the IO thread down directly.
-     if (mInternalIOThread) {
-       mInternalIOThread->Shutdown();
-     }
-     return;
-   }
-   // Not on STS: must be the main thread.
-   ASSERT_WEBRTC(NS_IsMainThread());
-   // TransportFlows must be released from the STS thread.
-   if (mTransportFlow) {
-     ASSERT_WEBRTC(mSTS);
-     NS_ProxyRelease(mSTS, mTransportFlow.forget());
-   }
-   // Dispatch the shutdown instead of calling it here, so we do not spin the
-   // event loop from inside the event loop we may already be running in.
-   if (mInternalIOThread) {
-     NS_DispatchToMainThread(WrapRunnable(nsCOMPtr<nsIThread>(mInternalIOThread),
-                                          &nsIThread::Shutdown),
-                             NS_DISPATCH_NORMAL);
-   }
- }
- // Main-thread teardown: closes all channels, drops pending resets,
- // deregisters from the SCTP stack (DTLS mode only), clears the listener and
- // hands the sockets to DestroyOnSTS() to be closed on the STS thread.
- //
- // Though it's probably ok to do this and close the sockets, a true clean
- // shutdown could create a dependent internal object that stays around until
- // the network shuts the association down or it times out.
- void
- DataChannelConnection::Destroy()
- {
-   LOG(("Destroying DataChannelConnection %p", (void *) this));
-   ASSERT_WEBRTC(NS_IsMainThread());
-   CloseAll();
-   MutexAutoLock lock(mLock);
-   // If we had a pending reset, we aren't waiting for it - clear the list so
-   // we can deregister this DataChannelConnection without leaking.
-   ClearResets();
-   MOZ_ASSERT(mSTS);
-   // Must do this in Destroy() since we may then delete this object.
-   // Do this before dispatching to create a consistent ordering of calls to
-   // the SCTP stack.
-   if (mUsingDtls) {
-     usrsctp_deregister_address(static_cast<void *>(this));
-     LOG(("Deregistered %p from the SCTP stack.", static_cast<void *>(this)));
-   }
-   mListener = nullptr;
-   // Finish Destroy on STS thread to avoid bug 876167 - once that's fixed,
-   // the usrsctp_close() calls can move back here (and just proxy the
-   // disconnect_all()).
-   // DestroyOnSTS() takes (aMasterSocket, aSocket); pass them in that order so
-   // the callee closes the association socket before the master socket.
-   RUN_ON_THREAD(mSTS, WrapRunnable(RefPtr<DataChannelConnection>(this),
-                                    &DataChannelConnection::DestroyOnSTS,
-                                    mMasterSocket, mSocket),
-                 NS_DISPATCH_NORMAL);
-   // These will be released on STS
-   mSocket = nullptr;
-   mMasterSocket = nullptr; // also a flag that we've Destroyed this connection
-   // We can't get any more new callbacks from the SCTP library
-   // All existing callbacks have refs to DataChannelConnection
-   // nsDOMDataChannel objects have refs to DataChannels that have refs to us
- }
- // Second half of Destroy(): closes the sockets handed over and disconnects
- // all signals. aSocket is closed first, and only when it is a different
- // socket from aMasterSocket, so nothing is closed twice.
- void DataChannelConnection::DestroyOnSTS(struct socket *aMasterSocket,
-                                          struct socket *aSocket)
- {
-   const bool hasSeparateSocket = aSocket && aSocket != aMasterSocket;
-   if (hasSeparateSocket) {
-     usrsctp_close(aSocket);
-   }
-   if (aMasterSocket) {
-     usrsctp_close(aMasterSocket);
-   }
-   disconnect_all();
- }
- // Initializes the usrsctp library (first call only) and creates and
- // configures the master SCTP socket.
- //   aPort       - UDP encapsulation port; only used when aUsingDtls is false
- //   aNumStreams - number of outgoing streams to request; mStreams grows by
- //                 this many null entries
- //   aUsingDtls  - true to run SCTP over DTLS (AF_CONN), false for SCTP over
- //                 UDP (AF_INET)
- // Returns true on success. On failure after the socket was created, the
- // socket is closed, mMasterSocket is reset to nullptr and false is returned.
- // Must be called on the main thread.
- bool
- DataChannelConnection::Init(unsigned short aPort, uint16_t aNumStreams, bool aUsingDtls)
- {
-   const uint16_t kEventTypes[] = {SCTP_ASSOC_CHANGE,
-                                   SCTP_PEER_ADDR_CHANGE,
-                                   SCTP_REMOTE_ERROR,
-                                   SCTP_SHUTDOWN_EVENT,
-                                   SCTP_ADAPTATION_INDICATION,
-                                   SCTP_SEND_FAILED_EVENT,
-                                   SCTP_STREAM_RESET_EVENT,
-                                   SCTP_STREAM_CHANGE_EVENT};
-   ASSERT_WEBRTC(NS_IsMainThread());
-   // No MutexAutoLock on mLock here: this always runs on the main thread.
-   if (!sctp_initialized) {
-     if (!aUsingDtls) {
-       LOG(("sctp_init(%u)", aPort));
-       usrsctp_init(aPort, nullptr, debug_printf);
-     } else {
-       LOG(("sctp_init(DTLS)"));
- #ifdef MOZ_PEERCONNECTION
-       usrsctp_init(0, DataChannelConnection::SctpDtlsOutput, debug_printf);
- #else
-       NS_ASSERTION(!aUsingDtls, "Trying to use SCTP/DTLS without mtransport");
- #endif
-     }
-     // Set logging to SCTP:LogLevel::Debug to get SCTP debugs
-     if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
-       usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_ALL);
-     }
-     usrsctp_sysctl_set_sctp_blackhole(2);
-     // ECN is currently not supported by the Firefox code
-     usrsctp_sysctl_set_sctp_ecn_enable(0);
-     // Authentication and dynamic address reconfiguration are not used for
-     // data channels; leaving them on only adds extra code paths.
-     usrsctp_sysctl_set_sctp_asconf_enable(0);
-     usrsctp_sysctl_set_sctp_auth_enable(0);
-     sctp_initialized = true;
-     RefPtr<DataChannelShutdown> shutdownObserver = new DataChannelShutdown();
-     shutdownObserver->Init();
-   }
-   // XXX FIX! make this a global we get once
-   // Find the STS thread
-   nsresult rv;
-   mSTS = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
-   MOZ_ASSERT(NS_SUCCEEDED(rv));
-   // Open sctp with a callback
-   mMasterSocket = usrsctp_socket(aUsingDtls ? AF_CONN : AF_INET,
-                                  SOCK_STREAM, IPPROTO_SCTP,
-                                  receive_cb, threshold_event,
-                                  usrsctp_sysctl_get_sctp_sendspace() / 2, this);
-   if (!mMasterSocket) {
-     return false;
-   }
-   // Common failure path for everything after the socket exists.
-   auto closeAndFail = [this]() -> bool {
-     usrsctp_close(mMasterSocket);
-     mMasterSocket = nullptr;
-     mUsingDtls = false;
-     return false;
-   };
-   // Make non-blocking for bind/connect. SCTP over UDP defaults to non-blocking
-   // in associations for normal IO
-   if (usrsctp_set_non_blocking(mMasterSocket, 1) < 0) {
-     LOG(("Couldn't set non_blocking on SCTP socket"));
-     // We can't handle connect() safely if it will block, not that this will
-     // even happen.
-     return closeAndFail();
-   }
-   // Make sure closing the socket cannot call us back again; that would make
-   // it use an invalid DataChannelConnection pointer.
-   {
-     struct linger lingerOpt;
-     lingerOpt.l_onoff = 1;
-     lingerOpt.l_linger = 0;
-     if (usrsctp_setsockopt(mMasterSocket, SOL_SOCKET, SO_LINGER,
-                            static_cast<const void *>(&lingerOpt),
-                            static_cast<socklen_t>(sizeof(struct linger))) < 0) {
-       LOG(("Couldn't set SO_LINGER on SCTP socket"));
-       // unsafe to allow it to continue if this fails
-       return closeAndFail();
-     }
-   }
-   // XXX Consider disabling this when we add proper SDP negotiation.
-   // We may want to leave enabled for supporting 'cloning' of SDP offers, which
-   // implies re-use of the same pseudo-port number, or forcing a renegotiation.
-   {
-     uint32_t enable = 1;
-     // Failures of these two options are only logged, not treated as fatal.
-     if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REUSE_PORT,
-                            static_cast<const void *>(&enable),
-                            static_cast<socklen_t>(sizeof(enable))) < 0) {
-       LOG(("Couldn't set SCTP_REUSE_PORT on SCTP socket"));
-     }
-     if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_NODELAY,
-                            static_cast<const void *>(&enable),
-                            static_cast<socklen_t>(sizeof(enable))) < 0) {
-       LOG(("Couldn't set SCTP_NODELAY on SCTP socket"));
-     }
-   }
-   if (!aUsingDtls) {
-     struct sctp_udpencaps encaps;
-     memset(&encaps, 0, sizeof(encaps));
-     encaps.sue_address.ss_family = AF_INET;
-     encaps.sue_port = htons(aPort);
-     if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REMOTE_UDP_ENCAPS_PORT,
-                            static_cast<const void *>(&encaps),
-                            static_cast<socklen_t>(sizeof(struct sctp_udpencaps))) < 0) {
-       LOG(("*** failed encaps errno %d", errno));
-       return closeAndFail();
-     }
-     LOG(("SCTP encapsulation local port %d", aPort));
-   }
-   {
-     struct sctp_assoc_value resetOpt;
-     resetOpt.assoc_id = SCTP_ALL_ASSOC;
-     resetOpt.assoc_value = SCTP_ENABLE_RESET_STREAM_REQ | SCTP_ENABLE_CHANGE_ASSOC_REQ;
-     if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET,
-                            &resetOpt,
-                            static_cast<socklen_t>(sizeof(struct sctp_assoc_value))) < 0) {
-       LOG(("*** failed enable stream reset errno %d", errno));
-       return closeAndFail();
-     }
-   }
-   /* Enable the events of interest. */
-   {
-     struct sctp_event subscription;
-     memset(&subscription, 0, sizeof(subscription));
-     subscription.se_assoc_id = SCTP_ALL_ASSOC;
-     subscription.se_on = 1;
-     for (const uint16_t eventType : kEventTypes) {
-       subscription.se_type = eventType;
-       if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_EVENT,
-                              &subscription, sizeof(subscription)) < 0) {
-         LOG(("*** failed setsockopt SCTP_EVENT errno %d", errno));
-         return closeAndFail();
-       }
-     }
-   }
-   // Update number of streams
-   mStreams.AppendElements(aNumStreams);
-   for (uint32_t idx = 0; idx < aNumStreams; ++idx) {
-     mStreams[idx] = nullptr;
-   }
-   {
-     struct sctp_initmsg initmsg;
-     memset(&initmsg, 0, sizeof(initmsg));
-     socklen_t len = sizeof(initmsg);
-     if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG,
-                            &initmsg, &len) < 0) {
-       LOG(("*** failed getsockopt SCTP_INITMSG"));
-       return closeAndFail();
-     }
-     LOG(("Setting number of SCTP streams to %u, was %u/%u", aNumStreams,
-          initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams));
-     initmsg.sinit_num_ostreams = aNumStreams;
-     initmsg.sinit_max_instreams = MAX_NUM_STREAMS;
-     if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
-                            static_cast<socklen_t>(sizeof(initmsg))) < 0) {
-       LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno));
-       return closeAndFail();
-     }
-   }
-   mSocket = nullptr;
-   mUsingDtls = aUsingDtls;
-   if (aUsingDtls) {
-     usrsctp_register_address(static_cast<void *>(this));
-     LOG(("Registered %p within the SCTP stack.", static_cast<void *>(this)));
-   }
-   return true;
- }
- #ifdef MOZ_PEERCONNECTION
- // Derives mAllocateEven from the DTLS role of the transport flow: it is
- // true exactly when our DTLS layer is the CLIENT. STS thread only.
- void
- DataChannelConnection::SetEvenOdd()
- {
-   ASSERT_WEBRTC(IsSTSThread());
-   TransportLayer *layer = mTransportFlow->GetLayer(TransportLayerDtls::ID());
-   auto *dtlsLayer = static_cast<TransportLayerDtls *>(layer);
-   MOZ_ASSERT(dtlsLayer); // DTLS is mandatory
-   const bool isClient = (dtlsLayer->role() == TransportLayerDtls::CLIENT);
-   mAllocateEven = isClient;
- }
- // Records the transport flow and the SCTP ports, moves the connection to
- // CONNECTING and schedules SetSignals() on the STS thread.
- // Returns false (via NS_ENSURE_TRUE) when aFlow is null, true otherwise.
- // Init() must have created the master socket beforehand.
- bool
- DataChannelConnection::ConnectViaTransportFlow(TransportFlow *aFlow, uint16_t localport, uint16_t remoteport)
- {
-   LOG(("Connect DTLS local %u, remote %u", localport, remoteport));
-   NS_PRECONDITION(mMasterSocket, "SCTP wasn't initialized before ConnectViaTransportFlow!");
-   NS_ENSURE_TRUE(aFlow, false);
-   mState = CONNECTING;
-   mRemotePort = remoteport;
-   mLocalPort = localport;
-   mTransportFlow = aFlow;
-   RefPtr<DataChannelConnection> self(this);
-   RUN_ON_THREAD(mSTS,
-                 WrapRunnable(self, &DataChannelConnection::SetSignals),
-                 NS_DISPATCH_NORMAL);
-   return true;
- }
- // Hooks this connection up to the transport flow's packet and state-change
- // signals, then feeds the current flow state to CompleteConnect() by hand.
- // STS thread only.
- void
- DataChannelConnection::SetSignals()
- {
-   ASSERT_WEBRTC(IsSTSThread());
-   ASSERT_WEBRTC(mTransportFlow);
-   LOG(("Setting transport signals, state: %d", mTransportFlow->state()));
-   mTransportFlow->SignalPacketReceived.connect(
-     this, &DataChannelConnection::SctpDtlsInput);
-   mTransportFlow->SignalStateChange.connect(
-     this, &DataChannelConnection::CompleteConnect);
-   // SignalStateChange() doesn't call you with the initial state, so deliver
-   // it ourselves.
-   CompleteConnect(mTransportFlow, mTransportFlow->state());
- }
- // State-change handler for the transport flow. Once the flow is TS_OPEN and
- // the master socket exists, binds the socket to the local port, starts the
- // SCTP connect to the remote port and pins the path MTU to 1200 with PMTUD
- // disabled.
- //
- // When bind or connect fails, ON_CONNECTION is dispatched to the main thread
- // (a connect failure also sets mState to CLOSED). A successful or
- // in-progress connect returns without dispatching; the SCTP_COMM_UP
- // notification takes over from there.
- //
- // We should abort the connection on TS_ERROR. Note however that the
- // association will also fail (perhaps with a delay) and notify us that way.
- void
- DataChannelConnection::CompleteConnect(TransportFlow *flow, TransportLayer::State state)
- {
-   LOG(("Data transport state: %d", state));
-   MutexAutoLock lock(mLock);
-   ASSERT_WEBRTC(IsSTSThread());
-   if (state != TransportLayer::TS_OPEN || !mMasterSocket)
-     return;
-   struct sockaddr_conn addr;
-   memset(&addr, 0, sizeof(addr));
-   addr.sconn_family = AF_CONN;
- #if defined(__Userspace_os_Darwin)
-   addr.sconn_len = sizeof(addr);
- #endif
-   addr.sconn_port = htons(mLocalPort);
-   addr.sconn_addr = static_cast<void *>(this);
-   LOG(("Calling usrsctp_bind"));
-   const int bindResult =
-     usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr),
-                  sizeof(addr));
-   if (bindResult < 0) {
-     LOG(("usrsctp_bind failed: %d", bindResult));
-   } else {
-     // This is the remote addr
-     addr.sconn_port = htons(mRemotePort);
-     LOG(("Calling usrsctp_connect"));
-     const int connectResult =
-       usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr),
-                       sizeof(addr));
-     // Capture errno right away: the socket-option calls below may overwrite
-     // it, and the connect outcome must not depend on them.
-     const int connectErrno = errno;
-     const bool connectedOrPending =
-       connectResult >= 0 || connectErrno == EINPROGRESS;
-     if (connectedOrPending) {
-       struct sctp_paddrparams paddrparams;
-       memset(&paddrparams, 0, sizeof(struct sctp_paddrparams));
-       memcpy(&paddrparams.spp_address, &addr, sizeof(struct sockaddr_conn));
-       socklen_t opt_len = (socklen_t)sizeof(struct sctp_paddrparams);
-       // Kept separate from connectResult so a socket-option failure is only
-       // logged and is not mistaken for a failed connect.
-       int optResult = usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP,
-                                          SCTP_PEER_ADDR_PARAMS,
-                                          &paddrparams, &opt_len);
-       if (optResult < 0) {
-         LOG(("usrsctp_getsockopt failed: %d", optResult));
-       } else {
-         // draft-ietf-rtcweb-data-channel-13 section 5: max initial MTU IPV4 1200, IPV6 1280
-         paddrparams.spp_pathmtu = 1200; // safe for either
-         paddrparams.spp_flags &= ~SPP_PMTUD_ENABLE;
-         paddrparams.spp_flags |= SPP_PMTUD_DISABLE;
-         opt_len = (socklen_t)sizeof(struct sctp_paddrparams);
-         optResult = usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP,
-                                        SCTP_PEER_ADDR_PARAMS,
-                                        &paddrparams, opt_len);
-         if (optResult < 0) {
-           LOG(("usrsctp_setsockopt failed: %d", optResult));
-         } else {
-           LOG(("usrsctp: PMTUD disabled, MTU set to %u", paddrparams.spp_pathmtu));
-         }
-       }
-       // Either connected or non-blocking connect in progress. We set
-       // Even/Odd and fire ON_CONNECTION via SCTP_COMM_UP when we get that.
-       // This also avoids issues with calling TransportFlow stuff on Mainthread
-       return;
-     }
-     LOG(("usrsctp_connect failed: %d", connectErrno));
-     mState = CLOSED;
-   }
-   // Note: currently this doesn't actually notify the application
-   NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
-                                       DataChannelOnMessageAvailable::ON_CONNECTION,
-                                       this)));
-   return;
- }
- // Processes every pending Open that was queued in mPending.
- //
- // The nsDeque holds channels with an AddRef applied. Another reference (may)
- // be held by the DOMDataChannel, unless it's been GC'd. No other references
- // should exist.
- void
- DataChannelConnection::ProcessQueuedOpens()
- {
-   // nsDeque cannot be copied, so drain mPending into a scratch deque first;
-   // any channel whose open fails again will be pushed back onto mPending.
-   nsDeque drained;
-   for (;;) {
-     // really already_AddRefed<>
-     DataChannel *raw = static_cast<DataChannel *>(mPending.PopFront());
-     if (raw == nullptr) {
-       break;
-     }
-     drained.Push(static_cast<void *>(raw));
-   }
-   RefPtr<DataChannel> channel;
-   for (;;) {
-     // Every entry carries an AddRef(); dont_AddRef() makes that explicit.
-     channel = dont_AddRef(static_cast<DataChannel *>(drained.PopFront()));
-     if (channel == nullptr) {
-       break;
-     }
-     if (!(channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN)) {
-       NS_ASSERTION(false, "How did a DataChannel get queued without the FINISH_OPEN flag?");
-       continue;
-     }
-     LOG(("Processing queued open for %p (%u)", channel.get(), channel->mStream));
-     channel->mFlags &= ~DATA_CHANNEL_FLAGS_FINISH_OPEN;
-     // OpenFinish returns a reference itself, so we take it and Release it.
-     channel = OpenFinish(channel.forget()); // may reset the flag and re-push
-   }
- }
- // Packet-received handler for the transport flow: optionally dumps the
- // packet to the SCTP debug log, then feeds it into the SCTP stack while
- // holding mLock.
- void
- DataChannelConnection::SctpDtlsInput(TransportFlow *flow,
-                                      const unsigned char *data, size_t len)
- {
-   if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
-     char *dump = usrsctp_dumppacket((void *)data, len, SCTP_DUMP_INBOUND);
-     if (dump != nullptr) {
-       PR_LogPrint("%s", dump);
-       usrsctp_freedumpbuffer(dump);
-     }
-   }
-   // Pass the data to SCTP
-   MutexAutoLock lock(mLock);
-   usrsctp_conninput(static_cast<void *>(this), data, len, 0);
- }
- // Sends one packet through the transport flow, if there is one.
- //   data    - packet bytes
- //   len     - number of bytes in data
- //   release - when true, data is freed with delete[] before returning
- // Returns 1 when the flow reports a send error, otherwise 0 (including when
- // there is no transport flow).
- int
- DataChannelConnection::SendPacket(unsigned char data[], size_t len, bool release)
- {
-   //LOG(("%p: SCTP/DTLS sent %ld bytes", this, len));
-   int status = 0;
-   if (mTransportFlow && mTransportFlow->SendPacket(data, len) < 0) {
-     status = 1;
-   }
-   if (release) {
-     delete [] data;
-   }
-   return status;
- }
- // Output hook passed to usrsctp_init() in DTLS mode. `addr` is the
- // DataChannelConnection the packet belongs to. The packet is copied and
- // SendPacket() is dispatched to the STS thread; the return value is 0 on
- // that path, since packets can always be dropped later anyway.
- /* static */
- int
- DataChannelConnection::SctpDtlsOutput(void *addr, void *buffer, size_t length,
-                                       uint8_t tos, uint8_t set_df)
- {
-   DataChannelConnection *peer = static_cast<DataChannelConnection *>(addr);
-   if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
-     char *dump = usrsctp_dumppacket(buffer, length, SCTP_DUMP_OUTBOUND);
-     if (dump != nullptr) {
-       PR_LogPrint("%s", dump);
-       usrsctp_freedumpbuffer(dump);
-     }
-   }
-   // We're async proxying even if on the STSThread because this is called
-   // with internal SCTP locks held in some cases (such as in usrsctp_connect()).
-   // SCTP has an option for Apple, on IP connections only, to release at least
-   // one of the locks before calling a packet output routine; with changes to
-   // the underlying SCTP stack this might remove the need to use an async proxy.
-   if ((false /*peer->IsSTSThread()*/)) {
-     return peer->SendPacket(static_cast<unsigned char *>(buffer), length, false);
-   }
-   // The copy is owned by the dispatched SendPacket call (release == true).
-   unsigned char *copy = new unsigned char[length];
-   memcpy(copy, buffer, length);
-   // Commented out since we have to Dispatch SendPacket to avoid deadlock"
-   // res = -1;
-   // XXX It might be worthwhile to add an assertion against the thread
-   // somehow getting into the DataChannel/SCTP code again, as
-   // DISPATCH_SYNC is not fully blocking. This may be tricky, as it
-   // needs to be a per-thread check, not a global.
-   peer->mSTS->Dispatch(WrapRunnable(RefPtr<DataChannelConnection>(peer),
-                                     &DataChannelConnection::SendPacket,
-                                     copy, length, true),
-                        NS_DISPATCH_NORMAL);
-   return 0; // cheat! Packets can always be dropped later anyways
- }
- #endif
- #ifdef ALLOW_DIRECT_SCTP_LISTEN_CONNECT
- // listen for incoming associations
- // Blocks! - Don't call this from main thread!
- #error This code will not work as-is since SetEvenOdd() runs on Mainthread
- bool
- DataChannelConnection::Listen(unsigned short port)
- {
- struct sockaddr_in addr;
- socklen_t addr_len;
- NS_WARNING_ASSERTION(!NS_IsMainThread(),
- "Blocks, do not call from main thread!!!");
- /* Acting as the 'server' */
- memset((void *)&addr, 0, sizeof(addr));
- #ifdef HAVE_SIN_LEN
- addr.sin_len = sizeof(struct sockaddr_in);
- #endif
- addr.sin_family = AF_INET;
- addr.sin_port = htons(port);
- addr.sin_addr.s_addr = htonl(INADDR_ANY);
- LOG(("Waiting for connections on port %u", ntohs(addr.sin_port)));
- mState = CONNECTING;
- if (usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr), sizeof(struct sockaddr_in)) < 0) {
- LOG(("***Failed userspace_bind"));
- return false;
- }
- if (usrsctp_listen(mMasterSocket, 1) < 0) {
- LOG(("***Failed userspace_listen"));
- return false;
- }
- LOG(("Accepting connection"));
- addr_len = 0;
- if ((mSocket = usrsctp_accept(mMasterSocket, nullptr, &addr_len)) == nullptr) {
- LOG(("***Failed accept"));
- return false;
- }
- mState = OPEN;
- struct linger l;
- l.l_onoff = 1;
- l.l_linger = 0;
- if (usrsctp_setsockopt(mSocket, SOL_SOCKET, SO_LINGER,
- (const void *)&l, (socklen_t)sizeof(struct linger)) < 0) {
- LOG(("Couldn't set SO_LINGER on SCTP socket"));
- }
- SetEvenOdd();
- // Notify Connection open
- // XXX We need to make sure connection sticks around until the message is delivered
- LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this));
- NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
- DataChannelOnMessageAvailable::ON_CONNECTION,
- this, (DataChannel *) nullptr)));
- return true;
- }
- // Blocks! - Don't call this from main thread!
- bool
- DataChannelConnection::Connect(const char *addr, unsigned short port)
- {
- struct sockaddr_in addr4;
- struct sockaddr_in6 addr6;
- NS_WARNING_ASSERTION(!NS_IsMainThread(),
- "Blocks, do not call from main thread!!!");
- /* Acting as the connector */
- LOG(("Connecting to %s, port %u", addr, port));
- memset((void *)&addr4, 0, sizeof(struct sockaddr_in));
- memset((void *)&addr6, 0, sizeof(struct sockaddr_in6));
- #ifdef HAVE_SIN_LEN
- addr4.sin_len = sizeof(struct sockaddr_in);
- #endif
- #ifdef HAVE_SIN6_LEN
- addr6.sin6_len = sizeof(struct sockaddr_in6);
- #endif
- addr4.sin_family = AF_INET;
- addr6.sin6_family = AF_INET6;
- addr4.sin_port = htons(port);
- addr6.sin6_port = htons(port);
- mState = CONNECTING;
- #if !defined(__Userspace_os_Windows)
- if (inet_pton(AF_INET6, addr, &addr6.sin6_addr) == 1) {
- if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr6), sizeof(struct sockaddr_in6)) < 0) {
- LOG(("*** Failed userspace_connect"));
- return false;
- }
- } else if (inet_pton(AF_INET, addr, &addr4.sin_addr) == 1) {
- if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr4), sizeof(struct sockaddr_in)) < 0) {
- LOG(("*** Failed userspace_connect"));
- return false;
- }
- } else {
- LOG(("*** Illegal destination address."));
- }
- #else
- {
- struct sockaddr_storage ss;
- int sslen = sizeof(ss);
- if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET6, nullptr, (struct sockaddr*)&ss, &sslen)) {
- addr6.sin6_addr = (reinterpret_cast<struct sockaddr_in6 *>(&ss))->sin6_addr;
- if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr6), sizeof(struct sockaddr_in6)) < 0) {
- LOG(("*** Failed userspace_connect"));
- return false;
- }
- } else if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET, nullptr, (struct sockaddr*)&ss, &sslen)) {
- addr4.sin_addr = (reinterpret_cast<struct sockaddr_in *>(&ss))->sin_addr;
- if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr4), sizeof(struct sockaddr_in)) < 0) {
- LOG(("*** Failed userspace_connect"));
- return false;
- }
- } else {
- LOG(("*** Illegal destination address."));
- }
- }
- #endif
- mSocket = mMasterSocket;
- LOG(("connect() succeeded! Entering connected mode"));
- mState = OPEN;
- SetEvenOdd();
- // Notify Connection open
- // XXX We need to make sure connection sticks around until the message is delivered
- LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this));
- NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
- DataChannelOnMessageAvailable::ON_CONNECTION,
- this, (DataChannel *) nullptr)));
- return true;
- }
- #endif
- DataChannel *
- DataChannelConnection::FindChannelByStream(uint16_t stream)
- {
- return mStreams.SafeElementAt(stream);
- }
- uint16_t
- DataChannelConnection::FindFreeStream()
- {
- uint32_t i, j, limit;
- limit = mStreams.Length();
- if (limit > MAX_NUM_STREAMS)
- limit = MAX_NUM_STREAMS;
- for (i = (mAllocateEven ? 0 : 1); i < limit; i += 2) {
- if (!mStreams[i]) {
- // Verify it's not still in the process of closing
- for (j = 0; j < mStreamsResetting.Length(); ++j) {
- if (mStreamsResetting[j] == i) {
- break;
- }
- }
- if (j == mStreamsResetting.Length())
- break;
- }
- }
- if (i >= limit) {
- return INVALID_STREAM;
- }
- return i;
- }
- bool
- DataChannelConnection::RequestMoreStreams(int32_t aNeeded)
- {
- struct sctp_status status;
- struct sctp_add_streams sas;
- uint32_t outStreamsNeeded;
- socklen_t len;
- if (aNeeded + mStreams.Length() > MAX_NUM_STREAMS) {
- aNeeded = MAX_NUM_STREAMS - mStreams.Length();
- }
- if (aNeeded <= 0) {
- return false;
- }
- len = (socklen_t)sizeof(struct sctp_status);
- if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_STATUS, &status, &len) < 0) {
- LOG(("***failed: getsockopt SCTP_STATUS"));
- return false;
- }
- outStreamsNeeded = aNeeded; // number to add
- // Note: if multiple channel opens happen when we don't have enough space,
- // we'll call RequestMoreStreams() multiple times
- memset(&sas, 0, sizeof(sas));
- sas.sas_instrms = 0;
- sas.sas_outstrms = (uint16_t)outStreamsNeeded; /* XXX error handling */
- // Doesn't block, we get an event when it succeeds or fails
- if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ADD_STREAMS, &sas,
- (socklen_t) sizeof(struct sctp_add_streams)) < 0) {
- if (errno == EALREADY) {
- LOG(("Already have %u output streams", outStreamsNeeded));
- return true;
- }
- LOG(("***failed: setsockopt ADD errno=%d", errno));
- return false;
- }
- LOG(("Requested %u more streams", outStreamsNeeded));
- // We add to mStreams when we get a SCTP_STREAM_CHANGE_EVENT and the
- // values are larger than mStreams.Length()
- return true;
- }
- int32_t
- DataChannelConnection::SendControlMessage(void *msg, uint32_t len, uint16_t stream)
- {
- struct sctp_sndinfo sndinfo;
- // Note: Main-thread IO, but doesn't block
- memset(&sndinfo, 0, sizeof(struct sctp_sndinfo));
- sndinfo.snd_sid = stream;
- sndinfo.snd_ppid = htonl(DATA_CHANNEL_PPID_CONTROL);
- if (usrsctp_sendv(mSocket, msg, len, nullptr, 0,
- &sndinfo, (socklen_t)sizeof(struct sctp_sndinfo),
- SCTP_SENDV_SNDINFO, 0) < 0) {
- //LOG(("***failed: sctp_sendv")); don't log because errno is a return!
- return (0);
- }
- return (1);
- }
- int32_t
- DataChannelConnection::SendOpenAckMessage(uint16_t stream)
- {
- struct rtcweb_datachannel_ack ack;
- memset(&ack, 0, sizeof(struct rtcweb_datachannel_ack));
- ack.msg_type = DATA_CHANNEL_ACK;
- return SendControlMessage(&ack, sizeof(ack), stream);
- }
- int32_t
- DataChannelConnection::SendOpenRequestMessage(const nsACString& label,
- const nsACString& protocol,
- uint16_t stream, bool unordered,
- uint16_t prPolicy, uint32_t prValue)
- {
- const int label_len = label.Length(); // not including nul
- const int proto_len = protocol.Length(); // not including nul
- // careful - request struct include one char for the label
- const int req_size = sizeof(struct rtcweb_datachannel_open_request) - 1 +
- label_len + proto_len;
- struct rtcweb_datachannel_open_request *req =
- (struct rtcweb_datachannel_open_request*) moz_xmalloc(req_size);
- memset(req, 0, req_size);
- req->msg_type = DATA_CHANNEL_OPEN_REQUEST;
- switch (prPolicy) {
- case SCTP_PR_SCTP_NONE:
- req->channel_type = DATA_CHANNEL_RELIABLE;
- break;
- case SCTP_PR_SCTP_TTL:
- req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_TIMED;
- break;
- case SCTP_PR_SCTP_RTX:
- req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT;
- break;
- default:
- // FIX! need to set errno! Or make all these SendXxxx() funcs return 0 or errno!
- free(req);
- return (0);
- }
- if (unordered) {
- // Per the current types, all differ by 0x80 between ordered and unordered
- req->channel_type |= 0x80; // NOTE: be careful if new types are added in the future
- }
- req->reliability_param = htonl(prValue);
- req->priority = htons(0); /* XXX: add support */
- req->label_length = htons(label_len);
- req->protocol_length = htons(proto_len);
- memcpy(&req->label[0], PromiseFlatCString(label).get(), label_len);
- memcpy(&req->label[label_len], PromiseFlatCString(protocol).get(), proto_len);
- int32_t result = SendControlMessage(req, req_size, stream);
- free(req);
- return result;
- }
- // XXX This should use a separate thread (outbound queue) which should
- // select() to know when to *try* to send data to the socket again.
- // Alternatively, it can use a timeout, but that's guaranteed to be wrong
- // (just not sure in what direction). We could re-implement NSPR's
- // PR_POLL_WRITE/etc handling... with a lot of work.
- // Better yet, use the SCTP stack's notifications on buffer state to avoid
- // filling the SCTP's buffers.
- // returns if we're still blocked or not
- bool
- DataChannelConnection::SendDeferredMessages()
- {
- uint32_t i;
- RefPtr<DataChannel> channel; // we may null out the refs to this
- bool still_blocked = false;
- // This may block while something is modifying channels, but should not block for IO
- mLock.AssertCurrentThreadOwns();
- // XXX For total fairness, on a still_blocked we'd start next time at the
- // same index. Sorry, not going to bother for now.
- for (i = 0; i < mStreams.Length(); ++i) {
- channel = mStreams[i];
- if (!channel)
- continue;
- // Only one of these should be set....
- if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_REQ) {
- if (SendOpenRequestMessage(channel->mLabel, channel->mProtocol,
- channel->mStream,
- channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED,
- channel->mPrPolicy, channel->mPrValue)) {
- channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_REQ;
- channel->mState = OPEN;
- channel->mReady = true;
- LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
- NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
- DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
- channel)));
- } else {
- if (errno == EAGAIN || errno == EWOULDBLOCK) {
- still_blocked = true;
- } else {
- // Close the channel, inform the user
- mStreams[channel->mStream] = nullptr;
- channel->mState = CLOSED;
- // Don't need to reset; we didn't open it
- NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
- DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
- channel)));
- }
- }
- }
- if (still_blocked)
- break;
- if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_ACK) {
- if (SendOpenAckMessage(channel->mStream)) {
- channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_ACK;
- } else {
- if (errno == EAGAIN || errno == EWOULDBLOCK) {
- still_blocked = true;
- } else {
- // Close the channel, inform the user
- CloseInt(channel);
- // XXX send error via DataChannelOnMessageAvailable (bug 843625)
- }
- }
- }
- if (still_blocked)
- break;
- if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_DATA) {
- bool failed_send = false;
- int32_t result;
- if (channel->mState == CLOSED || channel->mState == CLOSING) {
- channel->mBufferedData.Clear();
- }
- uint32_t buffered_amount = channel->GetBufferedAmountLocked();
- uint32_t threshold = channel->GetBufferedAmountLowThreshold();
- bool was_over_threshold = buffered_amount >= threshold;
- while (!channel->mBufferedData.IsEmpty() &&
- !failed_send) {
- struct sctp_sendv_spa *spa = channel->mBufferedData[0]->mSpa;
- const char *data = channel->mBufferedData[0]->mData;
- size_t len = channel->mBufferedData[0]->mLength;
- // SCTP will return EMSGSIZE if the message is bigger than the buffer
- // size (or EAGAIN if there isn't space)
- if ((result = usrsctp_sendv(mSocket, data, len,
- nullptr, 0,
- (void *)spa, (socklen_t)sizeof(struct sctp_sendv_spa),
- SCTP_SENDV_SPA,
- 0)) < 0) {
- if (errno == EAGAIN || errno == EWOULDBLOCK) {
- // leave queued for resend
- failed_send = true;
- LOG(("queue full again when resending %d bytes (%d)", len, result));
- } else {
- LOG(("error %d re-sending string", errno));
- failed_send = true;
- }
- } else {
- LOG(("Resent buffer of %d bytes (%d)", len, result));
- // In theory this could underflow if >4GB was buffered and re
- // truncated in GetBufferedAmount(), but this won't cause any problems.
- buffered_amount -= channel->mBufferedData[0]->mLength;
- channel->mBufferedData.RemoveElementAt(0);
- // can never fire with default threshold of 0
- if (was_over_threshold && buffered_amount < threshold) {
- LOG(("%s: sending BUFFER_LOW_THRESHOLD for %s/%s: %u", __FUNCTION__,
- channel->mLabel.get(), channel->mProtocol.get(), channel->mStream));
- NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
- DataChannelOnMessageAvailable::BUFFER_LOW_THRESHOLD,
- this, channel)));
- was_over_threshold = false;
- }
- if (buffered_amount == 0) {
- // buffered-to-not-buffered transition; tell the DOM code in case this makes it
- // available for GC
- LOG(("%s: sending NO_LONGER_BUFFERED for %s/%s: %u", __FUNCTION__,
- channel->mLabel.get(), channel->mProtocol.get(), channel->mStream));
- NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
- DataChannelOnMessageAvailable::NO_LONGER_BUFFERED,
- this, channel)));
- }
- }
- }
- if (channel->mBufferedData.IsEmpty())
- channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_DATA;
- else
- still_blocked = true;
- }
- if (still_blocked)
- break;
- }
- return still_blocked;
- }
- void
- DataChannelConnection::HandleOpenRequestMessage(const struct rtcweb_datachannel_open_request *req,
- size_t length,
- uint16_t stream)
- {
- RefPtr<DataChannel> channel;
- uint32_t prValue;
- uint16_t prPolicy;
- uint32_t flags;
- mLock.AssertCurrentThreadOwns();
- if (length != (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)) {
- LOG(("%s: Inconsistent length: %u, should be %u", __FUNCTION__, length,
- (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)));
- if (length < (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length))
- return;
- }
- LOG(("%s: length %u, sizeof(*req) = %u", __FUNCTION__, length, sizeof(*req)));
- switch (req->channel_type) {
- case DATA_CHANNEL_RELIABLE:
- case DATA_CHANNEL_RELIABLE_UNORDERED:
- prPolicy = SCTP_PR_SCTP_NONE;
- break;
- case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
- case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT_UNORDERED:
- prPolicy = SCTP_PR_SCTP_RTX;
- break;
- case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
- case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED_UNORDERED:
- prPolicy = SCTP_PR_SCTP_TTL;
- break;
- default:
- LOG(("Unknown channel type", req->channel_type));
- /* XXX error handling */
- return;
- }
- prValue = ntohl(req->reliability_param);
- flags = (req->channel_type & 0x80) ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0;
- if ((channel = FindChannelByStream(stream))) {
- if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) {
- LOG(("ERROR: HandleOpenRequestMessage: channel for stream %u is in state %d instead of CLOSED.",
- stream, channel->mState));
- /* XXX: some error handling */
- } else {
- LOG(("Open for externally negotiated channel %u", stream));
- // XXX should also check protocol, maybe label
- if (prPolicy != channel->mPrPolicy ||
- prValue != channel->mPrValue ||
- flags != (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED))
- {
- LOG(("WARNING: external negotiation mismatch with OpenRequest:"
- "channel %u, policy %u/%u, value %u/%u, flags %x/%x",
- stream, prPolicy, channel->mPrPolicy,
- prValue, channel->mPrValue, flags, channel->mFlags));
- }
- }
- return;
- }
- if (stream >= mStreams.Length()) {
- LOG(("%s: stream %u out of bounds (%u)", __FUNCTION__, stream, mStreams.Length()));
- return;
- }
- nsCString label(nsDependentCSubstring(&req->label[0], ntohs(req->label_length)));
- nsCString protocol(nsDependentCSubstring(&req->label[ntohs(req->label_length)],
- ntohs(req->protocol_length)));
- channel = new DataChannel(this,
- stream,
- DataChannel::CONNECTING,
- label,
- protocol,
- prPolicy, prValue,
- flags,
- nullptr, nullptr);
- mStreams[stream] = channel;
- channel->mState = DataChannel::WAITING_TO_OPEN;
- LOG(("%s: sending ON_CHANNEL_CREATED for %s/%s: %u (state %u)", __FUNCTION__,
- channel->mLabel.get(), channel->mProtocol.get(), stream, channel->mState));
- NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
- DataChannelOnMessageAvailable::ON_CHANNEL_CREATED,
- this, channel)));
- LOG(("%s: deferring sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
- if (!SendOpenAckMessage(stream)) {
- // XXX Only on EAGAIN!? And if not, then close the channel??
- channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_ACK;
- // Note: we're locked, so there's no danger of a race with the
- // buffer-threshold callback
- }
- // Now process any queued data messages for the channel (which will
- // themselves likely get queued until we leave WAITING_TO_OPEN, plus any
- // more that come in before that happens)
- DeliverQueuedData(stream);
- }
- // NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK.
- // That would make this code moot. Keep it for now for backwards compatibility.
- void
- DataChannelConnection::DeliverQueuedData(uint16_t stream)
- {
- mLock.AssertCurrentThreadOwns();
- uint32_t i = 0;
- while (i < mQueuedData.Length()) {
- // Careful! we may modify the array length from within the loop!
- if (mQueuedData[i]->mStream == stream) {
- LOG(("Delivering queued data for stream %u, length %u",
- stream, (unsigned int) mQueuedData[i]->mLength));
- // Deliver the queued data
- HandleDataMessage(mQueuedData[i]->mPpid,
- mQueuedData[i]->mData, mQueuedData[i]->mLength,
- mQueuedData[i]->mStream);
- mQueuedData.RemoveElementAt(i);
- continue; // don't bump index since we removed the element
- }
- i++;
- }
- }
- void
- DataChannelConnection::HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack,
- size_t length, uint16_t stream)
- {
- DataChannel *channel;
- mLock.AssertCurrentThreadOwns();
- channel = FindChannelByStream(stream);
- NS_ENSURE_TRUE_VOID(channel);
- LOG(("OpenAck received for stream %u, waiting=%d", stream,
- (channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK) ? 1 : 0));
- channel->mFlags &= ~DATA_CHANNEL_FLAGS_WAITING_ACK;
- }
- void
- DataChannelConnection::HandleUnknownMessage(uint32_t ppid, size_t length, uint16_t stream)
- {
- /* XXX: Send an error message? */
- LOG(("unknown DataChannel message received: %u, len %ld on stream %lu", ppid, length, stream));
- // XXX Log to JS error console if possible
- }
- void
- DataChannelConnection::HandleDataMessage(uint32_t ppid,
- const void *data, size_t length,
- uint16_t stream)
- {
- DataChannel *channel;
- const char *buffer = (const char *) data;
- mLock.AssertCurrentThreadOwns();
- channel = FindChannelByStream(stream);
- // XXX A closed channel may trip this... check
- // NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK.
- // That would make this code moot. Keep it for now for backwards compatibility.
- if (!channel) {
- // In the updated 0-RTT open case, the sender can send data immediately
- // after Open, and doesn't set the in-order bit (since we don't have a
- // response or ack). Also, with external negotiation, data can come in
- // before we're told about the external negotiation. We need to buffer
- // data until either a) Open comes in, if the ordering get messed up,
- // or b) the app tells us this channel was externally negotiated. When
- // these occur, we deliver the data.
- // Since this is rare and non-performance, keep a single list of queued
- // data messages to deliver once the channel opens.
- LOG(("Queuing data for stream %u, length %u", stream, length));
- // Copies data
- mQueuedData.AppendElement(new QueuedDataMessage(stream, ppid, data, length));
- return;
- }
- // XXX should this be a simple if, no warnings/debugbreaks?
- NS_ENSURE_TRUE_VOID(channel->mState != CLOSED);
- {
- nsAutoCString recvData(buffer, length); // copies (<64) or allocates
- bool is_binary = true;
- if (ppid == DATA_CHANNEL_PPID_DOMSTRING ||
- ppid == DATA_CHANNEL_PPID_DOMSTRING_LAST) {
- is_binary = false;
- }
- if (is_binary != channel->mIsRecvBinary && !channel->mRecvBuffer.IsEmpty()) {
- NS_WARNING("DataChannel message aborted by fragment type change!");
- channel->mRecvBuffer.Truncate(0);
- }
- channel->mIsRecvBinary = is_binary;
- switch (ppid) {
- case DATA_CHANNEL_PPID_DOMSTRING:
- case DATA_CHANNEL_PPID_BINARY:
- channel->mRecvBuffer += recvData;
- LOG(("DataChannel: Partial %s message of length %lu (total %u) on channel id %u",
- is_binary ? "binary" : "string", length, channel->mRecvBuffer.Length(),
- channel->mStream));
- return; // Not ready to notify application
- case DATA_CHANNEL_PPID_DOMSTRING_LAST:
- LOG(("DataChannel: String message received of length %lu on channel %u",
- length, channel->mStream));
- if (!channel->mRecvBuffer.IsEmpty()) {
- channel->mRecvBuffer += recvData;
- LOG(("%s: sending ON_DATA (string fragmented) for %p", __FUNCTION__, channel));
- channel->SendOrQueue(new DataChannelOnMessageAvailable(
- DataChannelOnMessageAvailable::ON_DATA, this,
- channel, channel->mRecvBuffer, -1));
- channel->mRecvBuffer.Truncate(0);
- return;
- }
- // else send using recvData normally
- length = -1; // Flag for DOMString
- // WebSockets checks IsUTF8() here; we can try to deliver it
- break;
- case DATA_CHANNEL_PPID_BINARY_LAST:
- LOG(("DataChannel: Received binary message of length %lu on channel id %u",
- length, channel->mStream));
- if (!channel->mRecvBuffer.IsEmpty()) {
- channel->mRecvBuffer += recvData;
- LOG(("%s: sending ON_DATA (binary fragmented) for %p", __FUNCTION__, channel));
- channel->SendOrQueue(new DataChannelOnMessageAvailable(
- DataChannelOnMessageAvailable::ON_DATA, this,
- channel, channel->mRecvBuffer,
- channel->mRecvBuffer.Length()));
- channel->mRecvBuffer.Truncate(0);
- return;
- }
- // else send using recvData normally
- break;
- default:
- NS_ERROR("Unknown data PPID");
- return;
- }
- /* Notify onmessage */
- LOG(("%s: sending ON_DATA for %p", __FUNCTION__, channel));
- channel->SendOrQueue(new DataChannelOnMessageAvailable(
- DataChannelOnMessageAvailable::ON_DATA, this,
- channel, recvData, length));
- }
- }
- // Called with mLock locked!
- void
- DataChannelConnection::HandleMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t stream)
- {
- const struct rtcweb_datachannel_open_request *req;
- const struct rtcweb_datachannel_ack *ack;
- mLock.AssertCurrentThreadOwns();
- switch (ppid) {
- case DATA_CHANNEL_PPID_CONTROL:
- req = static_cast<const struct rtcweb_datachannel_open_request *>(buffer);
- NS_ENSURE_TRUE_VOID(length >= sizeof(*ack)); // smallest message
- switch (req->msg_type) {
- case DATA_CHANNEL_OPEN_REQUEST:
- // structure includes a possibly-unused char label[1] (in a packed structure)
- NS_ENSURE_TRUE_VOID(length >= sizeof(*req) - 1);
- HandleOpenRequestMessage(req, length, stream);
- break;
- case DATA_CHANNEL_ACK:
- // >= sizeof(*ack) checked above
- ack = static_cast<const struct rtcweb_datachannel_ack *>(buffer);
- HandleOpenAckMessage(ack, length, stream);
- break;
- default:
- HandleUnknownMessage(ppid, length, stream);
- break;
- }
- break;
- case DATA_CHANNEL_PPID_DOMSTRING:
- case DATA_CHANNEL_PPID_DOMSTRING_LAST:
- case DATA_CHANNEL_PPID_BINARY:
- case DATA_CHANNEL_PPID_BINARY_LAST:
- HandleDataMessage(ppid, buffer, length, stream);
- break;
- default:
- LOG(("Message of length %lu, PPID %u on stream %u received.",
- length, ppid, stream));
- break;
- }
- }
- void
- DataChannelConnection::HandleAssociationChangeEvent(const struct sctp_assoc_change *sac)
- {
- uint32_t i, n;
- switch (sac->sac_state) {
- case SCTP_COMM_UP:
- LOG(("Association change: SCTP_COMM_UP"));
- if (mState == CONNECTING) {
- mSocket = mMasterSocket;
- mState = OPEN;
- SetEvenOdd();
- NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
- DataChannelOnMessageAvailable::ON_CONNECTION,
- this)));
- LOG(("DTLS connect() succeeded! Entering connected mode"));
- // Open any streams pending...
- ProcessQueuedOpens();
- } else if (mState == OPEN) {
- LOG(("DataConnection Already OPEN"));
- } else {
- LOG(("Unexpected state: %d", mState));
- }
- break;
- case SCTP_COMM_LOST:
- LOG(("Association change: SCTP_COMM_LOST"));
- // This association is toast, so also close all the channels -- from mainthread!
- NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
- DataChannelOnMessageAvailable::ON_DISCONNECTED,
- this)));
- break;
- case SCTP_RESTART:
- LOG(("Association change: SCTP_RESTART"));
- break;
- case SCTP_SHUTDOWN_COMP:
- LOG(("Association change: SCTP_SHUTDOWN_COMP"));
- NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
- DataChannelOnMessageAvailable::ON_DISCONNECTED,
- this)));
- break;
- case SCTP_CANT_STR_ASSOC:
- LOG(("Association change: SCTP_CANT_STR_ASSOC"));
- break;
- default:
- LOG(("Association change: UNKNOWN"));
- break;
- }
- LOG(("Association change: streams (in/out) = (%u/%u)",
- sac->sac_inbound_streams, sac->sac_outbound_streams));
- NS_ENSURE_TRUE_VOID(sac);
- n = sac->sac_length - sizeof(*sac);
- if (((sac->sac_state == SCTP_COMM_UP) ||
- (sac->sac_state == SCTP_RESTART)) && (n > 0)) {
- for (i = 0; i < n; ++i) {
- switch (sac->sac_info[i]) {
- case SCTP_ASSOC_SUPPORTS_PR:
- LOG(("Supports: PR"));
- break;
- case SCTP_ASSOC_SUPPORTS_AUTH:
- LOG(("Supports: AUTH"));
- break;
- case SCTP_ASSOC_SUPPORTS_ASCONF:
- LOG(("Supports: ASCONF"));
- break;
- case SCTP_ASSOC_SUPPORTS_MULTIBUF:
- LOG(("Supports: MULTIBUF"));
- break;
- case SCTP_ASSOC_SUPPORTS_RE_CONFIG:
- LOG(("Supports: RE-CONFIG"));
- break;
- default:
- LOG(("Supports: UNKNOWN(0x%02x)", sac->sac_info[i]));
- break;
- }
- }
- } else if (((sac->sac_state == SCTP_COMM_LOST) ||
- (sac->sac_state == SCTP_CANT_STR_ASSOC)) && (n > 0)) {
- LOG(("Association: ABORT ="));
- for (i = 0; i < n; ++i) {
- LOG((" 0x%02x", sac->sac_info[i]));
- }
- }
- if ((sac->sac_state == SCTP_CANT_STR_ASSOC) ||
- (sac->sac_state == SCTP_SHUTDOWN_COMP) ||
- (sac->sac_state == SCTP_COMM_LOST)) {
- return;
- }
- }
- void
- DataChannelConnection::HandlePeerAddressChangeEvent(const struct sctp_paddr_change *spc)
- {
- const char *addr = "";
- #if !defined(__Userspace_os_Windows)
- char addr_buf[INET6_ADDRSTRLEN];
- struct sockaddr_in *sin;
- struct sockaddr_in6 *sin6;
- #endif
- switch (spc->spc_aaddr.ss_family) {
- case AF_INET:
- #if !defined(__Userspace_os_Windows)
- sin = (struct sockaddr_in *)&spc->spc_aaddr;
- addr = inet_ntop(AF_INET, &sin->sin_addr, addr_buf, INET6_ADDRSTRLEN);
- #endif
- break;
- case AF_INET6:
- #if !defined(__Userspace_os_Windows)
- sin6 = (struct sockaddr_in6 *)&spc->spc_aaddr;
- addr = inet_ntop(AF_INET6, &sin6->sin6_addr, addr_buf, INET6_ADDRSTRLEN);
- #endif
- break;
- case AF_CONN:
- addr = "DTLS connection";
- break;
- default:
- break;
- }
- LOG(("Peer address %s is now ", addr));
- switch (spc->spc_state) {
- case SCTP_ADDR_AVAILABLE:
- LOG(("SCTP_ADDR_AVAILABLE"));
- break;
- case SCTP_ADDR_UNREACHABLE:
- LOG(("SCTP_ADDR_UNREACHABLE"));
- break;
- case SCTP_ADDR_REMOVED:
- LOG(("SCTP_ADDR_REMOVED"));
- break;
- case SCTP_ADDR_ADDED:
- LOG(("SCTP_ADDR_ADDED"));
- break;
- case SCTP_ADDR_MADE_PRIM:
- LOG(("SCTP_ADDR_MADE_PRIM"));
- break;
- case SCTP_ADDR_CONFIRMED:
- LOG(("SCTP_ADDR_CONFIRMED"));
- break;
- default:
- LOG(("UNKNOWN"));
- break;
- }
- LOG((" (error = 0x%08x).\n", spc->spc_error));
- }
- void
- DataChannelConnection::HandleRemoteErrorEvent(const struct sctp_remote_error *sre)
- {
- size_t i, n;
- n = sre->sre_length - sizeof(struct sctp_remote_error);
- LOG(("Remote Error (error = 0x%04x): ", sre->sre_error));
- for (i = 0; i < n; ++i) {
- LOG((" 0x%02x", sre-> sre_data[i]));
- }
- }
- void
- DataChannelConnection::HandleShutdownEvent(const struct sctp_shutdown_event *sse)
- {
- LOG(("Shutdown event."));
- /* XXX: notify all channels. */
- // Attempts to actually send anything will fail
- }
- void
- DataChannelConnection::HandleAdaptationIndication(const struct sctp_adaptation_event *sai)
- {
- LOG(("Adaptation indication: %x.", sai-> sai_adaptation_ind));
- }
- void
- DataChannelConnection::HandleSendFailedEvent(const struct sctp_send_failed_event *ssfe)
- {
- size_t i, n;
- if (ssfe->ssfe_flags & SCTP_DATA_UNSENT) {
- LOG(("Unsent "));
- }
- if (ssfe->ssfe_flags & SCTP_DATA_SENT) {
- LOG(("Sent "));
- }
- if (ssfe->ssfe_flags & ~(SCTP_DATA_SENT | SCTP_DATA_UNSENT)) {
- LOG(("(flags = %x) ", ssfe->ssfe_flags));
- }
- LOG(("message with PPID = %u, SID = %d, flags: 0x%04x due to error = 0x%08x",
- ntohl(ssfe->ssfe_info.snd_ppid), ssfe->ssfe_info.snd_sid,
- ssfe->ssfe_info.snd_flags, ssfe->ssfe_error));
- n = ssfe->ssfe_length - sizeof(struct sctp_send_failed_event);
- for (i = 0; i < n; ++i) {
- LOG((" 0x%02x", ssfe->ssfe_data[i]));
- }
- }
- void
- DataChannelConnection::ClearResets()
- {
- // Clear all pending resets
- if (!mStreamsResetting.IsEmpty()) {
- LOG(("Clearing resets for %d streams", mStreamsResetting.Length()));
- }
- for (uint32_t i = 0; i < mStreamsResetting.Length(); ++i) {
- RefPtr<DataChannel> channel;
- channel = FindChannelByStream(mStreamsResetting[i]);
- if (channel) {
- LOG(("Forgetting channel %u (%p) with pending reset",channel->mStream, channel.get()));
- mStreams[channel->mStream] = nullptr;
- }
- }
- mStreamsResetting.Clear();
- }
- void
- DataChannelConnection::ResetOutgoingStream(uint16_t stream)
- {
- uint32_t i;
- mLock.AssertCurrentThreadOwns();
- LOG(("Connection %p: Resetting outgoing stream %u",
- (void *) this, stream));
- // Rarely has more than a couple items and only for a short time
- for (i = 0; i < mStreamsResetting.Length(); ++i) {
- if (mStreamsResetting[i] == stream) {
- return;
- }
- }
- mStreamsResetting.AppendElement(stream);
- }
- void
- DataChannelConnection::SendOutgoingStreamReset()
- {
- struct sctp_reset_streams *srs;
- uint32_t i;
- size_t len;
- LOG(("Connection %p: Sending outgoing stream reset for %d streams",
- (void *) this, mStreamsResetting.Length()));
- mLock.AssertCurrentThreadOwns();
- if (mStreamsResetting.IsEmpty()) {
- LOG(("No streams to reset"));
- return;
- }
- len = sizeof(sctp_assoc_t) + (2 + mStreamsResetting.Length()) * sizeof(uint16_t);
- srs = static_cast<struct sctp_reset_streams *> (moz_xmalloc(len)); // infallible malloc
- memset(srs, 0, len);
- srs->srs_flags = SCTP_STREAM_RESET_OUTGOING;
- srs->srs_number_streams = mStreamsResetting.Length();
- for (i = 0; i < mStreamsResetting.Length(); ++i) {
- srs->srs_stream_list[i] = mStreamsResetting[i];
- }
- if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_RESET_STREAMS, srs, (socklen_t)len) < 0) {
- LOG(("***failed: setsockopt RESET, errno %d", errno));
- // if errno == EALREADY, this is normal - we can't send another reset
- // with one pending.
- // When we get an incoming reset (which may be a response to our
- // outstanding one), see if we have any pending outgoing resets and
- // send them
- } else {
- mStreamsResetting.Clear();
- }
- free(srs);
- }
- void
- DataChannelConnection::HandleStreamResetEvent(const struct sctp_stream_reset_event *strrst)
- {
- uint32_t n, i;
- RefPtr<DataChannel> channel; // since we may null out the ref to the channel
- if (!(strrst->strreset_flags & SCTP_STREAM_RESET_DENIED) &&
- !(strrst->strreset_flags & SCTP_STREAM_RESET_FAILED)) {
- n = (strrst->strreset_length - sizeof(struct sctp_stream_reset_event)) / sizeof(uint16_t);
- for (i = 0; i < n; ++i) {
- if (strrst->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) {
- channel = FindChannelByStream(strrst->strreset_stream_list[i]);
- if (channel) {
- // The other side closed the channel
- // We could be in three states:
- // 1. Normal state (input and output streams (OPEN)
- // Notify application, send a RESET in response on our
- // outbound channel. Go to CLOSED
- // 2. We sent our own reset (CLOSING); either they crossed on the
- // wire, or this is a response to our Reset.
- // Go to CLOSED
- // 3. We've sent a open but haven't gotten a response yet (CONNECTING)
- // I believe this is impossible, as we don't have an input stream yet.
- LOG(("Incoming: Channel %u closed, state %d",
- channel->mStream, channel->mState));
- ASSERT_WEBRTC(channel->mState == DataChannel::OPEN ||
- channel->mState == DataChannel::CLOSING ||
- channel->mState == DataChannel::CONNECTING ||
- channel->mState == DataChannel::WAITING_TO_OPEN);
- if (channel->mState == DataChannel::OPEN ||
- channel->mState == DataChannel::WAITING_TO_OPEN) {
- // Mark the stream for reset (the reset is sent below)
- ResetOutgoingStream(channel->mStream);
- }
- mStreams[channel->mStream] = nullptr;
- LOG(("Disconnected DataChannel %p from connection %p",
- (void *) channel.get(), (void *) channel->mConnection.get()));
- // This sends ON_CHANNEL_CLOSED to mainthread
- channel->StreamClosedLocked();
- } else {
- LOG(("Can't find incoming channel %d",i));
- }
- }
- }
- }
- // Process any pending resets now:
- if (!mStreamsResetting.IsEmpty()) {
- LOG(("Sending %d pending resets", mStreamsResetting.Length()));
- SendOutgoingStreamReset();
- }
- }
- void
- DataChannelConnection::HandleStreamChangeEvent(const struct sctp_stream_change_event *strchg)
- {
- uint16_t stream;
- RefPtr<DataChannel> channel;
- if (strchg->strchange_flags == SCTP_STREAM_CHANGE_DENIED) {
- LOG(("*** Failed increasing number of streams from %u (%u/%u)",
- mStreams.Length(),
- strchg->strchange_instrms,
- strchg->strchange_outstrms));
- // XXX FIX! notify pending opens of failure
- return;
- } else {
- if (strchg->strchange_instrms > mStreams.Length()) {
- LOG(("Other side increased streams from %u to %u",
- mStreams.Length(), strchg->strchange_instrms));
- }
- if (strchg->strchange_outstrms > mStreams.Length() ||
- strchg->strchange_instrms > mStreams.Length()) {
- uint16_t old_len = mStreams.Length();
- uint16_t new_len = std::max(strchg->strchange_outstrms,
- strchg->strchange_instrms);
- LOG(("Increasing number of streams from %u to %u - adding %u (in: %u)",
- old_len, new_len, new_len - old_len,
- strchg->strchange_instrms));
- // make sure both are the same length
- mStreams.AppendElements(new_len - old_len);
- LOG(("New length = %d (was %d)", mStreams.Length(), old_len));
- for (size_t i = old_len; i < mStreams.Length(); ++i) {
- mStreams[i] = nullptr;
- }
- // Re-process any channels waiting for streams.
- // Linear search, but we don't increase channels often and
- // the array would only get long in case of an app error normally
- // Make sure we request enough streams if there's a big jump in streams
- // Could make a more complex API for OpenXxxFinish() and avoid this loop
- size_t num_needed = mPending.GetSize();
- LOG(("%d of %d new streams already needed", num_needed,
- new_len - old_len));
- num_needed -= (new_len - old_len); // number we added
- if (num_needed > 0) {
- if (num_needed < 16)
- num_needed = 16;
- LOG(("Not enough new streams, asking for %d more", num_needed));
- RequestMoreStreams(num_needed);
- } else if (strchg->strchange_outstrms < strchg->strchange_instrms) {
- LOG(("Requesting %d output streams to match partner",
- strchg->strchange_instrms - strchg->strchange_outstrms));
- RequestMoreStreams(strchg->strchange_instrms - strchg->strchange_outstrms);
- }
- ProcessQueuedOpens();
- }
- // else probably not a change in # of streams
- }
- for (uint32_t i = 0; i < mStreams.Length(); ++i) {
- channel = mStreams[i];
- if (!channel)
- continue;
- if ((channel->mState == CONNECTING) &&
- (channel->mStream == INVALID_STREAM)) {
- if ((strchg->strchange_flags & SCTP_STREAM_CHANGE_DENIED) ||
- (strchg->strchange_flags & SCTP_STREAM_CHANGE_FAILED)) {
- /* XXX: Signal to the other end. */
- channel->mState = CLOSED;
- NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
- DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
- channel)));
- // maybe fire onError (bug 843625)
- } else {
- stream = FindFreeStream();
- if (stream != INVALID_STREAM) {
- channel->mStream = stream;
- mStreams[stream] = channel;
- channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ;
- // Note: we're locked, so there's no danger of a race with the
- // buffer-threshold callback
- } else {
- /* We will not find more ... */
- break;
- }
- }
- }
- }
- }
- // Called with mLock locked!
- void
- DataChannelConnection::HandleNotification(const union sctp_notification *notif, size_t n)
- {
- mLock.AssertCurrentThreadOwns();
- if (notif->sn_header.sn_length != (uint32_t)n) {
- return;
- }
- switch (notif->sn_header.sn_type) {
- case SCTP_ASSOC_CHANGE:
- HandleAssociationChangeEvent(&(notif->sn_assoc_change));
- break;
- case SCTP_PEER_ADDR_CHANGE:
- HandlePeerAddressChangeEvent(&(notif->sn_paddr_change));
- break;
- case SCTP_REMOTE_ERROR:
- HandleRemoteErrorEvent(&(notif->sn_remote_error));
- break;
- case SCTP_SHUTDOWN_EVENT:
- HandleShutdownEvent(&(notif->sn_shutdown_event));
- break;
- case SCTP_ADAPTATION_INDICATION:
- HandleAdaptationIndication(&(notif->sn_adaptation_event));
- break;
- case SCTP_PARTIAL_DELIVERY_EVENT:
- LOG(("SCTP_PARTIAL_DELIVERY_EVENT"));
- break;
- case SCTP_AUTHENTICATION_EVENT:
- LOG(("SCTP_AUTHENTICATION_EVENT"));
- break;
- case SCTP_SENDER_DRY_EVENT:
- //LOG(("SCTP_SENDER_DRY_EVENT"));
- break;
- case SCTP_NOTIFICATIONS_STOPPED_EVENT:
- LOG(("SCTP_NOTIFICATIONS_STOPPED_EVENT"));
- break;
- case SCTP_SEND_FAILED_EVENT:
- HandleSendFailedEvent(&(notif->sn_send_failed_event));
- break;
- case SCTP_STREAM_RESET_EVENT:
- HandleStreamResetEvent(&(notif->sn_strreset_event));
- break;
- case SCTP_ASSOC_RESET_EVENT:
- LOG(("SCTP_ASSOC_RESET_EVENT"));
- break;
- case SCTP_STREAM_CHANGE_EVENT:
- HandleStreamChangeEvent(&(notif->sn_strchange_event));
- break;
- default:
- LOG(("unknown SCTP event: %u", (uint32_t)notif->sn_header.sn_type));
- break;
- }
- }
- int
- DataChannelConnection::ReceiveCallback(struct socket* sock, void *data, size_t datalen,
- struct sctp_rcvinfo rcv, int32_t flags)
- {
- ASSERT_WEBRTC(!NS_IsMainThread());
- if (!data) {
- usrsctp_close(sock); // SCTP has finished shutting down
- } else {
- bool locked = false;
- if (!IsSTSThread()) {
- mLock.Lock();
- locked = true;
- } else {
- mLock.AssertCurrentThreadOwns();
- }
- if (flags & MSG_NOTIFICATION) {
- HandleNotification(static_cast<union sctp_notification *>(data), datalen);
- } else {
- HandleMessage(data, datalen, ntohl(rcv.rcv_ppid), rcv.rcv_sid);
- }
- if (locked) {
- mLock.Unlock();
- }
- }
- // sctp allocates 'data' with malloc(), and expects the receiver to free
- // it (presumably with free).
- // XXX future optimization: try to deliver messages without an internal
- // alloc/copy, and if so delay the free until later.
- free(data);
- // usrsctp defines the callback as returning an int, but doesn't use it
- return 1;
- }
- already_AddRefed<DataChannel>
- DataChannelConnection::Open(const nsACString& label, const nsACString& protocol,
- Type type, bool inOrder,
- uint32_t prValue, DataChannelListener *aListener,
- nsISupports *aContext, bool aExternalNegotiated,
- uint16_t aStream)
- {
- // aStream == INVALID_STREAM to have the protocol allocate
- uint16_t prPolicy = SCTP_PR_SCTP_NONE;
- uint32_t flags;
- LOG(("DC Open: label %s/%s, type %u, inorder %d, prValue %u, listener %p, context %p, external: %s, stream %u",
- PromiseFlatCString(label).get(), PromiseFlatCString(protocol).get(),
- type, inOrder, prValue, aListener, aContext,
- aExternalNegotiated ? "true" : "false", aStream));
- switch (type) {
- case DATA_CHANNEL_RELIABLE:
- prPolicy = SCTP_PR_SCTP_NONE;
- break;
- case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
- prPolicy = SCTP_PR_SCTP_RTX;
- break;
- case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
- prPolicy = SCTP_PR_SCTP_TTL;
- break;
- }
- if ((prPolicy == SCTP_PR_SCTP_NONE) && (prValue != 0)) {
- return nullptr;
- }
- // Don't look past currently-negotiated streams
- if (aStream != INVALID_STREAM && aStream < mStreams.Length() && mStreams[aStream]) {
- LOG(("ERROR: external negotiation of already-open channel %u", aStream));
- // XXX How do we indicate this up to the application? Probably the
- // caller's job, but we may need to return an error code.
- return nullptr;
- }
- flags = !inOrder ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0;
- RefPtr<DataChannel> channel(new DataChannel(this,
- aStream,
- DataChannel::CONNECTING,
- label, protocol,
- type, prValue,
- flags,
- aListener, aContext));
- if (aExternalNegotiated) {
- channel->mFlags |= DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED;
- }
- MutexAutoLock lock(mLock); // OpenFinish assumes this
- return OpenFinish(channel.forget());
- }
- // Separate routine so we can also call it to finish up from pending opens
- already_AddRefed<DataChannel>
- DataChannelConnection::OpenFinish(already_AddRefed<DataChannel>&& aChannel)
- {
- RefPtr<DataChannel> channel(aChannel); // takes the reference passed in
- // Normally 1 reference if called from ::Open(), or 2 if called from
- // ProcessQueuedOpens() unless the DOMDataChannel was gc'd
- uint16_t stream = channel->mStream;
- bool queue = false;
- mLock.AssertCurrentThreadOwns();
- // Cases we care about:
- // Pre-negotiated:
- // Not Open:
- // Doesn't fit:
- // -> change initial ask or renegotiate after open
- // -> queue open
- // Open:
- // Doesn't fit:
- // -> RequestMoreStreams && queue
- // Does fit:
- // -> open
- // Not negotiated:
- // Not Open:
- // -> queue open
- // Open:
- // -> Try to get a stream
- // Doesn't fit:
- // -> RequestMoreStreams && queue
- // Does fit:
- // -> open
- // So the Open cases are basically the same
- // Not Open cases are simply queue for non-negotiated, and
- // either change the initial ask or possibly renegotiate after open.
- if (mState == OPEN) {
- if (stream == INVALID_STREAM) {
- stream = FindFreeStream(); // may be INVALID_STREAM if we need more
- }
- if (stream == INVALID_STREAM || stream >= mStreams.Length()) {
- // RequestMoreStreams() limits to MAX_NUM_STREAMS -- allocate extra streams
- // to avoid going back immediately for more if the ask to N, N+1, etc
- int32_t more_needed = (stream == INVALID_STREAM) ? 16 :
- (stream-((int32_t)mStreams.Length())) + 16;
- if (!RequestMoreStreams(more_needed)) {
- // Something bad happened... we're done
- goto request_error_cleanup;
- }
- queue = true;
- }
- } else {
- // not OPEN
- if (stream != INVALID_STREAM && stream >= mStreams.Length() &&
- mState == CLOSED) {
- // Update number of streams for init message
- struct sctp_initmsg initmsg;
- socklen_t len = sizeof(initmsg);
- int32_t total_needed = stream+16;
- memset(&initmsg, 0, sizeof(initmsg));
- if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, &len) < 0) {
- LOG(("*** failed getsockopt SCTP_INITMSG"));
- goto request_error_cleanup;
- }
- LOG(("Setting number of SCTP streams to %u, was %u/%u", total_needed,
- initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams));
- initmsg.sinit_num_ostreams = total_needed;
- initmsg.sinit_max_instreams = MAX_NUM_STREAMS;
- if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
- (socklen_t)sizeof(initmsg)) < 0) {
- LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno));
- goto request_error_cleanup;
- }
- int32_t old_len = mStreams.Length();
- mStreams.AppendElements(total_needed - old_len);
- for (int32_t i = old_len; i < total_needed; ++i) {
- mStreams[i] = nullptr;
- }
- }
- // else if state is CONNECTING, we'll just re-negotiate when OpenFinish
- // is called, if needed
- queue = true;
- }
- if (queue) {
- LOG(("Queuing channel %p (%u) to finish open", channel.get(), stream));
- // Also serves to mark we told the app
- channel->mFlags |= DATA_CHANNEL_FLAGS_FINISH_OPEN;
- // we need a ref for the nsDeQue and one to return
- DataChannel* rawChannel = channel;
- rawChannel->AddRef();
- mPending.Push(rawChannel);
- return channel.forget();
- }
- MOZ_ASSERT(stream != INVALID_STREAM);
- // just allocated (& OPEN), or externally negotiated
- mStreams[stream] = channel; // holds a reference
- channel->mStream = stream;
- #ifdef TEST_QUEUED_DATA
- // It's painful to write a test for this...
- channel->mState = OPEN;
- channel->mReady = true;
- SendMsgInternal(channel, "Help me!", 8, DATA_CHANNEL_PPID_DOMSTRING_LAST);
- #endif
- if (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) {
- // Don't send unordered until this gets cleared
- channel->mFlags |= DATA_CHANNEL_FLAGS_WAITING_ACK;
- }
- if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) {
- if (!SendOpenRequestMessage(channel->mLabel, channel->mProtocol,
- stream,
- !!(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED),
- channel->mPrPolicy, channel->mPrValue)) {
- LOG(("SendOpenRequest failed, errno = %d", errno));
- if (errno == EAGAIN || errno == EWOULDBLOCK) {
- channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ;
- // Note: we're locked, so there's no danger of a race with the
- // buffer-threshold callback
- return channel.forget();
- } else {
- if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
- // We already returned the channel to the app.
- NS_ERROR("Failed to send open request");
- NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
- DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
- channel)));
- }
- // If we haven't returned the channel yet, it will get destroyed when we exit
- // this function.
- mStreams[stream] = nullptr;
- channel->mStream = INVALID_STREAM;
- // we'll be destroying the channel
- channel->mState = CLOSED;
- return nullptr;
- }
- /* NOTREACHED */
- }
- }
- // Either externally negotiated or we sent Open
- channel->mState = OPEN;
- channel->mReady = true;
- // FIX? Move into DOMDataChannel? I don't think we can send it yet here
- LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
- NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
- DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
- channel)));
- return channel.forget();
- request_error_cleanup:
- channel->mState = CLOSED;
- if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
- // We already returned the channel to the app.
- NS_ERROR("Failed to request more streams");
- NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
- DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
- channel)));
- return channel.forget();
- }
- // we'll be destroying the channel, but it never really got set up
- // Alternative would be to RUN_ON_THREAD(channel.forget(),::Destroy,...) and
- // Dispatch it to ourselves
- return nullptr;
- }
- int32_t
- DataChannelConnection::SendMsgInternal(DataChannel *channel, const char *data,
- size_t length, uint32_t ppid)
- {
- uint16_t flags;
- struct sctp_sendv_spa spa;
- int32_t result;
- NS_ENSURE_TRUE(channel->mState == OPEN || channel->mState == CONNECTING, 0);
- NS_WARNING_ASSERTION(length > 0, "Length is 0?!");
- // To avoid problems where an in-order OPEN is lost and an
- // out-of-order data message "beats" it, require data to be in-order
- // until we get an ACK.
- if ((channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) &&
- !(channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK)) {
- flags = SCTP_UNORDERED;
- } else {
- flags = 0;
- }
- spa.sendv_sndinfo.snd_ppid = htonl(ppid);
- spa.sendv_sndinfo.snd_sid = channel->mStream;
- spa.sendv_sndinfo.snd_flags = flags;
- spa.sendv_sndinfo.snd_context = 0;
- spa.sendv_sndinfo.snd_assoc_id = 0;
- spa.sendv_flags = SCTP_SEND_SNDINFO_VALID;
- if (channel->mPrPolicy != SCTP_PR_SCTP_NONE) {
- spa.sendv_prinfo.pr_policy = channel->mPrPolicy;
- spa.sendv_prinfo.pr_value = channel->mPrValue;
- spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
- }
- // Note: Main-thread IO, but doesn't block!
- // XXX FIX! to deal with heavy overruns of JS trying to pass data in
- // (more than the buffersize) queue data onto another thread to do the
- // actual sends. See netwerk/protocol/websocket/WebSocketChannel.cpp
- // SCTP will return EMSGSIZE if the message is bigger than the buffer
- // size (or EAGAIN if there isn't space)
- // Avoid a race between buffer-full-failure (where we have to add the
- // packet to the buffered-data queue) and the buffer-now-only-half-full
- // callback, which happens on a different thread. Otherwise we might
- // fail here, then before we add it to the queue get the half-full
- // callback, find nothing to do, then on this thread add it to the
- // queue - which would sit there. Also, if we later send more data, it
- // would arrive ahead of the buffered message, but if the buffer ever
- // got to 1/2 full, the message would get sent - but at a semi-random
- // time, after other data it was supposed to be in front of.
- // Must lock before empty check for similar reasons!
- MutexAutoLock lock(mLock);
- if (channel->mBufferedData.IsEmpty()) {
- result = usrsctp_sendv(mSocket, data, length,
- nullptr, 0,
- (void *)&spa, (socklen_t)sizeof(struct sctp_sendv_spa),
- SCTP_SENDV_SPA, 0);
- LOG(("Sent buffer (len=%u), result=%d", length, result));
- } else {
- // Fake EAGAIN if we're already buffering data
- result = -1;
- errno = EAGAIN;
- }
- if (result < 0) {
- if (errno == EAGAIN || errno == EWOULDBLOCK) {
- // queue data for resend! And queue any further data for the stream until it is...
- auto *buffered = new BufferedMsg(spa, data, length); // infallible malloc
- channel->mBufferedData.AppendElement(buffered); // owned by mBufferedData array
- channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_DATA;
- LOG(("Queued %u buffers (len=%u)", channel->mBufferedData.Length(), length));
- return 0;
- }
- LOG(("error %d sending string", errno));
- }
- return result;
- }
- // Handles fragmenting binary messages
- int32_t
- DataChannelConnection::SendBinary(DataChannel *channel, const char *data,
- size_t len,
- uint32_t ppid_partial, uint32_t ppid_final)
- {
- // Since there's a limit on network buffer size and no limits on message
- // size, and we don't want to use EOR mode (multiple writes for a
- // message, but all other streams are blocked until you finish sending
- // this message), we need to add application-level fragmentation of large
- // messages. On a reliable channel, these can be simply rebuilt into a
- // large message. On an unreliable channel, we can't and don't know how
- // long to wait, and there are no retransmissions, and no easy way to
- // tell the user "this part is missing", so on unreliable channels we
- // need to return an error if sending more bytes than the network buffers
- // can hold, and perhaps a lower number.
- // We *really* don't want to do this from main thread! - and SendMsgInternal
- // avoids blocking.
- // This MUST be reliable and in-order for the reassembly to work
- if (len > DATA_CHANNEL_MAX_BINARY_FRAGMENT &&
- channel->mPrPolicy == DATA_CHANNEL_RELIABLE &&
- !(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED)) {
- int32_t sent=0;
- uint32_t origlen = len;
- LOG(("Sending binary message length %u in chunks", len));
- // XXX check flags for out-of-order, or force in-order for large binary messages
- while (len > 0) {
- size_t sendlen = std::min<size_t>(len, DATA_CHANNEL_MAX_BINARY_FRAGMENT);
- uint32_t ppid;
- len -= sendlen;
- ppid = len > 0 ? ppid_partial : ppid_final;
- LOG(("Send chunk of %u bytes, ppid %u", sendlen, ppid));
- // Note that these might end up being deferred and queued.
- sent += SendMsgInternal(channel, data, sendlen, ppid);
- data += sendlen;
- }
- LOG(("Sent %d buffers for %u bytes, %d sent immediately, %d buffers queued",
- (origlen+DATA_CHANNEL_MAX_BINARY_FRAGMENT-1)/DATA_CHANNEL_MAX_BINARY_FRAGMENT,
- origlen, sent,
- channel->mBufferedData.Length()));
- return sent;
- }
- NS_WARNING_ASSERTION(len <= DATA_CHANNEL_MAX_BINARY_FRAGMENT,
- "Sending too-large data on unreliable channel!");
- // This will fail if the message is too large (default 256K)
- return SendMsgInternal(channel, data, len, ppid_final);
- }
- class ReadBlobRunnable : public Runnable {
- public:
- ReadBlobRunnable(DataChannelConnection* aConnection, uint16_t aStream,
- nsIInputStream* aBlob) :
- mConnection(aConnection),
- mStream(aStream),
- mBlob(aBlob)
- {}
- NS_IMETHOD Run() override {
- // ReadBlob() is responsible to releasing the reference
- DataChannelConnection *self = mConnection;
- self->ReadBlob(mConnection.forget(), mStream, mBlob);
- return NS_OK;
- }
- private:
- // Make sure the Connection doesn't die while there are jobs outstanding.
- // Let it die (if released by PeerConnectionImpl while we're running)
- // when we send our runnable back to MainThread. Then ~DataChannelConnection
- // can send the IOThread to MainThread to die in a runnable, avoiding
- // unsafe event loop recursion. Evil.
- RefPtr<DataChannelConnection> mConnection;
- uint16_t mStream;
- // Use RefCount for preventing the object is deleted when SendBlob returns.
- RefPtr<nsIInputStream> mBlob;
- };
- int32_t
- DataChannelConnection::SendBlob(uint16_t stream, nsIInputStream *aBlob)
- {
- DataChannel *channel = mStreams[stream];
- NS_ENSURE_TRUE(channel, 0);
- // Spawn a thread to send the data
- if (!mInternalIOThread) {
- nsresult res = NS_NewThread(getter_AddRefs(mInternalIOThread));
- if (NS_FAILED(res)) {
- return -1;
- }
- }
- mInternalIOThread->Dispatch(do_AddRef(new ReadBlobRunnable(this, stream, aBlob)), NS_DISPATCH_NORMAL);
- return 0;
- }
- class DataChannelBlobSendRunnable : public Runnable
- {
- public:
- DataChannelBlobSendRunnable(already_AddRefed<DataChannelConnection>& aConnection,
- uint16_t aStream)
- : mConnection(aConnection)
- , mStream(aStream) {}
- ~DataChannelBlobSendRunnable() override
- {
- if (!NS_IsMainThread() && mConnection) {
- MOZ_ASSERT(false);
- // explicitly leak the connection if destroyed off mainthread
- Unused << mConnection.forget().take();
- }
- }
- NS_IMETHOD Run() override
- {
- ASSERT_WEBRTC(NS_IsMainThread());
- mConnection->SendBinaryMsg(mStream, mData);
- mConnection = nullptr;
- return NS_OK;
- }
- // explicitly public so we can avoid allocating twice and copying
- nsCString mData;
- private:
- // Note: we can be destroyed off the target thread, so be careful not to let this
- // get Released()ed on the temp thread!
- RefPtr<DataChannelConnection> mConnection;
- uint16_t mStream;
- };
- void
- DataChannelConnection::ReadBlob(already_AddRefed<DataChannelConnection> aThis,
- uint16_t aStream, nsIInputStream* aBlob)
- {
- // NOTE: 'aThis' has been forgotten by the caller to avoid releasing
- // it off mainthread; if PeerConnectionImpl has released then we want
- // ~DataChannelConnection() to run on MainThread
- // XXX to do this safely, we must enqueue these atomically onto the
- // output socket. We need a sender thread(s?) to enqueue data into the
- // socket and to avoid main-thread IO that might block. Even on a
- // background thread, we may not want to block on one stream's data.
- // I.e. run non-blocking and service multiple channels.
- // For now as a hack, send as a single blast of queued packets which may
- // be deferred until buffer space is available.
- uint64_t len;
- nsCOMPtr<nsIThread> mainThread;
- NS_GetMainThread(getter_AddRefs(mainThread));
- // Must not let Dispatching it cause the DataChannelConnection to get
- // released on the wrong thread. Using WrapRunnable(RefPtr<DataChannelConnection>(aThis),...
- // will occasionally cause aThis to get released on this thread. Also, an explicit Runnable
- // lets us avoid copying the blob data an extra time.
- RefPtr<DataChannelBlobSendRunnable> runnable = new DataChannelBlobSendRunnable(aThis,
- aStream);
- // avoid copying the blob data by passing the mData from the runnable
- if (NS_FAILED(aBlob->Available(&len)) ||
- NS_FAILED(NS_ReadInputStreamToString(aBlob, runnable->mData, len))) {
- // Bug 966602: Doesn't return an error to the caller via onerror.
- // We must release DataChannelConnection on MainThread to avoid issues (bug 876167)
- // aThis is now owned by the runnable; release it there
- NS_ProxyRelease(mainThread, runnable.forget());
- return;
- }
- aBlob->Close();
- NS_DispatchToMainThread(runnable, NS_DISPATCH_NORMAL);
- }
- void
- DataChannelConnection::GetStreamIds(std::vector<uint16_t>* aStreamList)
- {
- ASSERT_WEBRTC(NS_IsMainThread());
- for (uint32_t i = 0; i < mStreams.Length(); ++i) {
- if (mStreams[i]) {
- aStreamList->push_back(mStreams[i]->mStream);
- }
- }
- }
- int32_t
- DataChannelConnection::SendMsgCommon(uint16_t stream, const nsACString &aMsg,
- bool isBinary)
- {
- ASSERT_WEBRTC(NS_IsMainThread());
- // We really could allow this from other threads, so long as we deal with
- // asynchronosity issues with channels closing, in particular access to
- // mStreams, and issues with the association closing (access to mSocket).
- const char *data = aMsg.BeginReading();
- uint32_t len = aMsg.Length();
- DataChannel *channel;
- LOG(("Sending %sto stream %u: %u bytes", isBinary ? "binary " : "", stream, len));
- // XXX if we want more efficiency, translate flags once at open time
- channel = mStreams[stream];
- NS_ENSURE_TRUE(channel, 0);
- if (isBinary)
- return SendBinary(channel, data, len,
- DATA_CHANNEL_PPID_BINARY, DATA_CHANNEL_PPID_BINARY_LAST);
- return SendBinary(channel, data, len,
- DATA_CHANNEL_PPID_DOMSTRING, DATA_CHANNEL_PPID_DOMSTRING_LAST);
- }
- void
- DataChannelConnection::Close(DataChannel *aChannel)
- {
- MutexAutoLock lock(mLock);
- CloseInt(aChannel);
- }
- // So we can call Close() with the lock already held
- // Called from someone who holds a ref via ::Close(), or from ~DataChannel
- void
- DataChannelConnection::CloseInt(DataChannel *aChannel)
- {
- MOZ_ASSERT(aChannel);
- RefPtr<DataChannel> channel(aChannel); // make sure it doesn't go away on us
- mLock.AssertCurrentThreadOwns();
- LOG(("Connection %p/Channel %p: Closing stream %u",
- channel->mConnection.get(), channel.get(), channel->mStream));
- // re-test since it may have closed before the lock was grabbed
- if (aChannel->mState == CLOSED || aChannel->mState == CLOSING) {
- LOG(("Channel already closing/closed (%u)", aChannel->mState));
- if (mState == CLOSED && channel->mStream != INVALID_STREAM) {
- // called from CloseAll()
- // we're not going to hang around waiting any more
- mStreams[channel->mStream] = nullptr;
- }
- return;
- }
- aChannel->mBufferedData.Clear();
- if (channel->mStream != INVALID_STREAM) {
- ResetOutgoingStream(channel->mStream);
- if (mState == CLOSED) { // called from CloseAll()
- // Let resets accumulate then send all at once in CloseAll()
- // we're not going to hang around waiting
- mStreams[channel->mStream] = nullptr;
- } else {
- SendOutgoingStreamReset();
- }
- }
- aChannel->mState = CLOSING;
- if (mState == CLOSED) {
- // we're not going to hang around waiting
- channel->StreamClosedLocked();
- }
- // At this point when we leave here, the object is a zombie held alive only by the DOM object
- }
- void DataChannelConnection::CloseAll()
- {
- LOG(("Closing all channels (connection %p)", (void*) this));
- // Don't need to lock here
- // Make sure no more channels will be opened
- {
- MutexAutoLock lock(mLock);
- mState = CLOSED;
- }
- // Close current channels
- // If there are runnables, they hold a strong ref and keep the channel
- // and/or connection alive (even if in a CLOSED state)
- bool closed_some = false;
- for (uint32_t i = 0; i < mStreams.Length(); ++i) {
- if (mStreams[i]) {
- mStreams[i]->Close();
- closed_some = true;
- }
- }
- // Clean up any pending opens for channels
- RefPtr<DataChannel> channel;
- while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(mPending.PopFront())))) {
- LOG(("closing pending channel %p, stream %u", channel.get(), channel->mStream));
- channel->Close(); // also releases the ref on each iteration
- closed_some = true;
- }
- // It's more efficient to let the Resets queue in shutdown and then
- // SendOutgoingStreamReset() here.
- if (closed_some) {
- MutexAutoLock lock(mLock);
- SendOutgoingStreamReset();
- }
- }
- DataChannel::~DataChannel()
- {
- // NS_ASSERTION since this is more "I think I caught all the cases that
- // can cause this" than a true kill-the-program assertion. If this is
- // wrong, nothing bad happens. A worst it's a leak.
- NS_ASSERTION(mState == CLOSED || mState == CLOSING, "unexpected state in ~DataChannel");
- }
- void
- DataChannel::Close()
- {
- if (mConnection) {
- // ensure we don't get deleted
- RefPtr<DataChannelConnection> connection(mConnection);
- connection->Close(this);
- }
- }
- // Used when disconnecting from the DataChannelConnection
- void
- DataChannel::StreamClosedLocked()
- {
- mConnection->mLock.AssertCurrentThreadOwns();
- ENSURE_DATACONNECTION;
- LOG(("Destroying Data channel %u", mStream));
- MOZ_ASSERT_IF(mStream != INVALID_STREAM,
- !mConnection->FindChannelByStream(mStream));
- mStream = INVALID_STREAM;
- mState = CLOSED;
- NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
- DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED,
- mConnection, this)));
- // We leave mConnection live until the DOM releases us, to avoid races
- }
- void
- DataChannel::ReleaseConnection()
- {
- ASSERT_WEBRTC(NS_IsMainThread());
- mConnection = nullptr;
- }
- void
- DataChannel::SetListener(DataChannelListener *aListener, nsISupports *aContext)
- {
- MutexAutoLock mLock(mListenerLock);
- mContext = aContext;
- mListener = aListener;
- }
- // May be called from another (i.e. Main) thread!
- void
- DataChannel::AppReady()
- {
- ENSURE_DATACONNECTION;
- MutexAutoLock lock(mConnection->mLock);
- mReady = true;
- if (mState == WAITING_TO_OPEN) {
- mState = OPEN;
- NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
- DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, mConnection,
- this)));
- for (uint32_t i = 0; i < mQueuedMessages.Length(); ++i) {
- nsCOMPtr<nsIRunnable> runnable = mQueuedMessages[i];
- MOZ_ASSERT(runnable);
- NS_DispatchToMainThread(runnable);
- }
- } else {
- NS_ASSERTION(mQueuedMessages.IsEmpty(), "Shouldn't have queued messages if not WAITING_TO_OPEN");
- }
- mQueuedMessages.Clear();
- mQueuedMessages.Compact();
- // We never use it again... We could even allocate the array in the odd
- // cases we need it.
- }
- uint32_t
- DataChannel::GetBufferedAmountLocked() const
- {
- size_t buffered = 0;
- for (auto& buffer : mBufferedData) {
- buffered += buffer->mLength;
- }
- // XXX Note: per Michael Tuexen, there's no way to currently get the buffered
- // amount from the SCTP stack for a single stream. It is on their to-do
- // list, and once we import a stack with support for that, we'll need to
- // add it to what we buffer. Also we'll need to ask for notification of a per-
- // stream buffer-low event and merge that into the handling of buffer-low
- // (the equivalent to TCP_NOTSENT_LOWAT on TCP sockets)
- if (buffered > UINT32_MAX) { // paranoia - >4GB buffered is very very unlikely
- buffered = UINT32_MAX;
- }
- return buffered;
- }
- uint32_t
- DataChannel::GetBufferedAmountLowThreshold()
- {
- return mBufferedThreshold;
- }
- // Never fire immediately, as it's defined to fire on transitions, not state
- void
- DataChannel::SetBufferedAmountLowThreshold(uint32_t aThreshold)
- {
- mBufferedThreshold = aThreshold;
- }
- // Called with mLock locked!
- void
- DataChannel::SendOrQueue(DataChannelOnMessageAvailable *aMessage)
- {
- if (!mReady &&
- (mState == CONNECTING || mState == WAITING_TO_OPEN)) {
- mQueuedMessages.AppendElement(aMessage);
- } else {
- NS_DispatchToMainThread(aMessage);
- }
- }
- } // namespace mozilla
|