nr_socket_prsock.cpp 63 KB


  1. /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
  2. /* This Source Code Form is subject to the terms of the Mozilla Public
  3. * License, v. 2.0. If a copy of the MPL was not distributed with this file,
  4. * You can obtain one at http://mozilla.org/MPL/2.0/. */
  5. /*
  6. Modified version of nr_socket_local, adapted for NSPR
  7. */
  8. /* This Source Code Form is subject to the terms of the Mozilla Public
  9. * License, v. 2.0. If a copy of the MPL was not distributed with this file,
  10. * You can obtain one at http://mozilla.org/MPL/2.0/. */
  11. /*
  12. Original code from nICEr and nrappkit.
  13. nICEr copyright:
  14. Copyright (c) 2007, Adobe Systems, Incorporated
  15. All rights reserved.
  16. Redistribution and use in source and binary forms, with or without
  17. modification, are permitted provided that the following conditions are
  18. met:
  19. * Redistributions of source code must retain the above copyright
  20. notice, this list of conditions and the following disclaimer.
  21. * Redistributions in binary form must reproduce the above copyright
  22. notice, this list of conditions and the following disclaimer in the
  23. documentation and/or other materials provided with the distribution.
  24. * Neither the name of Adobe Systems, Network Resonance nor the names of its
  25. contributors may be used to endorse or promote products derived from
  26. this software without specific prior written permission.
  27. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  28. "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  29. LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  30. A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  31. OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  32. SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  33. LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  34. DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  35. THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  36. (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  37. OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  38. nrappkit copyright:
  39. Copyright (C) 2001-2003, Network Resonance, Inc.
  40. Copyright (C) 2006, Network Resonance, Inc.
  41. All Rights Reserved
  42. Redistribution and use in source and binary forms, with or without
  43. modification, are permitted provided that the following conditions
  44. are met:
  45. 1. Redistributions of source code must retain the above copyright
  46. notice, this list of conditions and the following disclaimer.
  47. 2. Redistributions in binary form must reproduce the above copyright
  48. notice, this list of conditions and the following disclaimer in the
  49. documentation and/or other materials provided with the distribution.
  50. 3. Neither the name of Network Resonance, Inc. nor the name of any
  51. contributors to this software may be used to endorse or promote
  52. products derived from this software without specific prior written
  53. permission.
  54. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS IS''
  55. AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  56. IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  57. ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  58. LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  59. CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  60. SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  61. INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  62. CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  63. ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  64. POSSIBILITY OF SUCH DAMAGE.
  65. ekr@rtfm.com Thu Dec 20 20:14:49 2001
  66. */
  67. #include <csi_platform.h>
  68. #include <stdio.h>
  69. #include <string.h>
  70. #include <sys/types.h>
  71. #include <assert.h>
  72. #include <errno.h>
  73. #include <string>
  74. #include "nspr.h"
  75. #include "prerror.h"
  76. #include "prio.h"
  77. #include "prnetdb.h"
  78. #include "mozilla/net/DNS.h"
  79. #include "nsCOMPtr.h"
  80. #include "nsASocketHandler.h"
  81. #include "nsISocketTransportService.h"
  82. #include "nsNetCID.h"
  83. #include "nsISupportsImpl.h"
  84. #include "nsServiceManagerUtils.h"
  85. #include "nsComponentManagerUtils.h"
  86. #include "nsXPCOM.h"
  87. #include "nsXULAppAPI.h"
  88. #include "runnable_utils.h"
  89. #include "mozilla/SyncRunnable.h"
  90. #include "nsTArray.h"
  91. #include "mozilla/dom/TCPSocketBinding.h"
  92. #include "nsITCPSocketCallback.h"
  93. #include "nsIPrefService.h"
  94. #include "nsIPrefBranch.h"
  95. #include "nsISocketFilter.h"
  96. #ifdef XP_WIN
  97. #include "mozilla/WindowsVersion.h"
  98. #endif
  99. #if defined(MOZILLA_INTERNAL_API)
  100. // csi_platform.h deep in nrappkit defines LOG_INFO and LOG_WARNING
  101. #ifdef LOG_INFO
  102. #define LOG_TEMP_INFO LOG_INFO
  103. #undef LOG_INFO
  104. #endif
  105. #ifdef LOG_WARNING
  106. #define LOG_TEMP_WARNING LOG_WARNING
  107. #undef LOG_WARNING
  108. #endif
  109. #if defined(LOG_DEBUG)
  110. #define LOG_TEMP_DEBUG LOG_DEBUG
  111. #undef LOG_DEBUG
  112. #endif
  113. #undef strlcpy
  114. #include "mozilla/dom/network/TCPSocketChild.h"
  115. #ifdef LOG_TEMP_INFO
  116. #define LOG_INFO LOG_TEMP_INFO
  117. #endif
  118. #ifdef LOG_TEMP_WARNING
  119. #define LOG_WARNING LOG_TEMP_WARNING
  120. #endif
  121. #ifdef LOG_TEMP_DEBUG
  122. #define LOG_DEBUG LOG_TEMP_DEBUG
  123. #endif
  124. #ifdef XP_WIN
  125. #ifdef LOG_DEBUG
  126. #undef LOG_DEBUG
  127. #endif
  128. // cloned from csi_platform.h. Win32 doesn't like how we hide symbols
  129. #define LOG_DEBUG 7
  130. #endif
  131. #endif
  132. extern "C" {
  133. #include "nr_api.h"
  134. #include "async_wait.h"
  135. #include "nr_socket.h"
  136. #include "nr_socket_local.h"
  137. #include "stun_hint.h"
  138. }
  139. #include "nr_socket_prsock.h"
  140. #include "simpletokenbucket.h"
  141. #include "test_nr_socket.h"
  142. // Implement the nsISupports ref counting
  143. namespace mozilla {
  144. #if defined(MOZILLA_INTERNAL_API)
  145. class SingletonThreadHolder final
  146. {
  147. private:
  148. ~SingletonThreadHolder()
  149. {
  150. r_log(LOG_GENERIC,LOG_DEBUG,"Deleting SingletonThreadHolder");
  151. MOZ_ASSERT(!mThread, "SingletonThreads should be Released and shut down before exit!");
  152. if (mThread) {
  153. mThread->Shutdown();
  154. mThread = nullptr;
  155. }
  156. }
  157. DISALLOW_COPY_ASSIGN(SingletonThreadHolder);
  158. public:
  159. // Must be threadsafe for StaticRefPtr/ClearOnShutdown
  160. NS_INLINE_DECL_THREADSAFE_REFCOUNTING(SingletonThreadHolder)
  161. explicit SingletonThreadHolder(const nsCSubstring& aName)
  162. : mName(aName)
  163. {
  164. mParentThread = NS_GetCurrentThread();
  165. }
  166. nsIThread* GetThread() {
  167. return mThread;
  168. }
  169. /*
  170. * Keep track of how many instances are using a SingletonThreadHolder.
  171. * When no one is using it, shut it down
  172. */
  173. MozExternalRefCountType AddUse()
  174. {
  175. MOZ_ASSERT(mParentThread == NS_GetCurrentThread());
  176. MOZ_ASSERT(int32_t(mUseCount) >= 0, "illegal refcnt");
  177. nsrefcnt count = ++mUseCount;
  178. if (count == 1) {
  179. // idle -> in-use
  180. nsresult rv = NS_NewThread(getter_AddRefs(mThread));
  181. MOZ_RELEASE_ASSERT(NS_SUCCEEDED(rv) && mThread,
  182. "Should successfully create mtransport I/O thread");
  183. NS_SetThreadName(mThread, mName);
  184. r_log(LOG_GENERIC,LOG_DEBUG,"Created wrapped SingletonThread %p",
  185. mThread.get());
  186. }
  187. r_log(LOG_GENERIC,LOG_DEBUG,"AddUse: %lu", (unsigned long) count);
  188. return count;
  189. }
  190. MozExternalRefCountType ReleaseUse()
  191. {
  192. MOZ_ASSERT(mParentThread == NS_GetCurrentThread());
  193. nsrefcnt count = --mUseCount;
  194. MOZ_ASSERT(int32_t(mUseCount) >= 0, "illegal refcnt");
  195. if (count == 0) {
  196. // in-use -> idle -- no one forcing it to remain instantiated
  197. r_log(LOG_GENERIC,LOG_DEBUG,"Shutting down wrapped SingletonThread %p",
  198. mThread.get());
  199. mThread->Shutdown();
  200. mThread = nullptr;
  201. // It'd be nice to use a timer instead... But be careful of
  202. // xpcom-shutdown-threads in that case
  203. }
  204. r_log(LOG_GENERIC,LOG_DEBUG,"ReleaseUse: %lu", (unsigned long) count);
  205. return count;
  206. }
  207. private:
  208. nsCString mName;
  209. nsAutoRefCnt mUseCount;
  210. nsCOMPtr<nsIThread> mParentThread;
  211. nsCOMPtr<nsIThread> mThread;
  212. };
  213. static StaticRefPtr<SingletonThreadHolder> sThread;
  214. static void ClearSingletonOnShutdown()
  215. {
  216. ClearOnShutdown(&sThread);
  217. }
  218. #endif
  219. static nsIThread* GetIOThreadAndAddUse_s()
  220. {
  221. // Always runs on STS thread!
  222. #if defined(MOZILLA_INTERNAL_API)
  223. // We need to safely release this on shutdown to avoid leaks
  224. if (!sThread) {
  225. sThread = new SingletonThreadHolder(NS_LITERAL_CSTRING("mtransport"));
  226. NS_DispatchToMainThread(mozilla::WrapRunnableNM(&ClearSingletonOnShutdown));
  227. }
  228. // Mark that we're using the shared thread and need it to stick around
  229. sThread->AddUse();
  230. return sThread->GetThread();
  231. #else
  232. static nsCOMPtr<nsIThread> sThread;
  233. if (!sThread) {
  234. (void) NS_NewNamedThread("mtransport", getter_AddRefs(sThread));
  235. }
  236. return sThread;
  237. #endif
  238. }
  239. NrSocketIpc::NrSocketIpc(nsIEventTarget *aThread)
  240. : io_thread_(aThread)
  241. {}
  242. static TimeStamp nr_socket_short_term_violation_time;
  243. static TimeStamp nr_socket_long_term_violation_time;
  244. TimeStamp NrSocketBase::short_term_violation_time() {
  245. return nr_socket_short_term_violation_time;
  246. }
  247. TimeStamp NrSocketBase::long_term_violation_time() {
  248. return nr_socket_long_term_violation_time;
  249. }
  250. // NrSocketBase implementation
  251. // async_event APIs
  252. int NrSocketBase::async_wait(int how, NR_async_cb cb, void *cb_arg,
  253. char *function, int line) {
  254. uint16_t flag;
  255. switch (how) {
  256. case NR_ASYNC_WAIT_READ:
  257. flag = PR_POLL_READ;
  258. break;
  259. case NR_ASYNC_WAIT_WRITE:
  260. flag = PR_POLL_WRITE;
  261. break;
  262. default:
  263. return R_BAD_ARGS;
  264. }
  265. cbs_[how] = cb;
  266. cb_args_[how] = cb_arg;
  267. poll_flags_ |= flag;
  268. return 0;
  269. }
  270. int NrSocketBase::cancel(int how) {
  271. uint16_t flag;
  272. switch (how) {
  273. case NR_ASYNC_WAIT_READ:
  274. flag = PR_POLL_READ;
  275. break;
  276. case NR_ASYNC_WAIT_WRITE:
  277. flag = PR_POLL_WRITE;
  278. break;
  279. default:
  280. return R_BAD_ARGS;
  281. }
  282. poll_flags_ &= ~flag;
  283. return 0;
  284. }
  285. void NrSocketBase::fire_callback(int how) {
  286. // This can't happen unless we are armed because we only set
  287. // the flags if we are armed
  288. MOZ_ASSERT(cbs_[how]);
  289. // Now cancel so that we need to be re-armed. Note that
  290. // the re-arming probably happens in the callback we are
  291. // about to fire.
  292. cancel(how);
  293. cbs_[how](this, how, cb_args_[how]);
  294. }
  295. // NrSocket implementation
  296. NS_IMPL_ISUPPORTS0(NrSocket)
  297. // The nsASocket callbacks
  298. void NrSocket::OnSocketReady(PRFileDesc *fd, int16_t outflags) {
  299. if (outflags & PR_POLL_READ & poll_flags())
  300. fire_callback(NR_ASYNC_WAIT_READ);
  301. if (outflags & PR_POLL_WRITE & poll_flags())
  302. fire_callback(NR_ASYNC_WAIT_WRITE);
  303. if (outflags & (PR_POLL_ERR | PR_POLL_NVAL | PR_POLL_HUP))
  304. // TODO: Bug 946423: how do we notify the upper layers about this?
  305. close();
  306. }
  307. void NrSocket::OnSocketDetached(PRFileDesc *fd) {
  308. r_log(LOG_GENERIC, LOG_DEBUG, "Socket %p detached", fd);
  309. }
  310. void NrSocket::IsLocal(bool *aIsLocal) {
  311. // TODO(jesup): better check? Does it matter? (likely no)
  312. *aIsLocal = false;
  313. }
  314. // async_event APIs
  315. int NrSocket::async_wait(int how, NR_async_cb cb, void *cb_arg,
  316. char *function, int line) {
  317. int r = NrSocketBase::async_wait(how, cb, cb_arg, function, line);
  318. if (!r) {
  319. mPollFlags = poll_flags();
  320. }
  321. return r;
  322. }
  323. int NrSocket::cancel(int how) {
  324. int r = NrSocketBase::cancel(how);
  325. if (!r) {
  326. mPollFlags = poll_flags();
  327. }
  328. return r;
  329. }
  330. // Helper functions for addresses
  331. static int nr_transport_addr_to_praddr(nr_transport_addr *addr,
  332. PRNetAddr *naddr)
  333. {
  334. int _status;
  335. memset(naddr, 0, sizeof(*naddr));
  336. switch(addr->protocol){
  337. case IPPROTO_TCP:
  338. break;
  339. case IPPROTO_UDP:
  340. break;
  341. default:
  342. ABORT(R_BAD_ARGS);
  343. }
  344. switch(addr->ip_version){
  345. case NR_IPV4:
  346. naddr->inet.family = PR_AF_INET;
  347. naddr->inet.port = addr->u.addr4.sin_port;
  348. naddr->inet.ip = addr->u.addr4.sin_addr.s_addr;
  349. break;
  350. case NR_IPV6:
  351. naddr->ipv6.family = PR_AF_INET6;
  352. naddr->ipv6.port = addr->u.addr6.sin6_port;
  353. naddr->ipv6.flowinfo = addr->u.addr6.sin6_flowinfo;
  354. memcpy(&naddr->ipv6.ip, &addr->u.addr6.sin6_addr, sizeof(in6_addr));
  355. naddr->ipv6.scope_id = addr->u.addr6.sin6_scope_id;
  356. break;
  357. default:
  358. ABORT(R_BAD_ARGS);
  359. }
  360. _status = 0;
  361. abort:
  362. return(_status);
  363. }
  364. //XXX schien@mozilla.com: copy from PRNetAddrToNetAddr,
  365. // should be removed after fix the link error in signaling_unittests
  366. static int praddr_to_netaddr(const PRNetAddr *prAddr, net::NetAddr *addr)
  367. {
  368. int _status;
  369. switch (prAddr->raw.family) {
  370. case PR_AF_INET:
  371. addr->inet.family = AF_INET;
  372. addr->inet.port = prAddr->inet.port;
  373. addr->inet.ip = prAddr->inet.ip;
  374. break;
  375. case PR_AF_INET6:
  376. addr->inet6.family = AF_INET6;
  377. addr->inet6.port = prAddr->ipv6.port;
  378. addr->inet6.flowinfo = prAddr->ipv6.flowinfo;
  379. memcpy(&addr->inet6.ip, &prAddr->ipv6.ip, sizeof(addr->inet6.ip.u8));
  380. addr->inet6.scope_id = prAddr->ipv6.scope_id;
  381. break;
  382. default:
  383. MOZ_ASSERT(false);
  384. ABORT(R_BAD_ARGS);
  385. }
  386. _status = 0;
  387. abort:
  388. return(_status);
  389. }
  390. static int nr_transport_addr_to_netaddr(nr_transport_addr *addr,
  391. net::NetAddr *naddr)
  392. {
  393. int r, _status;
  394. PRNetAddr praddr;
  395. if((r = nr_transport_addr_to_praddr(addr, &praddr))) {
  396. ABORT(r);
  397. }
  398. if((r = praddr_to_netaddr(&praddr, naddr))) {
  399. ABORT(r);
  400. }
  401. _status = 0;
  402. abort:
  403. return(_status);
  404. }
  405. int nr_netaddr_to_transport_addr(const net::NetAddr *netaddr,
  406. nr_transport_addr *addr, int protocol)
  407. {
  408. int _status;
  409. int r;
  410. switch(netaddr->raw.family) {
  411. case AF_INET:
  412. if ((r = nr_ip4_port_to_transport_addr(ntohl(netaddr->inet.ip),
  413. ntohs(netaddr->inet.port),
  414. protocol, addr)))
  415. ABORT(r);
  416. break;
  417. case AF_INET6:
  418. if ((r = nr_ip6_port_to_transport_addr((in6_addr *)&netaddr->inet6.ip.u8,
  419. ntohs(netaddr->inet6.port),
  420. protocol, addr)))
  421. ABORT(r);
  422. break;
  423. default:
  424. MOZ_ASSERT(false);
  425. ABORT(R_BAD_ARGS);
  426. }
  427. _status = 0;
  428. abort:
  429. return(_status);
  430. }
  431. int nr_praddr_to_transport_addr(const PRNetAddr *praddr,
  432. nr_transport_addr *addr, int protocol,
  433. int keep)
  434. {
  435. int _status;
  436. int r;
  437. struct sockaddr_in ip4;
  438. struct sockaddr_in6 ip6;
  439. switch(praddr->raw.family) {
  440. case PR_AF_INET:
  441. ip4.sin_family = PF_INET;
  442. ip4.sin_addr.s_addr = praddr->inet.ip;
  443. ip4.sin_port = praddr->inet.port;
  444. if ((r = nr_sockaddr_to_transport_addr((sockaddr *)&ip4,
  445. protocol, keep,
  446. addr)))
  447. ABORT(r);
  448. break;
  449. case PR_AF_INET6:
  450. ip6.sin6_family = PF_INET6;
  451. ip6.sin6_port = praddr->ipv6.port;
  452. ip6.sin6_flowinfo = praddr->ipv6.flowinfo;
  453. memcpy(&ip6.sin6_addr, &praddr->ipv6.ip, sizeof(in6_addr));
  454. ip6.sin6_scope_id = praddr->ipv6.scope_id;
  455. if ((r = nr_sockaddr_to_transport_addr((sockaddr *)&ip6,protocol,keep,addr)))
  456. ABORT(r);
  457. break;
  458. default:
  459. MOZ_ASSERT(false);
  460. ABORT(R_BAD_ARGS);
  461. }
  462. _status = 0;
  463. abort:
  464. return(_status);
  465. }
  466. /*
  467. * nr_transport_addr_get_addrstring_and_port
  468. * convert nr_transport_addr to IP address string and port number
  469. */
  470. int nr_transport_addr_get_addrstring_and_port(nr_transport_addr *addr,
  471. nsACString *host, int32_t *port) {
  472. int r, _status;
  473. char addr_string[64];
  474. // We cannot directly use |nr_transport_addr.as_string| because it contains
  475. // more than ip address, therefore, we need to explicity convert it
  476. // from |nr_transport_addr_get_addrstring|.
  477. if ((r=nr_transport_addr_get_addrstring(addr, addr_string, sizeof(addr_string)))) {
  478. ABORT(r);
  479. }
  480. if ((r=nr_transport_addr_get_port(addr, port))) {
  481. ABORT(r);
  482. }
  483. *host = addr_string;
  484. _status = 0;
  485. abort:
  486. return(_status);
  487. }
  488. // nr_socket APIs (as member functions)
  489. int NrSocket::create(nr_transport_addr *addr) {
  490. int r,_status;
  491. PRStatus status;
  492. PRNetAddr naddr;
  493. nsresult rv;
  494. nsCOMPtr<nsISocketTransportService> stservice =
  495. do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
  496. if (!NS_SUCCEEDED(rv)) {
  497. ABORT(R_INTERNAL);
  498. }
  499. if((r=nr_transport_addr_to_praddr(addr, &naddr)))
  500. ABORT(r);
  501. switch (addr->protocol) {
  502. case IPPROTO_UDP:
  503. if (!(fd_ = PR_OpenUDPSocket(naddr.raw.family))) {
  504. r_log(LOG_GENERIC,LOG_CRIT,"Couldn't create UDP socket, "
  505. "family=%d, err=%d", naddr.raw.family, PR_GetError());
  506. ABORT(R_INTERNAL);
  507. }
  508. #ifdef XP_WIN
  509. if (!mozilla::IsWin8OrLater()) {
  510. // Increase default send and receive buffer sizes on <= Win7 to be able to
  511. // receive and send an unpaced HD (>= 720p = 1280x720 - I Frame ~ 21K size)
  512. // stream without losing packets.
  513. // Manual testing showed that 100K buffer size was not enough and the
  514. // packet loss dis-appeared with 256K buffer size.
  515. // See bug 1252769 for future improvements of this.
  516. PRSize min_buffer_size = 256 * 1024;
  517. PRSocketOptionData opt_rcvbuf;
  518. opt_rcvbuf.option = PR_SockOpt_RecvBufferSize;
  519. if ((status = PR_GetSocketOption(fd_, &opt_rcvbuf)) == PR_SUCCESS) {
  520. if (opt_rcvbuf.value.recv_buffer_size < min_buffer_size) {
  521. opt_rcvbuf.value.recv_buffer_size = min_buffer_size;
  522. if ((status = PR_SetSocketOption(fd_, &opt_rcvbuf)) != PR_SUCCESS) {
  523. r_log(LOG_GENERIC, LOG_CRIT,
  524. "Couldn't set socket receive buffer size: %d", status);
  525. }
  526. } else {
  527. r_log(LOG_GENERIC, LOG_INFO,
  528. "Socket receive buffer size is already: %d",
  529. opt_rcvbuf.value.recv_buffer_size);
  530. }
  531. } else {
  532. r_log(LOG_GENERIC, LOG_CRIT,
  533. "Couldn't get socket receive buffer size: %d", status);
  534. }
  535. PRSocketOptionData opt_sndbuf;
  536. opt_sndbuf.option = PR_SockOpt_SendBufferSize;
  537. if ((status = PR_GetSocketOption(fd_, &opt_sndbuf)) == PR_SUCCESS) {
  538. if (opt_sndbuf.value.recv_buffer_size < min_buffer_size) {
  539. opt_sndbuf.value.recv_buffer_size = min_buffer_size;
  540. if ((status = PR_SetSocketOption(fd_, &opt_sndbuf)) != PR_SUCCESS) {
  541. r_log(LOG_GENERIC, LOG_CRIT,
  542. "Couldn't set socket send buffer size: %d", status);
  543. }
  544. } else {
  545. r_log(LOG_GENERIC, LOG_INFO,
  546. "Socket send buffer size is already: %d",
  547. opt_sndbuf.value.recv_buffer_size);
  548. }
  549. } else {
  550. r_log(LOG_GENERIC, LOG_CRIT,
  551. "Couldn't get socket send buffer size: %d", status);
  552. }
  553. }
  554. #endif
  555. break;
  556. case IPPROTO_TCP:
  557. if (!(fd_ = PR_OpenTCPSocket(naddr.raw.family))) {
  558. r_log(LOG_GENERIC,LOG_CRIT,"Couldn't create TCP socket, "
  559. "family=%d, err=%d", naddr.raw.family, PR_GetError());
  560. ABORT(R_INTERNAL);
  561. }
  562. // Set ReuseAddr for TCP sockets to enable having several
  563. // sockets bound to same local IP and port
  564. PRSocketOptionData opt_reuseaddr;
  565. opt_reuseaddr.option = PR_SockOpt_Reuseaddr;
  566. opt_reuseaddr.value.reuse_addr = PR_TRUE;
  567. status = PR_SetSocketOption(fd_, &opt_reuseaddr);
  568. if (status != PR_SUCCESS) {
  569. r_log(LOG_GENERIC, LOG_CRIT,
  570. "Couldn't set reuse addr socket option: %d", status);
  571. ABORT(R_INTERNAL);
  572. }
  573. // And also set ReusePort for platforms supporting this socket option
  574. PRSocketOptionData opt_reuseport;
  575. opt_reuseport.option = PR_SockOpt_Reuseport;
  576. opt_reuseport.value.reuse_port = PR_TRUE;
  577. status = PR_SetSocketOption(fd_, &opt_reuseport);
  578. if (status != PR_SUCCESS) {
  579. if (PR_GetError() != PR_OPERATION_NOT_SUPPORTED_ERROR) {
  580. r_log(LOG_GENERIC, LOG_CRIT,
  581. "Couldn't set reuse port socket option: %d", status);
  582. ABORT(R_INTERNAL);
  583. }
  584. }
  585. // Try to speedup packet delivery by disabling TCP Nagle
  586. PRSocketOptionData opt_nodelay;
  587. opt_nodelay.option = PR_SockOpt_NoDelay;
  588. opt_nodelay.value.no_delay = PR_TRUE;
  589. status = PR_SetSocketOption(fd_, &opt_nodelay);
  590. if (status != PR_SUCCESS) {
  591. r_log(LOG_GENERIC, LOG_WARNING,
  592. "Couldn't set Nodelay socket option: %d", status);
  593. }
  594. break;
  595. default:
  596. ABORT(R_INTERNAL);
  597. }
  598. status = PR_Bind(fd_, &naddr);
  599. if (status != PR_SUCCESS) {
  600. r_log(LOG_GENERIC,LOG_CRIT,"Couldn't bind socket to address %s",
  601. addr->as_string);
  602. ABORT(R_INTERNAL);
  603. }
  604. r_log(LOG_GENERIC,LOG_DEBUG,"Creating socket %p with addr %s",
  605. fd_, addr->as_string);
  606. nr_transport_addr_copy(&my_addr_,addr);
  607. /* If we have a wildcard port, patch up the addr */
  608. if(nr_transport_addr_is_wildcard(addr)){
  609. status = PR_GetSockName(fd_, &naddr);
  610. if (status != PR_SUCCESS){
  611. r_log(LOG_GENERIC, LOG_CRIT, "Couldn't get sock name for socket");
  612. ABORT(R_INTERNAL);
  613. }
  614. if((r=nr_praddr_to_transport_addr(&naddr,&my_addr_,addr->protocol,1)))
  615. ABORT(r);
  616. }
  617. // Set nonblocking
  618. PRSocketOptionData opt_nonblock;
  619. opt_nonblock.option = PR_SockOpt_Nonblocking;
  620. opt_nonblock.value.non_blocking = PR_TRUE;
  621. status = PR_SetSocketOption(fd_, &opt_nonblock);
  622. if (status != PR_SUCCESS) {
  623. r_log(LOG_GENERIC, LOG_CRIT, "Couldn't make socket nonblocking");
  624. ABORT(R_INTERNAL);
  625. }
  626. // Remember our thread.
  627. ststhread_ = do_QueryInterface(stservice, &rv);
  628. if (!NS_SUCCEEDED(rv))
  629. ABORT(R_INTERNAL);
  630. // Finally, register with the STS
  631. rv = stservice->AttachSocket(fd_, this);
  632. if (!NS_SUCCEEDED(rv)) {
  633. r_log(LOG_GENERIC, LOG_CRIT, "Couldn't attach socket to STS, rv=%u",
  634. static_cast<unsigned>(rv));
  635. ABORT(R_INTERNAL);
  636. }
  637. _status = 0;
  638. abort:
  639. return(_status);
  640. }
  641. static int ShouldDrop(size_t len) {
  642. // Global rate limiting for stun requests, to mitigate the ice hammer DoS
  643. // (see http://tools.ietf.org/html/draft-thomson-mmusic-ice-webrtc)
  644. // Tolerate rate of 8k/sec, for one second.
  645. static SimpleTokenBucket burst(16384*1, 16384);
  646. // Tolerate rate of 7.2k/sec over twenty seconds.
  647. static SimpleTokenBucket sustained(7372*20, 7372);
  648. // Check number of tokens in each bucket.
  649. if (burst.getTokens(UINT32_MAX) < len) {
  650. r_log(LOG_GENERIC, LOG_ERR,
  651. "Short term global rate limit for STUN requests exceeded.");
  652. #ifdef MOZILLA_INTERNAL_API
  653. nr_socket_short_term_violation_time = TimeStamp::Now();
  654. #endif
  655. return R_WOULDBLOCK;
  656. }
  657. if (sustained.getTokens(UINT32_MAX) < len) {
  658. r_log(LOG_GENERIC, LOG_ERR,
  659. "Long term global rate limit for STUN requests exceeded.");
  660. #ifdef MOZILLA_INTERNAL_API
  661. nr_socket_long_term_violation_time = TimeStamp::Now();
  662. #endif
  663. return R_WOULDBLOCK;
  664. }
  665. // Take len tokens from both buckets.
  666. // (not threadsafe, but no problem since this is only called from STS)
  667. burst.getTokens(len);
  668. sustained.getTokens(len);
  669. return 0;
  670. }
  671. // This should be called on the STS thread.
  672. int NrSocket::sendto(const void *msg, size_t len,
  673. int flags, nr_transport_addr *to) {
  674. ASSERT_ON_THREAD(ststhread_);
  675. int r,_status;
  676. PRNetAddr naddr;
  677. int32_t status;
  678. if ((r=nr_transport_addr_to_praddr(to, &naddr)))
  679. ABORT(r);
  680. if(fd_==nullptr)
  681. ABORT(R_EOD);
  682. if (nr_is_stun_request_message((UCHAR*)msg, len) && ShouldDrop(len)) {
  683. ABORT(R_WOULDBLOCK);
  684. }
  685. // TODO: Convert flags?
  686. status = PR_SendTo(fd_, msg, len, flags, &naddr, PR_INTERVAL_NO_WAIT);
  687. if (status < 0 || (size_t)status != len) {
  688. if (PR_GetError() == PR_WOULD_BLOCK_ERROR)
  689. ABORT(R_WOULDBLOCK);
  690. r_log(LOG_GENERIC, LOG_INFO, "Error in sendto %s: %d",
  691. to->as_string, PR_GetError());
  692. ABORT(R_IO_ERROR);
  693. }
  694. _status = 0;
  695. abort:
  696. return(_status);
  697. }
  698. int NrSocket::recvfrom(void * buf, size_t maxlen,
  699. size_t *len, int flags,
  700. nr_transport_addr *from) {
  701. ASSERT_ON_THREAD(ststhread_);
  702. int r,_status;
  703. PRNetAddr nfrom;
  704. int32_t status;
  705. status = PR_RecvFrom(fd_, buf, maxlen, flags, &nfrom, PR_INTERVAL_NO_WAIT);
  706. if (status <= 0) {
  707. if (PR_GetError() == PR_WOULD_BLOCK_ERROR)
  708. ABORT(R_WOULDBLOCK);
  709. r_log(LOG_GENERIC, LOG_INFO, "Error in recvfrom: %d", (int)PR_GetError());
  710. ABORT(R_IO_ERROR);
  711. }
  712. *len = status;
  713. if((r=nr_praddr_to_transport_addr(&nfrom,from,my_addr_.protocol,0)))
  714. ABORT(r);
  715. //r_log(LOG_GENERIC,LOG_DEBUG,"Read %d bytes from %s",*len,addr->as_string);
  716. _status = 0;
  717. abort:
  718. return(_status);
  719. }
  720. int NrSocket::getaddr(nr_transport_addr *addrp) {
  721. ASSERT_ON_THREAD(ststhread_);
  722. return nr_transport_addr_copy(addrp, &my_addr_);
  723. }
  724. // Close the socket so that the STS will detach and then kill it
  725. void NrSocket::close() {
  726. ASSERT_ON_THREAD(ststhread_);
  727. mCondition = NS_BASE_STREAM_CLOSED;
  728. }
  729. int NrSocket::connect(nr_transport_addr *addr) {
  730. ASSERT_ON_THREAD(ststhread_);
  731. int r,_status;
  732. PRNetAddr naddr;
  733. int32_t connect_status, getsockname_status;
  734. if ((r=nr_transport_addr_to_praddr(addr, &naddr)))
  735. ABORT(r);
  736. if(!fd_)
  737. ABORT(R_EOD);
  738. // Note: this just means we tried to connect, not that we
  739. // are actually live.
  740. connect_invoked_ = true;
  741. connect_status = PR_Connect(fd_, &naddr, PR_INTERVAL_NO_WAIT);
  742. if (connect_status != PR_SUCCESS) {
  743. if (PR_GetError() != PR_IN_PROGRESS_ERROR)
  744. ABORT(R_IO_ERROR);
  745. }
  746. // If our local address is wildcard, then fill in the
  747. // address now.
  748. if(nr_transport_addr_is_wildcard(&my_addr_)){
  749. getsockname_status = PR_GetSockName(fd_, &naddr);
  750. if (getsockname_status != PR_SUCCESS){
  751. r_log(LOG_GENERIC, LOG_CRIT, "Couldn't get sock name for socket");
  752. ABORT(R_INTERNAL);
  753. }
  754. if((r=nr_praddr_to_transport_addr(&naddr,&my_addr_,addr->protocol,1)))
  755. ABORT(r);
  756. }
  757. // Now return the WOULDBLOCK if needed.
  758. if (connect_status != PR_SUCCESS) {
  759. ABORT(R_WOULDBLOCK);
  760. }
  761. _status = 0;
  762. abort:
  763. return(_status);
  764. }
  765. int NrSocket::write(const void *msg, size_t len, size_t *written) {
  766. ASSERT_ON_THREAD(ststhread_);
  767. int _status;
  768. int32_t status;
  769. if (!connect_invoked_)
  770. ABORT(R_FAILED);
  771. status = PR_Write(fd_, msg, len);
  772. if (status < 0) {
  773. if (PR_GetError() == PR_WOULD_BLOCK_ERROR)
  774. ABORT(R_WOULDBLOCK);
  775. r_log(LOG_GENERIC, LOG_INFO, "Error in write");
  776. ABORT(R_IO_ERROR);
  777. }
  778. *written = status;
  779. _status = 0;
  780. abort:
  781. return _status;
  782. }
  783. int NrSocket::read(void* buf, size_t maxlen, size_t *len) {
  784. ASSERT_ON_THREAD(ststhread_);
  785. int _status;
  786. int32_t status;
  787. if (!connect_invoked_)
  788. ABORT(R_FAILED);
  789. status = PR_Read(fd_, buf, maxlen);
  790. if (status < 0) {
  791. if (PR_GetError() == PR_WOULD_BLOCK_ERROR)
  792. ABORT(R_WOULDBLOCK);
  793. r_log(LOG_GENERIC, LOG_INFO, "Error in read");
  794. ABORT(R_IO_ERROR);
  795. }
  796. if (status == 0)
  797. ABORT(R_EOD);
  798. *len = (size_t)status; // Guaranteed to be > 0
  799. _status = 0;
  800. abort:
  801. return(_status);
  802. }
  803. int NrSocket::listen(int backlog) {
  804. ASSERT_ON_THREAD(ststhread_);
  805. int32_t status;
  806. int _status;
  807. assert(fd_);
  808. status = PR_Listen(fd_, backlog);
  809. if (status != PR_SUCCESS) {
  810. r_log(LOG_GENERIC, LOG_CRIT, "%s: PR_GetError() == %d",
  811. __FUNCTION__, PR_GetError());
  812. ABORT(R_IO_ERROR);
  813. }
  814. _status = 0;
  815. abort:
  816. return(_status);
  817. }
  818. int NrSocket::accept(nr_transport_addr *addrp, nr_socket **sockp) {
  819. ASSERT_ON_THREAD(ststhread_);
  820. int _status, r;
  821. PRStatus status;
  822. PRFileDesc *prfd;
  823. PRNetAddr nfrom;
  824. NrSocket *sock=nullptr;
  825. nsresult rv;
  826. PRSocketOptionData opt_nonblock, opt_nodelay;
  827. nsCOMPtr<nsISocketTransportService> stservice =
  828. do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
  829. if (NS_FAILED(rv)) {
  830. ABORT(R_INTERNAL);
  831. }
  832. if(!fd_)
  833. ABORT(R_EOD);
  834. prfd = PR_Accept(fd_, &nfrom, PR_INTERVAL_NO_WAIT);
  835. if (!prfd) {
  836. if (PR_GetError() == PR_WOULD_BLOCK_ERROR)
  837. ABORT(R_WOULDBLOCK);
  838. ABORT(R_IO_ERROR);
  839. }
  840. sock = new NrSocket();
  841. sock->fd_=prfd;
  842. nr_transport_addr_copy(&sock->my_addr_, &my_addr_);
  843. if((r=nr_praddr_to_transport_addr(&nfrom, addrp, my_addr_.protocol, 0)))
  844. ABORT(r);
  845. // Set nonblocking
  846. opt_nonblock.option = PR_SockOpt_Nonblocking;
  847. opt_nonblock.value.non_blocking = PR_TRUE;
  848. status = PR_SetSocketOption(prfd, &opt_nonblock);
  849. if (status != PR_SUCCESS) {
  850. r_log(LOG_GENERIC, LOG_CRIT,
  851. "Failed to make accepted socket nonblocking: %d", status);
  852. ABORT(R_INTERNAL);
  853. }
  854. // Disable TCP Nagle
  855. opt_nodelay.option = PR_SockOpt_NoDelay;
  856. opt_nodelay.value.no_delay = PR_TRUE;
  857. status = PR_SetSocketOption(prfd, &opt_nodelay);
  858. if (status != PR_SUCCESS) {
  859. r_log(LOG_GENERIC, LOG_WARNING,
  860. "Failed to set Nodelay on accepted socket: %d", status);
  861. }
  862. // Should fail only with OOM
  863. if ((r=nr_socket_create_int(static_cast<void *>(sock), sock->vtbl(), sockp)))
  864. ABORT(r);
  865. // Remember our thread.
  866. sock->ststhread_ = do_QueryInterface(stservice, &rv);
  867. if (NS_FAILED(rv))
  868. ABORT(R_INTERNAL);
  869. // Finally, register with the STS
  870. rv = stservice->AttachSocket(prfd, sock);
  871. if (NS_FAILED(rv)) {
  872. ABORT(R_INTERNAL);
  873. }
  874. sock->connect_invoked_ = true;
  875. // Add a reference so that we can delete it in destroy()
  876. sock->AddRef();
  877. _status = 0;
  878. abort:
  879. if (_status) {
  880. delete sock;
  881. }
  882. return(_status);
  883. }
  884. NS_IMPL_ISUPPORTS(NrUdpSocketIpcProxy, nsIUDPSocketInternal)
  885. nsresult
  886. NrUdpSocketIpcProxy::Init(const RefPtr<NrUdpSocketIpc>& socket)
  887. {
  888. nsresult rv;
  889. sts_thread_ = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
  890. if (NS_FAILED(rv)) {
  891. MOZ_ASSERT(false, "Failed to get STS thread");
  892. return rv;
  893. }
  894. socket_ = socket;
  895. return NS_OK;
  896. }
  897. NrUdpSocketIpcProxy::~NrUdpSocketIpcProxy()
  898. {
  899. // Send our ref to STS to be released
  900. RUN_ON_THREAD(sts_thread_,
  901. mozilla::WrapRelease(socket_.forget()),
  902. NS_DISPATCH_NORMAL);
  903. }
  904. // IUDPSocketInternal interfaces
  905. // callback while error happened in UDP socket operation
  906. NS_IMETHODIMP NrUdpSocketIpcProxy::CallListenerError(const nsACString &message,
  907. const nsACString &filename,
  908. uint32_t line_number) {
  909. return socket_->CallListenerError(message, filename, line_number);
  910. }
  911. // callback while receiving UDP packet
  912. NS_IMETHODIMP NrUdpSocketIpcProxy::CallListenerReceivedData(const nsACString &host,
  913. uint16_t port,
  914. const uint8_t *data,
  915. uint32_t data_length) {
  916. return socket_->CallListenerReceivedData(host, port, data, data_length);
  917. }
  918. // callback while UDP socket is opened
  919. NS_IMETHODIMP NrUdpSocketIpcProxy::CallListenerOpened() {
  920. return socket_->CallListenerOpened();
  921. }
  922. // callback while UDP socket is connected
  923. NS_IMETHODIMP NrUdpSocketIpcProxy::CallListenerConnected() {
  924. return socket_->CallListenerConnected();
  925. }
  926. // callback while UDP socket is closed
  927. NS_IMETHODIMP NrUdpSocketIpcProxy::CallListenerClosed() {
  928. return socket_->CallListenerClosed();
  929. }
  930. // NrUdpSocketIpc Implementation
  931. NrUdpSocketIpc::NrUdpSocketIpc()
  932. : NrSocketIpc(GetIOThreadAndAddUse_s()),
  933. monitor_("NrUdpSocketIpc"),
  934. err_(false),
  935. state_(NR_INIT) {
  936. }
  937. NrUdpSocketIpc::~NrUdpSocketIpc()
  938. {
  939. // also guarantees socket_child_ is released from the io_thread, and
  940. // tells the SingletonThreadHolder we're done with it
  941. #if defined(MOZILLA_INTERNAL_API)
  942. // close(), but transfer the socket_child_ reference to die as well
  943. RUN_ON_THREAD(io_thread_,
  944. mozilla::WrapRunnableNM(&NrUdpSocketIpc::release_child_i,
  945. socket_child_.forget().take(),
  946. sts_thread_),
  947. NS_DISPATCH_NORMAL);
  948. #endif
  949. }
  950. // IUDPSocketInternal interfaces
  951. // callback while error happened in UDP socket operation
  952. NS_IMETHODIMP NrUdpSocketIpc::CallListenerError(const nsACString &message,
  953. const nsACString &filename,
  954. uint32_t line_number) {
  955. ASSERT_ON_THREAD(io_thread_);
  956. r_log(LOG_GENERIC, LOG_ERR, "UDP socket error:%s at %s:%d this=%p",
  957. message.BeginReading(), filename.BeginReading(), line_number, (void*) this );
  958. ReentrantMonitorAutoEnter mon(monitor_);
  959. err_ = true;
  960. monitor_.NotifyAll();
  961. return NS_OK;
  962. }
  963. // callback while receiving UDP packet
  964. NS_IMETHODIMP NrUdpSocketIpc::CallListenerReceivedData(const nsACString &host,
  965. uint16_t port,
  966. const uint8_t *data,
  967. uint32_t data_length) {
  968. ASSERT_ON_THREAD(io_thread_);
  969. PRNetAddr addr;
  970. memset(&addr, 0, sizeof(addr));
  971. {
  972. ReentrantMonitorAutoEnter mon(monitor_);
  973. if (PR_SUCCESS != PR_StringToNetAddr(host.BeginReading(), &addr)) {
  974. err_ = true;
  975. MOZ_ASSERT(false, "Failed to convert remote host to PRNetAddr");
  976. return NS_OK;
  977. }
  978. // Use PR_IpAddrNull to avoid address being reset to 0.
  979. if (PR_SUCCESS != PR_SetNetAddr(PR_IpAddrNull, addr.raw.family, port, &addr)) {
  980. err_ = true;
  981. MOZ_ASSERT(false, "Failed to set port in PRNetAddr");
  982. return NS_OK;
  983. }
  984. }
  985. nsAutoPtr<DataBuffer> buf(new DataBuffer(data, data_length));
  986. RefPtr<nr_udp_message> msg(new nr_udp_message(addr, buf));
  987. RUN_ON_THREAD(sts_thread_,
  988. mozilla::WrapRunnable(RefPtr<NrUdpSocketIpc>(this),
  989. &NrUdpSocketIpc::recv_callback_s,
  990. msg),
  991. NS_DISPATCH_NORMAL);
  992. return NS_OK;
  993. }
  994. nsresult NrUdpSocketIpc::SetAddress() {
  995. uint16_t port;
  996. if (NS_FAILED(socket_child_->GetLocalPort(&port))) {
  997. err_ = true;
  998. MOZ_ASSERT(false, "Failed to get local port");
  999. return NS_OK;
  1000. }
  1001. nsAutoCString address;
  1002. if(NS_FAILED(socket_child_->GetLocalAddress(address))) {
  1003. err_ = true;
  1004. MOZ_ASSERT(false, "Failed to get local address");
  1005. return NS_OK;
  1006. }
  1007. PRNetAddr praddr;
  1008. if (PR_SUCCESS != PR_InitializeNetAddr(PR_IpAddrAny, port, &praddr)) {
  1009. err_ = true;
  1010. MOZ_ASSERT(false, "Failed to set port in PRNetAddr");
  1011. return NS_OK;
  1012. }
  1013. if (PR_SUCCESS != PR_StringToNetAddr(address.BeginReading(), &praddr)) {
  1014. err_ = true;
  1015. MOZ_ASSERT(false, "Failed to convert local host to PRNetAddr");
  1016. return NS_OK;
  1017. }
  1018. nr_transport_addr expected_addr;
  1019. if(nr_transport_addr_copy(&expected_addr, &my_addr_)) {
  1020. err_ = true;
  1021. MOZ_ASSERT(false, "Failed to copy my_addr_");
  1022. }
  1023. if (nr_praddr_to_transport_addr(&praddr, &my_addr_, IPPROTO_UDP, 1)) {
  1024. err_ = true;
  1025. MOZ_ASSERT(false, "Failed to copy local host to my_addr_");
  1026. }
  1027. if (!nr_transport_addr_is_wildcard(&expected_addr) &&
  1028. nr_transport_addr_cmp(&expected_addr, &my_addr_,
  1029. NR_TRANSPORT_ADDR_CMP_MODE_ADDR)) {
  1030. err_ = true;
  1031. MOZ_ASSERT(false, "Address of opened socket is not expected");
  1032. }
  1033. return NS_OK;
  1034. }
  1035. // callback while UDP socket is opened
  1036. NS_IMETHODIMP NrUdpSocketIpc::CallListenerOpened() {
  1037. ASSERT_ON_THREAD(io_thread_);
  1038. ReentrantMonitorAutoEnter mon(monitor_);
  1039. r_log(LOG_GENERIC, LOG_DEBUG, "UDP socket opened this=%p", (void*) this);
  1040. nsresult rv = SetAddress();
  1041. if (NS_FAILED(rv)) {
  1042. return rv;
  1043. }
  1044. mon.NotifyAll();
  1045. return NS_OK;
  1046. }
  1047. // callback while UDP socket is connected
  1048. NS_IMETHODIMP NrUdpSocketIpc::CallListenerConnected() {
  1049. ASSERT_ON_THREAD(io_thread_);
  1050. ReentrantMonitorAutoEnter mon(monitor_);
  1051. r_log(LOG_GENERIC, LOG_DEBUG, "UDP socket connected this=%p", (void*) this);
  1052. MOZ_ASSERT(state_ == NR_CONNECTED);
  1053. nsresult rv = SetAddress();
  1054. if (NS_FAILED(rv)) {
  1055. mon.NotifyAll();
  1056. return rv;
  1057. }
  1058. r_log(LOG_GENERIC, LOG_INFO, "Exit UDP socket connected");
  1059. mon.NotifyAll();
  1060. return NS_OK;
  1061. }
  1062. // callback while UDP socket is closed
  1063. NS_IMETHODIMP NrUdpSocketIpc::CallListenerClosed() {
  1064. ASSERT_ON_THREAD(io_thread_);
  1065. ReentrantMonitorAutoEnter mon(monitor_);
  1066. r_log(LOG_GENERIC, LOG_DEBUG, "UDP socket closed this=%p", (void*) this);
  1067. MOZ_ASSERT(state_ == NR_CONNECTED || state_ == NR_CLOSING);
  1068. state_ = NR_CLOSED;
  1069. return NS_OK;
  1070. }
  1071. //
  1072. // NrSocketBase methods.
  1073. //
  1074. int NrUdpSocketIpc::create(nr_transport_addr *addr) {
  1075. ASSERT_ON_THREAD(sts_thread_);
  1076. int r, _status;
  1077. nsresult rv;
  1078. int32_t port;
  1079. nsCString host;
  1080. ReentrantMonitorAutoEnter mon(monitor_);
  1081. if (state_ != NR_INIT) {
  1082. ABORT(R_INTERNAL);
  1083. }
  1084. sts_thread_ = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
  1085. if (NS_FAILED(rv)) {
  1086. MOZ_ASSERT(false, "Failed to get STS thread");
  1087. ABORT(R_INTERNAL);
  1088. }
  1089. if ((r=nr_transport_addr_get_addrstring_and_port(addr, &host, &port))) {
  1090. ABORT(r);
  1091. }
  1092. // wildcard address will be resolved at NrUdpSocketIpc::CallListenerVoid
  1093. if ((r=nr_transport_addr_copy(&my_addr_, addr))) {
  1094. ABORT(r);
  1095. }
  1096. state_ = NR_CONNECTING;
  1097. RUN_ON_THREAD(io_thread_,
  1098. mozilla::WrapRunnable(RefPtr<NrUdpSocketIpc>(this),
  1099. &NrUdpSocketIpc::create_i,
  1100. host, static_cast<uint16_t>(port)),
  1101. NS_DISPATCH_NORMAL);
  1102. // Wait until socket creation complete.
  1103. mon.Wait();
  1104. if (err_) {
  1105. close();
  1106. ABORT(R_INTERNAL);
  1107. }
  1108. state_ = NR_CONNECTED;
  1109. _status = 0;
  1110. abort:
  1111. return(_status);
  1112. }
  1113. int NrUdpSocketIpc::sendto(const void *msg, size_t len, int flags,
  1114. nr_transport_addr *to) {
  1115. ASSERT_ON_THREAD(sts_thread_);
  1116. ReentrantMonitorAutoEnter mon(monitor_);
  1117. //If send err happened before, simply return the error.
  1118. if (err_) {
  1119. return R_IO_ERROR;
  1120. }
  1121. if (state_ != NR_CONNECTED) {
  1122. return R_INTERNAL;
  1123. }
  1124. int r;
  1125. net::NetAddr addr;
  1126. if ((r=nr_transport_addr_to_netaddr(to, &addr))) {
  1127. return r;
  1128. }
  1129. if (nr_is_stun_request_message((UCHAR*)msg, len) && ShouldDrop(len)) {
  1130. return R_WOULDBLOCK;
  1131. }
  1132. nsAutoPtr<DataBuffer> buf(new DataBuffer(static_cast<const uint8_t*>(msg), len));
  1133. RUN_ON_THREAD(io_thread_,
  1134. mozilla::WrapRunnable(RefPtr<NrUdpSocketIpc>(this),
  1135. &NrUdpSocketIpc::sendto_i,
  1136. addr, buf),
  1137. NS_DISPATCH_NORMAL);
  1138. return 0;
  1139. }
  1140. void NrUdpSocketIpc::close() {
  1141. r_log(LOG_GENERIC, LOG_DEBUG, "NrUdpSocketIpc::close()");
  1142. ASSERT_ON_THREAD(sts_thread_);
  1143. ReentrantMonitorAutoEnter mon(monitor_);
  1144. state_ = NR_CLOSING;
  1145. RUN_ON_THREAD(io_thread_,
  1146. mozilla::WrapRunnable(RefPtr<NrUdpSocketIpc>(this),
  1147. &NrUdpSocketIpc::close_i),
  1148. NS_DISPATCH_NORMAL);
  1149. //remove all enqueued messages
  1150. std::queue<RefPtr<nr_udp_message> > empty;
  1151. std::swap(received_msgs_, empty);
  1152. }
  1153. int NrUdpSocketIpc::recvfrom(void *buf, size_t maxlen, size_t *len, int flags,
  1154. nr_transport_addr *from) {
  1155. ASSERT_ON_THREAD(sts_thread_);
  1156. ReentrantMonitorAutoEnter mon(monitor_);
  1157. int r, _status;
  1158. uint32_t consumed_len;
  1159. *len = 0;
  1160. if (state_ != NR_CONNECTED) {
  1161. ABORT(R_INTERNAL);
  1162. }
  1163. if (received_msgs_.empty()) {
  1164. ABORT(R_WOULDBLOCK);
  1165. }
  1166. {
  1167. RefPtr<nr_udp_message> msg(received_msgs_.front());
  1168. received_msgs_.pop();
  1169. if ((r=nr_praddr_to_transport_addr(&msg->from, from, IPPROTO_UDP, 0))) {
  1170. err_ = true;
  1171. MOZ_ASSERT(false, "Get bogus address for received UDP packet");
  1172. ABORT(r);
  1173. }
  1174. consumed_len = std::min(maxlen, msg->data->len());
  1175. if (consumed_len < msg->data->len()) {
  1176. r_log(LOG_GENERIC, LOG_DEBUG, "Partial received UDP packet will be discard");
  1177. }
  1178. memcpy(buf, msg->data->data(), consumed_len);
  1179. *len = consumed_len;
  1180. }
  1181. _status = 0;
  1182. abort:
  1183. return(_status);
  1184. }
  1185. int NrUdpSocketIpc::getaddr(nr_transport_addr *addrp) {
  1186. ASSERT_ON_THREAD(sts_thread_);
  1187. ReentrantMonitorAutoEnter mon(monitor_);
  1188. if (state_ != NR_CONNECTED) {
  1189. return R_INTERNAL;
  1190. }
  1191. return nr_transport_addr_copy(addrp, &my_addr_);
  1192. }
  1193. int NrUdpSocketIpc::connect(nr_transport_addr *addr) {
  1194. int r,_status;
  1195. int32_t port;
  1196. nsCString host;
  1197. ReentrantMonitorAutoEnter mon(monitor_);
  1198. r_log(LOG_GENERIC, LOG_DEBUG, "NrUdpSocketIpc::connect(%s) this=%p", addr->as_string,
  1199. (void*) this);
  1200. if ((r=nr_transport_addr_get_addrstring_and_port(addr, &host, &port))) {
  1201. ABORT(r);
  1202. }
  1203. RUN_ON_THREAD(io_thread_,
  1204. mozilla::WrapRunnable(RefPtr<NrUdpSocketIpc>(this),
  1205. &NrUdpSocketIpc::connect_i,
  1206. host, static_cast<uint16_t>(port)),
  1207. NS_DISPATCH_NORMAL);
  1208. // Wait until connect() completes.
  1209. mon.Wait();
  1210. r_log(LOG_GENERIC, LOG_DEBUG, "NrUdpSocketIpc::connect this=%p completed err_ = %s",
  1211. (void*) this, err_ ? "true" : "false");
  1212. if (err_) {
  1213. ABORT(R_INTERNAL);
  1214. }
  1215. _status = 0;
  1216. abort:
  1217. return _status;
  1218. }
  1219. int NrUdpSocketIpc::write(const void *msg, size_t len, size_t *written) {
  1220. MOZ_ASSERT(false);
  1221. return R_INTERNAL;
  1222. }
  1223. int NrUdpSocketIpc::read(void* buf, size_t maxlen, size_t *len) {
  1224. MOZ_ASSERT(false);
  1225. return R_INTERNAL;
  1226. }
  1227. int NrUdpSocketIpc::listen(int backlog) {
  1228. MOZ_ASSERT(false);
  1229. return R_INTERNAL;
  1230. }
  1231. int NrUdpSocketIpc::accept(nr_transport_addr *addrp, nr_socket **sockp) {
  1232. MOZ_ASSERT(false);
  1233. return R_INTERNAL;
  1234. }
  1235. // IO thread executors
  1236. void NrUdpSocketIpc::create_i(const nsACString &host, const uint16_t port) {
  1237. ASSERT_ON_THREAD(io_thread_);
  1238. uint32_t minBuffSize = 0;
  1239. nsresult rv;
  1240. nsCOMPtr<nsIUDPSocketChild> socketChild = do_CreateInstance("@mozilla.org/udp-socket-child;1", &rv);
  1241. if (NS_FAILED(rv)) {
  1242. ReentrantMonitorAutoEnter mon(monitor_);
  1243. err_ = true;
  1244. MOZ_ASSERT(false, "Failed to create UDPSocketChild");
  1245. return;
  1246. }
  1247. // This can spin the event loop; don't do that with the monitor held
  1248. socketChild->SetBackgroundSpinsEvents();
  1249. ReentrantMonitorAutoEnter mon(monitor_);
  1250. if (!socket_child_) {
  1251. socket_child_ = socketChild;
  1252. socket_child_->SetFilterName(nsCString(NS_NETWORK_SOCKET_FILTER_HANDLER_STUN_SUFFIX));
  1253. } else {
  1254. socketChild = nullptr;
  1255. }
  1256. RefPtr<NrUdpSocketIpcProxy> proxy(new NrUdpSocketIpcProxy);
  1257. rv = proxy->Init(this);
  1258. if (NS_FAILED(rv)) {
  1259. err_ = true;
  1260. mon.NotifyAll();
  1261. return;
  1262. }
  1263. #ifdef XP_WIN
  1264. if (!mozilla::IsWin8OrLater()) {
  1265. // Increase default receive and send buffer size on <= Win7 to be able to
  1266. // receive and send an unpaced HD (>= 720p = 1280x720 - I Frame ~ 21K size)
  1267. // stream without losing packets.
  1268. // Manual testing showed that 100K buffer size was not enough and the
  1269. // packet loss dis-appeared with 256K buffer size.
  1270. // See bug 1252769 for future improvements of this.
  1271. minBuffSize = 256 * 1024;
  1272. }
  1273. #endif
  1274. // XXX bug 1126232 - don't use null Principal!
  1275. if (NS_FAILED(socket_child_->Bind(proxy, nullptr, host, port,
  1276. /* reuse = */ false,
  1277. /* loopback = */ false,
  1278. /* recv buffer size */ minBuffSize,
  1279. /* send buffer size */ minBuffSize))) {
  1280. err_ = true;
  1281. MOZ_ASSERT(false, "Failed to create UDP socket");
  1282. mon.NotifyAll();
  1283. return;
  1284. }
  1285. }
  1286. void NrUdpSocketIpc::connect_i(const nsACString &host, const uint16_t port) {
  1287. ASSERT_ON_THREAD(io_thread_);
  1288. nsresult rv;
  1289. ReentrantMonitorAutoEnter mon(monitor_);
  1290. RefPtr<NrUdpSocketIpcProxy> proxy(new NrUdpSocketIpcProxy);
  1291. rv = proxy->Init(this);
  1292. if (NS_FAILED(rv)) {
  1293. err_ = true;
  1294. mon.NotifyAll();
  1295. return;
  1296. }
  1297. if (NS_FAILED(socket_child_->Connect(proxy, host, port))) {
  1298. err_ = true;
  1299. MOZ_ASSERT(false, "Failed to connect UDP socket");
  1300. mon.NotifyAll();
  1301. return;
  1302. }
  1303. }
  1304. void NrUdpSocketIpc::sendto_i(const net::NetAddr &addr, nsAutoPtr<DataBuffer> buf) {
  1305. ASSERT_ON_THREAD(io_thread_);
  1306. ReentrantMonitorAutoEnter mon(monitor_);
  1307. if (!socket_child_) {
  1308. MOZ_ASSERT(false);
  1309. err_ = true;
  1310. return;
  1311. }
  1312. if (NS_FAILED(socket_child_->SendWithAddress(&addr,
  1313. buf->data(),
  1314. buf->len()))) {
  1315. err_ = true;
  1316. }
  1317. }
  1318. void NrUdpSocketIpc::close_i() {
  1319. ASSERT_ON_THREAD(io_thread_);
  1320. if (socket_child_) {
  1321. socket_child_->Close();
  1322. socket_child_ = nullptr;
  1323. }
  1324. }
  1325. #if defined(MOZILLA_INTERNAL_API)
  1326. // close(), but transfer the socket_child_ reference to die as well
  1327. // static
  1328. void NrUdpSocketIpc::release_child_i(nsIUDPSocketChild* aChild,
  1329. nsCOMPtr<nsIEventTarget> sts_thread) {
  1330. RefPtr<nsIUDPSocketChild> socket_child_ref =
  1331. already_AddRefed<nsIUDPSocketChild>(aChild);
  1332. if (socket_child_ref) {
  1333. socket_child_ref->Close();
  1334. }
  1335. // Tell SingletonThreadHolder we're done with it
  1336. RUN_ON_THREAD(sts_thread,
  1337. mozilla::WrapRunnableNM(&NrUdpSocketIpc::release_use_s),
  1338. NS_DISPATCH_NORMAL);
  1339. }
  1340. void NrUdpSocketIpc::release_use_s() {
  1341. sThread->ReleaseUse();
  1342. }
  1343. #endif
  1344. void NrUdpSocketIpc::recv_callback_s(RefPtr<nr_udp_message> msg) {
  1345. ASSERT_ON_THREAD(sts_thread_);
  1346. {
  1347. ReentrantMonitorAutoEnter mon(monitor_);
  1348. if (state_ != NR_CONNECTED) {
  1349. return;
  1350. }
  1351. }
  1352. //enqueue received message
  1353. received_msgs_.push(msg);
  1354. if ((poll_flags() & PR_POLL_READ)) {
  1355. fire_callback(NR_ASYNC_WAIT_READ);
  1356. }
  1357. }
  1358. #if defined(MOZILLA_INTERNAL_API)
  1359. // TCPSocket.
  1360. class NrTcpSocketIpc::TcpSocketReadyRunner: public Runnable
  1361. {
  1362. public:
  1363. explicit TcpSocketReadyRunner(NrTcpSocketIpc *sck)
  1364. : socket_(sck) {}
  1365. NS_IMETHOD Run() override {
  1366. socket_->maybe_post_socket_ready();
  1367. return NS_OK;
  1368. }
  1369. private:
  1370. RefPtr<NrTcpSocketIpc> socket_;
  1371. };
  1372. NS_IMPL_ISUPPORTS(NrTcpSocketIpc,
  1373. nsITCPSocketCallback)
  1374. NrTcpSocketIpc::NrTcpSocketIpc(nsIThread* aThread)
  1375. : NrSocketIpc(static_cast<nsIEventTarget*>(aThread)),
  1376. mirror_state_(NR_INIT),
  1377. state_(NR_INIT),
  1378. buffered_bytes_(0),
  1379. tracking_number_(0) {
  1380. }
  1381. NrTcpSocketIpc::~NrTcpSocketIpc()
  1382. {
  1383. // also guarantees socket_child_ is released from the io_thread
  1384. // close(), but transfer the socket_child_ reference to die as well
  1385. RUN_ON_THREAD(io_thread_,
  1386. mozilla::WrapRunnableNM(&NrTcpSocketIpc::release_child_i,
  1387. socket_child_.forget().take(),
  1388. sts_thread_),
  1389. NS_DISPATCH_NORMAL);
  1390. }
  1391. //
  1392. // nsITCPSocketCallback methods
  1393. //
  1394. NS_IMETHODIMP NrTcpSocketIpc::UpdateReadyState(uint32_t aReadyState) {
  1395. NrSocketIpcState temp = NR_INIT;
  1396. switch (static_cast<dom::TCPReadyState>(aReadyState)) {
  1397. case dom::TCPReadyState::Connecting:
  1398. temp = NR_CONNECTING;
  1399. break;
  1400. case dom::TCPReadyState::Open:
  1401. temp = NR_CONNECTED;
  1402. break;
  1403. case dom::TCPReadyState::Closing:
  1404. temp = NR_CLOSING;
  1405. break;
  1406. case dom::TCPReadyState::Closed:
  1407. temp = NR_CLOSED;
  1408. break;
  1409. default:
  1410. MOZ_ASSERT(false, "Invalid ReadyState");
  1411. return NS_OK;
  1412. }
  1413. if (mirror_state_ != temp) {
  1414. mirror_state_ = temp;
  1415. RUN_ON_THREAD(sts_thread_,
  1416. mozilla::WrapRunnable(RefPtr<NrTcpSocketIpc>(this),
  1417. &NrTcpSocketIpc::update_state_s,
  1418. temp),
  1419. NS_DISPATCH_NORMAL);
  1420. }
  1421. return NS_OK;
  1422. }
  1423. NS_IMETHODIMP NrTcpSocketIpc::UpdateBufferedAmount(uint32_t buffered_amount,
  1424. uint32_t tracking_number) {
  1425. RUN_ON_THREAD(sts_thread_,
  1426. mozilla::WrapRunnable(RefPtr<NrTcpSocketIpc>(this),
  1427. &NrTcpSocketIpc::message_sent_s,
  1428. buffered_amount,
  1429. tracking_number),
  1430. NS_DISPATCH_NORMAL);
  1431. return NS_OK;
  1432. }
  1433. NS_IMETHODIMP NrTcpSocketIpc::FireDataArrayEvent(const nsAString& aType,
  1434. const InfallibleTArray<uint8_t>& buffer) {
  1435. // Called when we received data.
  1436. uint8_t *buf = const_cast<uint8_t*>(buffer.Elements());
  1437. nsAutoPtr<DataBuffer> data_buf(new DataBuffer(buf, buffer.Length()));
  1438. RefPtr<nr_tcp_message> msg = new nr_tcp_message(data_buf);
  1439. RUN_ON_THREAD(sts_thread_,
  1440. mozilla::WrapRunnable(RefPtr<NrTcpSocketIpc>(this),
  1441. &NrTcpSocketIpc::recv_message_s,
  1442. msg),
  1443. NS_DISPATCH_NORMAL);
  1444. return NS_OK;
  1445. }
  1446. NS_IMETHODIMP NrTcpSocketIpc::FireErrorEvent(const nsAString &type,
  1447. const nsAString &name) {
  1448. r_log(LOG_GENERIC, LOG_ERR,
  1449. "Error from TCPSocketChild: type: %s, name: %s",
  1450. NS_LossyConvertUTF16toASCII(type).get(), NS_LossyConvertUTF16toASCII(name).get());
  1451. socket_child_ = nullptr;
  1452. mirror_state_ = NR_CLOSED;
  1453. RUN_ON_THREAD(sts_thread_,
  1454. mozilla::WrapRunnable(RefPtr<NrTcpSocketIpc>(this),
  1455. &NrTcpSocketIpc::update_state_s,
  1456. NR_CLOSED),
  1457. NS_DISPATCH_NORMAL);
  1458. return NS_OK;
  1459. }
  1460. // methods of nsITCPSocketCallback that we are not going to implement.
  1461. NS_IMETHODIMP NrTcpSocketIpc::FireDataStringEvent(const nsAString &type,
  1462. const nsACString &data) {
  1463. return NS_ERROR_NOT_IMPLEMENTED;
  1464. }
  1465. NS_IMETHODIMP NrTcpSocketIpc::FireEvent(const nsAString &type) {
  1466. // XXX support type.mData == 'close' at least
  1467. return NS_ERROR_NOT_IMPLEMENTED;
  1468. }
  1469. //
  1470. // NrSocketBase methods.
  1471. //
  1472. int NrTcpSocketIpc::create(nr_transport_addr *addr) {
  1473. int r, _status;
  1474. nsresult rv;
  1475. int32_t port;
  1476. nsCString host;
  1477. if (state_ != NR_INIT) {
  1478. ABORT(R_INTERNAL);
  1479. }
  1480. sts_thread_ = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
  1481. if (NS_FAILED(rv)) {
  1482. MOZ_ASSERT(false, "Failed to get STS thread");
  1483. ABORT(R_INTERNAL);
  1484. }
  1485. // Sanity check
  1486. if ((r=nr_transport_addr_get_addrstring_and_port(addr, &host, &port))) {
  1487. ABORT(r);
  1488. }
  1489. if ((r=nr_transport_addr_copy(&my_addr_, addr))) {
  1490. ABORT(r);
  1491. }
  1492. _status = 0;
  1493. abort:
  1494. return(_status);
  1495. }
  1496. int NrTcpSocketIpc::sendto(const void *msg, size_t len,
  1497. int flags, nr_transport_addr *to) {
  1498. MOZ_ASSERT(false);
  1499. return R_INTERNAL;
  1500. }
  1501. int NrTcpSocketIpc::recvfrom(void * buf, size_t maxlen,
  1502. size_t *len, int flags,
  1503. nr_transport_addr *from) {
  1504. MOZ_ASSERT(false);
  1505. return R_INTERNAL;
  1506. }
  1507. int NrTcpSocketIpc::getaddr(nr_transport_addr *addrp) {
  1508. ASSERT_ON_THREAD(sts_thread_);
  1509. return nr_transport_addr_copy(addrp, &my_addr_);
  1510. }
  1511. void NrTcpSocketIpc::close() {
  1512. ASSERT_ON_THREAD(sts_thread_);
  1513. if (state_ == NR_CLOSED || state_ == NR_CLOSING) {
  1514. return;
  1515. }
  1516. state_ = NR_CLOSING;
  1517. RUN_ON_THREAD(io_thread_,
  1518. mozilla::WrapRunnable(RefPtr<NrTcpSocketIpc>(this),
  1519. &NrTcpSocketIpc::close_i),
  1520. NS_DISPATCH_NORMAL);
  1521. //remove all enqueued messages
  1522. std::queue<RefPtr<nr_tcp_message>> empty;
  1523. std::swap(msg_queue_, empty);
  1524. }
  1525. int NrTcpSocketIpc::connect(nr_transport_addr *addr) {
  1526. nsCString remote_addr, local_addr;
  1527. int32_t remote_port, local_port;
  1528. int r, _status;
  1529. if ((r=nr_transport_addr_get_addrstring_and_port(addr,
  1530. &remote_addr,
  1531. &remote_port))) {
  1532. ABORT(r);
  1533. }
  1534. if ((r=nr_transport_addr_get_addrstring_and_port(&my_addr_,
  1535. &local_addr,
  1536. &local_port))) {
  1537. MOZ_ASSERT(false); // shouldn't fail as it was sanity-checked in ::create()
  1538. ABORT(r);
  1539. }
  1540. state_ = mirror_state_ = NR_CONNECTING;
  1541. RUN_ON_THREAD(io_thread_,
  1542. mozilla::WrapRunnable(RefPtr<NrTcpSocketIpc>(this),
  1543. &NrTcpSocketIpc::connect_i,
  1544. remote_addr,
  1545. static_cast<uint16_t>(remote_port),
  1546. local_addr,
  1547. static_cast<uint16_t>(local_port)),
  1548. NS_DISPATCH_NORMAL);
  1549. // Make caller wait for ready to write.
  1550. _status = R_WOULDBLOCK;
  1551. abort:
  1552. return _status;
  1553. }
  1554. int NrTcpSocketIpc::write(const void *msg, size_t len, size_t *written) {
  1555. ASSERT_ON_THREAD(sts_thread_);
  1556. int _status = 0;
  1557. if (state_ != NR_CONNECTED) {
  1558. ABORT(R_FAILED);
  1559. }
  1560. if (buffered_bytes_ + len >= nsITCPSocketCallback::BUFFER_SIZE) {
  1561. ABORT(R_WOULDBLOCK);
  1562. }
  1563. buffered_bytes_ += len;
  1564. {
  1565. InfallibleTArray<uint8_t>* arr = new InfallibleTArray<uint8_t>();
  1566. arr->AppendElements(static_cast<const uint8_t*>(msg), len);
  1567. // keep track of un-acknowleged writes by tracking number.
  1568. writes_in_flight_.push_back(len);
  1569. RUN_ON_THREAD(io_thread_,
  1570. mozilla::WrapRunnable(RefPtr<NrTcpSocketIpc>(this),
  1571. &NrTcpSocketIpc::write_i,
  1572. nsAutoPtr<InfallibleTArray<uint8_t>>(arr),
  1573. ++tracking_number_),
  1574. NS_DISPATCH_NORMAL);
  1575. }
  1576. *written = len;
  1577. abort:
  1578. return _status;
  1579. }
  1580. int NrTcpSocketIpc::read(void* buf, size_t maxlen, size_t *len) {
  1581. int _status = 0;
  1582. if (state_ != NR_CONNECTED) {
  1583. ABORT(R_FAILED);
  1584. }
  1585. if (msg_queue_.size() == 0) {
  1586. ABORT(R_WOULDBLOCK);
  1587. }
  1588. {
  1589. RefPtr<nr_tcp_message> msg(msg_queue_.front());
  1590. size_t consumed_len = std::min(maxlen, msg->unread_bytes());
  1591. memcpy(buf, msg->reading_pointer(), consumed_len);
  1592. if (consumed_len < msg->unread_bytes()) {
  1593. // There is still something left in buffer.
  1594. msg->read_bytes += consumed_len;
  1595. } else {
  1596. msg_queue_.pop();
  1597. }
  1598. *len = consumed_len;
  1599. }
  1600. abort:
  1601. return _status;
  1602. }
  1603. int NrTcpSocketIpc::listen(int backlog) {
  1604. return R_INTERNAL;
  1605. }
  1606. int NrTcpSocketIpc::accept(nr_transport_addr *addrp, nr_socket **sockp) {
  1607. return R_INTERNAL;
  1608. }
  1609. void NrTcpSocketIpc::connect_i(const nsACString &remote_addr,
  1610. uint16_t remote_port,
  1611. const nsACString &local_addr,
  1612. uint16_t local_port) {
  1613. ASSERT_ON_THREAD(io_thread_);
  1614. mirror_state_ = NR_CONNECTING;
  1615. dom::TCPSocketChild* child = new dom::TCPSocketChild(NS_ConvertUTF8toUTF16(remote_addr), remote_port);
  1616. socket_child_ = child;
  1617. // Bug 1285330: put filtering back in here
  1618. // XXX remove remote!
  1619. socket_child_->SendWindowlessOpenBind(this,
  1620. remote_addr, remote_port,
  1621. local_addr, local_port,
  1622. /* use ssl */ false);
  1623. }
  1624. void NrTcpSocketIpc::write_i(nsAutoPtr<InfallibleTArray<uint8_t>> arr,
  1625. uint32_t tracking_number) {
  1626. ASSERT_ON_THREAD(io_thread_);
  1627. if (!socket_child_) {
  1628. return;
  1629. }
  1630. socket_child_->SendSendArray(*arr, tracking_number);
  1631. }
  1632. void NrTcpSocketIpc::close_i() {
  1633. ASSERT_ON_THREAD(io_thread_);
  1634. mirror_state_ = NR_CLOSING;
  1635. if (!socket_child_) {
  1636. return;
  1637. }
  1638. socket_child_->SendClose();
  1639. }
  1640. // close(), but transfer the socket_child_ reference to die as well
  1641. // static
  1642. void NrTcpSocketIpc::release_child_i(dom::TCPSocketChild* aChild,
  1643. nsCOMPtr<nsIEventTarget> sts_thread) {
  1644. RefPtr<dom::TCPSocketChild> socket_child_ref =
  1645. already_AddRefed<dom::TCPSocketChild>(aChild);
  1646. if (socket_child_ref) {
  1647. socket_child_ref->SendClose();
  1648. }
  1649. // io_thread_ is MainThread, so no use to release
  1650. }
  1651. void NrTcpSocketIpc::message_sent_s(uint32_t buffered_amount,
  1652. uint32_t tracking_number) {
  1653. ASSERT_ON_THREAD(sts_thread_);
  1654. size_t num_unacked_writes = tracking_number_ - tracking_number;
  1655. while (writes_in_flight_.size() > num_unacked_writes) {
  1656. writes_in_flight_.pop_front();
  1657. }
  1658. for (size_t unacked_write_len : writes_in_flight_) {
  1659. buffered_amount += unacked_write_len;
  1660. }
  1661. r_log(LOG_GENERIC, LOG_ERR,
  1662. "UpdateBufferedAmount: (tracking %u): %u, waiting: %s",
  1663. tracking_number, buffered_amount,
  1664. (poll_flags() & PR_POLL_WRITE) ? "yes" : "no");
  1665. buffered_bytes_ = buffered_amount;
  1666. maybe_post_socket_ready();
  1667. }
  1668. void NrTcpSocketIpc::recv_message_s(nr_tcp_message *msg) {
  1669. ASSERT_ON_THREAD(sts_thread_);
  1670. msg_queue_.push(msg);
  1671. maybe_post_socket_ready();
  1672. }
  1673. void NrTcpSocketIpc::update_state_s(NrSocketIpcState next_state) {
  1674. ASSERT_ON_THREAD(sts_thread_);
  1675. // only allow valid transitions
  1676. switch (state_) {
  1677. case NR_CONNECTING:
  1678. if (next_state == NR_CONNECTED) {
  1679. state_ = NR_CONNECTED;
  1680. maybe_post_socket_ready();
  1681. } else {
  1682. state_ = next_state; // all states are valid from CONNECTING
  1683. }
  1684. break;
  1685. case NR_CONNECTED:
  1686. if (next_state != NR_CONNECTING) {
  1687. state_ = next_state;
  1688. }
  1689. break;
  1690. case NR_CLOSING:
  1691. if (next_state == NR_CLOSED) {
  1692. state_ = next_state;
  1693. }
  1694. break;
  1695. case NR_CLOSED:
  1696. break;
  1697. default:
  1698. MOZ_CRASH("update_state_s while in illegal state");
  1699. }
  1700. }
  1701. void NrTcpSocketIpc::maybe_post_socket_ready() {
  1702. bool has_event = false;
  1703. if (state_ == NR_CONNECTED) {
  1704. if (poll_flags() & PR_POLL_WRITE) {
  1705. // This effectively polls via the event loop until the
  1706. // NR_ASYNC_WAIT_WRITE is no longer armed.
  1707. if (buffered_bytes_ < nsITCPSocketCallback::BUFFER_SIZE) {
  1708. r_log(LOG_GENERIC, LOG_INFO, "Firing write callback (%u)",
  1709. (uint32_t)buffered_bytes_);
  1710. fire_callback(NR_ASYNC_WAIT_WRITE);
  1711. has_event = true;
  1712. }
  1713. }
  1714. if (poll_flags() & PR_POLL_READ) {
  1715. if (msg_queue_.size()) {
  1716. r_log(LOG_GENERIC, LOG_INFO, "Firing read callback (%u)",
  1717. (uint32_t)msg_queue_.size());
  1718. fire_callback(NR_ASYNC_WAIT_READ);
  1719. has_event = true;
  1720. }
  1721. }
  1722. }
  1723. // If any event has been posted, we post a runnable to see
  1724. // if the events have to be posted again.
  1725. if (has_event) {
  1726. RefPtr<TcpSocketReadyRunner> runnable = new TcpSocketReadyRunner(this);
  1727. NS_DispatchToCurrentThread(runnable);
  1728. }
  1729. }
  1730. #endif
  1731. } // close namespace
  1732. using namespace mozilla;
  1733. // Bridge to the nr_socket interface
  1734. static int nr_socket_local_destroy(void **objp);
  1735. static int nr_socket_local_sendto(void *obj,const void *msg, size_t len,
  1736. int flags, nr_transport_addr *to);
  1737. static int nr_socket_local_recvfrom(void *obj,void * restrict buf,
  1738. size_t maxlen, size_t *len, int flags, nr_transport_addr *from);
  1739. static int nr_socket_local_getfd(void *obj, NR_SOCKET *fd);
  1740. static int nr_socket_local_getaddr(void *obj, nr_transport_addr *addrp);
  1741. static int nr_socket_local_close(void *obj);
  1742. static int nr_socket_local_connect(void *sock, nr_transport_addr *addr);
  1743. static int nr_socket_local_write(void *obj,const void *msg, size_t len,
  1744. size_t *written);
  1745. static int nr_socket_local_read(void *obj,void * restrict buf, size_t maxlen,
  1746. size_t *len);
  1747. static int nr_socket_local_listen(void *obj, int backlog);
  1748. static int nr_socket_local_accept(void *obj, nr_transport_addr *addrp,
  1749. nr_socket **sockp);
  1750. static nr_socket_vtbl nr_socket_local_vtbl={
  1751. 2,
  1752. nr_socket_local_destroy,
  1753. nr_socket_local_sendto,
  1754. nr_socket_local_recvfrom,
  1755. nr_socket_local_getfd,
  1756. nr_socket_local_getaddr,
  1757. nr_socket_local_connect,
  1758. nr_socket_local_write,
  1759. nr_socket_local_read,
  1760. nr_socket_local_close,
  1761. nr_socket_local_listen,
  1762. nr_socket_local_accept
  1763. };
  1764. /* static */
  1765. int
  1766. NrSocketBase::CreateSocket(nr_transport_addr *addr, RefPtr<NrSocketBase> *sock)
  1767. {
  1768. int r, _status;
  1769. // create IPC bridge for content process
  1770. if (XRE_IsParentProcess()) {
  1771. *sock = new NrSocket();
  1772. } else {
  1773. switch (addr->protocol) {
  1774. case IPPROTO_UDP:
  1775. *sock = new NrUdpSocketIpc();
  1776. break;
  1777. case IPPROTO_TCP:
  1778. #if defined(MOZILLA_INTERNAL_API)
  1779. {
  1780. nsCOMPtr<nsIThread> main_thread;
  1781. NS_GetMainThread(getter_AddRefs(main_thread));
  1782. *sock = new NrTcpSocketIpc(main_thread.get());
  1783. }
  1784. #else
  1785. ABORT(R_REJECTED);
  1786. #endif
  1787. break;
  1788. }
  1789. }
  1790. r = (*sock)->create(addr);
  1791. if (r)
  1792. ABORT(r);
  1793. _status = 0;
  1794. abort:
  1795. if (_status) {
  1796. *sock = nullptr;
  1797. }
  1798. return _status;
  1799. }
  1800. int nr_socket_local_create(void *obj, nr_transport_addr *addr, nr_socket **sockp) {
  1801. RefPtr<NrSocketBase> sock;
  1802. int r, _status;
  1803. r = NrSocketBase::CreateSocket(addr, &sock);
  1804. if (r) {
  1805. ABORT(r);
  1806. }
  1807. r = nr_socket_create_int(static_cast<void *>(sock),
  1808. sock->vtbl(), sockp);
  1809. if (r)
  1810. ABORT(r);
  1811. _status = 0;
  1812. {
  1813. // We will release this reference in destroy(), not exactly the normal
  1814. // ownership model, but it is what it is.
  1815. NrSocketBase* dummy = sock.forget().take();
  1816. (void)dummy;
  1817. }
  1818. abort:
  1819. return _status;
  1820. }
  1821. static int nr_socket_local_destroy(void **objp) {
  1822. if(!objp || !*objp)
  1823. return 0;
  1824. NrSocketBase *sock = static_cast<NrSocketBase *>(*objp);
  1825. *objp = 0;
  1826. sock->close(); // Signal STS that we want not to listen
  1827. sock->Release(); // Decrement the ref count
  1828. return 0;
  1829. }
  1830. static int nr_socket_local_sendto(void *obj,const void *msg, size_t len,
  1831. int flags, nr_transport_addr *addr) {
  1832. NrSocketBase *sock = static_cast<NrSocketBase *>(obj);
  1833. return sock->sendto(msg, len, flags, addr);
  1834. }
  1835. static int nr_socket_local_recvfrom(void *obj,void * restrict buf,
  1836. size_t maxlen, size_t *len, int flags,
  1837. nr_transport_addr *addr) {
  1838. NrSocketBase *sock = static_cast<NrSocketBase *>(obj);
  1839. return sock->recvfrom(buf, maxlen, len, flags, addr);
  1840. }
  1841. static int nr_socket_local_getfd(void *obj, NR_SOCKET *fd) {
  1842. NrSocketBase *sock = static_cast<NrSocketBase *>(obj);
  1843. *fd = sock;
  1844. return 0;
  1845. }
  1846. static int nr_socket_local_getaddr(void *obj, nr_transport_addr *addrp) {
  1847. NrSocketBase *sock = static_cast<NrSocketBase *>(obj);
  1848. return sock->getaddr(addrp);
  1849. }
  1850. static int nr_socket_local_close(void *obj) {
  1851. NrSocketBase *sock = static_cast<NrSocketBase *>(obj);
  1852. sock->close();
  1853. return 0;
  1854. }
  1855. static int nr_socket_local_write(void *obj, const void *msg, size_t len,
  1856. size_t *written) {
  1857. NrSocketBase *sock = static_cast<NrSocketBase *>(obj);
  1858. return sock->write(msg, len, written);
  1859. }
  1860. static int nr_socket_local_read(void *obj, void * restrict buf, size_t maxlen,
  1861. size_t *len) {
  1862. NrSocketBase *sock = static_cast<NrSocketBase *>(obj);
  1863. return sock->read(buf, maxlen, len);
  1864. }
  1865. static int nr_socket_local_connect(void *obj, nr_transport_addr *addr) {
  1866. NrSocketBase *sock = static_cast<NrSocketBase *>(obj);
  1867. return sock->connect(addr);
  1868. }
  1869. static int nr_socket_local_listen(void *obj, int backlog) {
  1870. NrSocketBase *sock = static_cast<NrSocketBase *>(obj);
  1871. return sock->listen(backlog);
  1872. }
  1873. static int nr_socket_local_accept(void *obj, nr_transport_addr *addrp,
  1874. nr_socket **sockp) {
  1875. NrSocketBase *sock = static_cast<NrSocketBase *>(obj);
  1876. return sock->accept(addrp, sockp);
  1877. }
  1878. // Implement async api
  1879. int NR_async_wait(NR_SOCKET sock, int how, NR_async_cb cb,void *cb_arg,
  1880. char *function,int line) {
  1881. NrSocketBase *s = static_cast<NrSocketBase *>(sock);
  1882. return s->async_wait(how, cb, cb_arg, function, line);
  1883. }
  1884. int NR_async_cancel(NR_SOCKET sock,int how) {
  1885. NrSocketBase *s = static_cast<NrSocketBase *>(sock);
  1886. return s->cancel(how);
  1887. }
  1888. nr_socket_vtbl* NrSocketBase::vtbl() {
  1889. return &nr_socket_local_vtbl;
  1890. }