// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// +build darwin dragonfly freebsd linux nacl netbsd openbsd solaris windows

package net

#include "runtime.h"
#include "defs.h"
#include "arch.h"
#include "malloc.h"

// Map gccgo field names to gc field names.
// Eface aka __go_empty_interface.
#define type __type_descriptor
#define data __object

// Integrated network poller (platform-independent part).
// A particular implementation (epoll/kqueue) must define the following functions:
// void runtime_netpollinit(void);			// to initialize the poller
// int32 runtime_netpollopen(uintptr fd, PollDesc *pd);	// to arm edge-triggered notifications
//							// and associate fd with pd
// int32 runtime_netpollclose(uintptr fd);		// to disable notifications for fd
// void runtime_netpollarm(PollDesc *pd, int32 mode);	// to arm notifications (used only by
//							// level-triggered pollers, e.g. Solaris)
// An implementation must call the following function to denote that the pd is ready.
// void runtime_netpollready(G **gpp, PollDesc *pd, int32 mode);

// PollDesc contains 2 binary semaphores, rg and wg, to park reader and writer
// goroutines respectively. The semaphore can be in the following states:
// READY - io readiness notification is pending;
//         a goroutine consumes the notification by changing the state to nil.
// WAIT - a goroutine prepares to park on the semaphore, but not yet parked;
//        the goroutine commits to park by changing the state to G pointer,
//        or, alternatively, concurrent io notification changes the state to READY,
//        or, alternatively, concurrent timeout/close changes the state to nil.
// G pointer - the goroutine is blocked on the semaphore;
//             io notification or timeout/close changes the state to READY or nil respectively
//             and unparks the goroutine.
// nil - nothing of the above.
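//
// Illustrative transition sequences derived from the rules above (a sketch,
// not an exhaustive enumeration):
//
//	io arrives while a goroutine is parked reading:
//	  nil -> WAIT (netpollblock) -> G (blockcommit) -> READY (netpollready)
//	      -> nil (the woken goroutine consumes READY and reports io ready)
//	io arrives before anyone waits:
//	  nil -> READY (netpollready) -> nil (netpollblock consumes it without parking)
//	timeout or close while a goroutine is parked:
//	  nil -> WAIT -> G -> nil (netpollunblock; the woken goroutine reports failure)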

#define READY ((G*)1)
#define WAIT  ((G*)2)

enum
{
	PollBlockSize = 4*1024,
};

struct PollDesc
{
	PollDesc* link;	// in pollcache, protected by pollcache.Lock

	// The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
	// This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.
	// pollReset, pollWait, pollWaitCanceled and runtime_netpollready (IO readiness notification)
	// proceed w/o taking the lock. So closing, rg, rd, wg and wd are manipulated
	// in a lock-free way by all operations.
	Lock;		// protects the following fields
	uintptr	fd;
	bool	closing;
	uintptr	seq;	// protects from stale timers and ready notifications
	G*	rg;	// READY, WAIT, G waiting for read or nil
	Timer	rt;	// read deadline timer (set if rt.fv != nil)
	int64	rd;	// read deadline
	G*	wg;	// READY, WAIT, G waiting for write or nil
	Timer	wt;	// write deadline timer
	int64	wd;	// write deadline
	void*	user;	// user settable cookie
};

static struct
{
	Lock;
	PollDesc*	first;
	// PollDesc objects must be type-stable,
	// because we can get ready notification from epoll/kqueue
	// after the descriptor is closed/reused.
	// Stale notifications are detected using seq variable,
	// seq is incremented when deadlines are changed or descriptor is reused.
} pollcache;

static bool	netpollblock(PollDesc*, int32, bool);
static G*	netpollunblock(PollDesc*, int32, bool);
static void	deadline(Eface, uintptr);
static void	readDeadline(Eface, uintptr);
static void	writeDeadline(Eface, uintptr);
static PollDesc*	allocPollDesc(void);
static intgo	checkerr(PollDesc *pd, int32 mode);

static FuncVal deadlineFn	= {(void(*)(void))deadline};
static FuncVal readDeadlineFn	= {(void(*)(void))readDeadline};
static FuncVal writeDeadlineFn	= {(void(*)(void))writeDeadline};

// runtimeNano returns the current value of the runtime clock in nanoseconds.
func runtimeNano() (ns int64) {
	ns = runtime_nanotime();
}

func runtime_pollServerInit() {
	runtime_netpollinit();
}

func runtime_pollOpen(fd uintptr) (pd *PollDesc, errno int) {
	pd = allocPollDesc();
	runtime_lock(pd);
	if(pd->wg != nil && pd->wg != READY)
		runtime_throw("runtime_pollOpen: blocked write on free descriptor");
	if(pd->rg != nil && pd->rg != READY)
		runtime_throw("runtime_pollOpen: blocked read on free descriptor");
	pd->fd = fd;
	pd->closing = false;
	pd->seq++;
	pd->rg = nil;
	pd->rd = 0;
	pd->wg = nil;
	pd->wd = 0;
	runtime_unlock(pd);

	errno = runtime_netpollopen(fd, pd);
}

func runtime_pollClose(pd *PollDesc) {
	if(!pd->closing)
		runtime_throw("runtime_pollClose: close w/o unblock");
	if(pd->wg != nil && pd->wg != READY)
		runtime_throw("runtime_pollClose: blocked write on closing descriptor");
	if(pd->rg != nil && pd->rg != READY)
		runtime_throw("runtime_pollClose: blocked read on closing descriptor");
	runtime_netpollclose(pd->fd);
	runtime_lock(&pollcache);
	pd->link = pollcache.first;
	pollcache.first = pd;
	runtime_unlock(&pollcache);
}

func runtime_pollReset(pd *PollDesc, mode int) (err int) {
	err = checkerr(pd, mode);
	if(err)
		goto ret;
	if(mode == 'r')
		pd->rg = nil;
	else if(mode == 'w')
		pd->wg = nil;
ret:
}

func runtime_pollWait(pd *PollDesc, mode int) (err int) {
	err = checkerr(pd, mode);
	if(err == 0) {
		// For now, only Solaris uses level-triggered IO.
		if(Solaris)
			runtime_netpollarm(pd, mode);
		while(!netpollblock(pd, mode, false)) {
			err = checkerr(pd, mode);
			if(err != 0)
				break;
			// Can happen if timeout has fired and unblocked us,
			// but before we had a chance to run, timeout has been reset.
			// Pretend it has not happened and retry.
		}
	}
}

func runtime_pollWaitCanceled(pd *PollDesc, mode int) {
	// This function is used only on Windows after a failed attempt to cancel
	// a pending async IO operation. Wait for ioready, ignore closing or timeouts.
	while(!netpollblock(pd, mode, true))
		;
}

func runtime_pollSetDeadline(pd *PollDesc, d int64, mode int) {
	G *rg, *wg;

	runtime_lock(pd);
	if(pd->closing) {
		runtime_unlock(pd);
		return;
	}
	pd->seq++;  // invalidate current timers
	// Reset current timers.
	if(pd->rt.fv) {
		runtime_deltimer(&pd->rt);
		pd->rt.fv = nil;
	}
	if(pd->wt.fv) {
		runtime_deltimer(&pd->wt);
		pd->wt.fv = nil;
	}
	// Set up new timers.
	if(d != 0 && d <= runtime_nanotime())
		d = -1;
	if(mode == 'r' || mode == 'r'+'w')
		pd->rd = d;
	if(mode == 'w' || mode == 'r'+'w')
		pd->wd = d;
	if(pd->rd > 0 && pd->rd == pd->wd) {
		pd->rt.fv = &deadlineFn;
		pd->rt.when = pd->rd;
		// Copy current seq into the timer arg.
		// Timer func will check the seq against current descriptor seq,
		// if they differ the descriptor was reused or timers were reset.
		pd->rt.arg.type = nil;  // should be *pollDesc type descriptor.
		pd->rt.arg.data = pd;
		pd->rt.seq = pd->seq;
		runtime_addtimer(&pd->rt);
	} else {
		if(pd->rd > 0) {
			pd->rt.fv = &readDeadlineFn;
			pd->rt.when = pd->rd;
			pd->rt.arg.type = nil;  // should be *pollDesc type descriptor.
			pd->rt.arg.data = pd;
			pd->rt.seq = pd->seq;
			runtime_addtimer(&pd->rt);
		}
		if(pd->wd > 0) {
			pd->wt.fv = &writeDeadlineFn;
			pd->wt.when = pd->wd;
			pd->wt.arg.type = nil;  // should be *pollDesc type descriptor.
			pd->wt.arg.data = pd;
			pd->wt.seq = pd->seq;
			runtime_addtimer(&pd->wt);
		}
	}
	// If we set the new deadline in the past, unblock currently pending IO if any.
	rg = nil;
	runtime_atomicstorep(&wg, nil);  // full memory barrier between stores to rd/wd and load of rg/wg in netpollunblock
	if(pd->rd < 0)
		rg = netpollunblock(pd, 'r', false);
	if(pd->wd < 0)
		wg = netpollunblock(pd, 'w', false);
	runtime_unlock(pd);
	if(rg)
		runtime_ready(rg);
	if(wg)
		runtime_ready(wg);
}

func runtime_pollUnblock(pd *PollDesc) {
	G *rg, *wg;

	runtime_lock(pd);
	if(pd->closing)
		runtime_throw("runtime_pollUnblock: already closing");
	pd->closing = true;
	pd->seq++;
	runtime_atomicstorep(&rg, nil);  // full memory barrier between store to closing and read of rg/wg in netpollunblock
	rg = netpollunblock(pd, 'r', false);
	wg = netpollunblock(pd, 'w', false);
	if(pd->rt.fv) {
		runtime_deltimer(&pd->rt);
		pd->rt.fv = nil;
	}
	if(pd->wt.fv) {
		runtime_deltimer(&pd->wt);
		pd->wt.fv = nil;
	}
	runtime_unlock(pd);
	if(rg)
		runtime_ready(rg);
	if(wg)
		runtime_ready(wg);
}

uintptr
runtime_netpollfd(PollDesc *pd)
{
	return pd->fd;
}

void**
runtime_netpolluser(PollDesc *pd)
{
	return &pd->user;
}

bool
runtime_netpollclosing(PollDesc *pd)
{
	return pd->closing;
}

void
runtime_netpolllock(PollDesc *pd)
{
	runtime_lock(pd);
}

void
runtime_netpollunlock(PollDesc *pd)
{
	runtime_unlock(pd);
}

// make pd ready, newly runnable goroutines (if any) are enqueued into the gpp list
void
runtime_netpollready(G **gpp, PollDesc *pd, int32 mode)
{
	G *rg, *wg;

	rg = wg = nil;
	if(mode == 'r' || mode == 'r'+'w')
		rg = netpollunblock(pd, 'r', true);
	if(mode == 'w' || mode == 'r'+'w')
		wg = netpollunblock(pd, 'w', true);
	if(rg) {
		rg->schedlink = *gpp;
		*gpp = rg;
	}
	if(wg) {
		wg->schedlink = *gpp;
		*gpp = wg;
	}
}
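
// For illustration, an edge-triggered poller would typically drive
// runtime_netpollready roughly like this (a sketch; the event-loop details
// are assumptions, not part of this file):
//
//	G *gp = nil;
//	// for each fd event: mode is 'r', 'w' or 'r'+'w' depending on readiness
//	runtime_netpollready(&gp, pd, mode);
//	// gp now heads a list of runnable goroutines linked via schedlink,
//	// which the caller hands back to the scheduler.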

static intgo
checkerr(PollDesc *pd, int32 mode)
{
	if(pd->closing)
		return 1;  // errClosing
	if((mode == 'r' && pd->rd < 0) || (mode == 'w' && pd->wd < 0))
		return 2;  // errTimeout
	return 0;
}
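
// blockcommit is the callback passed to runtime_park: it commits to parking
// by swapping the semaphore from WAIT to the goroutine pointer. If the CAS
// fails, a concurrent notification has already fired and the park is aborted.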
static bool
blockcommit(G *gp, G **gpp)
{
	return runtime_casp(gpp, WAIT, gp);
}

// returns true if IO is ready, or false if timed out or closed
// waitio - wait only for completed IO, ignore errors
static bool
netpollblock(PollDesc *pd, int32 mode, bool waitio)
{
	G **gpp, *old;

	gpp = &pd->rg;
	if(mode == 'w')
		gpp = &pd->wg;

	// set the gpp semaphore to WAIT
	for(;;) {
		old = *gpp;
		if(old == READY) {
			*gpp = nil;
			return true;
		}
		if(old != nil)
			runtime_throw("netpollblock: double wait");
		if(runtime_casp(gpp, nil, WAIT))
			break;
	}

	// need to recheck error states after setting gpp to WAIT
	// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
	// do the opposite: store to closing/rd/wd, membarrier, load of rg/wg
	if(waitio || checkerr(pd, mode) == 0)
		runtime_park((bool(*)(G*, void*))blockcommit, gpp, "IO wait");

	// be careful to not lose concurrent READY notification
	old = runtime_xchgp(gpp, nil);
	if(old > WAIT)
		runtime_throw("netpollblock: corrupted state");
	return old == READY;
}
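
// netpollunblock takes pd's semaphore for the given mode out of the WAIT/G
// states. If a goroutine was parked there, it is returned and the caller must
// make it runnable. With ioready set, the semaphore is left in READY so a
// later waiter can consume the notification; otherwise it is left nil.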
static G*
netpollunblock(PollDesc *pd, int32 mode, bool ioready)
{
	G **gpp, *old, *new;

	gpp = &pd->rg;
	if(mode == 'w')
		gpp = &pd->wg;

	for(;;) {
		old = *gpp;
		if(old == READY)
			return nil;
		if(old == nil && !ioready) {
			// Only set READY for ioready. runtime_pollWait
			// will check for timeout/cancel before waiting.
			return nil;
		}
		new = nil;
		if(ioready)
			new = READY;
		if(runtime_casp(gpp, old, new))
			break;
	}
	if(old > WAIT)
		return old;  // must be G*
	return nil;
}
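
// deadlineimpl is the shared body of the deadline timer callbacks. It drops
// stale timer events by comparing seq against the descriptor's current seq,
// marks the expired deadline(s) with -1, and unparks any goroutine blocked
// in the corresponding mode(s).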
static void
deadlineimpl(Eface arg, uintptr seq, bool read, bool write)
{
	PollDesc *pd;
	G *rg, *wg;

	pd = (PollDesc*)arg.data;
	rg = wg = nil;
	runtime_lock(pd);
	// Seq arg is seq when the timer was set.
	// If it's stale, ignore the timer event.
	if(seq != pd->seq) {
		// The descriptor was reused or timers were reset.
		runtime_unlock(pd);
		return;
	}
	if(read) {
		if(pd->rd <= 0 || pd->rt.fv == nil)
			runtime_throw("deadlineimpl: inconsistent read deadline");
		pd->rd = -1;
		runtime_atomicstorep(&pd->rt.fv, nil);  // full memory barrier between store to rd and load of rg in netpollunblock
		rg = netpollunblock(pd, 'r', false);
	}
	if(write) {
		if(pd->wd <= 0 || (pd->wt.fv == nil && !read))
			runtime_throw("deadlineimpl: inconsistent write deadline");
		pd->wd = -1;
		runtime_atomicstorep(&pd->wt.fv, nil);  // full memory barrier between store to wd and load of wg in netpollunblock
		wg = netpollunblock(pd, 'w', false);
	}
	runtime_unlock(pd);
	if(rg)
		runtime_ready(rg);
	if(wg)
		runtime_ready(wg);
}

static void
deadline(Eface arg, uintptr seq)
{
	deadlineimpl(arg, seq, true, true);
}

static void
readDeadline(Eface arg, uintptr seq)
{
	deadlineimpl(arg, seq, true, false);
}

static void
writeDeadline(Eface arg, uintptr seq)
{
	deadlineimpl(arg, seq, false, true);
}
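
// allocPollDesc takes a PollDesc from pollcache, refilling the cache from
// persistent (non-GC) memory in PollBlockSize batches when it runs empty;
// see the type-stability note on pollcache above.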
static PollDesc*
allocPollDesc(void)
{
	PollDesc *pd;
	uint32 i, n;

	runtime_lock(&pollcache);
	if(pollcache.first == nil) {
		n = PollBlockSize/sizeof(*pd);
		if(n == 0)
			n = 1;
		// Must be in non-GC memory because can be referenced
		// only from epoll/kqueue internals.
		pd = runtime_persistentalloc(n*sizeof(*pd), 0, &mstats.other_sys);
		for(i = 0; i < n; i++) {
			pd[i].link = pollcache.first;
			pollcache.first = &pd[i];
		}
	}
	pd = pollcache.first;
	pollcache.first = pd->link;
	runtime_unlock(&pollcache);
	return pd;
}