ioselectors_select.nim 13 KB


  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2016 Eugene Kabanov
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. # This module implements Posix and Windows select().
  10. import times, nativesockets
  when defined(windows):
    import winlean

    # Link against the Winsock 2 library; the flag spelling depends on the
    # C toolchain in use.
    when defined(gcc):
      {.passl: "-lws2_32".}
    elif defined(vcc):
      {.passl: "ws2_32.lib".}

    # Alias so that code comparing an error code against EAGAIN also works
    # with the Winsock error constant.
    const EAGAIN = WSAEWOULDBLOCK

    # C headers named by the `header:` pragma of the importc declarations.
    const platformHeaders =
      "#include <winsock2.h>\n" &
      "#include <windows.h>"
  else:
    # C headers named by the `header:` pragma of the importc declarations.
    const platformHeaders =
      "#include <sys/select.h>\n" &
      "#include <sys/time.h>\n" &
      "#include <sys/types.h>\n" &
      "#include <unistd.h>"
  type
    # Nim-side name for the C `fd_set` type; no fields are declared here.
    FdSet {.importc: "fd_set", header: platformHeaders, pure, final.} = object

  var
    # The C `FD_SETSIZE` value, imported as a cint.
    FD_SETSIZE {.importc: "FD_SETSIZE", header: platformHeaders.}: cint

  # Bindings for the C names FD_ZERO, FD_SET and FD_CLR.
  proc IOFD_ZERO(fdset: ptr FdSet) {.
    cdecl, importc: "FD_ZERO", header: platformHeaders, inline.}

  proc IOFD_SET(fd: SocketHandle, fdset: ptr FdSet) {.
    cdecl, importc: "FD_SET", header: platformHeaders, inline.}

  proc IOFD_CLR(fd: SocketHandle, fdset: ptr FdSet) {.
    cdecl, importc: "FD_CLR", header: platformHeaders, inline.}

  # FD_ISSET and select() are imported with the stdcall calling convention on
  # Windows and with cdecl everywhere else; the signatures are the same.
  when defined(windows):
    proc IOFD_ISSET(fd: SocketHandle, fdset: ptr FdSet): cint {.
      stdcall, importc: "FD_ISSET", header: platformHeaders, inline.}

    proc ioselect(nfds: cint, readFds, writeFds, exceptFds: ptr FdSet,
                  timeout: ptr Timeval): cint {.
      stdcall, importc: "select", header: platformHeaders.}
  else:
    proc IOFD_ISSET(fd: SocketHandle, fdset: ptr FdSet): cint {.
      cdecl, importc: "FD_ISSET", header: platformHeaders, inline.}

    proc ioselect(nfds: cint, readFds, writeFds, exceptFds: ptr FdSet,
                  timeout: ptr Timeval): cint {.
      cdecl, importc: "select", header: platformHeaders.}
  when hasThreadSupport:
    type
      # Thread-enabled layout: the key table lives in shared memory, the
      # selector itself is a raw pointer, and a lock is carried along.
      SelectorImpl[T] = object
        rSet: FdSet   ## descriptors registered for Read / User events
        wSet: FdSet   ## descriptors registered for Write events
        eSet: FdSet   ## set/cleared together with `wSet`
        maxFD: int    ## highest descriptor seen (maintained on non-Windows)
        fds: ptr SharedArray[SelectorKey[T]]  ## key table, FD_SETSIZE slots
        count*: int   ## bumped once per registered Read/Write/User interest
        lock: Lock    ## taken by `withSelectLock`

      Selector*[T] = ptr SelectorImpl[T]
  else:
    type
      # Single-threaded layout: the key table is a plain seq and the selector
      # is a traced reference.
      SelectorImpl[T] = object
        rSet: FdSet   ## descriptors registered for Read / User events
        wSet: FdSet   ## descriptors registered for Write events
        eSet: FdSet   ## set/cleared together with `wSet`
        maxFD: int    ## highest descriptor seen (maintained on non-Windows)
        fds: seq[SelectorKey[T]]  ## key table, FD_SETSIZE slots
        count*: int   ## bumped once per registered Read/Write/User interest

      Selector*[T] = ref SelectorImpl[T]

  type
    # A user-triggerable event backed by a pair of descriptors: `trigger`
    # writes to `wsock`, and `rsock` is the end registered with a selector.
    SelectEventImpl = object
      rsock: SocketHandle
      wsock: SocketHandle

    SelectEvent* = ptr SelectEventImpl
  template withSelectLock[T](s: Selector[T], body: untyped) =
    ## Runs `body` while holding `s.lock` when thread support is compiled in;
    ## the lock is released even if `body` raises. Without thread support
    ## `body` is simply expanded in place.
    when hasThreadSupport:
      acquire(s.lock)
      {.locks: [s.lock].}:
        try:
          body
        finally:
          release(s.lock)
    else:
      body
  proc newSelector*[T](): Selector[T] =
    ## Creates a selector whose key table has `FD_SETSIZE` slots, all marked
    ## as unused, and whose three descriptor sets are empty.
    when hasThreadSupport:
      # Shared-heap allocation, zero-filled; the lock needs explicit init.
      result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T])))
      result.fds = allocSharedArray[SelectorKey[T]](FD_SETSIZE)
      initLock result.lock
    else:
      result = Selector[T]()
      result.fds = newSeq[SelectorKey[T]](FD_SETSIZE)

    # Mark every slot as free so lookups never match a zero-initialised ident.
    var slot = 0
    while slot < FD_SETSIZE:
      result.fds[slot].ident = InvalidIdent
      inc(slot)

    IOFD_ZERO(addr result.rSet)
    IOFD_ZERO(addr result.wSet)
    IOFD_ZERO(addr result.eSet)
  proc close*[T](s: Selector[T]) =
    ## Releases the resources owned by the selector.
    ##
    ## With thread support this destroys the lock and frees the shared key
    ## table and the selector object itself; `s` must not be used afterwards.
    ## Without thread support there is nothing to do here.
    when hasThreadSupport:
      # The lock is a field of `s`, so it has to be torn down *before* the
      # memory holding `s` is returned to the shared heap.
      deinitLock(s.lock)
      deallocSharedArray(s.fds)
      deallocShared(cast[pointer](s))
  when defined(windows):
    proc newSelectEvent*(): SelectEvent =
      ## Creates a user event backed by a connected pair of sockets: a
      ## temporary listening socket is bound to an OS-chosen port, the write
      ## end connects to it via 127.0.0.1 and the accepted socket becomes the
      ## read end. Both ends are switched to non-blocking mode.
      ## Raises through `raiseIOSelectorsError` when any socket call fails.
      let listener = createNativeSocket()
      let writer = createNativeSocket()

      var address = Sockaddr_in()
      address.sin_family = winlean.AF_INET
      address.sin_port = 0              # port 0: let the OS pick one
      address.sin_addr.s_addr = INADDR_ANY
      let bound = bindAddr(listener, cast[ptr SockAddr](addr(address)),
                           sizeof(address).SockLen)
      if bound < 0'i32:
        raiseIOSelectorsError(osLastError())

      if winlean.listen(listener, 1) != 0:
        raiseIOSelectorsError(osLastError())

      # Read back the address to learn which port was actually assigned.
      var addrLen = sizeof(address).SockLen
      let named = getsockname(listener, cast[ptr SockAddr](addr(address)),
                              addr(addrLen))
      if named != 0'i32:
        raiseIOSelectorsError(osLastError())

      address.sin_addr.s_addr = 0x0100007F   # 127.0.0.1 in network byte order
      let connected = winlean.connect(writer,
                                      cast[ptr SockAddr](addr(address)),
                                      sizeof(address).SockLen)
      if connected != 0:
        raiseIOSelectorsError(osLastError())

      addrLen = sizeof(address).SockLen
      let reader = winlean.accept(listener,
                                  cast[ptr SockAddr](addr(address)),
                                  cast[ptr SockLen](addr(addrLen)))
      if reader == SocketHandle(-1):
        raiseIOSelectorsError(osLastError())

      # The listening socket is only needed to establish the pair.
      if winlean.closesocket(listener) != 0:
        raiseIOSelectorsError(osLastError())

      # FIONBIO with a non-zero argument: read end first, then write end.
      for sock in [reader, writer]:
        var mode = clong(1)
        if ioctlsocket(sock, FIONBIO, addr(mode)) != 0:
          raiseIOSelectorsError(osLastError())

      result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl)))
      result.rsock = reader
      result.wsock = writer

    proc trigger*(ev: SelectEvent) =
      ## Signals the event by sending one `uint64` through the write socket.
      ## Raises through `raiseIOSelectorsError` unless all bytes were sent.
      var data: uint64 = 1
      let sent = winlean.send(ev.wsock, cast[pointer](addr data),
                              cint(sizeof(uint64)), 0)
      if sent != sizeof(uint64):
        raiseIOSelectorsError(osLastError())

    proc close*(ev: SelectEvent) =
      ## Closes both sockets and frees the event object; an error is raised
      ## only after the memory was released, if either close failed.
      let readFailed = winlean.closesocket(ev.rsock) != 0
      let writeFailed = winlean.closesocket(ev.wsock) != 0
      deallocShared(cast[pointer](ev))
      if readFailed or writeFailed:
        raiseIOSelectorsError(osLastError())

  else:
    proc newSelectEvent*(): SelectEvent =
      ## Creates a user event backed by a pipe whose two ends are both
      ## switched to non-blocking mode.
      ## Raises through `raiseIOSelectorsError` when `pipe` fails.
      var fds: array[2, cint]
      if posix.pipe(fds) != 0:
        raiseIOSelectorsError(osLastError())
      for pipeEnd in fds:
        setNonBlocking(pipeEnd)
      result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl)))
      result.rsock = SocketHandle(fds[0])
      result.wsock = SocketHandle(fds[1])

    proc trigger*(ev: SelectEvent) =
      ## Signals the event by writing one `uint64` to the pipe's write end.
      ## Raises through `raiseIOSelectorsError` unless all bytes were written.
      var data: uint64 = 1
      let written = posix.write(cint(ev.wsock), addr data, sizeof(uint64))
      if written != sizeof(uint64):
        raiseIOSelectorsError(osLastError())

    proc close*(ev: SelectEvent) =
      ## Closes both pipe ends and frees the event object; an error is raised
      ## only after the memory was released, if either close failed.
      let readFailed = posix.close(cint(ev.rsock)) != 0
      let writeFailed = posix.close(cint(ev.wsock)) != 0
      deallocShared(cast[pointer](ev))
      if readFailed or writeFailed:
        raiseIOSelectorsError(osLastError())
  proc setSelectKey[T](s: Selector[T], fd: SocketHandle, events: set[Event],
                       data: T) =
    ## Stores `fd`, `events` and `data` in the first unused slot of the key
    ## table. Raises through `raiseIOSelectorsError` when all `FD_SETSIZE`
    ## slots are already occupied.
    for slot in 0 ..< FD_SETSIZE:
      if s.fds[slot].ident == InvalidIdent:
        let entry = addr(s.fds[slot])
        entry.ident = int(fd)
        entry.events = events
        entry.data = data
        return
    raiseIOSelectorsError("Maximum number of descriptors is exhausted!")
  proc getKey[T](s: Selector[T], fd: SocketHandle): ptr SelectorKey[T] =
    ## Returns a pointer to the key-table slot registered for `fd`.
    ## Fails a `doAssert` when no slot carries that descriptor.
    let fdi = int(fd)
    var i = 0
    # Walk forward until the matching slot or the end of the table.
    while i < FD_SETSIZE and s.fds[i].ident != fdi:
      inc(i)
    doAssert(i < FD_SETSIZE,
             "Descriptor [" & $int(fd) & "] is not registered in the queue!")
    result = addr(s.fds[i])
  proc delKey[T](s: Selector[T], fd: SocketHandle) =
    ## Marks the slot registered for `fd` as unused and resets its events and
    ## data. Fails a `doAssert` when no slot carries that descriptor.
    var blank: T
    var i = 0
    # Walk forward until the matching slot or the end of the table.
    while i < FD_SETSIZE and s.fds[i].ident != fd.int:
      inc(i)
    doAssert(i < FD_SETSIZE,
             "Descriptor [" & $int(fd) & "] is not registered in the queue!")
    s.fds[i].ident = InvalidIdent
    s.fds[i].events = {}
    s.fds[i].data = blank
  proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle,
                          events: set[Event], data: T) =
    ## Registers `fd` with the selector for the given `events`, attaching
    ## `data` to it.
    ##
    ## `Event.Read` adds the descriptor to the read set; `Event.Write` adds
    ## it to both the write set and the error set. `s.count` grows by one for
    ## each of the two interests present. Raises through
    ## `raiseIOSelectorsError` when the key table is full.
    # The helpers and the FD_* bindings take a SocketHandle, so convert once
    # up front; this makes the `int` overload usable, as in `unregister`.
    let sock = SocketHandle(fd)
    when not defined(windows):
      let fdi = int(fd)
    s.withSelectLock():
      s.setSelectKey(sock, events, data)
      when not defined(windows):
        # Track the highest descriptor; it is passed to select() as nfds - 1.
        if fdi > s.maxFD: s.maxFD = fdi
      if Event.Read in events:
        IOFD_SET(sock, addr s.rSet)
        inc(s.count)
      if Event.Write in events:
        IOFD_SET(sock, addr s.wSet)
        IOFD_SET(sock, addr s.eSet)
        inc(s.count)
  proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
    ## Registers the read end of `ev` with the selector as an `Event.User`
    ## source, attaching `data` to it, and bumps `s.count`.
    let readEnd = ev.rsock
    s.withSelectLock():
      s.setSelectKey(readEnd, {Event.User}, data)
      when not defined(windows):
        # Keep the highest descriptor up to date for the select() call.
        s.maxFD = max(s.maxFD, int(readEnd))
      IOFD_SET(readEnd, addr s.rSet)
      inc(s.count)
  proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle,
                        events: set[Event]) =
    ## Replaces the set of events watched for an already registered `fd`.
    ##
    ## Only Read/Write interests may be involved: a `doAssert` fails if the
    ## descriptor was registered with any of the events in `maskEvents`, or
    ## (via `getKey`) if it is not registered at all. The descriptor sets and
    ## `s.count` are adjusted for every interest that is added or dropped.
    let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode,
                      Event.User, Event.Oneshot, Event.Error}
    # The helpers and the FD_* bindings take a SocketHandle, so convert once
    # up front; this makes the `int` overload usable, as in `unregister`.
    let sock = SocketHandle(fd)
    s.withSelectLock():
      var pkey = s.getKey(sock)
      doAssert(pkey.events * maskEvents == {})
      if pkey.events != events:
        # Interests that are going away.
        if (Event.Read in pkey.events) and (Event.Read notin events):
          IOFD_CLR(sock, addr s.rSet)
          dec(s.count)
        if (Event.Write in pkey.events) and (Event.Write notin events):
          IOFD_CLR(sock, addr s.wSet)
          IOFD_CLR(sock, addr s.eSet)
          dec(s.count)
        # Interests that are new.
        if (Event.Read notin pkey.events) and (Event.Read in events):
          IOFD_SET(sock, addr s.rSet)
          inc(s.count)
        if (Event.Write notin pkey.events) and (Event.Write in events):
          IOFD_SET(sock, addr s.wSet)
          IOFD_SET(sock, addr s.eSet)
          inc(s.count)
        pkey.events = events
  proc unregister*[T](s: Selector[T], fd: SocketHandle|int) =
    ## Removes `fd` from the selector: clears it from the descriptor sets it
    ## was placed in, lowers `s.count` accordingly and frees its key slot.
    ## A `doAssert` in `getKey` fails when `fd` is not registered.
    s.withSelectLock():
      let sock = SocketHandle(fd)
      let watched = s.getKey(sock).events
      # Read and User registrations both live in the read set.
      if watched * {Event.Read, Event.User} != {}:
        IOFD_CLR(sock, addr s.rSet)
        dec(s.count)
      if Event.Write in watched:
        IOFD_CLR(sock, addr s.wSet)
        IOFD_CLR(sock, addr s.eSet)
        dec(s.count)
      s.delKey(sock)
  proc unregister*[T](s: Selector[T], ev: SelectEvent) =
    ## Removes the read end of `ev` from the selector's read set, lowers
    ## `s.count` and frees its key slot.
    ## A `doAssert` in `getKey` fails when the event is not registered.
    let readEnd = ev.rsock
    s.withSelectLock():
      # Looked up only for the registration check; the key is not used.
      discard s.getKey(readEnd)
      IOFD_CLR(readEnd, addr s.rSet)
      dec(s.count)
      s.delKey(readEnd)
  proc selectInto*[T](s: Selector[T], timeout: int,
                      results: var openArray[ReadyKey]): int =
    ## Waits for activity on the registered descriptors and stores the ready
    ## keys in `results`.
    ##
    ## `timeout` is in milliseconds; `-1` passes a nil timeout to `select`.
    ## Returns the number of entries written to `results`, which never
    ## exceeds `results.len`: descriptors that do not fit are left untouched
    ## so a later call can report them. Returns 0 when `select` reports no
    ## ready descriptors or, on non-Windows, when it fails with `EINTR`.
    ## Other failures raise through `raiseIOSelectorsError`.
    var tv = Timeval()
    var ptv = addr tv
    var rset, wset, eset: FdSet

    verifySelectParams(timeout)

    if timeout != -1:
      when defined(genode) or defined(freertos) or
           defined(zephyr) or defined(nuttx):
        tv.tv_sec = Time(timeout div 1_000)
      else:
        tv.tv_sec = timeout.int32 div 1_000
      tv.tv_usec = (timeout.int32 %% 1_000) * 1_000
    else:
      ptv = nil

    # select() overwrites the sets it is given, so work on copies taken
    # under the lock.
    s.withSelectLock():
      rset = s.rSet
      wset = s.wSet
      eset = s.eSet

    var count = ioselect(cint(s.maxFD) + 1, addr(rset), addr(wset),
                         addr(eset), ptv)
    if count < 0:
      result = 0
      when defined(windows):
        raiseIOSelectorsError(osLastError())
      else:
        let err = osLastError()
        if cint(err) != EINTR:
          raiseIOSelectorsError(err)
    elif count == 0:
      result = 0
    else:
      let capacity = len(results)
      var rindex = 0   # next free position in `results`
      var i = 0        # current key-table slot
      var k = 0        # descriptors handled so far

      # Stop as soon as `results` is full: checking here (rather than at the
      # store) avoids both an out-of-bounds write and draining a user event
      # that could not be reported.
      while (i < FD_SETSIZE) and (k < count) and (rindex < capacity):
        if s.fds[i].ident != InvalidIdent:
          var flag = false
          var pkey = addr(s.fds[i])
          var rkey = ReadyKey(fd: int(pkey.ident), events: {})
          let fd = SocketHandle(pkey.ident)
          if IOFD_ISSET(fd, addr rset) != 0:
            if Event.User in pkey.events:
              # Drain the value written by `trigger`.
              # NOTE(review): on non-Windows the event is a pipe, and POSIX
              # recv() is specified for sockets only - confirm this should
              # not be a read() there.
              var data: uint64 = 0
              if recv(fd, cast[pointer](addr(data)),
                      sizeof(uint64).cint, 0) != sizeof(uint64):
                let err = osLastError()
                if cint(err) != EAGAIN:
                  raiseIOSelectorsError(err)
                else:
                  # Nothing to read after all: skip without reporting.
                  inc(i)
                  inc(k)
                  continue
              else:
                flag = true
                rkey.events = {Event.User}
            else:
              flag = true
              rkey.events = {Event.Read}
          if IOFD_ISSET(fd, addr wset) != 0:
            rkey.events.incl(Event.Write)
            if IOFD_ISSET(fd, addr eset) != 0:
              rkey.events.incl(Event.Error)
            flag = true
          if flag:
            results[rindex] = rkey
            inc(rindex)
            inc(k)
        inc(i)
      result = rindex
  proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] =
    ## Convenience wrapper around `selectInto`: allocates room for
    ## `FD_SETSIZE` keys, fills it and trims the seq to the number of keys
    ## actually reported.
    result = newSeq[ReadyKey](FD_SETSIZE)
    let ready = s.selectInto(timeout, result)
    setLen(result, ready)
  proc flush*[T](s: Selector[T]) =
    ## Does nothing in this implementation.
    discard
  template isEmpty*[T](s: Selector[T]): bool =
    ## True when the selector's `count` field is zero.
    s.count == 0
  proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} =
    ## Returns true when some key-table slot is registered for `fd`.
    s.withSelectLock():
      let wanted = int(fd)
      var slot = 0
      result = false
      while slot < FD_SETSIZE:
        if s.fds[slot].ident == wanted:
          return true
        inc(slot)
  proc getData*[T](s: Selector[T], fd: SocketHandle|int): var T =
    ## Returns the data stored for `fd` as a mutable location.
    ## No explicit value is returned when `fd` is not found in the key table.
    s.withSelectLock():
      let wanted = int(fd)
      var slot = 0
      while slot < FD_SETSIZE:
        if s.fds[slot].ident == wanted:
          return s.fds[slot].data
        inc(slot)
  proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool =
    ## Replaces the data stored for `fd` with `data`.
    ## Returns true when `fd` was found in the key table, false otherwise.
    s.withSelectLock():
      let fdi = int(fd)
      # A `for` loop guarantees the slot index advances; the scan ends at the
      # first match or after the last slot.
      for i in 0 ..< FD_SETSIZE:
        if s.fds[i].ident == fdi:
          s.fds[i].data = data
          result = true
          break
  template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
                        body: untyped) =
    ## Looks up `fd` in the key table while holding the selector lock. When
    ## it is found, `value` is bound to a pointer to its data and `body` is
    ## executed; otherwise nothing happens.
    mixin withSelectLock
    s.withSelectLock():
      var value: ptr T
      let wanted = int(fd)
      var slot = 0
      while slot < FD_SETSIZE and s.fds[slot].ident != wanted:
        inc(slot)
      if slot < FD_SETSIZE:
        value = addr(s.fds[slot].data)
        body
  template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
                        body1, body2: untyped) =
    ## Looks up `fd` in the key table while holding the selector lock. When
    ## it is found, `value` is bound to a pointer to its data and `body1` is
    ## executed; otherwise `body2` is executed.
    mixin withSelectLock
    s.withSelectLock():
      block:
        var value: ptr T
        let wanted = int(fd)
        var slot = 0
        while slot < FD_SETSIZE and s.fds[slot].ident != wanted:
          inc(slot)
        if slot < FD_SETSIZE:
          value = addr(s.fds[slot].data)
          body1
        else:
          body2
  proc getFd*[T](s: Selector[T]): int =
    ## Always returns -1 in this implementation.
    -1