ioselectors_epoll.nim 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533
  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2016 Eugene Kabanov
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. # This module implements Linux epoll().
  10. import posix, times, epoll
  # Upper bound on how many ready events a single epoll_wait() call may
  # hand back; it also sizes the on-stack result table in selectInto().
  const
    MAX_EPOLL_EVENTS = 64
  when not defined(android):
    type
      SignalFdInfo* {.importc: "struct signalfd_siginfo",
                      header: "<sys/signalfd.h>", pure, final.} = object
        ## Nim-side view of the C `struct signalfd_siginfo` declared in
        ## `<sys/signalfd.h>`. selectInto() reads one record of this type
        ## from a signalfd descriptor for `Signal` and `Process` keys.
        ## Not available when compiling for Android.
        ssi_signo*: uint32
        ssi_errno*, ssi_code*: int32
        ssi_pid*, ssi_uid*: uint32
        ssi_fd*: int32
        ssi_tid*, ssi_band*, ssi_overrun*, ssi_trapno*: uint32
        ssi_status*, ssi_int*: int32
        ssi_ptr*, ssi_utime*, ssi_stime*, ssi_addr*: uint64
        # Maps onto the C member `__pad`.
        pad* {.importc: "__pad".}: array[0..47, uint8]
  # Binding for the C function `timerfd_create` from <sys/timerfd.h>.
  # registerTimer() treats a return value of -1 as failure.
  proc timerfd_create(clock_id: ClockId, flags: cint): cint {.
    cdecl, importc: "timerfd_create", header: "<sys/timerfd.h>".}
  # Binding for the C function `timerfd_settime` from <sys/timerfd.h>.
  # `utmr` carries the new timer setting and `otmr` receives the previous
  # one; registerTimer() treats any non-zero return as failure.
  proc timerfd_settime(ufd: cint, flags: cint,
                       utmr: var Itimerspec, otmr: var Itimerspec): cint {.
    cdecl, importc: "timerfd_settime", header: "<sys/timerfd.h>".}
  # Binding for the C function `eventfd` from <sys/eventfd.h>.
  # newSelectEvent() treats a return value of -1 as failure.
  proc eventfd(count: cuint, flags: cint): cint {.
    cdecl, importc: "eventfd", header: "<sys/eventfd.h>".}
  when not defined(android):
    # Binding for the C function `signalfd` from <sys/signalfd.h>; it is
    # not declared when compiling for Android. Callers in this file pass
    # fd = -1 and treat a return value of -1 as failure.
    proc signalfd(fd: cint, mask: var Sigset, flags: cint): cint {.
      cdecl, importc: "signalfd", header: "<sys/signalfd.h>".}
  # The selector state exists in two flavours. With thread support it
  # lives in shared memory behind a raw pointer; otherwise it is an
  # ordinary `ref` object whose key table is a seq.
  #
  # Fields (identical meaning in both flavours):
  #   epollFD - descriptor returned by epoll_create1()
  #   maxFD   - descriptor limit taken from getrlimit(RLIMIT_NOFILE)
  #   numFD   - current number of slots in `fds`
  #   fds     - key table indexed by descriptor number
  #   count   - incremented on registration, decremented on removal
  when hasThreadSupport:
    type
      SelectorImpl[T] = object
        epollFD: cint
        maxFD, numFD: int
        fds: ptr SharedArray[SelectorKey[T]]
        count*: int

      Selector*[T] = ptr SelectorImpl[T]
  else:
    type
      SelectorImpl[T] = object
        epollFD: cint
        maxFD, numFD: int
        fds: seq[SelectorKey[T]]
        count*: int

      Selector*[T] = ref SelectorImpl[T]
  type
    SelectEventImpl = object
      # Descriptor obtained from eventfd() in newSelectEvent().
      efd: cint

    # User-triggerable event handle; allocated with allocShared0() in
    # newSelectEvent() and released with deallocShared() in close().
    SelectEvent* = ptr SelectEventImpl
  proc newSelector*[T](): Selector[T] =
    ## Creates a selector backed by a fresh epoll instance.
    ##
    ## The key table starts with 1024 slots, all marked `InvalidIdent`;
    ## `checkFd` enlarges it when a larger descriptor shows up.
    ## Raises an OS error if `getrlimit` or `epoll_create1` fails.

    # Ask the OS for the descriptor limit of this process.
    var limits = RLimit()
    if getrlimit(posix.RLIMIT_NOFILE, limits) != 0:
      raiseOSError(osLastError())
    let descriptorLimit = int(limits.rlim_max)
    doAssert(descriptorLimit > 0)

    const initialSlots = 1024

    let pollFd = epoll_create1(O_CLOEXEC)
    if pollFd < 0:
      raiseOSError(osLastError())

    # Only the allocation strategy differs between the two build modes.
    when hasThreadSupport:
      result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T])))
      result.fds = allocSharedArray[SelectorKey[T]](initialSlots)
    else:
      result = Selector[T]()
      result.fds = newSeq[SelectorKey[T]](initialSlots)

    result.epollFD = pollFd
    result.maxFD = descriptorLimit
    result.numFD = initialSlots

    for slot in 0 ..< initialSlots:
      result.fds[slot].ident = InvalidIdent
  proc close*[T](s: Selector[T]) =
    ## Closes the epoll descriptor owned by `s`.
    ##
    ## With thread support the shared key table and the selector object
    ## are released as well, even when closing the descriptor failed.
    ## Raises an IO-selectors error if `close` reported a failure.
    let closeResult = posix.close(s.epollFD)
    when hasThreadSupport:
      deallocSharedArray(s.fds)
      deallocShared(cast[pointer](s))
    if closeResult == 0:
      return
    raiseIOSelectorsError(osLastError())
  proc newSelectEvent*(): SelectEvent =
    ## Creates a user event backed by a new eventfd descriptor with an
    ## initial counter of 0, opened with `O_CLOEXEC or O_NONBLOCK`.
    ## Raises an IO-selectors error if `eventfd` fails.
    let descriptor = eventfd(0, O_CLOEXEC or O_NONBLOCK)
    if descriptor == -1:
      raiseIOSelectorsError(osLastError())
    # The handle is a raw pointer into zero-initialised shared memory.
    result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl)))
    result.efd = descriptor
  proc trigger*(ev: SelectEvent) =
    ## Signals `ev` by writing the 64-bit value 1 to its descriptor.
    ## Raises an IO-selectors error if the write returns -1.
    var increment: uint64 = 1
    let written = posix.write(ev.efd, addr increment, sizeof(uint64))
    if written == -1:
      raiseIOSelectorsError(osLastError())
  proc close*(ev: SelectEvent) =
    ## Closes the descriptor behind `ev` and frees the handle.
    ##
    ## The handle is freed even when closing fails; in that case an
    ## IO-selectors error is raised afterwards.
    let closeResult = posix.close(ev.efd)
    deallocShared(cast[pointer](ev))
    if closeResult == 0:
      return
    raiseIOSelectorsError(osLastError())
  template checkFd(s, f) =
    # Makes sure the key table of selector `s` has a slot for descriptor
    # `f`: the capacity is doubled until `f` fits and every new slot is
    # marked `InvalidIdent`. Raises when `f` reaches the descriptor limit.
    #
    # TODO: I don't see how this can ever happen. You won't be able to
    # create an FD if there are too many. -- DP
    if f >= s.maxFD:
      raiseIOSelectorsError("Maximum number of descriptors is exhausted!")
    if f >= s.numFD:
      var grownSize = s.numFD
      while grownSize <= f:
        grownSize = grownSize * 2
      when hasThreadSupport:
        s.fds = reallocSharedArray(s.fds, grownSize)
      else:
        s.fds.setLen(grownSize)
      for slot in s.numFD ..< grownSize:
        s.fds[slot].ident = InvalidIdent
      s.numFD = grownSize
  proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle,
                          events: set[Event], data: T) =
    ## Registers descriptor `fd` with selector `s` and attaches `data`.
    ##
    ## The key is always stored. The descriptor is added to epoll, and
    ## the selector's `count` incremented, only when `events` is
    ## non-empty. Asserts that `fd` is not registered yet; raises an
    ## IO-selectors error if `epoll_ctl` fails.
    let fdi = int(fd)
    s.checkFd(fdi)
    doAssert(s.fds[fdi].ident == InvalidIdent,
             "Descriptor $# already registered" % $fdi)
    s.setKey(fdi, events, 0, data)

    # Nothing to watch yet: keep the key but leave epoll untouched.
    if events == {}:
      return

    var epv = EpollEvent(events: EPOLLRDHUP)
    epv.data.u64 = fdi.uint
    if Event.Read in events:
      epv.events = epv.events or EPOLLIN
    if Event.Write in events:
      epv.events = epv.events or EPOLLOUT
    if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0:
      raiseIOSelectorsError(osLastError())
    inc(s.count)
  proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle,
                        events: set[Event]) =
    ## Replaces the event set watched for an already registered `fd`.
    ##
    ## Depending on the old and new sets the descriptor is added to,
    ## modified in, or removed from epoll, and `count` follows.
    ## Asserts that `fd` is registered and that its current events
    ## contain none of Timer, Signal, Process, Vnode, User, Oneshot or
    ## Error. Raises an IO-selectors error if `epoll_ctl` fails.
    const maskEvents = {Event.Timer, Event.Signal, Event.Process,
                        Event.Vnode, Event.User, Event.Oneshot,
                        Event.Error}
    let fdi = int(fd)
    s.checkFd(fdi)
    var pkey = addr(s.fds[fdi])
    doAssert(pkey.ident != InvalidIdent,
             "Descriptor $# is not registered in the selector!" % $fdi)
    doAssert(pkey.events * maskEvents == {})

    # Same set as before: nothing to do.
    if pkey.events == events:
      return

    var epv = EpollEvent(events: EPOLLRDHUP)
    epv.data.u64 = fdi.uint
    if Event.Read in events:
      epv.events = epv.events or EPOLLIN
    if Event.Write in events:
      epv.events = epv.events or EPOLLOUT

    if pkey.events == {}:
      # Was idle so far: it enters epoll now.
      if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0:
        raiseIOSelectorsError(osLastError())
      inc(s.count)
    elif events != {}:
      # Still watched, only the mask changes.
      if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fdi.cint, addr epv) != 0:
        raiseIOSelectorsError(osLastError())
    else:
      # New set is empty: it leaves epoll but stays registered.
      if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0:
        raiseIOSelectorsError(osLastError())
      dec(s.count)
    pkey.events = events
  proc unregister*[T](s: Selector[T], fd: int|SocketHandle) =
    ## Removes descriptor `fd` from selector `s` and clears its key.
    ##
    ## Read/Write/User keys are only dropped from epoll. Timer, Signal
    ## and Process keys own their descriptor, so it is closed here too;
    ## Signal and Process keys also unblock the signal they had blocked.
    ## Keys already marked `Finished` are not removed from epoll again.
    ## Signal and Process keys are only handled on non-Android builds.
    ## Asserts that `fd` is registered; raises an IO-selectors error if
    ## `epoll_ctl` or `close` fails.
    let fdi = int(fd)
    s.checkFd(fdi)
    var pkey = addr(s.fds[fdi])
    doAssert(pkey.ident != InvalidIdent,
             "Descriptor $# is not registered in the selector!" % $fdi)

    template dropFromEpoll() =
      # Takes `fdi` out of the epoll interest list.
      var epv = EpollEvent()
      if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0:
        raiseIOSelectorsError(osLastError())

    template closeDescriptor() =
      # Closes the descriptor that the selector created for this key.
      if posix.close(cint(fdi)) != 0:
        raiseIOSelectorsError(osLastError())

    let watched = pkey.events
    if watched != {}:
      if Event.Read in watched or Event.Write in watched or
         Event.User in watched:
        dropFromEpoll()
        dec(s.count)
      elif Event.Timer in watched:
        if Event.Finished notin watched:
          dropFromEpoll()
          dec(s.count)
        closeDescriptor()
      else:
        when not defined(android):
          if Event.Signal in watched:
            dropFromEpoll()
            var nmask, omask: Sigset
            discard sigemptyset(nmask)
            discard sigemptyset(omask)
            # The key's `param` holds the signal number.
            discard sigaddset(nmask, cint(pkey.param))
            unblockSignals(nmask, omask)
            dec(s.count)
            closeDescriptor()
          elif Event.Process in watched:
            if Event.Finished notin watched:
              dropFromEpoll()
              var nmask, omask: Sigset
              discard sigemptyset(nmask)
              discard sigemptyset(omask)
              discard sigaddset(nmask, SIGCHLD)
              unblockSignals(nmask, omask)
              dec(s.count)
            closeDescriptor()
        else:
          discard
    clearKey(pkey)
  proc unregister*[T](s: Selector[T], ev: SelectEvent) =
    ## Removes user event `ev` from selector `s` and clears its key.
    ##
    ## The event's descriptor is not closed here. Asserts that the event
    ## is registered as a `User` key; raises an IO-selectors error if
    ## `epoll_ctl` fails.
    let fdi = int(ev.efd)
    s.checkFd(fdi)
    var pkey = addr(s.fds[fdi])
    doAssert(pkey.ident != InvalidIdent, "Event is not registered in the queue!")
    doAssert(Event.User in pkey.events)

    var epv = EpollEvent()
    let rc = epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv)
    if rc != 0:
      raiseIOSelectorsError(osLastError())
    dec(s.count)
    clearKey(pkey)
  proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool,
                         data: T): int {.discardable.} =
    ## Creates a timerfd-based timer, registers it with `s`, and returns
    ## its descriptor.
    ##
    ## `timeout` is in milliseconds. With `oneshot` the timer fires once
    ## and the key is also flagged `Oneshot`; otherwise it repeats every
    ## `timeout` milliseconds. `data` is stored in the key.
    ##
    ## Raises an IO-selectors error if `timerfd_create`,
    ## `timerfd_settime` or `epoll_ctl` fails. On the latter two the
    ## freshly created descriptor is closed first so it does not leak.
    var
      newTs: Itimerspec
      oldTs: Itimerspec
    let fdi = timerfd_create(CLOCK_MONOTONIC, O_CLOEXEC or O_NONBLOCK).int
    if fdi == -1:
      raiseIOSelectorsError(osLastError())

    template closeAndRaise() =
      # Read the error before close() gets a chance to overwrite it.
      let err = osLastError()
      discard posix.close(fdi.cint)
      raiseIOSelectorsError(err)

    s.checkFd(fdi)
    doAssert(s.fds[fdi].ident == InvalidIdent)

    var events = {Event.Timer}
    var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP)
    epv.data.u64 = fdi.uint

    if oneshot:
      # A zero interval means the timer expires only once.
      newTs.it_interval.tv_sec = posix.Time(0)
      newTs.it_interval.tv_nsec = 0
      newTs.it_value.tv_sec = posix.Time(timeout div 1_000)
      newTs.it_value.tv_nsec = (timeout %% 1_000) * 1_000_000
      incl(events, Event.Oneshot)
      epv.events = epv.events or EPOLLONESHOT
    else:
      # First expiration and repeat interval are both `timeout`.
      newTs.it_interval.tv_sec = posix.Time(timeout div 1000)
      newTs.it_interval.tv_nsec = (timeout %% 1_000) * 1_000_000
      newTs.it_value.tv_sec = newTs.it_interval.tv_sec
      newTs.it_value.tv_nsec = newTs.it_interval.tv_nsec

    if timerfd_settime(fdi.cint, cint(0), newTs, oldTs) != 0:
      closeAndRaise()
    if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0:
      closeAndRaise()
    s.setKey(fdi, events, 0, data)
    inc(s.count)
    result = fdi
  when not defined(android):
    proc registerSignal*[T](s: Selector[T], signal: int,
                            data: T): int {.discardable.} =
      ## Blocks `signal`, creates a signalfd for it, registers that
      ## descriptor with `s`, and returns it.
      ##
      ## The signal number is stored as the key's `param` and `data` as
      ## its payload. Raises an IO-selectors error if `signalfd` or
      ## `epoll_ctl` fails.
      var
        nmask: Sigset
        omask: Sigset
      discard sigemptyset(nmask)
      discard sigemptyset(omask)
      discard sigaddset(nmask, cint(signal))
      blockSignals(nmask, omask)

      let fdi = signalfd(-1, nmask, O_CLOEXEC or O_NONBLOCK).int
      if fdi == -1:
        raiseIOSelectorsError(osLastError())

      s.checkFd(fdi)
      doAssert(s.fds[fdi].ident == InvalidIdent)

      var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP)
      epv.data.u64 = fdi.uint
      if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0:
        raiseIOSelectorsError(osLastError())
      s.setKey(fdi, {Event.Signal}, signal, data)
      inc(s.count)
      result = fdi

    proc registerProcess*[T](s: Selector[T], pid: int,
                             data: T): int {.discardable.} =
      ## Blocks SIGCHLD, creates a signalfd for it, registers that
      ## descriptor with `s` as a `Process` + `Oneshot` key, and
      ## returns it.
      ##
      ## `pid` is stored as the key's `param`; selectInto() compares it
      ## with the pid reported by the signalfd record. `data` is stored
      ## as the key's payload. Raises an IO-selectors error if
      ## `signalfd` or `epoll_ctl` fails.
      var
        nmask: Sigset
        omask: Sigset
      discard sigemptyset(nmask)
      discard sigemptyset(omask)
      discard sigaddset(nmask, posix.SIGCHLD)
      blockSignals(nmask, omask)

      let fdi = signalfd(-1, nmask, O_CLOEXEC or O_NONBLOCK).int
      if fdi == -1:
        raiseIOSelectorsError(osLastError())

      s.checkFd(fdi)
      doAssert(s.fds[fdi].ident == InvalidIdent)

      var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP)
      epv.data.u64 = fdi.uint
      if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0:
        raiseIOSelectorsError(osLastError())
      s.setKey(fdi, {Event.Process, Event.Oneshot}, pid, data)
      inc(s.count)
      result = fdi
  proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
    ## Registers user event `ev` with selector `s` and attaches `data`.
    ##
    ## Asserts that the event is not registered yet; raises an
    ## IO-selectors error if `epoll_ctl` fails.
    let fdi = int(ev.efd)
    # Grow the key table first: the eventfd number may lie beyond the
    # slots allocated so far, and indexing `fds` would be out of bounds.
    s.checkFd(fdi)
    doAssert(s.fds[fdi].ident == InvalidIdent, "Event is already registered in the queue!")
    s.setKey(fdi, {Event.User}, 0, data)
    var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP)
    epv.data.u64 = ev.efd.uint
    if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, ev.efd, addr epv) != 0:
      raiseIOSelectorsError(osLastError())
    inc(s.count)
  proc selectInto*[T](s: Selector[T], timeout: int,
                      results: var openArray[ReadyKey]): int =
    ## Waits up to `timeout` for events on `s`, writes the ready keys to
    ## the front of `results`, and returns how many were written.
    ##
    ## At most `min(MAX_EPOLL_EVENTS, len(results))` events are fetched
    ## per call. Returns 0 on timeout and when `epoll_wait` was
    ## interrupted (EINTR); any other `epoll_wait` failure raises an
    ## IO-selectors error. Timer, Signal, Process and User descriptors
    ## are read here, and a failed or short read raises as well. Two
    ## cases are skipped without being reported: a User event whose read
    ## fails with EAGAIN, and a Process record whose pid differs from
    ## the registered one.
    var resTable: array[MAX_EPOLL_EVENTS, EpollEvent]
    var maxres = MAX_EPOLL_EVENTS
    if maxres > len(results):
      maxres = len(results)

    verifySelectParams(timeout)

    let count = epoll_wait(s.epollFD, addr(resTable[0]), maxres.cint,
                           timeout.cint)
    if count < 0:
      let err = osLastError()
      if cint(err) != EINTR:
        raiseIOSelectorsError(err)
      return 0
    if count == 0:
      return 0

    var k = 0 # next free slot in `results`
    for i in 0 ..< int(count):
      let fdi = int(resTable[i].data.u64)
      let pevents = resTable[i].events
      let pkey = addr(s.fds[fdi])
      doAssert(pkey.ident != InvalidIdent)
      var rkey = ReadyKey(fd: fdi, events: {})

      let hungUp = (pevents and EPOLLHUP) != 0
      let failed = (pevents and EPOLLERR) != 0
      if failed or hungUp:
        if hungUp:
          rkey.errorCode = OSErrorCode ECONNRESET
        else:
          # Try reading SO_ERROR from fd.
          var error: cint
          var size = SockLen sizeof(error)
          if getsockopt(SocketHandle fdi, SOL_SOCKET, SO_ERROR, addr(error),
                        addr(size)) == 0'i32:
            rkey.errorCode = OSErrorCode error
        rkey.events.incl(Event.Error)

      if (pevents and EPOLLOUT) != 0:
        rkey.events.incl(Event.Write)

      when not defined(android):
        if (pevents and EPOLLIN) != 0:
          if Event.Read in pkey.events:
            rkey.events.incl(Event.Read)
          elif Event.Timer in pkey.events:
            var data: uint64 = 0
            if posix.read(cint(fdi), addr data,
                          sizeof(uint64)) != sizeof(uint64):
              raiseIOSelectorsError(osLastError())
            rkey.events.incl(Event.Timer)
          elif Event.Signal in pkey.events:
            var data = SignalFdInfo()
            if posix.read(cint(fdi), addr data,
                          sizeof(SignalFdInfo)) != sizeof(SignalFdInfo):
              raiseIOSelectorsError(osLastError())
            rkey.events.incl(Event.Signal)
          elif Event.Process in pkey.events:
            var data = SignalFdInfo()
            if posix.read(cint(fdi), addr data,
                          sizeof(SignalFdInfo)) != sizeof(SignalFdInfo):
              raiseIOSelectorsError(osLastError())
            # Records for other pids are ignored.
            if cast[int](data.ssi_pid) != pkey.param:
              continue
            rkey.events.incl(Event.Process)
          elif Event.User in pkey.events:
            var data: uint64 = 0
            if posix.read(cint(fdi), addr data,
                          sizeof(uint64)) != sizeof(uint64):
              let err = osLastError()
              # EAGAIN: nothing to read, so skip this entry.
              if err == OSErrorCode(EAGAIN):
                continue
              raiseIOSelectorsError(err)
            rkey.events.incl(Event.User)
      else:
        if (pevents and EPOLLIN) != 0:
          if Event.Read in pkey.events:
            rkey.events.incl(Event.Read)
          elif Event.Timer in pkey.events:
            var data: uint64 = 0
            if posix.read(cint(fdi), addr data,
                          sizeof(uint64)) != sizeof(uint64):
              raiseIOSelectorsError(osLastError())
            rkey.events.incl(Event.Timer)
          elif Event.User in pkey.events:
            var data: uint64 = 0
            if posix.read(cint(fdi), addr data,
                          sizeof(uint64)) != sizeof(uint64):
              let err = osLastError()
              # EAGAIN: nothing to read, so skip this entry.
              if err == OSErrorCode(EAGAIN):
                continue
              raiseIOSelectorsError(err)
            rkey.events.incl(Event.User)

      if Event.Oneshot in pkey.events:
        var epv = EpollEvent()
        if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, cint(fdi), addr epv) != 0:
          raiseIOSelectorsError(osLastError())
        # The key is kept until it is unregistered, so the application
        # can still fetch its data; the counter drops now because the
        # descriptor has left epoll.
        dec(s.count)
        # `Finished` tells unregister() not to decrement a second time.
        pkey.events.incl(Event.Finished)

      results[k] = rkey
      inc(k)
    result = k
  proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] =
    ## Waits up to `timeout` for events on `s` and returns the ready
    ## keys as a seq holding at most `MAX_EPOLL_EVENTS` entries.
    result = newSeq[ReadyKey](MAX_EPOLL_EVENTS)
    let readyCount = selectInto(s, timeout, result)
    # Cut the buffer down to the entries selectInto() actually filled.
    result.setLen(readyCount)
  template isEmpty*[T](s: Selector[T]): bool =
    ## True when the selector's `count` is zero.
    s.count == 0
  proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} =
    ## Returns true if `fd` is currently registered in selector `s`.
    ##
    ## Descriptors outside the key table (negative, or not below the
    ## current table size) are reported as not registered instead of
    ## indexing `fds` out of bounds.
    let fdi = fd.int
    result = fdi >= 0 and fdi < s.numFD and
             s.fds[fdi].ident != InvalidIdent
  proc getData*[T](s: Selector[T], fd: SocketHandle|int): var T =
    ## Returns a mutable reference to the data attached to `fd`.
    ##
    ## NOTE(review): `result` is only assigned when `fd` is registered;
    ## for an unregistered descriptor it is left unset.
    let fdi = int(fd)
    s.checkFd(fdi)
    if contains(s, fdi):
      result = s.fds[fdi].data
  proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool =
    ## Replaces the data attached to `fd`. Returns true when `fd` is
    ## registered and the data was stored, false otherwise.
    let fdi = int(fd)
    s.checkFd(fdi)
    if contains(s, fdi):
      s.fds[fdi].data = data
      result = true
  template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
                        body: untyped) =
    ## Runs `body` with `value` bound to a pointer to the data attached
    ## to `fd`. Nothing is executed when `fd` is not registered.
    mixin checkFd
    let fdIndex = int(fd)
    s.checkFd(fdIndex)
    if fdIndex in s:
      var value = addr(s.fds[fdIndex].data)
      body
  template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1,
                        body2: untyped) =
    ## Runs `body1` with `value` bound to a pointer to the data attached
    ## to `fd`; when `fd` is not registered, runs `body2` instead.
    mixin checkFd
    let fdIndex = int(fd)
    s.checkFd(fdIndex)
    if fdIndex in s:
      var value = addr(s.fds[fdIndex].data)
      body1
    else:
      body2
  proc getFd*[T](s: Selector[T]): int =
    ## Returns the selector's epoll descriptor as an `int`.
    result = int(s.epollFD)