ioselectors_poll.nim 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2016 Eugene Kabanov
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. # This module implements Posix poll().
  10. import std/[posix, times]
  11. # Maximum number of events that can be returned
  12. const MAX_POLL_EVENTS = 64
  13. const hasEventFds = defined(zephyr) or defined(nimPollHasEventFds)
  14. when hasEventFds:
  15. proc eventfd(count: cuint, flags: cint): cint
  16. {.cdecl, importc: "eventfd", header: "<sys/eventfd.h>".}
  17. when hasThreadSupport:
  18. type
  19. SelectorImpl[T] = object
  20. maxFD : int
  21. pollcnt: int
  22. fds: ptr SharedArray[SelectorKey[T]]
  23. pollfds: ptr SharedArray[TPollFd]
  24. count*: int
  25. lock: Lock
  26. Selector*[T] = ptr SelectorImpl[T]
  27. else:
  28. type
  29. SelectorImpl[T] = object
  30. maxFD : int
  31. pollcnt: int
  32. fds: seq[SelectorKey[T]]
  33. pollfds: seq[TPollFd]
  34. count*: int
  35. Selector*[T] = ref SelectorImpl[T]
  36. type
  37. SelectEventImpl = object
  38. rfd: cint
  39. wfd: cint
  40. SelectEvent* = ptr SelectEventImpl
  41. when hasThreadSupport:
  42. template withPollLock[T](s: Selector[T], body: untyped) =
  43. acquire(s.lock)
  44. {.locks: [s.lock].}:
  45. try:
  46. body
  47. finally:
  48. release(s.lock)
  49. else:
  50. template withPollLock(s, body: untyped) =
  51. body
  52. proc newSelector*[T](): Selector[T] =
  53. var maxFD = maxDescriptors()
  54. when hasThreadSupport:
  55. result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T])))
  56. result.maxFD = maxFD
  57. result.fds = allocSharedArray[SelectorKey[T]](maxFD)
  58. result.pollfds = allocSharedArray[TPollFd](maxFD)
  59. initLock(result.lock)
  60. else:
  61. result = Selector[T]()
  62. result.maxFD = maxFD
  63. result.fds = newSeq[SelectorKey[T]](maxFD)
  64. result.pollfds = newSeq[TPollFd](maxFD)
  65. for i in 0 ..< maxFD:
  66. result.fds[i].ident = InvalidIdent
  67. proc close*[T](s: Selector[T]) =
  68. when hasThreadSupport:
  69. deinitLock(s.lock)
  70. deallocSharedArray(s.fds)
  71. deallocSharedArray(s.pollfds)
  72. deallocShared(cast[pointer](s))
  73. template pollAdd[T](s: Selector[T], sock: cint, events: set[Event]) =
  74. withPollLock(s):
  75. var pollev: cshort = 0
  76. if Event.Read in events: pollev = pollev or POLLIN
  77. if Event.Write in events: pollev = pollev or POLLOUT
  78. s.pollfds[s.pollcnt].fd = cint(sock)
  79. s.pollfds[s.pollcnt].events = pollev
  80. inc(s.count)
  81. inc(s.pollcnt)
  82. template pollUpdate[T](s: Selector[T], sock: cint, events: set[Event]) =
  83. withPollLock(s):
  84. var i = 0
  85. var pollev: cshort = 0
  86. if Event.Read in events: pollev = pollev or POLLIN
  87. if Event.Write in events: pollev = pollev or POLLOUT
  88. while i < s.pollcnt:
  89. if s.pollfds[i].fd == sock:
  90. s.pollfds[i].events = pollev
  91. break
  92. inc(i)
  93. doAssert(i < s.pollcnt,
  94. "Descriptor [" & $sock & "] is not registered in the queue!")
  95. template pollRemove[T](s: Selector[T], sock: cint) =
  96. withPollLock(s):
  97. var i = 0
  98. while i < s.pollcnt:
  99. if s.pollfds[i].fd == sock:
  100. if i == s.pollcnt - 1:
  101. s.pollfds[i].fd = 0
  102. s.pollfds[i].events = 0
  103. s.pollfds[i].revents = 0
  104. else:
  105. while i < (s.pollcnt - 1):
  106. s.pollfds[i].fd = s.pollfds[i + 1].fd
  107. s.pollfds[i].events = s.pollfds[i + 1].events
  108. inc(i)
  109. break
  110. inc(i)
  111. dec(s.pollcnt)
  112. dec(s.count)
  113. template checkFd(s, f) =
  114. if f >= s.maxFD:
  115. raiseIOSelectorsError("Maximum number of descriptors is exhausted!")
  116. proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle,
  117. events: set[Event], data: T) =
  118. var fdi = int(fd)
  119. s.checkFd(fdi)
  120. doAssert(s.fds[fdi].ident == InvalidIdent)
  121. setKey(s, fdi, events, 0, data)
  122. if events != {}: s.pollAdd(fdi.cint, events)
  123. proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle,
  124. events: set[Event]) =
  125. let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode,
  126. Event.User, Event.Oneshot, Event.Error}
  127. let fdi = int(fd)
  128. s.checkFd(fdi)
  129. var pkey = addr(s.fds[fdi])
  130. doAssert(pkey.ident != InvalidIdent,
  131. "Descriptor [" & $fdi & "] is not registered in the queue!")
  132. doAssert(pkey.events * maskEvents == {})
  133. if pkey.events != events:
  134. if pkey.events == {}:
  135. s.pollAdd(fd.cint, events)
  136. else:
  137. if events != {}:
  138. s.pollUpdate(fd.cint, events)
  139. else:
  140. s.pollRemove(fd.cint)
  141. pkey.events = events
  142. proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
  143. var fdi = int(ev.rfd)
  144. doAssert(s.fds[fdi].ident == InvalidIdent, "Event is already registered in the queue!")
  145. var events = {Event.User}
  146. setKey(s, fdi, events, 0, data)
  147. events.incl(Event.Read)
  148. s.pollAdd(fdi.cint, events)
  149. proc unregister*[T](s: Selector[T], fd: int|SocketHandle) =
  150. let fdi = int(fd)
  151. s.checkFd(fdi)
  152. var pkey = addr(s.fds[fdi])
  153. doAssert(pkey.ident != InvalidIdent,
  154. "Descriptor [" & $fdi & "] is not registered in the queue!")
  155. pkey.ident = InvalidIdent
  156. if pkey.events != {}:
  157. pkey.events = {}
  158. s.pollRemove(fdi.cint)
  159. proc unregister*[T](s: Selector[T], ev: SelectEvent) =
  160. let fdi = int(ev.rfd)
  161. s.checkFd(fdi)
  162. var pkey = addr(s.fds[fdi])
  163. doAssert(pkey.ident != InvalidIdent, "Event is not registered in the queue!")
  164. doAssert(Event.User in pkey.events)
  165. pkey.ident = InvalidIdent
  166. pkey.events = {}
  167. s.pollRemove(fdi.cint)
  168. proc newSelectEvent*(): SelectEvent =
  169. when not hasEventFds:
  170. var fds: array[2, cint]
  171. if posix.pipe(fds) != 0:
  172. raiseIOSelectorsError(osLastError())
  173. setNonBlocking(fds[0])
  174. setNonBlocking(fds[1])
  175. result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl)))
  176. result.rfd = fds[0]
  177. result.wfd = fds[1]
  178. else:
  179. let fdci = eventfd(0, posix.O_NONBLOCK)
  180. if fdci == -1:
  181. raiseIOSelectorsError(osLastError())
  182. result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl)))
  183. result.rfd = fdci
  184. result.wfd = fdci
  185. proc trigger*(ev: SelectEvent) =
  186. var data: uint64 = 1
  187. if posix.write(ev.wfd, addr data, sizeof(uint64)) != sizeof(uint64):
  188. raiseIOSelectorsError(osLastError())
  189. proc close*(ev: SelectEvent) =
  190. let res1 = posix.close(ev.rfd)
  191. let res2 =
  192. when hasEventFds: 0
  193. else: posix.close(ev.wfd)
  194. deallocShared(cast[pointer](ev))
  195. if res1 != 0 or res2 != 0:
  196. raiseIOSelectorsError(osLastError())
  197. proc selectInto*[T](s: Selector[T], timeout: int,
  198. results: var openArray[ReadyKey]): int =
  199. var maxres = MAX_POLL_EVENTS
  200. if maxres > len(results):
  201. maxres = len(results)
  202. verifySelectParams(timeout)
  203. s.withPollLock():
  204. let count = posix.poll(addr(s.pollfds[0]), Tnfds(s.pollcnt), timeout)
  205. if count < 0:
  206. result = 0
  207. let err = osLastError()
  208. if cint(err) != EINTR:
  209. raiseIOSelectorsError(err)
  210. elif count == 0:
  211. result = 0
  212. else:
  213. var i = 0
  214. var k = 0
  215. var rindex = 0
  216. while (i < s.pollcnt) and (k < count) and (rindex < maxres):
  217. let revents = s.pollfds[i].revents
  218. if revents != 0:
  219. let fd = s.pollfds[i].fd
  220. var pkey = addr(s.fds[fd])
  221. var rkey = ReadyKey(fd: int(fd), events: {})
  222. if (revents and POLLIN) != 0:
  223. rkey.events.incl(Event.Read)
  224. if Event.User in pkey.events:
  225. var data: uint64 = 0
  226. if posix.read(fd, addr data, sizeof(uint64)) != sizeof(uint64):
  227. let err = osLastError()
  228. if err != OSErrorCode(EAGAIN):
  229. raiseIOSelectorsError(err)
  230. else:
  231. # someone already consumed event data
  232. inc(i)
  233. continue
  234. rkey.events = {Event.User}
  235. if (revents and POLLOUT) != 0:
  236. rkey.events.incl(Event.Write)
  237. if (revents and POLLERR) != 0 or (revents and POLLHUP) != 0 or
  238. (revents and POLLNVAL) != 0:
  239. rkey.events.incl(Event.Error)
  240. results[rindex] = rkey
  241. s.pollfds[i].revents = 0
  242. inc(rindex)
  243. inc(k)
  244. inc(i)
  245. result = k
  246. proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] =
  247. result = newSeq[ReadyKey](MAX_POLL_EVENTS)
  248. let count = selectInto(s, timeout, result)
  249. result.setLen(count)
  250. template isEmpty*[T](s: Selector[T]): bool =
  251. (s.count == 0)
  252. proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} =
  253. return s.fds[fd.int].ident != InvalidIdent
  254. proc getData*[T](s: Selector[T], fd: SocketHandle|int): var T =
  255. let fdi = int(fd)
  256. s.checkFd(fdi)
  257. if fdi in s:
  258. result = s.fds[fdi].data
  259. proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool =
  260. let fdi = int(fd)
  261. s.checkFd(fdi)
  262. if fdi in s:
  263. s.fds[fdi].data = data
  264. result = true
  265. template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
  266. body: untyped) =
  267. mixin checkFd
  268. let fdi = int(fd)
  269. s.checkFd(fdi)
  270. if fdi in s:
  271. var value = addr(s.getData(fdi))
  272. body
  273. template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1,
  274. body2: untyped) =
  275. mixin checkFd
  276. let fdi = int(fd)
  277. s.checkFd(fdi)
  278. if fdi in s:
  279. var value = addr(s.getData(fdi))
  280. body1
  281. else:
  282. body2
  283. proc getFd*[T](s: Selector[T]): int =
  284. return -1