ioselectors_kqueue.nim 21 KB


  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2016 Eugene Kabanov
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. # This module implements BSD kqueue().
  10. import std/[posix, times, kqueue, nativesockets]
  11. const
  12. # Maximum number of events that can be returned.
  13. MAX_KQUEUE_EVENTS = 64
  14. # SIG_IGN and SIG_DFL declared in posix.nim as variables, but we need them
  15. # to be constants and GC-safe.
  16. SIG_DFL = cast[proc(x: cint) {.noconv,gcsafe.}](0)
  17. SIG_IGN = cast[proc(x: cint) {.noconv,gcsafe.}](1)
  18. when defined(kqcache):
  19. const CACHE_EVENTS = true
  20. when defined(macosx) or defined(freebsd) or defined(dragonfly):
  21. when defined(macosx):
  22. const MAX_DESCRIPTORS_ID = 29 # KERN_MAXFILESPERPROC (MacOS)
  23. else:
  24. const MAX_DESCRIPTORS_ID = 27 # KERN_MAXFILESPERPROC (FreeBSD)
  25. proc sysctl(name: ptr cint, namelen: cuint, oldp: pointer, oldplen: ptr csize_t,
  26. newp: pointer, newplen: csize_t): cint
  27. {.importc: "sysctl",header: """#include <sys/types.h>
  28. #include <sys/sysctl.h>""".}
  29. elif defined(netbsd) or defined(openbsd):
  30. # OpenBSD and NetBSD don't have KERN_MAXFILESPERPROC, so we are using
  31. # KERN_MAXFILES, because KERN_MAXFILES is always bigger,
  32. # than KERN_MAXFILESPERPROC.
  33. const MAX_DESCRIPTORS_ID = 7 # KERN_MAXFILES
  34. proc sysctl(name: ptr cint, namelen: cuint, oldp: pointer, oldplen: ptr csize_t,
  35. newp: pointer, newplen: csize_t): cint
  36. {.importc: "sysctl",header: """#include <sys/param.h>
  37. #include <sys/sysctl.h>""".}
  38. when hasThreadSupport:
  39. type
  40. SelectorImpl[T] = object
  41. kqFD: cint
  42. maxFD: int
  43. changes: ptr SharedArray[KEvent]
  44. fds: ptr SharedArray[SelectorKey[T]]
  45. count*: int
  46. changesLock: Lock
  47. changesSize: int
  48. changesLength: int
  49. sock: cint
  50. Selector*[T] = ptr SelectorImpl[T]
  51. else:
  52. type
  53. SelectorImpl[T] = object
  54. kqFD: cint
  55. maxFD: int
  56. changes: seq[KEvent]
  57. fds: seq[SelectorKey[T]]
  58. count*: int
  59. sock: cint
  60. Selector*[T] = ref SelectorImpl[T]
  61. type
  62. SelectEventImpl = object
  63. rfd: cint
  64. wfd: cint
  65. SelectEvent* = ptr SelectEventImpl
  66. # SelectEvent is declared as `ptr` to be placed in `shared memory`,
  67. # so you can share one SelectEvent handle between threads.
  68. proc getUnique[T](s: Selector[T]): int {.inline.} =
  69. # we create duplicated handles to get unique indexes for our `fds` array.
  70. result = posix.fcntl(s.sock, F_DUPFD_CLOEXEC, s.sock)
  71. if result == -1:
  72. raiseIOSelectorsError(osLastError())
  73. proc newSelector*[T](): owned(Selector[T]) =
  74. var maxFD = 0.cint
  75. var size = csize_t(sizeof(cint))
  76. var namearr = [1.cint, MAX_DESCRIPTORS_ID.cint]
  77. # Obtain maximum number of opened file descriptors for process
  78. if sysctl(addr(namearr[0]), 2, cast[pointer](addr maxFD), addr size,
  79. nil, 0) != 0:
  80. raiseIOSelectorsError(osLastError())
  81. var kqFD = kqueue()
  82. if kqFD < 0:
  83. raiseIOSelectorsError(osLastError())
  84. # we allocating empty socket to duplicate it handle in future, to get unique
  85. # indexes for `fds` array. This is needed to properly identify
  86. # {Event.Timer, Event.Signal, Event.Process} events.
  87. let usock = createNativeSocket(posix.AF_INET, posix.SOCK_STREAM,
  88. posix.IPPROTO_TCP).cint
  89. if usock == -1:
  90. let err = osLastError()
  91. discard posix.close(kqFD)
  92. raiseIOSelectorsError(err)
  93. when hasThreadSupport:
  94. result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T])))
  95. result.fds = allocSharedArray[SelectorKey[T]](maxFD)
  96. result.changes = allocSharedArray[KEvent](MAX_KQUEUE_EVENTS)
  97. result.changesSize = MAX_KQUEUE_EVENTS
  98. initLock(result.changesLock)
  99. else:
  100. result = Selector[T]()
  101. result.fds = newSeq[SelectorKey[T]](maxFD)
  102. result.changes = newSeqOfCap[KEvent](MAX_KQUEUE_EVENTS)
  103. for i in 0 ..< maxFD:
  104. result.fds[i].ident = InvalidIdent
  105. result.sock = usock
  106. result.kqFD = kqFD
  107. result.maxFD = maxFD.int
  108. proc close*[T](s: Selector[T]) =
  109. let res1 = posix.close(s.kqFD)
  110. let res2 = posix.close(s.sock)
  111. when hasThreadSupport:
  112. deinitLock(s.changesLock)
  113. deallocSharedArray(s.fds)
  114. deallocShared(cast[pointer](s))
  115. if res1 != 0 or res2 != 0:
  116. raiseIOSelectorsError(osLastError())
  117. proc newSelectEvent*(): SelectEvent =
  118. var fds: array[2, cint]
  119. if posix.pipe(fds) != 0:
  120. raiseIOSelectorsError(osLastError())
  121. setNonBlocking(fds[0])
  122. setNonBlocking(fds[1])
  123. result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl)))
  124. result.rfd = fds[0]
  125. result.wfd = fds[1]
  126. proc trigger*(ev: SelectEvent) =
  127. var data: uint64 = 1
  128. if posix.write(ev.wfd, addr data, sizeof(uint64)) != sizeof(uint64):
  129. raiseIOSelectorsError(osLastError())
  130. proc close*(ev: SelectEvent) =
  131. let res1 = posix.close(ev.rfd)
  132. let res2 = posix.close(ev.wfd)
  133. deallocShared(cast[pointer](ev))
  134. if res1 != 0 or res2 != 0:
  135. raiseIOSelectorsError(osLastError())
  136. template checkFd(s, f) =
  137. if f >= s.maxFD:
  138. raiseIOSelectorsError("Maximum number of descriptors is exhausted!")
  139. when hasThreadSupport:
  140. template withChangeLock[T](s: Selector[T], body: untyped) =
  141. acquire(s.changesLock)
  142. {.locks: [s.changesLock].}:
  143. try:
  144. body
  145. finally:
  146. release(s.changesLock)
  147. else:
  148. template withChangeLock(s, body: untyped) =
  149. body
  150. when hasThreadSupport:
  151. template modifyKQueue[T](s: Selector[T], nident: uint, nfilter: cshort,
  152. nflags: cushort, nfflags: cuint, ndata: int,
  153. nudata: pointer) =
  154. mixin withChangeLock
  155. s.withChangeLock():
  156. if s.changesLength == s.changesSize:
  157. # if cache array is full, we allocating new with size * 2
  158. let newSize = s.changesSize shl 1
  159. let rdata = allocSharedArray[KEvent](newSize)
  160. copyMem(rdata, s.changes, s.changesSize * sizeof(KEvent))
  161. s.changesSize = newSize
  162. s.changes[s.changesLength] = KEvent(ident: nident,
  163. filter: nfilter, flags: nflags,
  164. fflags: nfflags, data: ndata,
  165. udata: nudata)
  166. inc(s.changesLength)
  167. when not declared(CACHE_EVENTS):
  168. template flushKQueue[T](s: Selector[T]) =
  169. mixin withChangeLock
  170. s.withChangeLock():
  171. if s.changesLength > 0:
  172. if kevent(s.kqFD, addr(s.changes[0]), cint(s.changesLength),
  173. nil, 0, nil) == -1:
  174. raiseIOSelectorsError(osLastError())
  175. s.changesLength = 0
  176. else:
  177. template modifyKQueue[T](s: Selector[T], nident: uint, nfilter: cshort,
  178. nflags: cushort, nfflags: cuint, ndata: int,
  179. nudata: pointer) =
  180. s.changes.add(KEvent(ident: nident,
  181. filter: nfilter, flags: nflags,
  182. fflags: nfflags, data: ndata,
  183. udata: nudata))
  184. when not declared(CACHE_EVENTS):
  185. template flushKQueue[T](s: Selector[T]) =
  186. let length = cint(len(s.changes))
  187. if length > 0:
  188. if kevent(s.kqFD, addr(s.changes[0]), length,
  189. nil, 0, nil) == -1:
  190. raiseIOSelectorsError(osLastError())
  191. s.changes.setLen(0)
  192. proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle,
  193. events: set[Event], data: T) =
  194. let fdi = int(fd)
  195. s.checkFd(fdi)
  196. doAssert(s.fds[fdi].ident == InvalidIdent)
  197. s.setKey(fdi, events, 0, data)
  198. if events != {}:
  199. if Event.Read in events:
  200. modifyKQueue(s, uint(fdi), EVFILT_READ, EV_ADD, 0, 0, nil)
  201. inc(s.count)
  202. if Event.Write in events:
  203. modifyKQueue(s, uint(fdi), EVFILT_WRITE, EV_ADD, 0, 0, nil)
  204. inc(s.count)
  205. when not declared(CACHE_EVENTS):
  206. flushKQueue(s)
  207. proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle,
  208. events: set[Event]) =
  209. let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode,
  210. Event.User, Event.Oneshot, Event.Error}
  211. let fdi = int(fd)
  212. s.checkFd(fdi)
  213. var pkey = addr(s.fds[fdi])
  214. doAssert(pkey.ident != InvalidIdent,
  215. "Descriptor $# is not registered in the queue!" % $fdi)
  216. doAssert(pkey.events * maskEvents == {})
  217. if pkey.events != events:
  218. if (Event.Read in pkey.events) and (Event.Read notin events):
  219. modifyKQueue(s, fdi.uint, EVFILT_READ, EV_DELETE, 0, 0, nil)
  220. dec(s.count)
  221. if (Event.Write in pkey.events) and (Event.Write notin events):
  222. modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_DELETE, 0, 0, nil)
  223. dec(s.count)
  224. if (Event.Read notin pkey.events) and (Event.Read in events):
  225. modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil)
  226. inc(s.count)
  227. if (Event.Write notin pkey.events) and (Event.Write in events):
  228. modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_ADD, 0, 0, nil)
  229. inc(s.count)
  230. when not declared(CACHE_EVENTS):
  231. flushKQueue(s)
  232. pkey.events = events
  233. proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool,
  234. data: T): int {.discardable.} =
  235. let fdi = getUnique(s)
  236. s.checkFd(fdi)
  237. doAssert(s.fds[fdi].ident == InvalidIdent)
  238. let events = if oneshot: {Event.Timer, Event.Oneshot} else: {Event.Timer}
  239. let flags: cushort = if oneshot: EV_ONESHOT or EV_ADD else: EV_ADD
  240. s.setKey(fdi, events, 0, data)
  241. # EVFILT_TIMER on Open/Net(BSD) has granularity of only milliseconds,
  242. # but MacOS and FreeBSD allow use `0` as `fflags` to use milliseconds
  243. # too
  244. modifyKQueue(s, fdi.uint, EVFILT_TIMER, flags, 0, cint(timeout), nil)
  245. when not declared(CACHE_EVENTS):
  246. flushKQueue(s)
  247. inc(s.count)
  248. result = fdi
  249. proc registerSignal*[T](s: Selector[T], signal: int,
  250. data: T): int {.discardable.} =
  251. let fdi = getUnique(s)
  252. s.checkFd(fdi)
  253. doAssert(s.fds[fdi].ident == InvalidIdent)
  254. s.setKey(fdi, {Event.Signal}, signal, data)
  255. var nmask, omask: Sigset
  256. discard sigemptyset(nmask)
  257. discard sigemptyset(omask)
  258. discard sigaddset(nmask, cint(signal))
  259. blockSignals(nmask, omask)
  260. # to be compatible with linux semantic we need to "eat" signals
  261. posix.signal(cint(signal), SIG_IGN)
  262. modifyKQueue(s, signal.uint, EVFILT_SIGNAL, EV_ADD, 0, 0,
  263. cast[pointer](fdi))
  264. when not declared(CACHE_EVENTS):
  265. flushKQueue(s)
  266. inc(s.count)
  267. result = fdi
  268. proc registerProcess*[T](s: Selector[T], pid: int,
  269. data: T): int {.discardable.} =
  270. let fdi = getUnique(s)
  271. s.checkFd(fdi)
  272. doAssert(s.fds[fdi].ident == InvalidIdent)
  273. var kflags: cushort = EV_ONESHOT or EV_ADD
  274. setKey(s, fdi, {Event.Process, Event.Oneshot}, pid, data)
  275. modifyKQueue(s, pid.uint, EVFILT_PROC, kflags, NOTE_EXIT, 0,
  276. cast[pointer](fdi))
  277. when not declared(CACHE_EVENTS):
  278. flushKQueue(s)
  279. inc(s.count)
  280. result = fdi
  281. proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
  282. let fdi = ev.rfd.int
  283. doAssert(s.fds[fdi].ident == InvalidIdent, "Event is already registered in the queue!")
  284. setKey(s, fdi, {Event.User}, 0, data)
  285. modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil)
  286. when not declared(CACHE_EVENTS):
  287. flushKQueue(s)
  288. inc(s.count)
  289. template processVnodeEvents(events: set[Event]): cuint =
  290. var rfflags = 0.cuint
  291. if events == {Event.VnodeWrite, Event.VnodeDelete, Event.VnodeExtend,
  292. Event.VnodeAttrib, Event.VnodeLink, Event.VnodeRename,
  293. Event.VnodeRevoke}:
  294. rfflags = NOTE_DELETE or NOTE_WRITE or NOTE_EXTEND or NOTE_ATTRIB or
  295. NOTE_LINK or NOTE_RENAME or NOTE_REVOKE
  296. else:
  297. if Event.VnodeDelete in events: rfflags = rfflags or NOTE_DELETE
  298. if Event.VnodeWrite in events: rfflags = rfflags or NOTE_WRITE
  299. if Event.VnodeExtend in events: rfflags = rfflags or NOTE_EXTEND
  300. if Event.VnodeAttrib in events: rfflags = rfflags or NOTE_ATTRIB
  301. if Event.VnodeLink in events: rfflags = rfflags or NOTE_LINK
  302. if Event.VnodeRename in events: rfflags = rfflags or NOTE_RENAME
  303. if Event.VnodeRevoke in events: rfflags = rfflags or NOTE_REVOKE
  304. rfflags
  305. proc registerVnode*[T](s: Selector[T], fd: cint, events: set[Event], data: T) =
  306. let fdi = fd.int
  307. setKey(s, fdi, {Event.Vnode} + events, 0, data)
  308. var fflags = processVnodeEvents(events)
  309. modifyKQueue(s, fdi.uint, EVFILT_VNODE, EV_ADD or EV_CLEAR, fflags, 0, nil)
  310. when not declared(CACHE_EVENTS):
  311. flushKQueue(s)
  312. inc(s.count)
  313. proc unregister*[T](s: Selector[T], fd: int|SocketHandle) =
  314. let fdi = int(fd)
  315. s.checkFd(fdi)
  316. var pkey = addr(s.fds[fdi])
  317. doAssert(pkey.ident != InvalidIdent,
  318. "Descriptor [" & $fdi & "] is not registered in the queue!")
  319. if pkey.events != {}:
  320. if pkey.events * {Event.Read, Event.Write} != {}:
  321. if Event.Read in pkey.events:
  322. modifyKQueue(s, uint(fdi), EVFILT_READ, EV_DELETE, 0, 0, nil)
  323. dec(s.count)
  324. if Event.Write in pkey.events:
  325. modifyKQueue(s, uint(fdi), EVFILT_WRITE, EV_DELETE, 0, 0, nil)
  326. dec(s.count)
  327. when not declared(CACHE_EVENTS):
  328. flushKQueue(s)
  329. elif Event.Timer in pkey.events:
  330. if Event.Finished notin pkey.events:
  331. modifyKQueue(s, uint(fdi), EVFILT_TIMER, EV_DELETE, 0, 0, nil)
  332. when not declared(CACHE_EVENTS):
  333. flushKQueue(s)
  334. dec(s.count)
  335. if posix.close(cint(pkey.ident)) != 0:
  336. raiseIOSelectorsError(osLastError())
  337. elif Event.Signal in pkey.events:
  338. var nmask, omask: Sigset
  339. let signal = cint(pkey.param)
  340. discard sigemptyset(nmask)
  341. discard sigemptyset(omask)
  342. discard sigaddset(nmask, signal)
  343. unblockSignals(nmask, omask)
  344. posix.signal(signal, SIG_DFL)
  345. modifyKQueue(s, uint(pkey.param), EVFILT_SIGNAL, EV_DELETE, 0, 0, nil)
  346. when not declared(CACHE_EVENTS):
  347. flushKQueue(s)
  348. dec(s.count)
  349. if posix.close(cint(pkey.ident)) != 0:
  350. raiseIOSelectorsError(osLastError())
  351. elif Event.Process in pkey.events:
  352. if Event.Finished notin pkey.events:
  353. modifyKQueue(s, uint(pkey.param), EVFILT_PROC, EV_DELETE, 0, 0, nil)
  354. when not declared(CACHE_EVENTS):
  355. flushKQueue(s)
  356. dec(s.count)
  357. if posix.close(cint(pkey.ident)) != 0:
  358. raiseIOSelectorsError(osLastError())
  359. elif Event.Vnode in pkey.events:
  360. modifyKQueue(s, uint(fdi), EVFILT_VNODE, EV_DELETE, 0, 0, nil)
  361. when not declared(CACHE_EVENTS):
  362. flushKQueue(s)
  363. dec(s.count)
  364. elif Event.User in pkey.events:
  365. modifyKQueue(s, uint(fdi), EVFILT_READ, EV_DELETE, 0, 0, nil)
  366. when not declared(CACHE_EVENTS):
  367. flushKQueue(s)
  368. dec(s.count)
  369. clearKey(pkey)
  370. proc unregister*[T](s: Selector[T], ev: SelectEvent) =
  371. let fdi = int(ev.rfd)
  372. s.checkFd(fdi)
  373. var pkey = addr(s.fds[fdi])
  374. doAssert(pkey.ident != InvalidIdent, "Event is not registered in the queue!")
  375. doAssert(Event.User in pkey.events)
  376. modifyKQueue(s, uint(fdi), EVFILT_READ, EV_DELETE, 0, 0, nil)
  377. when not declared(CACHE_EVENTS):
  378. flushKQueue(s)
  379. clearKey(pkey)
  380. dec(s.count)
  381. proc selectInto*[T](s: Selector[T], timeout: int,
  382. results: var openArray[ReadyKey]): int =
  383. var
  384. tv: Timespec
  385. resTable: array[MAX_KQUEUE_EVENTS, KEvent]
  386. ptv = addr tv
  387. maxres = MAX_KQUEUE_EVENTS
  388. verifySelectParams(timeout)
  389. if timeout != -1:
  390. if timeout >= 1000:
  391. tv.tv_sec = posix.Time(timeout div 1_000)
  392. tv.tv_nsec = (timeout %% 1_000) * 1_000_000
  393. else:
  394. tv.tv_sec = posix.Time(0)
  395. tv.tv_nsec = timeout * 1_000_000
  396. else:
  397. ptv = nil
  398. if maxres > len(results):
  399. maxres = len(results)
  400. var count = 0
  401. when not declared(CACHE_EVENTS):
  402. count = kevent(s.kqFD, nil, cint(0), addr(resTable[0]), cint(maxres), ptv)
  403. else:
  404. when hasThreadSupport:
  405. s.withChangeLock():
  406. if s.changesLength > 0:
  407. count = kevent(s.kqFD, addr(s.changes[0]), cint(s.changesLength),
  408. addr(resTable[0]), cint(maxres), ptv)
  409. s.changesLength = 0
  410. else:
  411. count = kevent(s.kqFD, nil, cint(0), addr(resTable[0]), cint(maxres),
  412. ptv)
  413. else:
  414. let length = cint(len(s.changes))
  415. if length > 0:
  416. count = kevent(s.kqFD, addr(s.changes[0]), length,
  417. addr(resTable[0]), cint(maxres), ptv)
  418. s.changes.setLen(0)
  419. else:
  420. count = kevent(s.kqFD, nil, cint(0), addr(resTable[0]), cint(maxres),
  421. ptv)
  422. if count < 0:
  423. result = 0
  424. let err = osLastError()
  425. if cint(err) != EINTR:
  426. raiseIOSelectorsError(err)
  427. elif count == 0:
  428. result = 0
  429. else:
  430. var i = 0
  431. var k = 0 # do not delete this, because `continue` used in cycle.
  432. var pkey: ptr SelectorKey[T]
  433. while i < count:
  434. let kevent = addr(resTable[i])
  435. var rkey = ReadyKey(fd: int(kevent.ident), events: {})
  436. if (kevent.flags and EV_ERROR) != 0:
  437. rkey.events = {Event.Error}
  438. rkey.errorCode = OSErrorCode(kevent.data)
  439. case kevent.filter:
  440. of EVFILT_READ:
  441. pkey = addr(s.fds[int(kevent.ident)])
  442. rkey.events.incl(Event.Read)
  443. if Event.User in pkey.events:
  444. var data: uint64 = 0
  445. if posix.read(cint(kevent.ident), addr data,
  446. sizeof(uint64)) != sizeof(uint64):
  447. let err = osLastError()
  448. if err == OSErrorCode(EAGAIN):
  449. # someone already consumed event data
  450. inc(i)
  451. continue
  452. else:
  453. raiseIOSelectorsError(err)
  454. rkey.events = {Event.User}
  455. of EVFILT_WRITE:
  456. pkey = addr(s.fds[int(kevent.ident)])
  457. rkey.events.incl(Event.Write)
  458. rkey.events = {Event.Write}
  459. of EVFILT_TIMER:
  460. pkey = addr(s.fds[int(kevent.ident)])
  461. if Event.Oneshot in pkey.events:
  462. # we will not clear key until it will be unregistered, so
  463. # application can obtain data, but we will decrease counter,
  464. # because kqueue is empty.
  465. dec(s.count)
  466. # we are marking key with `Finished` event, to avoid double decrease.
  467. pkey.events.incl(Event.Finished)
  468. rkey.events.incl(Event.Timer)
  469. of EVFILT_VNODE:
  470. pkey = addr(s.fds[int(kevent.ident)])
  471. rkey.events.incl(Event.Vnode)
  472. if (kevent.fflags and NOTE_DELETE) != 0:
  473. rkey.events.incl(Event.VnodeDelete)
  474. if (kevent.fflags and NOTE_WRITE) != 0:
  475. rkey.events.incl(Event.VnodeWrite)
  476. if (kevent.fflags and NOTE_EXTEND) != 0:
  477. rkey.events.incl(Event.VnodeExtend)
  478. if (kevent.fflags and NOTE_ATTRIB) != 0:
  479. rkey.events.incl(Event.VnodeAttrib)
  480. if (kevent.fflags and NOTE_LINK) != 0:
  481. rkey.events.incl(Event.VnodeLink)
  482. if (kevent.fflags and NOTE_RENAME) != 0:
  483. rkey.events.incl(Event.VnodeRename)
  484. if (kevent.fflags and NOTE_REVOKE) != 0:
  485. rkey.events.incl(Event.VnodeRevoke)
  486. of EVFILT_SIGNAL:
  487. pkey = addr(s.fds[cast[int](kevent.udata)])
  488. rkey.fd = cast[int](kevent.udata)
  489. rkey.events.incl(Event.Signal)
  490. of EVFILT_PROC:
  491. rkey.fd = cast[int](kevent.udata)
  492. pkey = addr(s.fds[cast[int](kevent.udata)])
  493. # we will not clear key, until it will be unregistered, so
  494. # application can obtain data, but we will decrease counter,
  495. # because kqueue is empty.
  496. dec(s.count)
  497. # we are marking key with `Finished` event, to avoid double decrease.
  498. pkey.events.incl(Event.Finished)
  499. rkey.events.incl(Event.Process)
  500. else:
  501. doAssert(true, "Unsupported kqueue filter in the queue!")
  502. if (kevent.flags and EV_EOF) != 0:
  503. # TODO this error handling needs to be rethought.
  504. # `fflags` can sometimes be `0x80000000` and thus we use 'cast'
  505. # here:
  506. if kevent.fflags != 0:
  507. rkey.errorCode = cast[OSErrorCode](kevent.fflags)
  508. else:
  509. # This assumes we are dealing with sockets.
  510. # TODO: For future-proofing it might be a good idea to give the
  511. # user access to the raw `kevent`.
  512. rkey.errorCode = OSErrorCode(ECONNRESET)
  513. rkey.events.incl(Event.Error)
  514. results[k] = rkey
  515. inc(k)
  516. inc(i)
  517. result = k
  518. proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] =
  519. result = newSeq[ReadyKey](MAX_KQUEUE_EVENTS)
  520. let count = selectInto(s, timeout, result)
  521. result.setLen(count)
  522. template isEmpty*[T](s: Selector[T]): bool =
  523. (s.count == 0)
  524. proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} =
  525. return s.fds[fd.int].ident != InvalidIdent
  526. proc getData*[T](s: Selector[T], fd: SocketHandle|int): var T =
  527. let fdi = int(fd)
  528. s.checkFd(fdi)
  529. if fdi in s:
  530. result = s.fds[fdi].data
  531. proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool =
  532. let fdi = int(fd)
  533. s.checkFd(fdi)
  534. if fdi in s:
  535. s.fds[fdi].data = data
  536. result = true
  537. template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
  538. body: untyped) =
  539. mixin checkFd
  540. let fdi = int(fd)
  541. s.checkFd(fdi)
  542. if fdi in s:
  543. var value = addr(s.fds[fdi].data)
  544. body
  545. template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1,
  546. body2: untyped) =
  547. mixin checkFd
  548. let fdi = int(fd)
  549. s.checkFd(fdi)
  550. if fdi in s:
  551. var value = addr(s.fds[fdi].data)
  552. body1
  553. else:
  554. body2
  555. proc getFd*[T](s: Selector[T]): int =
  556. return s.kqFD.int