asyncdispatch.nim 68 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871
  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2015 Dominik Picheta
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. include "system/inclrtl"
  10. import os, tables, strutils, times, heapqueue, lists, options, asyncstreams
  11. import options, math
  12. import asyncfutures except callSoon
  13. import nativesockets, net, deques
  14. export Port, SocketFlag
  15. export asyncfutures except callSoon
  16. export asyncstreams
  17. #{.injectStmt: newGcInvariant().}
  18. ## AsyncDispatch
  19. ## *************
  20. ##
  21. ## This module implements asynchronous IO. This includes a dispatcher,
  22. ## a ``Future`` type implementation, and an ``async`` macro which allows
  23. ## asynchronous code to be written in a synchronous style with the ``await``
  24. ## keyword.
  25. ##
  26. ## The dispatcher acts as a kind of event loop. You must call ``poll`` on it
  27. ## (or a function which does so for you such as ``waitFor`` or ``runForever``)
  28. ## in order to poll for any outstanding events. The underlying implementation
  29. ## is based on epoll on Linux, IO Completion Ports on Windows and select on
  30. ## other operating systems.
  31. ##
  32. ## The ``poll`` function will not, on its own, return any events. Instead
  33. ## an appropriate ``Future`` object will be completed. A ``Future`` is a
  34. ## type which holds a value which is not yet available, but which *may* be
  35. ## available in the future. You can check whether a future is finished
  36. ## by using the ``finished`` function. When a future is finished it means that
  37. ## either the value that it holds is now available or it holds an error instead.
  38. ## The latter situation occurs when the operation to complete a future fails
  39. ## with an exception. You can distinguish between the two situations with the
  40. ## ``failed`` function.
  41. ##
  42. ## Future objects can also store a callback procedure which will be called
  43. ## automatically once the future completes.
  44. ##
  45. ## Futures therefore can be thought of as an implementation of the proactor
  46. ## pattern. In this
  47. ## pattern you make a request for an action, and once that action is fulfilled
  48. ## a future is completed with the result of that action. Requests can be
  49. ## made by calling the appropriate functions. For example: calling the ``recv``
  50. ## function will create a request for some data to be read from a socket. The
  51. ## future which the ``recv`` function returns will then complete once the
  52. ## requested amount of data is read **or** an exception occurs.
  53. ##
  54. ## Code to read some data from a socket may look something like this:
  55. ##
  56. ## .. code-block::nim
  57. ##      var future = socket.recv(100)
  58. ##      future.addCallback(
  59. ##        proc () =
  60. ##          echo(future.read)
  61. ##      )
  62. ##
  63. ## Asynchronous functions returning a ``Future`` never block. They
  64. ## will not, however, return immediately: an asynchronous function has
  65. ## code which is executed before the asynchronous request is made; in most
  66. ## cases this code sets up the request.
  67. ##
  68. ## In the above example, the ``recv`` function will return a brand new
  69. ## ``Future`` instance once the request for data to be read from the socket
  70. ## is made. This ``Future`` instance will complete once the requested amount
  71. ## of data is read, in this case it is 100 bytes. The second line sets a
  72. ## callback on this future which will be called once the future completes.
  73. ## All the callback does is write the data stored in the future to ``stdout``.
  74. ## The ``read`` function is used for this and it checks whether the future
  75. ## completes with an error for you (if it did it will simply raise the
  76. ## error), if there is no error however it returns the value of the future.
  77. ##
  78. ## Asynchronous procedures
  79. ## -----------------------
  80. ##
  81. ## Asynchronous procedures remove the pain of working with callbacks. They do
  82. ## this by allowing you to write asynchronous code the same way as you would
  83. ## write synchronous code.
  84. ##
  85. ## An asynchronous procedure is marked using the ``{.async.}`` pragma.
  86. ## When marking a procedure with the ``{.async.}`` pragma it must have a
  87. ## ``Future[T]`` return type or no return type at all. If you do not specify
  88. ## a return type then ``Future[void]`` is assumed.
  89. ##
  90. ## Inside asynchronous procedures ``await`` can be used to call any
  91. ## procedures which return a
  92. ## ``Future``; this includes asynchronous procedures. When a procedure is
  93. ## "awaited", the asynchronous procedure it is awaited in will
  94. ## suspend its execution
  95. ## until the awaited procedure's Future completes, at which point the
  96. ## asynchronous procedure will resume its execution. During the period
  97. ## when an asynchronous procedure is suspended other asynchronous procedures
  98. ## will be run by the dispatcher.
  99. ##
  100. ## The ``await`` call may be used in many contexts. It can be used on the right
  101. ## hand side of a variable declaration: ``var data = await socket.recv(100)``,
  102. ## in which case the variable will be set to the value of the future
  103. ## automatically. It can be used to await a ``Future`` object, and it can
  104. ## be used to await a procedure returning a ``Future[void]``:
  105. ## ``await socket.send("foobar")``.
  106. ##
  107. ## If an awaited future completes with an error, then ``await`` will re-raise
  108. ## this error. To avoid this, you can use the ``yield`` keyword instead of
  109. ## ``await``. The following section shows different ways that you can handle
  110. ## exceptions in async procs.
  111. ##
  112. ## Handling Exceptions
  113. ## ~~~~~~~~~~~~~~~~~~~
  114. ##
  115. ## The most reliable way to handle exceptions is to use ``yield`` on a future
  116. ## then check the future's ``failed`` property. For example:
  117. ##
  118. ## .. code-block:: Nim
  119. ##   var future = sock.recv(100)
  120. ##   yield future
  121. ##   if future.failed:
  122. ##     # Handle exception
  123. ##
  124. ## The ``async`` procedures also offer limited support for the try statement.
  125. ##
  126. ## .. code-block:: Nim
  127. ##   try:
  128. ##     let data = await sock.recv(100)
  129. ##     echo("Received ", data)
  130. ##   except:
  131. ##     # Handle exception
  132. ##
  133. ## Unfortunately the semantics of the try statement may not always be correct,
  134. ## and occasionally the compilation may fail altogether.
  135. ## As such it is better to use the former style when possible.
  136. ##
  137. ##
  138. ## Discarding futures
  139. ## ------------------
  140. ##
  141. ## Futures should **never** be discarded. This is because they may contain
  142. ## errors. If you do not care for the result of a Future then you should
  143. ## use the ``asyncCheck`` procedure instead of the ``discard`` keyword. Note
  144. ## however that this does not wait for completion, and you should use
  145. ## ``waitFor`` for that purpose.
  146. ##
  147. ## Examples
  148. ## --------
  149. ##
  150. ## For examples take a look at the documentation for the modules implementing
  151. ## asynchronous IO. A good place to start is the
  152. ## `asyncnet module <asyncnet.html>`_.
  153. ##
  154. ## Limitations/Bugs
  155. ## ----------------
  156. ##
  157. ## * The effect system (``raises: []``) does not work with async procedures.
  158. # TODO: Check if yielded future is nil and throw a more meaningful exception
  type
    # State shared by the platform-specific dispatcher types.
    PDispatcherBase = ref object of RootRef
      # Pending timers, each pairing an expiry time (compared against
      # `epochTime()`) with the future to complete once it has passed.
      timers*: HeapQueue[tuple[finishAt: float, fut: Future[void]]]
      # Queued callbacks; drained front-to-back by `processPendingCallbacks`.
      callbacks*: Deque[proc ()]
  proc processTimers(
    p: PDispatcherBase, didSomeWork: var bool
  ): Option[int] {.inline.} =
    ## Completes the futures of all timers that have expired and reports how
    ## long the caller may sleep before the next timer is due.
    ##
    ## ``didSomeWork`` is set to ``true`` when at least one timer future was
    ## completed; it is never reset to ``false`` here.
    ##
    ## Returns ``none(int)`` when no timers remain, otherwise the number of
    ## milliseconds (rounded up, never negative) until the earliest remaining
    ## timer expires.
    # At most as many timers as were queued on entry are popped, so timers
    # added while a future is being completed cannot keep this loop spinning.
    var remaining = p.timers.len
    let now = epochTime()
    while remaining > 0 and now >= p.timers[0].finishAt:
      # The heap yields timers ordered by `finishAt`, smallest first.
      p.timers.pop().fut.complete()
      dec remaining
      didSomeWork = true

    if p.timers.len == 0: return

    # Completing the futures above may have taken a while, so the next timer
    # can already be overdue. Clamp to zero: a negative delay would otherwise
    # flow through `adjustTimeout`, and -1 in particular is interpreted by the
    # poll loop as "wait forever".
    let millisecs = (p.timers[0].finishAt - epochTime()) * 1000
    return some(max(0, ceil(millisecs).int))
  proc processPendingCallbacks(p: PDispatcherBase; didSomeWork: var bool) =
    ## Runs the queued callbacks front-to-back until the queue is empty; the
    ## length is re-checked on every iteration, so callbacks enqueued while
    ## draining are run too. ``didSomeWork`` becomes ``true`` once at least one
    ## callback has run.
    while p.callbacks.len != 0:
      let pending = p.callbacks.popFirst()
      pending()
      didSomeWork = true
  proc adjustTimeout(pollTimeout: int, nextTimer: Option[int]): int {.inline.} =
    ## Combines the caller's poll timeout with the delay until the next timer.
    ##
    ## * No timer pending: ``pollTimeout`` is returned unchanged.
    ## * ``pollTimeout == -1``: the timer delay is returned.
    ## * Otherwise: the smaller of the two values is returned.
    if nextTimer.isSome():
      let untilTimer = nextTimer.get()
      result =
        if pollTimeout == -1: untilTimer
        else: min(untilTimer, pollTimeout)
    else:
      result = pollTimeout
  # Forward declaration only; the implementation is not part of this section
  # (presumably it follows further down in the module).
  proc callSoon*(cbproc: proc ()) {.gcsafe.}

  proc initCallSoonProc =
    ## Installs this module's ``callSoon`` as the hook used by ``asyncfutures``,
    ## unless a hook has already been set.
    if not asyncfutures.getCallSoonProc().isNil:
      return
    asyncfutures.setCallSoonProc(callSoon)
  192. when defined(windows) or defined(nimdoc):
  193. import winlean, sets, hashes
  type
    # Type of the per-handle key handed to the completion port; `register`
    # stores the `AsyncFD` itself in it.
    CompletionKey = ULONG_PTR

    # Per-operation state embedded in the overlapped structure.
    CompletionData* = object
      fd*: AsyncFD # TODO: Rename this.
      # Called by the dispatcher when the operation finishes; `errcode` is
      # `OSErrorCode(-1)` when the operation succeeded.
      cb*: proc (fd: AsyncFD, bytesTransferred: Dword,
                 errcode: OSErrorCode) {.closure,gcsafe.}
      cell*: ForeignCell # we need this `cell` to protect our `cb` environment,
                         # when using RegisterWaitForSingleObject, because
                         # waiting is done in different thread.

    # Windows dispatcher: the shared base state plus the completion port and
    # the set of descriptors registered with it.
    PDispatcher* = ref object of PDispatcherBase
      ioPort: Handle
      handles: HashSet[AsyncFD]

    # OVERLAPPED extended with the completion data the dispatcher needs.
    CustomOverlapped = object of OVERLAPPED
      data*: CompletionData

    PCustomOverlapped* = ref CustomOverlapped

    AsyncFD* = distinct int

    PostCallbackData = object
      ioPort: Handle
      handleFd: AsyncFD
      waitFd: Handle
      ovl: PCustomOverlapped
    PostCallbackDataPtr = ptr PostCallbackData

    AsyncEventImpl = object
      hEvent: Handle
      hWaiter: Handle
      pcd: PostCallbackDataPtr
    AsyncEvent* = ptr AsyncEventImpl

    Callback = proc (fd: AsyncFD): bool {.closure,gcsafe.}
  # `AsyncFD` is a distinct int, so hashing and equality are borrowed from the
  # underlying integer type; both are needed to keep descriptors in a HashSet.
  proc hash(x: AsyncFD): Hash {.borrow.}
  proc `==`*(x: AsyncFD, y: AsyncFD): bool {.borrow.}
  proc newDispatcher*(): PDispatcher =
    ## Creates a new Dispatcher instance.
    ##
    ## Raises ``OSError`` when the IO completion port cannot be created.
    new result
    result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
    # A zero handle signals failure (the same check `register` performs);
    # report it here instead of handing out a dispatcher whose port is unusable.
    if result.ioPort == 0:
      raiseOSError(osLastError())
    result.handles = initSet[AsyncFD]()
    result.timers.newHeapQueue()
    result.callbacks = initDeque[proc ()](64)
  var gDisp{.threadvar.}: PDispatcher ## Global dispatcher

  proc setGlobalDispatcher*(disp: PDispatcher) =
    ## Makes ``disp`` the dispatcher of the current thread and ensures the
    ## ``callSoon`` hook is installed. When a dispatcher is already set, it is
    ## asserted to have no queued callbacks before being replaced.
    if gDisp != nil:
      assert gDisp.callbacks.len == 0
    gDisp = disp
    initCallSoonProc()
  proc getGlobalDispatcher*(): PDispatcher =
    ## Returns the current thread's dispatcher, creating and installing a new
    ## one first if none has been set yet.
    if gDisp.isNil:
      setGlobalDispatcher(newDispatcher())
    return gDisp
  proc getIoHandler*(disp: PDispatcher): Handle =
    ## Returns the underlying IO Completion Port handle (Windows) or selector
    ## (Unix) for the specified dispatcher.
    result = disp.ioPort
  proc register*(fd: AsyncFD) =
    ## Registers ``fd`` with the dispatcher.
    ##
    ## The descriptor is associated with the global dispatcher's completion
    ## port, using the descriptor itself as completion key. Raises ``OSError``
    ## when the association fails.
    let disp = getGlobalDispatcher()
    let port = createIoCompletionPort(fd.Handle, disp.ioPort,
                                      cast[CompletionKey](fd), 1)
    if port == 0:
      raiseOSError(osLastError())
    disp.handles.incl(fd)
  proc verifyPresence(fd: AsyncFD) =
    ## Ensures that file descriptor has been registered with the dispatcher.
    ## Raises ``ValueError`` otherwise.
    let disp = getGlobalDispatcher()
    if disp.handles.contains(fd):
      return
    raise newException(ValueError,
      "Operation performed on a socket which has not been registered with" &
      " the dispatcher yet.")
  proc hasPendingOperations*(): bool =
    ## Returns `true` if the global dispatcher has pending operations, i.e. at
    ## least one registered handle, timer or queued callback.
    let disp = getGlobalDispatcher()
    result = not (disp.handles.len == 0 and disp.timers.len == 0 and
                  disp.callbacks.len == 0)
  proc runOnce(timeout = 500): bool =
    ## Performs one dispatcher iteration: expired timers, at most one IO
    ## completion packet, then timers again and the pending callback queue.
    ##
    ## ``timeout`` is the upper bound in milliseconds for the completion-port
    ## wait (``-1`` waits indefinitely); it is shortened when a timer is due
    ## sooner. Raises ``ValueError`` when no handles, timers or callbacks are
    ## registered, and ``OSError`` when the wait fails for a reason other than
    ## a timeout.
    let p = getGlobalDispatcher()
    let idle = p.handles.len == 0 and p.timers.len == 0 and p.callbacks.len == 0
    if idle:
      raise newException(ValueError,
        "No handles or timers registered in dispatcher.")

    result = false
    let nextTimer = processTimers(p, result)
    let waitMs = adjustTimeout(timeout, nextTimer)
    var llTimeout =
      if waitMs == -1: winlean.INFINITE
      else: waitMs.int32

    var lpNumberOfBytesTransferred: Dword
    var lpCompletionKey: ULONG_PTR
    var customOverlapped: PCustomOverlapped

    template finishOperation(code: OSErrorCode) =
      # Shared tail for successful and failed completions.
      # This is useful for ensuring the reliability of the overlapped struct.
      assert customOverlapped.data.fd == lpCompletionKey.AsyncFD
      customOverlapped.data.cb(customOverlapped.data.fd,
          lpNumberOfBytesTransferred, code)
      # If cell.data != nil, then system.protect(rawEnv(cb)) was called,
      # so we need to dispose our `cb` environment, because it is not needed
      # anymore.
      if customOverlapped.data.cell.data != nil:
        system.dispose(customOverlapped.data.cell)
      GC_unref(customOverlapped)

    let gotPacket = getQueuedCompletionStatus(p.ioPort,
        addr lpNumberOfBytesTransferred, addr lpCompletionKey,
        cast[ptr POVERLAPPED](addr customOverlapped), llTimeout).bool
    result = true

    # http://stackoverflow.com/a/12277264/492186
    # TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html
    if gotPacket:
      # `OSErrorCode(-1)` tells the callback that the operation succeeded.
      finishOperation(OSErrorCode(-1))
    else:
      let errCode = osLastError()
      if customOverlapped != nil:
        # A packet was dequeued, but for an operation that failed.
        finishOperation(errCode)
      elif errCode.int32 == WAIT_TIMEOUT:
        # Timed out
        result = false
      else:
        raiseOSError(errCode)

    # Timer processing.
    discard processTimers(p, result)
    # Callback queue processing
    processPendingCallbacks(p, result)
  # Winsock extension functions; `initAll` resolves and stores them here.
  var
    acceptEx: WSAPROC_ACCEPTEX
    connectEx: WSAPROC_CONNECTEX
    getAcceptExSockAddrs: WSAPROC_GETACCEPTEXSOCKADDRS

  proc initPointer(s: SocketHandle, fun: var pointer, guid: var GUID): bool =
    ## Looks up the extension function identified by ``guid`` via ``WSAIoctl``
    ## on socket ``s`` and stores its address in ``fun`` (which is reset to
    ## ``nil`` first). Returns ``true`` when the call reports success.
    # Ref: https://github.com/powdahound/twisted/blob/master/twisted/internet/iocpreactor/iocpsupport/winsock_pointers.c
    var bytesRet: Dword
    fun = nil
    let status = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, addr guid,
                          sizeof(GUID).Dword, addr fun, sizeof(pointer).Dword,
                          addr bytesRet, nil, nil)
    result = status == 0
  proc initAll() =
    ## Resolves the ``ConnectEx``, ``AcceptEx`` and ``GetAcceptExSockAddrs``
    ## extension functions and stores them in the module-level variables.
    ##
    ## A temporary socket is created for the lookups. Raises ``OSError`` when
    ## the socket cannot be created or any lookup fails.
    let dummySock = newNativeSocket()
    if dummySock == INVALID_SOCKET:
      raiseOSError(osLastError())
    try:
      var fun: pointer = nil
      if not initPointer(dummySock, fun, WSAID_CONNECTEX):
        raiseOSError(osLastError())
      connectEx = cast[WSAPROC_CONNECTEX](fun)
      if not initPointer(dummySock, fun, WSAID_ACCEPTEX):
        raiseOSError(osLastError())
      acceptEx = cast[WSAPROC_ACCEPTEX](fun)
      if not initPointer(dummySock, fun, WSAID_GETACCEPTEXSOCKADDRS):
        raiseOSError(osLastError())
      getAcceptExSockAddrs = cast[WSAPROC_GETACCEPTEXSOCKADDRS](fun)
    finally:
      # Close the temporary socket on the error paths too; previously it
      # leaked whenever one of the lookups raised.
      close(dummySock)
  proc recv*(socket: AsyncFD, size: int,
             flags = {SocketFlag.SafeDisconn}): Future[string] =
    ## Reads **up to** ``size`` bytes from ``socket``. The returned future
    ## completes once all the requested data or a part of it has been read, or
    ## once the socket has disconnected, in which case it completes with ``""``.
    ##
    ## **Warning**: The ``Peek`` socket flag is not supported on Windows.

    # Things to note:
    #   * When WSARecv completes immediately then ``bytesReceived`` is very
    #     unreliable.
    #   * Still need to implement message-oriented socket disconnection,
    #     '\0' in the message currently signifies a socket disconnect. Who
    #     knows what will happen when someone sends that to our socket.
    verifyPresence(socket)
    assert SocketFlag.Peek notin flags, "Peek not supported on Windows."

    var fut = newFuture[string]("recv")

    # Zero-filled receive buffer, released by `releaseBuffer`.
    var wsaBuf: TWSABuf
    wsaBuf.buf = cast[cstring](alloc0(size))
    wsaBuf.len = size.ULONG

    template releaseBuffer() =
      if wsaBuf.buf != nil:
        dealloc wsaBuf.buf
        wsaBuf.buf = nil

    var bytesReceived: Dword
    var osFlags = flags.toOSFlags().Dword
    var overlapped = PCustomOverlapped()
    # Keep the overlapped struct alive while the OS owns it; the poll loop
    # (or the immediate-error path below) drops this reference again.
    GC_ref(overlapped)
    overlapped.data = CompletionData(fd: socket, cb:
      proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
        if not fut.finished:
          if errcode != OSErrorCode(-1):
            if flags.isDisconnectionError(errcode):
              fut.complete("")
            else:
              fut.fail(newException(OSError, osErrorMsg(errcode)))
          elif bytesCount == 0 and wsaBuf.buf[0] == '\0':
            # Nothing was written into the zeroed buffer: disconnect.
            fut.complete("")
          else:
            var data = newString(bytesCount)
            assert bytesCount <= size
            copyMem(addr data[0], addr wsaBuf.buf[0], bytesCount)
            fut.complete($data)
        # The buffer is released whether or not the future was still pending.
        releaseBuffer()
    )

    let ret = WSARecv(socket.SocketHandle, addr wsaBuf, 1, addr bytesReceived,
                      addr osFlags, cast[POVERLAPPED](overlapped), nil)
    if ret == -1:
      let err = osLastError()
      if err.int32 != ERROR_IO_PENDING:
        # The request was never queued, so no completion will arrive.
        releaseBuffer()
        GC_unref(overlapped)
        if flags.isDisconnectionError(err):
          fut.complete("")
        else:
          fut.fail(newException(OSError, osErrorMsg(err)))
    elif ret == 0:
      # Request completed immediately.
      if bytesReceived != 0:
        var data = newString(bytesReceived)
        assert bytesReceived <= size
        copyMem(addr data[0], addr wsaBuf.buf[0], bytesReceived)
        fut.complete($data)
      elif hasOverlappedIoCompleted(cast[POVERLAPPED](overlapped)):
        fut.complete("")
    return fut
  proc recvInto*(socket: AsyncFD, buf: pointer, size: int,
                 flags = {SocketFlag.SafeDisconn}): Future[int] =
    ## Reads **up to** ``size`` bytes from ``socket`` into ``buf``, which must
    ## be at least that large. The returned future completes with the number
    ## of bytes read once all or part of the requested data has arrived, or
    ## with ``0`` once the socket has disconnected.
    ##
    ## **Warning**: The ``Peek`` socket flag is not supported on Windows.

    # Things to note:
    #   * When WSARecv completes immediately then ``bytesReceived`` is very
    #     unreliable.
    #   * Still need to implement message-oriented socket disconnection,
    #     '\0' in the message currently signifies a socket disconnect. Who
    #     knows what will happen when someone sends that to our socket.
    verifyPresence(socket)
    assert SocketFlag.Peek notin flags, "Peek not supported on Windows."

    var fut = newFuture[int]("recvInto")

    #buf[] = '\0'
    # The caller owns `buf`; it is only referenced here, never freed.
    var wsaBuf: TWSABuf
    wsaBuf.buf = cast[cstring](buf)
    wsaBuf.len = size.ULONG

    var bytesReceived: Dword
    var osFlags = flags.toOSFlags().Dword
    var overlapped = PCustomOverlapped()
    GC_ref(overlapped)
    overlapped.data = CompletionData(fd: socket, cb:
      proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
        if not fut.finished:
          if errcode == OSErrorCode(-1):
            fut.complete(bytesCount)
          elif flags.isDisconnectionError(errcode):
            fut.complete(0)
          else:
            fut.fail(newException(OSError, osErrorMsg(errcode)))
        # Drop the reference to the caller's buffer.
        wsaBuf.buf = nil
    )

    let ret = WSARecv(socket.SocketHandle, addr wsaBuf, 1, addr bytesReceived,
                      addr osFlags, cast[POVERLAPPED](overlapped), nil)
    if ret == -1:
      let err = osLastError()
      if err.int32 != ERROR_IO_PENDING:
        # The request was never queued, so no completion will arrive.
        wsaBuf.buf = nil
        GC_unref(overlapped)
        if flags.isDisconnectionError(err):
          fut.complete(0)
        else:
          fut.fail(newException(OSError, osErrorMsg(err)))
    elif ret == 0:
      # Request completed immediately.
      if bytesReceived != 0:
        assert bytesReceived <= size
        fut.complete(bytesReceived)
      elif hasOverlappedIoCompleted(cast[POVERLAPPED](overlapped)):
        fut.complete(bytesReceived)
    return fut
  proc send*(socket: AsyncFD, buf: pointer, size: int,
             flags = {SocketFlag.SafeDisconn}): Future[void] =
    ## Sends ``size`` bytes from ``buf`` to ``socket``. The returned future
    ## will complete once all data has been sent.
    ##
    ## **WARNING**: Use it with caution. If ``buf`` refers to GC'ed object,
    ## you must use GC_ref/GC_unref calls to avoid early freeing of the buffer.
    verifyPresence(socket)
    var fut = newFuture[void]("send")

    var wsaBuf: TWSABuf
    wsaBuf.buf = cast[cstring](buf)
    wsaBuf.len = size.ULONG

    var bytesSent: Dword
    var lowFlags: Dword
    var overlapped = PCustomOverlapped()
    GC_ref(overlapped)
    overlapped.data = CompletionData(fd: socket, cb:
      proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
        if not fut.finished:
          # Success and tolerated disconnections both complete the future;
          # any other error fails it.
          if errcode == OSErrorCode(-1) or flags.isDisconnectionError(errcode):
            fut.complete()
          else:
            fut.fail(newException(OSError, osErrorMsg(errcode)))
    )

    let ret = WSASend(socket.SocketHandle, addr wsaBuf, 1, addr bytesSent,
                      lowFlags, cast[POVERLAPPED](overlapped), nil)
    if ret != -1:
      fut.complete()
      # We don't deallocate ``ol`` here because even though this completed
      # immediately poll will still be notified about its completion and it will
      # free ``ol``.
    else:
      let err = osLastError()
      if err.int32 != ERROR_IO_PENDING:
        # The request was never queued, so no completion will arrive.
        GC_unref(overlapped)
        if flags.isDisconnectionError(err):
          fut.complete()
        else:
          fut.fail(newException(OSError, osErrorMsg(err)))
    return fut
  proc sendTo*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr,
               saddrLen: Socklen,
               flags = {SocketFlag.SafeDisconn}): Future[void] =
    ## Sends ``size`` bytes of ``data`` to the destination ``saddr`` through
    ## ``socket``. The returned future completes once all data has been sent
    ## and fails with ``OSError`` on any error. ``flags`` is accepted but not
    ## consulted by this implementation.
    verifyPresence(socket)
    var fut = newFuture[void]("sendTo")

    var wsaBuf: TWSABuf
    wsaBuf.buf = cast[cstring](data)
    wsaBuf.len = size.ULONG
    var bytesSent = 0.Dword
    var lowFlags = 0.Dword

    # we will preserve address in our stack
    var staddr: array[128, char] # SOCKADDR_STORAGE size is 128 bytes
    var stalen: cint = cint(saddrLen)
    zeroMem(addr staddr[0], sizeof(staddr))
    copyMem(addr staddr[0], saddr, saddrLen)

    var overlapped = PCustomOverlapped()
    GC_ref(overlapped)
    overlapped.data = CompletionData(fd: socket, cb:
      proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
        if fut.finished:
          return
        if errcode == OSErrorCode(-1):
          fut.complete()
        else:
          fut.fail(newException(OSError, osErrorMsg(errcode)))
    )

    let ret = WSASendTo(socket.SocketHandle, addr wsaBuf, 1, addr bytesSent,
                        lowFlags, cast[ptr SockAddr](addr staddr[0]),
                        stalen, cast[POVERLAPPED](overlapped), nil)
    if ret != -1:
      fut.complete()
      # We don't deallocate ``ol`` here because even though this completed
      # immediately poll will still be notified about its completion and it will
      # free ``ol``.
    else:
      let err = osLastError()
      if err.int32 != ERROR_IO_PENDING:
        # The request was never queued, so no completion will arrive.
        GC_unref(overlapped)
        fut.fail(newException(OSError, osErrorMsg(err)))
    return fut
  proc recvFromInto*(socket: AsyncFD, data: pointer, size: int,
                     saddr: ptr SockAddr, saddrLen: ptr SockLen,
                     flags = {SocketFlag.SafeDisconn}): Future[int] =
    ## Receives one datagram from ``socket`` into ``data``, which must be at
    ## least ``size`` bytes large. The sender's address is stored into
    ## ``saddr`` and its length into ``saddrLen``. The returned future
    ## completes with the size of the received packet and fails with
    ## ``OSError`` on any error. ``flags`` is accepted but not consulted by
    ## this implementation.
    verifyPresence(socket)
    var fut = newFuture[int]("recvFromInto")

    var wsaBuf: TWSABuf
    wsaBuf.buf = cast[cstring](data)
    wsaBuf.len = size.ULONG
    var bytesReceived = 0.Dword
    var lowFlags = 0.Dword

    var overlapped = PCustomOverlapped()
    GC_ref(overlapped)
    overlapped.data = CompletionData(fd: socket, cb:
      proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
        if fut.finished:
          return
        if errcode == OSErrorCode(-1):
          assert bytesCount <= size
          fut.complete(bytesCount)
        else:
          # datagram sockets don't have disconnection,
          # so we can just raise an exception
          fut.fail(newException(OSError, osErrorMsg(errcode)))
    )

    let res = WSARecvFrom(socket.SocketHandle, addr wsaBuf, 1,
                          addr bytesReceived, addr lowFlags,
                          saddr, cast[ptr cint](saddrLen),
                          cast[POVERLAPPED](overlapped), nil)
    if res != -1:
      # Request completed immediately.
      if bytesReceived != 0:
        assert bytesReceived <= size
        fut.complete(bytesReceived)
      elif hasOverlappedIoCompleted(cast[POVERLAPPED](overlapped)):
        fut.complete(bytesReceived)
    else:
      let err = osLastError()
      if err.int32 != ERROR_IO_PENDING:
        # The request was never queued, so no completion will arrive.
        GC_unref(overlapped)
        fut.fail(newException(OSError, osErrorMsg(err)))
    return fut
  proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn}):
      Future[tuple[address: string, client: AsyncFD]] =
    ## Accepts a new connection. Returns a future containing the client socket
    ## corresponding to that connection and the remote address of the client.
    ## The future will complete when the connection is successfully accepted.
    ##
    ## The resulting client socket is automatically registered to the
    ## dispatcher.
    ##
    ## The ``accept`` call may result in an error if the connecting socket
    ## disconnects during the duration of the ``accept``. If the ``SafeDisconn``
    ## flag is specified then this error will not be raised and instead
    ## accept will be called again.
    ##
    ## Raises ``OSError`` directly if the client socket cannot be created.
    verifyPresence(socket)
    var retFuture = newFuture[tuple[address: string, client: AsyncFD]]("acceptAddr")

    var clientSock = newNativeSocket()
    if clientSock == osInvalidSocket: raiseOSError(osLastError())

    const lpOutputLen = 1024
    var lpOutputBuf = newString(lpOutputLen)
    var dwBytesReceived: Dword
    let dwReceiveDataLength = 0.Dword # We don't want any data to be read.
    let dwLocalAddressLength = Dword(sizeof(Sockaddr_in6) + 16)
    let dwRemoteAddressLength = Dword(sizeof(Sockaddr_in6) + 16)

    template failAccept(errcode) =
      # The pre-created client socket is of no use once the accept has
      # failed; close it on every failure path so the handle is not leaked.
      # A retry below creates its own fresh client socket.
      discard clientSock.closeSocket()
      if flags.isDisconnectionError(errcode):
        var newAcceptFut = acceptAddr(socket, flags)
        newAcceptFut.callback =
          proc () =
            if newAcceptFut.failed:
              retFuture.fail(newAcceptFut.readError)
            else:
              retFuture.complete(newAcceptFut.read)
      else:
        retFuture.fail(newException(OSError, osErrorMsg(errcode)))

    template completeAccept() {.dirty.} =
      var listenSock = socket
      let setoptRet = setsockopt(clientSock, SOL_SOCKET,
          SO_UPDATE_ACCEPT_CONTEXT, addr listenSock,
          sizeof(listenSock).SockLen)
      if setoptRet != 0:
        # Read the error code before `failAccept` closes the client socket.
        let errcode = osLastError()
        failAccept(errcode)
      else:
        var localSockaddr, remoteSockaddr: ptr SockAddr
        var localLen, remoteLen: int32
        getAcceptExSockaddrs(addr lpOutputBuf[0], dwReceiveDataLength,
                             dwLocalAddressLength, dwRemoteAddressLength,
                             addr localSockaddr, addr localLen,
                             addr remoteSockaddr, addr remoteLen)
        try:
          let address = getAddrString(remoteSockAddr)
          register(clientSock.AsyncFD)
          retFuture.complete((address: address, client: clientSock.AsyncFD))
        except:
          # getAddrString may raise
          clientSock.close()
          retFuture.fail(getCurrentException())

    var ol = PCustomOverlapped()
    GC_ref(ol)
    ol.data = CompletionData(fd: socket, cb:
      proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
        if not retFuture.finished:
          if errcode == OSErrorCode(-1):
            completeAccept()
          else:
            failAccept(errcode)
    )

    # http://msdn.microsoft.com/en-us/library/windows/desktop/ms737524%28v=vs.85%29.aspx
    let ret = acceptEx(socket.SocketHandle, clientSock, addr lpOutputBuf[0],
                       dwReceiveDataLength,
                       dwLocalAddressLength,
                       dwRemoteAddressLength,
                       addr dwBytesReceived, cast[POVERLAPPED](ol))

    if not ret:
      let err = osLastError()
      if err.int32 != ERROR_IO_PENDING:
        failAccept(err)
        GC_unref(ol)
    else:
      completeAccept()
      # We don't deallocate ``ol`` here because even though this completed
      # immediately poll will still be notified about its completion and it will
      # free ``ol``.

    return retFuture
  proc closeSocket*(socket: AsyncFD) =
    ## Closes the OS socket behind ``socket`` and then removes it from the
    ## handle set of the global dispatcher.
    close(SocketHandle(socket))
    let disp = getGlobalDispatcher()
    disp.handles.excl(socket)
  proc unregister*(fd: AsyncFD) =
    ## Removes ``fd`` from the handle set of the global dispatcher.
    let disp = getGlobalDispatcher()
    disp.handles.excl(fd)
  proc contains*(disp: PDispatcher, fd: AsyncFD): bool =
    ## Reports whether ``fd`` is in the handle set of ``disp``.
    result = fd in disp.handles
  {.push stackTrace:off.}
  proc waitableCallback(param: pointer,
                        timerOrWaitFired: WINBOOL): void {.stdcall.} =
    # Wait callback: forwards the notification to the completion port stored
    # in the `PostCallbackData` that `param` points to. The result of the
    # post is ignored.
    let ctx = cast[PostCallbackDataPtr](param)
    discard postQueuedCompletionStatus(ctx.ioPort, Dword(timerOrWaitFired),
                                       ULONG_PTR(ctx.handleFd),
                                       cast[pointer](ctx.ovl))
  {.pop.}
  proc registerWaitableEvent(fd: AsyncFD, cb: Callback; mask: Dword) =
    # Ties a fresh WSA event to the `mask` network events of `fd`, registers
    # a one-shot wait on that event and adds `fd` to the dispatcher's handle
    # set. `cb` is invoked from the completion callback installed below.
    # Raises OSError if any of the OS calls fails.
    let disp = getGlobalDispatcher()
    var waitFlags = (WT_EXECUTEINWAITTHREAD or WT_EXECUTEONLYONCE).Dword
    var sockEvent = wsaCreateEvent()
    if sockEvent == 0:
      raiseOSError(osLastError())
    var postData = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
    postData.ioPort = disp.ioPort
    postData.handleFd = fd
    var ovl = PCustomOverlapped()
    GC_ref(ovl)

    ovl.data = CompletionData(fd: fd, cb:
      proc(fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
        # Drop `fd` from the handle set first: `cb(fd)` is allowed to
        # register a handler of its own for the same `fd`.
        disp.handles.excl(fd)
        # The wait is unregistered before running the callback, because the
        # winsock call made by the callback can re-enable the event.
        # https://msdn.microsoft.com/en-us/library/windows/desktop/ms741576(v=vs.85).aspx
        if unregisterWait(postData.waitFd) == 0:
          let waitErr = osLastError()
          if waitErr.int32 != ERROR_IO_PENDING:
            deallocShared(cast[pointer](postData))
            discard wsaCloseEvent(sockEvent)
            raiseOSError(waitErr)
        if cb(fd):
          # `true`: the watch is finished, release everything we allocated.
          # `postData.ovl` itself is unrefed in poll().
          deallocShared(cast[pointer](postData))
          if not wsaCloseEvent(sockEvent):
            raiseOSError(osLastError())
        elif disp.handles.contains(fd):
          # `false`, but `cb` already re-registered `fd` through
          # addRead/addWrite; this registration is obsolete, so release it.
          deallocShared(cast[pointer](postData))
          if not wsaCloseEvent(sockEvent):
            raiseOSError(osLastError())
        else:
          # `false` and nothing re-registered: put `fd` back and wait again.
          disp.handles.incl(fd)
          if registerWaitForSingleObject(addr(postData.waitFd), sockEvent,
                                  cast[WAITORTIMERCALLBACK](waitableCallback),
                                         cast[pointer](postData), INFINITE,
                                         waitFlags):
            # Ref `postData.ovl` and protect the callback once more, because
            # poll() unrefs and disposes them after this callback returns.
            GC_ref(postData.ovl)
            postData.ovl.data.cell =
              system.protect(rawEnv(postData.ovl.data.cb))
          else:
            # `postData.ovl` will be unrefed in poll().
            let regErr = osLastError()
            deallocShared(cast[pointer](postData))
            discard wsaCloseEvent(sockEvent)
            raiseOSError(regErr)
    )
    # Protect the callback environment so the GC does not free it while the
    # OS still holds the only reference.
    ovl.data.cell = system.protect(rawEnv(ovl.data.cb))

    # WSAEventSelect makes `sockEvent` signalled when one of the `mask`
    # network events occurs on `fd`.
    if wsaEventSelect(fd.SocketHandle, sockEvent, mask) != 0:
      let selErr = osLastError()
      GC_unref(ovl)
      deallocShared(cast[pointer](postData))
      discard wsaCloseEvent(sockEvent)
      raiseOSError(selErr)

    postData.ovl = ovl
    if not registerWaitForSingleObject(addr(postData.waitFd), sockEvent,
                                    cast[WAITORTIMERCALLBACK](waitableCallback),
                                       cast[pointer](postData), INFINITE,
                                       waitFlags):
      let regErr = osLastError()
      GC_unref(ovl)
      deallocShared(cast[pointer](postData))
      discard wsaCloseEvent(sockEvent)
      raiseOSError(regErr)
    disp.handles.incl(fd)
  proc addRead*(fd: AsyncFD, cb: Callback) =
    ## Watches ``fd`` for read availability and calls ``cb`` when it occurs.
    ##
    ## This does not go through the regular Windows completion port (IOCP)
    ## path, so avoid it where you can. Its main use is making unix-style
    ## libraries asynchronous on Windows.
    ##
    ## When using this proc, read and accept with ``nativesockets.recv()``
    ## and ``nativesockets.accept()`` rather than ``asyncdispatch.recv()``
    ## and ``asyncdispatch.accept()``, because the latter use IOCP.
    ##
    ## ``cb`` must return ``true`` to stop watching for `read` notifications
    ## and ``false`` to keep receiving them.
    registerWaitableEvent(fd, cb,
                          mask = FD_READ or FD_ACCEPT or FD_OOB or FD_CLOSE)
  proc addWrite*(fd: AsyncFD, cb: Callback) =
    ## Watches ``fd`` for write availability and calls ``cb`` when it occurs.
    ##
    ## This does not go through the regular Windows completion port (IOCP)
    ## path, so avoid it where you can. Its main use is making unix-style
    ## libraries asynchronous on Windows.
    ##
    ## When using this proc, send and connect with ``nativesockets.send()``
    ## and ``nativesockets.connect()`` rather than ``asyncdispatch.send()``
    ## and ``asyncdispatch.connect()``, because the latter use IOCP.
    ##
    ## ``cb`` must return ``true`` to stop watching for `write` notifications
    ## and ``false`` to keep receiving them.
    registerWaitableEvent(fd, cb, mask = FD_WRITE or FD_CONNECT or FD_CLOSE)
  template registerWaitableHandle(p, hEvent, flags, pcd, timeout,
                                  handleCallback) =
    # Fills in `pcd`, creates the overlapped object that carries
    # `handleCallback`, registers a wait on `hEvent` with the given `timeout`
    # and `flags`, and finally adds the handle to `p.handles`.
    # On registration failure everything is released, `hEvent` is closed and
    # OSError is raised.
    let asyncHandle = AsyncFD(hEvent)
    pcd.ioPort = p.ioPort
    pcd.handleFd = asyncHandle
    var overlapped = PCustomOverlapped()
    GC_ref(overlapped)
    overlapped.data.fd = asyncHandle
    overlapped.data.cb = handleCallback
    # Protect the callback environment so the GC does not free it by
    # accident.
    overlapped.data.cell = system.protect(rawEnv(overlapped.data.cb))
    pcd.ovl = overlapped
    if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
                                    cast[WAITORTIMERCALLBACK](waitableCallback),
                                       cast[pointer](pcd), timeout.Dword, flags):
      let regErr = osLastError()
      GC_unref(overlapped)
      deallocShared(cast[pointer](pcd))
      discard closeHandle(hEvent)
      raiseOSError(regErr)
    p.handles.incl(asyncHandle)
  template closeWaitable(handle: untyped) =
    # Tears down a waitable registration. Uses `pcd`, `p` and `fd` from the
    # scope where the template is expanded: frees `pcd`, removes `fd` from
    # `p.handles`, unregisters the wait and closes `handle`.
    # Raises OSError if unregistering (other than ERROR_IO_PENDING) or
    # closing fails.
    let pendingWait = pcd.waitFd
    deallocShared(cast[pointer](pcd))
    p.handles.excl(fd)
    if unregisterWait(pendingWait) == 0:
      let unregErr = osLastError()
      if unregErr.int32 != ERROR_IO_PENDING:
        discard closeHandle(handle)
        raiseOSError(unregErr)
    if closeHandle(handle) == 0:
      raiseOSError(osLastError())
  proc addTimer*(timeout: int, oneshot: bool, cb: Callback) =
    ## Registers callback ``cb`` to be called when timer expired.
    ##
    ## Parameters:
    ##
    ## * ``timeout`` - timeout value in milliseconds, must be positive.
    ## * ``oneshot``
    ##   * `true` - generate only one timeout event
    ##   * `false` - generate timeout events periodically
    ##
    ## Raises ``OSError`` if the underlying event or wait cannot be created.
    doAssert(timeout > 0)
    let p = getGlobalDispatcher()

    var hEvent = createEvent(nil, 1, 0, nil)
    # CreateEvent reports failure with a NULL handle. The
    # INVALID_HANDLE_VALUE comparison is kept for backward compatibility.
    if hEvent == 0 or hEvent == INVALID_HANDLE_VALUE:
      raiseOSError(osLastError())

    var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
    var flags = WT_EXECUTEINWAITTHREAD.Dword
    if oneshot: flags = flags or WT_EXECUTEONLYONCE

    # NOTE: `p`, `pcd` and the `fd` parameter below are referenced by name
    # from the `closeWaitable` template; do not rename them.
    proc timercb(fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
      let res = cb(fd)
      if res or oneshot:
        closeWaitable(hEvent)
      else:
        # if callback returned `false`, then it wants to be called again, so
        # we need to ref and protect `pcd.ovl` again, because it will be
        # unrefed and disposed in `poll()`.
        GC_ref(pcd.ovl)
        pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))

    registerWaitableHandle(p, hEvent, flags, pcd, timeout, timercb)
  proc addProcess*(pid: int, cb: Callback) =
    ## Registers callback ``cb`` to be called when process with process ID
    ## ``pid`` exited.
    ##
    ## Raises ``OSError`` if the process cannot be opened or the wait cannot
    ## be registered.
    let p = getGlobalDispatcher()
    let procFlags = SYNCHRONIZE
    var hProcess = openProcess(procFlags, 0, pid.Dword)
    # OpenProcess reports failure with a NULL handle. The
    # INVALID_HANDLE_VALUE comparison is kept for backward compatibility.
    if hProcess == 0 or hProcess == INVALID_HANDLE_VALUE:
      raiseOSError(osLastError())

    var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
    var flags = WT_EXECUTEINWAITTHREAD.Dword

    # NOTE: `p`, `pcd` and the `fd` parameter below are referenced by name
    # from the `closeWaitable` template; do not rename them.
    proc proccb(fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
      # Release the wait and the process handle first, then notify the user.
      closeWaitable(hProcess)
      discard cb(fd)

    registerWaitableHandle(p, hProcess, flags, pcd, INFINITE, proccb)
  proc newAsyncEvent*(): AsyncEvent =
    ## Creates a new thread-safe ``AsyncEvent`` object.
    ##
    ## New ``AsyncEvent`` object is not automatically registered with
    ## dispatcher like ``AsyncSocket``.
    ##
    ## Raises ``OSError`` if the underlying event object cannot be created.
    var sa = SECURITY_ATTRIBUTES(
      nLength: sizeof(SECURITY_ATTRIBUTES).cint,
      bInheritHandle: 1
    )
    var event = createEvent(addr(sa), 0'i32, 0'i32, nil)
    # CreateEvent reports failure with a NULL handle. The
    # INVALID_HANDLE_VALUE comparison is kept for backward compatibility.
    if event == 0 or event == INVALID_HANDLE_VALUE:
      raiseOSError(osLastError())
    # The object lives on the shared heap and is released in `close`.
    result = cast[AsyncEvent](allocShared0(sizeof(AsyncEventImpl)))
    result.hEvent = event
  proc trigger*(ev: AsyncEvent) =
    ## Puts event ``ev`` into the signaled state. Raises ``OSError`` if the
    ## OS call fails.
    let signalled = setEvent(ev.hEvent) != 0
    if not signalled:
      raiseOSError(osLastError())
  proc unregister*(ev: AsyncEvent) =
    ## Removes event ``ev`` from the global dispatcher and cancels its wait.
    ## ``ev`` must currently be registered. Raises ``OSError`` if cancelling
    ## the wait fails for a reason other than ``ERROR_IO_PENDING``.
    doAssert(ev.hWaiter != 0, "Event is not registered in the queue!")
    let disp = getGlobalDispatcher()
    disp.handles.excl(AsyncFD(ev.hEvent))
    if unregisterWait(ev.hWaiter) == 0:
      let unregErr = osLastError()
      if unregErr.int32 != ERROR_IO_PENDING:
        raiseOSError(unregErr)
    # Mark the event as no longer registered.
    ev.hWaiter = 0
  proc close*(ev: AsyncEvent) =
    ## Closes event ``ev`` and frees its memory. ``ev`` must not be used
    ## afterwards. Raises ``OSError`` if closing the event handle fails.
    let closed = closeHandle(ev.hEvent) != 0
    # Read the error code right away: freeing the memory below may itself
    # call into the OS and overwrite the thread's last-error value.
    let closeErr = osLastError()
    deallocShared(cast[pointer](ev))
    if not closed:
      raiseOSError(closeErr)
  proc addEvent*(ev: AsyncEvent, cb: Callback) =
    ## Arranges for ``cb`` to be called whenever ``ev`` becomes signaled.
    ## ``ev`` must not already be registered.
    doAssert(ev.hWaiter == 0, "Event is already registered in the queue!")

    let disp = getGlobalDispatcher()
    let evHandle = ev.hEvent

    var postData = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
    var waitFlags = WT_EXECUTEINWAITTHREAD.Dword

    proc eventcb(fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
      if ev.hWaiter == 0:
        # The event was unregistered before `poll()` delivered this
        # notification; only the bookkeeping memory is left to free.
        deallocShared(cast[pointer](postData))
      elif cb(fd):
        deallocShared(cast[pointer](postData))
        # `cb` may have called `unregister(ev)` itself; checking again
        # avoids the assertion in `unregister`.
        if ev.hWaiter != 0:
          unregister(ev)
      else:
        # `false` means the callback wants to be called again, so ref and
        # protect `postData.ovl` once more: `poll()` unrefs and disposes it.
        GC_ref(postData.ovl)
        postData.ovl.data.cell = system.protect(rawEnv(postData.ovl.data.cb))

    registerWaitableHandle(disp, evHandle, waitFlags, postData, INFINITE,
                           eventcb)
    ev.hWaiter = postData.waitFd
  944. initAll()
  945. else:
  946. import selectors
  947. from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK,
  948. MSG_NOSIGNAL
  const
    # Initial capacity of each callback sequence associated with a
    # file/socket descriptor.
    InitCallbackListSize = 4
    # Initial capacity of the delayed callbacks queue.
    InitDelayedCallbackListSize = 64
  type
    AsyncFD* = distinct cint
      ## Descriptor handled by the dispatcher.

    # Descriptor callback. In this module `false` means "keep me
    # registered and call me again", `true` means "done".
    Callback = proc (fd: AsyncFD): bool {.closure, gcsafe.}

    # Per-descriptor payload stored in the selector: the callbacks waiting
    # for read and for write events.
    AsyncData = object
      readList: seq[Callback]
      writeList: seq[Callback]

    AsyncEvent* = distinct SelectEvent
      ## User-triggerable event, a thin wrapper over ``SelectEvent``.

    PDispatcher* = ref object of PDispatcherBase
      ## Dispatcher built on top of a ``Selector``.
      selector: Selector[AsyncData]
  proc `==`*(x, y: AsyncFD): bool {.borrow.}
    ## Equality of two ``AsyncFD`` values, borrowed from the base type.
  proc `==`*(x, y: AsyncEvent): bool {.borrow.}
    ## Equality of two ``AsyncEvent`` values, borrowed from the base type.
  template newAsyncData(): AsyncData =
    # Fresh `AsyncData` whose two callback lists are empty but pre-sized to
    # `InitCallbackListSize`.
    AsyncData(readList: newSeqOfCap[Callback](InitCallbackListSize),
              writeList: newSeqOfCap[Callback](InitCallbackListSize))
  proc newDispatcher*(): PDispatcher =
    ## Creates a dispatcher with a new selector, an empty timer heap and an
    ## empty queue of delayed callbacks.
    new(result)
    result.callbacks = initDeque[proc ()](InitDelayedCallbackListSize)
    result.timers.newHeapQueue()
    result.selector = newSelector[AsyncData]()
  var gDisp {.threadvar.}: PDispatcher ## Per-thread global dispatcher.
  proc setGlobalDispatcher*(disp: PDispatcher) =
    ## Installs ``disp`` as this thread's global dispatcher. A dispatcher
    ## that is being replaced is expected to have no delayed callbacks left.
    assert gDisp.isNil or gDisp.callbacks.len == 0
    gDisp = disp
    initCallSoonProc()
  proc getGlobalDispatcher*(): PDispatcher =
    ## Returns this thread's global dispatcher, creating and installing a
    ## new one on first use.
    if isNil(gDisp):
      setGlobalDispatcher(newDispatcher())
    return gDisp
  proc getIoHandler*(disp: PDispatcher): Selector[AsyncData] =
    ## Returns the selector used by ``disp``.
    result = disp.selector
  proc register*(fd: AsyncFD) =
    ## Registers ``fd`` with the global dispatcher's selector, with no
    ## events selected and empty callback lists.
    let disp = getGlobalDispatcher()
    disp.selector.registerHandle(SocketHandle(fd), {}, newAsyncData())
  proc closeSocket*(sock: AsyncFD) =
    ## Unregisters ``sock`` from the global dispatcher's selector and then
    ## closes the underlying socket.
    let handle = SocketHandle(sock)
    getGlobalDispatcher().selector.unregister(handle)
    close(handle)
  proc unregister*(fd: AsyncFD) =
    ## Removes ``fd`` from the global dispatcher's selector.
    let disp = getGlobalDispatcher()
    disp.selector.unregister(SocketHandle(fd))
  proc unregister*(ev: AsyncEvent) =
    ## Removes event ``ev`` from the global dispatcher's selector.
    let disp = getGlobalDispatcher()
    disp.selector.unregister(SelectEvent(ev))
  proc contains*(disp: PDispatcher, fd: AsyncFd): bool =
    ## Reports whether ``fd`` is registered in the selector of ``disp``.
    result = SocketHandle(fd) in disp.selector
  proc addRead*(fd: AsyncFD, cb: Callback) =
    ## Appends ``cb`` to the read callbacks of ``fd`` and updates the
    ## selector to watch ``fd`` for reading (and for writing, if write
    ## callbacks are pending). Raises ``ValueError`` if ``fd`` is not
    ## registered.
    let disp = getGlobalDispatcher()
    var wanted = {Event.Read}
    withData(disp.selector, fd.SocketHandle, adata) do:
      adata.readList.add(cb)
      if len(adata.writeList) != 0:
        wanted.incl(Event.Write)
    do:
      raise newException(ValueError, "File descriptor not registered.")
    disp.selector.updateHandle(fd.SocketHandle, wanted)
  proc addWrite*(fd: AsyncFD, cb: Callback) =
    ## Appends ``cb`` to the write callbacks of ``fd`` and updates the
    ## selector to watch ``fd`` for writing (and for reading, if read
    ## callbacks are pending). Raises ``ValueError`` if ``fd`` is not
    ## registered.
    let disp = getGlobalDispatcher()
    var wanted = {Event.Write}
    withData(disp.selector, fd.SocketHandle, adata) do:
      adata.writeList.add(cb)
      if len(adata.readList) != 0:
        wanted.incl(Event.Read)
    do:
      raise newException(ValueError, "File descriptor not registered.")
    disp.selector.updateHandle(fd.SocketHandle, wanted)
  proc hasPendingOperations*(): bool =
    ## Returns ``true`` if the global dispatcher has any registered
    ## descriptor, any timer or any delayed callback.
    let disp = getGlobalDispatcher()
    result = not (disp.selector.isEmpty() and disp.timers.len == 0 and
                  disp.callbacks.len == 0)
  template processBasicCallbacks(ident, rwlist: untyped) =
    # Runs the pending descriptor / AsyncEvent callbacks stored in `rwlist`.
    #
    # Callbacks are invoked in insertion order until one of them returns
    # `false` (it wants to stay registered). That callback and all the
    # callbacks after it are put back in front of `rwlist`, in their
    # original order, without being invoked.
    #
    # The descriptor's `rwlist` MUST be emptied before any callback is
    # dispatched (see https://github.com/nim-lang/Nim/issues/5128),
    # otherwise an endless cycle is possible.
    #
    # Uses `p`, `fd`, `rLength` and `wLength` from the expansion scope.
    var pending: seq[Callback]

    withData(p.selector, ident, adata) do:
      shallowCopy(pending, adata.rwlist)
      adata.rwlist = newSeqOfCap[Callback](InitCallbackListSize)

    var kept = newSeqOfCap[Callback](max(len(pending), InitCallbackListSize))

    for handler in pending:
      # Once something has been kept, a callback already reported EAGAIN:
      # the rest is kept as well, without being called, until next `poll`.
      if len(kept) > 0 or not handler(fd.AsyncFD):
        kept.add(handler)

    withData(p.selector, ident, adata) do:
      # The descriptor is still registered.
      adata.rwlist = kept & adata.rwlist
      rLength = len(adata.readList)
      wLength = len(adata.writeList)
    do:
      # The descriptor was removed by a callback through `unregister()`.
      rLength = -1
      wLength = -1
  template processCustomCallbacks(ident: untyped) =
    # Process pending custom event callbacks. Custom events are
    # {Event.Timer, Event.Signal, Event.Process, Event.Vnode}.
    # There can be only one callback registered with one descriptor,
    # so there is no need to iterate over list.
    #
    # Uses `p` and `fd` from the expansion scope.
    var curList: seq[Callback]

    withData(p.selector, ident, adata) do:
      shallowCopy(curList, adata.readList)
      adata.readList = newSeqOfCap[Callback](InitCallbackListSize)

    # `curList` stays empty when the descriptor is no longer registered,
    # e.g. because a callback dispatched earlier in the same batch removed
    # it. Indexing it unconditionally would raise an index error.
    if len(curList) > 0:
      var newList = newSeqOfCap[Callback](len(curList))

      var cb = curList[0]
      if not cb(fd.AsyncFD):
        # Callback wants to be called again.
        newList.add(cb)

      withData(p.selector, ident, adata) do:
        # descriptor still present in queue.
        adata.readList = newList & adata.readList
        if len(adata.readList) == 0:
          # if no callbacks registered with descriptor, unregister it.
          p.selector.unregister(fd)
      do:
        # descriptor was unregistered in callback via `unregister()`.
        discard
  proc runOnce(timeout = 500): bool =
    # One dispatcher iteration: expire timers, wait on the selector for at
    # most `timeout` (shortened to the next timer), dispatch the callbacks
    # of every ready descriptor, then run timers and delayed callbacks.
    # `result` is set to `true` when a descriptor event was handled here;
    # it is also passed to `processTimers` / `processPendingCallbacks`.
    # Raises ValueError when there is nothing at all to wait for.
    #
    # NOTE: `p`, `fd`, `rLength` and `wLength` are referenced by name from
    # the `process*Callbacks` templates; do not rename them.
    let p = getGlobalDispatcher()
    when ioselSupportedPlatform:
      let customKinds = {Event.Timer, Event.Signal, Event.Process,
                         Event.Vnode}

    if p.selector.isEmpty() and p.timers.len == 0 and p.callbacks.len == 0:
      raise newException(ValueError,
        "No handles or timers registered in dispatcher.")

    result = false
    var ready: array[64, ReadyKey]
    let nextTimer = processTimers(p, result)
    var readyCount = p.selector.selectInto(adjustTimeout(timeout, nextTimer),
                                           ready)
    for idx in 0..<readyCount:
      var isCustom = false
      let fd = ready[idx].fd
      let evs = ready[idx].events
      # A bare error event is delivered to both readers and writers.
      let errorOnly = evs == {Event.Error}
      var rLength = 0 # len(data.readList) after callback
      var wLength = 0 # len(data.writeList) after callback

      if Event.Read in evs or errorOnly:
        processBasicCallbacks(fd, readList)
        result = true

      if Event.Write in evs or errorOnly:
        processBasicCallbacks(fd, writeList)
        result = true

      if Event.User in evs:
        processBasicCallbacks(fd, readList)
        isCustom = true
        if rLength == 0:
          # No callback wants this event anymore.
          p.selector.unregister(fd)
        result = true

      when ioselSupportedPlatform:
        if (customKinds * evs) != {}:
          isCustom = true
          processCustomCallbacks(fd)
          result = true

      # Callbacks may have changed the descriptor's callback lists, so the
      # selected events are recomputed from what is registered now.
      # -1 lengths mean the descriptor was unregistered by a callback.
      if not isCustom:
        var wanted: set[Event] = {}
        if rLength != -1 and wLength != -1:
          if rLength > 0: incl(wanted, Event.Read)
          if wLength > 0: incl(wanted, Event.Write)
          p.selector.updateHandle(SocketHandle(fd), wanted)

    # Timer processing.
    discard processTimers(p, result)
    # Callback queue processing
    processPendingCallbacks(p, result)
  proc recv*(socket: AsyncFD, size: int,
             flags = {SocketFlag.SafeDisconn}): Future[string] =
    ## Reads up to ``size`` bytes from ``socket`` once it becomes readable.
    ## The future completes with the data read, or with ``""`` when the peer
    ## has disconnected (including disconnection errors covered by
    ## ``flags``); other errors fail the future with ``OSError``.
    var retFuture = newFuture[string]("recv")
    var readBuffer = newString(size)

    proc onReadable(sock: AsyncFD): bool =
      result = true
      let nread = recv(sock.SocketHandle, addr readBuffer[0], size.cint,
                       flags.toOSFlags())
      if nread > 0:
        readBuffer.setLen(nread)
        retFuture.complete(readBuffer)
      elif nread == 0:
        # Disconnected
        retFuture.complete("")
      else:
        let lastError = osLastError()
        if lastError.int32 in {EINTR, EWOULDBLOCK, EAGAIN}:
          result = false # Not ready yet: keep this callback registered.
        elif flags.isDisconnectionError(lastError):
          retFuture.complete("")
        else:
          retFuture.fail(newException(OSError, osErrorMsg(lastError)))
    # TODO: The following causes a massive slowdown.
    #if not onReadable(socket):
    addRead(socket, onReadable)
    result = retFuture
  proc recvInto*(socket: AsyncFD, buf: pointer, size: int,
                 flags = {SocketFlag.SafeDisconn}): Future[int] =
    ## Reads up to ``size`` bytes from ``socket`` into ``buf`` once the
    ## socket becomes readable. The future completes with the number of
    ## bytes read, or with ``0`` for disconnection errors covered by
    ## ``flags``; other errors fail the future with ``OSError``.
    var retFuture = newFuture[int]("recvInto")

    proc onReadable(sock: AsyncFD): bool =
      result = true
      let nread = recv(sock.SocketHandle, buf, size.cint,
                       flags.toOSFlags())
      if nread >= 0:
        retFuture.complete(nread)
      else:
        let lastError = osLastError()
        if lastError.int32 in {EINTR, EWOULDBLOCK, EAGAIN}:
          result = false # Not ready yet: keep this callback registered.
        elif flags.isDisconnectionError(lastError):
          retFuture.complete(0)
        else:
          retFuture.fail(newException(OSError, osErrorMsg(lastError)))
    # TODO: The following causes a massive slowdown.
    #if not onReadable(socket):
    addRead(socket, onReadable)
    result = retFuture
  proc send*(socket: AsyncFD, buf: pointer, size: int,
             flags = {SocketFlag.SafeDisconn}): Future[void] =
    ## Sends ``size`` bytes starting at ``buf`` through ``socket``, in as
    ## many partial writes as needed. The future completes once everything
    ## was written, or on a disconnection error covered by ``flags``; other
    ## errors fail it with ``OSError``. Data is always sent with
    ## ``MSG_NOSIGNAL``.
    var retFuture = newFuture[void]("send")
    var sent = 0 # bytes written so far

    proc onWritable(sock: AsyncFD): bool =
      result = true
      let remaining = size - sent
      var d = cast[cstring](buf)
      let res = send(sock.SocketHandle, addr d[sent], remaining.cint,
                     MSG_NOSIGNAL)
      if res >= 0:
        sent.inc(res)
        if res == remaining:
          retFuture.complete()
        else:
          result = false # We still have data to send.
      else:
        let lastError = osLastError()
        if lastError.int32 in {EINTR, EWOULDBLOCK, EAGAIN}:
          result = false # Not ready yet: keep this callback registered.
        elif flags.isDisconnectionError(lastError):
          retFuture.complete()
        else:
          retFuture.fail(newException(OSError, osErrorMsg(lastError)))
    # TODO: The following causes crashes.
    #if not onWritable(socket):
    addWrite(socket, onWritable)
    result = retFuture
  proc sendTo*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr,
               saddrLen: SockLen,
               flags = {SocketFlag.SafeDisconn}): Future[void] =
    ## Sends ``size`` bytes of ``data`` through ``socket`` to the destination
    ## ``saddr`` (which is ``saddrLen`` bytes long). The returned future
    ## completes once the data has been sent and fails with ``OSError`` on
    ## error.
    var retFuture = newFuture[void]("sendTo")

    # Keep a private copy of the destination address, so the callback does
    # not depend on ``saddr`` staying valid.
    var staddr: array[128, char] # SOCKADDR_STORAGE size is 128 bytes
    var stalen = saddrLen
    zeroMem(addr(staddr[0]), 128)
    copyMem(addr(staddr[0]), saddr, saddrLen)

    proc onWritable(sock: AsyncFD): bool =
      result = true
      let res = sendto(sock.SocketHandle, data, size, MSG_NOSIGNAL,
                       cast[ptr SockAddr](addr(staddr[0])), stalen)
      if res >= 0:
        retFuture.complete()
      else:
        let lastError = osLastError()
        if lastError.int32 in {EINTR, EWOULDBLOCK, EAGAIN}:
          result = false # Not ready yet: keep this callback registered.
        else:
          retFuture.fail(newException(OSError, osErrorMsg(lastError)))

    addWrite(socket, onWritable)
    result = retFuture
  proc recvFromInto*(socket: AsyncFD, data: pointer, size: int,
                     saddr: ptr SockAddr, saddrLen: ptr SockLen,
                     flags = {SocketFlag.SafeDisconn}): Future[int] =
    ## Receives one datagram from ``socket`` into ``data``, which must hold
    ## at least ``size`` bytes. The sender's address is written to ``saddr``
    ## and ``saddrLen``. The returned future completes with the size of the
    ## received packet and fails with ``OSError`` on error.
    var retFuture = newFuture[int]("recvFromInto")

    proc onReadable(sock: AsyncFD): bool =
      result = true
      let res = recvfrom(sock.SocketHandle, data, size.cint, flags.toOSFlags(),
                         saddr, saddrLen)
      if res >= 0:
        retFuture.complete(res)
      else:
        let lastError = osLastError()
        if lastError.int32 in {EINTR, EWOULDBLOCK, EAGAIN}:
          result = false # Not ready yet: keep this callback registered.
        else:
          retFuture.fail(newException(OSError, osErrorMsg(lastError)))

    addRead(socket, onReadable)
    result = retFuture
  proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn}):
      Future[tuple[address: string, client: AsyncFD]] =
    ## Accepts a new connection on ``socket`` once it becomes readable. The
    ## future completes with the remote address and the client socket, which
    ## is registered with the dispatcher.
    ##
    ## Interrupted or would-block results and disconnection errors covered
    ## by ``flags`` are retried on the next readiness notification; other
    ## errors fail the future with ``OSError``.
    var retFuture = newFuture[tuple[address: string,
        client: AsyncFD]]("acceptAddr")
    proc cb(sock: AsyncFD): bool =
      result = true
      var sockAddress: Sockaddr_storage
      var addrLen = sizeof(sockAddress).Socklen
      var client = accept(sock.SocketHandle,
                          cast[ptr SockAddr](addr(sockAddress)), addr(addrLen))
      if client == osInvalidSocket:
        let lastError = osLastError()
        # A non-blocking listener can report EAGAIN/EWOULDBLOCK even after a
        # readiness notification (for example when the pending connection
        # went away in the meantime), so treat it like EINTR and wait for
        # the next notification instead of asserting or failing.
        if lastError.int32 in {EINTR, EWOULDBLOCK, EAGAIN}:
          return false
        elif flags.isDisconnectionError(lastError):
          return false
        else:
          retFuture.fail(newException(OSError, osErrorMsg(lastError)))
      else:
        try:
          let address = getAddrString(cast[ptr SockAddr](addr sockAddress))
          register(client.AsyncFD)
          retFuture.complete((address, client.AsyncFD))
        except:
          # getAddrString may raise
          client.close()
          retFuture.fail(getCurrentException())
    addRead(socket, cb)
    return retFuture
  when ioselSupportedPlatform:

    proc addTimer*(timeout: int, oneshot: bool, cb: Callback) =
      ## Registers a timer with the global dispatcher and calls ``cb`` when
      ## it expires.
      ##
      ## * ``timeout`` - time in milliseconds.
      ## * ``oneshot`` - if ``true`` only one event is dispatched, if
      ##   ``false`` events are dispatched every ``timeout`` milliseconds.
      let disp = getGlobalDispatcher()
      var watch = newAsyncData()
      add(watch.readList, cb)
      disp.selector.registerTimer(timeout, oneshot, watch)

    proc addSignal*(signal: int, cb: Callback) =
      ## Registers signal ``signal`` with the global dispatcher and calls
      ## ``cb`` when the signal arrives.
      let disp = getGlobalDispatcher()
      var watch = newAsyncData()
      add(watch.readList, cb)
      disp.selector.registerSignal(signal, watch)

    proc addProcess*(pid: int, cb: Callback) =
      ## Registers the process with ID ``pid`` with the global dispatcher
      ## and calls ``cb`` when that process exits.
      let disp = getGlobalDispatcher()
      var watch = newAsyncData()
      add(watch.readList, cb)
      disp.selector.registerProcess(pid, watch)
  proc newAsyncEvent*(): AsyncEvent =
    ## Creates a new ``AsyncEvent`` backed by a fresh ``SelectEvent``.
    let inner = newSelectEvent()
    result = AsyncEvent(inner)
  proc trigger*(ev: AsyncEvent) =
    ## Puts ``ev`` into the signaled state.
    let inner = SelectEvent(ev)
    trigger(inner)
  proc close*(ev: AsyncEvent) =
    ## Closes the ``AsyncEvent`` ``ev`` by closing the underlying
    ## ``SelectEvent``.
    SelectEvent(ev).close()
  proc addEvent*(ev: AsyncEvent, cb: Callback) =
    ## Registers event ``ev`` with the selector of the global dispatcher;
    ## ``cb`` is called when ``ev`` is set to the signaled state.
    let disp = getGlobalDispatcher()
    var eventData = newAsyncData()
    add(eventData.readList, cb)
    registerEvent(disp.selector, SelectEvent(ev), eventData)
  proc drain*(timeout = 500) =
    ## Waits for completion events and processes them. Raises ``ValueError``
    ## if there are no pending operations. In contrast to ``poll`` this
    ## processes as many events as are available.
    # The first pass may wait up to ``timeout``; if it reports nothing was
    # processed there is no point in spinning further.
    if not runOnce(timeout):
      return
    # Follow-up passes use a zero timeout so they never block.
    while hasPendingOperations() and runOnce(0):
      discard
  proc poll*(timeout = 500) =
    ## Waits for completion events and processes them. Raises ``ValueError``
    ## if there are no pending operations. This runs the underlying OS
    ## `epoll`:idx: or `kqueue`:idx: primitive only once.
    # The boolean reported by ``runOnce`` is deliberately ignored here.
    let processedAny = runOnce(timeout)
    discard processedAny
  template createAsyncNativeSocketImpl(domain, sockType, protocol) =
    ## Shared body of the ``createAsyncNativeSocket``/``newAsyncNativeSocket``
    ## overloads. Must be expanded inside a proc returning ``AsyncFD``: it
    ## uses that proc's ``result`` and may ``return`` from it.
    let rawSock = newNativeSocket(domain, sockType, protocol)
    if rawSock == osInvalidSocket:
      # Creation failed: hand the invalid handle back without registering it.
      return AsyncFD(osInvalidSocket)
    setBlocking(rawSock, false)
    when defined(macosx) and not defined(nimdoc):
      setSockOptInt(rawSock, SOL_SOCKET, SO_NOSIGPIPE, 1)
    result = AsyncFD(rawSock)
    register(result)
  proc createAsyncNativeSocket*(domain: cint, sockType: cint,
                                protocol: cint): AsyncFD =
    ## Creates a non-blocking native socket from raw ``cint`` parameters and
    ## registers it via ``register``. Returns ``osInvalidSocket.AsyncFD``
    ## when the underlying socket could not be created.
    createAsyncNativeSocketImpl(domain, sockType, protocol)
  proc createAsyncNativeSocket*(domain: Domain = Domain.AF_INET,
                                sockType: SockType = SOCK_STREAM,
                                protocol: Protocol = IPPROTO_TCP): AsyncFD =
    ## Creates a non-blocking native socket (by default an IPv4 TCP stream
    ## socket) and registers it via ``register``. Returns
    ## ``osInvalidSocket.AsyncFD`` when the underlying socket could not be
    ## created.
    createAsyncNativeSocketImpl(domain, sockType, protocol)
  proc newAsyncNativeSocket*(domain: cint, sockType: cint,
                             protocol: cint): AsyncFD {.deprecated: "use createAsyncNativeSocket instead".} =
    ## Deprecated alias kept for backward compatibility; behaves exactly like
    ## ``createAsyncNativeSocket`` called with the same raw ``cint`` arguments.
    result = createAsyncNativeSocket(domain, sockType, protocol)
  proc newAsyncNativeSocket*(domain: Domain = Domain.AF_INET,
                             sockType: SockType = SOCK_STREAM,
                             protocol: Protocol = IPPROTO_TCP): AsyncFD
                             {.deprecated: "use createAsyncNativeSocket instead".} =
    ## Deprecated alias kept for backward compatibility; behaves exactly like
    ## ``createAsyncNativeSocket`` called with the same arguments.
    result = createAsyncNativeSocket(domain, sockType, protocol)
  1363. when defined(windows) or defined(nimdoc):
  proc bindToDomain(handle: SocketHandle, domain: Domain) =
    ## Binds ``handle`` to a zero-initialised socket address of the given
    ## ``domain``. Raises ``OSError`` when ``bindAddr`` fails.
    # Kept as a separate proc because connect() on Windows requires the
    # socket to be initially bound.
    template bindOrRaise(sa) =
      if bindAddr(handle, cast[ptr SockAddr](addr(sa)),
                  sizeof(sa).SockLen) < 0'i32:
        raiseOSError(osLastError())

    case domain
    of Domain.AF_INET6:
      var anyAddr6: Sockaddr_in6
      anyAddr6.sin6_family = uint16(toInt(domain))
      bindOrRaise(anyAddr6)
    else:
      var anyAddr4: Sockaddr_in
      anyAddr4.sin_family = uint16(toInt(domain))
      bindOrRaise(anyAddr4)
  proc doConnect(socket: AsyncFD, addrInfo: ptr AddrInfo): Future[void] =
    ## Issues an overlapped ``connectEx`` from ``socket`` to the address in
    ## ``addrInfo``. The returned future completes on success and fails with
    ## ``OSError`` otherwise.
    let retFuture = newFuture[void]("doConnect")
    result = retFuture

    var overlapped = PCustomOverlapped()
    GC_ref(overlapped)
    overlapped.data = CompletionData(fd: socket, cb:
      proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
        # The immediate-success path below may already have finished the
        # future; in that case there is nothing left to do.
        if retFuture.finished:
          return
        if errcode == OSErrorCode(-1):
          retFuture.complete()
        else:
          retFuture.fail(newException(OSError, osErrorMsg(errcode)))
    )

    let connectedNow = connectEx(socket.SocketHandle, addrInfo.ai_addr,
                                 cint(addrInfo.ai_addrlen), nil, 0, nil,
                                 cast[POVERLAPPED](overlapped))
    if connectedNow:
      # Request to connect completed immediately.
      # ``overlapped`` is not released here: even though the request
      # completed immediately, poll will still be notified about its
      # completion and will free it.
      retFuture.complete()
      return

    let connectErr = osLastError()
    if connectErr.int32 != ERROR_IO_PENDING:
      # Only a real failure is handled here. With ERROR_IO_PENDING
      # ``overlapped`` will be deallocated in ``poll``, and the future will
      # be completed/failed there, too.
      GC_unref(overlapped)
      retFuture.fail(newException(OSError, osErrorMsg(connectErr)))
  1408. else:
  proc doConnect(socket: AsyncFD, addrInfo: ptr AddrInfo): Future[void] =
    ## Starts a ``connect`` from ``socket`` to the address in ``addrInfo``.
    ## The returned future completes once the connection is established and
    ## fails with ``OSError`` otherwise.
    let retFuture = newFuture[void]("doConnect")
    result = retFuture

    proc onWritable(fd: AsyncFD): bool =
      # Reads SO_ERROR to learn how the pending connect turned out. Returning
      # ``false`` asks to be called again; ``true`` means we are done.
      let sockErr = getSockOptInt(SocketHandle(fd),
                                  cint(SOL_SOCKET), cint(SO_ERROR))
      if sockErr == EINTR:
        # interrupted, keep waiting
        return false
      if sockErr == 0:
        # We have connected.
        retFuture.complete()
      else:
        retFuture.fail(newException(OSError, osErrorMsg(OSErrorCode(sockErr))))
      return true

    let rc = connect(socket.SocketHandle,
                     addrInfo.ai_addr,
                     addrInfo.ai_addrlen.Socklen)
    if rc == 0:
      # Request to connect completed immediately.
      retFuture.complete()
      return

    let connectErr = osLastError()
    if connectErr.int32 == EINTR or connectErr.int32 == EINPROGRESS:
      # The attempt is still in flight; its outcome is read in ``onWritable``.
      addWrite(socket, onWritable)
    else:
      retFuture.fail(newException(OSError, osErrorMsg(connectErr)))
  template asyncAddrInfoLoop(addrInfo: ptr AddrInfo, fd: untyped,
                             protocol: Protocol = IPPROTO_RAW) =
    ## Iterates through the AddrInfo linked list asynchronously
    ## until the connection can be established.
    ##
    ## The template expects ``retFuture`` and ``address`` to be visible at the
    ## expansion site. When ``fd`` names a declared symbol that descriptor is
    ## used for every attempt and ``retFuture`` is completed without a value;
    ## otherwise sockets are created on demand (one per domain) and
    ## ``retFuture`` is completed with the descriptor that connected.
    ## ``addrInfo`` is released with ``freeAddrInfo`` on every exit path.
    const shouldCreateFd = not declared(fd)

    when shouldCreateFd:
      let sockType = protocol.toSockType()

      # One lazily created socket per address family.
      var fdPerDomain: array[low(Domain).ord..high(Domain).ord, AsyncFD]
      for i in low(fdPerDomain)..high(fdPerDomain):
        fdPerDomain[i] = osInvalidSocket.AsyncFD
      template closeUnusedFds(domainToKeep = -1) {.dirty.} =
        # Closes every socket created so far except the one at index
        # ``domainToKeep``.
        for i, fd in fdPerDomain:
          if fd != osInvalidSocket.AsyncFD and i != domainToKeep:
            fd.closeSocket()

    var lastException: ref Exception
    var curAddrInfo = addrInfo
    var domain: Domain
    when shouldCreateFd:
      var curFd: AsyncFD
    else:
      var curFd = fd
    proc tryNextAddrInfo(fut: Future[void]) {.gcsafe.} =
      # ``fut == nil`` marks the initial invocation; afterwards this proc is
      # the callback of the previous connection attempt.
      if fut == nil or fut.failed:
        if fut != nil:
          lastException = fut.readError()

        # Skip entries whose address family is not a known ``Domain``.
        while curAddrInfo != nil:
          let domainOpt = curAddrInfo.ai_family.toKnownDomain()
          if domainOpt.isSome:
            domain = domainOpt.unsafeGet()
            break
          curAddrInfo = curAddrInfo.ai_next

        if curAddrInfo == nil:
          # Every candidate has been tried; report the last failure, if any.
          freeAddrInfo(addrInfo)
          when shouldCreateFd:
            closeUnusedFds()
          if lastException != nil:
            retFuture.fail(lastException)
          else:
            retFuture.fail(newException(
              IOError, "Couldn't resolve address: " & address))
          return

        when shouldCreateFd:
          curFd = fdPerDomain[ord(domain)]
          if curFd == osInvalidSocket.AsyncFD:
            try:
              # Use the supported constructor; ``newAsyncNativeSocket`` is
              # deprecated in favour of ``createAsyncNativeSocket``.
              curFd = createAsyncNativeSocket(domain, sockType, protocol)
            except:
              freeAddrInfo(addrInfo)
              closeUnusedFds()
              raise getCurrentException()
            when defined(windows):
              curFd.SocketHandle.bindToDomain(domain)
            fdPerDomain[ord(domain)] = curFd

        doConnect(curFd, curAddrInfo).callback = tryNextAddrInfo
        curAddrInfo = curAddrInfo.ai_next
      else:
        # The previous attempt succeeded.
        freeAddrInfo(addrInfo)
        when shouldCreateFd:
          closeUnusedFds(ord(domain))
          retFuture.complete(curFd)
        else:
          retFuture.complete()

    tryNextAddrInfo(nil)
  proc dial*(address: string, port: Port,
             protocol: Protocol = IPPROTO_TCP): Future[AsyncFD] =
    ## Establishes connection to the specified ``address``:``port`` pair via the
    ## specified protocol. The procedure iterates through possible
    ## resolutions of the ``address`` until it succeeds, meaning that it
    ## seamlessly works with both IPv4 and IPv6.
    ## Returns the async file descriptor, registered in the dispatcher of
    ## the current thread, ready to send or receive data.
    # ``retFuture`` and ``address`` are referenced by name from inside
    # ``asyncAddrInfoLoop``, so these identifiers must not be renamed.
    let retFuture = newFuture[AsyncFD]("dial")
    result = retFuture
    let candidates = getAddrInfo(address, port, Domain.AF_UNSPEC,
                                 protocol.toSockType(), protocol)
    # ``noFD`` is intentionally an undeclared identifier: it tells the
    # template that it has to create the sockets itself.
    asyncAddrInfoLoop(candidates, noFD, protocol)
  proc connect*(socket: AsyncFD, address: string, port: Port,
                domain = Domain.AF_INET): Future[void] =
    ## Connects ``socket`` to ``address``:``port``, trying each resolution of
    ## ``address`` for ``domain`` in turn. The returned future completes once
    ## a connection attempt succeeds and fails when none does.
    # ``retFuture`` and ``address`` are referenced by name from inside
    # ``asyncAddrInfoLoop``, so these identifiers must not be renamed.
    let retFuture = newFuture[void]("connect")
    result = retFuture

    when defined(windows):
      verifyPresence(socket)
    else:
      assert getSockDomain(socket.SocketHandle) == domain

    let candidates = getAddrInfo(address, port, domain)
    when defined(windows):
      # connect() on Windows requires the socket to be initially bound.
      bindToDomain(socket.SocketHandle, domain)
    asyncAddrInfoLoop(candidates, socket)
  proc sleepAsync*(ms: int | float): Future[void] =
    ## Suspends the execution of the current async procedure for the next
    ## ``ms`` milliseconds.
    result = newFuture[void]("sleepAsync")
    let disp = getGlobalDispatcher()
    # Timers are keyed by absolute expiry time in (fractional) seconds.
    let finishAt = epochTime() + (ms / 1000)
    disp.timers.push((finishAt, result))
  proc withTimeout*[T](fut: Future[T], timeout: int): Future[bool] =
    ## Returns a future which will complete once ``fut`` completes or after
    ## ``timeout`` milliseconds has elapsed.
    ##
    ## If ``fut`` completes first the returned future will hold true,
    ## otherwise, if ``timeout`` milliseconds has elapsed first, the returned
    ## future will hold false.
    ##
    ## If ``fut`` fails first, the returned future fails with the same error.
    var retFuture = newFuture[bool]("asyncdispatch.`withTimeout`")
    var timer = sleepAsync(timeout)
    # Whichever callback fires first settles ``retFuture``; the later one
    # sees it finished and does nothing.
    fut.callback =
      proc () =
        if retFuture.finished:
          return
        if fut.failed:
          retFuture.fail(fut.error)
        else:
          retFuture.complete(true)
    timer.callback =
      proc () =
        if retFuture.finished:
          return
        retFuture.complete(false)
    return retFuture
  proc accept*(socket: AsyncFD,
               flags = {SocketFlag.SafeDisconn}): Future[AsyncFD] =
    ## Accepts a new connection. Returns a future containing the client socket
    ## corresponding to that connection.
    ## The future will complete when the connection is successfully accepted.
    var clientFut = newFuture[AsyncFD]("accept")
    var pending = acceptAddr(socket, flags)
    # Forward the outcome of ``acceptAddr``, dropping the peer address.
    pending.callback =
      proc (done: Future[tuple[address: string, client: AsyncFD]]) =
        assert done.finished
        if not done.failed:
          clientFut.complete(done.read.client)
        else:
          clientFut.fail(done.error)
    return clientFut
  proc send*(socket: AsyncFD, data: string,
             flags = {SocketFlag.SafeDisconn}): Future[void] =
    ## Sends ``data`` to ``socket``. The returned future will complete once all
    ## data has been sent.
    ##
    ## An empty ``data`` completes the returned future immediately without
    ## touching the socket.
    var retFuture = newFuture[void]("send")

    if data.len == 0:
      # There is no first byte to take the address of; indexing
      # ``copiedData[0]`` below would be out of bounds.
      retFuture.complete()
      return retFuture

    var copiedData = data
    GC_ref(copiedData) # we need to protect data until send operation is completed
                       # or failed.

    let sendFut = socket.send(addr copiedData[0], data.len, flags)
    sendFut.callback =
      proc () =
        # The buffer is no longer needed once the low-level send has finished.
        GC_unref(copiedData)
        if sendFut.failed:
          retFuture.fail(sendFut.error)
        else:
          retFuture.complete()

    return retFuture
  1584. # -- Await Macro
  1585. include asyncmacro
  proc readAll*(future: FutureStream[string]): Future[string] {.async.} =
    ## Returns a future that will complete when all the string data from the
    ## specified future stream is retrieved.
    result = ""
    while true:
      # Each read yields a (hasValue, value) pair; a false ``hasValue``
      # ends the loop.
      let chunk = await future.read()
      if not chunk[0]:
        break
      result.add(chunk[1])
  proc recvLine*(socket: AsyncFD): Future[string] {.async, deprecated.} =
    ## Reads a line of data from ``socket``. Returned future will complete once
    ## a full line is read or an error occurs.
    ##
    ## If a full line is read ``\r\L`` is not
    ## added to ``line``, however if solely ``\r\L`` is read then ``line``
    ## will be set to it.
    ##
    ## If the socket is disconnected, ``line`` will be set to ``""``.
    ##
    ## If the socket is disconnected in the middle of a line (before ``\r\L``
    ## is read) then line will be set to ``""``.
    ## The partial line **will be lost**.
    ##
    ## **Warning**: This assumes that lines are delimited by ``\r\L``.
    ##
    ## **Note**: This procedure is mostly used for testing. You likely want to
    ## use ``asyncnet.recvLine`` instead.
    ##
    ## **Deprecated since version 0.15.0**: Use ``asyncnet.recvLine()`` instead.

    template addNLIfEmpty(): typed =
      # An otherwise empty line is reported as the bare delimiter.
      if result.len == 0:
        result.add("\c\L")

    result = ""
    var c = ""
    while true:
      c = await recv(socket, 1)
      if c.len == 0:
        return ""
      if c == "\r":
        c = await recv(socket, 1)
        if c.len == 0:
          # Disconnected between ``\r`` and ``\L``: as documented, the partial
          # line is dropped instead of tripping the assertion below.
          return ""
        assert c == "\l"
        addNLIfEmpty()
        return
      elif c == "\L":
        addNLIfEmpty()
        return
      add(result, c)
  proc callSoon*(cbproc: proc ()) =
    ## Schedule `cbproc` to be called as soon as possible.
    ## The callback is called when control returns to the event loop.
    let disp = getGlobalDispatcher()
    # Appended at the back of the dispatcher's callback queue.
    addLast(disp.callbacks, cbproc)
  proc runForever*() =
    ## Begins a never ending global dispatcher poll loop.
    # The loop has no exit condition of its own.
    while true: poll()
  proc waitFor*[T](fut: Future[T]): T =
    ## **Blocks** the current thread until the specified future completes.
    ## Polls the dispatcher until ``fut`` is finished, then yields the result
    ## of reading it.
    while not finished(fut):
      poll()

    # Kept as a trailing expression so that this also works for ``T = void``.
    read(fut)
  proc setEvent*(ev: AsyncEvent) {.deprecated.} =
    ## Set event ``ev`` to signaled state.
    ##
    ## **Deprecated since v0.18.0:** Use ``trigger`` instead.
    trigger(ev)