pvtcp_off_io_linux.c 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882
  1. /*
  2. * Linux 2.6.32 and later Kernel module for VMware MVP PVTCP Server
  3. *
  4. * Copyright (C) 2010-2013 VMware, Inc. All rights reserved.
  5. *
  6. * This program is free software; you can redistribute it and/or modify it
  7. * under the terms of the GNU General Public License version 2 as published by
  8. * the Free Software Foundation.
  9. *
  10. * This program is distributed in the hope that it will be useful, but WITHOUT
  11. * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12. * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
  13. * more details.
  14. *
  15. * You should have received a copy of the GNU General Public License along with
  16. * this program; see the file COPYING. If not, write to the Free Software
  17. * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
  18. */
  19. #line 5
  20. /**
  21. * @file
  22. *
  23. * @brief Server (offload) side Linux-specific socket I/O functions.
  24. */
  25. #include "pvtcp.h"
  26. /*
  27. * Data.
  28. */
  29. /* Used to check if OutputAIO()-ing is likely in progress. */
  30. CommOSAtomic PvtcpOutputAIOSection;
  31. /*
  32. * Large datagram bounce buffer (PVTCP_SOCK_BUF_SIZE < size <= 64K).
  33. * Only one such buffer is available, shared across cpus via get/put.
  34. * A preallocated, smaller buffer is used for most over-size 'allocs'.
  35. * A larger, 64K-buffer may need to be __vmalloc()-ed.
  36. */
  37. typedef struct LargeDgramBuf {
  38. unsigned char buf[PVTCP_SOCK_BUF_SIZE << 1]; /* Fast buffer. */
  39. void *spareBuf; /* Dynamically allocated. */
  40. CommOSMutex lock;
  41. } LargeDgramBuf;
  42. static LargeDgramBuf largeDgramBuf;
  43. /**
  44. * @brief One time initialization of large datagram buffer.
  45. */
  46. void
  47. PvtcpOffLargeDgramBufInit(void)
  48. {
  49. largeDgramBuf.spareBuf = NULL;
  50. CommOS_MutexInit(&largeDgramBuf.lock);
  51. }
  52. /**
  53. * @brief Reserves/holds the large datagram buffer.
  54. * @param size size of buffer.
  55. * @sideeffect may sleep until the buffer is available.
  56. * @return address of buffer, or NULL if size too large or allocation failed.
  57. */
  58. static inline void *
  59. LargeDgramBufGet(int size)
  60. {
  61. static const unsigned int maxSize = 64 * 1024;
  62. /* coverity[alloc_fn] */
  63. /* coverity[var_assign] */
  64. CommOS_MutexLockUninterruptible(&largeDgramBuf.lock);
  65. if (size <= sizeof(largeDgramBuf.buf)) {
  66. return largeDgramBuf.buf;
  67. }
  68. if (size <= maxSize) {
  69. if (!largeDgramBuf.spareBuf) {
  70. largeDgramBuf.spareBuf = __vmalloc(maxSize,
  71. (GFP_ATOMIC | __GFP_HIGHMEM),
  72. PAGE_KERNEL);
  73. }
  74. if (largeDgramBuf.spareBuf) {
  75. return largeDgramBuf.spareBuf;
  76. }
  77. }
  78. CommOS_MutexUnlock(&largeDgramBuf.lock);
  79. return NULL;
  80. }
  81. /**
  82. * @brief Releases hold on the large datagram buffer.
  83. * @param buf buffer to put back.
  84. */
  85. static inline void
  86. LargeDgramBufPut(void *buf)
  87. {
  88. static unsigned int spareBufPuts;
  89. BUG_ON((buf != largeDgramBuf.buf) && (buf != largeDgramBuf.spareBuf));
  90. if (largeDgramBuf.spareBuf && (++spareBufPuts % 2) == 0) {
  91. /* Deallocate the spare buffer every now and then. */
  92. vfree(largeDgramBuf.spareBuf);
  93. largeDgramBuf.spareBuf = NULL;
  94. }
  95. CommOS_MutexUnlock(&largeDgramBuf.lock);
  96. }
  97. /*
  98. * I/O offload operations.
  99. */
  100. /**
  101. * @brief Flow control notification received when more (enough) data was
  102. * consumed from a PV socket.
  103. * @param channel communication channel with offloader
  104. * @param upperLayerState state associated with this channel
  105. * @param packet first packet received in reply
  106. * @param vec payload buffer descriptors
  107. * @param vecLen payload buffer descriptor count
  108. * @sideeffect A writer task is scheduled
  109. */
  110. void
  111. PvtcpFlowOp(CommChannel channel,
  112. void *upperLayerState,
  113. CommPacket *packet,
  114. struct kvec *vec,
  115. unsigned int vecLen)
  116. {
  117. PvtcpSock *pvsk = PvtcpGetPvskOrReturn(packet->data64, upperLayerState);
  118. PvtcpHoldSock(pvsk);
  119. PVTCP_UNLOCK_DISP_DISCARD_VEC();
  120. CommOS_SubReturnAtomic(&pvsk->rcvdSize, (int)packet->data32);
  121. PvtcpSchedSock(pvsk);
  122. PvtcpPutSock(pvsk);
  123. }
  124. /**
  125. * @brief Outputs Zero-sized datagram to socket.
  126. * @param sock socket on which to send.
  127. * @param msg message header containing destination address.
  128. * @return size of sent datagram (0), or error code.
  129. */
  130. static inline int
  131. SendZeroSizedDgram(struct socket *sock,
  132. struct msghdr *msg)
  133. {
  134. int rc;
  135. struct kvec dummy = { .iov_base = NULL, .iov_len = 0 };
  136. BUG_ON((sock == NULL) || (msg == NULL));
  137. rc = kernel_sendmsg(sock, msg, &dummy, 0, 0);
  138. if (rc != dummy.iov_len) {
  139. #if defined(PVTCP_FULL_DEBUG)
  140. CommOS_Debug(("%s: Dgram [0x%p] sent [%d], expected [%d]\n",
  141. __func__, sock, rc, dummy.iov_len));
  142. #endif
  143. if (rc == -EAGAIN) { /* As if lost on the wire. */
  144. rc = 0;
  145. }
  146. }
  147. return rc;
  148. }
  149. /**
  150. * @brief Outputs bytes to socket.
  151. * @param channel communication channel with offloader.
  152. * @param upperLayerState state associated with this channel.
  153. * @param packet received packet header.
  154. * @param vec payload buffer descriptors.
  155. * @param vecLen payload buffer descriptor count.
  156. * @sideeffect Changes send size/capacity ratio. May schedule AIO processing
  157. * for enqueued bytes, if applicable.
  158. */
  159. void
  160. PvtcpIoOp(CommChannel channel,
  161. void *upperLayerState,
  162. CommPacket *packet,
  163. struct kvec *vec,
  164. unsigned int vecLen)
  165. {
  166. int rc;
  167. unsigned int vecOff;
  168. PvtcpOffBuf *internalBuf;
  169. PvtcpSock *pvsk = PvtcpGetPvskOrReturn(packet->data64, upperLayerState);
  170. struct sock *sk = SkFromPvsk(pvsk);
  171. struct socket *sock = sk->sk_socket;
  172. unsigned int dataLen = packet->len - sizeof(*packet);
  173. struct msghdr msg = {
  174. .msg_controllen = 0,
  175. .msg_control = NULL
  176. };
  177. int tmpSize;
  178. int needSched = 0;
  179. PvtcpHoldSock(pvsk);
  180. rc = 0;
  181. if (!pvsk->peerSockSet || PvskTestFlag(pvsk, PVTCP_OFF_PVSKF_SHUT_WR)) {
  182. PVTCP_UNLOCK_DISP_DISCARD_VEC();
  183. goto out;
  184. }
  185. tmpSize = (int)COMM_OPF_GET_VAL(packet->flags);
  186. if (tmpSize) {
  187. /* It was requested that we update deltaAckSize. */
  188. tmpSize = 1 << tmpSize;
  189. CommOS_WriteAtomic(&pvsk->deltaAckSize, tmpSize);
  190. }
  191. if (sk->sk_type == SOCK_STREAM) {
  192. unsigned int queueSize = 0;
  193. if (!SOCK_OUT_TRYLOCK(pvsk)) {
  194. if (pvsk->peerSockSet &&
  195. (sk->sk_state == TCP_ESTABLISHED) &&
  196. (CommOS_ReadAtomic(&pvsk->queueSize) == 0)) {
  197. /* Attempt to write directly as many bytes as we can. */
  198. /*
  199. * kernel_sendmsg() may use memcpy_fromiovec() that
  200. * "modifies the original iovec".
  201. */
  202. struct kvec *vecTmp = kmemdup(vec, vecLen * sizeof(*vec), GFP_ATOMIC);
  203. if (vecTmp) {
  204. msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
  205. rc = kernel_sendmsg(sock, &msg, vecTmp, vecLen, dataLen);
  206. kfree(vecTmp);
  207. if (rc == -EAGAIN) {
  208. rc = 0;
  209. }
  210. } else {
  211. rc = 0;
  212. }
  213. if (rc >= 0) {
  214. dataLen = rc;
  215. for (vecOff = 0; vecOff < vecLen; vecOff++) {
  216. if (rc >= vec[vecOff].iov_len) {
  217. /* Dispose of all fully consumed buffers. */
  218. PvtcpBufFree(vec[vecOff].iov_base);
  219. rc -= vec[vecOff].iov_len;
  220. } else {
  221. /* Place partly consumed / unconsumed buffers in queue. */
  222. internalBuf =
  223. PvtcpOffInternalFromBuf(vec[vecOff].iov_base);
  224. BUG_ON(internalBuf == NULL);
  225. if (rc > 0) {
  226. internalBuf->len -= rc;
  227. internalBuf->off += rc;
  228. rc = 0;
  229. }
  230. CommOS_ListAddTail(&pvsk->queue, &internalBuf->link);
  231. queueSize += internalBuf->len;
  232. }
  233. }
  234. if (queueSize > 0) {
  235. CommOS_AddReturnAtomic(&pvsk->queueSize, queueSize);
  236. needSched = 1;
  237. }
  238. } else {
  239. /*
  240. * We never close offload sockets unless told by the PV side,
  241. * or when the comm goes down. Getting out of sync with PV
  242. * sockets is a dangerously bad idea.
  243. * This is very likely an EPIPE/ECONNRESET.
  244. */
  245. dataLen = 0;
  246. for (vecOff = 0; vecOff < vecLen; vecOff++) {
  247. PvtcpBufFree(vec[vecOff].iov_base);
  248. }
  249. }
  250. SOCK_OUT_UNLOCK(pvsk);
  251. } else {
  252. SOCK_OUT_UNLOCK(pvsk);
  253. goto enqueueBytes;
  254. }
  255. } else {
  256. /*
  257. * We enqueue the bytes for aio processing. Note that request
  258. * level ordering is preserved since we're still under the dispatch
  259. * lock. However, accessing 'queue' must be protected via
  260. * the state lock to serialize with aio changes.
  261. * Note that the struct socket *sock may have been released, but here
  262. * we only access sk which is held (albeit potentially orphaned).
  263. */
  264. CommOSList bufList;
  265. enqueueBytes:
  266. dataLen = 0;
  267. if (pvsk->peerSockSet && (sk->sk_state == TCP_ESTABLISHED)) {
  268. queueSize = 0;
  269. CommOS_ListInit(&bufList);
  270. for (vecOff = 0; vecOff < vecLen; vecOff++) {
  271. internalBuf = PvtcpOffInternalFromBuf(vec[vecOff].iov_base);
  272. BUG_ON(internalBuf == NULL);
  273. CommOS_ListAddTail(&bufList, &internalBuf->link);
  274. queueSize += internalBuf->len;
  275. }
  276. if (queueSize > 0) {
  277. SOCK_STATE_LOCK(pvsk);
  278. CommOS_ListSpliceTail(&pvsk->queue, &bufList);
  279. SOCK_STATE_UNLOCK(pvsk);
  280. CommOS_AddReturnAtomic(&pvsk->queueSize, queueSize);
  281. needSched = 1;
  282. }
  283. } else {
  284. for (vecOff = 0; vecOff < vecLen; vecOff++) {
  285. PvtcpBufFree(vec[vecOff].iov_base);
  286. }
  287. }
  288. }
  289. } else { /* SOCK_DGRAM || SOCK_RAW */
  290. struct sockaddr *addr;
  291. struct sockaddr_in sin;
  292. struct sockaddr_in6 sin6;
  293. int addrLen;
  294. /*
  295. * Non-stream sockets don't use the send queue, packets are sent
  296. * directly and they must _not_ be merged.
  297. */
  298. if (sk->sk_family == AF_INET) {
  299. sin.sin_family = AF_INET;
  300. sin.sin_port = packet->data16;
  301. addr = (struct sockaddr *)&sin;
  302. addrLen = sizeof(sin);
  303. sin.sin_addr.s_addr = (unsigned int)packet->data64ex;
  304. PvtcpTestAndBindLoopbackInet4(pvsk, &sin.sin_addr.s_addr, 0);
  305. } else { /* AF_INET6 */
  306. sin6.sin6_family = AF_INET6;
  307. sin6.sin6_port = packet->data16;
  308. addr = (struct sockaddr *)&sin6;
  309. addrLen = sizeof(sin6);
  310. PvtcpTestAndBindLoopbackInet6(pvsk, &packet->data64ex,
  311. &packet->data64ex2, 0);
  312. PvtcpI6AddrUnpack(&sin6.sin6_addr.s6_addr32[0],
  313. packet->data64ex, packet->data64ex2);
  314. }
  315. msg.msg_flags = packet->data32 | MSG_DONTWAIT | MSG_NOSIGNAL;
  316. msg.msg_name = addr;
  317. msg.msg_namelen = addrLen;
  318. if (pvsk->peerSockSet) {
  319. /*
  320. * Flow-control already done, based on PVTCP_SOCK_SAFE_RCVSIZE, just
  321. * as with stream sockets. Meaning that we block the senders in the
  322. * guest (if applicable).
  323. *
  324. * The send buffer size was set high enough, at socket creation time,
  325. * to avoid dropping datagrams during the (non-blocking) write.
  326. */
  327. if (vecLen == 0) {
  328. rc = SendZeroSizedDgram(sock, &msg);
  329. }
  330. for (vecOff = 0; vecOff < vecLen; vecOff++) {
  331. if (vec[vecOff].iov_len == 0) {
  332. BUG_ON(vec[vecOff].iov_base != NULL);
  333. rc = SendZeroSizedDgram(sock, &msg);
  334. } else {
  335. /*
  336. * Backup iov_base as it may be modified by kernel_sendmsg().
  337. * New net/ipv4/ping.c is using memcpy_fromiovec() that
  338. * "modifies the original iovec".
  339. */
  340. void *buf = vec[vecOff].iov_base;
  341. size_t len = vec[vecOff].iov_len;
  342. internalBuf = PvtcpOffInternalFromBuf(buf);
  343. BUG_ON(internalBuf == NULL);
  344. if (internalBuf->off == USHRT_MAX) {
  345. /* Fragmented payload containing an embedded iovec. */
  346. rc = kernel_sendmsg(sock, &msg,
  347. (struct kvec *)buf,
  348. internalBuf->len, len);
  349. } else {
  350. rc = kernel_sendmsg(sock, &msg, &vec[vecOff], 1, len);
  351. }
  352. PvtcpBufFree(buf);
  353. if (rc != len) {
  354. #if defined(PVTCP_FULL_DEBUG)
  355. CommOS_Debug(("%s: Dgram [0x%p] sent [%d], expected [%d]\n",
  356. __func__, sk, rc, len));
  357. #endif
  358. if (rc == -EAGAIN) { /* As if lost on the wire. */
  359. rc = 0;
  360. }
  361. }
  362. }
  363. }
  364. if (COMM_OPF_TEST_ERR(packet->flags)) {
  365. /* PV client wants an automatic bind. */
  366. PvskSetOpFlag(pvsk, PVTCP_OP_BIND);
  367. PvtcpSchedSock(pvsk);
  368. }
  369. } else {
  370. for (vecOff = 0; vecOff < vecLen; vecOff++) {
  371. PvtcpBufFree(vec[vecOff].iov_base);
  372. }
  373. }
  374. }
  375. CommSvc_DispatchUnlock(channel);
  376. out:
  377. if (rc < 0) {
  378. pvsk->err = -rc;
  379. }
  380. tmpSize = CommOS_AddReturnAtomic(&pvsk->sentSize, dataLen);
  381. if ((tmpSize >= CommOS_ReadAtomic(&pvsk->deltaAckSize)) ||
  382. pvsk->err || needSched) {
  383. if (CommOS_AddReturnAtomic(&PvtcpOutputAIOSection, 1) == 1) {
  384. /* OutputAIO() (likely) not running. */
  385. PvtcpSchedSock(pvsk);
  386. }
  387. CommOS_SubReturnAtomic(&PvtcpOutputAIOSection, 1);
  388. }
  389. PvtcpPutSock(pvsk);
  390. }
/*
 * AIO functions called from the main AIO processing function.
 */
  394. /**
  395. * @brief Processes socket flow control acks and error notifications in an
  396. * AIO thread. This function is called with the socket 'in' lock taken.
  397. * @param[in,out] pvsk socket to process.
  398. * @param err non-zero if offload was closed, zero otherwise.
  399. * @sideeffect May resume PV socket sending or raise errors.
  400. */
  401. void
  402. PvtcpFlowAIO(PvtcpSock *pvsk,
  403. int err)
  404. {
  405. CommPacket packet = { .flags = 0 };
  406. unsigned long long timeout;
  407. int tmpSize;
  408. COMM_OPF_CLEAR_ERR(packet.flags);
  409. packet.data32 = PVTCP_FLOW_OP_INVALID_SIZE;
  410. if (pvsk->err || err) {
  411. COMM_OPF_SET_ERR(packet.flags);
  412. packet.data32ex = !pvsk->err ? 0 : xchg(&pvsk->err, 0);
  413. if (!packet.data32ex) {
  414. packet.data32ex = -err;
  415. }
  416. #if defined(PVTCP_FULL_DEBUG)
  417. CommOS_Debug(("%s: Sending socket error [%u] on [0x%p -> 0x%0x].\n",
  418. __func__, packet.data32ex, pvsk,
  419. (unsigned)(pvsk->peerSock)));
  420. #endif
  421. } else {
  422. SOCK_STATE_LOCK(pvsk);
  423. tmpSize = CommOS_ReadAtomic(&pvsk->deltaAckSize);
  424. if (CommOS_ReadAtomic(&pvsk->sentSize) >= tmpSize) {
  425. if ((SkFromPvsk(pvsk)->sk_type != SOCK_STREAM) &&
  426. !sock_writeable(SkFromPvsk(pvsk))) {
  427. /* Don't send dgram flow op until WriteSpaceCB tells us to do so. */
  428. packet.data32 = PVTCP_FLOW_OP_INVALID_SIZE;
  429. } else {
  430. packet.data32 = CommOS_ReadAtomic(&pvsk->sentSize);
  431. CommOS_WriteAtomic(&pvsk->sentSize, 0);
  432. if (tmpSize > (1 << (PVTCP_SOCK_SMALL_ACK_ORDER + 1))) {
  433. tmpSize >>= 1;
  434. CommOS_WriteAtomic(&pvsk->deltaAckSize, tmpSize);
  435. }
  436. }
  437. }
  438. SOCK_STATE_UNLOCK(pvsk);
  439. packet.data32ex = 0;
  440. }
  441. if (((packet.data32 != PVTCP_FLOW_OP_INVALID_SIZE) ||
  442. COMM_OPF_TEST_ERR(packet.flags)) &&
  443. pvsk->peerSockSet) {
  444. packet.len = sizeof(packet);
  445. packet.opCode = PVTCP_OP_FLOW;
  446. packet.data64 = pvsk->peerSock;
  447. timeout = COMM_MAX_TO;
  448. CommSvc_Write(pvsk->channel, &packet, &timeout);
  449. }
  450. }
  451. /**
  452. * @brief Processes queued socket output in an AIO thread. This function is
  453. * called with the socket 'out' lock taken.
  454. * @param[in,out] pvsk socket to process.
  455. * @sideeffect Changes send size/capacity ratio.
  456. */
  457. void
  458. PvtcpOutputAIO(PvtcpSock *pvsk)
  459. {
  460. struct sock *sk;
  461. struct socket *sock;
  462. PvtcpOffBuf *internalBuf;
  463. PvtcpOffBuf *tmp;
  464. CommOSList queue;
  465. #define VEC_SIZE 32
  466. struct kvec vec[VEC_SIZE];
  467. unsigned int vecLen;
  468. unsigned int dataLen;
  469. struct msghdr msg = {
  470. .msg_controllen = 0,
  471. .msg_control = NULL,
  472. .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL
  473. };
  474. int queueDelta = 0;
  475. int done = 0;
  476. int rc;
  477. sk = SkFromPvsk(pvsk);
  478. if (!sk) {
  479. /* This is an error socket, we don't process it. */
  480. return;
  481. }
  482. sock = sk->sk_socket;
  483. again:
  484. CommOS_AddReturnAtomic(&PvtcpOutputAIOSection, 1);
  485. while (!done && CommOS_ReadAtomic(&pvsk->queueSize) > 0) {
  486. /* Note: only stream sockets can have a positive send queue size.
  487. * Similar to PvtcpIoOp: we must check if sock (struct socket *) is
  488. * still valid.
  489. */
  490. /* Take the current queue private. */
  491. SOCK_STATE_LOCK(pvsk);
  492. queue = pvsk->queue;
  493. if (CommOS_ListEmpty(&queue)) {
  494. SOCK_STATE_UNLOCK(pvsk);
  495. return;
  496. }
  497. queue.next->prev = &queue;
  498. queue.prev->next = &queue;
  499. CommOS_ListInit(&pvsk->queue);
  500. SOCK_STATE_UNLOCK(pvsk);
  501. vecLen = 0;
  502. dataLen = 0;
  503. if (sk->sk_state == TCP_ESTABLISHED) {
  504. CommOS_ListForEach(&queue, internalBuf, link) {
  505. if (vecLen == VEC_SIZE) {
  506. break;
  507. }
  508. vec[vecLen].iov_base = PvtcpOffBufFromInternalOff(internalBuf);
  509. vec[vecLen].iov_len = internalBuf->len;
  510. dataLen += internalBuf->len;
  511. vecLen++;
  512. }
  513. rc = kernel_sendmsg(sock, &msg, vec, vecLen, dataLen);
  514. if (rc == -EAGAIN) {
  515. rc = 0;
  516. }
  517. if (rc >= 0) {
  518. /* If we wrote anything, dispose of the buffers in question. */
  519. queueDelta = rc;
  520. if (queueDelta > 0) {
  521. CommOS_ListForEachSafe(&queue, internalBuf, tmp, link) {
  522. if (rc >= internalBuf->len) {
  523. rc -= internalBuf->len;
  524. CommOS_ListDel(&internalBuf->link);
  525. PvtcpBufFree(PvtcpOffBufFromInternal(internalBuf));
  526. } else {
  527. internalBuf->len -= rc;
  528. internalBuf->off += rc;
  529. break;
  530. }
  531. }
  532. }
  533. if (!CommOS_ListEmpty(&queue)) {
  534. /* Add the remaining bytes to the beginning of the queue. */
  535. SOCK_STATE_LOCK(pvsk);
  536. CommOS_ListSplice(&pvsk->queue, &queue);
  537. SOCK_STATE_UNLOCK(pvsk);
  538. }
  539. if (queueDelta == 0) {
  540. /* Bail out if no bytes written, WriteSpaceCB() will resched. */
  541. done = 1;
  542. break;
  543. }
  544. CommOS_AddReturnAtomic(&pvsk->sentSize, queueDelta);
  545. CommOS_SubReturnAtomic(&pvsk->queueSize, queueDelta);
  546. } else {
  547. /*
  548. * Very likely, this is due to the socket being closed, so fine.
  549. */
  550. goto discardOutput;
  551. }
  552. } else {
  553. /* Dispose of all buffers in the queue and mark it empty. */
  554. discardOutput:
  555. if (!CommOS_ListEmpty(&queue)) {
  556. CommOS_ListForEachSafe(&queue, internalBuf, tmp, link) {
  557. CommOS_ListDel(&internalBuf->link);
  558. PvtcpBufFree(PvtcpOffBufFromInternal(internalBuf));
  559. }
  560. }
  561. CommOS_WriteAtomic(&pvsk->queueSize, 0);
  562. break;
  563. }
  564. }
  565. if (CommOS_SubReturnAtomic(&PvtcpOutputAIOSection, 1) > 0) {
  566. if (!done) {
  567. goto again;
  568. }
  569. }
  570. if (PvskTestFlag(pvsk, PVTCP_OFF_PVSKF_SHUT_WR)) {
  571. kernel_sock_shutdown(sock, SHUT_WR);
  572. PvskSetFlag(pvsk, PVTCP_OFF_PVSKF_SHUT_WR, 0);
  573. }
  574. #undef VEC_SIZE
  575. }
  576. /**
  577. * @brief Processes socket input in an AIO thread. This function is
  578. * called with the socket 'in' lock taken.
  579. * @param[in,out] pvsk socket to process.
  580. * @param[in,out] perCpuBuf per-cpu socket read buffer.
  581. * @return zero if eof was not detected, non-zero otherwise.
  582. * @sideeffect Changes receive size/capacity ratio.
  583. */
  584. int
  585. PvtcpInputAIO(PvtcpSock *pvsk,
  586. void *perCpuBuf)
  587. {
  588. struct sock *sk;
  589. struct socket *sock;
  590. int err = 0;
  591. CommPacket packet = {
  592. .opCode = PVTCP_OP_IO
  593. };
  594. unsigned long long timeout;
  595. sk = SkFromPvsk(pvsk);
  596. if (!sk) {
  597. /* IO processing is skipped on socket create-error sockets. */
  598. return -1;
  599. }
  600. if (!perCpuBuf) {
  601. /* No read buffer. */
  602. return -1;
  603. }
  604. sock = sk->sk_socket;
  605. packet.data64 = pvsk->peerSock;
  606. COMM_OPF_CLEAR_ERR(packet.flags);
  607. if (sk->sk_state == TCP_LISTEN) {
  608. /* Process stream listen 'input'. */
  609. packet.len = sizeof(packet);
  610. packet.data16 = sk->sk_ack_backlog;
  611. timeout = COMM_MAX_TO;
  612. if (pvsk->peerSockSet) {
  613. CommSvc_Write(pvsk->channel, &packet, &timeout);
  614. CommOS_Debug(("%s: Listen sock [0x%p] 'ack_backlog' [%hu].\n",
  615. __func__, sk, packet.data16));
  616. }
  617. } else {
  618. /* Common path for both stream and datagram sockets. */
  619. int rc;
  620. int tmpSize;
  621. struct kvec vec[2];
  622. void *ioBuf = perCpuBuf;
  623. struct kvec *inVec;
  624. unsigned int inVecLen;
  625. unsigned int iovOffset = 0;
  626. unsigned int inputSize = 0;
  627. unsigned int coalescingSize = PVTCP_SOCK_RCVSIZE >> 2;
  628. struct sockaddr_in sin = { .sin_family = AF_INET };
  629. struct sockaddr_in6 sin6 = { .sin6_family = AF_INET6 };
  630. struct msghdr msg = {
  631. .msg_controllen = 0,
  632. .msg_control = NULL,
  633. .msg_flags = MSG_DONTWAIT
  634. };
  635. int tmpFlags = msg.msg_flags;
  636. PvtcpDgramPseudoHeader dgramHeader;
  637. tmpSize = CommOS_ReadAtomic(&pvsk->rcvdSize);
  638. while ((tmpSize < PVTCP_SOCK_SAFE_RCVSIZE) && pvsk->peerSockSet) {
  639. if (ioBuf != perCpuBuf) {
  640. LargeDgramBufPut(ioBuf);
  641. ioBuf = perCpuBuf;
  642. }
  643. vec[0].iov_base = (char *)ioBuf;
  644. if (sk->sk_type == SOCK_STREAM) {
  645. if (PvskTestFlag(pvsk, PVTCP_OFF_PVSKF_SHUT_RD)) {
  646. break;
  647. }
  648. msg.msg_name = NULL;
  649. msg.msg_namelen = 0;
  650. vec[0].iov_len = PVTCP_SOCK_STREAM_BUF_SIZE;
  651. } else { /* SOCK_DGRAM || SOCK_RAW */
  652. if (sk->sk_family == AF_INET) {
  653. msg.msg_name = &sin;
  654. msg.msg_namelen = sizeof(sin);
  655. } else {
  656. msg.msg_name = &sin6;
  657. msg.msg_namelen = sizeof(sin6);
  658. }
  659. /*
  660. * Check if datagram larger than the per cpu buffer; if so,
  661. * allocate a large enough buffer. This should happen quite
  662. * rarely, as well-behaved applications don't rely on IP
  663. * fragmentation to accommodate large sizes.
  664. */
  665. vec[0].iov_len = 1;
  666. msg.msg_flags |= (MSG_PEEK | MSG_TRUNC);
  667. rc = kernel_recvmsg(sock, &msg, vec, 1, 1, msg.msg_flags);
  668. if (rc < 0) {
  669. break;
  670. }
  671. msg.msg_flags = tmpFlags;
  672. if (rc > PVTCP_SOCK_DGRAM_BUF_SIZE) {
  673. /*
  674. * Track large datagram allocations, whether allocation succeeds
  675. * or not. No need for atomic overhead, approximating is OK.
  676. */
  677. pvtcpOffDgramAllocations++;
  678. ioBuf = LargeDgramBufGet(rc);
  679. if (!ioBuf) {
  680. /*
  681. * We reset it to the per-cpu buffer such that we can still
  682. * consume the datagram in the next recvmsg, which will set
  683. * MSG_TRUNC so we won't put it on the channel.
  684. */
  685. CommOS_Debug(("%s: Dropping datagram (alloc failure)!\n",
  686. __func__));
  687. ioBuf = perCpuBuf;
  688. vec[0].iov_len = PVTCP_SOCK_DGRAM_BUF_SIZE;
  689. } else {
  690. vec[0].iov_len = rc;
  691. }
  692. } else {
  693. vec[0].iov_len = PVTCP_SOCK_DGRAM_BUF_SIZE;
  694. }
  695. vec[0].iov_base = (char *)ioBuf;
  696. }
  697. rc = kernel_recvmsg(sock, &msg, vec, 1, vec[0].iov_len, msg.msg_flags);
  698. if (rc < 0) {
  699. break;
  700. }
  701. if ((rc == 0) && (sk->sk_type == SOCK_STREAM)) {
  702. PvskSetFlag(pvsk, PVTCP_OFF_PVSKF_SHUT_RD, 1);
  703. err = -ECONNRESET;
  704. break;
  705. }
  706. if (msg.msg_flags & MSG_TRUNC) {
  707. continue;
  708. }
  709. inputSize += rc;
  710. tmpSize = CommOS_AddReturnAtomic(&pvsk->rcvdSize, rc);
  711. if (tmpSize >= PVTCP_SOCK_LARGE_ACK_WM) {
  712. COMM_OPF_SET_VAL(packet.flags, PVTCP_SOCK_LARGE_ACK_ORDER);
  713. } else {
  714. COMM_OPF_SET_VAL(packet.flags, 0);
  715. }
  716. if (sk->sk_type == SOCK_STREAM) {
  717. vec[0].iov_base = ioBuf;
  718. vec[0].iov_len = rc;
  719. inVecLen = 1;
  720. packet.len = sizeof(packet) + rc;
  721. } else { /* SOCK_DGRAM || SOCK_RAW */
  722. if (sk->sk_family == AF_INET) {
  723. dgramHeader.d0 = (unsigned long long)sin.sin_port;
  724. PvtcpResetLoopbackInet4(pvsk, &sin.sin_addr.s_addr);
  725. dgramHeader.d1 = (unsigned long long)sin.sin_addr.s_addr;
  726. } else { /* AF_INET6 */
  727. dgramHeader.d0 = (unsigned long long)sin6.sin6_port;
  728. PvtcpResetLoopbackInet6(pvsk, &sin6.sin6_addr);
  729. PvtcpI6AddrPack(&sin6.sin6_addr.s6_addr32[0],
  730. &dgramHeader.d1, &dgramHeader.d2);
  731. }
  732. vec[0].iov_base = &dgramHeader;
  733. vec[0].iov_len = sizeof(dgramHeader);
  734. vec[1].iov_base = ioBuf;
  735. vec[1].iov_len = rc;
  736. inVecLen = 2;
  737. packet.len = sizeof(packet) + sizeof(dgramHeader) + rc;
  738. }
  739. inVec = vec;
  740. timeout = COMM_MAX_TO;
  741. rc = CommSvc_WriteVec(pvsk->channel, &packet,
  742. &inVec, &inVecLen, &timeout, &iovOffset, 1);
  743. if (rc != packet.len) {
  744. CommOS_Log(("%s: BOOG -- WROTE INCOMPLETE PACKET [%u->%d]!\n",
  745. __func__, packet.len, rc));
  746. break;
  747. }
  748. /*
  749. * If the write failed, we could print a warning. But if this
  750. * happened, the comm channel went down.
  751. */
  752. if (inputSize >= coalescingSize) {
  753. PvtcpSchedSock(pvsk); /* We must schedule ourselves back in. */
  754. break;
  755. }
  756. }
  757. if (ioBuf != perCpuBuf) {
  758. LargeDgramBufPut(ioBuf);
  759. }
  760. }
  761. return err;
  762. }