NetconEthernetTap.cpp 33 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060
  1. /*
  2. * ZeroTier One - Network Virtualization Everywhere
  3. * Copyright (C) 2011-2015 ZeroTier, Inc.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. * --
  19. *
  20. * ZeroTier may be used and distributed under the terms of the GPLv3, which
  21. * are available at: http://www.gnu.org/licenses/gpl-3.0.html
  22. *
  23. * If you would like to embed ZeroTier into a commercial application or
  24. * redistribute it in a modified binary form, please contact ZeroTier Networks
  25. * LLC. Start here: http://www.zerotier.com/
  26. */
  27. #include <algorithm>
  28. #include <utility>
  29. #include <dlfcn.h>
  30. #include <sys/poll.h>
  31. #include <stdint.h>
  32. #include <utility>
  33. #include <string>
  34. #include <sys/resource.h>
  35. #include <sys/syscall.h>
  36. #include "NetconEthernetTap.hpp"
  37. #include "../node/Utils.hpp"
  38. #include "../osdep/OSUtils.hpp"
  39. #include "../osdep/Phy.hpp"
  40. #include "Intercept.h"
  41. #include "LWIPStack.hpp"
  42. #include "lwip/tcp_impl.h"
  43. #include "netif/etharp.h"
  44. #include "lwip/api.h"
  45. #include "lwip/ip.h"
  46. #include "lwip/ip_addr.h"
  47. #include "lwip/ip_frag.h"
  48. #include "lwip/tcp.h"
  49. #include "common.inc.c"
  50. #include "RPC.h"
  51. namespace ZeroTier {
  52. // ---------------------------------------------------------------------------
  53. static err_t tapif_init(struct netif *netif)
  54. {
  55. // Actual init functionality is in addIp() of tap
  56. return ERR_OK;
  57. }
  58. /*
  59. * Outputs data from the pbuf queue to the interface
  60. */
  61. static err_t low_level_output(struct netif *netif, struct pbuf *p)
  62. {
  63. struct pbuf *q;
  64. char buf[ZT_MAX_MTU+32];
  65. char *bufptr;
  66. int totalLength = 0;
  67. ZeroTier::NetconEthernetTap *tap = (ZeroTier::NetconEthernetTap*)netif->state;
  68. bufptr = buf;
  69. // Copy data from each pbuf, one at a time
  70. for(q = p; q != NULL; q = q->next) {
  71. memcpy(bufptr, q->payload, q->len);
  72. bufptr += q->len;
  73. totalLength += q->len;
  74. }
  75. // [Send packet to network]
  76. // Split ethernet header and feed into handler
  77. struct eth_hdr *ethhdr;
  78. ethhdr = (struct eth_hdr *)buf;
  79. ZeroTier::MAC src_mac;
  80. ZeroTier::MAC dest_mac;
  81. src_mac.setTo(ethhdr->src.addr, 6);
  82. dest_mac.setTo(ethhdr->dest.addr, 6);
  83. tap->_handler(tap->_arg,tap->_nwid,src_mac,dest_mac,
  84. Utils::ntoh((uint16_t)ethhdr->type),0,buf + sizeof(struct eth_hdr),totalLength - sizeof(struct eth_hdr));
  85. return ERR_OK;
  86. }
  87. // ---------------------------------------------------------------------------
  88. NetconEthernetTap::NetconEthernetTap(
  89. const char *homePath,
  90. const MAC &mac,
  91. unsigned int mtu,
  92. unsigned int metric,
  93. uint64_t nwid,
  94. const char *friendlyName,
  95. void (*handler)(void *,uint64_t,const MAC &,const MAC &,unsigned int,unsigned int,const void *,unsigned int),
  96. void *arg) :
  97. _nwid(nwid),
  98. _handler(handler),
  99. _arg(arg),
  100. _phy(this,false,true),
  101. _unixListenSocket((PhySocket *)0),
  102. _mac(mac),
  103. _homePath(homePath),
  104. _mtu(mtu),
  105. _enabled(true),
  106. _run(true)
  107. {
  108. char sockPath[4096],lwipPath[4096];
  109. rpcCounter = -1;
  110. Utils::snprintf(sockPath,sizeof(sockPath),"%s%snc_%.16llx",homePath,ZT_PATH_SEPARATOR_S,_nwid,ZT_PATH_SEPARATOR_S,(unsigned long long)nwid);
  111. _dev = sockPath; // in netcon mode, set device to be just the network ID
  112. Utils::snprintf(lwipPath,sizeof(lwipPath),"%s%sliblwip.so",homePath,ZT_PATH_SEPARATOR_S);
  113. lwipstack = new LWIPStack(lwipPath);
  114. if(!lwipstack)
  115. throw std::runtime_error("unable to dynamically load a new instance of liblwip.so (searched ZeroTier home path)");
  116. lwipstack->lwip_init();
  117. _unixListenSocket = _phy.unixListen(sockPath,(void *)this);
  118. fprintf(stderr," NetconEthernetTap initialized on: %s\n", sockPath);
  119. if (!_unixListenSocket)
  120. throw std::runtime_error(std::string("unable to bind to ")+sockPath);
  121. _thread = Thread::start(this);
  122. }
  123. NetconEthernetTap::~NetconEthernetTap()
  124. {
  125. _run = false;
  126. _phy.whack();
  127. _phy.whack(); // TODO: Rationale?
  128. Thread::join(_thread);
  129. _phy.close(_unixListenSocket,false);
  130. delete lwipstack;
  131. }
  132. void NetconEthernetTap::setEnabled(bool en)
  133. {
  134. _enabled = en;
  135. }
  136. bool NetconEthernetTap::enabled() const
  137. {
  138. return _enabled;
  139. }
  140. bool NetconEthernetTap::addIp(const InetAddress &ip)
  141. {
  142. Mutex::Lock _l(_ips_m);
  143. if (std::find(_ips.begin(),_ips.end(),ip) == _ips.end()) {
  144. _ips.push_back(ip);
  145. std::sort(_ips.begin(),_ips.end());
  146. if (ip.isV4()) {
  147. // Set IP
  148. static ip_addr_t ipaddr, netmask, gw;
  149. IP4_ADDR(&gw,192,168,0,1);
  150. ipaddr.addr = *((u32_t *)ip.rawIpData());
  151. netmask.addr = *((u32_t *)ip.netmask().rawIpData());
  152. // Set up the lwip-netif for LWIP's sake
  153. lwipstack->netif_add(&interface,&ipaddr, &netmask, &gw, NULL, tapif_init, lwipstack->_ethernet_input);
  154. interface.state = this;
  155. interface.output = lwipstack->_etharp_output;
  156. _mac.copyTo(interface.hwaddr, 6);
  157. interface.mtu = _mtu;
  158. interface.name[0] = 't';
  159. interface.name[1] = 'p';
  160. interface.linkoutput = low_level_output;
  161. interface.hwaddr_len = 6;
  162. interface.flags = NETIF_FLAG_BROADCAST | NETIF_FLAG_ETHARP | NETIF_FLAG_IGMP;
  163. lwipstack->netif_set_default(&interface);
  164. lwipstack->netif_set_up(&interface);
  165. }
  166. }
  167. return true;
  168. }
  169. bool NetconEthernetTap::removeIp(const InetAddress &ip)
  170. {
  171. Mutex::Lock _l(_ips_m);
  172. std::vector<InetAddress>::iterator i(std::find(_ips.begin(),_ips.end(),ip));
  173. if (i == _ips.end())
  174. return false;
  175. _ips.erase(i);
  176. if (ip.isV4()) {
  177. // TODO: dealloc from LWIP
  178. }
  179. return true;
  180. }
  181. std::vector<InetAddress> NetconEthernetTap::ips() const
  182. {
  183. Mutex::Lock _l(_ips_m);
  184. return _ips;
  185. }
  186. void NetconEthernetTap::put(const MAC &from,const MAC &to,unsigned int etherType,const void *data,unsigned int len)
  187. {
  188. struct pbuf *p,*q;
  189. if (!_enabled)
  190. return;
  191. struct eth_hdr ethhdr;
  192. from.copyTo(ethhdr.src.addr, 6);
  193. to.copyTo(ethhdr.dest.addr, 6);
  194. ethhdr.type = Utils::hton((uint16_t)etherType);
  195. // We allocate a pbuf chain of pbufs from the pool.
  196. p = lwipstack->pbuf_alloc(PBUF_RAW, len+sizeof(struct eth_hdr), PBUF_POOL);
  197. if (p != NULL) {
  198. const char *dataptr = reinterpret_cast<const char *>(data);
  199. // First pbuf gets ethernet header at start
  200. q = p;
  201. if (q->len < sizeof(ethhdr)) {
  202. dwr(MSG_ERROR,"_put(): Dropped packet: first pbuf smaller than ethernet header\n");
  203. return;
  204. }
  205. memcpy(q->payload,&ethhdr,sizeof(ethhdr));
  206. memcpy((char*)q->payload + sizeof(ethhdr),dataptr,q->len - sizeof(ethhdr));
  207. dataptr += q->len - sizeof(ethhdr);
  208. // Remaining pbufs (if any) get rest of data
  209. while ((q = q->next)) {
  210. memcpy(q->payload,dataptr,q->len);
  211. dataptr += q->len;
  212. }
  213. } else {
  214. dwr(MSG_ERROR,"put(): Dropped packet: no pbufs available\n");
  215. return;
  216. }
  217. {
  218. Mutex::Lock _l2(lwipstack->_lock);
  219. if(interface.input(p, &interface) != ERR_OK) {
  220. dwr(MSG_ERROR,"put(): Error while RXing packet (netif->input)\n");
  221. }
  222. }
  223. }
  224. std::string NetconEthernetTap::deviceName() const
  225. {
  226. return _dev;
  227. }
  228. void NetconEthernetTap::setFriendlyName(const char *friendlyName) {
  229. }
  230. void NetconEthernetTap::scanMulticastGroups(std::vector<MulticastGroup> &added,std::vector<MulticastGroup> &removed)
  231. {
  232. std::vector<MulticastGroup> newGroups;
  233. Mutex::Lock _l(_multicastGroups_m);
  234. // TODO: get multicast subscriptions from LWIP
  235. std::vector<InetAddress> allIps(ips());
  236. for(std::vector<InetAddress>::iterator ip(allIps.begin());ip!=allIps.end();++ip)
  237. newGroups.push_back(MulticastGroup::deriveMulticastGroupForAddressResolution(*ip));
  238. std::sort(newGroups.begin(),newGroups.end());
  239. std::unique(newGroups.begin(),newGroups.end());
  240. for(std::vector<MulticastGroup>::iterator m(newGroups.begin());m!=newGroups.end();++m) {
  241. if (!std::binary_search(_multicastGroups.begin(),_multicastGroups.end(),*m))
  242. added.push_back(*m);
  243. }
  244. for(std::vector<MulticastGroup>::iterator m(_multicastGroups.begin());m!=_multicastGroups.end();++m) {
  245. if (!std::binary_search(newGroups.begin(),newGroups.end(),*m))
  246. removed.push_back(*m);
  247. }
  248. _multicastGroups.swap(newGroups);
  249. }
  250. void NetconEthernetTap::threadMain()
  251. throw()
  252. {
  253. uint64_t prev_tcp_time = 0, prev_status_time = 0, prev_etharp_time = 0;
  254. // Main timer loop
  255. while (_run) {
  256. uint64_t now = OSUtils::now();
  257. uint64_t since_tcp = now - prev_tcp_time;
  258. uint64_t since_etharp = now - prev_etharp_time;
  259. uint64_t since_status = now - prev_status_time;
  260. uint64_t tcp_remaining = ZT_LWIP_TCP_TIMER_INTERVAL;
  261. uint64_t etharp_remaining = ARP_TMR_INTERVAL;
  262. // Connection prunning
  263. if (since_status >= STATUS_TMR_INTERVAL) {
  264. prev_status_time = now;
  265. for(size_t i=0;i<_TcpConnections.size();++i) {
  266. if(!_TcpConnections[i]->sock)
  267. continue;
  268. int fd = _phy.getDescriptor(_TcpConnections[i]->sock);
  269. dwr(MSG_DEBUG," tap_thread(): tcp\\jobs = {%d, %d}\n", _TcpConnections.size(), jobmap.size());
  270. // If there's anything on the RX buf, set to notify in case we stalled
  271. if(_TcpConnections[i]->rxsz > 0)
  272. _phy.setNotifyWritable(_TcpConnections[i]->sock, true);
  273. fcntl(fd, F_SETFL, O_NONBLOCK);
  274. unsigned char tmpbuf[BUF_SZ];
  275. int n = read(fd,&tmpbuf,BUF_SZ);
  276. if(_TcpConnections[i]->pcb->state == SYN_SENT) {
  277. dwr(MSG_DEBUG_EXTRA," tap_thread(): <%x> state = SYN_SENT, should finish or be removed soon\n", _TcpConnections[i]->sock);
  278. }
  279. if((n < 0 && errno != EAGAIN) || (n == 0 && errno == EAGAIN)) {
  280. dwr(MSG_DEBUG," tap_thread(): closing sock (%x)\n", _TcpConnections[i]->sock);
  281. closeConnection(_TcpConnections[i]->sock);
  282. } else if (n > 0) {
  283. dwr(MSG_DEBUG," tap_thread(): data read during connection check (%d bytes)\n", n);
  284. phyOnUnixData(_TcpConnections[i]->sock,_phy.getuptr(_TcpConnections[i]->sock),&tmpbuf,n);
  285. }
  286. }
  287. }
  288. // Main TCP/ETHARP timer section
  289. if (since_tcp >= ZT_LWIP_TCP_TIMER_INTERVAL) {
  290. prev_tcp_time = now;
  291. lwipstack->tcp_tmr();
  292. // Makeshift poll
  293. for(size_t i=0;i<_TcpConnections.size();++i) {
  294. if(_TcpConnections[i]->txsz > 0){
  295. lwipstack->_lock.lock();
  296. handleWrite(_TcpConnections[i]);
  297. lwipstack->_lock.unlock();
  298. }
  299. }
  300. } else {
  301. tcp_remaining = ZT_LWIP_TCP_TIMER_INTERVAL - since_tcp;
  302. }
  303. if (since_etharp >= ARP_TMR_INTERVAL) {
  304. prev_etharp_time = now;
  305. lwipstack->etharp_tmr();
  306. } else {
  307. etharp_remaining = ARP_TMR_INTERVAL - since_etharp;
  308. }
  309. _phy.poll((unsigned long)std::min(tcp_remaining,etharp_remaining));
  310. }
  311. dlclose(lwipstack->_libref);
  312. }
  313. // Unused -- no UDP or TCP from this thread/Phy<>
  314. void NetconEthernetTap::phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *from,void *data,unsigned long len) {}
  315. void NetconEthernetTap::phyOnTcpConnect(PhySocket *sock,void **uptr,bool success) {}
  316. void NetconEthernetTap::phyOnTcpAccept(PhySocket *sockL,PhySocket *sockN,void **uptrL,void **uptrN,const struct sockaddr *from) {}
  317. void NetconEthernetTap::phyOnTcpClose(PhySocket *sock,void **uptr) {}
  318. void NetconEthernetTap::phyOnTcpData(PhySocket *sock,void **uptr,void *data,unsigned long len) {}
  319. void NetconEthernetTap::phyOnTcpWritable(PhySocket *sock,void **uptr) {}
  320. TcpConnection *NetconEthernetTap::getConnection(PhySocket *sock)
  321. {
  322. for(size_t i=0;i<_TcpConnections.size();++i) {
  323. if(_TcpConnections[i]->sock == sock)
  324. return _TcpConnections[i];
  325. }
  326. return NULL;
  327. }
  328. void NetconEthernetTap::closeConnection(PhySocket *sock)
  329. {
  330. // Here we assume _tcpconns_m is already locked by caller
  331. if(!sock) {
  332. dwr(MSG_DEBUG," closeConnection(): invalid PhySocket\n");
  333. return;
  334. }
  335. TcpConnection *conn = getConnection(sock);
  336. if(!conn)
  337. return;
  338. if(conn->pcb && conn->pcb->state != CLOSED) {
  339. dwr(MSG_DEBUG," closeConnection(%x): PCB->state = %d\n", sock, conn->pcb->state);
  340. if(conn->pcb->state == SYN_SENT) {
  341. dwr(MSG_DEBUG," closeConnection(%x): invalid PCB state for this operation. ignoring.\n", sock);
  342. return;
  343. }
  344. if(lwipstack->_tcp_close(conn->pcb) == ERR_OK) {
  345. // Unregister callbacks for this PCB
  346. lwipstack->_tcp_arg(conn->pcb, NULL);
  347. lwipstack->_tcp_recv(conn->pcb, NULL);
  348. lwipstack->_tcp_err(conn->pcb, NULL);
  349. lwipstack->_tcp_sent(conn->pcb, NULL);
  350. lwipstack->_tcp_poll(conn->pcb, NULL, 1);
  351. }
  352. else {
  353. dwr(MSG_ERROR," closeConnection(%x): error while calling tcp_close()\n", sock);
  354. }
  355. }
  356. for(size_t i=0;i<_TcpConnections.size();++i) {
  357. if(_TcpConnections[i] == conn){
  358. _TcpConnections.erase(_TcpConnections.begin() + i);
  359. delete conn;
  360. break;
  361. }
  362. }
  363. if(!sock)
  364. return;
  365. close(_phy.getDescriptor(sock));
  366. _phy.close(sock, false);
  367. }
  368. void NetconEthernetTap::phyOnUnixClose(PhySocket *sock,void **uptr) {
  369. Mutex::Lock _l(_tcpconns_m);
  370. closeConnection(sock);
  371. }
  372. void NetconEthernetTap::phyOnUnixWritable(PhySocket *sock,void **uptr,bool lwip_invoked)
  373. {
  374. if(!lwip_invoked) {
  375. _tcpconns_m.lock();
  376. _rx_buf_m.lock();
  377. }
  378. TcpConnection *conn = getConnection(sock);
  379. if(conn && conn->rxsz) {
  380. int n = _phy.streamSend(conn->sock, conn->rxbuf, conn->rxsz);
  381. if(n > 0) {
  382. if(conn->rxsz-n > 0)
  383. memcpy(conn->rxbuf, conn->rxbuf+n, conn->rxsz-n);
  384. conn->rxsz -= n;
  385. lwipstack->_tcp_recved(conn->pcb, n);
  386. } else {
  387. dwr(MSG_DEBUG," phyOnUnixWritable(): errno = %d, rxsz = %d\n", errno, conn->rxsz);
  388. _phy.setNotifyWritable(conn->sock, false);
  389. }
  390. }
  391. if(!lwip_invoked) {
  392. _tcpconns_m.unlock();
  393. _rx_buf_m.unlock();
  394. }
  395. }
  396. void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,unsigned long len)
  397. {
  398. uint64_t CANARY_num;
  399. pid_t pid, tid;
  400. int rpcCount, wlen = len;
  401. char cmd, timestamp[20], CANARY[CANARY_SZ], padding[] = {PADDING};
  402. void *payload;
  403. unsigned char *buf = (unsigned char*)data;
  404. std::pair<PhySocket*, void*> sockdata;
  405. PhySocket *rpcSock;
  406. bool foundJob = false, detected_rpc = false;
  407. TcpConnection *conn;
  408. // RPC
  409. char phrase[RPC_PHRASE_SZ];
  410. memset(phrase, 0, RPC_PHRASE_SZ);
  411. if(len == BUF_SZ) {
  412. memcpy(phrase, buf, RPC_PHRASE_SZ);
  413. if(strcmp(phrase, RPC_PHRASE) == 0)
  414. detected_rpc = true;
  415. }
  416. if(detected_rpc) {
  417. unloadRPC(data, pid, tid, rpcCount, timestamp, CANARY, cmd, payload);
  418. memcpy(&CANARY_num, CANARY, CANARY_SZ);
  419. dwr(MSG_DEBUG," <%x> RPC: (pid=%d, tid=%d, rpcCount=%d, timestamp=%s, cmd=%d)\n",
  420. sock, pid, tid, rpcCount, timestamp, cmd);
  421. if(cmd == RPC_SOCKET) {
  422. dwr(MSG_DEBUG," <%x> RPC_SOCKET\n", sock);
  423. // Create new lwip socket and associate it with this sock
  424. struct socket_st socket_rpc;
  425. memcpy(&socket_rpc, &buf[IDX_PAYLOAD+STRUCT_IDX], sizeof(struct socket_st));
  426. TcpConnection * new_conn;
  427. if((new_conn = handleSocket(sock, uptr, &socket_rpc))) {
  428. new_conn->pid = pid; // Merely kept to look up application path/names later, not strictly necessary
  429. }
  430. } else {
  431. jobmap[CANARY_num] = std::make_pair<PhySocket*, void*>(sock, data);
  432. }
  433. write(_phy.getDescriptor(sock), "z", 1); // RPC ACK byte to maintain order
  434. }
  435. // STREAM
  436. else {
  437. int data_start = -1, data_end = -1, canary_pos = -1, padding_pos = -1;
  438. // Look for padding
  439. std::string padding_pattern(padding, padding+PADDING_SZ);
  440. std::string buffer(buf, buf + len);
  441. padding_pos = buffer.find(padding_pattern);
  442. canary_pos = padding_pos-CANARY_SZ;
  443. // Grab token, next we'll use it to look up an RPC job
  444. if(canary_pos > -1) {
  445. memcpy(&CANARY_num, buf+canary_pos, CANARY_SZ);
  446. if(CANARY_num != 0) {
  447. // Find job
  448. sockdata = jobmap[CANARY_num];
  449. if(!sockdata.first) {
  450. dwr(MSG_DEBUG," <%x> unable to locate job entry for %llu\n", sock, CANARY_num);
  451. return;
  452. } else
  453. foundJob = true;
  454. }
  455. }
  456. conn = getConnection(sock);
  457. if(!conn)
  458. return;
  459. if(padding_pos == -1) { // [DATA]
  460. memcpy(&conn->txbuf[conn->txsz], buf, wlen);
  461. } else { // Padding found, implies a canary is present
  462. // [CANARY]
  463. if(len == CANARY_SZ+PADDING_SZ && canary_pos == 0) {
  464. wlen = 0; // Nothing to write
  465. } else {
  466. // [CANARY] + [DATA]
  467. if(len > CANARY_SZ+PADDING_SZ && canary_pos == 0) {
  468. wlen = len - CANARY_SZ+PADDING_SZ;
  469. data_start = padding_pos+PADDING_SZ;
  470. memcpy((&conn->txbuf)+conn->txsz, buf+data_start, wlen);
  471. }
  472. // [DATA] + [CANARY]
  473. if(len > CANARY_SZ+PADDING_SZ && canary_pos > 0 && canary_pos == len - CANARY_SZ+PADDING_SZ) {
  474. wlen = len - CANARY_SZ+PADDING_SZ;
  475. data_start = 0;
  476. memcpy((&conn->txbuf)+conn->txsz, buf+data_start, wlen);
  477. }
  478. // [DATA] + [CANARY] + [DATA]
  479. if(len > CANARY_SZ+PADDING_SZ && canary_pos > 0 && len > (canary_pos + CANARY_SZ+PADDING_SZ)) {
  480. wlen = len - CANARY_SZ+PADDING_SZ;
  481. data_start = 0;
  482. data_end = padding_pos-CANARY_SZ;
  483. memcpy((&conn->txbuf)+conn->txsz, buf+data_start, (data_end-data_start)+1);
  484. memcpy((&conn->txbuf)+conn->txsz, buf+(padding_pos+PADDING_SZ), len-(canary_pos+CANARY_SZ+PADDING_SZ));
  485. }
  486. }
  487. }
  488. // Write data from stream
  489. if(conn->txsz > (DEFAULT_BUF_SZ / 2)) {
  490. _phy.setNotifyReadable(sock, false);
  491. }
  492. lwipstack->_lock.lock();
  493. conn->txsz += wlen;
  494. handleWrite(conn);
  495. lwipstack->_lock.unlock();
  496. }
  497. if(foundJob) {
  498. rpcSock = sockdata.first;
  499. buf = (unsigned char*)sockdata.second;
  500. }
  501. // Process RPC if we have a corresponding jobmap entry
  502. if(foundJob) {
  503. unloadRPC(buf, pid, tid, rpcCount, timestamp, CANARY, cmd, payload);
  504. dwr(MSG_DEBUG," <%x> RPC: (pid=%d, tid=%d, rpcCount=%d, timestamp=%s, cmd=%d)\n",
  505. sock, pid, tid, rpcCount, timestamp, cmd);
  506. switch(cmd) {
  507. case RPC_BIND:
  508. struct bind_st bind_rpc;
  509. memcpy(&bind_rpc, &buf[IDX_PAYLOAD+STRUCT_IDX], sizeof(struct bind_st));
  510. handleBind(sock, rpcSock, uptr, &bind_rpc);
  511. break;
  512. case RPC_LISTEN:
  513. struct listen_st listen_rpc;
  514. memcpy(&listen_rpc, &buf[IDX_PAYLOAD+STRUCT_IDX], sizeof(struct listen_st));
  515. handleListen(sock, rpcSock, uptr, &listen_rpc);
  516. break;
  517. case RPC_GETSOCKNAME:
  518. struct getsockname_st getsockname_rpc;
  519. memcpy(&getsockname_rpc, &buf[IDX_PAYLOAD+STRUCT_IDX], sizeof(struct getsockname_st));
  520. handleGetsockname(sock, rpcSock, uptr, &getsockname_rpc);
  521. break;
  522. case RPC_CONNECT:
  523. struct connect_st connect_rpc;
  524. memcpy(&connect_rpc, &buf[IDX_PAYLOAD+STRUCT_IDX], sizeof(struct connect_st));
  525. handleConnect(sock, rpcSock, conn, &connect_rpc);
  526. jobmap.erase(CANARY_num);
  527. return; // Keep open RPC, we'll use it once in nc_connected to send retval
  528. default:
  529. break;
  530. }
  531. Mutex::Lock _l(_tcpconns_m);
  532. closeConnection(sockdata.first); // close RPC after sending retval, no longer needed
  533. jobmap.erase(CANARY_num);
  534. return;
  535. }
  536. }
  537. int NetconEthernetTap::sendReturnValue(PhySocket *sock, int retval, int _errno = 0){
  538. return sendReturnValue(_phy.getDescriptor(sock), retval, _errno);
  539. }
  540. int NetconEthernetTap::sendReturnValue(int fd, int retval, int _errno = 0)
  541. {
  542. dwr(MSG_DEBUG," sendReturnValue(): fd = %d, retval = %d, errno = %d\n", fd, retval, _errno);
  543. int sz = sizeof(char) + sizeof(retval) + sizeof(errno);
  544. char retmsg[sz];
  545. memset(&retmsg, 0, sizeof(retmsg));
  546. retmsg[0]=RPC_RETVAL;
  547. memcpy(&retmsg[1], &retval, sizeof(retval));
  548. memcpy(&retmsg[1]+sizeof(retval), &_errno, sizeof(_errno));
  549. return write(fd, &retmsg, sz);
  550. }
  551. void NetconEthernetTap::unloadRPC(void *data, pid_t &pid, pid_t &tid,
  552. int &rpcCount, char (timestamp[RPC_TIMESTAMP_SZ]), char (CANARY[sizeof(uint64_t)]), char &cmd, void* &payload)
  553. {
  554. unsigned char *buf = (unsigned char*)data;
  555. memcpy(&pid, &buf[IDX_PID], sizeof(pid_t));
  556. memcpy(&tid, &buf[IDX_TID], sizeof(pid_t));
  557. memcpy(&rpcCount, &buf[IDX_COUNT], sizeof(int));
  558. memcpy(timestamp, &buf[IDX_TIME], RPC_TIMESTAMP_SZ);
  559. memcpy(&cmd, &buf[IDX_PAYLOAD], sizeof(char));
  560. memcpy(CANARY, &buf[IDX_PAYLOAD+1], CANARY_SZ);
  561. }
  562. /*------------------------------------------------------------------------------
  563. --------------------------------- LWIP callbacks -------------------------------
  564. ------------------------------------------------------------------------------*/
  565. err_t NetconEthernetTap::nc_accept(void *arg, struct tcp_pcb *newPCB, err_t err)
  566. {
  567. Larg *l = (Larg*)arg;
  568. Mutex::Lock _l(l->tap->_tcpconns_m);
  569. TcpConnection *conn = l->conn;
  570. NetconEthernetTap *tap = l->tap;
  571. if(!conn->sock)
  572. return -1;
  573. int fd = tap->_phy.getDescriptor(conn->sock);
  574. if(conn) {
  575. // create new socketpair
  576. ZT_PHY_SOCKFD_TYPE fds[2];
  577. if(socketpair(PF_LOCAL, SOCK_STREAM, 0, fds) < 0) {
  578. if(errno < 0) {
  579. l->tap->sendReturnValue(conn, -1, errno);
  580. dwr(MSG_ERROR," nc_accept(): unable to create socketpair\n");
  581. return ERR_MEM;
  582. }
  583. }
  584. // create and populate new TcpConnection
  585. TcpConnection *newTcpConn = new TcpConnection();
  586. l->tap->_TcpConnections.push_back(newTcpConn);
  587. newTcpConn->pcb = newPCB;
  588. newTcpConn->sock = tap->_phy.wrapSocket(fds[0], newTcpConn);
  589. if(sock_fd_write(fd, fds[1]) < 0)
  590. return -1;
  591. tap->lwipstack->_tcp_arg(newPCB, new Larg(tap, newTcpConn));
  592. tap->lwipstack->_tcp_recv(newPCB, nc_recved);
  593. tap->lwipstack->_tcp_err(newPCB, nc_err);
  594. tap->lwipstack->_tcp_sent(newPCB, nc_sent);
  595. tap->lwipstack->_tcp_poll(newPCB, nc_poll, 1);
  596. if(conn->pcb->state == LISTEN) {
  597. dwr(MSG_DEBUG," nc_accept(): can't call tcp_accept() on LISTEN socket (pcb = %x)\n", conn->pcb);
  598. return ERR_OK;
  599. }
  600. tcp_accepted(conn->pcb); // Let lwIP know that it can queue additional incoming connections
  601. return ERR_OK;
  602. } else
  603. dwr(MSG_ERROR," nc_accept(): can't locate Connection object for PCB.\n");
  604. return -1;
  605. }
  606. err_t NetconEthernetTap::nc_recved(void *arg, struct tcp_pcb *PCB, struct pbuf *p, err_t err)
  607. {
  608. Larg *l = (Larg*)arg;
  609. int tot = 0;
  610. struct pbuf* q = p;
  611. Mutex::Lock _l(l->tap->_tcpconns_m);
  612. if(!l->conn) {
  613. dwr(MSG_ERROR," nc_recved(): no connection\n");
  614. return ERR_OK;
  615. }
  616. if(p == NULL) {
  617. if(l->conn->pcb->state == CLOSE_WAIT){
  618. l->tap->closeConnection(l->conn->sock);
  619. return ERR_ABRT;
  620. }
  621. return err;
  622. }
  623. Mutex::Lock _l2(l->tap->_rx_buf_m);
  624. // Cycle through pbufs and write them to the RX buffer
  625. // The RX buffer will be emptied via phyOnUnixWritable()
  626. while(p != NULL) {
  627. if(p->len <= 0)
  628. break;
  629. int avail = DEFAULT_BUF_SZ - l->conn->rxsz;
  630. int len = p->len;
  631. if(avail < len)
  632. dwr(MSG_ERROR," nc_recved(): not enough room (%d bytes) on RX buffer\n", avail);
  633. memcpy(l->conn->rxbuf + (l->conn->rxsz), p->payload, len);
  634. l->conn->rxsz += len;
  635. p = p->next;
  636. tot += len;
  637. }
  638. if(tot) {
  639. l->tap->phyOnUnixWritable(l->conn->sock, NULL, true);
  640. l->tap->_phy.setNotifyWritable(l->conn->sock, true);
  641. }
  642. l->tap->lwipstack->_pbuf_free(q);
  643. return ERR_OK;
  644. }
  645. err_t NetconEthernetTap::nc_sent(void* arg, struct tcp_pcb *PCB, u16_t len)
  646. {
  647. Larg *l = (Larg*)arg;
  648. Mutex::Lock _l(l->tap->_tcpconns_m);
  649. if(l->conn->probation && l->conn->txsz == 0){
  650. l->conn->probation = false; // TX buffer now empty, removing from probation
  651. }
  652. if(l && l->conn && len && !l->conn->probation) {
  653. if(l->conn->txsz < (float)DEFAULT_BUF_SOFTMAX) {
  654. l->tap->_phy.setNotifyReadable(l->conn->sock, true);
  655. l->tap->_phy.whack();
  656. }
  657. }
  658. return ERR_OK;
  659. }
  660. err_t NetconEthernetTap::nc_connected(void *arg, struct tcp_pcb *PCB, err_t err)
  661. {
  662. Larg *l = (Larg*)arg;
  663. if(l && l->conn)
  664. l->tap->sendReturnValue(l->tap->_phy.getDescriptor(l->conn->rpcSock), ERR_OK);
  665. return ERR_OK;
  666. }
  667. err_t NetconEthernetTap::nc_poll(void* arg, struct tcp_pcb *PCB)
  668. {
  669. return ERR_OK;
  670. }
  671. void NetconEthernetTap::nc_err(void *arg, err_t err)
  672. {
  673. dwr(MSG_DEBUG,"nc_err() = %d\n", err);
  674. Larg *l = (Larg*)arg;
  675. Mutex::Lock _l(l->tap->_tcpconns_m);
  676. if(!l->conn)
  677. dwr(MSG_ERROR,"nc_err(): connection is NULL!\n");
  678. int fd = l->tap->_phy.getDescriptor(l->conn->sock);
  679. switch(err)
  680. {
  681. case ERR_MEM:
  682. dwr(MSG_ERROR,"nc_err(): ERR_MEM->ENOMEM\n");
  683. l->tap->sendReturnValue(fd, -1, ENOMEM);
  684. break;
  685. case ERR_BUF:
  686. dwr(MSG_ERROR,"nc_err(): ERR_BUF->ENOBUFS\n");
  687. l->tap->sendReturnValue(fd, -1, ENOBUFS);
  688. break;
  689. case ERR_TIMEOUT:
  690. dwr(MSG_ERROR,"nc_err(): ERR_TIMEOUT->ETIMEDOUT\n");
  691. l->tap->sendReturnValue(fd, -1, ETIMEDOUT);
  692. break;
  693. case ERR_RTE:
  694. dwr(MSG_ERROR,"nc_err(): ERR_RTE->ENETUNREACH\n");
  695. l->tap->sendReturnValue(fd, -1, ENETUNREACH);
  696. break;
  697. case ERR_INPROGRESS:
  698. dwr(MSG_ERROR,"nc_err(): ERR_INPROGRESS->EINPROGRESS\n");
  699. l->tap->sendReturnValue(fd, -1, EINPROGRESS);
  700. break;
  701. case ERR_VAL:
  702. dwr(MSG_ERROR,"nc_err(): ERR_VAL->EINVAL\n");
  703. l->tap->sendReturnValue(fd, -1, EINVAL);
  704. break;
  705. case ERR_WOULDBLOCK:
  706. dwr(MSG_ERROR,"nc_err(): ERR_WOULDBLOCK->EWOULDBLOCK\n");
  707. l->tap->sendReturnValue(fd, -1, EWOULDBLOCK);
  708. break;
  709. case ERR_USE:
  710. dwr(MSG_ERROR,"nc_err(): ERR_USE->EADDRINUSE\n");
  711. l->tap->sendReturnValue(fd, -1, EADDRINUSE);
  712. break;
  713. case ERR_ISCONN:
  714. dwr(MSG_ERROR,"nc_err(): ERR_ISCONN->EISCONN\n");
  715. l->tap->sendReturnValue(fd, -1, EISCONN);
  716. break;
  717. case ERR_ABRT:
  718. dwr(MSG_ERROR,"nc_err(): ERR_ABRT->ECONNREFUSED\n");
  719. l->tap->sendReturnValue(fd, -1, ECONNREFUSED);
  720. break;
  721. // FIXME: Below are errors which don't have a standard errno correlate
  722. case ERR_RST:
  723. l->tap->sendReturnValue(fd, -1, -1);
  724. break;
  725. case ERR_CLSD:
  726. l->tap->sendReturnValue(fd, -1, -1);
  727. break;
  728. case ERR_CONN:
  729. l->tap->sendReturnValue(fd, -1, -1);
  730. break;
  731. case ERR_ARG:
  732. l->tap->sendReturnValue(fd, -1, -1);
  733. break;
  734. case ERR_IF:
  735. l->tap->sendReturnValue(fd, -1, -1);
  736. break;
  737. default:
  738. break;
  739. }
  740. dwr(MSG_ERROR,"nc_err(): closing connection\n");
  741. l->tap->closeConnection(l->conn);
  742. }
  743. /*------------------------------------------------------------------------------
  744. ----------------------------- RPC Handler functions ----------------------------
  745. ------------------------------------------------------------------------------*/
  746. void NetconEthernetTap::handleGetsockname(PhySocket *sock, PhySocket *rpcSock, void **uptr, struct getsockname_st *getsockname_rpc)
  747. {
  748. Mutex::Lock _l(_tcpconns_m);
  749. TcpConnection *conn = getConnection(sock);
  750. char retmsg[sizeof(struct sockaddr_storage)];
  751. memset(&retmsg, 0, sizeof(retmsg));
  752. if ((conn)&&(conn->addr))
  753. memcpy(&retmsg, conn->addr, sizeof(struct sockaddr_storage));
  754. write(_phy.getDescriptor(rpcSock), &retmsg, sizeof(struct sockaddr_storage));
  755. }
  756. void NetconEthernetTap::handleBind(PhySocket *sock, PhySocket *rpcSock, void **uptr, struct bind_st *bind_rpc)
  757. {
  758. Mutex::Lock _l(_tcpconns_m);
  759. struct sockaddr_in *rawAddr = (struct sockaddr_in *) &bind_rpc->addr;
  760. int port = lwipstack->ntohs(rawAddr->sin_port);
  761. ip_addr_t connAddr;
  762. connAddr.addr = *((u32_t *)_ips[0].rawIpData());
  763. TcpConnection *conn = getConnection(sock);
  764. dwr(MSG_DEBUG," handleBind(%d)\n", bind_rpc->sockfd);
  765. if(conn) {
  766. if(conn->pcb->state == CLOSED){
  767. int err = lwipstack->tcp_bind(conn->pcb, &connAddr, port);
  768. int ip = rawAddr->sin_addr.s_addr;
  769. unsigned char d[4];
  770. d[0] = ip & 0xFF;
  771. d[1] = (ip >> 8) & 0xFF;
  772. d[2] = (ip >> 16) & 0xFF;
  773. d[3] = (ip >> 24) & 0xFF;
  774. dwr(MSG_DEBUG," handleBind(): %d.%d.%d.%d : %d\n", d[0],d[1],d[2],d[3], port);
  775. if(err != ERR_OK) {
  776. dwr(MSG_ERROR," handleBind(): err = %d\n", err);
  777. if(err == ERR_USE)
  778. sendReturnValue(rpcSock, -1, EADDRINUSE);
  779. if(err == ERR_MEM)
  780. sendReturnValue(rpcSock, -1, ENOMEM);
  781. if(err == ERR_BUF)
  782. sendReturnValue(rpcSock, -1, ENOMEM);
  783. } else {
  784. conn->addr = (struct sockaddr_storage *) &bind_rpc->addr;
  785. sendReturnValue(rpcSock, ERR_OK, ERR_OK); // Success
  786. }
  787. } else {
  788. dwr(MSG_ERROR," handleBind(): PCB (%x) not in CLOSED state. Ignoring BIND request.\n", conn->pcb);
  789. sendReturnValue(rpcSock, -1, EINVAL);
  790. }
  791. } else {
  792. dwr(MSG_ERROR," handleBind(): unable to locate TcpConnection.\n");
  793. sendReturnValue(rpcSock, -1, EBADF);
  794. }
  795. }
  796. void NetconEthernetTap::handleListen(PhySocket *sock, PhySocket *rpcSock, void **uptr, struct listen_st *listen_rpc)
  797. {
  798. Mutex::Lock _l(_tcpconns_m);
  799. TcpConnection *conn = getConnection(sock);
  800. if(!conn){
  801. dwr(MSG_ERROR," handleListen(): unable to locate TcpConnection.\n");
  802. sendReturnValue(rpcSock, -1, EBADF);
  803. return;
  804. }
  805. if(conn->pcb->state == LISTEN) {
  806. dwr(MSG_ERROR," handleListen(): PCB is already in listening state.\n");
  807. sendReturnValue(rpcSock, ERR_OK, ERR_OK);
  808. return;
  809. }
  810. struct tcp_pcb* listeningPCB;
  811. #ifdef TCP_LISTEN_BACKLOG
  812. listeningPCB = lwipstack->tcp_listen_with_backlog(conn->pcb, listen_rpc->backlog);
  813. #else
  814. listeningPCB = lwipstack->tcp_listen(conn->pcb);
  815. #endif
  816. if(listeningPCB != NULL) {
  817. conn->pcb = listeningPCB;
  818. lwipstack->tcp_accept(listeningPCB, nc_accept);
  819. lwipstack->tcp_arg(listeningPCB, new Larg(this, conn));
  820. /* we need to wait for the client to send us the fd allocated on their end
  821. for this listening socket */
  822. fcntl(_phy.getDescriptor(conn->sock), F_SETFL, O_NONBLOCK);
  823. conn->listening = true;
  824. sendReturnValue(rpcSock, ERR_OK, ERR_OK);
  825. return;
  826. }
  827. sendReturnValue(rpcSock, -1, -1);
  828. }
  829. TcpConnection * NetconEthernetTap::handleSocket(PhySocket *sock, void **uptr, struct socket_st* socket_rpc)
  830. {
  831. Mutex::Lock _l(_tcpconns_m);
  832. struct tcp_pcb *newPCB = lwipstack->tcp_new();
  833. if(newPCB != NULL) {
  834. TcpConnection *newConn = new TcpConnection();
  835. *uptr = newConn;
  836. newConn->sock = sock;
  837. newConn->pcb = newPCB;
  838. _TcpConnections.push_back(newConn);
  839. return newConn;
  840. }
  841. dwr(MSG_ERROR," handleSocket(): Memory not available for new PCB\n");
  842. sendReturnValue(_phy.getDescriptor(sock), -1, ENOMEM);
  843. return NULL;
  844. }
  845. void NetconEthernetTap::handleConnect(PhySocket *sock, PhySocket *rpcSock, TcpConnection *conn, struct connect_st* connect_rpc)
  846. {
  847. Mutex::Lock _l(_tcpconns_m);
  848. struct sockaddr_in *rawAddr = (struct sockaddr_in *) &connect_rpc->__addr;
  849. int port = lwipstack->ntohs(rawAddr->sin_port);
  850. ip_addr_t connAddr = convert_ip(rawAddr);
  851. if(conn != NULL) {
  852. lwipstack->tcp_sent(conn->pcb, nc_sent);
  853. lwipstack->tcp_recv(conn->pcb, nc_recved);
  854. lwipstack->tcp_err(conn->pcb, nc_err);
  855. lwipstack->tcp_poll(conn->pcb, nc_poll, APPLICATION_POLL_FREQ);
  856. lwipstack->tcp_arg(conn->pcb, new Larg(this, conn));
  857. int err = 0, ip = rawAddr->sin_addr.s_addr;
  858. unsigned char d[4];
  859. d[0] = ip & 0xFF;
  860. d[1] = (ip >> 8) & 0xFF;
  861. d[2] = (ip >> 16) & 0xFF;
  862. d[3] = (ip >> 24) & 0xFF;
  863. dwr(MSG_DEBUG," handleConnect(): %d.%d.%d.%d: %d\n", d[0],d[1],d[2],d[3], port);
  864. dwr(MSG_DEBUG," handleConnect(): pcb->state = %x\n", conn->pcb->state);
  865. if(conn->pcb->state != CLOSED) {
  866. dwr(MSG_DEBUG," handleConnect(): PCB != CLOSED, cannot connect using this PCB\n");
  867. sendReturnValue(rpcSock, -1, EAGAIN);
  868. return;
  869. }
  870. if((err = lwipstack->tcp_connect(conn->pcb,&connAddr,port,nc_connected)) < 0)
  871. {
  872. if(err == ERR_ISCONN) {
  873. sendReturnValue(rpcSock, -1, EISCONN); // Already in connected state
  874. return;
  875. } if(err == ERR_USE) {
  876. sendReturnValue(rpcSock, -1, EADDRINUSE); // Already in use
  877. return;
  878. } if(err == ERR_VAL) {
  879. sendReturnValue(rpcSock, -1, EINVAL); // Invalid ipaddress parameter
  880. return;
  881. } if(err == ERR_RTE) {
  882. sendReturnValue(rpcSock, -1, ENETUNREACH); // No route to host
  883. return;
  884. } if(err == ERR_BUF) {
  885. sendReturnValue(rpcSock, -1, EAGAIN); // No more ports available
  886. return;
  887. }
  888. if(err == ERR_MEM) {
  889. /* Can occur for the following reasons: tcp_enqueue_flags()
  890. 1) tcp_enqueue_flags is always called with either SYN or FIN in flags.
  891. We need one available snd_buf byte to do that.
  892. This means we can't send FIN while snd_buf==0. A better fix would be to
  893. not include SYN and FIN sequence numbers in the snd_buf count.
  894. 2) Cannot allocate new pbuf
  895. 3) Cannot allocate new TCP segment
  896. */
  897. sendReturnValue(rpcSock, -1, EAGAIN); // FIXME: Doesn't describe the problem well, but closest match
  898. return;
  899. }
  900. // We should only return a value if failure happens immediately
  901. // Otherwise, we still need to wait for a callback from lwIP.
  902. // - This is because an ERR_OK from tcp_connect() only verifies
  903. // that the SYN packet was enqueued onto the stack properly,
  904. // that's it!
  905. // - Most instances of a retval for a connect() should happen
  906. // in the nc_connect() and nc_err() callbacks!
  907. dwr(MSG_ERROR," handleConnect(): unable to connect\n");
  908. sendReturnValue(rpcSock, -1, EAGAIN);
  909. }
  910. // Everything seems to be ok, but we don't have enough info to retval
  911. conn->listening=true;
  912. conn->rpcSock=rpcSock; // used for return value from lwip CB
  913. } else {
  914. dwr(MSG_ERROR," handleConnect(): could not locate PCB based on their fd\n");
  915. sendReturnValue(rpcSock, -1, EBADF);
  916. }
  917. }
  918. void NetconEthernetTap::handleWrite(TcpConnection *conn)
  919. {
  920. if(!conn || !conn->pcb) {
  921. dwr(MSG_ERROR," handleWrite(): invalid connection/PCB\n");
  922. return;
  923. }
  924. // How much we are currently allowed to write to the connection
  925. int err, sz, r, sndbuf = conn->pcb->snd_buf;
  926. if(!sndbuf) {
  927. /* PCB send buffer is full, turn off readability notifications for the
  928. corresponding PhySocket until nc_sent() is called and confirms that there is
  929. now space on the buffer */
  930. if(!conn->probation) {
  931. dwr(MSG_DEBUG," handleWrite(): sndbuf == 0, LWIP stack is full\n");
  932. _phy.setNotifyReadable(conn->sock, false);
  933. conn->probation = true;
  934. }
  935. return;
  936. }
  937. if(conn->txsz <= 0)
  938. return; // Nothing to write
  939. if(!conn->listening)
  940. lwipstack->_tcp_output(conn->pcb);
  941. if(conn->sock) {
  942. r = conn->txsz < sndbuf ? conn->txsz : sndbuf;
  943. /* Writes data pulled from the client's socket buffer to LWIP. This merely sends the
  944. * data to LWIP to be enqueued and eventually sent to the network. */
  945. if(r > 0) {
  946. err = lwipstack->_tcp_write(conn->pcb, &conn->txbuf, r, TCP_WRITE_FLAG_COPY);
  947. lwipstack->_tcp_output(conn->pcb);
  948. if(err != ERR_OK) {
  949. dwr(MSG_ERROR," handleWrite(): error while writing to PCB, (err = %d)\n", err);
  950. if(err == -1)
  951. dwr(MSG_DEBUG," handleWrite(): out of memory\n");
  952. return;
  953. } else {
  954. sz = (conn->txsz)-r;
  955. if(sz)
  956. memmove(&conn->txbuf, (conn->txbuf+r), sz);
  957. conn->txsz -= r;
  958. float max = (float)DEFAULT_BUF_SZ;
  959. dwr(MSG_TRANSFER," TX ---> :: {TX: %.3f%%, RX: %.3f%%, sock=%x} :: %d bytes\n",
  960. (float)conn->txsz / max, (float)conn->rxsz / max, conn->sock, r);
  961. return;
  962. }
  963. }
  964. }
  965. }
  966. } // namespace ZeroTier