asyncdispatch.nim 69 KB


  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2015 Dominik Picheta
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. ## This module implements asynchronous IO. This includes a dispatcher,
  10. ## a ``Future`` type implementation, and an ``async`` macro which allows
  11. ## asynchronous code to be written in a synchronous style with the ``await``
  12. ## keyword.
  13. ##
  14. ## The dispatcher acts as a kind of event loop. You must call ``poll`` on it
  15. ## (or a function which does so for you such as ``waitFor`` or ``runForever``)
  16. ## in order to poll for any outstanding events. The underlying implementation
  17. ## is based on epoll on Linux, IO Completion Ports on Windows and select on
  18. ## other operating systems.
  19. ##
  20. ## The ``poll`` function will not, on its own, return any events. Instead
  21. ## an appropriate ``Future`` object will be completed. A ``Future`` is a
  22. ## type which holds a value which is not yet available, but which *may* be
  23. ## available in the future. You can check whether a future is finished
  24. ## by using the ``finished`` function. When a future is finished it means that
  25. ## either the value that it holds is now available or it holds an error instead.
  26. ## The latter situation occurs when the operation to complete a future fails
  27. ## with an exception. You can distinguish between the two situations with the
  28. ## ``failed`` function.
  29. ##
  30. ## Future objects can also store a callback procedure which will be called
  31. ## automatically once the future completes.
  32. ##
  33. ## Futures therefore can be thought of as an implementation of the proactor
  34. ## pattern. In this
  35. ## pattern you make a request for an action, and once that action is fulfilled
  36. ## a future is completed with the result of that action. Requests can be
  37. ## made by calling the appropriate functions. For example: calling the ``recv``
  38. ## function will create a request for some data to be read from a socket. The
  39. ## future which the ``recv`` function returns will then complete once the
  40. ## requested amount of data is read **or** an exception occurs.
  41. ##
  42. ## Code to read some data from a socket may look something like this:
  43. ##
  44. ## .. code-block::nim
  45. ## var future = socket.recv(100)
  46. ## future.addCallback(
  47. ## proc () =
  48. ## echo(future.read)
  49. ## )
  50. ##
  51. ## All asynchronous functions returning a ``Future`` will not block. They
  52. ## will not however return immediately. An asynchronous function will have
  53. ## code which will be executed before an asynchronous request is made, in most
  54. ## cases this code sets up the request.
  55. ##
  56. ## In the above example, the ``recv`` function will return a brand new
  57. ## ``Future`` instance once the request for data to be read from the socket
  58. ## is made. This ``Future`` instance will complete once the requested amount
  59. ## of data is read, in this case it is 100 bytes. The second line sets a
  60. ## callback on this future which will be called once the future completes.
  61. ## All the callback does is write the data stored in the future to ``stdout``.
  62. ## The ``read`` function is used for this and it checks whether the future
  63. ## completes with an error for you (if it did it will simply raise the
  64. ## error), if there is no error however it returns the value of the future.
  65. ##
  66. ## Asynchronous procedures
  67. ## =======================
  68. ##
  69. ## Asynchronous procedures remove the pain of working with callbacks. They do
  70. ## this by allowing you to write asynchronous code the same way as you would
  71. ## write synchronous code.
  72. ##
  73. ## An asynchronous procedure is marked using the ``{.async.}`` pragma.
  74. ## When marking a procedure with the ``{.async.}`` pragma it must have a
  75. ## ``Future[T]`` return type or no return type at all. If you do not specify
  76. ## a return type then ``Future[void]`` is assumed.
  77. ##
  78. ## Inside asynchronous procedures ``await`` can be used to call any
  79. ## procedures which return a
  80. ## ``Future``; this includes asynchronous procedures. When a procedure is
  81. ## "awaited", the asynchronous procedure it is awaited in will
  82. ## suspend its execution
  83. ## until the awaited procedure's Future completes. At which point the
  84. ## asynchronous procedure will resume its execution. During the period
  85. ## when an asynchronous procedure is suspended other asynchronous procedures
  86. ## will be run by the dispatcher.
  87. ##
  88. ## The ``await`` call may be used in many contexts. It can be used on the right
  89. ## hand side of a variable declaration: ``var data = await socket.recv(100)``,
  90. ## in which case the variable will be set to the value of the future
  91. ## automatically. It can be used to await a ``Future`` object, and it can
  92. ## be used to await a procedure returning a ``Future[void]``:
  93. ## ``await socket.send("foobar")``.
  94. ##
  95. ## If an awaited future completes with an error, then ``await`` will re-raise
  96. ## this error. To avoid this, you can use the ``yield`` keyword instead of
  97. ## ``await``. The following section shows different ways that you can handle
  98. ## exceptions in async procs.
  99. ##
  100. ## Handling Exceptions
  101. ## -------------------
  102. ##
  103. ## The most reliable way to handle exceptions is to use ``yield`` on a future
  104. ## then check the future's ``failed`` property. For example:
  105. ##
  106. ## .. code-block:: Nim
  107. ## var future = sock.recv(100)
  108. ## yield future
  109. ## if future.failed:
  110. ## # Handle exception
  111. ##
  112. ## The ``async`` procedures also offer limited support for the try statement.
  113. ##
  114. ## .. code-block:: Nim
  115. ## try:
  116. ## let data = await sock.recv(100)
  117. ## echo("Received ", data)
  118. ## except:
  119. ## # Handle exception
  120. ##
  121. ## Unfortunately the semantics of the try statement may not always be correct,
  122. ## and occasionally the compilation may fail altogether.
  123. ## As such it is better to use the former style when possible.
  124. ##
  125. ##
  126. ## Discarding futures
  127. ## ==================
  128. ##
  129. ## Futures should **never** be discarded. This is because they may contain
  130. ## errors. If you do not care for the result of a Future then you should
  131. ## use the ``asyncCheck`` procedure instead of the ``discard`` keyword. Note
  132. ## however that this does not wait for completion, and you should use
  133. ## ``waitFor`` for that purpose.
  134. ##
  135. ## Examples
  136. ## ========
  137. ##
  138. ## For examples take a look at the documentation for the modules implementing
  139. ## asynchronous IO. A good place to start is the
  140. ## `asyncnet module <asyncnet.html>`_.
  141. ##
  142. ## Investigating pending futures
  143. ## =============================
  144. ##
  145. ## It's possible to get into a situation where an async proc, or more accurately
  146. ## a ``Future[T]`` gets stuck and
  147. ## never completes. This can happen for various reasons and can cause serious
  148. ## memory leaks. When this occurs it's hard to identify the procedure that is
  149. ## stuck.
  150. ##
  151. ## Thankfully there is a mechanism which tracks the count of each pending future.
  152. ## All you need to do to enable it is compile with ``-d:futureLogging`` and
  153. ## use the ``getFuturesInProgress`` procedure to get the list of pending futures
  154. ## together with the stack traces to the moment of their creation.
  155. ##
  156. ## You may also find it useful to use this
  157. ## `prometheus package <https://github.com/dom96/prometheus>`_ which will log
  158. ## the pending futures into prometheus, allowing you to analyse them via a nice
  159. ## graph.
  160. ##
  161. ##
  162. ##
  163. ## Limitations/Bugs
  164. ## ================
  165. ##
  166. ## * The effect system (``raises: []``) does not work with async procedures.
  167. include "system/inclrtl"
  168. import os, tables, strutils, times, heapqueue, lists, options, asyncstreams
  169. import options, math, std/monotimes
  170. import asyncfutures except callSoon
  171. import nativesockets, net, deques
  172. export Port, SocketFlag
  173. export asyncfutures except callSoon
  174. export asyncstreams
  175. #{.injectStmt: newGcInvariant().}
  176. # TODO: Check if yielded future is nil and throw a more meaningful exception
  177. type
  178. PDispatcherBase = ref object of RootRef
  179. timers*: HeapQueue[tuple[finishAt: MonoTime, fut: Future[void]]]
  180. callbacks*: Deque[proc () {.gcsafe.}]
  181. proc processTimers(
  182. p: PDispatcherBase, didSomeWork: var bool
  183. ): Option[int] {.inline.} =
  184. # Pop the timers in the order in which they will expire (smaller `finishAt`).
  185. var count = p.timers.len
  186. let t = getMonoTime()
  187. while count > 0 and t >= p.timers[0].finishAt:
  188. p.timers.pop().fut.complete()
  189. dec count
  190. didSomeWork = true
  191. # Return the number of milliseconds in which the next timer will expire.
  192. if p.timers.len == 0: return
  193. let millisecs = (p.timers[0].finishAt - getMonoTime()).inMilliseconds
  194. return some(millisecs.int + 1)
  195. proc processPendingCallbacks(p: PDispatcherBase; didSomeWork: var bool) =
  196. while p.callbacks.len > 0:
  197. var cb = p.callbacks.popFirst()
  198. cb()
  199. didSomeWork = true
  200. proc adjustTimeout(
  201. p: PDispatcherBase, pollTimeout: int, nextTimer: Option[int]
  202. ): int {.inline.} =
  203. if p.callbacks.len != 0:
  204. return 0
  205. if nextTimer.isNone() or pollTimeout == -1:
  206. return pollTimeout
  207. result = max(nextTimer.get(), 0)
  208. result = min(pollTimeout, result)
  209. proc callSoon*(cbproc: proc () {.gcsafe.}) {.gcsafe.}
  210. ## Schedule `cbproc` to be called as soon as possible.
  211. ## The callback is called when control returns to the event loop.
  212. proc initCallSoonProc =
  213. if asyncfutures.getCallSoonProc().isNil:
  214. asyncfutures.setCallSoonProc(callSoon)
  215. when defined(windows) or defined(nimdoc):
  216. import winlean, sets, hashes
  217. type
  218. CompletionKey = ULONG_PTR
  219. CompletionData* = object
  220. fd*: AsyncFD # TODO: Rename this.
  221. cb*: owned(proc (fd: AsyncFD, bytesTransferred: DWORD,
  222. errcode: OSErrorCode) {.closure, gcsafe.})
  223. cell*: ForeignCell # we need this `cell` to protect our `cb` environment,
  224. # when using RegisterWaitForSingleObject, because
  225. # waiting is done in different thread.
  226. PDispatcher* = ref object of PDispatcherBase
  227. ioPort: Handle
  228. handles: HashSet[AsyncFD]
  229. CustomOverlapped = object of OVERLAPPED
  230. data*: CompletionData
  231. PCustomOverlapped* = ref CustomOverlapped
  232. AsyncFD* = distinct int
  233. PostCallbackData = object
  234. ioPort: Handle
  235. handleFd: AsyncFD
  236. waitFd: Handle
  237. ovl: owned PCustomOverlapped
  238. PostCallbackDataPtr = ptr PostCallbackData
  239. AsyncEventImpl = object
  240. hEvent: Handle
  241. hWaiter: Handle
  242. pcd: PostCallbackDataPtr
  243. AsyncEvent* = ptr AsyncEventImpl
  244. Callback* = proc (fd: AsyncFD): bool {.closure, gcsafe.}
  245. proc hash(x: AsyncFD): Hash {.borrow.}
  246. proc `==`*(x: AsyncFD, y: AsyncFD): bool {.borrow.}
  247. proc newDispatcher*(): owned PDispatcher =
  248. ## Creates a new Dispatcher instance.
  249. new result
  250. result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
  251. result.handles = initSet[AsyncFD]()
  252. result.timers.newHeapQueue()
  253. result.callbacks = initDeque[proc ()](64)
  254. var gDisp{.threadvar.}: owned PDispatcher ## Global dispatcher
  255. proc setGlobalDispatcher*(disp: owned PDispatcher) =
  256. if not gDisp.isNil:
  257. assert gDisp.callbacks.len == 0
  258. gDisp = disp
  259. initCallSoonProc()
  260. proc getGlobalDispatcher*(): PDispatcher =
  261. if gDisp.isNil:
  262. setGlobalDispatcher(newDispatcher())
  263. result = gDisp
  264. proc getIoHandler*(disp: PDispatcher): Handle =
  265. ## Returns the underlying IO Completion Port handle (Windows) or selector
  266. ## (Unix) for the specified dispatcher.
  267. return disp.ioPort
  268. proc register*(fd: AsyncFD) =
  269. ## Registers ``fd`` with the dispatcher.
  270. let p = getGlobalDispatcher()
  271. if createIoCompletionPort(fd.Handle, p.ioPort,
  272. cast[CompletionKey](fd), 1) == 0:
  273. raiseOSError(osLastError())
  274. p.handles.incl(fd)
  275. proc verifyPresence(fd: AsyncFD) =
  276. ## Ensures that file descriptor has been registered with the dispatcher.
  277. ## Raises ValueError if `fd` has not been registered.
  278. let p = getGlobalDispatcher()
  279. if fd notin p.handles:
  280. raise newException(ValueError,
  281. "Operation performed on a socket which has not been registered with" &
  282. " the dispatcher yet.")
  283. proc hasPendingOperations*(): bool =
  284. ## Returns `true` if the global dispatcher has pending operations.
  285. let p = getGlobalDispatcher()
  286. p.handles.len != 0 or p.timers.len != 0 or p.callbacks.len != 0
  287. proc runOnce(timeout = 500): bool =
  288. let p = getGlobalDispatcher()
  289. if p.handles.len == 0 and p.timers.len == 0 and p.callbacks.len == 0:
  290. raise newException(ValueError,
  291. "No handles or timers registered in dispatcher.")
  292. result = false
  293. let nextTimer = processTimers(p, result)
  294. let at = adjustTimeout(p, timeout, nextTimer)
  295. var llTimeout =
  296. if at == -1: winlean.INFINITE
  297. else: at.int32
  298. var lpNumberOfBytesTransferred: DWORD
  299. var lpCompletionKey: ULONG_PTR
  300. var customOverlapped: PCustomOverlapped
  301. let res = getQueuedCompletionStatus(p.ioPort,
  302. addr lpNumberOfBytesTransferred, addr lpCompletionKey,
  303. cast[ptr POVERLAPPED](addr customOverlapped), llTimeout).bool
  304. result = true
  305. # http://stackoverflow.com/a/12277264/492186
  306. # TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html
  307. if res:
  308. # This is useful for ensuring the reliability of the overlapped struct.
  309. assert customOverlapped.data.fd == lpCompletionKey.AsyncFD
  310. customOverlapped.data.cb(customOverlapped.data.fd,
  311. lpNumberOfBytesTransferred, OSErrorCode(-1))
  312. # If cell.data != nil, then system.protect(rawEnv(cb)) was called,
  313. # so we need to dispose our `cb` environment, because it is not needed
  314. # anymore.
  315. if customOverlapped.data.cell.data != nil:
  316. system.dispose(customOverlapped.data.cell)
  317. GC_unref(customOverlapped)
  318. else:
  319. let errCode = osLastError()
  320. if customOverlapped != nil:
  321. assert customOverlapped.data.fd == lpCompletionKey.AsyncFD
  322. customOverlapped.data.cb(customOverlapped.data.fd,
  323. lpNumberOfBytesTransferred, errCode)
  324. if customOverlapped.data.cell.data != nil:
  325. system.dispose(customOverlapped.data.cell)
  326. GC_unref(customOverlapped)
  327. else:
  328. if errCode.int32 == WAIT_TIMEOUT:
  329. # Timed out
  330. result = false
  331. else: raiseOSError(errCode)
  332. # Timer processing.
  333. discard processTimers(p, result)
  334. # Callback queue processing
  335. processPendingCallbacks(p, result)
  336. var acceptEx: WSAPROC_ACCEPTEX
  337. var connectEx: WSAPROC_CONNECTEX
  338. var getAcceptExSockAddrs: WSAPROC_GETACCEPTEXSOCKADDRS
  339. proc initPointer(s: SocketHandle, fun: var pointer, guid: var GUID): bool =
  340. # Ref: https://github.com/powdahound/twisted/blob/master/twisted/internet/iocpreactor/iocpsupport/winsock_pointers.c
  341. var bytesRet: DWORD
  342. fun = nil
  343. result = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, addr guid,
  344. sizeof(GUID).DWORD, addr fun, sizeof(pointer).DWORD,
  345. addr bytesRet, nil, nil) == 0
  346. proc initAll() =
  347. let dummySock = newNativeSocket()
  348. if dummySock == INVALID_SOCKET:
  349. raiseOSError(osLastError())
  350. var fun: pointer = nil
  351. if not initPointer(dummySock, fun, WSAID_CONNECTEX):
  352. raiseOSError(osLastError())
  353. connectEx = cast[WSAPROC_CONNECTEX](fun)
  354. if not initPointer(dummySock, fun, WSAID_ACCEPTEX):
  355. raiseOSError(osLastError())
  356. acceptEx = cast[WSAPROC_ACCEPTEX](fun)
  357. if not initPointer(dummySock, fun, WSAID_GETACCEPTEXSOCKADDRS):
  358. raiseOSError(osLastError())
  359. getAcceptExSockAddrs = cast[WSAPROC_GETACCEPTEXSOCKADDRS](fun)
  360. close(dummySock)
  361. proc recv*(socket: AsyncFD, size: int,
  362. flags = {SocketFlag.SafeDisconn}): owned(Future[string]) =
  363. ## Reads **up to** ``size`` bytes from ``socket``. Returned future will
  364. ## complete once all the data requested is read, a part of the data has been
  365. ## read, or the socket has disconnected in which case the future will
  366. ## complete with a value of ``""``.
  367. ##
  368. ## **Warning**: The ``Peek`` socket flag is not supported on Windows.
  369. # Things to note:
  370. # * When WSARecv completes immediately then ``bytesReceived`` is very
  371. # unreliable.
  372. # * Still need to implement message-oriented socket disconnection,
  373. # '\0' in the message currently signifies a socket disconnect. Who
  374. # knows what will happen when someone sends that to our socket.
  375. verifyPresence(socket)
  376. assert SocketFlag.Peek notin flags, "Peek not supported on Windows."
  377. var retFuture = newFuture[string]("recv")
  378. var dataBuf: TWSABuf
  379. dataBuf.buf = cast[cstring](alloc0(size))
  380. dataBuf.len = size.ULONG
  381. var bytesReceived: DWORD
  382. var flagsio = flags.toOSFlags().DWORD
  383. var ol = PCustomOverlapped()
  384. GC_ref(ol)
  385. ol.data = CompletionData(fd: socket, cb:
  386. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  387. if not retFuture.finished:
  388. if errcode == OSErrorCode(-1):
  389. if bytesCount == 0 and dataBuf.buf[0] == '\0':
  390. retFuture.complete("")
  391. else:
  392. var data = newString(bytesCount)
  393. assert bytesCount <= size
  394. copyMem(addr data[0], addr dataBuf.buf[0], bytesCount)
  395. retFuture.complete($data)
  396. else:
  397. if flags.isDisconnectionError(errcode):
  398. retFuture.complete("")
  399. else:
  400. retFuture.fail(newException(OSError, osErrorMsg(errcode)))
  401. if dataBuf.buf != nil:
  402. dealloc dataBuf.buf
  403. dataBuf.buf = nil
  404. )
  405. let ret = WSARecv(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived,
  406. addr flagsio, cast[POVERLAPPED](ol), nil)
  407. if ret == -1:
  408. let err = osLastError()
  409. if err.int32 != ERROR_IO_PENDING:
  410. if dataBuf.buf != nil:
  411. dealloc dataBuf.buf
  412. dataBuf.buf = nil
  413. GC_unref(ol)
  414. if flags.isDisconnectionError(err):
  415. retFuture.complete("")
  416. else:
  417. retFuture.fail(newException(OSError, osErrorMsg(err)))
  418. elif ret == 0:
  419. # Request completed immediately.
  420. if bytesReceived != 0:
  421. var data = newString(bytesReceived)
  422. assert bytesReceived <= size
  423. copyMem(addr data[0], addr dataBuf.buf[0], bytesReceived)
  424. retFuture.complete($data)
  425. else:
  426. if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)):
  427. retFuture.complete("")
  428. return retFuture
  429. proc recvInto*(socket: AsyncFD, buf: pointer, size: int,
  430. flags = {SocketFlag.SafeDisconn}): owned(Future[int]) =
  431. ## Reads **up to** ``size`` bytes from ``socket`` into ``buf``, which must
  432. ## at least be of that size. Returned future will complete once all the
  433. ## data requested is read, a part of the data has been read, or the socket
  434. ## has disconnected in which case the future will complete with a value of
  435. ## ``0``.
  436. ##
  437. ## **Warning**: The ``Peek`` socket flag is not supported on Windows.
  438. # Things to note:
  439. # * When WSARecv completes immediately then ``bytesReceived`` is very
  440. # unreliable.
  441. # * Still need to implement message-oriented socket disconnection,
  442. # '\0' in the message currently signifies a socket disconnect. Who
  443. # knows what will happen when someone sends that to our socket.
  444. verifyPresence(socket)
  445. assert SocketFlag.Peek notin flags, "Peek not supported on Windows."
  446. var retFuture = newFuture[int]("recvInto")
  447. #buf[] = '\0'
  448. var dataBuf: TWSABuf
  449. dataBuf.buf = cast[cstring](buf)
  450. dataBuf.len = size.ULONG
  451. var bytesReceived: DWORD
  452. var flagsio = flags.toOSFlags().DWORD
  453. var ol = PCustomOverlapped()
  454. GC_ref(ol)
  455. ol.data = CompletionData(fd: socket, cb:
  456. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  457. if not retFuture.finished:
  458. if errcode == OSErrorCode(-1):
  459. retFuture.complete(bytesCount)
  460. else:
  461. if flags.isDisconnectionError(errcode):
  462. retFuture.complete(0)
  463. else:
  464. retFuture.fail(newException(OSError, osErrorMsg(errcode)))
  465. if dataBuf.buf != nil:
  466. dataBuf.buf = nil
  467. )
  468. let ret = WSARecv(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived,
  469. addr flagsio, cast[POVERLAPPED](ol), nil)
  470. if ret == -1:
  471. let err = osLastError()
  472. if err.int32 != ERROR_IO_PENDING:
  473. if dataBuf.buf != nil:
  474. dataBuf.buf = nil
  475. GC_unref(ol)
  476. if flags.isDisconnectionError(err):
  477. retFuture.complete(0)
  478. else:
  479. retFuture.fail(newException(OSError, osErrorMsg(err)))
  480. elif ret == 0:
  481. # Request completed immediately.
  482. if bytesReceived != 0:
  483. assert bytesReceived <= size
  484. retFuture.complete(bytesReceived)
  485. else:
  486. if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)):
  487. retFuture.complete(bytesReceived)
  488. return retFuture
  489. proc send*(socket: AsyncFD, buf: pointer, size: int,
  490. flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
  491. ## Sends ``size`` bytes from ``buf`` to ``socket``. The returned future
  492. ## will complete once all data has been sent.
  493. ##
  494. ## **WARNING**: Use it with caution. If ``buf`` refers to GC'ed object,
  495. ## you must use GC_ref/GC_unref calls to avoid early freeing of the buffer.
  496. verifyPresence(socket)
  497. var retFuture = newFuture[void]("send")
  498. var dataBuf: TWSABuf
  499. dataBuf.buf = cast[cstring](buf)
  500. dataBuf.len = size.ULONG
  501. var bytesReceived, lowFlags: DWORD
  502. var ol = PCustomOverlapped()
  503. GC_ref(ol)
  504. ol.data = CompletionData(fd: socket, cb:
  505. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  506. if not retFuture.finished:
  507. if errcode == OSErrorCode(-1):
  508. retFuture.complete()
  509. else:
  510. if flags.isDisconnectionError(errcode):
  511. retFuture.complete()
  512. else:
  513. retFuture.fail(newOSError(errcode))
  514. )
  515. let ret = WSASend(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived,
  516. lowFlags, cast[POVERLAPPED](ol), nil)
  517. if ret == -1:
  518. let err = osLastError()
  519. if err.int32 != ERROR_IO_PENDING:
  520. GC_unref(ol)
  521. if flags.isDisconnectionError(err):
  522. retFuture.complete()
  523. else:
  524. retFuture.fail(newException(OSError, osErrorMsg(err)))
  525. else:
  526. retFuture.complete()
  527. # We don't deallocate ``ol`` here because even though this completed
  528. # immediately poll will still be notified about its completion and it will
  529. # free ``ol``.
  530. return retFuture
  531. proc sendTo*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr,
  532. saddrLen: SockLen,
  533. flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
  534. ## Sends ``data`` to specified destination ``saddr``, using
  535. ## socket ``socket``. The returned future will complete once all data
  536. ## has been sent.
  537. verifyPresence(socket)
  538. var retFuture = newFuture[void]("sendTo")
  539. var dataBuf: TWSABuf
  540. dataBuf.buf = cast[cstring](data)
  541. dataBuf.len = size.ULONG
  542. var bytesSent = 0.DWORD
  543. var lowFlags = 0.DWORD
  544. # we will preserve address in our stack
  545. var staddr: array[128, char] # SOCKADDR_STORAGE size is 128 bytes
  546. var stalen: cint = cint(saddrLen)
  547. zeroMem(addr(staddr[0]), 128)
  548. copyMem(addr(staddr[0]), saddr, saddrLen)
  549. var ol = PCustomOverlapped()
  550. GC_ref(ol)
  551. ol.data = CompletionData(fd: socket, cb:
  552. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  553. if not retFuture.finished:
  554. if errcode == OSErrorCode(-1):
  555. retFuture.complete()
  556. else:
  557. retFuture.fail(newException(OSError, osErrorMsg(errcode)))
  558. )
  559. let ret = WSASendTo(socket.SocketHandle, addr dataBuf, 1, addr bytesSent,
  560. lowFlags, cast[ptr SockAddr](addr(staddr[0])),
  561. stalen, cast[POVERLAPPED](ol), nil)
  562. if ret == -1:
  563. let err = osLastError()
  564. if err.int32 != ERROR_IO_PENDING:
  565. GC_unref(ol)
  566. retFuture.fail(newException(OSError, osErrorMsg(err)))
  567. else:
  568. retFuture.complete()
  569. # We don't deallocate ``ol`` here because even though this completed
  570. # immediately poll will still be notified about its completion and it will
  571. # free ``ol``.
  572. return retFuture
  573. proc recvFromInto*(socket: AsyncFD, data: pointer, size: int,
  574. saddr: ptr SockAddr, saddrLen: ptr SockLen,
  575. flags = {SocketFlag.SafeDisconn}): owned(Future[int]) =
  576. ## Receives a datagram data from ``socket`` into ``buf``, which must
  577. ## be at least of size ``size``, address of datagram's sender will be
  578. ## stored into ``saddr`` and ``saddrLen``. Returned future will complete
  579. ## once one datagram has been received, and will return size of packet
  580. ## received.
  581. verifyPresence(socket)
  582. var retFuture = newFuture[int]("recvFromInto")
  583. var dataBuf = TWSABuf(buf: cast[cstring](data), len: size.ULONG)
  584. var bytesReceived = 0.DWORD
  585. var lowFlags = 0.DWORD
  586. var ol = PCustomOverlapped()
  587. GC_ref(ol)
  588. ol.data = CompletionData(fd: socket, cb:
  589. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  590. if not retFuture.finished:
  591. if errcode == OSErrorCode(-1):
  592. assert bytesCount <= size
  593. retFuture.complete(bytesCount)
  594. else:
  595. # datagram sockets don't have disconnection,
  596. # so we can just raise an exception
  597. retFuture.fail(newException(OSError, osErrorMsg(errcode)))
  598. )
  599. let res = WSARecvFrom(socket.SocketHandle, addr dataBuf, 1,
  600. addr bytesReceived, addr lowFlags,
  601. saddr, cast[ptr cint](saddrLen),
  602. cast[POVERLAPPED](ol), nil)
  603. if res == -1:
  604. let err = osLastError()
  605. if err.int32 != ERROR_IO_PENDING:
  606. GC_unref(ol)
  607. retFuture.fail(newException(OSError, osErrorMsg(err)))
  608. else:
  609. # Request completed immediately.
  610. if bytesReceived != 0:
  611. assert bytesReceived <= size
  612. retFuture.complete(bytesReceived)
  613. else:
  614. if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)):
  615. retFuture.complete(bytesReceived)
  616. return retFuture
  617. proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn}):
  618. owned(Future[tuple[address: string, client: AsyncFD]]) =
  619. ## Accepts a new connection. Returns a future containing the client socket
  620. ## corresponding to that connection and the remote address of the client.
  621. ## The future will complete when the connection is successfully accepted.
  622. ##
  623. ## The resulting client socket is automatically registered to the
  624. ## dispatcher.
  625. ##
  626. ## The ``accept`` call may result in an error if the connecting socket
  627. ## disconnects during the duration of the ``accept``. If the ``SafeDisconn``
  628. ## flag is specified then this error will not be raised and instead
  629. ## accept will be called again.
  630. verifyPresence(socket)
  631. var retFuture = newFuture[tuple[address: string, client: AsyncFD]]("acceptAddr")
  632. var clientSock = newNativeSocket()
  633. if clientSock == osInvalidSocket: raiseOSError(osLastError())
  634. const lpOutputLen = 1024
  635. var lpOutputBuf = newString(lpOutputLen)
  636. var dwBytesReceived: DWORD
  637. let dwReceiveDataLength = 0.DWORD # We don't want any data to be read.
  638. let dwLocalAddressLength = DWORD(sizeof(Sockaddr_in6) + 16)
  639. let dwRemoteAddressLength = DWORD(sizeof(Sockaddr_in6) + 16)
  640. template failAccept(errcode) =
  641. if flags.isDisconnectionError(errcode):
  642. var newAcceptFut = acceptAddr(socket, flags)
  643. newAcceptFut.callback =
  644. proc () =
  645. if newAcceptFut.failed:
  646. retFuture.fail(newAcceptFut.readError)
  647. else:
  648. retFuture.complete(newAcceptFut.read)
  649. else:
  650. retFuture.fail(newException(OSError, osErrorMsg(errcode)))
  651. template completeAccept() {.dirty.} =
  652. var listenSock = socket
  653. let setoptRet = setsockopt(clientSock, SOL_SOCKET,
  654. SO_UPDATE_ACCEPT_CONTEXT, addr listenSock,
  655. sizeof(listenSock).SockLen)
  656. if setoptRet != 0:
  657. let errcode = osLastError()
  658. discard clientSock.closesocket()
  659. failAccept(errcode)
  660. else:
  661. var localSockaddr, remoteSockaddr: ptr SockAddr
  662. var localLen, remoteLen: int32
  663. getAcceptExSockAddrs(addr lpOutputBuf[0], dwReceiveDataLength,
  664. dwLocalAddressLength, dwRemoteAddressLength,
  665. addr localSockaddr, addr localLen,
  666. addr remoteSockaddr, addr remoteLen)
  667. try:
  668. let address = getAddrString(remoteSockaddr)
  669. register(clientSock.AsyncFD)
  670. retFuture.complete((address: address, client: clientSock.AsyncFD))
  671. except:
  672. # getAddrString may raise
  673. clientSock.close()
  674. retFuture.fail(getCurrentException())
  675. var ol = PCustomOverlapped()
  676. GC_ref(ol)
  677. ol.data = CompletionData(fd: socket, cb:
  678. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  679. if not retFuture.finished:
  680. if errcode == OSErrorCode(-1):
  681. completeAccept()
  682. else:
  683. failAccept(errcode)
  684. )
  685. # http://msdn.microsoft.com/en-us/library/windows/desktop/ms737524%28v=vs.85%29.aspx
  686. let ret = acceptEx(socket.SocketHandle, clientSock, addr lpOutputBuf[0],
  687. dwReceiveDataLength,
  688. dwLocalAddressLength,
  689. dwRemoteAddressLength,
  690. addr dwBytesReceived, cast[POVERLAPPED](ol))
  691. if not ret:
  692. let err = osLastError()
  693. if err.int32 != ERROR_IO_PENDING:
  694. failAccept(err)
  695. GC_unref(ol)
  696. else:
  697. completeAccept()
  698. # We don't deallocate ``ol`` here because even though this completed
  699. # immediately poll will still be notified about its completion and it will
  700. # free ``ol``.
  701. return retFuture
  702. proc closeSocket*(socket: AsyncFD) =
  703. ## Closes a socket and ensures that it is unregistered.
  704. socket.SocketHandle.close()
  705. getGlobalDispatcher().handles.excl(socket)
  706. proc unregister*(fd: AsyncFD) =
  707. ## Unregisters ``fd``.
  708. getGlobalDispatcher().handles.excl(fd)
  709. proc contains*(disp: PDispatcher, fd: AsyncFD): bool =
  710. return fd in disp.handles
  711. {.push stackTrace: off.}
  712. proc waitableCallback(param: pointer,
  713. timerOrWaitFired: WINBOOL): void {.stdcall.} =
  714. var p = cast[PostCallbackDataPtr](param)
  715. discard postQueuedCompletionStatus(p.ioPort, timerOrWaitFired.DWORD,
  716. ULONG_PTR(p.handleFd),
  717. cast[pointer](p.ovl))
  718. {.pop.}
  719. proc registerWaitableEvent(fd: AsyncFD, cb: Callback; mask: DWORD) =
  720. let p = getGlobalDispatcher()
  721. var flags = (WT_EXECUTEINWAITTHREAD or WT_EXECUTEONLYONCE).DWORD
  722. var hEvent = wsaCreateEvent()
  723. if hEvent == 0:
  724. raiseOSError(osLastError())
  725. var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
  726. pcd.ioPort = p.ioPort
  727. pcd.handleFd = fd
  728. var ol = PCustomOverlapped()
  729. GC_ref(ol)
  730. ol.data = CompletionData(fd: fd, cb:
  731. proc(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) {.gcsafe.} =
  732. # we excluding our `fd` because cb(fd) can register own handler
  733. # for this `fd`
  734. p.handles.excl(fd)
  735. # unregisterWait() is called before callback, because appropriate
  736. # winsockets function can re-enable event.
  737. # https://msdn.microsoft.com/en-us/library/windows/desktop/ms741576(v=vs.85).aspx
  738. if unregisterWait(pcd.waitFd) == 0:
  739. let err = osLastError()
  740. if err.int32 != ERROR_IO_PENDING:
  741. deallocShared(cast[pointer](pcd))
  742. discard wsaCloseEvent(hEvent)
  743. raiseOSError(err)
  744. if cb(fd):
  745. # callback returned `true`, so we free all allocated resources
  746. deallocShared(cast[pointer](pcd))
  747. if not wsaCloseEvent(hEvent):
  748. raiseOSError(osLastError())
  749. # pcd.ovl will be unrefed in poll().
  750. else:
  751. # callback returned `false` we need to continue
  752. if p.handles.contains(fd):
  753. # new callback was already registered with `fd`, so we free all
  754. # allocated resources. This happens because in callback `cb`
  755. # addRead/addWrite was called with same `fd`.
  756. deallocShared(cast[pointer](pcd))
  757. if not wsaCloseEvent(hEvent):
  758. raiseOSError(osLastError())
  759. else:
  760. # we need to include `fd` again
  761. p.handles.incl(fd)
  762. # and register WaitForSingleObject again
  763. if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
  764. cast[WAITORTIMERCALLBACK](waitableCallback),
  765. cast[pointer](pcd), INFINITE, flags):
  766. # pcd.ovl will be unrefed in poll()
  767. let err = osLastError()
  768. deallocShared(cast[pointer](pcd))
  769. discard wsaCloseEvent(hEvent)
  770. raiseOSError(err)
  771. else:
  772. # we incref `pcd.ovl` and `protect` callback one more time,
  773. # because it will be unrefed and disposed in `poll()` after
  774. # callback finishes.
  775. GC_ref(pcd.ovl)
  776. pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
  777. )
  778. # We need to protect our callback environment value, so GC will not free it
  779. # accidentally.
  780. ol.data.cell = system.protect(rawEnv(ol.data.cb))
  781. # This is main part of `hacky way` is using WSAEventSelect, so `hEvent`
  782. # will be signaled when appropriate `mask` events will be triggered.
  783. if wsaEventSelect(fd.SocketHandle, hEvent, mask) != 0:
  784. let err = osLastError()
  785. GC_unref(ol)
  786. deallocShared(cast[pointer](pcd))
  787. discard wsaCloseEvent(hEvent)
  788. raiseOSError(err)
  789. pcd.ovl = ol
  790. if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
  791. cast[WAITORTIMERCALLBACK](waitableCallback),
  792. cast[pointer](pcd), INFINITE, flags):
  793. let err = osLastError()
  794. GC_unref(ol)
  795. deallocShared(cast[pointer](pcd))
  796. discard wsaCloseEvent(hEvent)
  797. raiseOSError(err)
  798. p.handles.incl(fd)
  799. proc addRead*(fd: AsyncFD, cb: Callback) =
  800. ## Start watching the file descriptor for read availability and then call
  801. ## the callback ``cb``.
  802. ##
  803. ## This is not ``pure`` mechanism for Windows Completion Ports (IOCP),
  804. ## so if you can avoid it, please do it. Use `addRead` only if really
  805. ## need it (main usecase is adaptation of unix-like libraries to be
  806. ## asynchronous on Windows).
  807. ##
  808. ## If you use this function, you don't need to use asyncdispatch.recv()
  809. ## or asyncdispatch.accept(), because they are using IOCP, please use
  810. ## nativesockets.recv() and nativesockets.accept() instead.
  811. ##
  812. ## Be sure your callback ``cb`` returns ``true``, if you want to remove
  813. ## watch of `read` notifications, and ``false``, if you want to continue
  814. ## receiving notifications.
  815. registerWaitableEvent(fd, cb, FD_READ or FD_ACCEPT or FD_OOB or FD_CLOSE)
  816. proc addWrite*(fd: AsyncFD, cb: Callback) =
  817. ## Start watching the file descriptor for write availability and then call
  818. ## the callback ``cb``.
  819. ##
  820. ## This is not ``pure`` mechanism for Windows Completion Ports (IOCP),
  821. ## so if you can avoid it, please do it. Use `addWrite` only if really
  822. ## need it (main usecase is adaptation of unix-like libraries to be
  823. ## asynchronous on Windows).
  824. ##
  825. ## If you use this function, you don't need to use asyncdispatch.send()
  826. ## or asyncdispatch.connect(), because they are using IOCP, please use
  827. ## nativesockets.send() and nativesockets.connect() instead.
  828. ##
  829. ## Be sure your callback ``cb`` returns ``true``, if you want to remove
  830. ## watch of `write` notifications, and ``false``, if you want to continue
  831. ## receiving notifications.
  832. registerWaitableEvent(fd, cb, FD_WRITE or FD_CONNECT or FD_CLOSE)
  833. template registerWaitableHandle(p, hEvent, flags, pcd, timeout,
  834. handleCallback) =
  835. let handleFD = AsyncFD(hEvent)
  836. pcd.ioPort = p.ioPort
  837. pcd.handleFd = handleFD
  838. var ol = PCustomOverlapped()
  839. GC_ref(ol)
  840. ol.data.fd = handleFD
  841. ol.data.cb = handleCallback
  842. # We need to protect our callback environment value, so GC will not free it
  843. # accidentally.
  844. ol.data.cell = system.protect(rawEnv(ol.data.cb))
  845. pcd.ovl = ol
  846. if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
  847. cast[WAITORTIMERCALLBACK](waitableCallback),
  848. cast[pointer](pcd), timeout.DWORD, flags):
  849. let err = osLastError()
  850. GC_unref(ol)
  851. deallocShared(cast[pointer](pcd))
  852. discard closeHandle(hEvent)
  853. raiseOSError(err)
  854. p.handles.incl(handleFD)
  855. template closeWaitable(handle: untyped) =
  856. let waitFd = pcd.waitFd
  857. deallocShared(cast[pointer](pcd))
  858. p.handles.excl(fd)
  859. if unregisterWait(waitFd) == 0:
  860. let err = osLastError()
  861. if err.int32 != ERROR_IO_PENDING:
  862. discard closeHandle(handle)
  863. raiseOSError(err)
  864. if closeHandle(handle) == 0:
  865. raiseOSError(osLastError())
  866. proc addTimer*(timeout: int, oneshot: bool, cb: Callback) =
  867. ## Registers callback ``cb`` to be called when timer expired.
  868. ##
  869. ## Parameters:
  870. ##
  871. ## * ``timeout`` - timeout value in milliseconds.
  872. ## * ``oneshot``
  873. ## * `true` - generate only one timeout event
  874. ## * `false` - generate timeout events periodically
  875. doAssert(timeout > 0)
  876. let p = getGlobalDispatcher()
  877. var hEvent = createEvent(nil, 1, 0, nil)
  878. if hEvent == INVALID_HANDLE_VALUE:
  879. raiseOSError(osLastError())
  880. var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
  881. var flags = WT_EXECUTEINWAITTHREAD.DWORD
  882. if oneshot: flags = flags or WT_EXECUTEONLYONCE
  883. proc timercb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  884. let res = cb(fd)
  885. if res or oneshot:
  886. closeWaitable(hEvent)
  887. else:
  888. # if callback returned `false`, then it wants to be called again, so
  889. # we need to ref and protect `pcd.ovl` again, because it will be
  890. # unrefed and disposed in `poll()`.
  891. GC_ref(pcd.ovl)
  892. pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
  893. registerWaitableHandle(p, hEvent, flags, pcd, timeout, timercb)
  894. proc addProcess*(pid: int, cb: Callback) =
  895. ## Registers callback ``cb`` to be called when process with process ID
  896. ## ``pid`` exited.
  897. const NULL = Handle(0)
  898. let p = getGlobalDispatcher()
  899. let procFlags = SYNCHRONIZE
  900. var hProcess = openProcess(procFlags, 0, pid.DWORD)
  901. if hProcess == NULL:
  902. raiseOSError(osLastError())
  903. var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
  904. var flags = WT_EXECUTEINWAITTHREAD.DWORD
  905. proc proccb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  906. closeWaitable(hProcess)
  907. discard cb(fd)
  908. registerWaitableHandle(p, hProcess, flags, pcd, INFINITE, proccb)
  909. proc newAsyncEvent*(): AsyncEvent =
  910. ## Creates a new thread-safe ``AsyncEvent`` object.
  911. ##
  912. ## New ``AsyncEvent`` object is not automatically registered with
  913. ## dispatcher like ``AsyncSocket``.
  914. var sa = SECURITY_ATTRIBUTES(
  915. nLength: sizeof(SECURITY_ATTRIBUTES).cint,
  916. bInheritHandle: 1
  917. )
  918. var event = createEvent(addr(sa), 0'i32, 0'i32, nil)
  919. if event == INVALID_HANDLE_VALUE:
  920. raiseOSError(osLastError())
  921. result = cast[AsyncEvent](allocShared0(sizeof(AsyncEventImpl)))
  922. result.hEvent = event
  923. proc trigger*(ev: AsyncEvent) =
  924. ## Set event ``ev`` to signaled state.
  925. if setEvent(ev.hEvent) == 0:
  926. raiseOSError(osLastError())
  927. proc unregister*(ev: AsyncEvent) =
  928. ## Unregisters event ``ev``.
  929. doAssert(ev.hWaiter != 0, "Event is not registered in the queue!")
  930. let p = getGlobalDispatcher()
  931. p.handles.excl(AsyncFD(ev.hEvent))
  932. if unregisterWait(ev.hWaiter) == 0:
  933. let err = osLastError()
  934. if err.int32 != ERROR_IO_PENDING:
  935. raiseOSError(err)
  936. ev.hWaiter = 0
  937. proc close*(ev: AsyncEvent) =
  938. ## Closes event ``ev``.
  939. let res = closeHandle(ev.hEvent)
  940. deallocShared(cast[pointer](ev))
  941. if res == 0:
  942. raiseOSError(osLastError())
  943. proc addEvent*(ev: AsyncEvent, cb: Callback) =
  944. ## Registers callback ``cb`` to be called when ``ev`` will be signaled
  945. doAssert(ev.hWaiter == 0, "Event is already registered in the queue!")
  946. let p = getGlobalDispatcher()
  947. let hEvent = ev.hEvent
  948. var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
  949. var flags = WT_EXECUTEINWAITTHREAD.DWORD
  950. proc eventcb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  951. if ev.hWaiter != 0:
  952. if cb(fd):
  953. # we need this check to avoid exception, if `unregister(event)` was
  954. # called in callback.
  955. deallocShared(cast[pointer](pcd))
  956. if ev.hWaiter != 0:
  957. unregister(ev)
  958. else:
  959. # if callback returned `false`, then it wants to be called again, so
  960. # we need to ref and protect `pcd.ovl` again, because it will be
  961. # unrefed and disposed in `poll()`.
  962. GC_ref(pcd.ovl)
  963. pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
  964. else:
  965. # if ev.hWaiter == 0, then event was unregistered before `poll()` call.
  966. deallocShared(cast[pointer](pcd))
  967. registerWaitableHandle(p, hEvent, flags, pcd, INFINITE, eventcb)
  968. ev.hWaiter = pcd.waitFd
  969. initAll()
  970. else:
  971. import selectors
  972. from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK,
  973. MSG_NOSIGNAL
  974. const
  975. InitCallbackListSize = 4 # initial size of callbacks sequence,
  976. # associated with file/socket descriptor.
  977. InitDelayedCallbackListSize = 64 # initial size of delayed callbacks
  978. # queue.
  979. type
  980. AsyncFD* = distinct cint
  981. Callback* = proc (fd: AsyncFD): bool {.closure, gcsafe.}
  982. AsyncData = object
  983. readList: seq[Callback]
  984. writeList: seq[Callback]
  985. AsyncEvent* = distinct SelectEvent
  986. PDispatcher* = ref object of PDispatcherBase
  987. selector: Selector[AsyncData]
  988. proc `==`*(x, y: AsyncFD): bool {.borrow.}
  989. proc `==`*(x, y: AsyncEvent): bool {.borrow.}
  990. template newAsyncData(): AsyncData =
  991. AsyncData(
  992. readList: newSeqOfCap[Callback](InitCallbackListSize),
  993. writeList: newSeqOfCap[Callback](InitCallbackListSize)
  994. )
  995. proc newDispatcher*(): owned(PDispatcher) =
  996. new result
  997. result.selector = newSelector[AsyncData]()
  998. result.timers.newHeapQueue()
  999. result.callbacks = initDeque[proc ()](InitDelayedCallbackListSize)
  1000. var gDisp{.threadvar.}: owned PDispatcher ## Global dispatcher
  1001. proc setGlobalDispatcher*(disp: owned PDispatcher) =
  1002. if not gDisp.isNil:
  1003. assert gDisp.callbacks.len == 0
  1004. gDisp = disp
  1005. initCallSoonProc()
  1006. proc getGlobalDispatcher*(): PDispatcher =
  1007. if gDisp.isNil:
  1008. setGlobalDispatcher(newDispatcher())
  1009. result = gDisp
  1010. proc getIoHandler*(disp: PDispatcher): Selector[AsyncData] =
  1011. return disp.selector
  1012. proc register*(fd: AsyncFD) =
  1013. let p = getGlobalDispatcher()
  1014. var data = newAsyncData()
  1015. p.selector.registerHandle(fd.SocketHandle, {}, data)
  1016. proc unregister*(fd: AsyncFD) =
  1017. getGlobalDispatcher().selector.unregister(fd.SocketHandle)
  1018. proc unregister*(ev: AsyncEvent) =
  1019. getGlobalDispatcher().selector.unregister(SelectEvent(ev))
  1020. proc contains*(disp: PDispatcher, fd: AsyncFD): bool =
  1021. return fd.SocketHandle in disp.selector
  1022. proc addRead*(fd: AsyncFD, cb: Callback) =
  1023. let p = getGlobalDispatcher()
  1024. var newEvents = {Event.Read}
  1025. withData(p.selector, fd.SocketHandle, adata) do:
  1026. adata.readList.add(cb)
  1027. newEvents.incl(Event.Read)
  1028. if len(adata.writeList) != 0: newEvents.incl(Event.Write)
  1029. do:
  1030. raise newException(ValueError, "File descriptor not registered.")
  1031. p.selector.updateHandle(fd.SocketHandle, newEvents)
  1032. proc addWrite*(fd: AsyncFD, cb: Callback) =
  1033. let p = getGlobalDispatcher()
  1034. var newEvents = {Event.Write}
  1035. withData(p.selector, fd.SocketHandle, adata) do:
  1036. adata.writeList.add(cb)
  1037. newEvents.incl(Event.Write)
  1038. if len(adata.readList) != 0: newEvents.incl(Event.Read)
  1039. do:
  1040. raise newException(ValueError, "File descriptor not registered.")
  1041. p.selector.updateHandle(fd.SocketHandle, newEvents)
  1042. proc hasPendingOperations*(): bool =
  1043. let p = getGlobalDispatcher()
  1044. not p.selector.isEmpty() or p.timers.len != 0 or p.callbacks.len != 0
  1045. proc processBasicCallbacks(
  1046. fd: AsyncFD, event: Event
  1047. ): tuple[readCbListCount, writeCbListCount: int] =
  1048. # Process pending descriptor and AsyncEvent callbacks.
  1049. #
  1050. # Invoke every callback stored in `rwlist`, until one
  1051. # returns `false` (which means callback wants to stay
  1052. # alive). In such case all remaining callbacks will be added
  1053. # to `rwlist` again, in the order they have been inserted.
  1054. #
  1055. # `rwlist` associated with file descriptor MUST BE emptied before
  1056. # dispatching callback (See https://github.com/nim-lang/Nim/issues/5128),
  1057. # or it can be possible to fall into endless cycle.
  1058. var curList: seq[Callback]
  1059. let selector = getGlobalDispatcher().selector
  1060. withData(selector, fd.int, fdData):
  1061. case event
  1062. of Event.Read:
  1063. shallowCopy(curList, fdData.readList)
  1064. fdData.readList = newSeqOfCap[Callback](InitCallbackListSize)
  1065. of Event.Write:
  1066. shallowCopy(curList, fdData.writeList)
  1067. fdData.writeList = newSeqOfCap[Callback](InitCallbackListSize)
  1068. else:
  1069. assert false, "Cannot process callbacks for " & $event
  1070. let newLength = max(len(curList), InitCallbackListSize)
  1071. var newList = newSeqOfCap[Callback](newLength)
  1072. var eventsExtinguished = false
  1073. for cb in curList:
  1074. if eventsExtinguished:
  1075. newList.add(cb)
  1076. continue
  1077. if not cb(fd):
  1078. # Callback wants to be called again.
  1079. newList.add(cb)
  1080. # This callback has returned with EAGAIN, so we don't need to
  1081. # call any other callbacks as they are all waiting for the same event
  1082. # on the same fd.
  1083. # We do need to ensure they are called again though.
  1084. eventsExtinguished = true
  1085. withData(selector, fd.int, fdData) do:
  1086. # Descriptor is still present in the queue.
  1087. case event
  1088. of Event.Read:
  1089. fdData.readList = newList & fdData.readList
  1090. of Event.Write:
  1091. fdData.writeList = newList & fdData.writeList
  1092. else:
  1093. assert false, "Cannot process callbacks for " & $event
  1094. result.readCbListCount = len(fdData.readList)
  1095. result.writeCbListCount = len(fdData.writeList)
  1096. do:
  1097. # Descriptor was unregistered in callback via `unregister()`.
  1098. result.readCbListCount = -1
  1099. result.writeCbListCount = -1
  1100. template processCustomCallbacks(ident: untyped) =
  1101. # Process pending custom event callbacks. Custom events are
  1102. # {Event.Timer, Event.Signal, Event.Process, Event.Vnode}.
  1103. # There can be only one callback registered with one descriptor,
  1104. # so there is no need to iterate over list.
  1105. var curList: seq[Callback]
  1106. withData(p.selector, ident.int, adata) do:
  1107. shallowCopy(curList, adata.readList)
  1108. adata.readList = newSeqOfCap[Callback](InitCallbackListSize)
  1109. let newLength = len(curList)
  1110. var newList = newSeqOfCap[Callback](newLength)
  1111. var cb = curList[0]
  1112. if not cb(fd.AsyncFD):
  1113. newList.add(cb)
  1114. withData(p.selector, ident.int, adata) do:
  1115. # descriptor still present in queue.
  1116. adata.readList = newList & adata.readList
  1117. if len(adata.readList) == 0:
  1118. # if no callbacks registered with descriptor, unregister it.
  1119. p.selector.unregister(fd.int)
  1120. do:
  1121. # descriptor was unregistered in callback via `unregister()`.
  1122. discard
  1123. proc closeSocket*(sock: AsyncFD) =
  1124. let selector = getGlobalDispatcher().selector
  1125. if sock.SocketHandle notin selector:
  1126. raise newException(ValueError, "File descriptor not registered.")
  1127. let data = selector.getData(sock.SocketHandle)
  1128. sock.unregister()
  1129. sock.SocketHandle.close()
  1130. # We need to unblock the read and write callbacks which could still be
  1131. # waiting for the socket to become readable and/or writeable.
  1132. for cb in data.readList & data.writeList:
  1133. if not cb(sock):
  1134. raise newException(
  1135. ValueError, "Expecting async operations to stop when fd has closed."
  1136. )
  1137. proc runOnce(timeout = 500): bool =
  1138. let p = getGlobalDispatcher()
  1139. when ioselSupportedPlatform:
  1140. let customSet = {Event.Timer, Event.Signal, Event.Process,
  1141. Event.Vnode}
  1142. if p.selector.isEmpty() and p.timers.len == 0 and p.callbacks.len == 0:
  1143. raise newException(ValueError,
  1144. "No handles or timers registered in dispatcher.")
  1145. result = false
  1146. var keys: array[64, ReadyKey]
  1147. let nextTimer = processTimers(p, result)
  1148. var count =
  1149. p.selector.selectInto(adjustTimeout(p, timeout, nextTimer), keys)
  1150. for i in 0..<count:
  1151. let fd = keys[i].fd.AsyncFD
  1152. let events = keys[i].events
  1153. var (readCbListCount, writeCbListCount) = (0, 0)
  1154. if Event.Read in events or events == {Event.Error}:
  1155. (readCbListCount, writeCbListCount) =
  1156. processBasicCallbacks(fd, Event.Read)
  1157. result = true
  1158. if Event.Write in events or events == {Event.Error}:
  1159. (readCbListCount, writeCbListCount) =
  1160. processBasicCallbacks(fd, Event.Write)
  1161. result = true
  1162. var isCustomEvent = false
  1163. if Event.User in events:
  1164. (readCbListCount, writeCbListCount) =
  1165. processBasicCallbacks(fd, Event.Read)
  1166. isCustomEvent = true
  1167. if readCbListCount == 0:
  1168. p.selector.unregister(fd.int)
  1169. result = true
  1170. when ioselSupportedPlatform:
  1171. if (customSet * events) != {}:
  1172. isCustomEvent = true
  1173. processCustomCallbacks(fd)
  1174. result = true
  1175. # because state `data` can be modified in callback we need to update
  1176. # descriptor events with currently registered callbacks.
  1177. if not isCustomEvent and (readCbListCount != -1 and writeCbListCount != -1):
  1178. var newEvents: set[Event] = {}
  1179. if readCbListCount > 0: incl(newEvents, Event.Read)
  1180. if writeCbListCount > 0: incl(newEvents, Event.Write)
  1181. p.selector.updateHandle(SocketHandle(fd), newEvents)
  1182. # Timer processing.
  1183. discard processTimers(p, result)
  1184. # Callback queue processing
  1185. processPendingCallbacks(p, result)
  1186. proc recv*(socket: AsyncFD, size: int,
  1187. flags = {SocketFlag.SafeDisconn}): owned(Future[string]) =
  1188. var retFuture = newFuture[string]("recv")
  1189. var readBuffer = newString(size)
  1190. proc cb(sock: AsyncFD): bool =
  1191. result = true
  1192. let res = recv(sock.SocketHandle, addr readBuffer[0], size.cint,
  1193. flags.toOSFlags())
  1194. if res < 0:
  1195. let lastError = osLastError()
  1196. if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}:
  1197. if flags.isDisconnectionError(lastError):
  1198. retFuture.complete("")
  1199. else:
  1200. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  1201. else:
  1202. result = false # We still want this callback to be called.
  1203. elif res == 0:
  1204. # Disconnected
  1205. retFuture.complete("")
  1206. else:
  1207. readBuffer.setLen(res)
  1208. retFuture.complete(readBuffer)
  1209. # TODO: The following causes a massive slowdown.
  1210. #if not cb(socket):
  1211. addRead(socket, cb)
  1212. return retFuture
  1213. proc recvInto*(socket: AsyncFD, buf: pointer, size: int,
  1214. flags = {SocketFlag.SafeDisconn}): owned(Future[int]) =
  1215. var retFuture = newFuture[int]("recvInto")
  1216. proc cb(sock: AsyncFD): bool =
  1217. result = true
  1218. let res = recv(sock.SocketHandle, buf, size.cint,
  1219. flags.toOSFlags())
  1220. if res < 0:
  1221. let lastError = osLastError()
  1222. if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}:
  1223. if flags.isDisconnectionError(lastError):
  1224. retFuture.complete(0)
  1225. else:
  1226. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  1227. else:
  1228. result = false # We still want this callback to be called.
  1229. else:
  1230. retFuture.complete(res)
  1231. # TODO: The following causes a massive slowdown.
  1232. #if not cb(socket):
  1233. addRead(socket, cb)
  1234. return retFuture
  1235. proc send*(socket: AsyncFD, buf: pointer, size: int,
  1236. flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
  1237. var retFuture = newFuture[void]("send")
  1238. var written = 0
  1239. proc cb(sock: AsyncFD): bool =
  1240. result = true
  1241. let netSize = size-written
  1242. var d = cast[cstring](buf)
  1243. let res = send(sock.SocketHandle, addr d[written], netSize.cint,
  1244. MSG_NOSIGNAL)
  1245. if res < 0:
  1246. let lastError = osLastError()
  1247. if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}:
  1248. if flags.isDisconnectionError(lastError):
  1249. retFuture.complete()
  1250. else:
  1251. retFuture.fail(newOSError(lastError))
  1252. else:
  1253. result = false # We still want this callback to be called.
  1254. else:
  1255. written.inc(res)
  1256. if res != netSize:
  1257. result = false # We still have data to send.
  1258. else:
  1259. retFuture.complete()
  1260. # TODO: The following causes crashes.
  1261. #if not cb(socket):
  1262. addWrite(socket, cb)
  1263. return retFuture
  1264. proc sendTo*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr,
  1265. saddrLen: SockLen,
  1266. flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
  1267. ## Sends ``data`` of size ``size`` in bytes to specified destination
  1268. ## (``saddr`` of size ``saddrLen`` in bytes, using socket ``socket``.
  1269. ## The returned future will complete once all data has been sent.
  1270. var retFuture = newFuture[void]("sendTo")
  1271. # we will preserve address in our stack
  1272. var staddr: array[128, char] # SOCKADDR_STORAGE size is 128 bytes
  1273. var stalen = saddrLen
  1274. zeroMem(addr(staddr[0]), 128)
  1275. copyMem(addr(staddr[0]), saddr, saddrLen)
  1276. proc cb(sock: AsyncFD): bool =
  1277. result = true
  1278. let res = sendto(sock.SocketHandle, data, size, MSG_NOSIGNAL,
  1279. cast[ptr SockAddr](addr(staddr[0])), stalen)
  1280. if res < 0:
  1281. let lastError = osLastError()
  1282. if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}:
  1283. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  1284. else:
  1285. result = false # We still want this callback to be called.
  1286. else:
  1287. retFuture.complete()
  1288. addWrite(socket, cb)
  1289. return retFuture
  1290. proc recvFromInto*(socket: AsyncFD, data: pointer, size: int,
  1291. saddr: ptr SockAddr, saddrLen: ptr SockLen,
  1292. flags = {SocketFlag.SafeDisconn}): owned(Future[int]) =
  1293. ## Receives a datagram data from ``socket`` into ``data``, which must
  1294. ## be at least of size ``size`` in bytes, address of datagram's sender
  1295. ## will be stored into ``saddr`` and ``saddrLen``. Returned future will
  1296. ## complete once one datagram has been received, and will return size
  1297. ## of packet received.
  1298. var retFuture = newFuture[int]("recvFromInto")
  1299. proc cb(sock: AsyncFD): bool =
  1300. result = true
  1301. let res = recvfrom(sock.SocketHandle, data, size.cint, flags.toOSFlags(),
  1302. saddr, saddrLen)
  1303. if res < 0:
  1304. let lastError = osLastError()
  1305. if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}:
  1306. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  1307. else:
  1308. result = false
  1309. else:
  1310. retFuture.complete(res)
  1311. addRead(socket, cb)
  1312. return retFuture
  1313. proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn}):
  1314. owned(Future[tuple[address: string, client: AsyncFD]]) =
  1315. var retFuture = newFuture[tuple[address: string,
  1316. client: AsyncFD]]("acceptAddr")
  1317. proc cb(sock: AsyncFD): bool =
  1318. result = true
  1319. var sockAddress: Sockaddr_storage
  1320. var addrLen = sizeof(sockAddress).SockLen
  1321. var client = accept(sock.SocketHandle,
  1322. cast[ptr SockAddr](addr(sockAddress)), addr(addrLen))
  1323. if client == osInvalidSocket:
  1324. let lastError = osLastError()
  1325. assert lastError.int32 notin {EWOULDBLOCK, EAGAIN}
  1326. if lastError.int32 == EINTR:
  1327. return false
  1328. else:
  1329. if flags.isDisconnectionError(lastError):
  1330. return false
  1331. else:
  1332. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  1333. else:
  1334. try:
  1335. let address = getAddrString(cast[ptr SockAddr](addr sockAddress))
  1336. register(client.AsyncFD)
  1337. retFuture.complete((address, client.AsyncFD))
  1338. except:
  1339. # getAddrString may raise
  1340. client.close()
  1341. retFuture.fail(getCurrentException())
  1342. addRead(socket, cb)
  1343. return retFuture
  1344. when ioselSupportedPlatform:
  1345. proc addTimer*(timeout: int, oneshot: bool, cb: Callback) =
  1346. ## Start watching for timeout expiration, and then call the
  1347. ## callback ``cb``.
  1348. ## ``timeout`` - time in milliseconds,
  1349. ## ``oneshot`` - if ``true`` only one event will be dispatched,
  1350. ## if ``false`` continuous events every ``timeout`` milliseconds.
  1351. let p = getGlobalDispatcher()
  1352. var data = newAsyncData()
  1353. data.readList.add(cb)
  1354. p.selector.registerTimer(timeout, oneshot, data)
  1355. proc addSignal*(signal: int, cb: Callback) =
  1356. ## Start watching signal ``signal``, and when signal appears, call the
  1357. ## callback ``cb``.
  1358. let p = getGlobalDispatcher()
  1359. var data = newAsyncData()
  1360. data.readList.add(cb)
  1361. p.selector.registerSignal(signal, data)
  1362. proc addProcess*(pid: int, cb: Callback) =
  1363. ## Start watching for process exit with pid ``pid``, and then call
  1364. ## the callback ``cb``.
  1365. let p = getGlobalDispatcher()
  1366. var data = newAsyncData()
  1367. data.readList.add(cb)
  1368. p.selector.registerProcess(pid, data)
  1369. proc newAsyncEvent*(): AsyncEvent =
  1370. ## Creates new ``AsyncEvent``.
  1371. result = AsyncEvent(newSelectEvent())
  1372. proc trigger*(ev: AsyncEvent) =
  1373. ## Sets new ``AsyncEvent`` to signaled state.
  1374. trigger(SelectEvent(ev))
  1375. proc close*(ev: AsyncEvent) =
  1376. ## Closes ``AsyncEvent``
  1377. close(SelectEvent(ev))
  1378. proc addEvent*(ev: AsyncEvent, cb: Callback) =
  1379. ## Start watching for event ``ev``, and call callback ``cb``, when
  1380. ## ev will be set to signaled state.
  1381. let p = getGlobalDispatcher()
  1382. var data = newAsyncData()
  1383. data.readList.add(cb)
  1384. p.selector.registerEvent(SelectEvent(ev), data)
  1385. proc drain*(timeout = 500) =
  1386. ## Waits for completion events and processes them. Raises ``ValueError``
  1387. ## if there are no pending operations. In contrast to ``poll`` this
  1388. ## processes as many events as are available.
  1389. if runOnce(timeout):
  1390. while hasPendingOperations() and runOnce(0): discard
  1391. proc poll*(timeout = 500) =
  1392. ## Waits for completion events and processes them. Raises ``ValueError``
  1393. ## if there are no pending operations. This runs the underlying OS
  1394. ## `epoll`:idx: or `kqueue`:idx: primitive only once.
  1395. discard runOnce(timeout)
  1396. template createAsyncNativeSocketImpl(domain, sockType, protocol) =
  1397. let handle = newNativeSocket(domain, sockType, protocol)
  1398. if handle == osInvalidSocket:
  1399. return osInvalidSocket.AsyncFD
  1400. handle.setBlocking(false)
  1401. when defined(macosx) and not defined(nimdoc):
  1402. handle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1)
  1403. result = handle.AsyncFD
  1404. register(result)
  1405. proc createAsyncNativeSocket*(domain: cint, sockType: cint,
  1406. protocol: cint): AsyncFD =
  1407. createAsyncNativeSocketImpl(domain, sockType, protocol)
  1408. proc createAsyncNativeSocket*(domain: Domain = Domain.AF_INET,
  1409. sockType: SockType = SOCK_STREAM,
  1410. protocol: Protocol = IPPROTO_TCP): AsyncFD =
  1411. createAsyncNativeSocketImpl(domain, sockType, protocol)
  1412. proc newAsyncNativeSocket*(domain: cint, sockType: cint,
  1413. protocol: cint): AsyncFD {.deprecated: "use createAsyncNativeSocket instead".} =
  1414. createAsyncNativeSocketImpl(domain, sockType, protocol)
  1415. proc newAsyncNativeSocket*(domain: Domain = Domain.AF_INET,
  1416. sockType: SockType = SOCK_STREAM,
  1417. protocol: Protocol = IPPROTO_TCP): AsyncFD
  1418. {.deprecated: "use createAsyncNativeSocket instead".} =
  1419. createAsyncNativeSocketImpl(domain, sockType, protocol)
  1420. when defined(windows) or defined(nimdoc):
  1421. proc bindToDomain(handle: SocketHandle, domain: Domain) =
  1422. # Extracted into a separate proc, because connect() on Windows requires
  1423. # the socket to be initially bound.
  1424. template doBind(saddr) =
  1425. if bindAddr(handle, cast[ptr SockAddr](addr(saddr)),
  1426. sizeof(saddr).SockLen) < 0'i32:
  1427. raiseOSError(osLastError())
  1428. if domain == Domain.AF_INET6:
  1429. var saddr: Sockaddr_in6
  1430. saddr.sin6_family = uint16(toInt(domain))
  1431. doBind(saddr)
  1432. else:
  1433. var saddr: Sockaddr_in
  1434. saddr.sin_family = uint16(toInt(domain))
  1435. doBind(saddr)
  1436. proc doConnect(socket: AsyncFD, addrInfo: ptr AddrInfo): owned(Future[void]) =
  1437. let retFuture = newFuture[void]("doConnect")
  1438. result = retFuture
  1439. var ol = PCustomOverlapped()
  1440. GC_ref(ol)
  1441. ol.data = CompletionData(fd: socket, cb:
  1442. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  1443. if not retFuture.finished:
  1444. if errcode == OSErrorCode(-1):
  1445. retFuture.complete()
  1446. else:
  1447. retFuture.fail(newException(OSError, osErrorMsg(errcode)))
  1448. )
  1449. let ret = connectEx(socket.SocketHandle, addrInfo.ai_addr,
  1450. cint(addrInfo.ai_addrlen), nil, 0, nil,
  1451. cast[POVERLAPPED](ol))
  1452. if ret:
  1453. # Request to connect completed immediately.
  1454. retFuture.complete()
  1455. # We don't deallocate ``ol`` here because even though this completed
  1456. # immediately poll will still be notified about its completion and it
  1457. # will free ``ol``.
  1458. else:
  1459. let lastError = osLastError()
  1460. if lastError.int32 != ERROR_IO_PENDING:
  1461. # With ERROR_IO_PENDING ``ol`` will be deallocated in ``poll``,
  1462. # and the future will be completed/failed there, too.
  1463. GC_unref(ol)
  1464. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  1465. else:
  1466. proc doConnect(socket: AsyncFD, addrInfo: ptr AddrInfo): owned(Future[void]) =
  1467. let retFuture = newFuture[void]("doConnect")
  1468. result = retFuture
  1469. proc cb(fd: AsyncFD): bool =
  1470. let ret = SocketHandle(fd).getSockOptInt(
  1471. cint(SOL_SOCKET), cint(SO_ERROR))
  1472. if ret == 0:
  1473. # We have connected.
  1474. retFuture.complete()
  1475. return true
  1476. elif ret == EINTR:
  1477. # interrupted, keep waiting
  1478. return false
  1479. else:
  1480. retFuture.fail(newException(OSError, osErrorMsg(OSErrorCode(ret))))
  1481. return true
  1482. let ret = connect(socket.SocketHandle,
  1483. addrInfo.ai_addr,
  1484. addrInfo.ai_addrlen.SockLen)
  1485. if ret == 0:
  1486. # Request to connect completed immediately.
  1487. retFuture.complete()
  1488. else:
  1489. let lastError = osLastError()
  1490. if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS:
  1491. addWrite(socket, cb)
  1492. else:
  1493. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  1494. template asyncAddrInfoLoop(addrInfo: ptr AddrInfo, fd: untyped,
  1495. protocol: Protocol = IPPROTO_RAW) =
  1496. ## Iterates through the AddrInfo linked list asynchronously
  1497. ## until the connection can be established.
  1498. const shouldCreateFd = not declared(fd)
  1499. when shouldCreateFd:
  1500. let sockType = protocol.toSockType()
  1501. var fdPerDomain: array[low(Domain).ord..high(Domain).ord, AsyncFD]
  1502. for i in low(fdPerDomain)..high(fdPerDomain):
  1503. fdPerDomain[i] = osInvalidSocket.AsyncFD
  1504. template closeUnusedFds(domainToKeep = -1) {.dirty.} =
  1505. for i, fd in fdPerDomain:
  1506. if fd != osInvalidSocket.AsyncFD and i != domainToKeep:
  1507. fd.closeSocket()
  1508. var lastException: ref Exception
  1509. var curAddrInfo = addrInfo
  1510. var domain: Domain
  1511. when shouldCreateFd:
  1512. var curFd: AsyncFD
  1513. else:
  1514. var curFd = fd
  1515. proc tryNextAddrInfo(fut: Future[void]) {.gcsafe.} =
  1516. if fut == nil or fut.failed:
  1517. if fut != nil:
  1518. lastException = fut.readError()
  1519. while curAddrInfo != nil:
  1520. let domainOpt = curAddrInfo.ai_family.toKnownDomain()
  1521. if domainOpt.isSome:
  1522. domain = domainOpt.unsafeGet()
  1523. break
  1524. curAddrInfo = curAddrInfo.ai_next
  1525. if curAddrInfo == nil:
  1526. freeaddrinfo(addrInfo)
  1527. when shouldCreateFd:
  1528. closeUnusedFds()
  1529. if lastException != nil:
  1530. retFuture.fail(lastException)
  1531. else:
  1532. retFuture.fail(newException(
  1533. IOError, "Couldn't resolve address: " & address))
  1534. return
  1535. when shouldCreateFd:
  1536. curFd = fdPerDomain[ord(domain)]
  1537. if curFd == osInvalidSocket.AsyncFD:
  1538. try:
  1539. curFd = createAsyncNativeSocket(domain, sockType, protocol)
  1540. except:
  1541. freeaddrinfo(addrInfo)
  1542. closeUnusedFds()
  1543. raise getCurrentException()
  1544. when defined(windows):
  1545. curFd.SocketHandle.bindToDomain(domain)
  1546. fdPerDomain[ord(domain)] = curFd
  1547. doConnect(curFd, curAddrInfo).callback = tryNextAddrInfo
  1548. curAddrInfo = curAddrInfo.ai_next
  1549. else:
  1550. freeaddrinfo(addrInfo)
  1551. when shouldCreateFd:
  1552. closeUnusedFds(ord(domain))
  1553. retFuture.complete(curFd)
  1554. else:
  1555. retFuture.complete()
  1556. tryNextAddrInfo(nil)
  1557. proc dial*(address: string, port: Port,
  1558. protocol: Protocol = IPPROTO_TCP): owned(Future[AsyncFD]) =
  1559. ## Establishes connection to the specified ``address``:``port`` pair via the
  1560. ## specified protocol. The procedure iterates through possible
  1561. ## resolutions of the ``address`` until it succeeds, meaning that it
  1562. ## seamlessly works with both IPv4 and IPv6.
  1563. ## Returns the async file descriptor, registered in the dispatcher of
  1564. ## the current thread, ready to send or receive data.
  1565. let retFuture = newFuture[AsyncFD]("dial")
  1566. result = retFuture
  1567. let sockType = protocol.toSockType()
  1568. let aiList = getAddrInfo(address, port, Domain.AF_UNSPEC, sockType, protocol)
  1569. asyncAddrInfoLoop(aiList, noFD, protocol)
  1570. proc connect*(socket: AsyncFD, address: string, port: Port,
  1571. domain = Domain.AF_INET): owned(Future[void]) =
  1572. let retFuture = newFuture[void]("connect")
  1573. result = retFuture
  1574. when defined(windows):
  1575. verifyPresence(socket)
  1576. else:
  1577. assert getSockDomain(socket.SocketHandle) == domain
  1578. let aiList = getAddrInfo(address, port, domain)
  1579. when defined(windows):
  1580. socket.SocketHandle.bindToDomain(domain)
  1581. asyncAddrInfoLoop(aiList, socket)
  1582. proc sleepAsync*(ms: int | float): owned(Future[void]) =
  1583. ## Suspends the execution of the current async procedure for the next
  1584. ## ``ms`` milliseconds.
  1585. var retFuture = newFuture[void]("sleepAsync")
  1586. let p = getGlobalDispatcher()
  1587. when ms is int:
  1588. p.timers.push((getMonoTime() + initDuration(milliseconds = ms), retFuture))
  1589. elif ms is float:
  1590. let ns = (ms * 1_000_000).int64
  1591. p.timers.push((getMonoTime() + initDuration(nanoseconds = ns), retFuture))
  1592. return retFuture
  1593. proc withTimeout*[T](fut: Future[T], timeout: int): owned(Future[bool]) =
  1594. ## Returns a future which will complete once ``fut`` completes or after
  1595. ## ``timeout`` milliseconds has elapsed.
  1596. ##
  1597. ## If ``fut`` completes first the returned future will hold true,
  1598. ## otherwise, if ``timeout`` milliseconds has elapsed first, the returned
  1599. ## future will hold false.
  1600. var retFuture = newFuture[bool]("asyncdispatch.`withTimeout`")
  1601. var timeoutFuture = sleepAsync(timeout)
  1602. fut.callback =
  1603. proc () =
  1604. if not retFuture.finished:
  1605. if fut.failed:
  1606. retFuture.fail(fut.error)
  1607. else:
  1608. retFuture.complete(true)
  1609. timeoutFuture.callback =
  1610. proc () =
  1611. if not retFuture.finished: retFuture.complete(false)
  1612. return retFuture
  1613. proc accept*(socket: AsyncFD,
  1614. flags = {SocketFlag.SafeDisconn}): owned(Future[AsyncFD]) =
  1615. ## Accepts a new connection. Returns a future containing the client socket
  1616. ## corresponding to that connection.
  1617. ## The future will complete when the connection is successfully accepted.
  1618. var retFut = newFuture[AsyncFD]("accept")
  1619. var fut = acceptAddr(socket, flags)
  1620. fut.callback =
  1621. proc (future: Future[tuple[address: string, client: AsyncFD]]) =
  1622. assert future.finished
  1623. if future.failed:
  1624. retFut.fail(future.error)
  1625. else:
  1626. retFut.complete(future.read.client)
  1627. return retFut
  1628. proc send*(socket: AsyncFD, data: string,
  1629. flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
  1630. ## Sends ``data`` to ``socket``. The returned future will complete once all
  1631. ## data has been sent.
  1632. var retFuture = newFuture[void]("send")
  1633. var copiedData = data
  1634. GC_ref(copiedData) # we need to protect data until send operation is completed
  1635. # or failed.
  1636. let sendFut = socket.send(addr copiedData[0], data.len, flags)
  1637. sendFut.callback =
  1638. proc () =
  1639. GC_unref(copiedData)
  1640. if sendFut.failed:
  1641. retFuture.fail(sendFut.error)
  1642. else:
  1643. retFuture.complete()
  1644. return retFuture
  1645. # -- Await Macro
  1646. include asyncmacro
  1647. proc readAll*(future: FutureStream[string]): owned(Future[string]) {.async.} =
  1648. ## Returns a future that will complete when all the string data from the
  1649. ## specified future stream is retrieved.
  1650. result = ""
  1651. while true:
  1652. let (hasValue, value) = await future.read()
  1653. if hasValue:
  1654. result.add(value)
  1655. else:
  1656. break
  1657. proc callSoon(cbproc: proc () {.gcsafe.}) =
  1658. getGlobalDispatcher().callbacks.addLast(cbproc)
  1659. proc runForever*() =
  1660. ## Begins a never ending global dispatcher poll loop.
  1661. while true:
  1662. poll()
  1663. proc waitFor*[T](fut: Future[T]): T =
  1664. ## **Blocks** the current thread until the specified future completes.
  1665. while not fut.finished:
  1666. poll()
  1667. fut.read