ioselectors_kqueue.nim 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640
  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2016 Eugene Kabanov
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. # This module implements BSD kqueue().
  10. import std/[posix, times, kqueue, nativesockets]
  11. const
  12. # Maximum number of events that can be returned.
  13. MAX_KQUEUE_EVENTS = 64
  14. # SIG_IGN and SIG_DFL declared in posix.nim as variables, but we need them
  15. # to be constants and GC-safe.
  16. SIG_DFL = cast[proc(x: cint) {.noconv,gcsafe.}](0)
  17. SIG_IGN = cast[proc(x: cint) {.noconv,gcsafe.}](1)
  18. when defined(kqcache):
  19. const CACHE_EVENTS = true
  20. when defined(macosx) or defined(freebsd) or defined(dragonfly):
  21. when defined(macosx):
  22. const MAX_DESCRIPTORS_ID = 29 # KERN_MAXFILESPERPROC (MacOS)
  23. else:
  24. const MAX_DESCRIPTORS_ID = 27 # KERN_MAXFILESPERPROC (FreeBSD)
  25. proc sysctl(name: ptr cint, namelen: cuint, oldp: pointer, oldplen: ptr csize_t,
  26. newp: pointer, newplen: csize_t): cint
  27. {.importc: "sysctl",header: """#include <sys/types.h>
  28. #include <sys/sysctl.h>""".}
  29. elif defined(netbsd) or defined(openbsd):
  30. # OpenBSD and NetBSD don't have KERN_MAXFILESPERPROC, so we are using
  31. # KERN_MAXFILES, because KERN_MAXFILES is always bigger,
  32. # than KERN_MAXFILESPERPROC.
  33. const MAX_DESCRIPTORS_ID = 7 # KERN_MAXFILES
  34. proc sysctl(name: ptr cint, namelen: cuint, oldp: pointer, oldplen: ptr csize_t,
  35. newp: pointer, newplen: csize_t): cint
  36. {.importc: "sysctl",header: """#include <sys/param.h>
  37. #include <sys/sysctl.h>""".}
  38. when hasThreadSupport:
  39. type
  40. SelectorImpl[T] = object
  41. kqFD: cint
  42. maxFD: int
  43. changes: ptr SharedArray[KEvent]
  44. fds: ptr SharedArray[SelectorKey[T]]
  45. count*: int
  46. changesLock: Lock
  47. changesSize: int
  48. changesLength: int
  49. sock: cint
  50. Selector*[T] = ptr SelectorImpl[T]
  51. else:
  52. type
  53. SelectorImpl[T] = object
  54. kqFD: cint
  55. maxFD: int
  56. changes: seq[KEvent]
  57. fds: seq[SelectorKey[T]]
  58. count*: int
  59. sock: cint
  60. Selector*[T] = ref SelectorImpl[T]
  61. type
  62. SelectEventImpl = object
  63. rfd: cint
  64. wfd: cint
  65. SelectEvent* = ptr SelectEventImpl
  66. # SelectEvent is declared as `ptr` to be placed in `shared memory`,
  67. # so you can share one SelectEvent handle between threads.
  68. proc getUnique[T](s: Selector[T]): int {.inline.} =
  69. # we create duplicated handles to get unique indexes for our `fds` array.
  70. result = posix.fcntl(s.sock, F_DUPFD_CLOEXEC, s.sock)
  71. if result == -1:
  72. raiseIOSelectorsError(osLastError())
  73. proc newSelector*[T](): owned(Selector[T]) =
  74. var maxFD = 0.cint
  75. var size = csize_t(sizeof(cint))
  76. var namearr = [1.cint, MAX_DESCRIPTORS_ID.cint]
  77. # Obtain maximum number of opened file descriptors for process
  78. if sysctl(addr(namearr[0]), 2, cast[pointer](addr maxFD), addr size,
  79. nil, 0) != 0:
  80. raiseIOSelectorsError(osLastError())
  81. var kqFD = kqueue()
  82. if kqFD < 0:
  83. raiseIOSelectorsError(osLastError())
  84. # we allocating empty socket to duplicate it handle in future, to get unique
  85. # indexes for `fds` array. This is needed to properly identify
  86. # {Event.Timer, Event.Signal, Event.Process} events.
  87. let usock = createNativeSocket(posix.AF_INET, posix.SOCK_STREAM,
  88. posix.IPPROTO_TCP).cint
  89. if usock == -1:
  90. let err = osLastError()
  91. discard posix.close(kqFD)
  92. raiseIOSelectorsError(err)
  93. when hasThreadSupport:
  94. result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T])))
  95. result.fds = allocSharedArray[SelectorKey[T]](maxFD)
  96. result.changes = allocSharedArray[KEvent](MAX_KQUEUE_EVENTS)
  97. result.changesSize = MAX_KQUEUE_EVENTS
  98. initLock(result.changesLock)
  99. else:
  100. result = Selector[T]()
  101. result.fds = newSeq[SelectorKey[T]](maxFD)
  102. result.changes = newSeqOfCap[KEvent](MAX_KQUEUE_EVENTS)
  103. for i in 0 ..< maxFD:
  104. result.fds[i].ident = InvalidIdent
  105. result.sock = usock
  106. result.kqFD = kqFD
  107. result.maxFD = maxFD.int
  108. proc close*[T](s: Selector[T]) =
  109. let res1 = posix.close(s.kqFD)
  110. let res2 = posix.close(s.sock)
  111. when hasThreadSupport:
  112. deinitLock(s.changesLock)
  113. deallocSharedArray(s.fds)
  114. deallocShared(cast[pointer](s))
  115. if res1 != 0 or res2 != 0:
  116. raiseIOSelectorsError(osLastError())
  117. proc newSelectEvent*(): SelectEvent =
  118. var fds: array[2, cint]
  119. if posix.pipe(fds) != 0:
  120. raiseIOSelectorsError(osLastError())
  121. setNonBlocking(fds[0])
  122. setNonBlocking(fds[1])
  123. result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl)))
  124. result.rfd = fds[0]
  125. result.wfd = fds[1]
  126. proc trigger*(ev: SelectEvent) =
  127. var data: uint64 = 1
  128. if posix.write(ev.wfd, addr data, sizeof(uint64)) != sizeof(uint64):
  129. raiseIOSelectorsError(osLastError())
  130. proc close*(ev: SelectEvent) =
  131. let res1 = posix.close(ev.rfd)
  132. let res2 = posix.close(ev.wfd)
  133. deallocShared(cast[pointer](ev))
  134. if res1 != 0 or res2 != 0:
  135. raiseIOSelectorsError(osLastError())
  136. template checkFd(s, f) =
  137. if f >= s.maxFD:
  138. raiseIOSelectorsError("Maximum number of descriptors is exhausted!")
  139. when hasThreadSupport:
  140. template withChangeLock[T](s: Selector[T], body: untyped) =
  141. acquire(s.changesLock)
  142. {.locks: [s.changesLock].}:
  143. try:
  144. body
  145. finally:
  146. release(s.changesLock)
  147. else:
  148. template withChangeLock(s, body: untyped) =
  149. body
  150. when hasThreadSupport:
  151. template modifyKQueue[T](s: Selector[T], nident: uint, nfilter: cshort,
  152. nflags: cushort, nfflags: cuint, ndata: int,
  153. nudata: pointer) =
  154. mixin withChangeLock
  155. s.withChangeLock():
  156. if s.changesLength == s.changesSize:
  157. # if cache array is full, we allocating new with size * 2
  158. let newSize = s.changesSize shl 1
  159. let rdata = allocSharedArray[KEvent](newSize)
  160. copyMem(rdata, s.changes, s.changesSize * sizeof(KEvent))
  161. s.changesSize = newSize
  162. s.changes[s.changesLength] = KEvent(ident: nident,
  163. filter: nfilter, flags: nflags,
  164. fflags: nfflags, data: ndata,
  165. udata: nudata)
  166. inc(s.changesLength)
  167. when not declared(CACHE_EVENTS):
  168. template flushKQueue[T](s: Selector[T]) =
  169. mixin withChangeLock
  170. s.withChangeLock():
  171. if s.changesLength > 0:
  172. if kevent(s.kqFD, addr(s.changes[0]), cint(s.changesLength),
  173. nil, 0, nil) == -1:
  174. let res = osLastError()
  175. if cint(res) != ENOENT: # ignore pipes whose read end is closed
  176. raiseIOSelectorsError(res)
  177. s.changesLength = 0
  178. else:
  179. template modifyKQueue[T](s: Selector[T], nident: uint, nfilter: cshort,
  180. nflags: cushort, nfflags: cuint, ndata: int,
  181. nudata: pointer) =
  182. s.changes.add(KEvent(ident: nident,
  183. filter: nfilter, flags: nflags,
  184. fflags: nfflags, data: ndata,
  185. udata: nudata))
  186. when not declared(CACHE_EVENTS):
  187. template flushKQueue[T](s: Selector[T]) =
  188. let length = cint(len(s.changes))
  189. if length > 0:
  190. if kevent(s.kqFD, addr(s.changes[0]), length,
  191. nil, 0, nil) == -1:
  192. let res = osLastError()
  193. if cint(res) != ENOENT: # ignore pipes whose read end is closed
  194. raiseIOSelectorsError(res)
  195. s.changes.setLen(0)
  196. proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle,
  197. events: set[Event], data: T) =
  198. let fdi = int(fd)
  199. s.checkFd(fdi)
  200. doAssert(s.fds[fdi].ident == InvalidIdent)
  201. s.setKey(fdi, events, 0, data)
  202. if events != {}:
  203. if Event.Read in events:
  204. modifyKQueue(s, uint(fdi), EVFILT_READ, EV_ADD, 0, 0, nil)
  205. inc(s.count)
  206. if Event.Write in events:
  207. modifyKQueue(s, uint(fdi), EVFILT_WRITE, EV_ADD, 0, 0, nil)
  208. inc(s.count)
  209. when not declared(CACHE_EVENTS):
  210. flushKQueue(s)
  211. proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle,
  212. events: set[Event]) =
  213. let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode,
  214. Event.User, Event.Oneshot, Event.Error}
  215. let fdi = int(fd)
  216. s.checkFd(fdi)
  217. var pkey = addr(s.fds[fdi])
  218. doAssert(pkey.ident != InvalidIdent,
  219. "Descriptor $# is not registered in the queue!" % $fdi)
  220. doAssert(pkey.events * maskEvents == {})
  221. if pkey.events != events:
  222. if (Event.Read in pkey.events) and (Event.Read notin events):
  223. modifyKQueue(s, fdi.uint, EVFILT_READ, EV_DELETE, 0, 0, nil)
  224. dec(s.count)
  225. if (Event.Write in pkey.events) and (Event.Write notin events):
  226. modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_DELETE, 0, 0, nil)
  227. dec(s.count)
  228. if (Event.Read notin pkey.events) and (Event.Read in events):
  229. modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil)
  230. inc(s.count)
  231. if (Event.Write notin pkey.events) and (Event.Write in events):
  232. modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_ADD, 0, 0, nil)
  233. inc(s.count)
  234. when not declared(CACHE_EVENTS):
  235. flushKQueue(s)
  236. pkey.events = events
  237. proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool,
  238. data: T): int {.discardable.} =
  239. let fdi = getUnique(s)
  240. s.checkFd(fdi)
  241. doAssert(s.fds[fdi].ident == InvalidIdent)
  242. let events = if oneshot: {Event.Timer, Event.Oneshot} else: {Event.Timer}
  243. let flags: cushort = if oneshot: EV_ONESHOT or EV_ADD else: EV_ADD
  244. s.setKey(fdi, events, 0, data)
  245. # EVFILT_TIMER on Open/Net(BSD) has granularity of only milliseconds,
  246. # but MacOS and FreeBSD allow use `0` as `fflags` to use milliseconds
  247. # too
  248. modifyKQueue(s, fdi.uint, EVFILT_TIMER, flags, 0, cint(timeout), nil)
  249. when not declared(CACHE_EVENTS):
  250. flushKQueue(s)
  251. inc(s.count)
  252. result = fdi
  253. proc registerSignal*[T](s: Selector[T], signal: int,
  254. data: T): int {.discardable.} =
  255. let fdi = getUnique(s)
  256. s.checkFd(fdi)
  257. doAssert(s.fds[fdi].ident == InvalidIdent)
  258. s.setKey(fdi, {Event.Signal}, signal, data)
  259. var nmask, omask: Sigset
  260. discard sigemptyset(nmask)
  261. discard sigemptyset(omask)
  262. discard sigaddset(nmask, cint(signal))
  263. blockSignals(nmask, omask)
  264. # to be compatible with linux semantic we need to "eat" signals
  265. posix.signal(cint(signal), SIG_IGN)
  266. modifyKQueue(s, signal.uint, EVFILT_SIGNAL, EV_ADD, 0, 0,
  267. cast[pointer](fdi))
  268. when not declared(CACHE_EVENTS):
  269. flushKQueue(s)
  270. inc(s.count)
  271. result = fdi
  272. proc registerProcess*[T](s: Selector[T], pid: int,
  273. data: T): int {.discardable.} =
  274. let fdi = getUnique(s)
  275. s.checkFd(fdi)
  276. doAssert(s.fds[fdi].ident == InvalidIdent)
  277. var kflags: cushort = EV_ONESHOT or EV_ADD
  278. setKey(s, fdi, {Event.Process, Event.Oneshot}, pid, data)
  279. modifyKQueue(s, pid.uint, EVFILT_PROC, kflags, NOTE_EXIT, 0,
  280. cast[pointer](fdi))
  281. when not declared(CACHE_EVENTS):
  282. flushKQueue(s)
  283. inc(s.count)
  284. result = fdi
  285. proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
  286. let fdi = ev.rfd.int
  287. doAssert(s.fds[fdi].ident == InvalidIdent, "Event is already registered in the queue!")
  288. setKey(s, fdi, {Event.User}, 0, data)
  289. modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil)
  290. when not declared(CACHE_EVENTS):
  291. flushKQueue(s)
  292. inc(s.count)
  293. template processVnodeEvents(events: set[Event]): cuint =
  294. var rfflags = 0.cuint
  295. if events == {Event.VnodeWrite, Event.VnodeDelete, Event.VnodeExtend,
  296. Event.VnodeAttrib, Event.VnodeLink, Event.VnodeRename,
  297. Event.VnodeRevoke}:
  298. rfflags = NOTE_DELETE or NOTE_WRITE or NOTE_EXTEND or NOTE_ATTRIB or
  299. NOTE_LINK or NOTE_RENAME or NOTE_REVOKE
  300. else:
  301. if Event.VnodeDelete in events: rfflags = rfflags or NOTE_DELETE
  302. if Event.VnodeWrite in events: rfflags = rfflags or NOTE_WRITE
  303. if Event.VnodeExtend in events: rfflags = rfflags or NOTE_EXTEND
  304. if Event.VnodeAttrib in events: rfflags = rfflags or NOTE_ATTRIB
  305. if Event.VnodeLink in events: rfflags = rfflags or NOTE_LINK
  306. if Event.VnodeRename in events: rfflags = rfflags or NOTE_RENAME
  307. if Event.VnodeRevoke in events: rfflags = rfflags or NOTE_REVOKE
  308. rfflags
  309. proc registerVnode*[T](s: Selector[T], fd: cint, events: set[Event], data: T) =
  310. let fdi = fd.int
  311. setKey(s, fdi, {Event.Vnode} + events, 0, data)
  312. var fflags = processVnodeEvents(events)
  313. modifyKQueue(s, fdi.uint, EVFILT_VNODE, EV_ADD or EV_CLEAR, fflags, 0, nil)
  314. when not declared(CACHE_EVENTS):
  315. flushKQueue(s)
  316. inc(s.count)
  317. proc unregister*[T](s: Selector[T], fd: int|SocketHandle) =
  318. let fdi = int(fd)
  319. s.checkFd(fdi)
  320. var pkey = addr(s.fds[fdi])
  321. doAssert(pkey.ident != InvalidIdent,
  322. "Descriptor [" & $fdi & "] is not registered in the queue!")
  323. if pkey.events != {}:
  324. if pkey.events * {Event.Read, Event.Write} != {}:
  325. if Event.Read in pkey.events:
  326. modifyKQueue(s, uint(fdi), EVFILT_READ, EV_DELETE, 0, 0, nil)
  327. dec(s.count)
  328. if Event.Write in pkey.events:
  329. modifyKQueue(s, uint(fdi), EVFILT_WRITE, EV_DELETE, 0, 0, nil)
  330. dec(s.count)
  331. when not declared(CACHE_EVENTS):
  332. flushKQueue(s)
  333. elif Event.Timer in pkey.events:
  334. if Event.Finished notin pkey.events:
  335. modifyKQueue(s, uint(fdi), EVFILT_TIMER, EV_DELETE, 0, 0, nil)
  336. when not declared(CACHE_EVENTS):
  337. flushKQueue(s)
  338. dec(s.count)
  339. if posix.close(cint(pkey.ident)) != 0:
  340. raiseIOSelectorsError(osLastError())
  341. elif Event.Signal in pkey.events:
  342. var nmask, omask: Sigset
  343. let signal = cint(pkey.param)
  344. discard sigemptyset(nmask)
  345. discard sigemptyset(omask)
  346. discard sigaddset(nmask, signal)
  347. unblockSignals(nmask, omask)
  348. posix.signal(signal, SIG_DFL)
  349. modifyKQueue(s, uint(pkey.param), EVFILT_SIGNAL, EV_DELETE, 0, 0, nil)
  350. when not declared(CACHE_EVENTS):
  351. flushKQueue(s)
  352. dec(s.count)
  353. if posix.close(cint(pkey.ident)) != 0:
  354. raiseIOSelectorsError(osLastError())
  355. elif Event.Process in pkey.events:
  356. if Event.Finished notin pkey.events:
  357. modifyKQueue(s, uint(pkey.param), EVFILT_PROC, EV_DELETE, 0, 0, nil)
  358. when not declared(CACHE_EVENTS):
  359. flushKQueue(s)
  360. dec(s.count)
  361. if posix.close(cint(pkey.ident)) != 0:
  362. raiseIOSelectorsError(osLastError())
  363. elif Event.Vnode in pkey.events:
  364. modifyKQueue(s, uint(fdi), EVFILT_VNODE, EV_DELETE, 0, 0, nil)
  365. when not declared(CACHE_EVENTS):
  366. flushKQueue(s)
  367. dec(s.count)
  368. elif Event.User in pkey.events:
  369. modifyKQueue(s, uint(fdi), EVFILT_READ, EV_DELETE, 0, 0, nil)
  370. when not declared(CACHE_EVENTS):
  371. flushKQueue(s)
  372. dec(s.count)
  373. clearKey(pkey)
  374. proc unregister*[T](s: Selector[T], ev: SelectEvent) =
  375. let fdi = int(ev.rfd)
  376. s.checkFd(fdi)
  377. var pkey = addr(s.fds[fdi])
  378. doAssert(pkey.ident != InvalidIdent, "Event is not registered in the queue!")
  379. doAssert(Event.User in pkey.events)
  380. modifyKQueue(s, uint(fdi), EVFILT_READ, EV_DELETE, 0, 0, nil)
  381. when not declared(CACHE_EVENTS):
  382. flushKQueue(s)
  383. clearKey(pkey)
  384. dec(s.count)
  385. proc selectInto*[T](s: Selector[T], timeout: int,
  386. results: var openArray[ReadyKey]): int =
  387. var
  388. tv: Timespec
  389. resTable: array[MAX_KQUEUE_EVENTS, KEvent]
  390. ptv = addr tv
  391. maxres = MAX_KQUEUE_EVENTS
  392. verifySelectParams(timeout)
  393. if timeout != -1:
  394. if timeout >= 1000:
  395. tv.tv_sec = posix.Time(timeout div 1_000)
  396. tv.tv_nsec = (timeout %% 1_000) * 1_000_000
  397. else:
  398. tv.tv_sec = posix.Time(0)
  399. tv.tv_nsec = timeout * 1_000_000
  400. else:
  401. ptv = nil
  402. if maxres > len(results):
  403. maxres = len(results)
  404. var count = 0
  405. when not declared(CACHE_EVENTS):
  406. count = kevent(s.kqFD, nil, cint(0), addr(resTable[0]), cint(maxres), ptv)
  407. else:
  408. when hasThreadSupport:
  409. s.withChangeLock():
  410. if s.changesLength > 0:
  411. count = kevent(s.kqFD, addr(s.changes[0]), cint(s.changesLength),
  412. addr(resTable[0]), cint(maxres), ptv)
  413. s.changesLength = 0
  414. else:
  415. count = kevent(s.kqFD, nil, cint(0), addr(resTable[0]), cint(maxres),
  416. ptv)
  417. else:
  418. let length = cint(len(s.changes))
  419. if length > 0:
  420. count = kevent(s.kqFD, addr(s.changes[0]), length,
  421. addr(resTable[0]), cint(maxres), ptv)
  422. s.changes.setLen(0)
  423. else:
  424. count = kevent(s.kqFD, nil, cint(0), addr(resTable[0]), cint(maxres),
  425. ptv)
  426. if count < 0:
  427. result = 0
  428. let err = osLastError()
  429. if cint(err) != EINTR:
  430. raiseIOSelectorsError(err)
  431. elif count == 0:
  432. result = 0
  433. else:
  434. var i = 0
  435. var k = 0 # do not delete this, because `continue` used in cycle.
  436. var pkey: ptr SelectorKey[T]
  437. while i < count:
  438. let kevent = addr(resTable[i])
  439. var rkey = ReadyKey(fd: int(kevent.ident), events: {})
  440. if (kevent.flags and EV_ERROR) != 0:
  441. rkey.events = {Event.Error}
  442. rkey.errorCode = OSErrorCode(kevent.data)
  443. case kevent.filter:
  444. of EVFILT_READ:
  445. pkey = addr(s.fds[int(kevent.ident)])
  446. rkey.events.incl(Event.Read)
  447. if Event.User in pkey.events:
  448. var data: uint64 = 0
  449. if posix.read(cint(kevent.ident), addr data,
  450. sizeof(uint64)) != sizeof(uint64):
  451. let err = osLastError()
  452. if err == OSErrorCode(EAGAIN):
  453. # someone already consumed event data
  454. inc(i)
  455. continue
  456. else:
  457. raiseIOSelectorsError(err)
  458. rkey.events = {Event.User}
  459. of EVFILT_WRITE:
  460. pkey = addr(s.fds[int(kevent.ident)])
  461. rkey.events.incl(Event.Write)
  462. rkey.events = {Event.Write}
  463. of EVFILT_TIMER:
  464. pkey = addr(s.fds[int(kevent.ident)])
  465. if Event.Oneshot in pkey.events:
  466. # we will not clear key until it will be unregistered, so
  467. # application can obtain data, but we will decrease counter,
  468. # because kqueue is empty.
  469. dec(s.count)
  470. # we are marking key with `Finished` event, to avoid double decrease.
  471. pkey.events.incl(Event.Finished)
  472. rkey.events.incl(Event.Timer)
  473. of EVFILT_VNODE:
  474. pkey = addr(s.fds[int(kevent.ident)])
  475. rkey.events.incl(Event.Vnode)
  476. if (kevent.fflags and NOTE_DELETE) != 0:
  477. rkey.events.incl(Event.VnodeDelete)
  478. if (kevent.fflags and NOTE_WRITE) != 0:
  479. rkey.events.incl(Event.VnodeWrite)
  480. if (kevent.fflags and NOTE_EXTEND) != 0:
  481. rkey.events.incl(Event.VnodeExtend)
  482. if (kevent.fflags and NOTE_ATTRIB) != 0:
  483. rkey.events.incl(Event.VnodeAttrib)
  484. if (kevent.fflags and NOTE_LINK) != 0:
  485. rkey.events.incl(Event.VnodeLink)
  486. if (kevent.fflags and NOTE_RENAME) != 0:
  487. rkey.events.incl(Event.VnodeRename)
  488. if (kevent.fflags and NOTE_REVOKE) != 0:
  489. rkey.events.incl(Event.VnodeRevoke)
  490. of EVFILT_SIGNAL:
  491. pkey = addr(s.fds[cast[int](kevent.udata)])
  492. rkey.fd = cast[int](kevent.udata)
  493. rkey.events.incl(Event.Signal)
  494. of EVFILT_PROC:
  495. rkey.fd = cast[int](kevent.udata)
  496. pkey = addr(s.fds[cast[int](kevent.udata)])
  497. # we will not clear key, until it will be unregistered, so
  498. # application can obtain data, but we will decrease counter,
  499. # because kqueue is empty.
  500. dec(s.count)
  501. # we are marking key with `Finished` event, to avoid double decrease.
  502. pkey.events.incl(Event.Finished)
  503. rkey.events.incl(Event.Process)
  504. else:
  505. doAssert(true, "Unsupported kqueue filter in the queue!")
  506. if (kevent.flags and EV_EOF) != 0:
  507. # TODO this error handling needs to be rethought.
  508. # `fflags` can sometimes be `0x80000000` and thus we use 'cast'
  509. # here:
  510. if kevent.fflags != 0:
  511. rkey.errorCode = cast[OSErrorCode](kevent.fflags)
  512. else:
  513. # This assumes we are dealing with sockets.
  514. # TODO: For future-proofing it might be a good idea to give the
  515. # user access to the raw `kevent`.
  516. rkey.errorCode = OSErrorCode(ECONNRESET)
  517. rkey.events.incl(Event.Error)
  518. results[k] = rkey
  519. inc(k)
  520. inc(i)
  521. result = k
  522. proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] =
  523. result = newSeq[ReadyKey](MAX_KQUEUE_EVENTS)
  524. let count = selectInto(s, timeout, result)
  525. result.setLen(count)
  526. template isEmpty*[T](s: Selector[T]): bool =
  527. (s.count == 0)
  528. proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} =
  529. return s.fds[fd.int].ident != InvalidIdent
  530. proc getData*[T](s: Selector[T], fd: SocketHandle|int): var T =
  531. let fdi = int(fd)
  532. s.checkFd(fdi)
  533. if fdi in s:
  534. result = s.fds[fdi].data
  535. proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool =
  536. let fdi = int(fd)
  537. s.checkFd(fdi)
  538. if fdi in s:
  539. s.fds[fdi].data = data
  540. result = true
  541. template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
  542. body: untyped) =
  543. mixin checkFd
  544. let fdi = int(fd)
  545. s.checkFd(fdi)
  546. if fdi in s:
  547. var value = addr(s.fds[fdi].data)
  548. body
  549. template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1,
  550. body2: untyped) =
  551. mixin checkFd
  552. let fdi = int(fd)
  553. s.checkFd(fdi)
  554. if fdi in s:
  555. var value = addr(s.fds[fdi].data)
  556. body1
  557. else:
  558. body2
  559. proc getFd*[T](s: Selector[T]): int =
  560. return s.kqFD.int