ioselectors_epoll.nim 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535
  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2016 Eugene Kabanov
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. # This module implements Linux epoll().
  10. import std/[posix, times, epoll]
  # Size of the event buffer handed to `epoll_wait()`; one `selectInto()` /
  # `select()` call therefore reports at most this many events.
  const
    MAX_EPOLL_EVENTS = 64
  when not defined(android):
    type
      # Nim view of C's `struct signalfd_siginfo` from <sys/signalfd.h>.
      # `selectInto()` reads one such record from a signalfd descriptor and
      # inspects `ssi_pid` for process events.
      SignalFdInfo* {.importc: "struct signalfd_siginfo",
                      header: "<sys/signalfd.h>", pure, final.} = object
        ssi_signo*: uint32
        ssi_errno*: int32
        ssi_code*: int32
        ssi_pid*: uint32
        ssi_uid*: uint32
        ssi_fd*: int32
        ssi_tid*: uint32
        ssi_band*: uint32
        ssi_overrun*: uint32
        ssi_trapno*: uint32
        ssi_status*: int32
        ssi_int*: int32
        ssi_ptr*: uint64
        ssi_utime*: uint64
        ssi_stime*: uint64
        ssi_addr*: uint64
        # Trailing padding, mapped onto the C field `__pad` (48 bytes).
        pad* {.importc: "__pad".}: array[48, uint8]
  # Descriptor-creating C functions used by this backend, imported directly
  # from their headers.
  proc timerfd_create(clock_id: ClockId, flags: cint): cint {.
    cdecl, importc: "timerfd_create", header: "<sys/timerfd.h>".}

  proc timerfd_settime(ufd: cint, flags: cint,
                       utmr: var Itimerspec, otmr: var Itimerspec): cint {.
    cdecl, importc: "timerfd_settime", header: "<sys/timerfd.h>".}

  proc eventfd(count: cuint, flags: cint): cint {.
    cdecl, importc: "eventfd", header: "<sys/eventfd.h>".}

  # signalfd is not imported on Android; the signal/process registration
  # procs are compiled out there as well.
  when not defined(android):
    proc signalfd(fd: cint, mask: var Sigset, flags: cint): cint {.
      cdecl, importc: "signalfd", header: "<sys/signalfd.h>".}
  # Selector state. Both variants carry the same fields:
  #   epollFD - descriptor returned by `epoll_create1()`
  #   maxFD   - descriptor limit obtained from `maxDescriptors()`
  #   numFD   - number of slots currently allocated in `fds`
  #   fds     - per-descriptor keys, indexed by the descriptor number
  #   count   - descriptors currently added to the epoll set
  # With thread support the object and its table are allocated from shared
  # memory and handled through raw pointers; otherwise a `ref` and a `seq`
  # are used.
  when hasThreadSupport:
    type
      SelectorImpl[T] = object
        epollFD: cint
        maxFD: int
        numFD: int
        fds: ptr SharedArray[SelectorKey[T]]
        count*: int

      Selector*[T] = ptr SelectorImpl[T]
  else:
    type
      SelectorImpl[T] = object
        epollFD: cint
        maxFD: int
        numFD: int
        fds: seq[SelectorKey[T]]
        count*: int

      Selector*[T] = ref SelectorImpl[T]
  type
    # User-triggerable event; `efd` is the descriptor obtained from
    # `eventfd()` in `newSelectEvent()`.
    SelectEventImpl = object
      efd: cint

    # Handle to a `SelectEventImpl` allocated with `allocShared0()` and
    # released by `close()`.
    SelectEvent* = ptr SelectEventImpl
  proc newSelector*[T](): Selector[T] =
    ## Creates a selector backed by a fresh epoll instance.
    ##
    ## The descriptor table starts with a fixed number of slots, all marked
    ## `InvalidIdent`; `checkFd()` enlarges it on demand.
    ## Raises `OSError` when `epoll_create1()` fails.
    proc startingSlots(): int {.inline.} =
      # Initial size of the descriptor table.
      when defined(nuttx):
        result = NEPOLL_MAX
      else:
        result = 1024

    # Retrieve the maximum fd count (for current OS) via getrlimit()
    var maxFD = maxDescriptors()
    doAssert(maxFD > 0)

    # Start small; the table is grown later when larger descriptors show up.
    let slots = startingSlots()

    let epfd = epoll_create1(O_CLOEXEC)
    if epfd < 0:
      raiseOSError(osLastError())

    # Only the allocation strategy differs between the two build modes.
    when hasThreadSupport:
      result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T])))
      result.fds = allocSharedArray[SelectorKey[T]](slots)
    else:
      result = Selector[T]()
      result.fds = newSeq[SelectorKey[T]](slots)

    result.epollFD = epfd
    result.maxFD = maxFD
    result.numFD = slots

    for slot in 0 ..< slots:
      result.fds[slot].ident = InvalidIdent
  proc close*[T](s: Selector[T]) =
    ## Closes the selector's epoll descriptor. In builds with thread support
    ## the shared descriptor table and the selector object are freed too.
    ##
    ## Raises an IO-selectors error if closing the epoll descriptor failed;
    ## the memory is released before the error is raised.
    let failed = posix.close(s.epollFD) != 0
    # Read errno immediately: the deallocation calls below may overwrite it,
    # which would make the raised error describe the wrong failure.
    let err = osLastError()
    when hasThreadSupport:
      deallocSharedArray(s.fds)
      deallocShared(cast[pointer](s))
    if failed:
      raiseIOSelectorsError(err)
  proc newSelectEvent*(): SelectEvent =
    ## Creates a user event on top of a new `eventfd()` descriptor (initial
    ## counter 0, opened with `O_CLOEXEC or O_NONBLOCK`).
    ##
    ## The returned object is allocated with `allocShared0()`; release it with
    ## `close()`. Raises an IO-selectors error when `eventfd()` fails.
    let handle = eventfd(0, O_CLOEXEC or O_NONBLOCK)
    if handle == -1:
      raiseIOSelectorsError(osLastError())
    result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl)))
    result.efd = handle
  proc trigger*(ev: SelectEvent) =
    ## Signals `ev` by writing the 64-bit value 1 to its descriptor.
    ## Raises an IO-selectors error when the write returns -1.
    var increment = 1'u64
    let written = posix.write(ev.efd, addr increment, sizeof(uint64))
    if written == -1:
      raiseIOSelectorsError(osLastError())
  proc close*(ev: SelectEvent) =
    ## Closes the event's descriptor and frees the event object.
    ##
    ## Raises an IO-selectors error if closing the descriptor failed; the
    ## object is freed before the error is raised, so `ev` must not be used
    ## afterwards in either case.
    let failed = posix.close(ev.efd) != 0
    # Read errno before deallocating: the deallocation may overwrite it.
    let err = osLastError()
    deallocShared(cast[pointer](ev))
    if failed:
      raiseIOSelectorsError(err)
  template checkFd(s, f) =
    ## Makes sure slot `f` exists in `s.fds`.
    ##
    ## Raises an IO-selectors error when `f` reaches `s.maxFD`. Otherwise, if
    ## `f` lies beyond the current table, the table size is doubled until it
    ## covers `f` and every new slot is marked `InvalidIdent`.
    # TODO: I don't see how this can ever happen. You won't be able to create an
    # FD if there is too many. -- DP
    if f >= s.maxFD:
      raiseIOSelectorsError("Maximum number of descriptors is exhausted!")
    if f >= s.numFD:
      var grown = s.numFD
      while grown <= f:
        grown = grown * 2
      when hasThreadSupport:
        s.fds = reallocSharedArray(s.fds, s.numFD, grown)
      else:
        s.fds.setLen(grown)
      # Only the freshly added tail needs initialising.
      for slot in s.numFD ..< grown:
        s.fds[slot].ident = InvalidIdent
      s.numFD = grown
  proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle,
                          events: set[Event], data: T) =
    ## Registers `fd` with the selector and attaches `data` to it.
    ##
    ## The key is always recorded. The descriptor is added to the epoll set
    ## (and `s.count` incremented) only when `events` is non-empty:
    ## `Event.Read` maps to `EPOLLIN`, `Event.Write` to `EPOLLOUT`, and
    ## `EPOLLRDHUP` is always requested.
    ## Asserts that `fd` is not registered yet; raises an IO-selectors error
    ## when `epoll_ctl()` fails.
    let fdi = int(fd)
    s.checkFd(fdi)
    doAssert(s.fds[fdi].ident == InvalidIdent, "Descriptor $# already registered" % $fdi)
    s.setKey(fdi, events, 0, data)

    # Nothing to watch yet: the key exists but epoll is left untouched.
    if events == {}:
      return

    var epv = EpollEvent(events: EPOLLRDHUP)
    epv.data.u64 = fdi.uint
    if Event.Read in events:
      epv.events = epv.events or EPOLLIN
    if Event.Write in events:
      epv.events = epv.events or EPOLLOUT

    if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0:
      raiseIOSelectorsError(osLastError())
    inc(s.count)
  proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle, events: set[Event]) =
    ## Replaces the set of events watched for an already registered `fd`.
    ##
    ## Depending on the old and the new set the descriptor is added to,
    ## modified in, or removed from the epoll set, and `s.count` is adjusted
    ## on add/remove. Nothing happens when the set is unchanged.
    ## Asserts that `fd` is registered and that its key carries none of the
    ## timer/signal/process/vnode/user/oneshot/error events; raises an
    ## IO-selectors error when `epoll_ctl()` fails.
    const maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode,
                        Event.User, Event.Oneshot, Event.Error}
    let fdi = int(fd)
    s.checkFd(fdi)
    var pkey = addr(s.fds[fdi])
    doAssert(pkey.ident != InvalidIdent,
             "Descriptor $# is not registered in the selector!" % $fdi)
    doAssert(pkey.events * maskEvents == {})

    if pkey.events == events:
      return

    var epv = EpollEvent(events: EPOLLRDHUP)
    epv.data.u64 = fdi.uint
    if Event.Read in events:
      epv.events = epv.events or EPOLLIN
    if Event.Write in events:
      epv.events = epv.events or EPOLLOUT

    if pkey.events == {}:
      # Previously idle key: it has to enter the epoll set first.
      if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0:
        raiseIOSelectorsError(osLastError())
      inc(s.count)
    elif events != {}:
      if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fdi.cint, addr epv) != 0:
        raiseIOSelectorsError(osLastError())
    else:
      # Nothing left to watch: drop the descriptor from the epoll set.
      if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0:
        raiseIOSelectorsError(osLastError())
      dec(s.count)

    pkey.events = events
  proc unregister*[T](s: Selector[T], fd: int|SocketHandle) =
    ## Removes `fd` from the selector and clears its key.
    ##
    ## * read/write/user keys are removed from the epoll set;
    ## * timer keys are removed from the epoll set unless already marked
    ##   `Event.Finished`, and the descriptor is closed;
    ## * signal keys (non-Android) are removed, the signal stored in the key's
    ##   `param` is unblocked and the descriptor is closed;
    ## * process keys (non-Android) are handled like signal keys for
    ##   `SIGCHLD`, with removal and unblocking skipped when the key is
    ##   already `Event.Finished`.
    ##
    ## Asserts that `fd` is registered; raises an IO-selectors error when
    ## `epoll_ctl()` or `close()` fails (the key is then left in place).
    let fdi = int(fd)
    s.checkFd(fdi)
    var pkey = addr(s.fds[fdi])
    doAssert(pkey.ident != InvalidIdent,
             "Descriptor $# is not registered in the selector!" % $fdi)

    template dropFromEpoll() =
      # Take `fdi` out of the epoll set, raising on failure.
      var epv = EpollEvent()
      if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0:
        raiseIOSelectorsError(osLastError())

    template closeDescriptor() =
      # Close the descriptor backing the key, raising on failure.
      if posix.close(cint(fdi)) != 0:
        raiseIOSelectorsError(osLastError())

    let watched = pkey.events
    if watched != {}:
      if watched * {Event.Read, Event.Write, Event.User} != {}:
        dropFromEpoll()
        dec(s.count)
      elif Event.Timer in watched:
        # A finished one-shot timer was already dropped by `selectInto()`.
        if Event.Finished notin watched:
          dropFromEpoll()
          dec(s.count)
        closeDescriptor()
      else:
        when not defined(android):
          if Event.Signal in watched:
            dropFromEpoll()
            var nmask, omask: Sigset
            discard sigemptyset(nmask)
            discard sigemptyset(omask)
            discard sigaddset(nmask, cint(s.fds[fdi].param))
            unblockSignals(nmask, omask)
            dec(s.count)
            closeDescriptor()
          elif Event.Process in watched:
            if Event.Finished notin watched:
              dropFromEpoll()
              var nmask, omask: Sigset
              discard sigemptyset(nmask)
              discard sigemptyset(omask)
              discard sigaddset(nmask, SIGCHLD)
              unblockSignals(nmask, omask)
              dec(s.count)
            closeDescriptor()
    clearKey(pkey)
  proc unregister*[T](s: Selector[T], ev: SelectEvent) =
    ## Removes the user event `ev` from the selector: its descriptor is taken
    ## out of the epoll set, `s.count` is decremented and the key is cleared.
    ## The event itself stays open.
    ##
    ## Asserts that the event is registered as an `Event.User` key; raises an
    ## IO-selectors error when `epoll_ctl()` fails.
    let fdi = int(ev.efd)
    s.checkFd(fdi)
    var pkey = addr(s.fds[fdi])
    doAssert(pkey.ident != InvalidIdent, "Event is not registered in the queue!")
    doAssert(Event.User in pkey.events)

    var unused = EpollEvent()
    let rc = epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr unused)
    if rc != 0:
      raiseIOSelectorsError(osLastError())

    dec(s.count)
    clearKey(pkey)
  proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool,
                         data: T): int {.discardable.} =
    ## Creates a timerfd-based timer, registers it with the selector and
    ## returns its descriptor.
    ##
    ## `timeout` is in milliseconds. With `oneshot` the timer expires once and
    ## is registered with `EPOLLONESHOT` and `Event.Oneshot`; otherwise it
    ## repeats every `timeout` milliseconds. `data` is attached to the key.
    ##
    ## A `timeout` of 0 is armed with the smallest possible delay so that it
    ## expires immediately; with a zero interval that is a single expiration
    ## even when `oneshot` is false.
    ##
    ## Raises an IO-selectors error when creating, arming or registering the
    ## timer fails; the timer descriptor is closed again in that case.
    var
      newTs: Itimerspec
      oldTs: Itimerspec
    let fdi = timerfd_create(CLOCK_MONOTONIC, O_CLOEXEC or O_NONBLOCK).int
    if fdi == -1:
      raiseIOSelectorsError(osLastError())

    # The descriptor is owned by this proc until the key is recorded, so any
    # failure below must close it instead of leaking it.
    try:
      s.checkFd(fdi)
      doAssert(s.fds[fdi].ident == InvalidIdent)

      var events = {Event.Timer}
      var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP)
      epv.data.u64 = fdi.uint

      if oneshot:
        newTs.it_interval.tv_sec = posix.Time(0)
        newTs.it_interval.tv_nsec = 0
        newTs.it_value.tv_sec = posix.Time(timeout div 1_000)
        newTs.it_value.tv_nsec = (timeout %% 1_000) * 1_000_000
        incl(events, Event.Oneshot)
        epv.events = epv.events or EPOLLONESHOT
      else:
        newTs.it_interval.tv_sec = posix.Time(timeout div 1000)
        newTs.it_interval.tv_nsec = (timeout %% 1_000) * 1_000_000
        newTs.it_value.tv_sec = newTs.it_interval.tv_sec
        newTs.it_value.tv_nsec = newTs.it_interval.tv_nsec

      # An all-zero `it_value` disarms a timerfd instead of firing it at
      # once, so a 0 ms timer would never expire. Arm it with 1 ns instead.
      if timeout == 0:
        newTs.it_value.tv_nsec = 1

      if timerfd_settime(fdi.cint, cint(0), newTs, oldTs) != 0:
        raiseIOSelectorsError(osLastError())
      if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0:
        raiseIOSelectorsError(osLastError())
      s.setKey(fdi, events, 0, data)
      inc(s.count)
    except CatchableError:
      discard posix.close(fdi.cint)
      raise

    result = fdi
  when not defined(android):
    proc registerSignal*[T](s: Selector[T], signal: int,
                            data: T): int {.discardable.} =
      ## Blocks `signal`, creates a signalfd for it, registers that descriptor
      ## with the selector as an `Event.Signal` key and returns it. The signal
      ## number is stored as the key's `param`; `data` is attached to the key.
      ##
      ## Raises an IO-selectors error when `signalfd()` or `epoll_ctl()`
      ## fails; a descriptor that was already created is closed again.
      var
        nmask: Sigset
        omask: Sigset

      discard sigemptyset(nmask)
      discard sigemptyset(omask)
      discard sigaddset(nmask, cint(signal))
      blockSignals(nmask, omask)

      let fdi = signalfd(-1, nmask, O_CLOEXEC or O_NONBLOCK).int
      if fdi == -1:
        raiseIOSelectorsError(osLastError())

      # Do not leak the signalfd if registration fails part-way.
      try:
        s.checkFd(fdi)
        doAssert(s.fds[fdi].ident == InvalidIdent)

        var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP)
        epv.data.u64 = fdi.uint
        if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0:
          raiseIOSelectorsError(osLastError())
        s.setKey(fdi, {Event.Signal}, signal, data)
        inc(s.count)
      except CatchableError:
        discard posix.close(fdi.cint)
        raise

      result = fdi

    proc registerProcess*[T](s: Selector[T], pid: int,
                             data: T): int {.discardable.} =
      ## Blocks `SIGCHLD`, creates a signalfd for it, registers that
      ## descriptor as an `Event.Process` + `Event.Oneshot` key and returns
      ## it. `pid` is stored as the key's `param`; `selectInto()` reports the
      ## event only for a record whose `ssi_pid` equals it.
      ##
      ## Raises an IO-selectors error when `signalfd()` or `epoll_ctl()`
      ## fails; a descriptor that was already created is closed again.
      var
        nmask: Sigset
        omask: Sigset

      discard sigemptyset(nmask)
      discard sigemptyset(omask)
      discard sigaddset(nmask, posix.SIGCHLD)
      blockSignals(nmask, omask)

      let fdi = signalfd(-1, nmask, O_CLOEXEC or O_NONBLOCK).int
      if fdi == -1:
        raiseIOSelectorsError(osLastError())

      # Do not leak the signalfd if registration fails part-way.
      try:
        s.checkFd(fdi)
        doAssert(s.fds[fdi].ident == InvalidIdent)

        var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP)
        epv.data.u64 = fdi.uint
        if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0:
          raiseIOSelectorsError(osLastError())
        s.setKey(fdi, {Event.Process, Event.Oneshot}, pid, data)
        inc(s.count)
      except CatchableError:
        discard posix.close(fdi.cint)
        raise

      result = fdi
  proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
    ## Registers the user event `ev` with the selector as an `Event.User` key
    ## carrying `data`, and adds its descriptor to the epoll set.
    ##
    ## Asserts that the event is not registered yet; raises an IO-selectors
    ## error when `epoll_ctl()` fails.
    let fdi = int(ev.efd)
    # The descriptor table starts small and grows on demand, so the slot for
    # this descriptor may not exist yet; make sure it does before indexing.
    s.checkFd(fdi)
    doAssert(s.fds[fdi].ident == InvalidIdent, "Event is already registered in the queue!")
    s.setKey(fdi, {Event.User}, 0, data)
    var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP)
    epv.data.u64 = ev.efd.uint
    if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, ev.efd, addr epv) != 0:
      raiseIOSelectorsError(osLastError())
    inc(s.count)
  proc selectInto*[T](s: Selector[T], timeout: int,
                      results: var openArray[ReadyKey]): int =
    ## Waits for events with `epoll_wait()` (to which `timeout` is forwarded)
    ## and stores one `ReadyKey` per reported descriptor in `results`.
    ## Returns the number of entries stored.
    ##
    ## At most `min(MAX_EPOLL_EVENTS, results.len)` events are fetched.
    ## An interrupted wait (`EINTR`) returns 0; other `epoll_wait()` failures
    ## raise an IO-selectors error. Timer, signal, process and user
    ## descriptors are read here so their pending data is consumed.
    var resTable: array[MAX_EPOLL_EVENTS, EpollEvent]
    let maxres = min(MAX_EPOLL_EVENTS, len(results))

    template gotFull(fd: int, buf: untyped, Rec: typedesc): bool =
      # True when exactly one `Rec`-sized record was read from `fd` into `buf`.
      posix.read(cint(fd), addr buf, sizeof(Rec)) == sizeof(Rec)

    verifySelectParams(timeout)

    let count = epoll_wait(s.epollFD, addr(resTable[0]), maxres.cint,
                           timeout.cint)
    if count < 0:
      let err = osLastError()
      if cint(err) != EINTR:
        raiseIOSelectorsError(err)
      return 0

    # With `count == 0` the loop body never runs and 0 is returned.
    var stored = 0
    for i in 0 ..< int(count):
      let fdi = int(resTable[i].data.u64)
      let pevents = resTable[i].events
      var pkey = addr(s.fds[fdi])
      doAssert(pkey.ident != InvalidIdent)
      var rkey = ReadyKey(fd: fdi, events: {})

      if (pevents and EPOLLERR) != 0 or (pevents and EPOLLHUP) != 0:
        if (pevents and EPOLLHUP) != 0:
          rkey.errorCode = OSErrorCode ECONNRESET
        else:
          # Try reading SO_ERROR from fd.
          var error: cint
          var size = SockLen sizeof(error)
          if getsockopt(SocketHandle fdi, SOL_SOCKET, SO_ERROR, addr(error),
                        addr(size)) == 0'i32:
            rkey.errorCode = OSErrorCode error
        rkey.events.incl(Event.Error)

      if (pevents and EPOLLOUT) != 0:
        rkey.events.incl(Event.Write)

      when not defined(android):
        if (pevents and EPOLLIN) != 0:
          if Event.Read in pkey.events:
            rkey.events.incl(Event.Read)
          elif Event.Timer in pkey.events:
            var data: uint64 = 0
            if not gotFull(fdi, data, uint64):
              raiseIOSelectorsError(osLastError())
            rkey.events.incl(Event.Timer)
          elif Event.Signal in pkey.events:
            var data = SignalFdInfo()
            if not gotFull(fdi, data, SignalFdInfo):
              raiseIOSelectorsError(osLastError())
            rkey.events.incl(Event.Signal)
          elif Event.Process in pkey.events:
            var data = SignalFdInfo()
            if not gotFull(fdi, data, SignalFdInfo):
              raiseIOSelectorsError(osLastError())
            # Records about other processes are skipped entirely: nothing is
            # reported and the key stays armed.
            if cast[int](data.ssi_pid) != pkey.param:
              continue
            rkey.events.incl(Event.Process)
          elif Event.User in pkey.events:
            var data: uint64 = 0
            if not gotFull(fdi, data, uint64):
              let err = osLastError()
              # EAGAIN: nothing to read from the event; skip this entry.
              if err == OSErrorCode(EAGAIN):
                continue
              raiseIOSelectorsError(err)
            rkey.events.incl(Event.User)
      else:
        if (pevents and EPOLLIN) != 0:
          if Event.Read in pkey.events:
            rkey.events.incl(Event.Read)
          elif Event.Timer in pkey.events:
            var data: uint64 = 0
            if not gotFull(fdi, data, uint64):
              raiseIOSelectorsError(osLastError())
            rkey.events.incl(Event.Timer)
          elif Event.User in pkey.events:
            var data: uint64 = 0
            if not gotFull(fdi, data, uint64):
              let err = osLastError()
              # EAGAIN: nothing to read from the event; skip this entry.
              if err == OSErrorCode(EAGAIN):
                continue
              raiseIOSelectorsError(err)
            rkey.events.incl(Event.User)

      if Event.Oneshot in pkey.events:
        var epv = EpollEvent()
        if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, cint(fdi), addr epv) != 0:
          raiseIOSelectorsError(osLastError())
        # The key is kept until it is unregistered so the application can
        # still fetch its data, but the counter drops because the descriptor
        # has left the epoll set.
        dec(s.count)
        # `Finished` tells `unregister()` not to decrement a second time.
        pkey.events.incl(Event.Finished)

      results[stored] = rkey
      inc(stored)

    result = stored
  proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] =
    ## Convenience wrapper around `selectInto()`: waits for events and returns
    ## them as a sequence holding at most `MAX_EPOLL_EVENTS` entries.
    var ready = newSeq[ReadyKey](MAX_EPOLL_EVENTS)
    let stored = selectInto(s, timeout, ready)
    # Trim the buffer down to the entries that were actually filled in.
    ready.setLen(stored)
    result = ready
  template isEmpty*[T](s: Selector[T]): bool =
    ## True when the selector's `count` of descriptors in the epoll set is 0.
    s.count == 0
  proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} =
    ## Returns true when `fd` is currently registered in the selector.
    ##
    ## Descriptors outside the allocated table (negative, or not smaller than
    ## the current table size) cannot be registered, so they yield false
    ## instead of indexing out of bounds.
    let fdi = fd.int
    result = fdi >= 0 and fdi < s.numFD and s.fds[fdi].ident != InvalidIdent
  proc getData*[T](s: Selector[T], fd: SocketHandle|int): var T =
    ## Gives mutable access to the data attached to `fd`.
    ## `result` is assigned only when `fd` is registered.
    let fdi = fd.int
    s.checkFd(fdi)
    if contains(s, fdi):
      result = s.fds[fdi].data
  proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool =
    ## Replaces the data attached to `fd`. Returns true when `fd` is
    ## registered and the data was stored, false otherwise.
    let fdi = fd.int
    s.checkFd(fdi)
    result = contains(s, fdi)
    if result:
      s.fds[fdi].data = data
  template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
                        body: untyped) =
    ## Runs `body` with `value` bound to the address of the data attached to
    ## `fd`. Does nothing when `fd` is not registered.
    mixin checkFd
    let slot = int(fd)
    s.checkFd(slot)
    if slot in s:
      var value = addr(s.fds[slot].data)
      body
  template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1,
                        body2: untyped) =
    ## Runs `body1` with `value` bound to the address of the data attached to
    ## `fd` when `fd` is registered; runs `body2` otherwise.
    mixin checkFd
    let slot = int(fd)
    s.checkFd(slot)
    if slot in s:
      var value = addr(s.fds[slot].data)
      body1
    else:
      body2
  proc getFd*[T](s: Selector[T]): int =
    ## Returns the selector's epoll descriptor as an `int`.
    result = int(s.epollFD)