asyncdispatch.nim 74 KB


  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2015 Dominik Picheta
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. ## This module implements asynchronous IO. This includes a dispatcher,
  10. ## a `Future` type implementation, and an `async` macro which allows
  11. ## asynchronous code to be written in a synchronous style with the `await`
  12. ## keyword.
  13. ##
  14. ## The dispatcher acts as a kind of event loop. You must call `poll` on it
  15. ## (or a function which does so for you such as `waitFor` or `runForever`)
  16. ## in order to poll for any outstanding events. The underlying implementation
  17. ## is based on epoll on Linux, IO Completion Ports on Windows and select on
  18. ## other operating systems.
  19. ##
  20. ## The `poll` function will not, on its own, return any events. Instead
  21. ## an appropriate `Future` object will be completed. A `Future` is a
  22. ## type which holds a value which is not yet available, but which *may* be
  23. ## available in the future. You can check whether a future is finished
  24. ## by using the `finished` function. When a future is finished it means that
  25. ## either the value that it holds is now available or it holds an error instead.
  26. ## The latter situation occurs when the operation to complete a future fails
  27. ## with an exception. You can distinguish between the two situations with the
  28. ## `failed` function.
  29. ##
  30. ## Future objects can also store a callback procedure which will be called
  31. ## automatically once the future completes.
  32. ##
  33. ## Futures therefore can be thought of as an implementation of the proactor
  34. ## pattern. In this
  35. ## pattern you make a request for an action, and once that action is fulfilled
  36. ## a future is completed with the result of that action. Requests can be
  37. ## made by calling the appropriate functions. For example: calling the `recv`
  38. ## function will create a request for some data to be read from a socket. The
  39. ## future which the `recv` function returns will then complete once the
  40. ## requested amount of data is read **or** an exception occurs.
  41. ##
  42. ## Code to read some data from a socket may look something like this:
  43. ##
  44. ## .. code-block:: Nim
  45. ## var future = socket.recv(100)
  46. ## future.addCallback(
  47. ## proc () =
  48. ## echo(future.read)
  49. ## )
  50. ##
  51. ## All asynchronous functions returning a `Future` will not block. They
  52. ## will not however return immediately. An asynchronous function will have
  53. ## code which will be executed before an asynchronous request is made, in most
  54. ## cases this code sets up the request.
  55. ##
  56. ## In the above example, the `recv` function will return a brand new
  57. ## `Future` instance once the request for data to be read from the socket
  58. ## is made. This `Future` instance will complete once the requested amount
  59. ## of data is read, in this case it is 100 bytes. The second line sets a
  60. ## callback on this future which will be called once the future completes.
  61. ## All the callback does is write the data stored in the future to `stdout`.
  62. ## The `read` function is used for this and it checks whether the future
  63. ## completes with an error for you (if it did, it will simply raise the
  64. ## error), if there is no error, however, it returns the value of the future.
  65. ##
  66. ## Asynchronous procedures
  67. ## =======================
  68. ##
  69. ## Asynchronous procedures remove the pain of working with callbacks. They do
  70. ## this by allowing you to write asynchronous code the same way as you would
  71. ## write synchronous code.
  72. ##
  73. ## An asynchronous procedure is marked using the `{.async.}` pragma.
  74. ## When marking a procedure with the `{.async.}` pragma it must have a
  75. ## `Future[T]` return type or no return type at all. If you do not specify
  76. ## a return type then `Future[void]` is assumed.
  77. ##
  78. ## Inside asynchronous procedures `await` can be used to call any
  79. ## procedures which return a
  80. ## `Future`; this includes asynchronous procedures. When a procedure is
  81. ## "awaited", the asynchronous procedure it is awaited in will
  82. ## suspend its execution
  83. ## until the awaited procedure's Future completes. At which point the
  84. ## asynchronous procedure will resume its execution. During the period
  85. ## when an asynchronous procedure is suspended other asynchronous procedures
  86. ## will be run by the dispatcher.
  87. ##
  88. ## The `await` call may be used in many contexts. It can be used on the right
  89. ## hand side of a variable declaration: `var data = await socket.recv(100)`,
  90. ## in which case the variable will be set to the value of the future
  91. ## automatically. It can be used to await a `Future` object, and it can
  92. ## be used to await a procedure returning a `Future[void]`:
  93. ## `await socket.send("foobar")`.
  94. ##
  95. ## If an awaited future completes with an error, then `await` will re-raise
  96. ## this error. To avoid this, you can use the `yield` keyword instead of
  97. ## `await`. The following section shows different ways that you can handle
  98. ## exceptions in async procs.
  99. ##
  100. ## Handling Exceptions
  101. ## -------------------
  102. ##
  103. ## The most reliable way to handle exceptions is to use `yield` on a future
  104. ## then check the future's `failed` property. For example:
  105. ##
  106. ## .. code-block:: Nim
  107. ## var future = sock.recv(100)
  108. ## yield future
  109. ## if future.failed:
  110. ## # Handle exception
  111. ##
  112. ## The `async` procedures also offer limited support for the try statement.
  113. ##
  114. ## .. code-block:: Nim
  115. ## try:
  116. ## let data = await sock.recv(100)
  117. ## echo("Received ", data)
  118. ## except:
  119. ## # Handle exception
  120. ##
  121. ## Unfortunately the semantics of the try statement may not always be correct,
  122. ## and occasionally the compilation may fail altogether.
  123. ## As such it is better to use the former style when possible.
  124. ##
  125. ##
  126. ## Discarding futures
  127. ## ==================
  128. ##
  129. ## Futures should **never** be discarded. This is because they may contain
  130. ## errors. If you do not care for the result of a Future then you should
  131. ## use the `asyncCheck` procedure instead of the `discard` keyword. Note
  132. ## however that this does not wait for completion, and you should use
  133. ## `waitFor` for that purpose.
  134. ##
  135. ## Examples
  136. ## ========
  137. ##
  138. ## For examples take a look at the documentation for the modules implementing
  139. ## asynchronous IO. A good place to start is the
  140. ## `asyncnet module <asyncnet.html>`_.
  141. ##
  142. ## Investigating pending futures
  143. ## =============================
  144. ##
  145. ## It's possible to get into a situation where an async proc, or more accurately
  146. ## a `Future[T]` gets stuck and
  147. ## never completes. This can happen for various reasons and can cause serious
  148. ## memory leaks. When this occurs it's hard to identify the procedure that is
  149. ## stuck.
  150. ##
  151. ## Thankfully there is a mechanism which tracks the count of each pending future.
  152. ## All you need to do to enable it is compile with `-d:futureLogging` and
  153. ## use the `getFuturesInProgress` procedure to get the list of pending futures
  154. ## together with the stack traces to the moment of their creation.
  155. ##
  156. ## You may also find it useful to use this
  157. ## `prometheus package <https://github.com/dom96/prometheus>`_ which will log
  158. ## the pending futures into prometheus, allowing you to analyse them via a nice
  159. ## graph.
  160. ##
  161. ##
  162. ##
  163. ## Limitations/Bugs
  164. ## ================
  165. ##
  166. ## * The effect system (`raises: []`) does not work with async procedures.
  167. ##
  168. ##
  169. ## Multiple async backend support
  170. ## ==============================
  171. ##
  172. ## Thanks to its powerful macro support, Nim allows ``async``/``await`` to be
  173. ## implemented in libraries with only minimal support from the language - as
  174. ## such, multiple ``async`` libraries exist, including ``asyncdispatch`` and
  175. ## ``chronos``, and more may come to be developed in the future.
  176. ##
  177. ## Libraries built on top of async/await may wish to support multiple async
  178. ## backends - the best way to do so is to create separate modules for each backend
  179. ## that may be imported side-by-side.
  180. ##
  181. ## An alternative way is to select backend using a global compile flag - this
  182. ## method makes it difficult to compose applications that use both backends as may
  183. ## happen with transitive dependencies, but may be appropriate in some cases -
  184. ## libraries choosing this path should call the flag `asyncBackend`, allowing
  185. ## applications to choose the backend with `-d:asyncBackend=<backend_name>`.
  186. ##
  187. ## Known `async` backends include:
  188. ##
  189. ## * `none` - ``-d:asyncBackend=none`` - disable ``async`` support completely
  190. ## * `asyncdispatch <https://nim-lang.org/docs/asyncdispatch.html> -``-d:asyncBackend=asyncdispatch``
  191. ## * `chronos <https://github.com/status-im/nim-chronos/>` - ``-d:asyncBackend=chronos``
  192. ##
  193. ## ``none`` can be used when a library supports both a synchronous and
  194. ## asynchronous API, to disable the latter.
  195. import os, tables, strutils, times, heapqueue, options, asyncstreams
  196. import options, math, std/monotimes
  197. import asyncfutures except callSoon
  198. import nativesockets, net, deques
  199. export Port, SocketFlag
  200. export asyncfutures except callSoon
  201. export asyncstreams
  202. # TODO: Check if yielded future is nil and throw a more meaningful exception
  203. type
  204. PDispatcherBase = ref object of RootRef
  205. timers*: HeapQueue[tuple[finishAt: MonoTime, fut: Future[void]]]
  206. callbacks*: Deque[proc () {.gcsafe.}]
  207. proc processTimers(
  208. p: PDispatcherBase, didSomeWork: var bool
  209. ): Option[int] {.inline.} =
  210. # Pop the timers in the order in which they will expire (smaller `finishAt`).
  211. var count = p.timers.len
  212. let t = getMonoTime()
  213. while count > 0 and t >= p.timers[0].finishAt:
  214. p.timers.pop().fut.complete()
  215. dec count
  216. didSomeWork = true
  217. # Return the number of milliseconds in which the next timer will expire.
  218. if p.timers.len == 0: return
  219. let millisecs = (p.timers[0].finishAt - getMonoTime()).inMilliseconds
  220. return some(millisecs.int + 1)
  221. proc processPendingCallbacks(p: PDispatcherBase; didSomeWork: var bool) =
  222. while p.callbacks.len > 0:
  223. var cb = p.callbacks.popFirst()
  224. cb()
  225. didSomeWork = true
  226. proc adjustTimeout(
  227. p: PDispatcherBase, pollTimeout: int, nextTimer: Option[int]
  228. ): int {.inline.} =
  229. if p.callbacks.len != 0:
  230. return 0
  231. if nextTimer.isNone() or pollTimeout == -1:
  232. return pollTimeout
  233. result = max(nextTimer.get(), 0)
  234. result = min(pollTimeout, result)
  235. proc callSoon*(cbproc: proc () {.gcsafe.}) {.gcsafe.}
  236. ## Schedule `cbproc` to be called as soon as possible.
  237. ## The callback is called when control returns to the event loop.
  238. proc initCallSoonProc =
  239. if asyncfutures.getCallSoonProc().isNil:
  240. asyncfutures.setCallSoonProc(callSoon)
  241. template implementSetInheritable() {.dirty.} =
  242. when declared(setInheritable):
  243. proc setInheritable*(fd: AsyncFD, inheritable: bool): bool =
  244. ## Control whether a file handle can be inherited by child processes.
  245. ## Returns `true` on success.
  246. ##
  247. ## This procedure is not guaranteed to be available for all platforms.
  248. ## Test for availability with `declared() <system.html#declared,untyped>`_.
  249. fd.FileHandle.setInheritable(inheritable)
  250. when defined(windows) or defined(nimdoc):
  251. import winlean, sets, hashes
  252. type
  253. CompletionKey = ULONG_PTR
  254. CompletionData* = object
  255. fd*: AsyncFD # TODO: Rename this.
  256. cb*: owned(proc (fd: AsyncFD, bytesTransferred: DWORD,
  257. errcode: OSErrorCode) {.closure, gcsafe.})
  258. cell*: ForeignCell # we need this `cell` to protect our `cb` environment,
  259. # when using RegisterWaitForSingleObject, because
  260. # waiting is done in different thread.
  261. PDispatcher* = ref object of PDispatcherBase
  262. ioPort: Handle
  263. handles*: HashSet[AsyncFD] # Export handles so that an external library can register them.
  264. CustomObj = object of OVERLAPPED
  265. data*: CompletionData
  266. CustomRef* = ref CustomObj
  267. AsyncFD* = distinct int
  268. PostCallbackData = object
  269. ioPort: Handle
  270. handleFd: AsyncFD
  271. waitFd: Handle
  272. ovl: owned CustomRef
  273. PostCallbackDataPtr = ptr PostCallbackData
  274. AsyncEventImpl = object
  275. hEvent: Handle
  276. hWaiter: Handle
  277. pcd: PostCallbackDataPtr
  278. AsyncEvent* = ptr AsyncEventImpl
  279. Callback* = proc (fd: AsyncFD): bool {.closure, gcsafe.}
  280. proc hash(x: AsyncFD): Hash {.borrow.}
  281. proc `==`*(x: AsyncFD, y: AsyncFD): bool {.borrow.}
  282. proc newDispatcher*(): owned PDispatcher =
  283. ## Creates a new Dispatcher instance.
  284. new result
  285. result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
  286. result.handles = initHashSet[AsyncFD]()
  287. result.timers.clear()
  288. result.callbacks = initDeque[proc () {.closure, gcsafe.}](64)
  289. var gDisp{.threadvar.}: owned PDispatcher ## Global dispatcher
  290. proc setGlobalDispatcher*(disp: sink PDispatcher) =
  291. if not gDisp.isNil:
  292. assert gDisp.callbacks.len == 0
  293. gDisp = disp
  294. initCallSoonProc()
  295. proc getGlobalDispatcher*(): PDispatcher =
  296. if gDisp.isNil:
  297. setGlobalDispatcher(newDispatcher())
  298. result = gDisp
  299. proc getIoHandler*(disp: PDispatcher): Handle =
  300. ## Returns the underlying IO Completion Port handle (Windows) or selector
  301. ## (Unix) for the specified dispatcher.
  302. return disp.ioPort
  303. proc register*(fd: AsyncFD) =
  304. ## Registers `fd` with the dispatcher.
  305. let p = getGlobalDispatcher()
  306. if createIoCompletionPort(fd.Handle, p.ioPort,
  307. cast[CompletionKey](fd), 1) == 0:
  308. raiseOSError(osLastError())
  309. p.handles.incl(fd)
  310. proc verifyPresence(fd: AsyncFD) =
  311. ## Ensures that file descriptor has been registered with the dispatcher.
  312. ## Raises ValueError if `fd` has not been registered.
  313. let p = getGlobalDispatcher()
  314. if fd notin p.handles:
  315. raise newException(ValueError,
  316. "Operation performed on a socket which has not been registered with" &
  317. " the dispatcher yet.")
  318. proc hasPendingOperations*(): bool =
  319. ## Returns `true` if the global dispatcher has pending operations.
  320. let p = getGlobalDispatcher()
  321. p.handles.len != 0 or p.timers.len != 0 or p.callbacks.len != 0
  322. proc runOnce(timeout = 500): bool =
  323. let p = getGlobalDispatcher()
  324. if p.handles.len == 0 and p.timers.len == 0 and p.callbacks.len == 0:
  325. raise newException(ValueError,
  326. "No handles or timers registered in dispatcher.")
  327. result = false
  328. let nextTimer = processTimers(p, result)
  329. let at = adjustTimeout(p, timeout, nextTimer)
  330. var llTimeout =
  331. if at == -1: winlean.INFINITE
  332. else: at.int32
  333. var lpNumberOfBytesTransferred: DWORD
  334. var lpCompletionKey: ULONG_PTR
  335. var customOverlapped: CustomRef
  336. let res = getQueuedCompletionStatus(p.ioPort,
  337. addr lpNumberOfBytesTransferred, addr lpCompletionKey,
  338. cast[ptr POVERLAPPED](addr customOverlapped), llTimeout).bool
  339. result = true
  340. # For 'gcDestructors' the destructor of 'customOverlapped' will
  341. # be called at the end and we are the only owner here. This means
  342. # We do not have to 'GC_unref(customOverlapped)' because the destructor
  343. # does that for us.
  344. # http://stackoverflow.com/a/12277264/492186
  345. # TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html
  346. if res:
  347. # This is useful for ensuring the reliability of the overlapped struct.
  348. assert customOverlapped.data.fd == lpCompletionKey.AsyncFD
  349. customOverlapped.data.cb(customOverlapped.data.fd,
  350. lpNumberOfBytesTransferred, OSErrorCode(-1))
  351. # If cell.data != nil, then system.protect(rawEnv(cb)) was called,
  352. # so we need to dispose our `cb` environment, because it is not needed
  353. # anymore.
  354. if customOverlapped.data.cell.data != nil:
  355. system.dispose(customOverlapped.data.cell)
  356. when not defined(gcDestructors):
  357. GC_unref(customOverlapped)
  358. else:
  359. let errCode = osLastError()
  360. if customOverlapped != nil:
  361. assert customOverlapped.data.fd == lpCompletionKey.AsyncFD
  362. customOverlapped.data.cb(customOverlapped.data.fd,
  363. lpNumberOfBytesTransferred, errCode)
  364. if customOverlapped.data.cell.data != nil:
  365. system.dispose(customOverlapped.data.cell)
  366. when not defined(gcDestructors):
  367. GC_unref(customOverlapped)
  368. else:
  369. if errCode.int32 == WAIT_TIMEOUT:
  370. # Timed out
  371. result = false
  372. else: raiseOSError(errCode)
  373. # Timer processing.
  374. discard processTimers(p, result)
  375. # Callback queue processing
  376. processPendingCallbacks(p, result)
  377. var acceptEx: WSAPROC_ACCEPTEX
  378. var connectEx: WSAPROC_CONNECTEX
  379. var getAcceptExSockAddrs: WSAPROC_GETACCEPTEXSOCKADDRS
  380. proc initPointer(s: SocketHandle, fun: var pointer, guid: var GUID): bool =
  381. # Ref: https://github.com/powdahound/twisted/blob/master/twisted/internet/iocpreactor/iocpsupport/winsock_pointers.c
  382. var bytesRet: DWORD
  383. fun = nil
  384. result = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, addr guid,
  385. sizeof(GUID).DWORD, addr fun, sizeof(pointer).DWORD,
  386. addr bytesRet, nil, nil) == 0
  387. proc initAll() =
  388. let dummySock = createNativeSocket()
  389. if dummySock == INVALID_SOCKET:
  390. raiseOSError(osLastError())
  391. var fun: pointer = nil
  392. if not initPointer(dummySock, fun, WSAID_CONNECTEX):
  393. raiseOSError(osLastError())
  394. connectEx = cast[WSAPROC_CONNECTEX](fun)
  395. if not initPointer(dummySock, fun, WSAID_ACCEPTEX):
  396. raiseOSError(osLastError())
  397. acceptEx = cast[WSAPROC_ACCEPTEX](fun)
  398. if not initPointer(dummySock, fun, WSAID_GETACCEPTEXSOCKADDRS):
  399. raiseOSError(osLastError())
  400. getAcceptExSockAddrs = cast[WSAPROC_GETACCEPTEXSOCKADDRS](fun)
  401. close(dummySock)
  402. proc newCustom*(): CustomRef =
  403. result = CustomRef() # 0
  404. GC_ref(result) # 1 prevent destructor from doing a premature free.
  405. # destructor of newCustom's caller --> 0. This means
  406. # Windows holds a ref for us with RC == 0 (single owner).
  407. # This is passed back to us in the IO completion port.
  408. proc recv*(socket: AsyncFD, size: int,
  409. flags = {SocketFlag.SafeDisconn}): owned(Future[string]) =
  410. ## Reads **up to** `size` bytes from `socket`. Returned future will
  411. ## complete once all the data requested is read, a part of the data has been
  412. ## read, or the socket has disconnected in which case the future will
  413. ## complete with a value of `""`.
  414. ##
  415. ## .. warning:: The `Peek` socket flag is not supported on Windows.
  416. # Things to note:
  417. # * When WSARecv completes immediately then `bytesReceived` is very
  418. # unreliable.
  419. # * Still need to implement message-oriented socket disconnection,
  420. # '\0' in the message currently signifies a socket disconnect. Who
  421. # knows what will happen when someone sends that to our socket.
  422. verifyPresence(socket)
  423. assert SocketFlag.Peek notin flags, "Peek not supported on Windows."
  424. var retFuture = newFuture[string]("recv")
  425. var dataBuf: TWSABuf
  426. dataBuf.buf = cast[cstring](alloc0(size))
  427. dataBuf.len = size.ULONG
  428. var bytesReceived: DWORD
  429. var flagsio = flags.toOSFlags().DWORD
  430. var ol = newCustom()
  431. ol.data = CompletionData(fd: socket, cb:
  432. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  433. if not retFuture.finished:
  434. if errcode == OSErrorCode(-1):
  435. if bytesCount == 0 and dataBuf.buf[0] == '\0':
  436. retFuture.complete("")
  437. else:
  438. var data = newString(bytesCount)
  439. assert bytesCount <= size
  440. copyMem(addr data[0], addr dataBuf.buf[0], bytesCount)
  441. retFuture.complete($data)
  442. else:
  443. if flags.isDisconnectionError(errcode):
  444. retFuture.complete("")
  445. else:
  446. retFuture.fail(newException(OSError, osErrorMsg(errcode)))
  447. if dataBuf.buf != nil:
  448. dealloc dataBuf.buf
  449. dataBuf.buf = nil
  450. )
  451. let ret = WSARecv(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived,
  452. addr flagsio, cast[POVERLAPPED](ol), nil)
  453. if ret == -1:
  454. let err = osLastError()
  455. if err.int32 != ERROR_IO_PENDING:
  456. if dataBuf.buf != nil:
  457. dealloc dataBuf.buf
  458. dataBuf.buf = nil
  459. GC_unref(ol)
  460. if flags.isDisconnectionError(err):
  461. retFuture.complete("")
  462. else:
  463. retFuture.fail(newException(OSError, osErrorMsg(err)))
  464. elif ret == 0:
  465. # Request completed immediately.
  466. if bytesReceived != 0:
  467. var data = newString(bytesReceived)
  468. assert bytesReceived <= size
  469. copyMem(addr data[0], addr dataBuf.buf[0], bytesReceived)
  470. retFuture.complete($data)
  471. else:
  472. if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)):
  473. retFuture.complete("")
  474. return retFuture
  475. proc recvInto*(socket: AsyncFD, buf: pointer, size: int,
  476. flags = {SocketFlag.SafeDisconn}): owned(Future[int]) =
  477. ## Reads **up to** `size` bytes from `socket` into `buf`, which must
  478. ## at least be of that size. Returned future will complete once all the
  479. ## data requested is read, a part of the data has been read, or the socket
  480. ## has disconnected in which case the future will complete with a value of
  481. ## `0`.
  482. ##
  483. ## .. warning:: The `Peek` socket flag is not supported on Windows.
  484. # Things to note:
  485. # * When WSARecv completes immediately then `bytesReceived` is very
  486. # unreliable.
  487. # * Still need to implement message-oriented socket disconnection,
  488. # '\0' in the message currently signifies a socket disconnect. Who
  489. # knows what will happen when someone sends that to our socket.
  490. verifyPresence(socket)
  491. assert SocketFlag.Peek notin flags, "Peek not supported on Windows."
  492. var retFuture = newFuture[int]("recvInto")
  493. #buf[] = '\0'
  494. var dataBuf: TWSABuf
  495. dataBuf.buf = cast[cstring](buf)
  496. dataBuf.len = size.ULONG
  497. var bytesReceived: DWORD
  498. var flagsio = flags.toOSFlags().DWORD
  499. var ol = newCustom()
  500. ol.data = CompletionData(fd: socket, cb:
  501. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  502. if not retFuture.finished:
  503. if errcode == OSErrorCode(-1):
  504. retFuture.complete(bytesCount)
  505. else:
  506. if flags.isDisconnectionError(errcode):
  507. retFuture.complete(0)
  508. else:
  509. retFuture.fail(newException(OSError, osErrorMsg(errcode)))
  510. if dataBuf.buf != nil:
  511. dataBuf.buf = nil
  512. )
  513. let ret = WSARecv(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived,
  514. addr flagsio, cast[POVERLAPPED](ol), nil)
  515. if ret == -1:
  516. let err = osLastError()
  517. if err.int32 != ERROR_IO_PENDING:
  518. if dataBuf.buf != nil:
  519. dataBuf.buf = nil
  520. GC_unref(ol)
  521. if flags.isDisconnectionError(err):
  522. retFuture.complete(0)
  523. else:
  524. retFuture.fail(newException(OSError, osErrorMsg(err)))
  525. elif ret == 0:
  526. # Request completed immediately.
  527. if bytesReceived != 0:
  528. assert bytesReceived <= size
  529. retFuture.complete(bytesReceived)
  530. else:
  531. if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)):
  532. retFuture.complete(bytesReceived)
  533. return retFuture
  534. proc send*(socket: AsyncFD, buf: pointer, size: int,
  535. flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
  536. ## Sends `size` bytes from `buf` to `socket`. The returned future
  537. ## will complete once all data has been sent.
  538. ##
  539. ## .. warning:: Use it with caution. If `buf` refers to GC'ed object,
  540. ## you must use GC_ref/GC_unref calls to avoid early freeing of the buffer.
  541. verifyPresence(socket)
  542. var retFuture = newFuture[void]("send")
  543. var dataBuf: TWSABuf
  544. dataBuf.buf = cast[cstring](buf)
  545. dataBuf.len = size.ULONG
  546. var bytesReceived, lowFlags: DWORD
  547. var ol = newCustom()
  548. ol.data = CompletionData(fd: socket, cb:
  549. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  550. if not retFuture.finished:
  551. if errcode == OSErrorCode(-1):
  552. retFuture.complete()
  553. else:
  554. if flags.isDisconnectionError(errcode):
  555. retFuture.complete()
  556. else:
  557. retFuture.fail(newOSError(errcode))
  558. )
  559. let ret = WSASend(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived,
  560. lowFlags, cast[POVERLAPPED](ol), nil)
  561. if ret == -1:
  562. let err = osLastError()
  563. if err.int32 != ERROR_IO_PENDING:
  564. GC_unref(ol)
  565. if flags.isDisconnectionError(err):
  566. retFuture.complete()
  567. else:
  568. retFuture.fail(newException(OSError, osErrorMsg(err)))
  569. else:
  570. retFuture.complete()
  571. # We don't deallocate `ol` here because even though this completed
  572. # immediately poll will still be notified about its completion and it will
  573. # free `ol`.
  574. return retFuture
  575. proc sendTo*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr,
  576. saddrLen: SockLen,
  577. flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
  578. ## Sends `data` to specified destination `saddr`, using
  579. ## socket `socket`. The returned future will complete once all data
  580. ## has been sent.
  581. verifyPresence(socket)
  582. var retFuture = newFuture[void]("sendTo")
  583. var dataBuf: TWSABuf
  584. dataBuf.buf = cast[cstring](data)
  585. dataBuf.len = size.ULONG
  586. var bytesSent = 0.DWORD
  587. var lowFlags = 0.DWORD
  588. # we will preserve address in our stack
  589. var staddr: array[128, char] # SOCKADDR_STORAGE size is 128 bytes
  590. var stalen: cint = cint(saddrLen)
  591. zeroMem(addr(staddr[0]), 128)
  592. copyMem(addr(staddr[0]), saddr, saddrLen)
  593. var ol = newCustom()
  594. ol.data = CompletionData(fd: socket, cb:
  595. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  596. if not retFuture.finished:
  597. if errcode == OSErrorCode(-1):
  598. retFuture.complete()
  599. else:
  600. retFuture.fail(newException(OSError, osErrorMsg(errcode)))
  601. )
  602. let ret = WSASendTo(socket.SocketHandle, addr dataBuf, 1, addr bytesSent,
  603. lowFlags, cast[ptr SockAddr](addr(staddr[0])),
  604. stalen, cast[POVERLAPPED](ol), nil)
  605. if ret == -1:
  606. let err = osLastError()
  607. if err.int32 != ERROR_IO_PENDING:
  608. GC_unref(ol)
  609. retFuture.fail(newException(OSError, osErrorMsg(err)))
  610. else:
  611. retFuture.complete()
  612. # We don't deallocate `ol` here because even though this completed
  613. # immediately poll will still be notified about its completion and it will
  614. # free `ol`.
  615. return retFuture
  616. proc recvFromInto*(socket: AsyncFD, data: pointer, size: int,
  617. saddr: ptr SockAddr, saddrLen: ptr SockLen,
  618. flags = {SocketFlag.SafeDisconn}): owned(Future[int]) =
  619. ## Receives a datagram data from `socket` into `buf`, which must
  620. ## be at least of size `size`, address of datagram's sender will be
  621. ## stored into `saddr` and `saddrLen`. Returned future will complete
  622. ## once one datagram has been received, and will return size of packet
  623. ## received.
  624. verifyPresence(socket)
  625. var retFuture = newFuture[int]("recvFromInto")
  626. var dataBuf = TWSABuf(buf: cast[cstring](data), len: size.ULONG)
  627. var bytesReceived = 0.DWORD
  628. var lowFlags = 0.DWORD
  629. var ol = newCustom()
  630. ol.data = CompletionData(fd: socket, cb:
  631. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  632. if not retFuture.finished:
  633. if errcode == OSErrorCode(-1):
  634. assert bytesCount <= size
  635. retFuture.complete(bytesCount)
  636. else:
  637. # datagram sockets don't have disconnection,
  638. # so we can just raise an exception
  639. retFuture.fail(newException(OSError, osErrorMsg(errcode)))
  640. )
  641. let res = WSARecvFrom(socket.SocketHandle, addr dataBuf, 1,
  642. addr bytesReceived, addr lowFlags,
  643. saddr, cast[ptr cint](saddrLen),
  644. cast[POVERLAPPED](ol), nil)
  645. if res == -1:
  646. let err = osLastError()
  647. if err.int32 != ERROR_IO_PENDING:
  648. GC_unref(ol)
  649. retFuture.fail(newException(OSError, osErrorMsg(err)))
  650. else:
  651. # Request completed immediately.
  652. if bytesReceived != 0:
  653. assert bytesReceived <= size
  654. retFuture.complete(bytesReceived)
  655. else:
  656. if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)):
  657. retFuture.complete(bytesReceived)
  658. return retFuture
  659. proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn},
  660. inheritable = defined(nimInheritHandles)):
  661. owned(Future[tuple[address: string, client: AsyncFD]]) =
  662. ## Accepts a new connection. Returns a future containing the client socket
  663. ## corresponding to that connection and the remote address of the client.
  664. ## The future will complete when the connection is successfully accepted.
  665. ##
  666. ## The resulting client socket is automatically registered to the
  667. ## dispatcher.
  668. ##
  669. ## If `inheritable` is false (the default), the resulting client socket will
  670. ## not be inheritable by child processes.
  671. ##
  672. ## The `accept` call may result in an error if the connecting socket
  673. ## disconnects during the duration of the `accept`. If the `SafeDisconn`
  674. ## flag is specified then this error will not be raised and instead
  675. ## accept will be called again.
  676. verifyPresence(socket)
  677. var retFuture = newFuture[tuple[address: string, client: AsyncFD]]("acceptAddr")
  678. var clientSock = createNativeSocket(inheritable = inheritable)
  679. if clientSock == osInvalidSocket: raiseOSError(osLastError())
  680. const lpOutputLen = 1024
  681. var lpOutputBuf = newString(lpOutputLen)
  682. var dwBytesReceived: DWORD
  683. let dwReceiveDataLength = 0.DWORD # We don't want any data to be read.
  684. let dwLocalAddressLength = DWORD(sizeof(Sockaddr_in6) + 16)
  685. let dwRemoteAddressLength = DWORD(sizeof(Sockaddr_in6) + 16)
  686. template failAccept(errcode) =
  687. if flags.isDisconnectionError(errcode):
  688. var newAcceptFut = acceptAddr(socket, flags)
  689. newAcceptFut.callback =
  690. proc () =
  691. if newAcceptFut.failed:
  692. retFuture.fail(newAcceptFut.readError)
  693. else:
  694. retFuture.complete(newAcceptFut.read)
  695. else:
  696. retFuture.fail(newException(OSError, osErrorMsg(errcode)))
  697. template completeAccept() {.dirty.} =
  698. var listenSock = socket
  699. let setoptRet = setsockopt(clientSock, SOL_SOCKET,
  700. SO_UPDATE_ACCEPT_CONTEXT, addr listenSock,
  701. sizeof(listenSock).SockLen)
  702. if setoptRet != 0:
  703. let errcode = osLastError()
  704. discard clientSock.closesocket()
  705. failAccept(errcode)
  706. else:
  707. var localSockaddr, remoteSockaddr: ptr SockAddr
  708. var localLen, remoteLen: int32
  709. getAcceptExSockAddrs(addr lpOutputBuf[0], dwReceiveDataLength,
  710. dwLocalAddressLength, dwRemoteAddressLength,
  711. addr localSockaddr, addr localLen,
  712. addr remoteSockaddr, addr remoteLen)
  713. try:
  714. let address = getAddrString(remoteSockaddr)
  715. register(clientSock.AsyncFD)
  716. retFuture.complete((address: address, client: clientSock.AsyncFD))
  717. except:
  718. # getAddrString may raise
  719. clientSock.close()
  720. retFuture.fail(getCurrentException())
  721. var ol = newCustom()
  722. ol.data = CompletionData(fd: socket, cb:
  723. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  724. if not retFuture.finished:
  725. if errcode == OSErrorCode(-1):
  726. completeAccept()
  727. else:
  728. failAccept(errcode)
  729. )
  730. # http://msdn.microsoft.com/en-us/library/windows/desktop/ms737524%28v=vs.85%29.aspx
  731. let ret = acceptEx(socket.SocketHandle, clientSock, addr lpOutputBuf[0],
  732. dwReceiveDataLength,
  733. dwLocalAddressLength,
  734. dwRemoteAddressLength,
  735. addr dwBytesReceived, cast[POVERLAPPED](ol))
  736. if not ret:
  737. let err = osLastError()
  738. if err.int32 != ERROR_IO_PENDING:
  739. failAccept(err)
  740. GC_unref(ol)
  741. else:
  742. completeAccept()
  743. # We don't deallocate `ol` here because even though this completed
  744. # immediately poll will still be notified about its completion and it will
  745. # free `ol`.
  746. return retFuture
  747. implementSetInheritable()
  748. proc closeSocket*(socket: AsyncFD) =
  749. ## Closes a socket and ensures that it is unregistered.
  750. socket.SocketHandle.close()
  751. getGlobalDispatcher().handles.excl(socket)
  752. proc unregister*(fd: AsyncFD) =
  753. ## Unregisters `fd`.
  754. getGlobalDispatcher().handles.excl(fd)
  755. proc contains*(disp: PDispatcher, fd: AsyncFD): bool =
  756. return fd in disp.handles
  757. {.push stackTrace: off.}
  758. proc waitableCallback(param: pointer,
  759. timerOrWaitFired: WINBOOL) {.stdcall.} =
  760. var p = cast[PostCallbackDataPtr](param)
  761. discard postQueuedCompletionStatus(p.ioPort, timerOrWaitFired.DWORD,
  762. ULONG_PTR(p.handleFd),
  763. cast[pointer](p.ovl))
  764. {.pop.}
  765. proc registerWaitableEvent(fd: AsyncFD, cb: Callback; mask: DWORD) =
  766. let p = getGlobalDispatcher()
  767. var flags = (WT_EXECUTEINWAITTHREAD or WT_EXECUTEONLYONCE).DWORD
  768. var hEvent = wsaCreateEvent()
  769. if hEvent == 0:
  770. raiseOSError(osLastError())
  771. var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
  772. pcd.ioPort = p.ioPort
  773. pcd.handleFd = fd
  774. var ol = newCustom()
  775. ol.data = CompletionData(fd: fd, cb:
  776. proc(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) {.gcsafe.} =
  777. # we excluding our `fd` because cb(fd) can register own handler
  778. # for this `fd`
  779. p.handles.excl(fd)
  780. # unregisterWait() is called before callback, because appropriate
  781. # winsockets function can re-enable event.
  782. # https://msdn.microsoft.com/en-us/library/windows/desktop/ms741576(v=vs.85).aspx
  783. if unregisterWait(pcd.waitFd) == 0:
  784. let err = osLastError()
  785. if err.int32 != ERROR_IO_PENDING:
  786. deallocShared(cast[pointer](pcd))
  787. discard wsaCloseEvent(hEvent)
  788. raiseOSError(err)
  789. if cb(fd):
  790. # callback returned `true`, so we free all allocated resources
  791. deallocShared(cast[pointer](pcd))
  792. if not wsaCloseEvent(hEvent):
  793. raiseOSError(osLastError())
  794. # pcd.ovl will be unrefed in poll().
  795. else:
  796. # callback returned `false` we need to continue
  797. if p.handles.contains(fd):
  798. # new callback was already registered with `fd`, so we free all
  799. # allocated resources. This happens because in callback `cb`
  800. # addRead/addWrite was called with same `fd`.
  801. deallocShared(cast[pointer](pcd))
  802. if not wsaCloseEvent(hEvent):
  803. raiseOSError(osLastError())
  804. else:
  805. # we need to include `fd` again
  806. p.handles.incl(fd)
  807. # and register WaitForSingleObject again
  808. if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
  809. cast[WAITORTIMERCALLBACK](waitableCallback),
  810. cast[pointer](pcd), INFINITE, flags):
  811. # pcd.ovl will be unrefed in poll()
  812. let err = osLastError()
  813. deallocShared(cast[pointer](pcd))
  814. discard wsaCloseEvent(hEvent)
  815. raiseOSError(err)
  816. else:
  817. # we incref `pcd.ovl` and `protect` callback one more time,
  818. # because it will be unrefed and disposed in `poll()` after
  819. # callback finishes.
  820. GC_ref(pcd.ovl)
  821. pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
  822. )
  823. # We need to protect our callback environment value, so GC will not free it
  824. # accidentally.
  825. ol.data.cell = system.protect(rawEnv(ol.data.cb))
  826. # This is main part of `hacky way` is using WSAEventSelect, so `hEvent`
  827. # will be signaled when appropriate `mask` events will be triggered.
  828. if wsaEventSelect(fd.SocketHandle, hEvent, mask) != 0:
  829. let err = osLastError()
  830. GC_unref(ol)
  831. deallocShared(cast[pointer](pcd))
  832. discard wsaCloseEvent(hEvent)
  833. raiseOSError(err)
  834. pcd.ovl = ol
  835. if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
  836. cast[WAITORTIMERCALLBACK](waitableCallback),
  837. cast[pointer](pcd), INFINITE, flags):
  838. let err = osLastError()
  839. GC_unref(ol)
  840. deallocShared(cast[pointer](pcd))
  841. discard wsaCloseEvent(hEvent)
  842. raiseOSError(err)
  843. p.handles.incl(fd)
  844. proc addRead*(fd: AsyncFD, cb: Callback) =
  845. ## Start watching the file descriptor for read availability and then call
  846. ## the callback `cb`.
  847. ##
  848. ## This is not `pure` mechanism for Windows Completion Ports (IOCP),
  849. ## so if you can avoid it, please do it. Use `addRead` only if really
  850. ## need it (main usecase is adaptation of unix-like libraries to be
  851. ## asynchronous on Windows).
  852. ##
  853. ## If you use this function, you don't need to use asyncdispatch.recv()
  854. ## or asyncdispatch.accept(), because they are using IOCP, please use
  855. ## nativesockets.recv() and nativesockets.accept() instead.
  856. ##
  857. ## Be sure your callback `cb` returns `true`, if you want to remove
  858. ## watch of `read` notifications, and `false`, if you want to continue
  859. ## receiving notifications.
  860. registerWaitableEvent(fd, cb, FD_READ or FD_ACCEPT or FD_OOB or FD_CLOSE)
  861. proc addWrite*(fd: AsyncFD, cb: Callback) =
  862. ## Start watching the file descriptor for write availability and then call
  863. ## the callback `cb`.
  864. ##
  865. ## This is not `pure` mechanism for Windows Completion Ports (IOCP),
  866. ## so if you can avoid it, please do it. Use `addWrite` only if really
  867. ## need it (main usecase is adaptation of unix-like libraries to be
  868. ## asynchronous on Windows).
  869. ##
  870. ## If you use this function, you don't need to use asyncdispatch.send()
  871. ## or asyncdispatch.connect(), because they are using IOCP, please use
  872. ## nativesockets.send() and nativesockets.connect() instead.
  873. ##
  874. ## Be sure your callback `cb` returns `true`, if you want to remove
  875. ## watch of `write` notifications, and `false`, if you want to continue
  876. ## receiving notifications.
  877. registerWaitableEvent(fd, cb, FD_WRITE or FD_CONNECT or FD_CLOSE)
  878. template registerWaitableHandle(p, hEvent, flags, pcd, timeout,
  879. handleCallback) =
  880. let handleFD = AsyncFD(hEvent)
  881. pcd.ioPort = p.ioPort
  882. pcd.handleFd = handleFD
  883. var ol = newCustom()
  884. ol.data.fd = handleFD
  885. ol.data.cb = handleCallback
  886. # We need to protect our callback environment value, so GC will not free it
  887. # accidentally.
  888. ol.data.cell = system.protect(rawEnv(ol.data.cb))
  889. pcd.ovl = ol
  890. if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
  891. cast[WAITORTIMERCALLBACK](waitableCallback),
  892. cast[pointer](pcd), timeout.DWORD, flags):
  893. let err = osLastError()
  894. GC_unref(ol)
  895. deallocShared(cast[pointer](pcd))
  896. discard closeHandle(hEvent)
  897. raiseOSError(err)
  898. p.handles.incl(handleFD)
  899. template closeWaitable(handle: untyped) =
  900. let waitFd = pcd.waitFd
  901. deallocShared(cast[pointer](pcd))
  902. p.handles.excl(fd)
  903. if unregisterWait(waitFd) == 0:
  904. let err = osLastError()
  905. if err.int32 != ERROR_IO_PENDING:
  906. discard closeHandle(handle)
  907. raiseOSError(err)
  908. if closeHandle(handle) == 0:
  909. raiseOSError(osLastError())
  910. proc addTimer*(timeout: int, oneshot: bool, cb: Callback) =
  911. ## Registers callback `cb` to be called when timer expired.
  912. ##
  913. ## Parameters:
  914. ##
  915. ## * `timeout` - timeout value in milliseconds.
  916. ## * `oneshot`
  917. ## * `true` - generate only one timeout event
  918. ## * `false` - generate timeout events periodically
  919. doAssert(timeout > 0)
  920. let p = getGlobalDispatcher()
  921. var hEvent = createEvent(nil, 1, 0, nil)
  922. if hEvent == INVALID_HANDLE_VALUE:
  923. raiseOSError(osLastError())
  924. var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
  925. var flags = WT_EXECUTEINWAITTHREAD.DWORD
  926. if oneshot: flags = flags or WT_EXECUTEONLYONCE
  927. proc timercb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  928. let res = cb(fd)
  929. if res or oneshot:
  930. closeWaitable(hEvent)
  931. else:
  932. # if callback returned `false`, then it wants to be called again, so
  933. # we need to ref and protect `pcd.ovl` again, because it will be
  934. # unrefed and disposed in `poll()`.
  935. GC_ref(pcd.ovl)
  936. pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
  937. registerWaitableHandle(p, hEvent, flags, pcd, timeout, timercb)
  938. proc addProcess*(pid: int, cb: Callback) =
  939. ## Registers callback `cb` to be called when process with process ID
  940. ## `pid` exited.
  941. const NULL = Handle(0)
  942. let p = getGlobalDispatcher()
  943. let procFlags = SYNCHRONIZE
  944. var hProcess = openProcess(procFlags, 0, pid.DWORD)
  945. if hProcess == NULL:
  946. raiseOSError(osLastError())
  947. var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
  948. var flags = WT_EXECUTEINWAITTHREAD.DWORD or WT_EXECUTEONLYONCE.DWORD
  949. proc proccb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  950. closeWaitable(hProcess)
  951. discard cb(fd)
  952. registerWaitableHandle(p, hProcess, flags, pcd, INFINITE, proccb)
  953. proc newAsyncEvent*(): AsyncEvent =
  954. ## Creates a new thread-safe `AsyncEvent` object.
  955. ##
  956. ## New `AsyncEvent` object is not automatically registered with
  957. ## dispatcher like `AsyncSocket`.
  958. var sa = SECURITY_ATTRIBUTES(
  959. nLength: sizeof(SECURITY_ATTRIBUTES).cint,
  960. bInheritHandle: 1
  961. )
  962. var event = createEvent(addr(sa), 0'i32, 0'i32, nil)
  963. if event == INVALID_HANDLE_VALUE:
  964. raiseOSError(osLastError())
  965. result = cast[AsyncEvent](allocShared0(sizeof(AsyncEventImpl)))
  966. result.hEvent = event
  967. proc trigger*(ev: AsyncEvent) =
  968. ## Set event `ev` to signaled state.
  969. if setEvent(ev.hEvent) == 0:
  970. raiseOSError(osLastError())
  971. proc unregister*(ev: AsyncEvent) =
  972. ## Unregisters event `ev`.
  973. doAssert(ev.hWaiter != 0, "Event is not registered in the queue!")
  974. let p = getGlobalDispatcher()
  975. p.handles.excl(AsyncFD(ev.hEvent))
  976. if unregisterWait(ev.hWaiter) == 0:
  977. let err = osLastError()
  978. if err.int32 != ERROR_IO_PENDING:
  979. raiseOSError(err)
  980. ev.hWaiter = 0
  981. proc close*(ev: AsyncEvent) =
  982. ## Closes event `ev`.
  983. let res = closeHandle(ev.hEvent)
  984. deallocShared(cast[pointer](ev))
  985. if res == 0:
  986. raiseOSError(osLastError())
  987. proc addEvent*(ev: AsyncEvent, cb: Callback) =
  988. ## Registers callback `cb` to be called when `ev` will be signaled
  989. doAssert(ev.hWaiter == 0, "Event is already registered in the queue!")
  990. let p = getGlobalDispatcher()
  991. let hEvent = ev.hEvent
  992. var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
  993. var flags = WT_EXECUTEINWAITTHREAD.DWORD
  994. proc eventcb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  995. if ev.hWaiter != 0:
  996. if cb(fd):
  997. # we need this check to avoid exception, if `unregister(event)` was
  998. # called in callback.
  999. deallocShared(cast[pointer](pcd))
  1000. if ev.hWaiter != 0:
  1001. unregister(ev)
  1002. else:
  1003. # if callback returned `false`, then it wants to be called again, so
  1004. # we need to ref and protect `pcd.ovl` again, because it will be
  1005. # unrefed and disposed in `poll()`.
  1006. GC_ref(pcd.ovl)
  1007. pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
  1008. else:
  1009. # if ev.hWaiter == 0, then event was unregistered before `poll()` call.
  1010. deallocShared(cast[pointer](pcd))
  1011. registerWaitableHandle(p, hEvent, flags, pcd, INFINITE, eventcb)
  1012. ev.hWaiter = pcd.waitFd
  1013. initAll()
  1014. else:
  1015. import selectors
  1016. from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK,
  1017. MSG_NOSIGNAL
  1018. when declared(posix.accept4):
  1019. from posix import accept4, SOCK_CLOEXEC
  1020. const
  1021. InitCallbackListSize = 4 # initial size of callbacks sequence,
  1022. # associated with file/socket descriptor.
  1023. InitDelayedCallbackListSize = 64 # initial size of delayed callbacks
  1024. # queue.
  1025. type
  1026. AsyncFD* = distinct cint
  1027. Callback* = proc (fd: AsyncFD): bool {.closure, gcsafe.}
  1028. AsyncData = object
  1029. readList: seq[Callback]
  1030. writeList: seq[Callback]
  1031. AsyncEvent* = distinct SelectEvent
  1032. PDispatcher* = ref object of PDispatcherBase
  1033. selector: Selector[AsyncData]
  1034. proc `==`*(x, y: AsyncFD): bool {.borrow.}
  1035. proc `==`*(x, y: AsyncEvent): bool {.borrow.}
  1036. template newAsyncData(): AsyncData =
  1037. AsyncData(
  1038. readList: newSeqOfCap[Callback](InitCallbackListSize),
  1039. writeList: newSeqOfCap[Callback](InitCallbackListSize)
  1040. )
  1041. proc newDispatcher*(): owned(PDispatcher) =
  1042. new result
  1043. result.selector = newSelector[AsyncData]()
  1044. result.timers.clear()
  1045. result.callbacks = initDeque[proc () {.closure, gcsafe.}](InitDelayedCallbackListSize)
  1046. var gDisp{.threadvar.}: owned PDispatcher ## Global dispatcher
  1047. proc setGlobalDispatcher*(disp: owned PDispatcher) =
  1048. if not gDisp.isNil:
  1049. assert gDisp.callbacks.len == 0
  1050. gDisp = disp
  1051. initCallSoonProc()
  1052. proc getGlobalDispatcher*(): PDispatcher =
  1053. if gDisp.isNil:
  1054. setGlobalDispatcher(newDispatcher())
  1055. result = gDisp
  1056. proc getIoHandler*(disp: PDispatcher): Selector[AsyncData] =
  1057. return disp.selector
  1058. proc register*(fd: AsyncFD) =
  1059. let p = getGlobalDispatcher()
  1060. var data = newAsyncData()
  1061. p.selector.registerHandle(fd.SocketHandle, {}, data)
  1062. proc unregister*(fd: AsyncFD) =
  1063. getGlobalDispatcher().selector.unregister(fd.SocketHandle)
  1064. proc unregister*(ev: AsyncEvent) =
  1065. getGlobalDispatcher().selector.unregister(SelectEvent(ev))
  1066. proc contains*(disp: PDispatcher, fd: AsyncFD): bool =
  1067. return fd.SocketHandle in disp.selector
  1068. proc addRead*(fd: AsyncFD, cb: Callback) =
  1069. let p = getGlobalDispatcher()
  1070. var newEvents = {Event.Read}
  1071. withData(p.selector, fd.SocketHandle, adata) do:
  1072. adata.readList.add(cb)
  1073. newEvents.incl(Event.Read)
  1074. if len(adata.writeList) != 0: newEvents.incl(Event.Write)
  1075. do:
  1076. raise newException(ValueError, "File descriptor not registered.")
  1077. p.selector.updateHandle(fd.SocketHandle, newEvents)
  1078. proc addWrite*(fd: AsyncFD, cb: Callback) =
  1079. let p = getGlobalDispatcher()
  1080. var newEvents = {Event.Write}
  1081. withData(p.selector, fd.SocketHandle, adata) do:
  1082. adata.writeList.add(cb)
  1083. newEvents.incl(Event.Write)
  1084. if len(adata.readList) != 0: newEvents.incl(Event.Read)
  1085. do:
  1086. raise newException(ValueError, "File descriptor not registered.")
  1087. p.selector.updateHandle(fd.SocketHandle, newEvents)
  1088. proc hasPendingOperations*(): bool =
  1089. let p = getGlobalDispatcher()
  1090. not p.selector.isEmpty() or p.timers.len != 0 or p.callbacks.len != 0
  1091. proc prependSeq(dest: var seq[Callback]; src: sink seq[Callback]) =
  1092. var old = move dest
  1093. dest = src
  1094. for i in 0..high(old):
  1095. dest.add(move old[i])
  1096. proc processBasicCallbacks(
  1097. fd: AsyncFD, event: Event
  1098. ): tuple[readCbListCount, writeCbListCount: int] =
  1099. # Process pending descriptor and AsyncEvent callbacks.
  1100. #
  1101. # Invoke every callback stored in `rwlist`, until one
  1102. # returns `false` (which means callback wants to stay
  1103. # alive). In such case all remaining callbacks will be added
  1104. # to `rwlist` again, in the order they have been inserted.
  1105. #
  1106. # `rwlist` associated with file descriptor MUST BE emptied before
  1107. # dispatching callback (See https://github.com/nim-lang/Nim/issues/5128),
  1108. # or it can be possible to fall into endless cycle.
  1109. var curList: seq[Callback]
  1110. let selector = getGlobalDispatcher().selector
  1111. withData(selector, fd.int, fdData):
  1112. case event
  1113. of Event.Read:
  1114. #shallowCopy(curList, fdData.readList)
  1115. curList = move fdData.readList
  1116. fdData.readList = newSeqOfCap[Callback](InitCallbackListSize)
  1117. of Event.Write:
  1118. #shallowCopy(curList, fdData.writeList)
  1119. curList = move fdData.writeList
  1120. fdData.writeList = newSeqOfCap[Callback](InitCallbackListSize)
  1121. else:
  1122. assert false, "Cannot process callbacks for " & $event
  1123. let newLength = max(len(curList), InitCallbackListSize)
  1124. var newList = newSeqOfCap[Callback](newLength)
  1125. var eventsExtinguished = false
  1126. for cb in curList:
  1127. if eventsExtinguished:
  1128. newList.add(cb)
  1129. elif not cb(fd):
  1130. # Callback wants to be called again.
  1131. newList.add(cb)
  1132. # This callback has returned with EAGAIN, so we don't need to
  1133. # call any other callbacks as they are all waiting for the same event
  1134. # on the same fd.
  1135. # We do need to ensure they are called again though.
  1136. eventsExtinguished = true
  1137. withData(selector, fd.int, fdData) do:
  1138. # Descriptor is still present in the queue.
  1139. case event
  1140. of Event.Read: prependSeq(fdData.readList, newList)
  1141. of Event.Write: prependSeq(fdData.writeList, newList)
  1142. else:
  1143. assert false, "Cannot process callbacks for " & $event
  1144. result.readCbListCount = len(fdData.readList)
  1145. result.writeCbListCount = len(fdData.writeList)
  1146. do:
  1147. # Descriptor was unregistered in callback via `unregister()`.
  1148. result.readCbListCount = -1
  1149. result.writeCbListCount = -1
  1150. proc processCustomCallbacks(p: PDispatcher; fd: AsyncFD) =
  1151. # Process pending custom event callbacks. Custom events are
  1152. # {Event.Timer, Event.Signal, Event.Process, Event.Vnode}.
  1153. # There can be only one callback registered with one descriptor,
  1154. # so there is no need to iterate over list.
  1155. var curList: seq[Callback]
  1156. withData(p.selector, fd.int, adata) do:
  1157. curList = move adata.readList
  1158. adata.readList = newSeqOfCap[Callback](InitCallbackListSize)
  1159. let newLength = len(curList)
  1160. var newList = newSeqOfCap[Callback](newLength)
  1161. var cb = curList[0]
  1162. if not cb(fd):
  1163. newList.add(cb)
  1164. withData(p.selector, fd.int, adata) do:
  1165. # descriptor still present in queue.
  1166. adata.readList = newList & adata.readList
  1167. if len(adata.readList) == 0:
  1168. # if no callbacks registered with descriptor, unregister it.
  1169. p.selector.unregister(fd.int)
  1170. do:
  1171. # descriptor was unregistered in callback via `unregister()`.
  1172. discard
  1173. implementSetInheritable()
  1174. proc closeSocket*(sock: AsyncFD) =
  1175. let selector = getGlobalDispatcher().selector
  1176. if sock.SocketHandle notin selector:
  1177. raise newException(ValueError, "File descriptor not registered.")
  1178. let data = selector.getData(sock.SocketHandle)
  1179. sock.unregister()
  1180. sock.SocketHandle.close()
  1181. # We need to unblock the read and write callbacks which could still be
  1182. # waiting for the socket to become readable and/or writeable.
  1183. for cb in data.readList & data.writeList:
  1184. if not cb(sock):
  1185. raise newException(
  1186. ValueError, "Expecting async operations to stop when fd has closed."
  1187. )
  1188. proc runOnce(timeout = 500): bool =
  1189. let p = getGlobalDispatcher()
  1190. if p.selector.isEmpty() and p.timers.len == 0 and p.callbacks.len == 0:
  1191. raise newException(ValueError,
  1192. "No handles or timers registered in dispatcher.")
  1193. result = false
  1194. var keys: array[64, ReadyKey]
  1195. let nextTimer = processTimers(p, result)
  1196. var count =
  1197. p.selector.selectInto(adjustTimeout(p, timeout, nextTimer), keys)
  1198. for i in 0..<count:
  1199. let fd = keys[i].fd.AsyncFD
  1200. let events = keys[i].events
  1201. var (readCbListCount, writeCbListCount) = (0, 0)
  1202. if Event.Read in events or events == {Event.Error}:
  1203. (readCbListCount, writeCbListCount) =
  1204. processBasicCallbacks(fd, Event.Read)
  1205. result = true
  1206. if Event.Write in events or events == {Event.Error}:
  1207. (readCbListCount, writeCbListCount) =
  1208. processBasicCallbacks(fd, Event.Write)
  1209. result = true
  1210. var isCustomEvent = false
  1211. if Event.User in events:
  1212. (readCbListCount, writeCbListCount) =
  1213. processBasicCallbacks(fd, Event.Read)
  1214. isCustomEvent = true
  1215. if readCbListCount == 0:
  1216. p.selector.unregister(fd.int)
  1217. result = true
  1218. when ioselSupportedPlatform:
  1219. const customSet = {Event.Timer, Event.Signal, Event.Process,
  1220. Event.Vnode}
  1221. if (customSet * events) != {}:
  1222. isCustomEvent = true
  1223. processCustomCallbacks(p, fd)
  1224. result = true
  1225. # because state `data` can be modified in callback we need to update
  1226. # descriptor events with currently registered callbacks.
  1227. if not isCustomEvent and (readCbListCount != -1 and writeCbListCount != -1):
  1228. var newEvents: set[Event] = {}
  1229. if readCbListCount > 0: incl(newEvents, Event.Read)
  1230. if writeCbListCount > 0: incl(newEvents, Event.Write)
  1231. p.selector.updateHandle(SocketHandle(fd), newEvents)
  1232. # Timer processing.
  1233. discard processTimers(p, result)
  1234. # Callback queue processing
  1235. processPendingCallbacks(p, result)
  1236. proc recv*(socket: AsyncFD, size: int,
  1237. flags = {SocketFlag.SafeDisconn}): owned(Future[string]) =
  1238. var retFuture = newFuture[string]("recv")
  1239. var readBuffer = newString(size)
  1240. proc cb(sock: AsyncFD): bool =
  1241. result = true
  1242. let res = recv(sock.SocketHandle, addr readBuffer[0], size.cint,
  1243. flags.toOSFlags())
  1244. if res < 0:
  1245. let lastError = osLastError()
  1246. if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and
  1247. lastError.int32 != EAGAIN:
  1248. if flags.isDisconnectionError(lastError):
  1249. retFuture.complete("")
  1250. else:
  1251. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  1252. else:
  1253. result = false # We still want this callback to be called.
  1254. elif res == 0:
  1255. # Disconnected
  1256. retFuture.complete("")
  1257. else:
  1258. readBuffer.setLen(res)
  1259. retFuture.complete(readBuffer)
  1260. # TODO: The following causes a massive slowdown.
  1261. #if not cb(socket):
  1262. addRead(socket, cb)
  1263. return retFuture
  1264. proc recvInto*(socket: AsyncFD, buf: pointer, size: int,
  1265. flags = {SocketFlag.SafeDisconn}): owned(Future[int]) =
  1266. var retFuture = newFuture[int]("recvInto")
  1267. proc cb(sock: AsyncFD): bool =
  1268. result = true
  1269. let res = recv(sock.SocketHandle, buf, size.cint,
  1270. flags.toOSFlags())
  1271. if res < 0:
  1272. let lastError = osLastError()
  1273. if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and
  1274. lastError.int32 != EAGAIN:
  1275. if flags.isDisconnectionError(lastError):
  1276. retFuture.complete(0)
  1277. else:
  1278. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  1279. else:
  1280. result = false # We still want this callback to be called.
  1281. else:
  1282. retFuture.complete(res)
  1283. # TODO: The following causes a massive slowdown.
  1284. #if not cb(socket):
  1285. addRead(socket, cb)
  1286. return retFuture
  1287. proc send*(socket: AsyncFD, buf: pointer, size: int,
  1288. flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
  1289. var retFuture = newFuture[void]("send")
  1290. var written = 0
  1291. proc cb(sock: AsyncFD): bool =
  1292. result = true
  1293. let netSize = size-written
  1294. var d = cast[cstring](buf)
  1295. let res = send(sock.SocketHandle, addr d[written], netSize.cint,
  1296. MSG_NOSIGNAL)
  1297. if res < 0:
  1298. let lastError = osLastError()
  1299. if lastError.int32 != EINTR and
  1300. lastError.int32 != EWOULDBLOCK and
  1301. lastError.int32 != EAGAIN:
  1302. if flags.isDisconnectionError(lastError):
  1303. retFuture.complete()
  1304. else:
  1305. retFuture.fail(newOSError(lastError))
  1306. else:
  1307. result = false # We still want this callback to be called.
  1308. else:
  1309. written.inc(res)
  1310. if res != netSize:
  1311. result = false # We still have data to send.
  1312. else:
  1313. retFuture.complete()
  1314. # TODO: The following causes crashes.
  1315. #if not cb(socket):
  1316. addWrite(socket, cb)
  1317. return retFuture
  1318. proc sendTo*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr,
  1319. saddrLen: SockLen,
  1320. flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
  1321. ## Sends `data` of size `size` in bytes to specified destination
  1322. ## (`saddr` of size `saddrLen` in bytes, using socket `socket`.
  1323. ## The returned future will complete once all data has been sent.
  1324. var retFuture = newFuture[void]("sendTo")
  1325. # we will preserve address in our stack
  1326. var staddr: array[128, char] # SOCKADDR_STORAGE size is 128 bytes
  1327. var stalen = saddrLen
  1328. zeroMem(addr(staddr[0]), 128)
  1329. copyMem(addr(staddr[0]), saddr, saddrLen)
  1330. proc cb(sock: AsyncFD): bool =
  1331. result = true
  1332. let res = sendto(sock.SocketHandle, data, size, MSG_NOSIGNAL,
  1333. cast[ptr SockAddr](addr(staddr[0])), stalen)
  1334. if res < 0:
  1335. let lastError = osLastError()
  1336. if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and
  1337. lastError.int32 != EAGAIN:
  1338. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  1339. else:
  1340. result = false # We still want this callback to be called.
  1341. else:
  1342. retFuture.complete()
  1343. addWrite(socket, cb)
  1344. return retFuture
  1345. proc recvFromInto*(socket: AsyncFD, data: pointer, size: int,
  1346. saddr: ptr SockAddr, saddrLen: ptr SockLen,
  1347. flags = {SocketFlag.SafeDisconn}): owned(Future[int]) =
  1348. ## Receives a datagram data from `socket` into `data`, which must
  1349. ## be at least of size `size` in bytes, address of datagram's sender
  1350. ## will be stored into `saddr` and `saddrLen`. Returned future will
  1351. ## complete once one datagram has been received, and will return size
  1352. ## of packet received.
  1353. var retFuture = newFuture[int]("recvFromInto")
  1354. proc cb(sock: AsyncFD): bool =
  1355. result = true
  1356. let res = recvfrom(sock.SocketHandle, data, size.cint, flags.toOSFlags(),
  1357. saddr, saddrLen)
  1358. if res < 0:
  1359. let lastError = osLastError()
  1360. if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and
  1361. lastError.int32 != EAGAIN:
  1362. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  1363. else:
  1364. result = false
  1365. else:
  1366. retFuture.complete(res)
  1367. addRead(socket, cb)
  1368. return retFuture
  1369. proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn},
  1370. inheritable = defined(nimInheritHandles)):
  1371. owned(Future[tuple[address: string, client: AsyncFD]]) =
  1372. var retFuture = newFuture[tuple[address: string,
  1373. client: AsyncFD]]("acceptAddr")
  1374. proc cb(sock: AsyncFD): bool =
  1375. result = true
  1376. var sockAddress: Sockaddr_storage
  1377. var addrLen = sizeof(sockAddress).SockLen
  1378. var client =
  1379. when declared(accept4):
  1380. accept4(sock.SocketHandle, cast[ptr SockAddr](addr(sockAddress)),
  1381. addr(addrLen), if inheritable: 0 else: SOCK_CLOEXEC)
  1382. else:
  1383. accept(sock.SocketHandle, cast[ptr SockAddr](addr(sockAddress)),
  1384. addr(addrLen))
  1385. when declared(setInheritable) and not declared(accept4):
  1386. if client != osInvalidSocket and not setInheritable(client, inheritable):
  1387. # Set failure first because close() itself can fail,
  1388. # altering osLastError().
  1389. retFuture.fail(newOSError(osLastError()))
  1390. close client
  1391. return false
  1392. if client == osInvalidSocket:
  1393. let lastError = osLastError()
  1394. assert lastError.int32 != EWOULDBLOCK and lastError.int32 != EAGAIN
  1395. if lastError.int32 == EINTR:
  1396. return false
  1397. else:
  1398. if flags.isDisconnectionError(lastError):
  1399. return false
  1400. else:
  1401. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  1402. else:
  1403. try:
  1404. let address = getAddrString(cast[ptr SockAddr](addr sockAddress))
  1405. register(client.AsyncFD)
  1406. retFuture.complete((address, client.AsyncFD))
  1407. except:
  1408. # getAddrString may raise
  1409. client.close()
  1410. retFuture.fail(getCurrentException())
  1411. addRead(socket, cb)
  1412. return retFuture
  1413. when ioselSupportedPlatform:
  1414. proc addTimer*(timeout: int, oneshot: bool, cb: Callback) =
  1415. ## Start watching for timeout expiration, and then call the
  1416. ## callback `cb`.
  1417. ## `timeout` - time in milliseconds,
  1418. ## `oneshot` - if `true` only one event will be dispatched,
  1419. ## if `false` continuous events every `timeout` milliseconds.
  1420. let p = getGlobalDispatcher()
  1421. var data = newAsyncData()
  1422. data.readList.add(cb)
  1423. p.selector.registerTimer(timeout, oneshot, data)
  1424. proc addSignal*(signal: int, cb: Callback) =
  1425. ## Start watching signal `signal`, and when signal appears, call the
  1426. ## callback `cb`.
  1427. let p = getGlobalDispatcher()
  1428. var data = newAsyncData()
  1429. data.readList.add(cb)
  1430. p.selector.registerSignal(signal, data)
  1431. proc addProcess*(pid: int, cb: Callback) =
  1432. ## Start watching for process exit with pid `pid`, and then call
  1433. ## the callback `cb`.
  1434. let p = getGlobalDispatcher()
  1435. var data = newAsyncData()
  1436. data.readList.add(cb)
  1437. p.selector.registerProcess(pid, data)
  1438. proc newAsyncEvent*(): AsyncEvent =
  1439. ## Creates new `AsyncEvent`.
  1440. result = AsyncEvent(newSelectEvent())
  1441. proc trigger*(ev: AsyncEvent) =
  1442. ## Sets new `AsyncEvent` to signaled state.
  1443. trigger(SelectEvent(ev))
  1444. proc close*(ev: AsyncEvent) =
  1445. ## Closes `AsyncEvent`
  1446. close(SelectEvent(ev))
  1447. proc addEvent*(ev: AsyncEvent, cb: Callback) =
  1448. ## Start watching for event `ev`, and call callback `cb`, when
  1449. ## ev will be set to signaled state.
  1450. let p = getGlobalDispatcher()
  1451. var data = newAsyncData()
  1452. data.readList.add(cb)
  1453. p.selector.registerEvent(SelectEvent(ev), data)
  1454. proc drain*(timeout = 500) =
  1455. ## Waits for completion of **all** events and processes them. Raises `ValueError`
  1456. ## if there are no pending operations. In contrast to `poll` this
  1457. ## processes as many events as are available until the timeout has elapsed.
  1458. var curTimeout = timeout
  1459. let start = now()
  1460. while hasPendingOperations():
  1461. discard runOnce(curTimeout)
  1462. curTimeout -= (now() - start).inMilliseconds.int
  1463. if curTimeout < 0:
  1464. break
  1465. proc poll*(timeout = 500) =
  1466. ## Waits for completion events and processes them. Raises `ValueError`
  1467. ## if there are no pending operations. This runs the underlying OS
  1468. ## `epoll`:idx: or `kqueue`:idx: primitive only once.
  1469. discard runOnce(timeout)
  1470. template createAsyncNativeSocketImpl(domain, sockType, protocol: untyped,
  1471. inheritable = defined(nimInheritHandles)) =
  1472. let handle = createNativeSocket(domain, sockType, protocol, inheritable)
  1473. if handle == osInvalidSocket:
  1474. return osInvalidSocket.AsyncFD
  1475. handle.setBlocking(false)
  1476. when defined(macosx) and not defined(nimdoc):
  1477. handle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1)
  1478. result = handle.AsyncFD
  1479. register(result)
  1480. proc createAsyncNativeSocket*(domain: cint, sockType: cint,
  1481. protocol: cint,
  1482. inheritable = defined(nimInheritHandles)): AsyncFD =
  1483. createAsyncNativeSocketImpl(domain, sockType, protocol, inheritable)
  1484. proc createAsyncNativeSocket*(domain: Domain = Domain.AF_INET,
  1485. sockType: SockType = SOCK_STREAM,
  1486. protocol: Protocol = IPPROTO_TCP,
  1487. inheritable = defined(nimInheritHandles)): AsyncFD =
  1488. createAsyncNativeSocketImpl(domain, sockType, protocol, inheritable)
  1489. when defined(windows) or defined(nimdoc):
  1490. proc bindToDomain(handle: SocketHandle, domain: Domain) =
  1491. # Extracted into a separate proc, because connect() on Windows requires
  1492. # the socket to be initially bound.
  1493. template doBind(saddr) =
  1494. if bindAddr(handle, cast[ptr SockAddr](addr(saddr)),
  1495. sizeof(saddr).SockLen) < 0'i32:
  1496. raiseOSError(osLastError())
  1497. if domain == Domain.AF_INET6:
  1498. var saddr: Sockaddr_in6
  1499. saddr.sin6_family = uint16(toInt(domain))
  1500. doBind(saddr)
  1501. else:
  1502. var saddr: Sockaddr_in
  1503. saddr.sin_family = uint16(toInt(domain))
  1504. doBind(saddr)
  1505. proc doConnect(socket: AsyncFD, addrInfo: ptr AddrInfo): owned(Future[void]) =
  1506. let retFuture = newFuture[void]("doConnect")
  1507. result = retFuture
  1508. var ol = newCustom()
  1509. ol.data = CompletionData(fd: socket, cb:
  1510. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  1511. if not retFuture.finished:
  1512. if errcode == OSErrorCode(-1):
  1513. retFuture.complete()
  1514. else:
  1515. retFuture.fail(newException(OSError, osErrorMsg(errcode)))
  1516. )
  1517. let ret = connectEx(socket.SocketHandle, addrInfo.ai_addr,
  1518. cint(addrInfo.ai_addrlen), nil, 0, nil,
  1519. cast[POVERLAPPED](ol))
  1520. if ret:
  1521. # Request to connect completed immediately.
  1522. retFuture.complete()
  1523. # We don't deallocate `ol` here because even though this completed
  1524. # immediately poll will still be notified about its completion and it
  1525. # will free `ol`.
  1526. else:
  1527. let lastError = osLastError()
  1528. if lastError.int32 != ERROR_IO_PENDING:
  1529. # With ERROR_IO_PENDING `ol` will be deallocated in `poll`,
  1530. # and the future will be completed/failed there, too.
  1531. GC_unref(ol)
  1532. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  1533. else:
  1534. proc doConnect(socket: AsyncFD, addrInfo: ptr AddrInfo): owned(Future[void]) =
  1535. let retFuture = newFuture[void]("doConnect")
  1536. result = retFuture
  1537. proc cb(fd: AsyncFD): bool =
  1538. let ret = SocketHandle(fd).getSockOptInt(
  1539. cint(SOL_SOCKET), cint(SO_ERROR))
  1540. if ret == 0:
  1541. # We have connected.
  1542. retFuture.complete()
  1543. return true
  1544. elif ret == EINTR:
  1545. # interrupted, keep waiting
  1546. return false
  1547. else:
  1548. retFuture.fail(newException(OSError, osErrorMsg(OSErrorCode(ret))))
  1549. return true
  1550. let ret = connect(socket.SocketHandle,
  1551. addrInfo.ai_addr,
  1552. addrInfo.ai_addrlen.SockLen)
  1553. if ret == 0:
  1554. # Request to connect completed immediately.
  1555. retFuture.complete()
  1556. else:
  1557. let lastError = osLastError()
  1558. if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS:
  1559. addWrite(socket, cb)
  1560. else:
  1561. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  1562. template asyncAddrInfoLoop(addrInfo: ptr AddrInfo, fd: untyped,
  1563. protocol: Protocol = IPPROTO_RAW) =
  1564. ## Iterates through the AddrInfo linked list asynchronously
  1565. ## until the connection can be established.
  1566. const shouldCreateFd = not declared(fd)
  1567. when shouldCreateFd:
  1568. let sockType = protocol.toSockType()
  1569. var fdPerDomain: array[low(Domain).ord..high(Domain).ord, AsyncFD]
  1570. for i in low(fdPerDomain)..high(fdPerDomain):
  1571. fdPerDomain[i] = osInvalidSocket.AsyncFD
  1572. template closeUnusedFds(domainToKeep = -1) {.dirty.} =
  1573. for i, fd in fdPerDomain:
  1574. if fd != osInvalidSocket.AsyncFD and i != domainToKeep:
  1575. fd.closeSocket()
  1576. var lastException: ref Exception
  1577. var curAddrInfo = addrInfo
  1578. var domain: Domain
  1579. when shouldCreateFd:
  1580. var curFd: AsyncFD
  1581. else:
  1582. var curFd = fd
  1583. proc tryNextAddrInfo(fut: Future[void]) {.gcsafe.} =
  1584. if fut == nil or fut.failed:
  1585. if fut != nil:
  1586. lastException = fut.readError()
  1587. while curAddrInfo != nil:
  1588. let domainOpt = curAddrInfo.ai_family.toKnownDomain()
  1589. if domainOpt.isSome:
  1590. domain = domainOpt.unsafeGet()
  1591. break
  1592. curAddrInfo = curAddrInfo.ai_next
  1593. if curAddrInfo == nil:
  1594. freeaddrinfo(addrInfo)
  1595. when shouldCreateFd:
  1596. closeUnusedFds()
  1597. if lastException != nil:
  1598. retFuture.fail(lastException)
  1599. else:
  1600. retFuture.fail(newException(
  1601. IOError, "Couldn't resolve address: " & address))
  1602. return
  1603. when shouldCreateFd:
  1604. curFd = fdPerDomain[ord(domain)]
  1605. if curFd == osInvalidSocket.AsyncFD:
  1606. try:
  1607. curFd = createAsyncNativeSocket(domain, sockType, protocol)
  1608. except:
  1609. freeaddrinfo(addrInfo)
  1610. closeUnusedFds()
  1611. raise getCurrentException()
  1612. when defined(windows):
  1613. curFd.SocketHandle.bindToDomain(domain)
  1614. fdPerDomain[ord(domain)] = curFd
  1615. doConnect(curFd, curAddrInfo).callback = tryNextAddrInfo
  1616. curAddrInfo = curAddrInfo.ai_next
  1617. else:
  1618. freeaddrinfo(addrInfo)
  1619. when shouldCreateFd:
  1620. closeUnusedFds(ord(domain))
  1621. retFuture.complete(curFd)
  1622. else:
  1623. retFuture.complete()
  1624. tryNextAddrInfo(nil)
  1625. proc dial*(address: string, port: Port,
  1626. protocol: Protocol = IPPROTO_TCP): owned(Future[AsyncFD]) =
  1627. ## Establishes connection to the specified `address`:`port` pair via the
  1628. ## specified protocol. The procedure iterates through possible
  1629. ## resolutions of the `address` until it succeeds, meaning that it
  1630. ## seamlessly works with both IPv4 and IPv6.
  1631. ## Returns the async file descriptor, registered in the dispatcher of
  1632. ## the current thread, ready to send or receive data.
  1633. let retFuture = newFuture[AsyncFD]("dial")
  1634. result = retFuture
  1635. let sockType = protocol.toSockType()
  1636. let aiList = getAddrInfo(address, port, Domain.AF_UNSPEC, sockType, protocol)
  1637. asyncAddrInfoLoop(aiList, noFD, protocol)
  1638. proc connect*(socket: AsyncFD, address: string, port: Port,
  1639. domain = Domain.AF_INET): owned(Future[void]) =
  1640. let retFuture = newFuture[void]("connect")
  1641. result = retFuture
  1642. when defined(windows):
  1643. verifyPresence(socket)
  1644. else:
  1645. assert getSockDomain(socket.SocketHandle) == domain
  1646. let aiList = getAddrInfo(address, port, domain)
  1647. when defined(windows):
  1648. socket.SocketHandle.bindToDomain(domain)
  1649. asyncAddrInfoLoop(aiList, socket)
  1650. proc sleepAsync*(ms: int | float): owned(Future[void]) =
  1651. ## Suspends the execution of the current async procedure for the next
  1652. ## `ms` milliseconds.
  1653. var retFuture = newFuture[void]("sleepAsync")
  1654. let p = getGlobalDispatcher()
  1655. when ms is int:
  1656. p.timers.push((getMonoTime() + initDuration(milliseconds = ms), retFuture))
  1657. elif ms is float:
  1658. let ns = (ms * 1_000_000).int64
  1659. p.timers.push((getMonoTime() + initDuration(nanoseconds = ns), retFuture))
  1660. return retFuture
  1661. proc withTimeout*[T](fut: Future[T], timeout: int): owned(Future[bool]) =
  1662. ## Returns a future which will complete once `fut` completes or after
  1663. ## `timeout` milliseconds has elapsed.
  1664. ##
  1665. ## If `fut` completes first the returned future will hold true,
  1666. ## otherwise, if `timeout` milliseconds has elapsed first, the returned
  1667. ## future will hold false.
  1668. var retFuture = newFuture[bool]("asyncdispatch.`withTimeout`")
  1669. var timeoutFuture = sleepAsync(timeout)
  1670. fut.callback =
  1671. proc () =
  1672. if not retFuture.finished:
  1673. if fut.failed:
  1674. retFuture.fail(fut.error)
  1675. else:
  1676. retFuture.complete(true)
  1677. timeoutFuture.callback =
  1678. proc () =
  1679. if not retFuture.finished: retFuture.complete(false)
  1680. return retFuture
  1681. proc accept*(socket: AsyncFD,
  1682. flags = {SocketFlag.SafeDisconn},
  1683. inheritable = defined(nimInheritHandles)): owned(Future[AsyncFD]) =
  1684. ## Accepts a new connection. Returns a future containing the client socket
  1685. ## corresponding to that connection.
  1686. ##
  1687. ## If `inheritable` is false (the default), the resulting client socket
  1688. ## will not be inheritable by child processes.
  1689. ##
  1690. ## The future will complete when the connection is successfully accepted.
  1691. var retFut = newFuture[AsyncFD]("accept")
  1692. var fut = acceptAddr(socket, flags, inheritable)
  1693. fut.callback =
  1694. proc (future: Future[tuple[address: string, client: AsyncFD]]) =
  1695. assert future.finished
  1696. if future.failed:
  1697. retFut.fail(future.error)
  1698. else:
  1699. retFut.complete(future.read.client)
  1700. return retFut
  1701. proc keepAlive(x: string) =
  1702. discard "mark 'x' as escaping so that it is put into a closure for us to keep the data alive"
  1703. proc send*(socket: AsyncFD, data: string,
  1704. flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
  1705. ## Sends `data` to `socket`. The returned future will complete once all
  1706. ## data has been sent.
  1707. var retFuture = newFuture[void]("send")
  1708. if data.len > 0:
  1709. let sendFut = socket.send(unsafeAddr data[0], data.len, flags)
  1710. sendFut.callback =
  1711. proc () =
  1712. keepAlive(data)
  1713. if sendFut.failed:
  1714. retFuture.fail(sendFut.error)
  1715. else:
  1716. retFuture.complete()
  1717. else:
  1718. retFuture.complete()
  1719. return retFuture
  1720. # -- Await Macro
  1721. include asyncmacro
  1722. proc readAll*(future: FutureStream[string]): owned(Future[string]) {.async.} =
  1723. ## Returns a future that will complete when all the string data from the
  1724. ## specified future stream is retrieved.
  1725. result = ""
  1726. while true:
  1727. let (hasValue, value) = await future.read()
  1728. if hasValue:
  1729. result.add(value)
  1730. else:
  1731. break
  1732. proc callSoon(cbproc: proc () {.gcsafe.}) =
  1733. getGlobalDispatcher().callbacks.addLast(cbproc)
  1734. proc runForever*() =
  1735. ## Begins a never ending global dispatcher poll loop.
  1736. while true:
  1737. poll()
  1738. proc waitFor*[T](fut: Future[T]): T =
  1739. ## **Blocks** the current thread until the specified future completes.
  1740. while not fut.finished:
  1741. poll()
  1742. fut.read
  1743. proc activeDescriptors*(): int {.inline.} =
  1744. ## Returns the current number of active file descriptors for the current
  1745. ## event loop. This is a cheap operation that does not involve a system call.
  1746. when defined(windows):
  1747. result = getGlobalDispatcher().handles.len
  1748. elif not defined(nimdoc):
  1749. result = getGlobalDispatcher().selector.count
  1750. when defined(posix):
  1751. import posix
  1752. when defined(linux) or defined(windows) or defined(macosx) or defined(bsd):
  1753. proc maxDescriptors*(): int {.raises: OSError.} =
  1754. ## Returns the maximum number of active file descriptors for the current
  1755. ## process. This involves a system call. For now `maxDescriptors` is
  1756. ## supported on the following OSes: Windows, Linux, OSX, BSD.
  1757. when defined(windows):
  1758. result = 16_700_000
  1759. else:
  1760. var fdLim: RLimit
  1761. if getrlimit(RLIMIT_NOFILE, fdLim) < 0:
  1762. raiseOSError(osLastError())
  1763. result = int(fdLim.rlim_cur) - 1