ioselectors_poll.nim 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2016 Eugene Kabanov
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. # This module implements Posix poll().
  10. import posix, times
  11. # Maximum number of events that can be returned
  12. const MAX_POLL_EVENTS = 64
  13. when hasThreadSupport:
  14. type
  15. SelectorImpl[T] = object
  16. maxFD : int
  17. pollcnt: int
  18. fds: ptr SharedArray[SelectorKey[T]]
  19. pollfds: ptr SharedArray[TPollFd]
  20. count: int
  21. lock: Lock
  22. Selector*[T] = ptr SelectorImpl[T]
  23. else:
  24. type
  25. SelectorImpl[T] = object
  26. maxFD : int
  27. pollcnt: int
  28. fds: seq[SelectorKey[T]]
  29. pollfds: seq[TPollFd]
  30. count: int
  31. Selector*[T] = ref SelectorImpl[T]
  32. type
  33. SelectEventImpl = object
  34. rfd: cint
  35. wfd: cint
  36. SelectEvent* = ptr SelectEventImpl
  37. when hasThreadSupport:
  38. template withPollLock[T](s: Selector[T], body: untyped) =
  39. acquire(s.lock)
  40. {.locks: [s.lock].}:
  41. try:
  42. body
  43. finally:
  44. release(s.lock)
  45. else:
  46. template withPollLock(s, body: untyped) =
  47. body
  48. proc newSelector*[T](): Selector[T] =
  49. var a = RLimit()
  50. if getrlimit(posix.RLIMIT_NOFILE, a) != 0:
  51. raiseIOSelectorsError(osLastError())
  52. var maxFD = int(a.rlim_max)
  53. when hasThreadSupport:
  54. result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T])))
  55. result.maxFD = maxFD
  56. result.fds = allocSharedArray[SelectorKey[T]](maxFD)
  57. result.pollfds = allocSharedArray[TPollFd](maxFD)
  58. initLock(result.lock)
  59. else:
  60. result = Selector[T]()
  61. result.maxFD = maxFD
  62. result.fds = newSeq[SelectorKey[T]](maxFD)
  63. result.pollfds = newSeq[TPollFd](maxFD)
  64. for i in 0 ..< maxFD:
  65. result.fds[i].ident = InvalidIdent
  66. proc close*[T](s: Selector[T]) =
  67. when hasThreadSupport:
  68. deinitLock(s.lock)
  69. deallocSharedArray(s.fds)
  70. deallocSharedArray(s.pollfds)
  71. deallocShared(cast[pointer](s))
  72. template pollAdd[T](s: Selector[T], sock: cint, events: set[Event]) =
  73. withPollLock(s):
  74. var pollev: cshort = 0
  75. if Event.Read in events: pollev = pollev or POLLIN
  76. if Event.Write in events: pollev = pollev or POLLOUT
  77. s.pollfds[s.pollcnt].fd = cint(sock)
  78. s.pollfds[s.pollcnt].events = pollev
  79. inc(s.count)
  80. inc(s.pollcnt)
  81. template pollUpdate[T](s: Selector[T], sock: cint, events: set[Event]) =
  82. withPollLock(s):
  83. var i = 0
  84. var pollev: cshort = 0
  85. if Event.Read in events: pollev = pollev or POLLIN
  86. if Event.Write in events: pollev = pollev or POLLOUT
  87. while i < s.pollcnt:
  88. if s.pollfds[i].fd == sock:
  89. s.pollfds[i].events = pollev
  90. break
  91. inc(i)
  92. doAssert(i < s.pollcnt,
  93. "Descriptor [" & $sock & "] is not registered in the queue!")
  94. template pollRemove[T](s: Selector[T], sock: cint) =
  95. withPollLock(s):
  96. var i = 0
  97. while i < s.pollcnt:
  98. if s.pollfds[i].fd == sock:
  99. if i == s.pollcnt - 1:
  100. s.pollfds[i].fd = 0
  101. s.pollfds[i].events = 0
  102. s.pollfds[i].revents = 0
  103. else:
  104. while i < (s.pollcnt - 1):
  105. s.pollfds[i].fd = s.pollfds[i + 1].fd
  106. s.pollfds[i].events = s.pollfds[i + 1].events
  107. inc(i)
  108. break
  109. inc(i)
  110. dec(s.pollcnt)
  111. dec(s.count)
  112. template checkFd(s, f) =
  113. if f >= s.maxFD:
  114. raiseIOSelectorsError("Maximum number of descriptors is exhausted!")
  115. proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle,
  116. events: set[Event], data: T) =
  117. var fdi = int(fd)
  118. s.checkFd(fdi)
  119. doAssert(s.fds[fdi].ident == InvalidIdent)
  120. setKey(s, fdi, events, 0, data)
  121. if events != {}: s.pollAdd(fdi.cint, events)
  122. proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle,
  123. events: set[Event]) =
  124. let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode,
  125. Event.User, Event.Oneshot, Event.Error}
  126. let fdi = int(fd)
  127. s.checkFd(fdi)
  128. var pkey = addr(s.fds[fdi])
  129. doAssert(pkey.ident != InvalidIdent,
  130. "Descriptor [" & $fdi & "] is not registered in the queue!")
  131. doAssert(pkey.events * maskEvents == {})
  132. if pkey.events != events:
  133. if pkey.events == {}:
  134. s.pollAdd(fd.cint, events)
  135. else:
  136. if events != {}:
  137. s.pollUpdate(fd.cint, events)
  138. else:
  139. s.pollRemove(fd.cint)
  140. pkey.events = events
  141. proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
  142. var fdi = int(ev.rfd)
  143. doAssert(s.fds[fdi].ident == InvalidIdent, "Event is already registered in the queue!")
  144. var events = {Event.User}
  145. setKey(s, fdi, events, 0, data)
  146. events.incl(Event.Read)
  147. s.pollAdd(fdi.cint, events)
  148. proc unregister*[T](s: Selector[T], fd: int|SocketHandle) =
  149. let fdi = int(fd)
  150. s.checkFd(fdi)
  151. var pkey = addr(s.fds[fdi])
  152. doAssert(pkey.ident != InvalidIdent,
  153. "Descriptor [" & $fdi & "] is not registered in the queue!")
  154. pkey.ident = InvalidIdent
  155. if pkey.events != {}:
  156. pkey.events = {}
  157. s.pollRemove(fdi.cint)
  158. proc unregister*[T](s: Selector[T], ev: SelectEvent) =
  159. let fdi = int(ev.rfd)
  160. s.checkFd(fdi)
  161. var pkey = addr(s.fds[fdi])
  162. doAssert(pkey.ident != InvalidIdent, "Event is not registered in the queue!")
  163. doAssert(Event.User in pkey.events)
  164. pkey.ident = InvalidIdent
  165. pkey.events = {}
  166. s.pollRemove(fdi.cint)
  167. proc newSelectEvent*(): SelectEvent =
  168. var fds: array[2, cint]
  169. if posix.pipe(fds) != 0:
  170. raiseIOSelectorsError(osLastError())
  171. setNonBlocking(fds[0])
  172. setNonBlocking(fds[1])
  173. result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl)))
  174. result.rfd = fds[0]
  175. result.wfd = fds[1]
  176. proc trigger*(ev: SelectEvent) =
  177. var data: uint64 = 1
  178. if posix.write(ev.wfd, addr data, sizeof(uint64)) != sizeof(uint64):
  179. raiseIOSelectorsError(osLastError())
  180. proc close*(ev: SelectEvent) =
  181. let res1 = posix.close(ev.rfd)
  182. let res2 = posix.close(ev.wfd)
  183. deallocShared(cast[pointer](ev))
  184. if res1 != 0 or res2 != 0:
  185. raiseIOSelectorsError(osLastError())
  186. proc selectInto*[T](s: Selector[T], timeout: int,
  187. results: var openarray[ReadyKey]): int =
  188. var maxres = MAX_POLL_EVENTS
  189. if maxres > len(results):
  190. maxres = len(results)
  191. s.withPollLock():
  192. let count = posix.poll(addr(s.pollfds[0]), Tnfds(s.pollcnt), timeout)
  193. if count < 0:
  194. result = 0
  195. let err = osLastError()
  196. if cint(err) != EINTR:
  197. raiseIOSelectorsError(err)
  198. elif count == 0:
  199. result = 0
  200. else:
  201. var i = 0
  202. var k = 0
  203. var rindex = 0
  204. while (i < s.pollcnt) and (k < count) and (rindex < maxres):
  205. let revents = s.pollfds[i].revents
  206. if revents != 0:
  207. let fd = s.pollfds[i].fd
  208. var pkey = addr(s.fds[fd])
  209. var rkey = ReadyKey(fd: int(fd), events: {})
  210. if (revents and POLLIN) != 0:
  211. rkey.events.incl(Event.Read)
  212. if Event.User in pkey.events:
  213. var data: uint64 = 0
  214. if posix.read(fd, addr data, sizeof(uint64)) != sizeof(uint64):
  215. let err = osLastError()
  216. if err != OSErrorCode(EAGAIN):
  217. raiseIOSelectorsError(err)
  218. else:
  219. # someone already consumed event data
  220. inc(i)
  221. continue
  222. rkey.events = {Event.User}
  223. if (revents and POLLOUT) != 0:
  224. rkey.events.incl(Event.Write)
  225. if (revents and POLLERR) != 0 or (revents and POLLHUP) != 0 or
  226. (revents and POLLNVAL) != 0:
  227. rkey.events.incl(Event.Error)
  228. results[rindex] = rkey
  229. s.pollfds[i].revents = 0
  230. inc(rindex)
  231. inc(k)
  232. inc(i)
  233. result = k
  234. proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] =
  235. result = newSeq[ReadyKey](MAX_POLL_EVENTS)
  236. let count = selectInto(s, timeout, result)
  237. result.setLen(count)
  238. template isEmpty*[T](s: Selector[T]): bool =
  239. (s.count == 0)
  240. proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} =
  241. return s.fds[fd.int].ident != InvalidIdent
  242. proc getData*[T](s: Selector[T], fd: SocketHandle|int): var T =
  243. let fdi = int(fd)
  244. s.checkFd(fdi)
  245. if fdi in s:
  246. result = s.fds[fdi].data
  247. proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool =
  248. let fdi = int(fd)
  249. s.checkFd(fdi)
  250. if fdi in s:
  251. s.fds[fdi].data = data
  252. result = true
  253. template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
  254. body: untyped) =
  255. mixin checkFd
  256. let fdi = int(fd)
  257. s.checkFd(fdi)
  258. if fdi in s:
  259. var value = addr(s.getData(fdi))
  260. body
  261. template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1,
  262. body2: untyped) =
  263. mixin checkFd
  264. let fdi = int(fd)
  265. s.checkFd(fdi)
  266. if fdi in s:
  267. var value = addr(s.getData(fdi))
  268. body1
  269. else:
  270. body2
  271. proc getFd*[T](s: Selector[T]): int =
  272. return -1