ioselectors_epoll.nim 17 KB


  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2016 Eugene Kabanov
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. # This module implements Linux epoll().
  10. import posix, times, epoll
  11. # Maximum number of events that can be returned
  12. const MAX_EPOLL_EVENTS = 64
  13. when not defined(android):
  14. type
  15. SignalFdInfo* {.importc: "struct signalfd_siginfo",
  16. header: "<sys/signalfd.h>", pure, final.} = object
  17. ssi_signo*: uint32
  18. ssi_errno*: int32
  19. ssi_code*: int32
  20. ssi_pid*: uint32
  21. ssi_uid*: uint32
  22. ssi_fd*: int32
  23. ssi_tid*: uint32
  24. ssi_band*: uint32
  25. ssi_overrun*: uint32
  26. ssi_trapno*: uint32
  27. ssi_status*: int32
  28. ssi_int*: int32
  29. ssi_ptr*: uint64
  30. ssi_utime*: uint64
  31. ssi_stime*: uint64
  32. ssi_addr*: uint64
  33. pad* {.importc: "__pad".}: array[0..47, uint8]
  34. proc timerfd_create(clock_id: ClockId, flags: cint): cint
  35. {.cdecl, importc: "timerfd_create", header: "<sys/timerfd.h>".}
  36. proc timerfd_settime(ufd: cint, flags: cint,
  37. utmr: var Itimerspec, otmr: var Itimerspec): cint
  38. {.cdecl, importc: "timerfd_settime", header: "<sys/timerfd.h>".}
  39. proc eventfd(count: cuint, flags: cint): cint
  40. {.cdecl, importc: "eventfd", header: "<sys/eventfd.h>".}
  41. when not defined(android):
  42. proc signalfd(fd: cint, mask: var Sigset, flags: cint): cint
  43. {.cdecl, importc: "signalfd", header: "<sys/signalfd.h>".}
  44. when hasThreadSupport:
  45. type
  46. SelectorImpl[T] = object
  47. epollFD: cint
  48. maxFD: int
  49. fds: ptr SharedArray[SelectorKey[T]]
  50. count: int
  51. Selector*[T] = ptr SelectorImpl[T]
  52. else:
  53. type
  54. SelectorImpl[T] = object
  55. epollFD: cint
  56. maxFD: int
  57. fds: seq[SelectorKey[T]]
  58. count: int
  59. Selector*[T] = ref SelectorImpl[T]
  60. type
  61. SelectEventImpl = object
  62. efd: cint
  63. SelectEvent* = ptr SelectEventImpl
  64. proc newSelector*[T](): Selector[T] =
  65. # Retrieve the maximum fd count (for current OS) via getrlimit()
  66. var a = RLimit()
  67. if getrlimit(posix.RLIMIT_NOFILE, a) != 0:
  68. raiseOsError(osLastError())
  69. var maxFD = int(a.rlim_max)
  70. doAssert(maxFD > 0)
  71. var epollFD = epoll_create(MAX_EPOLL_EVENTS)
  72. if epollFD < 0:
  73. raiseOsError(osLastError())
  74. when hasThreadSupport:
  75. result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T])))
  76. result.epollFD = epollFD
  77. result.maxFD = maxFD
  78. result.fds = allocSharedArray[SelectorKey[T]](maxFD)
  79. else:
  80. result = Selector[T]()
  81. result.epollFD = epollFD
  82. result.maxFD = maxFD
  83. result.fds = newSeq[SelectorKey[T]](maxFD)
  84. for i in 0 ..< maxFD:
  85. result.fds[i].ident = InvalidIdent
  86. proc close*[T](s: Selector[T]) =
  87. let res = posix.close(s.epollFD)
  88. when hasThreadSupport:
  89. deallocSharedArray(s.fds)
  90. deallocShared(cast[pointer](s))
  91. if res != 0:
  92. raiseIOSelectorsError(osLastError())
  93. proc newSelectEvent*(): SelectEvent =
  94. let fdci = eventfd(0, 0)
  95. if fdci == -1:
  96. raiseIOSelectorsError(osLastError())
  97. setNonBlocking(fdci)
  98. result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl)))
  99. result.efd = fdci
  100. proc trigger*(ev: SelectEvent) =
  101. var data: uint64 = 1
  102. if posix.write(ev.efd, addr data, sizeof(uint64)) == -1:
  103. raiseIOSelectorsError(osLastError())
  104. proc close*(ev: SelectEvent) =
  105. let res = posix.close(ev.efd)
  106. deallocShared(cast[pointer](ev))
  107. if res != 0:
  108. raiseIOSelectorsError(osLastError())
  109. template checkFd(s, f) =
  110. # TODO: I don't see how this can ever happen. You won't be able to create an
  111. # FD if there is too many. -- DP
  112. if f >= s.maxFD:
  113. raiseIOSelectorsError("Maximum number of descriptors is exhausted!")
  114. proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle,
  115. events: set[Event], data: T) =
  116. let fdi = int(fd)
  117. s.checkFd(fdi)
  118. doAssert(s.fds[fdi].ident == InvalidIdent, "Descriptor $# already registered" % $fdi)
  119. s.setKey(fdi, events, 0, data)
  120. if events != {}:
  121. var epv = EpollEvent(events: EPOLLRDHUP)
  122. epv.data.u64 = fdi.uint
  123. if Event.Read in events: epv.events = epv.events or EPOLLIN
  124. if Event.Write in events: epv.events = epv.events or EPOLLOUT
  125. if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0:
  126. raiseIOSelectorsError(osLastError())
  127. inc(s.count)
  128. proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle, events: set[Event]) =
  129. let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode,
  130. Event.User, Event.Oneshot, Event.Error}
  131. let fdi = int(fd)
  132. s.checkFd(fdi)
  133. var pkey = addr(s.fds[fdi])
  134. doAssert(pkey.ident != InvalidIdent,
  135. "Descriptor $# is not registered in the selector!" % $fdi)
  136. doAssert(pkey.events * maskEvents == {})
  137. if pkey.events != events:
  138. var epv = EpollEvent(events: EPOLLRDHUP)
  139. epv.data.u64 = fdi.uint
  140. if Event.Read in events: epv.events = epv.events or EPOLLIN
  141. if Event.Write in events: epv.events = epv.events or EPOLLOUT
  142. if pkey.events == {}:
  143. if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0:
  144. raiseIOSelectorsError(osLastError())
  145. inc(s.count)
  146. else:
  147. if events != {}:
  148. if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fdi.cint, addr epv) != 0:
  149. raiseIOSelectorsError(osLastError())
  150. else:
  151. if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0:
  152. raiseIOSelectorsError(osLastError())
  153. dec(s.count)
  154. pkey.events = events
  155. proc unregister*[T](s: Selector[T], fd: int|SocketHandle) =
  156. let fdi = int(fd)
  157. s.checkFd(fdi)
  158. var pkey = addr(s.fds[fdi])
  159. doAssert(pkey.ident != InvalidIdent,
  160. "Descriptor $# is not registered in the selector!" % $fdi)
  161. if pkey.events != {}:
  162. when not defined(android):
  163. if pkey.events * {Event.Read, Event.Write} != {}:
  164. var epv = EpollEvent()
  165. # TODO: Refactor all these EPOLL_CTL_DEL + dec(s.count) into a proc.
  166. if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0:
  167. raiseIOSelectorsError(osLastError())
  168. dec(s.count)
  169. elif Event.Timer in pkey.events:
  170. if Event.Finished notin pkey.events:
  171. var epv = EpollEvent()
  172. if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0:
  173. raiseIOSelectorsError(osLastError())
  174. dec(s.count)
  175. if posix.close(cint(fdi)) != 0:
  176. raiseIOSelectorsError(osLastError())
  177. elif Event.Signal in pkey.events:
  178. var epv = EpollEvent()
  179. if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0:
  180. raiseIOSelectorsError(osLastError())
  181. var nmask, omask: Sigset
  182. discard sigemptyset(nmask)
  183. discard sigemptyset(omask)
  184. discard sigaddset(nmask, cint(s.fds[fdi].param))
  185. unblockSignals(nmask, omask)
  186. dec(s.count)
  187. if posix.close(cint(fdi)) != 0:
  188. raiseIOSelectorsError(osLastError())
  189. elif Event.Process in pkey.events:
  190. if Event.Finished notin pkey.events:
  191. var epv = EpollEvent()
  192. if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0:
  193. raiseIOSelectorsError(osLastError())
  194. var nmask, omask: Sigset
  195. discard sigemptyset(nmask)
  196. discard sigemptyset(omask)
  197. discard sigaddset(nmask, SIGCHLD)
  198. unblockSignals(nmask, omask)
  199. dec(s.count)
  200. if posix.close(cint(fdi)) != 0:
  201. raiseIOSelectorsError(osLastError())
  202. else:
  203. if pkey.events * {Event.Read, Event.Write} != {}:
  204. var epv = EpollEvent()
  205. if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0:
  206. raiseIOSelectorsError(osLastError())
  207. dec(s.count)
  208. elif Event.Timer in pkey.events:
  209. if Event.Finished notin pkey.events:
  210. var epv = EpollEvent()
  211. if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0:
  212. raiseIOSelectorsError(osLastError())
  213. dec(s.count)
  214. if posix.close(cint(fdi)) != 0:
  215. raiseIOSelectorsError(osLastError())
  216. clearKey(pkey)
  217. proc unregister*[T](s: Selector[T], ev: SelectEvent) =
  218. let fdi = int(ev.efd)
  219. s.checkFd(fdi)
  220. var pkey = addr(s.fds[fdi])
  221. doAssert(pkey.ident != InvalidIdent, "Event is not registered in the queue!")
  222. doAssert(Event.User in pkey.events)
  223. var epv = EpollEvent()
  224. if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0:
  225. raiseIOSelectorsError(osLastError())
  226. dec(s.count)
  227. clearKey(pkey)
  228. proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool,
  229. data: T): int {.discardable.} =
  230. var
  231. new_ts: Itimerspec
  232. old_ts: Itimerspec
  233. let fdi = timerfd_create(CLOCK_MONOTONIC, 0).int
  234. if fdi == -1:
  235. raiseIOSelectorsError(osLastError())
  236. setNonBlocking(fdi.cint)
  237. s.checkFd(fdi)
  238. doAssert(s.fds[fdi].ident == InvalidIdent)
  239. var events = {Event.Timer}
  240. var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP)
  241. epv.data.u64 = fdi.uint
  242. if oneshot:
  243. new_ts.it_interval.tv_sec = posix.Time(0)
  244. new_ts.it_interval.tv_nsec = 0
  245. new_ts.it_value.tv_sec = posix.Time(timeout div 1_000)
  246. new_ts.it_value.tv_nsec = (timeout %% 1_000) * 1_000_000
  247. incl(events, Event.Oneshot)
  248. epv.events = epv.events or EPOLLONESHOT
  249. else:
  250. new_ts.it_interval.tv_sec = posix.Time(timeout div 1000)
  251. new_ts.it_interval.tv_nsec = (timeout %% 1_000) * 1_000_000
  252. new_ts.it_value.tv_sec = new_ts.it_interval.tv_sec
  253. new_ts.it_value.tv_nsec = new_ts.it_interval.tv_nsec
  254. if timerfd_settime(fdi.cint, cint(0), new_ts, old_ts) != 0:
  255. raiseIOSelectorsError(osLastError())
  256. if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0:
  257. raiseIOSelectorsError(osLastError())
  258. s.setKey(fdi, events, 0, data)
  259. inc(s.count)
  260. result = fdi
  261. when not defined(android):
  262. proc registerSignal*[T](s: Selector[T], signal: int,
  263. data: T): int {.discardable.} =
  264. var
  265. nmask: Sigset
  266. omask: Sigset
  267. discard sigemptyset(nmask)
  268. discard sigemptyset(omask)
  269. discard sigaddset(nmask, cint(signal))
  270. blockSignals(nmask, omask)
  271. let fdi = signalfd(-1, nmask, 0).int
  272. if fdi == -1:
  273. raiseIOSelectorsError(osLastError())
  274. setNonBlocking(fdi.cint)
  275. s.checkFd(fdi)
  276. doAssert(s.fds[fdi].ident == InvalidIdent)
  277. var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP)
  278. epv.data.u64 = fdi.uint
  279. if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0:
  280. raiseIOSelectorsError(osLastError())
  281. s.setKey(fdi, {Event.Signal}, signal, data)
  282. inc(s.count)
  283. result = fdi
  284. proc registerProcess*[T](s: Selector, pid: int,
  285. data: T): int {.discardable.} =
  286. var
  287. nmask: Sigset
  288. omask: Sigset
  289. discard sigemptyset(nmask)
  290. discard sigemptyset(omask)
  291. discard sigaddset(nmask, posix.SIGCHLD)
  292. blockSignals(nmask, omask)
  293. let fdi = signalfd(-1, nmask, 0).int
  294. if fdi == -1:
  295. raiseIOSelectorsError(osLastError())
  296. setNonBlocking(fdi.cint)
  297. s.checkFd(fdi)
  298. doAssert(s.fds[fdi].ident == InvalidIdent)
  299. var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP)
  300. epv.data.u64 = fdi.uint
  301. epv.events = EPOLLIN or EPOLLRDHUP
  302. if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0:
  303. raiseIOSelectorsError(osLastError())
  304. s.setKey(fdi, {Event.Process, Event.Oneshot}, pid, data)
  305. inc(s.count)
  306. result = fdi
  307. proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
  308. let fdi = int(ev.efd)
  309. doAssert(s.fds[fdi].ident == InvalidIdent, "Event is already registered in the queue!")
  310. s.setKey(fdi, {Event.User}, 0, data)
  311. var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP)
  312. epv.data.u64 = ev.efd.uint
  313. if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, ev.efd, addr epv) != 0:
  314. raiseIOSelectorsError(osLastError())
  315. inc(s.count)
  316. proc selectInto*[T](s: Selector[T], timeout: int,
  317. results: var openarray[ReadyKey]): int =
  318. var
  319. resTable: array[MAX_EPOLL_EVENTS, EpollEvent]
  320. maxres = MAX_EPOLL_EVENTS
  321. i, k: int
  322. if maxres > len(results):
  323. maxres = len(results)
  324. let count = epoll_wait(s.epollFD, addr(resTable[0]), maxres.cint,
  325. timeout.cint)
  326. if count < 0:
  327. result = 0
  328. let err = osLastError()
  329. if cint(err) != EINTR:
  330. raiseIOSelectorsError(err)
  331. elif count == 0:
  332. result = 0
  333. else:
  334. i = 0
  335. k = 0
  336. while i < count:
  337. let fdi = int(resTable[i].data.u64)
  338. let pevents = resTable[i].events
  339. var pkey = addr(s.fds[fdi])
  340. doAssert(pkey.ident != InvalidIdent)
  341. var rkey = ReadyKey(fd: fdi, events: {})
  342. if (pevents and EPOLLERR) != 0 or (pevents and EPOLLHUP) != 0:
  343. if (pevents and EPOLLHUP) != 0:
  344. rkey.errorCode = ECONNRESET.OSErrorCode
  345. else:
  346. # Try reading SO_ERROR from fd.
  347. var error: cint
  348. var size = sizeof(error).SockLen
  349. if getsockopt(fdi.SocketHandle, SOL_SOCKET, SO_ERROR, addr(error),
  350. addr(size)) == 0'i32:
  351. rkey.errorCode = error.OSErrorCode
  352. rkey.events.incl(Event.Error)
  353. if (pevents and EPOLLOUT) != 0:
  354. rkey.events.incl(Event.Write)
  355. when not defined(android):
  356. if (pevents and EPOLLIN) != 0:
  357. if Event.Read in pkey.events:
  358. rkey.events.incl(Event.Read)
  359. elif Event.Timer in pkey.events:
  360. var data: uint64 = 0
  361. if posix.read(cint(fdi), addr data,
  362. sizeof(uint64)) != sizeof(uint64):
  363. raiseIOSelectorsError(osLastError())
  364. rkey.events.incl(Event.Timer)
  365. elif Event.Signal in pkey.events:
  366. var data = SignalFdInfo()
  367. if posix.read(cint(fdi), addr data,
  368. sizeof(SignalFdInfo)) != sizeof(SignalFdInfo):
  369. raiseIOSelectorsError(osLastError())
  370. rkey.events.incl(Event.Signal)
  371. elif Event.Process in pkey.events:
  372. var data = SignalFdInfo()
  373. if posix.read(cint(fdi), addr data,
  374. sizeof(SignalFdInfo)) != sizeof(SignalFdInfo):
  375. raiseIOSelectorsError(osLastError())
  376. if cast[int](data.ssi_pid) == pkey.param:
  377. rkey.events.incl(Event.Process)
  378. else:
  379. inc(i)
  380. continue
  381. elif Event.User in pkey.events:
  382. var data: uint64 = 0
  383. if posix.read(cint(fdi), addr data,
  384. sizeof(uint64)) != sizeof(uint64):
  385. let err = osLastError()
  386. if err == OSErrorCode(EAGAIN):
  387. inc(i)
  388. continue
  389. else:
  390. raiseIOSelectorsError(err)
  391. rkey.events.incl(Event.User)
  392. else:
  393. if (pevents and EPOLLIN) != 0:
  394. if Event.Read in pkey.events:
  395. rkey.events.incl(Event.Read)
  396. elif Event.Timer in pkey.events:
  397. var data: uint64 = 0
  398. if posix.read(cint(fdi), addr data,
  399. sizeof(uint64)) != sizeof(uint64):
  400. raiseIOSelectorsError(osLastError())
  401. rkey.events.incl(Event.Timer)
  402. elif Event.User in pkey.events:
  403. var data: uint64 = 0
  404. if posix.read(cint(fdi), addr data,
  405. sizeof(uint64)) != sizeof(uint64):
  406. let err = osLastError()
  407. if err == OSErrorCode(EAGAIN):
  408. inc(i)
  409. continue
  410. else:
  411. raiseIOSelectorsError(err)
  412. rkey.events.incl(Event.User)
  413. if Event.Oneshot in pkey.events:
  414. var epv = EpollEvent()
  415. if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, cint(fdi), addr epv) != 0:
  416. raiseIOSelectorsError(osLastError())
  417. # we will not clear key until it will be unregistered, so
  418. # application can obtain data, but we will decrease counter,
  419. # because epoll is empty.
  420. dec(s.count)
  421. # we are marking key with `Finished` event, to avoid double decrease.
  422. pkey.events.incl(Event.Finished)
  423. results[k] = rkey
  424. inc(k)
  425. inc(i)
  426. result = k
  427. proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] =
  428. result = newSeq[ReadyKey](MAX_EPOLL_EVENTS)
  429. let count = selectInto(s, timeout, result)
  430. result.setLen(count)
  431. template isEmpty*[T](s: Selector[T]): bool =
  432. (s.count == 0)
  433. proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} =
  434. return s.fds[fd.int].ident != InvalidIdent
  435. proc getData*[T](s: Selector[T], fd: SocketHandle|int): var T =
  436. let fdi = int(fd)
  437. s.checkFd(fdi)
  438. if fdi in s:
  439. result = s.fds[fdi].data
  440. proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool =
  441. let fdi = int(fd)
  442. s.checkFd(fdi)
  443. if fdi in s:
  444. s.fds[fdi].data = data
  445. result = true
  446. template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
  447. body: untyped) =
  448. mixin checkFd
  449. let fdi = int(fd)
  450. s.checkFd(fdi)
  451. if fdi in s:
  452. var value = addr(s.getData(fdi))
  453. body
  454. template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1,
  455. body2: untyped) =
  456. mixin checkFd
  457. let fdi = int(fd)
  458. s.checkFd(fdi)
  459. if fdi in s:
  460. var value = addr(s.getData(fdi))
  461. body1
  462. else:
  463. body2
  464. proc getFd*[T](s: Selector[T]): int =
  465. return s.epollFd.int