ioselectors_select.nim 13 KB


  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2016 Eugene Kabanov
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. # This module implements Posix and Windows select().
  10. import times, nativesockets
  11. when defined(windows):
  12. import winlean
  13. when defined(gcc):
  14. {.passL: "-lws2_32".}
  15. elif defined(vcc):
  16. {.passL: "ws2_32.lib".}
  17. const platformHeaders = """#include <winsock2.h>
  18. #include <windows.h>"""
  19. const EAGAIN = WSAEWOULDBLOCK
  20. else:
  21. const platformHeaders = """#include <sys/select.h>
  22. #include <sys/time.h>
  23. #include <sys/types.h>
  24. #include <unistd.h>"""
  25. type
  26. Fdset {.importc: "fd_set", header: platformHeaders, pure, final.} = object
  27. var
  28. FD_SETSIZE {.importc: "FD_SETSIZE", header: platformHeaders.}: cint
  29. proc IOFD_SET(fd: SocketHandle, fdset: ptr Fdset)
  30. {.cdecl, importc: "FD_SET", header: platformHeaders, inline.}
  31. proc IOFD_CLR(fd: SocketHandle, fdset: ptr Fdset)
  32. {.cdecl, importc: "FD_CLR", header: platformHeaders, inline.}
  33. proc IOFD_ZERO(fdset: ptr Fdset)
  34. {.cdecl, importc: "FD_ZERO", header: platformHeaders, inline.}
  35. when defined(windows):
  36. proc IOFD_ISSET(fd: SocketHandle, fdset: ptr Fdset): cint
  37. {.stdcall, importc: "FD_ISSET", header: platformHeaders, inline.}
  38. proc ioselect(nfds: cint, readFds, writeFds, exceptFds: ptr Fdset,
  39. timeout: ptr Timeval): cint
  40. {.stdcall, importc: "select", header: platformHeaders.}
  41. else:
  42. proc IOFD_ISSET(fd: SocketHandle, fdset: ptr Fdset): cint
  43. {.cdecl, importc: "FD_ISSET", header: platformHeaders, inline.}
  44. proc ioselect(nfds: cint, readFds, writeFds, exceptFds: ptr Fdset,
  45. timeout: ptr Timeval): cint
  46. {.cdecl, importc: "select", header: platformHeaders.}
  47. when hasThreadSupport:
  48. type
  49. SelectorImpl[T] = object
  50. rSet: FdSet
  51. wSet: FdSet
  52. eSet: FdSet
  53. maxFD: int
  54. fds: ptr SharedArray[SelectorKey[T]]
  55. count: int
  56. lock: Lock
  57. Selector*[T] = ptr SelectorImpl[T]
  58. else:
  59. type
  60. SelectorImpl[T] = object
  61. rSet: FdSet
  62. wSet: FdSet
  63. eSet: FdSet
  64. maxFD: int
  65. fds: seq[SelectorKey[T]]
  66. count: int
  67. Selector*[T] = ref SelectorImpl[T]
  68. type
  69. SelectEventImpl = object
  70. rsock: SocketHandle
  71. wsock: SocketHandle
  72. SelectEvent* = ptr SelectEventImpl
  73. when hasThreadSupport:
  74. template withSelectLock[T](s: Selector[T], body: untyped) =
  75. acquire(s.lock)
  76. {.locks: [s.lock].}:
  77. try:
  78. body
  79. finally:
  80. release(s.lock)
  81. else:
  82. template withSelectLock[T](s: Selector[T], body: untyped) =
  83. body
  84. proc newSelector*[T](): Selector[T] =
  85. when hasThreadSupport:
  86. result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T])))
  87. result.fds = allocSharedArray[SelectorKey[T]](FD_SETSIZE)
  88. initLock result.lock
  89. else:
  90. result = Selector[T]()
  91. result.fds = newSeq[SelectorKey[T]](FD_SETSIZE)
  92. for i in 0 ..< FD_SETSIZE:
  93. result.fds[i].ident = InvalidIdent
  94. IOFD_ZERO(addr result.rSet)
  95. IOFD_ZERO(addr result.wSet)
  96. IOFD_ZERO(addr result.eSet)
  97. proc close*[T](s: Selector[T]) =
  98. when hasThreadSupport:
  99. deallocSharedArray(s.fds)
  100. deallocShared(cast[pointer](s))
  101. when defined(windows):
  102. proc newSelectEvent*(): SelectEvent =
  103. var ssock = newNativeSocket()
  104. var wsock = newNativeSocket()
  105. var rsock: SocketHandle = INVALID_SOCKET
  106. var saddr = Sockaddr_in()
  107. saddr.sin_family = winlean.AF_INET
  108. saddr.sin_port = 0
  109. saddr.sin_addr.s_addr = INADDR_ANY
  110. if bindAddr(ssock, cast[ptr SockAddr](addr(saddr)),
  111. sizeof(saddr).SockLen) < 0'i32:
  112. raiseIOSelectorsError(osLastError())
  113. if winlean.listen(ssock, 1) != 0:
  114. raiseIOSelectorsError(osLastError())
  115. var namelen = sizeof(saddr).SockLen
  116. if getsockname(ssock, cast[ptr SockAddr](addr(saddr)),
  117. addr(namelen)) != 0'i32:
  118. raiseIOSelectorsError(osLastError())
  119. saddr.sin_addr.s_addr = 0x0100007F
  120. if winlean.connect(wsock, cast[ptr SockAddr](addr(saddr)),
  121. sizeof(saddr).SockLen) != 0:
  122. raiseIOSelectorsError(osLastError())
  123. namelen = sizeof(saddr).SockLen
  124. rsock = winlean.accept(ssock, cast[ptr SockAddr](addr(saddr)),
  125. cast[ptr SockLen](addr(namelen)))
  126. if rsock == SocketHandle(-1):
  127. raiseIOSelectorsError(osLastError())
  128. if winlean.closesocket(ssock) != 0:
  129. raiseIOSelectorsError(osLastError())
  130. var mode = clong(1)
  131. if ioctlsocket(rsock, FIONBIO, addr(mode)) != 0:
  132. raiseIOSelectorsError(osLastError())
  133. mode = clong(1)
  134. if ioctlsocket(wsock, FIONBIO, addr(mode)) != 0:
  135. raiseIOSelectorsError(osLastError())
  136. result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl)))
  137. result.rsock = rsock
  138. result.wsock = wsock
  139. proc trigger*(ev: SelectEvent) =
  140. var data: uint64 = 1
  141. if winlean.send(ev.wsock, cast[pointer](addr data),
  142. cint(sizeof(uint64)), 0) != sizeof(uint64):
  143. raiseIOSelectorsError(osLastError())
  144. proc close*(ev: SelectEvent) =
  145. let res1 = winlean.closesocket(ev.rsock)
  146. let res2 = winlean.closesocket(ev.wsock)
  147. deallocShared(cast[pointer](ev))
  148. if res1 != 0 or res2 != 0:
  149. raiseIOSelectorsError(osLastError())
  150. else:
  151. proc newSelectEvent*(): SelectEvent =
  152. var fds: array[2, cint]
  153. if posix.pipe(fds) != 0:
  154. raiseIOSelectorsError(osLastError())
  155. setNonBlocking(fds[0])
  156. setNonBlocking(fds[1])
  157. result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl)))
  158. result.rsock = SocketHandle(fds[0])
  159. result.wsock = SocketHandle(fds[1])
  160. proc trigger*(ev: SelectEvent) =
  161. var data: uint64 = 1
  162. if posix.write(cint(ev.wsock), addr data, sizeof(uint64)) != sizeof(uint64):
  163. raiseIOSelectorsError(osLastError())
  164. proc close*(ev: SelectEvent) =
  165. let res1 = posix.close(cint(ev.rsock))
  166. let res2 = posix.close(cint(ev.wsock))
  167. deallocShared(cast[pointer](ev))
  168. if res1 != 0 or res2 != 0:
  169. raiseIOSelectorsError(osLastError())
  170. proc setSelectKey[T](s: Selector[T], fd: SocketHandle, events: set[Event],
  171. data: T) =
  172. var i = 0
  173. let fdi = int(fd)
  174. while i < FD_SETSIZE:
  175. if s.fds[i].ident == InvalidIdent:
  176. var pkey = addr(s.fds[i])
  177. pkey.ident = fdi
  178. pkey.events = events
  179. pkey.data = data
  180. break
  181. inc(i)
  182. if i >= FD_SETSIZE:
  183. raiseIOSelectorsError("Maximum number of descriptors is exhausted!")
  184. proc getKey[T](s: Selector[T], fd: SocketHandle): ptr SelectorKey[T] =
  185. var i = 0
  186. let fdi = int(fd)
  187. while i < FD_SETSIZE:
  188. if s.fds[i].ident == fdi:
  189. result = addr(s.fds[i])
  190. break
  191. inc(i)
  192. doAssert(i < FD_SETSIZE,
  193. "Descriptor [" & $int(fd) & "] is not registered in the queue!")
  194. proc delKey[T](s: Selector[T], fd: SocketHandle) =
  195. var empty: T
  196. var i = 0
  197. while i < FD_SETSIZE:
  198. if s.fds[i].ident == fd.int:
  199. s.fds[i].ident = InvalidIdent
  200. s.fds[i].events = {}
  201. s.fds[i].data = empty
  202. break
  203. inc(i)
  204. doAssert(i < FD_SETSIZE,
  205. "Descriptor [" & $int(fd) & "] is not registered in the queue!")
  206. proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle,
  207. events: set[Event], data: T) =
  208. when not defined(windows):
  209. let fdi = int(fd)
  210. s.withSelectLock():
  211. s.setSelectKey(fd, events, data)
  212. when not defined(windows):
  213. if fdi > s.maxFD: s.maxFD = fdi
  214. if Event.Read in events:
  215. IOFD_SET(fd, addr s.rSet)
  216. inc(s.count)
  217. if Event.Write in events:
  218. IOFD_SET(fd, addr s.wSet)
  219. IOFD_SET(fd, addr s.eSet)
  220. inc(s.count)
  221. proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
  222. when not defined(windows):
  223. let fdi = int(ev.rsock)
  224. s.withSelectLock():
  225. s.setSelectKey(ev.rsock, {Event.User}, data)
  226. when not defined(windows):
  227. if fdi > s.maxFD: s.maxFD = fdi
  228. IOFD_SET(ev.rsock, addr s.rSet)
  229. inc(s.count)
  230. proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle,
  231. events: set[Event]) =
  232. let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode,
  233. Event.User, Event.Oneshot, Event.Error}
  234. s.withSelectLock():
  235. var pkey = s.getKey(fd)
  236. doAssert(pkey.events * maskEvents == {})
  237. if pkey.events != events:
  238. if (Event.Read in pkey.events) and (Event.Read notin events):
  239. IOFD_CLR(fd, addr s.rSet)
  240. dec(s.count)
  241. if (Event.Write in pkey.events) and (Event.Write notin events):
  242. IOFD_CLR(fd, addr s.wSet)
  243. IOFD_CLR(fd, addr s.eSet)
  244. dec(s.count)
  245. if (Event.Read notin pkey.events) and (Event.Read in events):
  246. IOFD_SET(fd, addr s.rSet)
  247. inc(s.count)
  248. if (Event.Write notin pkey.events) and (Event.Write in events):
  249. IOFD_SET(fd, addr s.wSet)
  250. IOFD_SET(fd, addr s.eSet)
  251. inc(s.count)
  252. pkey.events = events
  253. proc unregister*[T](s: Selector[T], fd: SocketHandle|int) =
  254. s.withSelectLock():
  255. let fd = fd.SocketHandle
  256. var pkey = s.getKey(fd)
  257. if Event.Read in pkey.events:
  258. IOFD_CLR(fd, addr s.rSet)
  259. dec(s.count)
  260. if Event.Write in pkey.events:
  261. IOFD_CLR(fd, addr s.wSet)
  262. IOFD_CLR(fd, addr s.eSet)
  263. dec(s.count)
  264. s.delKey(fd)
  265. proc unregister*[T](s: Selector[T], ev: SelectEvent) =
  266. let fd = ev.rsock
  267. s.withSelectLock():
  268. var pkey = s.getKey(fd)
  269. IOFD_CLR(fd, addr s.rSet)
  270. dec(s.count)
  271. s.delKey(fd)
  272. proc selectInto*[T](s: Selector[T], timeout: int,
  273. results: var openarray[ReadyKey]): int =
  274. var tv = Timeval()
  275. var ptv = addr tv
  276. var rset, wset, eset: FdSet
  277. if timeout != -1:
  278. when defined(genode):
  279. tv.tv_sec = Time(timeout div 1_000)
  280. else:
  281. tv.tv_sec = timeout.int32 div 1_000
  282. tv.tv_usec = (timeout.int32 %% 1_000) * 1_000
  283. else:
  284. ptv = nil
  285. s.withSelectLock():
  286. rset = s.rSet
  287. wset = s.wSet
  288. eset = s.eSet
  289. var count = ioselect(cint(s.maxFD) + 1, addr(rset), addr(wset),
  290. addr(eset), ptv)
  291. if count < 0:
  292. result = 0
  293. when defined(windows):
  294. raiseIOSelectorsError(osLastError())
  295. else:
  296. let err = osLastError()
  297. if cint(err) != EINTR:
  298. raiseIOSelectorsError(err)
  299. elif count == 0:
  300. result = 0
  301. else:
  302. var rindex = 0
  303. var i = 0
  304. var k = 0
  305. while (i < FD_SETSIZE) and (k < count):
  306. if s.fds[i].ident != InvalidIdent:
  307. var flag = false
  308. var pkey = addr(s.fds[i])
  309. var rkey = ReadyKey(fd: int(pkey.ident), events: {})
  310. let fd = SocketHandle(pkey.ident)
  311. if IOFD_ISSET(fd, addr rset) != 0:
  312. if Event.User in pkey.events:
  313. var data: uint64 = 0
  314. if recv(fd, cast[pointer](addr(data)),
  315. sizeof(uint64).cint, 0) != sizeof(uint64):
  316. let err = osLastError()
  317. if cint(err) != EAGAIN:
  318. raiseIOSelectorsError(err)
  319. else:
  320. inc(i)
  321. inc(k)
  322. continue
  323. else:
  324. flag = true
  325. rkey.events = {Event.User}
  326. else:
  327. flag = true
  328. rkey.events = {Event.Read}
  329. if IOFD_ISSET(fd, addr wset) != 0:
  330. rkey.events.incl(Event.Write)
  331. if IOFD_ISSET(fd, addr eset) != 0:
  332. rkey.events.incl(Event.Error)
  333. flag = true
  334. if flag:
  335. results[rindex] = rkey
  336. inc(rindex)
  337. inc(k)
  338. inc(i)
  339. result = rindex
  340. proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] =
  341. result = newSeq[ReadyKey](FD_SETSIZE)
  342. var count = selectInto(s, timeout, result)
  343. result.setLen(count)
  344. proc flush*[T](s: Selector[T]) = discard
  345. template isEmpty*[T](s: Selector[T]): bool =
  346. (s.count == 0)
  347. proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} =
  348. s.withSelectLock():
  349. result = false
  350. let fdi = int(fd)
  351. for i in 0..<FD_SETSIZE:
  352. if s.fds[i].ident == fdi:
  353. return true
  354. when hasThreadSupport:
  355. template withSelectLock[T](s: Selector[T], body: untyped) =
  356. acquire(s.lock)
  357. {.locks: [s.lock].}:
  358. try:
  359. body
  360. finally:
  361. release(s.lock)
  362. else:
  363. template withSelectLock[T](s: Selector[T], body: untyped) =
  364. body
  365. proc getData*[T](s: Selector[T], fd: SocketHandle|int): var T =
  366. s.withSelectLock():
  367. let fdi = int(fd)
  368. for i in 0..<FD_SETSIZE:
  369. if s.fds[i].ident == fdi:
  370. return s.fds[i].data
  371. proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool =
  372. s.withSelectLock():
  373. let fdi = int(fd)
  374. var i = 0
  375. while i < FD_SETSIZE:
  376. if s.fds[i].ident == fdi:
  377. var pkey = addr(s.fds[i])
  378. pkey.data = data
  379. result = true
  380. break
  381. template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
  382. body: untyped) =
  383. mixin withSelectLock
  384. s.withSelectLock():
  385. var value: ptr T
  386. let fdi = int(fd)
  387. var i = 0
  388. while i < FD_SETSIZE:
  389. if s.fds[i].ident == fdi:
  390. value = addr(s.fds[i].data)
  391. break
  392. inc(i)
  393. if i != FD_SETSIZE:
  394. body
  395. template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
  396. body1, body2: untyped) =
  397. mixin withSelectLock
  398. s.withSelectLock():
  399. block:
  400. var value: ptr T
  401. let fdi = int(fd)
  402. var i = 0
  403. while i < FD_SETSIZE:
  404. if s.fds[i].ident == fdi:
  405. value = addr(s.fds[i].data)
  406. break
  407. inc(i)
  408. if i != FD_SETSIZE:
  409. body1
  410. else:
  411. body2
  412. proc getFd*[T](s: Selector[T]): int =
  413. return -1