asyncdispatch.nim 61 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664
  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2015 Dominik Picheta
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. include "system/inclrtl"
  10. import os, tables, strutils, times, heapqueue, lists, options, asyncstreams
  11. import options, math
  12. import asyncfutures except callSoon
  13. import nativesockets, net, deques
  14. export Port, SocketFlag
  15. export asyncfutures, asyncstreams
  16. #{.injectStmt: newGcInvariant().}
  17. ## AsyncDispatch
  18. ## *************
  19. ##
  20. ## This module implements asynchronous IO. This includes a dispatcher,
  21. ## a ``Future`` type implementation, and an ``async`` macro which allows
  22. ## asynchronous code to be written in a synchronous style with the ``await``
  23. ## keyword.
  24. ##
  25. ## The dispatcher acts as a kind of event loop. You must call ``poll`` on it
  26. ## (or a function which does so for you such as ``waitFor`` or ``runForever``)
  27. ## in order to poll for any outstanding events. The underlying implementation
  28. ## is based on epoll on Linux, IO Completion Ports on Windows and select on
  29. ## other operating systems.
  30. ##
  31. ## The ``poll`` function will not, on its own, return any events. Instead
  32. ## an appropriate ``Future`` object will be completed. A ``Future`` is a
  33. ## type which holds a value which is not yet available, but which *may* be
  34. ## available in the future. You can check whether a future is finished
  35. ## by using the ``finished`` function. When a future is finished it means that
  36. ## either the value that it holds is now available or it holds an error instead.
  37. ## The latter situation occurs when the operation to complete a future fails
  38. ## with an exception. You can distinguish between the two situations with the
  39. ## ``failed`` function.
  40. ##
  41. ## Future objects can also store a callback procedure which will be called
  42. ## automatically once the future completes.
  43. ##
  44. ## Futures therefore can be thought of as an implementation of the proactor
  45. ## pattern. In this
  46. ## pattern you make a request for an action, and once that action is fulfilled
  47. ## a future is completed with the result of that action. Requests can be
  48. ## made by calling the appropriate functions. For example: calling the ``recv``
  49. ## function will create a request for some data to be read from a socket. The
  50. ## future which the ``recv`` function returns will then complete once the
  51. ## requested amount of data is read **or** an exception occurs.
  52. ##
  53. ## Code to read some data from a socket may look something like this:
  54. ##
  55. ## .. code-block::nim
  56. ## var future = socket.recv(100)
  57. ## future.addCallback(
  58. ## proc () =
  59. ## echo(future.read)
  60. ## )
  61. ##
  62. ## All asynchronous functions returning a ``Future`` will not block. They
  63. ## will not, however, return immediately: an asynchronous function will have
  64. ## code which will be executed before an asynchronous request is made; in most
  65. ## cases this code sets up the request.
  66. ##
  67. ## In the above example, the ``recv`` function will return a brand new
  68. ## ``Future`` instance once the request for data to be read from the socket
  69. ## is made. This ``Future`` instance will complete once the requested amount
  70. ## of data is read, in this case it is 100 bytes. The second line sets a
  71. ## callback on this future which will be called once the future completes.
  72. ## All the callback does is write the data stored in the future to ``stdout``.
  73. ## The ``read`` function is used for this and it checks whether the future
  74. ## completes with an error for you (if it did it will simply raise the
  75. ## error), if there is no error however it returns the value of the future.
  76. ##
  77. ## Asynchronous procedures
  78. ## -----------------------
  79. ##
  80. ## Asynchronous procedures remove the pain of working with callbacks. They do
  81. ## this by allowing you to write asynchronous code the same way as you would
  82. ## write synchronous code.
  83. ##
  84. ## An asynchronous procedure is marked using the ``{.async.}`` pragma.
  85. ## When marking a procedure with the ``{.async.}`` pragma it must have a
  86. ## ``Future[T]`` return type or no return type at all. If you do not specify
  87. ## a return type then ``Future[void]`` is assumed.
  88. ##
  89. ## Inside asynchronous procedures ``await`` can be used to call any
  90. ## procedures which return a
  91. ## ``Future``; this includes asynchronous procedures. When a procedure is
  92. ## "awaited", the asynchronous procedure it is awaited in will
  93. ## suspend its execution
  94. ## until the awaited procedure's Future completes, at which point the
  95. ## asynchronous procedure will resume its execution. During the period
  96. ## when an asynchronous procedure is suspended other asynchronous procedures
  97. ## will be run by the dispatcher.
  98. ##
  99. ## The ``await`` call may be used in many contexts. It can be used on the right
  100. ## hand side of a variable declaration: ``var data = await socket.recv(100)``,
  101. ## in which case the variable will be set to the value of the future
  102. ## automatically. It can be used to await a ``Future`` object, and it can
  103. ## be used to await a procedure returning a ``Future[void]``:
  104. ## ``await socket.send("foobar")``.
  105. ##
  106. ## If an awaited future completes with an error, then ``await`` will re-raise
  107. ## this error. To avoid this, you can use the ``yield`` keyword instead of
  108. ## ``await``. The following section shows different ways that you can handle
  109. ## exceptions in async procs.
  110. ##
  111. ## Handling Exceptions
  112. ## ~~~~~~~~~~~~~~~~~~~
  113. ##
  114. ## The most reliable way to handle exceptions is to use ``yield`` on a future
  115. ## then check the future's ``failed`` property. For example:
  116. ##
  117. ## .. code-block:: Nim
  118. ## var future = sock.recv(100)
  119. ## yield future
  120. ## if future.failed:
  121. ## # Handle exception
  122. ##
  123. ## The ``async`` procedures also offer limited support for the try statement.
  124. ##
  125. ## .. code-block:: Nim
  126. ## try:
  127. ## let data = await sock.recv(100)
  128. ## echo("Received ", data)
  129. ## except:
  130. ## # Handle exception
  131. ##
  132. ## Unfortunately the semantics of the try statement may not always be correct,
  133. ## and occasionally the compilation may fail altogether.
  134. ## As such it is better to use the former style when possible.
  135. ##
  136. ##
  137. ## Discarding futures
  138. ## ------------------
  139. ##
  140. ## Futures should **never** be discarded. This is because they may contain
  141. ## errors. If you do not care for the result of a Future then you should
  142. ## use the ``asyncCheck`` procedure instead of the ``discard`` keyword. Note
  143. ## however that this does not wait for completion, and you should use
  144. ## ``waitFor`` for that purpose.
  145. ##
  146. ## Examples
  147. ## --------
  148. ##
  149. ## For examples take a look at the documentation for the modules implementing
  150. ## asynchronous IO. A good place to start is the
  151. ## `asyncnet module <asyncnet.html>`_.
  152. ##
  153. ## Limitations/Bugs
  154. ## ----------------
  155. ##
  156. ## * The effect system (``raises: []``) does not work with async procedures.
  157. # TODO: Check if yielded future is nil and throw a more meaningful exception
  type
    # State shared by the platform-specific dispatcher types: the pending
    # timers and the queue of callbacks waiting to be run.
    PDispatcherBase = ref object of RootRef
      # Pending timers. `processTimers` completes `fut` once `epochTime()`
      # has reached `finishAt`, taking the smallest `finishAt` first.
      timers*: HeapQueue[tuple[finishAt: float, fut: Future[void]]]
      # Parameterless callbacks, run in FIFO order by
      # `processPendingCallbacks`.
      callbacks*: Deque[proc ()]
  proc processTimers(
    p: PDispatcherBase, didSomeWork: var bool
  ): Option[int] {.inline.} =
    ## Completes every timer of ``p`` that has already expired and reports how
    ## long the caller may wait before the next one is due.
    ##
    ## ``didSomeWork`` is set to ``true`` when at least one timer was
    ## completed; it is never reset to ``false`` here.
    ##
    ## Returns ``none`` when no timers remain, otherwise the number of
    ## milliseconds (rounded up, never negative) until the earliest remaining
    ## timer expires.

    # Pop the timers in the order in which they will expire (smaller
    # `finishAt`). Only the timers present on entry are considered, so timers
    # added while futures are being completed wait for the next call.
    var count = p.timers.len
    let t = epochTime()
    while count > 0 and t >= p.timers[0].finishAt:
      p.timers.pop().fut.complete()
      dec count
      didSomeWork = true

    # Return the number of milliseconds in which the next timer will expire.
    if p.timers.len == 0: return

    # The clock is sampled a second time, so the next timer may already be due
    # and the difference negative. Clamp to zero: a negative value (-1 in
    # particular) would otherwise be read by the callers as "no timeout" and
    # could block the poll indefinitely.
    let millisecs = (p.timers[0].finishAt - epochTime()) * 1000
    return some(max(0, ceil(millisecs).int))
  proc processPendingCallbacks(p: PDispatcherBase; didSomeWork: var bool) =
    ## Runs the queued callbacks of ``p`` in FIFO order until the queue is
    ## empty, including callbacks that get queued while draining.
    ##
    ## ``didSomeWork`` is set to ``true`` for every callback that was run.
    while p.callbacks.len != 0:
      let pending = p.callbacks.popFirst()
      pending()
      didSomeWork = true
  proc adjustTimeout(pollTimeout: int, nextTimer: Option[int]): int {.inline.} =
    ## Combines the caller's poll timeout with the delay until the next timer.
    ##
    ## Without a pending timer ``pollTimeout`` is returned unchanged. With one,
    ## the timer delay is used when ``pollTimeout`` is ``-1`` (no limit),
    ## otherwise the smaller of the two values.
    if nextTimer.isSome():
      let untilTimer = nextTimer.get()
      result =
        if pollTimeout == -1: untilTimer
        else: min(pollTimeout, untilTimer)
    else:
      result = pollTimeout
  # Forward declaration so that `initCallSoonProc` can refer to `callSoon`
  # before its body is defined.
  proc callSoon(cbproc: proc ()) {.gcsafe.}
  proc initCallSoonProc =
    ## Installs this module's ``callSoon`` as the ``asyncfutures`` call-soon
    ## procedure, unless one has already been set.
    let alreadyInstalled = not asyncfutures.getCallSoonProc().isNil
    if alreadyInstalled:
      return
    asyncfutures.setCallSoonProc(callSoon)
  191. when defined(windows) or defined(nimdoc):
  192. import winlean, sets, hashes
  type
    # Type of the completion key passed when a handle is associated with the
    # IO completion port (see `register`).
    CompletionKey = ULONG_PTR

    # Per-operation data stored next to the OVERLAPPED structure.
    CompletionData* = object
      # Descriptor the operation was issued on; `runOnce` asserts that it
      # matches the completion key it dequeued.
      fd*: AsyncFD # TODO: Rename this.
      # Called by `runOnce` when the operation is dequeued. `errcode` is
      # `OSErrorCode(-1)` on success, otherwise the OS error of the failed
      # operation.
      cb*: proc (fd: AsyncFD, bytesTransferred: Dword,
                errcode: OSErrorCode) {.closure,gcsafe.}
      cell*: ForeignCell # we need this `cell` to protect our `cb` environment,
                         # when using RegisterWaitForSingleObject, because
                         # waiting is done in different thread.

    # Windows dispatcher: the shared timer/callback state plus the IO
    # completion port and the set of descriptors registered with it.
    PDispatcher* = ref object of PDispatcherBase
      ioPort: Handle             # created in `newDispatcher`
      handles: HashSet[AsyncFD]  # descriptors added by `register`

    # OVERLAPPED extended with the completion data. Instances are handed to
    # the WSA* calls cast to POVERLAPPED and recovered again in `runOnce`.
    CustomOverlapped = object of OVERLAPPED
      data*: CompletionData

    PCustomOverlapped* = ref CustomOverlapped

    # Distinct integer type for descriptors used with the dispatcher.
    AsyncFD* = distinct int

    # NOTE(review): the types below are not used in the part of the file
    # reviewed here; they presumably support waiting through
    # RegisterWaitForSingleObject (see `cell` above) -- confirm.
    PostCallbackData = object
      ioPort: Handle
      handleFd: AsyncFD
      waitFd: Handle
      ovl: PCustomOverlapped
    PostCallbackDataPtr = ptr PostCallbackData

    AsyncEventImpl = object
      hEvent: Handle
      hWaiter: Handle
      pcd: PostCallbackDataPtr
    AsyncEvent* = ptr AsyncEventImpl

    Callback = proc (fd: AsyncFD): bool {.closure,gcsafe.}

  # Keeps the old `T`-prefixed type names available as deprecated aliases.
  {.deprecated: [TCompletionKey: CompletionKey, TAsyncFD: AsyncFD,
                TCustomOverlapped: CustomOverlapped, TCompletionData: CompletionData].}
  # `AsyncFD` is a distinct `int`, so hashing and equality are borrowed from
  # the base type; `hash` is what allows `AsyncFD` to be stored in the
  # dispatcher's `HashSet`.
  proc hash(x: AsyncFD): Hash {.borrow.}
  proc `==`*(x: AsyncFD, y: AsyncFD): bool {.borrow.}
  proc newDispatcher*(): PDispatcher =
    ## Creates a new Dispatcher instance.
    ##
    ## The dispatcher owns a fresh IO completion port, an empty set of
    ## registered handles, an empty timer queue and an empty callback queue.
    ##
    ## Raises ``OSError`` if the IO completion port cannot be created.
    new result
    result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
    # A zero handle signals failure (the same check `register` performs).
    # Failing here is clearer than letting every later operation on the
    # dispatcher fail against an invalid port.
    if result.ioPort == 0:
      raiseOSError(osLastError())
    result.handles = initSet[AsyncFD]()
    result.timers.newHeapQueue()
    result.callbacks = initDeque[proc ()](64)
  # Declared `threadvar`, so every thread has its own dispatcher; it is created
  # lazily by `getGlobalDispatcher`.
  var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
  proc setGlobalDispatcher*(disp: PDispatcher) =
    ## Makes ``disp`` the global dispatcher of the current thread and ensures
    ## the call-soon procedure is installed.
    ##
    ## When a dispatcher is already set it must have no queued callbacks
    ## (checked with an assertion).
    if gDisp != nil:
      assert gDisp.callbacks.len == 0
    gDisp = disp
    initCallSoonProc()
  proc getGlobalDispatcher*(): PDispatcher =
    ## Returns the global dispatcher of the current thread, creating and
    ## installing a new one on first use.
    if gDisp == nil:
      setGlobalDispatcher(newDispatcher())
    return gDisp
  proc getIoHandler*(disp: PDispatcher): Handle =
    ## Returns the underlying IO Completion Port handle (Windows) or selector
    ## (Unix) for the specified dispatcher.
    result = disp.ioPort
  proc register*(fd: AsyncFD) =
    ## Registers ``fd`` with the dispatcher.
    ##
    ## The descriptor is associated with the global dispatcher's IO completion
    ## port, using the descriptor itself as the completion key, and is then
    ## recorded as registered. Raises ``OSError`` if the association fails.
    let dispatcher = getGlobalDispatcher()
    let port = createIoCompletionPort(fd.Handle, dispatcher.ioPort,
                                      cast[CompletionKey](fd), 1)
    if port == 0:
      raiseOSError(osLastError())
    dispatcher.handles.incl(fd)
  proc verifyPresence(fd: AsyncFD) =
    ## Ensures that file descriptor has been registered with the dispatcher.
    ##
    ## Raises ``ValueError`` when ``fd`` is not among the handles registered
    ## with the global dispatcher.
    let registered = fd in getGlobalDispatcher().handles
    if not registered:
      raise newException(ValueError,
        "Operation performed on a socket which has not been registered with" &
        " the dispatcher yet.")
  proc hasPendingOperations*(): bool =
    ## Returns `true` if the global dispatcher has pending operations, i.e. at
    ## least one registered handle, pending timer or queued callback.
    let disp = getGlobalDispatcher()
    result = not (disp.handles.len == 0 and disp.timers.len == 0 and
                  disp.callbacks.len == 0)
  proc runOnce(timeout = 500): bool =
    ## Runs a single iteration of the global dispatcher.
    ##
    ## Expired timers are completed first. The dispatcher then waits on the IO
    ## completion port for at most ``timeout`` milliseconds (shortened to the
    ## delay of the next timer; ``-1`` waits without limit) and, if an
    ## operation was dequeued, invokes its callback. Timers and queued
    ## callbacks are processed once more before returning.
    ##
    ## Returns ``false`` only when the wait timed out and neither the final
    ## timer pass nor the callback pass did any work.
    ##
    ## Raises ``ValueError`` when the dispatcher has no handles, timers or
    ## callbacks at all, and ``OSError`` when the wait fails for a reason other
    ## than a timeout.
    let disp = getGlobalDispatcher()
    let nothingRegistered =
      disp.handles.len == 0 and disp.timers.len == 0 and disp.callbacks.len == 0
    if nothingRegistered:
      raise newException(ValueError,
        "No handles or timers registered in dispatcher.")

    result = false
    let waitMs = adjustTimeout(timeout, processTimers(disp, result))
    var llTimeout =
      if waitMs == -1: winlean.INFINITE
      else: waitMs.int32

    var lpNumberOfBytesTransferred: Dword
    var lpCompletionKey: ULONG_PTR
    var customOverlapped: PCustomOverlapped

    template finishOperation(code: OSErrorCode) =
      # Hands the dequeued operation to its callback and releases the
      # resources that were kept alive for it.
      # This is useful for ensuring the reliability of the overlapped struct.
      assert customOverlapped.data.fd == lpCompletionKey.AsyncFD

      customOverlapped.data.cb(customOverlapped.data.fd,
          lpNumberOfBytesTransferred, code)

      # If cell.data != nil, then system.protect(rawEnv(cb)) was called,
      # so we need to dispose our `cb` environment, because it is not needed
      # anymore.
      if customOverlapped.data.cell.data != nil:
        system.dispose(customOverlapped.data.cell)

      GC_unref(customOverlapped)

    let dequeued = getQueuedCompletionStatus(disp.ioPort,
        addr lpNumberOfBytesTransferred, addr lpCompletionKey,
        cast[ptr POVERLAPPED](addr customOverlapped), llTimeout).bool
    result = true

    # http://stackoverflow.com/a/12277264/492186
    # TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html
    if dequeued:
      # OSErrorCode(-1) is the "no error" marker the callbacks test for.
      finishOperation(OSErrorCode(-1))
    else:
      let errCode = osLastError()
      if customOverlapped != nil:
        # An operation was dequeued, but it failed.
        finishOperation(errCode)
      elif errCode.int32 == WAIT_TIMEOUT:
        # Timed out
        result = false
      else:
        raiseOSError(errCode)

    # Timer processing.
    discard processTimers(disp, result)
    # Callback queue processing
    processPendingCallbacks(disp, result)
  # Winsock extension function pointers; they are filled in by `initAll`.
  var acceptEx: WSAPROC_ACCEPTEX
  var connectEx: WSAPROC_CONNECTEX
  var getAcceptExSockAddrs: WSAPROC_GETACCEPTEXSOCKADDRS
  proc initPointer(s: SocketHandle, fun: var pointer, guid: var GUID): bool =
    ## Asks ``WSAIoctl`` for the extension function identified by ``guid`` and
    ## stores its address in ``fun`` (which is reset to ``nil`` first).
    ##
    ## Returns ``true`` when the ``WSAIoctl`` call reports success (``0``).
    # Ref: https://github.com/powdahound/twisted/blob/master/twisted/internet/iocpreactor/iocpsupport/winsock_pointers.c
    fun = nil
    var bytesRet: Dword
    let status = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, addr guid,
                          sizeof(GUID).Dword, addr fun, sizeof(pointer).Dword,
                          addr bytesRet, nil, nil)
    result = status == 0
  proc initAll() =
    ## Resolves the ``ConnectEx``, ``AcceptEx`` and ``GetAcceptExSockaddrs``
    ## extension functions and stores them in the module-level ``connectEx``,
    ## ``acceptEx`` and ``getAcceptExSockAddrs`` variables.
    ##
    ## A temporary socket is needed for the lookups. Raises ``OSError`` if the
    ## socket cannot be created or if any lookup fails.
    let dummySock = newNativeSocket()
    if dummySock == INVALID_SOCKET:
      raiseOSError(osLastError())

    # try/finally guarantees that the temporary socket is closed even when one
    # of the lookups fails; it used to leak on those error paths.
    try:
      var fun: pointer = nil
      if not initPointer(dummySock, fun, WSAID_CONNECTEX):
        raiseOSError(osLastError())
      connectEx = cast[WSAPROC_CONNECTEX](fun)

      if not initPointer(dummySock, fun, WSAID_ACCEPTEX):
        raiseOSError(osLastError())
      acceptEx = cast[WSAPROC_ACCEPTEX](fun)

      if not initPointer(dummySock, fun, WSAID_GETACCEPTEXSOCKADDRS):
        raiseOSError(osLastError())
      getAcceptExSockAddrs = cast[WSAPROC_GETACCEPTEXSOCKADDRS](fun)
    finally:
      close(dummySock)
  proc recv*(socket: AsyncFD, size: int,
             flags = {SocketFlag.SafeDisconn}): Future[string] =
    ## Reads **up to** ``size`` bytes from ``socket``. Returned future will
    ## complete once all the data requested is read, a part of the data has been
    ## read, or the socket has disconnected in which case the future will
    ## complete with a value of ``""``.
    ##
    ## The future fails with ``OSError`` for errors that
    ## ``isDisconnectionError`` does not classify as a disconnection for the
    ## given ``flags``. ``socket`` must have been registered with the
    ## dispatcher, otherwise ``ValueError`` is raised.
    ##
    ## **Warning**: The ``Peek`` socket flag is not supported on Windows.

    # Things to note:
    #   * When WSARecv completes immediately then ``bytesReceived`` is very
    #     unreliable.
    #   * Still need to implement message-oriented socket disconnection,
    #     '\0' in the message currently signifies a socket disconnect. Who
    #     knows what will happen when someone sends that to our socket.
    verifyPresence(socket)
    assert SocketFlag.Peek notin flags, "Peek not supported on Windows."

    var retFuture = newFuture[string]("recv")
    # The receive buffer is allocated manually (zero-filled). It is released
    # either by the completion callback or, if the request cannot be started,
    # by the error path below.
    var dataBuf: TWSABuf
    dataBuf.buf = cast[cstring](alloc0(size))
    dataBuf.len = size.ULONG

    var bytesReceived: Dword
    var flagsio = flags.toOSFlags().Dword
    var ol = PCustomOverlapped()
    # Extra reference for the overlapped structure while the operation is in
    # flight; it is dropped by `GC_unref` in `runOnce` or in the error path
    # below.
    GC_ref(ol)
    ol.data = CompletionData(fd: socket, cb:
      proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
        # The future may already have been completed by the "completed
        # immediately" branch below; in that case only the buffer is released.
        if not retFuture.finished:
          if errcode == OSErrorCode(-1):
            # OSErrorCode(-1) is the success marker.
            if bytesCount == 0 and dataBuf.buf[0] == '\0':
              # Nothing was transferred and the zero-filled buffer is
              # untouched: reported as a disconnect.
              retFuture.complete("")
            else:
              var data = newString(bytesCount)
              assert bytesCount <= size
              copyMem(addr data[0], addr dataBuf.buf[0], bytesCount)
              retFuture.complete($data)
          else:
            if flags.isDisconnectionError(errcode):
              retFuture.complete("")
            else:
              retFuture.fail(newException(OSError, osErrorMsg(errcode)))
        # Release the buffer regardless of who completed the future.
        if dataBuf.buf != nil:
          dealloc dataBuf.buf
          dataBuf.buf = nil
    )

    let ret = WSARecv(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived,
                      addr flagsio, cast[POVERLAPPED](ol), nil)
    if ret == -1:
      let err = osLastError()
      # ERROR_IO_PENDING means the request was queued and the callback will
      # deliver the result; anything else is handled here, cleanup included.
      if err.int32 != ERROR_IO_PENDING:
        if dataBuf.buf != nil:
          dealloc dataBuf.buf
          dataBuf.buf = nil
        GC_unref(ol)
        if flags.isDisconnectionError(err):
          retFuture.complete("")
        else:
          retFuture.fail(newException(OSError, osErrorMsg(err)))
    elif ret == 0:
      # Request completed immediately.
      # The buffer is not released here; that is left to the completion
      # callback above.
      if bytesReceived != 0:
        var data = newString(bytesReceived)
        assert bytesReceived <= size
        copyMem(addr data[0], addr dataBuf.buf[0], bytesReceived)
        retFuture.complete($data)
      else:
        if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)):
          retFuture.complete("")
    return retFuture
  proc recvInto*(socket: AsyncFD, buf: pointer, size: int,
                 flags = {SocketFlag.SafeDisconn}): Future[int] =
    ## Reads **up to** ``size`` bytes from ``socket`` into ``buf``, which must
    ## at least be of that size. Returned future will complete once all the
    ## data requested is read, a part of the data has been read, or the socket
    ## has disconnected in which case the future will complete with a value of
    ## ``0``.
    ##
    ## The future fails with ``OSError`` for errors that are not treated as a
    ## disconnection under ``flags``.
    ##
    ## **Warning**: The ``Peek`` socket flag is not supported on Windows.

    # Things to note:
    #   * When WSARecv completes immediately then ``bytesReceived`` is very
    #     unreliable.
    #   * Still need to implement message-oriented socket disconnection,
    #     '\0' in the message currently signifies a socket disconnect. Who
    #     knows what will happen when someone sends that to our socket.
    verifyPresence(socket)
    assert SocketFlag.Peek notin flags, "Peek not supported on Windows."

    var retFuture = newFuture[int]("recvInto")

    #buf[] = '\0'
    # The caller owns `buf`; it is only referenced here, never freed.
    var dataBuf = TWSABuf(buf: cast[cstring](buf), len: size.ULONG)

    var bytesReceived: Dword
    var flagsio = flags.toOSFlags().Dword
    var ol = PCustomOverlapped()
    GC_ref(ol)
    ol.data = CompletionData(fd: socket, cb:
      proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
        # Skip completion if the immediate-completion branch already did it.
        if not retFuture.finished:
          if errcode == OSErrorCode(-1):
            retFuture.complete(bytesCount)
          elif flags.isDisconnectionError(errcode):
            retFuture.complete(0)
          else:
            retFuture.fail(newException(OSError, osErrorMsg(errcode)))
        if dataBuf.buf != nil:
          dataBuf.buf = nil
    )

    let ret = WSARecv(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived,
                      addr flagsio, cast[POVERLAPPED](ol), nil)
    if ret == -1:
      let err = osLastError()
      # A pending request is finished later by the callback.
      if err.int32 != ERROR_IO_PENDING:
        if dataBuf.buf != nil:
          dataBuf.buf = nil
        GC_unref(ol)
        if flags.isDisconnectionError(err):
          retFuture.complete(0)
        else:
          retFuture.fail(newException(OSError, osErrorMsg(err)))
    elif ret == 0:
      # Request completed immediately.
      if bytesReceived != 0:
        assert bytesReceived <= size
        retFuture.complete(bytesReceived)
      elif hasOverlappedIoCompleted(cast[POVERLAPPED](ol)):
        retFuture.complete(bytesReceived)
    return retFuture
  proc send*(socket: AsyncFD, buf: pointer, size: int,
             flags = {SocketFlag.SafeDisconn}): Future[void] =
    ## Sends ``size`` bytes from ``buf`` to ``socket``. The returned future
    ## will complete once all data has been sent.
    ##
    ## Errors treated as a disconnection under ``flags`` complete the future
    ## normally; any other error fails it with ``OSError``.
    ##
    ## **WARNING**: Use it with caution. If ``buf`` refers to GC'ed object,
    ## you must use GC_ref/GC_unref calls to avoid early freeing of the buffer.
    verifyPresence(socket)
    var retFuture = newFuture[void]("send")

    var dataBuf = TWSABuf(buf: cast[cstring](buf), len: size.ULONG)

    var bytesSent, lowFlags: Dword
    var ol = PCustomOverlapped()
    GC_ref(ol)
    ol.data = CompletionData(fd: socket, cb:
      proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
        # Nothing to do when the immediate-completion branch already
        # completed the future.
        if retFuture.finished:
          return
        if errcode == OSErrorCode(-1) or flags.isDisconnectionError(errcode):
          retFuture.complete()
        else:
          retFuture.fail(newException(OSError, osErrorMsg(errcode)))
    )

    let ret = WSASend(socket.SocketHandle, addr dataBuf, 1, addr bytesSent,
                      lowFlags, cast[POVERLAPPED](ol), nil)
    if ret != -1:
      retFuture.complete()
      # We don't deallocate ``ol`` here because even though this completed
      # immediately poll will still be notified about its completion and it will
      # free ``ol``.
    else:
      let err = osLastError()
      # ERROR_IO_PENDING: the callback finishes the future later.
      if err.int32 != ERROR_IO_PENDING:
        GC_unref(ol)
        if flags.isDisconnectionError(err):
          retFuture.complete()
        else:
          retFuture.fail(newException(OSError, osErrorMsg(err)))
    return retFuture
  proc sendTo*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr,
               saddrLen: Socklen,
               flags = {SocketFlag.SafeDisconn}): Future[void] =
    ## Sends ``data`` to specified destination ``saddr``, using
    ## socket ``socket``. The returned future will complete once all data
    ## has been sent.
    ##
    ## Any error fails the future with ``OSError``; ``flags`` is not consulted
    ## by this procedure.
    verifyPresence(socket)
    var retFuture = newFuture[void]("sendTo")

    var dataBuf = TWSABuf(buf: cast[cstring](data), len: size.ULONG)
    var bytesSent: Dword
    var lowFlags: Dword

    # we will preserve address in our stack
    var staddr: array[128, char] # SOCKADDR_STORAGE size is 128 bytes
    var stalen: cint = cint(saddrLen)
    zeroMem(addr(staddr[0]), sizeof(staddr))
    copyMem(addr(staddr[0]), saddr, saddrLen)

    var ol = PCustomOverlapped()
    GC_ref(ol)
    ol.data = CompletionData(fd: socket, cb:
      proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
        # Nothing to do when the immediate-completion branch already
        # completed the future.
        if retFuture.finished:
          return
        if errcode == OSErrorCode(-1):
          retFuture.complete()
        else:
          retFuture.fail(newException(OSError, osErrorMsg(errcode)))
    )

    let ret = WSASendTo(socket.SocketHandle, addr dataBuf, 1, addr bytesSent,
                        lowFlags, cast[ptr SockAddr](addr(staddr[0])),
                        stalen, cast[POVERLAPPED](ol), nil)
    if ret != -1:
      retFuture.complete()
      # We don't deallocate ``ol`` here because even though this completed
      # immediately poll will still be notified about its completion and it will
      # free ``ol``.
    else:
      let err = osLastError()
      # ERROR_IO_PENDING: the callback finishes the future later.
      if err.int32 != ERROR_IO_PENDING:
        GC_unref(ol)
        retFuture.fail(newException(OSError, osErrorMsg(err)))
    return retFuture
  proc recvFromInto*(socket: AsyncFD, data: pointer, size: int,
                     saddr: ptr SockAddr, saddrLen: ptr SockLen,
                     flags = {SocketFlag.SafeDisconn}): Future[int] =
    ## Receives a datagram data from ``socket`` into ``buf``, which must
    ## be at least of size ``size``, address of datagram's sender will be
    ## stored into ``saddr`` and ``saddrLen``. Returned future will complete
    ## once one datagram has been received, and will return size of packet
    ## received.
    ##
    ## Any error fails the future with ``OSError``.
    verifyPresence(socket)
    var retFuture = newFuture[int]("recvFromInto")

    var dataBuf = TWSABuf(buf: cast[cstring](data), len: size.ULONG)
    var bytesReceived: Dword
    var lowFlags: Dword

    var ol = PCustomOverlapped()
    GC_ref(ol)
    ol.data = CompletionData(fd: socket, cb:
      proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
        # Nothing to do when the immediate-completion branch already
        # completed the future.
        if retFuture.finished:
          return
        if errcode == OSErrorCode(-1):
          assert bytesCount <= size
          retFuture.complete(bytesCount)
        else:
          # datagram sockets don't have disconnection,
          # so we can just raise an exception
          retFuture.fail(newException(OSError, osErrorMsg(errcode)))
    )

    let res = WSARecvFrom(socket.SocketHandle, addr dataBuf, 1,
                          addr bytesReceived, addr lowFlags,
                          saddr, cast[ptr cint](saddrLen),
                          cast[POVERLAPPED](ol), nil)
    if res != -1:
      # Request completed immediately.
      if bytesReceived != 0:
        assert bytesReceived <= size
        retFuture.complete(bytesReceived)
      elif hasOverlappedIoCompleted(cast[POVERLAPPED](ol)):
        retFuture.complete(bytesReceived)
    else:
      let err = osLastError()
      # ERROR_IO_PENDING: the callback finishes the future later.
      if err.int32 != ERROR_IO_PENDING:
        GC_unref(ol)
        retFuture.fail(newException(OSError, osErrorMsg(err)))
    return retFuture
  proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn}):
      Future[tuple[address: string, client: AsyncFD]] =
    ## Accepts a new connection. Returns a future containing the client socket
    ## corresponding to that connection and the remote address of the client.
    ## The future will complete when the connection is successfully accepted.
    ##
    ## The resulting client socket is automatically registered to the
    ## dispatcher.
    ##
    ## The ``accept`` call may result in an error if the connecting socket
    ## disconnects during the duration of the ``accept``. If the ``SafeDisconn``
    ## flag is specified then this error will not be raised and instead
    ## accept will be called again.
    ##
    ## Raises ``OSError`` synchronously if the client socket cannot be
    ## created. Whenever the accept fails, the pre-created client socket is
    ## closed so that no handle is leaked.
    verifyPresence(socket)
    var retFuture = newFuture[tuple[address: string, client: AsyncFD]]("acceptAddr")

    var clientSock = newNativeSocket()
    if clientSock == osInvalidSocket: raiseOSError(osLastError())

    const lpOutputLen = 1024
    var lpOutputBuf = newString(lpOutputLen)
    var dwBytesReceived: Dword
    let dwReceiveDataLength = 0.Dword # We don't want any data to be read.
    let dwLocalAddressLength = Dword(sizeof(Sockaddr_in6) + 16)
    let dwRemoteAddressLength = Dword(sizeof(Sockaddr_in6) + 16)

    template failAccept(errcode) =
      # ``clientSock`` never became a usable connection: release it before
      # either retrying (which creates a fresh client socket) or failing.
      # ``errcode`` is always an already-captured value, so closing first
      # cannot clobber it.
      discard clientSock.closeSocket()
      if flags.isDisconnectionError(errcode):
        var newAcceptFut = acceptAddr(socket, flags)
        newAcceptFut.callback =
          proc () =
            if newAcceptFut.failed:
              retFuture.fail(newAcceptFut.readError)
            else:
              retFuture.complete(newAcceptFut.read)
      else:
        retFuture.fail(newException(OSError, osErrorMsg(errcode)))

    template completeAccept() {.dirty.} =
      var listenSock = socket
      let setoptRet = setsockopt(clientSock, SOL_SOCKET,
          SO_UPDATE_ACCEPT_CONTEXT, addr listenSock,
          sizeof(listenSock).SockLen)
      if setoptRet != 0:
        # Capture the error before ``failAccept`` closes the socket.
        let errcode = osLastError()
        failAccept(errcode)
      else:
        var localSockaddr, remoteSockaddr: ptr SockAddr
        var localLen, remoteLen: int32
        getAcceptExSockaddrs(addr lpOutputBuf[0], dwReceiveDataLength,
                             dwLocalAddressLength, dwRemoteAddressLength,
                             addr localSockaddr, addr localLen,
                             addr remoteSockaddr, addr remoteLen)
        try:
          let address = getAddrString(remoteSockAddr)
          register(clientSock.AsyncFD)
          retFuture.complete((address: address, client: clientSock.AsyncFD))
        except:
          # getAddrString may raise
          clientSock.close()
          retFuture.fail(getCurrentException())

    var ol = PCustomOverlapped()
    GC_ref(ol)
    ol.data = CompletionData(fd: socket, cb:
      proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
        # Skip if the immediate-completion path already finished the future.
        if not retFuture.finished:
          if errcode == OSErrorCode(-1):
            completeAccept()
          else:
            failAccept(errcode)
    )

    # http://msdn.microsoft.com/en-us/library/windows/desktop/ms737524%28v=vs.85%29.aspx
    let ret = acceptEx(socket.SocketHandle, clientSock, addr lpOutputBuf[0],
                       dwReceiveDataLength,
                       dwLocalAddressLength,
                       dwRemoteAddressLength,
                       addr dwBytesReceived, cast[POVERLAPPED](ol))

    if not ret:
      let err = osLastError()
      if err.int32 != ERROR_IO_PENDING:
        failAccept(err)
        GC_unref(ol)
    else:
      completeAccept()
      # We don't deallocate ``ol`` here because even though this completed
      # immediately poll will still be notified about its completion and it will
      # free ``ol``.

    return retFuture
  proc closeSocket*(socket: AsyncFD) =
    ## Closes ``socket`` and then removes it from the global dispatcher's
    ## set of registered handles.
    close(SocketHandle(socket))
    let dispatcher = getGlobalDispatcher()
    excl(dispatcher.handles, socket)
  proc unregister*(fd: AsyncFD) =
    ## Removes ``fd`` from the global dispatcher's set of registered handles.
    ## The descriptor itself is not closed.
    let dispatcher = getGlobalDispatcher()
    excl(dispatcher.handles, fd)
  proc contains*(disp: PDispatcher, fd: AsyncFD): bool =
    ## Returns ``true`` when ``fd`` is in ``disp``'s set of registered handles.
    result = contains(disp.handles, fd)
  {.push stackTrace:off.}
  proc waitableCallback(param: pointer,
                        timerOrWaitFired: WINBOOL): void {.stdcall.} =
    # Wait callback handed to ``registerWaitForSingleObject``. ``param`` is
    # the ``PostCallbackDataPtr`` supplied at registration; the notification
    # is forwarded to the stored completion port, with ``timerOrWaitFired``
    # in the byte-count slot and the stored overlapped pointer attached.
    # The result of the post is deliberately ignored.
    let info = cast[PostCallbackDataPtr](param)
    discard postQueuedCompletionStatus(info.ioPort, Dword(timerOrWaitFired),
                                       ULONG_PTR(info.handleFd),
                                       cast[pointer](info.ovl))
  {.pop.}
  proc registerWaitableEvent(fd: AsyncFD, cb: Callback; mask: Dword) =
    # Shared implementation of `addRead`/`addWrite`: creates a WSA event,
    # associates it with socket `fd` for the network events in `mask`, and
    # registers a one-shot wait that forwards the signal to the dispatcher's
    # completion port via `waitableCallback`. `cb` is then invoked from the
    # completion callback below; returning `true` tears everything down,
    # returning `false` re-arms the wait.
    # Raises OSError if any of the OS-level registrations fails.
    let p = getGlobalDispatcher()
    var flags = (WT_EXECUTEINWAITTHREAD or WT_EXECUTEONLYONCE).Dword
    var hEvent = wsaCreateEvent()
    if hEvent == 0:
      raiseOSError(osLastError())
    # `pcd` lives in shared memory because it is read by `waitableCallback`,
    # which receives it as a raw pointer.
    var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
    pcd.ioPort = p.ioPort
    pcd.handleFd = fd
    var ol = PCustomOverlapped()
    GC_ref(ol)
    ol.data = CompletionData(fd: fd, cb:
      proc(fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
        # We exclude our `fd` here because cb(fd) can register its own
        # handler for this `fd`.
        p.handles.excl(fd)
        # unregisterWait() is called before the callback, because the
        # corresponding winsock function can re-enable the event.
        # https://msdn.microsoft.com/en-us/library/windows/desktop/ms741576(v=vs.85).aspx
        if unregisterWait(pcd.waitFd) == 0:
          let err = osLastError()
          # ERROR_IO_PENDING is tolerated here; any other error is fatal
          # for this registration.
          if err.int32 != ERROR_IO_PENDING:
            deallocShared(cast[pointer](pcd))
            discard wsaCloseEvent(hEvent)
            raiseOSError(err)
        if cb(fd):
          # callback returned `true`, so we free all allocated resources
          deallocShared(cast[pointer](pcd))
          if not wsaCloseEvent(hEvent):
            raiseOSError(osLastError())
          # pcd.ovl will be unrefed in poll().
        else:
          # callback returned `false`, so we need to continue
          if p.handles.contains(fd):
            # A new callback was already registered with `fd`, so we free all
            # allocated resources. This happens because in callback `cb`
            # addRead/addWrite was called with the same `fd`.
            deallocShared(cast[pointer](pcd))
            if not wsaCloseEvent(hEvent):
              raiseOSError(osLastError())
          else:
            # we need to include `fd` again
            p.handles.incl(fd)
            # and register WaitForSingleObject again
            if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
                                    cast[WAITORTIMERCALLBACK](waitableCallback),
                                       cast[pointer](pcd), INFINITE, flags):
              # pcd.ovl will be unrefed in poll()
              let err = osLastError()
              deallocShared(cast[pointer](pcd))
              discard wsaCloseEvent(hEvent)
              raiseOSError(err)
            else:
              # we incref `pcd.ovl` and `protect` callback one more time,
              # because it will be unrefed and disposed in `poll()` after
              # callback finishes.
              GC_ref(pcd.ovl)
              pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
    )
    # We need to protect our callback environment value, so GC will not free it
    # accidentally.
    ol.data.cell = system.protect(rawEnv(ol.data.cb))

    # This is the main part of the `hacky way`: using WSAEventSelect, so
    # `hEvent` will be signaled when the events in `mask` are triggered.
    if wsaEventSelect(fd.SocketHandle, hEvent, mask) != 0:
      let err = osLastError()
      GC_unref(ol)
      deallocShared(cast[pointer](pcd))
      discard wsaCloseEvent(hEvent)
      raiseOSError(err)

    pcd.ovl = ol
    if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
                                    cast[WAITORTIMERCALLBACK](waitableCallback),
                                       cast[pointer](pcd), INFINITE, flags):
      let err = osLastError()
      GC_unref(ol)
      deallocShared(cast[pointer](pcd))
      discard wsaCloseEvent(hEvent)
      raiseOSError(err)
    # Only a fully armed registration is recorded in the dispatcher.
    p.handles.incl(fd)
  proc addRead*(fd: AsyncFD, cb: Callback) =
    ## Starts watching ``fd`` for read availability and calls ``cb`` when it
    ## becomes readable.
    ##
    ## This is not a ``pure`` mechanism for Windows Completion Ports (IOCP),
    ## so avoid it when you can. Use `addRead` only when you really need it
    ## (the main use case is adapting unix-like libraries to be asynchronous
    ## on Windows).
    ##
    ## When using this proc you should not use asyncdispatch.recv() or
    ## asyncdispatch.accept(), because they use IOCP; use
    ## nativesockets.recv() and nativesockets.accept() instead.
    ##
    ## ``cb`` must return ``true`` to stop watching for `read`
    ## notifications, or ``false`` to keep receiving them.
    let readMask = FD_READ or FD_ACCEPT or FD_OOB or FD_CLOSE
    registerWaitableEvent(fd, cb, readMask)
  proc addWrite*(fd: AsyncFD, cb: Callback) =
    ## Starts watching ``fd`` for write availability and calls ``cb`` when it
    ## becomes writable.
    ##
    ## This is not a ``pure`` mechanism for Windows Completion Ports (IOCP),
    ## so avoid it when you can. Use `addWrite` only when you really need it
    ## (the main use case is adapting unix-like libraries to be asynchronous
    ## on Windows).
    ##
    ## When using this proc you should not use asyncdispatch.send() or
    ## asyncdispatch.connect(), because they use IOCP; use
    ## nativesockets.send() and nativesockets.connect() instead.
    ##
    ## ``cb`` must return ``true`` to stop watching for `write`
    ## notifications, or ``false`` to keep receiving them.
    let writeMask = FD_WRITE or FD_CONNECT or FD_CLOSE
    registerWaitableEvent(fd, cb, writeMask)
  template registerWaitableHandle(p, hEvent, flags, pcd, timeout,
                                  handleCallback) =
    # Arms a wait on `hEvent` whose firing is forwarded to dispatcher `p`
    # through `waitableCallback`, and records the handle in `p.handles`.
    # `pcd` must already be allocated; its fields are filled in here.
    # On registration failure everything allocated so far is released,
    # `hEvent` is closed and OSError is raised.
    let waitableFd = AsyncFD(hEvent)
    pcd.ioPort = p.ioPort
    pcd.handleFd = waitableFd

    var overlapped = PCustomOverlapped()
    GC_ref(overlapped)
    overlapped.data.fd = waitableFd
    overlapped.data.cb = handleCallback
    # Protect the callback environment so the GC does not free it while
    # only the OS holds a reference.
    overlapped.data.cell = system.protect(rawEnv(overlapped.data.cb))
    pcd.ovl = overlapped

    let registered = registerWaitForSingleObject(
      addr(pcd.waitFd), hEvent,
      cast[WAITORTIMERCALLBACK](waitableCallback),
      cast[pointer](pcd), timeout.Dword, flags)
    if not registered:
      let regErr = osLastError()
      GC_unref(overlapped)
      deallocShared(cast[pointer](pcd))
      discard closeHandle(hEvent)
      raiseOSError(regErr)
    p.handles.incl(waitableFd)
  template closeWaitable(handle: untyped) =
    # Tears down a registration made by `registerWaitableHandle`.
    # Relies on `pcd`, `p` and `fd` being visible at the call site.
    # The wait handle is copied out first because `pcd` is freed before
    # it is used.
    let waitFd = pcd.waitFd
    deallocShared(cast[pointer](pcd))
    p.handles.excl(fd)
    if unregisterWait(waitFd) == 0:
      let err = osLastError()
      # ERROR_IO_PENDING is tolerated; any other failure closes `handle`
      # and raises.
      if err.int32 != ERROR_IO_PENDING:
        discard closeHandle(handle)
        raiseOSError(err)
    if closeHandle(handle) == 0:
      raiseOSError(osLastError())
  proc addTimer*(timeout: int, oneshot: bool, cb: Callback) =
    ## Registers callback ``cb`` to be called when timer expired.
    ##
    ## Parameters:
    ##
    ## * ``timeout`` - timeout value in milliseconds; must be positive.
    ## * ``oneshot``
    ##   * `true` - generate only one timeout event
    ##   * `false` - generate timeout events periodically
    ##
    ## Raises ``OSError`` if the underlying event object cannot be created or
    ## the wait cannot be registered.
    doAssert(timeout > 0)
    let p = getGlobalDispatcher()

    var hEvent = createEvent(nil, 1, 0, nil)
    # CreateEvent reports failure with a NULL handle; the
    # INVALID_HANDLE_VALUE comparison is kept for backward compatibility.
    if hEvent == 0 or hEvent == INVALID_HANDLE_VALUE:
      raiseOSError(osLastError())

    var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
    var flags = WT_EXECUTEINWAITTHREAD.Dword
    if oneshot: flags = flags or WT_EXECUTEONLYONCE

    proc timercb(fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
      let res = cb(fd)
      if res or oneshot:
        # Either the callback asked to stop or only one event was wanted.
        closeWaitable(hEvent)
      else:
        # if callback returned `false`, then it wants to be called again, so
        # we need to ref and protect `pcd.ovl` again, because it will be
        # unrefed and disposed in `poll()`.
        GC_ref(pcd.ovl)
        pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))

    registerWaitableHandle(p, hEvent, flags, pcd, timeout, timercb)
  proc addProcess*(pid: int, cb: Callback) =
    ## Registers callback ``cb`` to be called when process with process ID
    ## ``pid`` exited.
    ##
    ## Raises ``OSError`` if the process cannot be opened or the wait cannot
    ## be registered. The return value of ``cb`` is ignored.
    let p = getGlobalDispatcher()
    let procFlags = SYNCHRONIZE
    var hProcess = openProcess(procFlags, 0, pid.Dword)
    # OpenProcess reports failure with a NULL handle; the
    # INVALID_HANDLE_VALUE comparison is kept for backward compatibility.
    if hProcess == 0 or hProcess == INVALID_HANDLE_VALUE:
      raiseOSError(osLastError())

    var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
    var flags = WT_EXECUTEINWAITTHREAD.Dword

    proc proccb(fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
      # Release the wait and the process handle before notifying the user.
      closeWaitable(hProcess)
      discard cb(fd)

    registerWaitableHandle(p, hProcess, flags, pcd, INFINITE, proccb)
  proc newAsyncEvent*(): AsyncEvent =
    ## Creates a new thread-safe ``AsyncEvent`` object.
    ##
    ## New ``AsyncEvent`` object is not automatically registered with
    ## dispatcher like ``AsyncSocket``.
    ##
    ## Raises ``OSError`` if the underlying event object cannot be created.
    ## The returned object is allocated in shared memory and is released by
    ## ``close``.
    var sa = SECURITY_ATTRIBUTES(
      nLength: sizeof(SECURITY_ATTRIBUTES).cint,
      bInheritHandle: 1
    )
    var event = createEvent(addr(sa), 0'i32, 0'i32, nil)
    # CreateEvent reports failure with a NULL handle; the
    # INVALID_HANDLE_VALUE comparison is kept for backward compatibility.
    if event == 0 or event == INVALID_HANDLE_VALUE:
      raiseOSError(osLastError())
    result = cast[AsyncEvent](allocShared0(sizeof(AsyncEventImpl)))
    result.hEvent = event
  proc trigger*(ev: AsyncEvent) =
    ## Puts event ``ev`` into the signaled state. Raises ``OSError`` when the
    ## underlying ``setEvent`` call reports failure.
    let signaled = setEvent(ev.hEvent) != 0
    if not signaled:
      raiseOSError(osLastError())
  proc unregister*(ev: AsyncEvent) =
    ## Removes event ``ev`` from the dispatcher and cancels its wait.
    ## ``ev`` must currently be registered. Raises ``OSError`` if the wait
    ## cannot be cancelled for a reason other than ``ERROR_IO_PENDING``.
    doAssert(ev.hWaiter != 0, "Event is not registered in the queue!")
    let dispatcher = getGlobalDispatcher()
    excl(dispatcher.handles, AsyncFD(ev.hEvent))
    let cancelled = unregisterWait(ev.hWaiter) != 0
    if not cancelled:
      let lastErr = osLastError()
      if lastErr.int32 != ERROR_IO_PENDING:
        raiseOSError(lastErr)
    # A zero waiter marks the event as unregistered.
    ev.hWaiter = 0
  proc close*(ev: AsyncEvent) =
    ## Closes event ``ev`` and frees the memory backing it; ``ev`` must not
    ## be used afterwards. Raises ``OSError`` if closing the OS handle
    ## failed (the memory is released in that case too).
    let failed = closeHandle(ev.hEvent) == 0
    # Capture the error code right away: releasing the memory below may
    # itself touch the thread's last-error value.
    var err: OSErrorCode
    if failed:
      err = osLastError()
    deallocShared(cast[pointer](ev))
    if failed:
      raiseOSError(err)
  proc addEvent*(ev: AsyncEvent, cb: Callback) =
    ## Registers callback ``cb`` to be called when ``ev`` is signaled.
    ##
    ## ``ev`` must not already be registered. If ``cb`` returns ``true`` the
    ## event is unregistered; if it returns ``false`` the registration is
    ## kept and ``cb`` will be called again on the next signal.
    doAssert(ev.hWaiter == 0, "Event is already registered in the queue!")

    let p = getGlobalDispatcher()
    let hEvent = ev.hEvent

    var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
    var flags = WT_EXECUTEINWAITTHREAD.Dword

    proc eventcb(fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
      if ev.hWaiter != 0:
        if cb(fd):
          # we need this check to avoid an exception if `unregister(event)`
          # was called in the callback.
          deallocShared(cast[pointer](pcd))
          if ev.hWaiter != 0:
            unregister(ev)
        else:
          # if callback returned `false`, then it wants to be called again, so
          # we need to ref and protect `pcd.ovl` again, because it will be
          # unrefed and disposed in `poll()`.
          GC_ref(pcd.ovl)
          pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
      else:
        # if ev.hWaiter == 0, then event was unregistered before `poll()` call.
        deallocShared(cast[pointer](pcd))

    registerWaitableHandle(p, hEvent, flags, pcd, INFINITE, eventcb)
    # Remember the wait handle; a non-zero value marks `ev` as registered.
    ev.hWaiter = pcd.waitFd
  945. initAll()
  946. else:
  947. import selectors
  948. from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK,
  949. MSG_NOSIGNAL
  const
    InitCallbackListSize = 4         # initial capacity of the callback
                                     # sequences associated with a
                                     # file/socket descriptor.
    InitDelayedCallbackListSize = 64 # initial capacity of the delayed
                                     # callbacks queue.
  type
    AsyncFD* = distinct cint
      # Descriptor type used throughout the dispatcher API.
    Callback = proc (fd: AsyncFD): bool {.closure,gcsafe.}
      # Readiness callback: `true` means "done, remove me", `false` means
      # "call me again" (see `processBasicCallbacks`).

    AsyncData = object
      # Per-descriptor state stored in the selector.
      readList: seq[Callback]   # callbacks waiting for read readiness
      writeList: seq[Callback]  # callbacks waiting for write readiness

    AsyncEvent* = distinct SelectEvent

    PDispatcher* = ref object of PDispatcherBase
      selector: Selector[AsyncData]  # backing I/O selector
  {.deprecated: [TAsyncFD: AsyncFD, TCallback: Callback].}
  # Equality for the distinct types is borrowed from their base types.
  proc `==`*(x, y: AsyncFD): bool {.borrow.}
  proc `==`*(x, y: AsyncEvent): bool {.borrow.}
  template newAsyncData(): AsyncData =
    # Builds an `AsyncData` whose read and write callback lists are empty
    # but pre-sized to `InitCallbackListSize`.
    AsyncData(
      readList: newSeqOfCap[Callback](InitCallbackListSize),
      writeList: newSeqOfCap[Callback](InitCallbackListSize)
    )
  proc newDispatcher*(): PDispatcher =
    ## Creates a dispatcher with a fresh selector, an empty timer queue and
    ## an empty delayed-callback queue.
    result = PDispatcher(selector: newSelector[AsyncData]())
    newHeapQueue(result.timers)
    result.callbacks = initDeque[proc ()](InitDelayedCallbackListSize)
  var gDisp{.threadvar.}: PDispatcher ## Global dispatcher (one per thread);
                                      ## created lazily by
                                      ## ``getGlobalDispatcher``.
  proc setGlobalDispatcher*(disp: PDispatcher) =
    ## Installs ``disp`` as this thread's global dispatcher. If a dispatcher
    ## is already installed, its delayed-callback queue is asserted to be
    ## empty before it is replaced.
    if gDisp != nil:
      assert len(gDisp.callbacks) == 0
    gDisp = disp
    initCallSoonProc()
  proc getGlobalDispatcher*(): PDispatcher =
    ## Returns this thread's global dispatcher, creating and installing a
    ## new one on first use.
    if isNil(gDisp):
      setGlobalDispatcher(newDispatcher())
    return gDisp
  proc getIoHandler*(disp: PDispatcher): Selector[AsyncData] =
    ## Returns the selector that backs ``disp``.
    result = disp.selector
  proc register*(fd: AsyncFD) =
    ## Registers ``fd`` with the global dispatcher's selector, initially
    ## with no events of interest and empty callback lists.
    let dispatcher = getGlobalDispatcher()
    var callbacks = newAsyncData()
    registerHandle(dispatcher.selector, SocketHandle(fd), {}, callbacks)
  proc closeSocket*(sock: AsyncFD) =
    ## Unregisters ``sock`` from the global dispatcher's selector and then
    ## closes it.
    let dispatcher = getGlobalDispatcher()
    let handle = SocketHandle(sock)
    unregister(dispatcher.selector, handle)
    close(handle)
  proc unregister*(fd: AsyncFD) =
    ## Removes ``fd`` from the global dispatcher's selector without closing
    ## it.
    let dispatcher = getGlobalDispatcher()
    unregister(dispatcher.selector, SocketHandle(fd))
  proc unregister*(ev: AsyncEvent) =
    ## Removes event ``ev`` from the global dispatcher's selector.
    let dispatcher = getGlobalDispatcher()
    unregister(dispatcher.selector, SelectEvent(ev))
  proc contains*(disp: PDispatcher, fd: AsyncFd): bool =
    ## Returns ``true`` when ``fd`` is registered with ``disp``'s selector.
    result = contains(disp.selector, SocketHandle(fd))
  proc addRead*(fd: AsyncFD, cb: Callback) =
    ## Queues ``cb`` to be called when ``fd`` becomes readable. Raises
    ## ``ValueError`` if ``fd`` is not registered with the dispatcher.
    let dispatcher = getGlobalDispatcher()
    let handle = SocketHandle(fd)
    var interest: set[Event] = {Event.Read}
    withData(dispatcher.selector, handle, fdData) do:
      fdData.readList.add(cb)
      # Keep watching for writes while write callbacks are still pending.
      if fdData.writeList.len > 0:
        interest.incl(Event.Write)
    do:
      raise newException(ValueError, "File descriptor not registered.")
    updateHandle(dispatcher.selector, handle, interest)
  proc addWrite*(fd: AsyncFD, cb: Callback) =
    ## Queues ``cb`` to be called when ``fd`` becomes writable. Raises
    ## ``ValueError`` if ``fd`` is not registered with the dispatcher.
    let dispatcher = getGlobalDispatcher()
    let handle = SocketHandle(fd)
    var interest: set[Event] = {Event.Write}
    withData(dispatcher.selector, handle, fdData) do:
      fdData.writeList.add(cb)
      # Keep watching for reads while read callbacks are still pending.
      if fdData.readList.len > 0:
        interest.incl(Event.Read)
    do:
      raise newException(ValueError, "File descriptor not registered.")
    updateHandle(dispatcher.selector, handle, interest)
  proc hasPendingOperations*(): bool =
    ## Returns ``true`` when the global dispatcher still has registered
    ## descriptors, pending timers or queued callbacks.
    let dispatcher = getGlobalDispatcher()
    let idle = dispatcher.selector.isEmpty() and
               dispatcher.timers.len == 0 and
               dispatcher.callbacks.len == 0
    result = not idle
  template processBasicCallbacks(ident, rwlist: untyped) =
    # Process pending descriptor and AsyncEvent callbacks.
    #
    # Invoke every callback stored in `rwlist`, until one
    # returns `false` (which means callback wants to stay
    # alive). In such case all remaining callbacks will be added
    # to `rwlist` again, in the order they have been inserted.
    #
    # `rwlist` associated with file descriptor MUST BE emptied before
    # dispatching callback (See https://github.com/nim-lang/Nim/issues/5128),
    # or it can be possible to fall into endless cycle.
    #
    # Relies on `p`, `fd`, `rLength` and `wLength` being visible at the call
    # site; `rLength`/`wLength` receive the list lengths after processing,
    # or -1 if the descriptor is no longer registered.
    var curList: seq[Callback]

    # Detach the current list and leave a fresh empty one in its place.
    withData(p.selector, ident, adata) do:
      shallowCopy(curList, adata.rwlist)
      adata.rwlist = newSeqOfCap[Callback](InitCallbackListSize)

    let newLength = max(len(curList), InitCallbackListSize)
    var newList = newSeqOfCap[Callback](newLength)

    for cb in curList:
      if len(newList) > 0:
        # A callback has already returned with EAGAIN, don't call any others
        # until next `poll`.
        newList.add(cb)
      else:
        if not cb(fd.AsyncFD):
          # Callback wants to be called again.
          newList.add(cb)

    withData(p.selector, ident, adata) do:
      # descriptor still present in queue.
      # Survivors go first, followed by anything the callbacks added.
      adata.rwlist = newList & adata.rwlist
      rLength = len(adata.readList)
      wLength = len(adata.writeList)
    do:
      # descriptor was unregistered in callback via `unregister()`.
      rLength = -1
      wLength = -1
  template processCustomCallbacks(ident: untyped) =
    # Process pending custom event callbacks. Custom events are
    # {Event.Timer, Event.Signal, Event.Process, Event.Vnode}.
    # There can be only one callback registered with one descriptor,
    # so there is no need to iterate over list.
    #
    # Relies on `p` and `fd` being visible at the call site.
    var curList: seq[Callback]

    # Detach the current list and leave a fresh empty one in its place.
    withData(p.selector, ident, adata) do:
      shallowCopy(curList, adata.readList)
      adata.readList = newSeqOfCap[Callback](InitCallbackListSize)

    let newLength = len(curList)
    var newList = newSeqOfCap[Callback](newLength)

    # `curList` is empty when the descriptor is no longer registered (for
    # example, it was unregistered by a callback dispatched earlier in the
    # same poll batch); indexing it unconditionally would fail.
    if newLength > 0:
      var cb = curList[0]
      if not cb(fd.AsyncFD):
        # Callback wants to be called again.
        newList.add(cb)

    withData(p.selector, ident, adata) do:
      # descriptor still present in queue.
      adata.readList = newList & adata.readList
      if len(adata.readList) == 0:
        # if no callbacks registered with descriptor, unregister it.
        p.selector.unregister(fd)
    do:
      # descriptor was unregistered in callback via `unregister()`.
      discard
  proc runOnce(timeout = 500): bool =
    # Runs one iteration of the event loop: waits on the selector (for at
    # most `timeout`, adjusted via `adjustTimeout` using the value returned
    # by `processTimers`), dispatches callbacks for every ready descriptor,
    # then processes timers and the pending-callback queue.
    # Returns `true` if a ready descriptor was dispatched; `processTimers`
    # and `processPendingCallbacks` may also set the result.
    # Raises ValueError if nothing at all is registered.
    let p = getGlobalDispatcher()
    when ioselSupportedPlatform:
      let customSet = {Event.Timer, Event.Signal, Event.Process,
                       Event.Vnode}

    if p.selector.isEmpty() and p.timers.len == 0 and p.callbacks.len == 0:
      raise newException(ValueError,
        "No handles or timers registered in dispatcher.")

    result = false
    var keys: array[64, ReadyKey]
    let nextTimer = processTimers(p, result)
    var count = p.selector.selectInto(adjustTimeout(timeout, nextTimer), keys)
    for i in 0..<count:
      var custom = false
      let fd = keys[i].fd
      let events = keys[i].events
      # Both lengths are written by `processBasicCallbacks`; -1 means the
      # descriptor was unregistered while its callbacks ran.
      var rLength = 0 # len(data.readList) after callback
      var wLength = 0 # len(data.writeList) after callback

      # A bare error event is delivered to both read and write callbacks.
      if Event.Read in events or events == {Event.Error}:
        processBasicCallbacks(fd, readList)
        result = true

      if Event.Write in events or events == {Event.Error}:
        processBasicCallbacks(fd, writeList)
        result = true

      if Event.User in events:
        processBasicCallbacks(fd, readList)
        custom = true
        # No callback wants to stay registered: drop the event.
        if rLength == 0:
          p.selector.unregister(fd)
        result = true

      when ioselSupportedPlatform:
        if (customSet * events) != {}:
          custom = true
          processCustomCallbacks(fd)
          result = true

      # because state `data` can be modified in callback we need to update
      # descriptor events with currently registered callbacks.
      if not custom:
        var newEvents: set[Event] = {}
        if rLength != -1 and wLength != -1:
          if rLength > 0: incl(newEvents, Event.Read)
          if wLength > 0: incl(newEvents, Event.Write)
          p.selector.updateHandle(SocketHandle(fd), newEvents)

    # Timer processing.
    discard processTimers(p, result)
    # Callback queue processing
    processPendingCallbacks(p, result)
  proc recv*(socket: AsyncFD, size: int,
             flags = {SocketFlag.SafeDisconn}): Future[string] =
    ## Reads up to ``size`` bytes from ``socket``. The returned future
    ## completes with the data read, or with ``""`` when the peer has
    ## disconnected (including disconnection errors covered by ``flags``).
    ## Other OS errors fail the future with ``OSError``.
    var retFuture = newFuture[string]("recv")
    var readBuffer = newString(size)

    proc cb(sock: AsyncFD): bool =
      # Returns `false` to stay registered while the read would block.
      let received = recv(sock.SocketHandle, addr readBuffer[0], size.cint,
                          flags.toOSFlags())
      if received > 0:
        readBuffer.setLen(received)
        retFuture.complete(readBuffer)
        return true
      if received == 0:
        # Disconnected
        retFuture.complete("")
        return true

      let lastError = osLastError()
      if lastError.int32 in {EINTR, EWOULDBLOCK, EAGAIN}:
        return false # We still want this callback to be called.
      if flags.isDisconnectionError(lastError):
        retFuture.complete("")
      else:
        retFuture.fail(newException(OSError, osErrorMsg(lastError)))
      return true

    # TODO: The following causes a massive slowdown.
    #if not cb(socket):
    addRead(socket, cb)
    return retFuture
  proc recvInto*(socket: AsyncFD, buf: pointer, size: int,
                 flags = {SocketFlag.SafeDisconn}): Future[int] =
    ## Reads up to ``size`` bytes from ``socket`` into ``buf``. The returned
    ## future completes with the number of bytes read; ``0`` is reported for
    ## disconnection errors covered by ``flags``. Other OS errors fail the
    ## future with ``OSError``.
    var retFuture = newFuture[int]("recvInto")

    proc cb(sock: AsyncFD): bool =
      # Returns `false` to stay registered while the read would block.
      let received = recv(sock.SocketHandle, buf, size.cint,
                          flags.toOSFlags())
      if received >= 0:
        retFuture.complete(received)
        return true

      let lastError = osLastError()
      if lastError.int32 in {EINTR, EWOULDBLOCK, EAGAIN}:
        return false # We still want this callback to be called.
      if flags.isDisconnectionError(lastError):
        retFuture.complete(0)
      else:
        retFuture.fail(newException(OSError, osErrorMsg(lastError)))
      return true

    # TODO: The following causes a massive slowdown.
    #if not cb(socket):
    addRead(socket, cb)
    return retFuture
  proc send*(socket: AsyncFD, buf: pointer, size: int,
             flags = {SocketFlag.SafeDisconn}): Future[void] =
    ## Sends ``size`` bytes starting at ``buf`` over ``socket``. The returned
    ## future completes once everything has been written, or when a
    ## disconnection error covered by ``flags`` occurs. Other OS errors fail
    ## the future with ``OSError``.
    var retFuture = newFuture[void]("send")
    var sentSoFar = 0

    proc cb(sock: AsyncFD): bool =
      # Returns `false` to stay registered until all bytes are written.
      let remaining = size - sentSoFar
      var bytes = cast[cstring](buf)
      let sent = send(sock.SocketHandle, addr bytes[sentSoFar],
                      remaining.cint, MSG_NOSIGNAL)
      if sent >= 0:
        sentSoFar.inc(sent)
        if sent == remaining:
          retFuture.complete()
          return true
        return false # We still have data to send.

      let lastError = osLastError()
      if lastError.int32 in {EINTR, EWOULDBLOCK, EAGAIN}:
        return false # We still want this callback to be called.
      if flags.isDisconnectionError(lastError):
        retFuture.complete()
      else:
        retFuture.fail(newException(OSError, osErrorMsg(lastError)))
      return true

    # TODO: The following causes crashes.
    #if not cb(socket):
    addWrite(socket, cb)
    return retFuture
  proc sendTo*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr,
               saddrLen: SockLen,
               flags = {SocketFlag.SafeDisconn}): Future[void] =
    ## Sends ``data`` of size ``size`` in bytes to the specified destination
    ## (``saddr`` of size ``saddrLen`` in bytes), using socket ``socket``.
    ## The returned future will complete once all data has been sent.
    ##
    ## The destination address is copied, so ``saddr`` does not have to
    ## outlive this call. ``saddrLen`` must not exceed 128 bytes.
    var retFuture = newFuture[void]("sendTo")

    # Private copy of the destination address, kept alive by the callback.
    var addrCopy: array[128, char] # SOCKADDR_STORAGE size is 128 bytes
    var addrCopyLen = saddrLen
    zeroMem(addr(addrCopy[0]), 128)
    copyMem(addr(addrCopy[0]), saddr, saddrLen)

    proc cb(sock: AsyncFD): bool =
      # Returns `false` to stay registered while the send would block.
      let sent = sendto(sock.SocketHandle, data, size, MSG_NOSIGNAL,
                        cast[ptr SockAddr](addr(addrCopy[0])), addrCopyLen)
      if sent >= 0:
        retFuture.complete()
        return true

      let lastError = osLastError()
      if lastError.int32 in {EINTR, EWOULDBLOCK, EAGAIN}:
        return false # We still want this callback to be called.
      retFuture.fail(newException(OSError, osErrorMsg(lastError)))
      return true

    addWrite(socket, cb)
    return retFuture
  proc recvFromInto*(socket: AsyncFD, data: pointer, size: int,
                     saddr: ptr SockAddr, saddrLen: ptr SockLen,
                     flags = {SocketFlag.SafeDisconn}): Future[int] =
    ## Receives one datagram from ``socket`` into ``data``, which must be
    ## at least ``size`` bytes long. The sender's address is stored into
    ## ``saddr`` and ``saddrLen``. The returned future completes once one
    ## datagram has been received and yields the size of that packet; OS
    ## errors fail it with ``OSError``.
    var retFuture = newFuture[int]("recvFromInto")

    proc cb(sock: AsyncFD): bool =
      # Returns `false` to stay registered while the read would block.
      let received = recvfrom(sock.SocketHandle, data, size.cint,
                              flags.toOSFlags(), saddr, saddrLen)
      if received >= 0:
        retFuture.complete(received)
        return true

      let lastError = osLastError()
      if lastError.int32 in {EINTR, EWOULDBLOCK, EAGAIN}:
        return false
      retFuture.fail(newException(OSError, osErrorMsg(lastError)))
      return true

    addRead(socket, cb)
    return retFuture
  proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn}):
      Future[tuple[address: string, client: AsyncFD]] =
    ## Accepts a new connection on ``socket``. The returned future completes
    ## with the remote address and the client socket, which is registered
    ## with the dispatcher. Interrupted accepts and disconnection errors
    ## covered by ``flags`` are retried; other OS errors fail the future
    ## with ``OSError``.
    var retFuture = newFuture[tuple[address: string,
        client: AsyncFD]]("acceptAddr")

    proc cb(sock: AsyncFD): bool =
      # Returns `false` to stay registered and retry the accept.
      var peerAddr: Sockaddr_storage
      var peerLen = sizeof(peerAddr).Socklen
      var client = accept(sock.SocketHandle,
                          cast[ptr SockAddr](addr(peerAddr)), addr(peerLen))
      if client != osInvalidSocket:
        try:
          let address = getAddrString(cast[ptr SockAddr](addr peerAddr))
          register(client.AsyncFD)
          retFuture.complete((address, client.AsyncFD))
        except:
          # getAddrString may raise
          client.close()
          retFuture.fail(getCurrentException())
        return true

      let lastError = osLastError()
      assert lastError.int32 notin {EWOULDBLOCK, EAGAIN}
      if lastError.int32 == EINTR or flags.isDisconnectionError(lastError):
        return false
      retFuture.fail(newException(OSError, osErrorMsg(lastError)))
      return true

    addRead(socket, cb)
    return retFuture
  when ioselSupportedPlatform:

    proc addTimer*(timeout: int, oneshot: bool, cb: Callback) =
      ## Calls ``cb`` when the timer expires.
      ## ``timeout`` - time in milliseconds,
      ## ``oneshot`` - if ``true`` only one event will be dispatched,
      ## if ``false`` continuous events every ``timeout`` milliseconds.
      let dispatcher = getGlobalDispatcher()
      var timerData = newAsyncData()
      timerData.readList.add(cb)
      registerTimer(dispatcher.selector, timeout, oneshot, timerData)

    proc addSignal*(signal: int, cb: Callback) =
      ## Calls ``cb`` whenever signal ``signal`` is delivered.
      let dispatcher = getGlobalDispatcher()
      var signalData = newAsyncData()
      signalData.readList.add(cb)
      registerSignal(dispatcher.selector, signal, signalData)

    proc addProcess*(pid: int, cb: Callback) =
      ## Calls ``cb`` when the process with id ``pid`` exits.
      let dispatcher = getGlobalDispatcher()
      var processData = newAsyncData()
      processData.readList.add(cb)
      registerProcess(dispatcher.selector, pid, processData)
  proc newAsyncEvent*(): AsyncEvent =
    ## Creates a new ``AsyncEvent`` backed by a fresh ``SelectEvent``.
    let selectEvent = newSelectEvent()
    return AsyncEvent(selectEvent)
  proc trigger*(ev: AsyncEvent) =
    ## Puts ``ev`` into the signaled state.
    let selectEvent = SelectEvent(ev)
    trigger(selectEvent)
  proc close*(ev: AsyncEvent) =
    ## Closes ``ev``.
    let selectEvent = SelectEvent(ev)
    close(selectEvent)
  proc addEvent*(ev: AsyncEvent, cb: Callback) =
    ## Starts watching event ``ev`` and calls ``cb`` when it is put into
    ## the signaled state.
    let dispatcher = getGlobalDispatcher()
    var eventData = newAsyncData()
    eventData.readList.add(cb)
    registerEvent(dispatcher.selector, SelectEvent(ev), eventData)
  proc drain*(timeout = 500) =
    ## Waits for completion events and processes them. Raises ``ValueError``
    ## if there are no pending operations. Unlike ``poll``, this keeps
    ## processing for as long as events are immediately available.
    if not runOnce(timeout):
      return
    # Keep going without blocking until nothing is left or nothing fired.
    while hasPendingOperations():
      if not runOnce(0):
        break
  proc poll*(timeout = 500) =
    ## Waits for completion events and processes them. Raises ``ValueError``
    ## if there are no pending operations. The underlying OS
    ## `epoll`:idx: or `kqueue`:idx: primitive is run exactly once.
    # The boolean reported by runOnce is deliberately ignored here.
    discard runOnce(timeout)
  1341. # Common procedures between current and upcoming asyncdispatch
  1342. include includes/asynccommon
  proc sleepAsync*(ms: int | float): Future[void] =
    ## Suspends the execution of the current async procedure for the next
    ## ``ms`` milliseconds.
    ##
    ## The returned future is pushed onto the global dispatcher's timer
    ## queue together with its expiry time.
    result = newFuture[void]("sleepAsync")
    let dispatcher = getGlobalDispatcher()
    # epochTime() is in seconds, so the millisecond delay is scaled down.
    let finishAt = epochTime() + (ms / 1000)
    dispatcher.timers.push((finishAt, result))
  proc withTimeout*[T](fut: Future[T], timeout: int): Future[bool] =
    ## Returns a future which completes as soon as either ``fut`` completes
    ## or ``timeout`` milliseconds have elapsed, whichever happens first.
    ##
    ## The returned future holds ``true`` when ``fut`` finished first and
    ## ``false`` when the timeout elapsed first. If ``fut`` finished first
    ## with an error, the returned future fails with that same error.
    var outcome = newFuture[bool]("asyncdispatch.`withTimeout`")
    var timer = sleepAsync(timeout)

    fut.callback =
      proc () =
        # The timer already settled the outcome; nothing left to do.
        if outcome.finished:
          return
        if fut.failed:
          outcome.fail(fut.error)
        else:
          outcome.complete(true)

    timer.callback =
      proc () =
        # ``fut`` already settled the outcome; nothing left to do.
        if outcome.finished:
          return
        outcome.complete(false)

    return outcome
  proc accept*(socket: AsyncFD,
               flags = {SocketFlag.SafeDisconn}): Future[AsyncFD] =
    ## Accepts a new connection on ``socket``. The returned future completes
    ## with the client socket once the connection has been accepted, or
    ## fails with the error reported by the underlying ``acceptAddr`` call.
    var clientFut = newFuture[AsyncFD]("accept")
    var addrFut = acceptAddr(socket, flags)
    addrFut.callback =
      proc (done: Future[tuple[address: string, client: AsyncFD]]) =
        assert done.finished
        if not done.failed:
          # Only the client handle is forwarded; the address is dropped.
          clientFut.complete(done.read.client)
        else:
          clientFut.fail(done.error)
    return clientFut
  proc send*(socket: AsyncFD, data: string,
             flags = {SocketFlag.SafeDisconn}): Future[void] =
    ## Sends ``data`` to ``socket``. The returned future completes once all
    ## data has been sent, or fails with the error of the underlying
    ## buffer-based ``send``.
    ##
    ## An empty ``data`` completes the returned future immediately without
    ## touching the socket.
    var retFuture = newFuture[void]("send")
    if data.len == 0:
      # There is no first character to take the address of, and nothing to
      # transmit, so finish right away.
      retFuture.complete()
      return retFuture

    var copiedData = data
    # Protect the copy from the GC until the send operation has completed
    # or failed.
    GC_ref(copiedData)
    var sendFut: Future[void]
    try:
      sendFut = socket.send(addr copiedData[0], copiedData.len, flags)
    except:
      # The operation never started, so the completion callback that would
      # normally drop the reference will not run; release it here.
      GC_unref(copiedData)
      raise
    sendFut.callback =
      proc () =
        GC_unref(copiedData)
        if sendFut.failed:
          retFuture.fail(sendFut.error)
        else:
          retFuture.complete()
    return retFuture
  1402. # -- Await Macro
  1403. include asyncmacro
  proc readAll*(future: FutureStream[string]): Future[string] {.async.} =
    ## Returns a future that completes once every piece of string data has
    ## been retrieved from the given future stream; the pieces are
    ## concatenated in the order they were read.
    result = ""
    var chunk = await future.read()
    # Field 0 says whether a value was delivered; field 1 is the value.
    while chunk[0]:
      result.add(chunk[1])
      chunk = await future.read()
  proc recvLine*(socket: AsyncFD): Future[string] {.async, deprecated.} =
    ## Reads a line of data from ``socket``. Returned future will complete once
    ## a full line is read or an error occurs.
    ##
    ## If a full line is read ``\r\L`` is not
    ## added to ``line``, however if solely ``\r\L`` is read then ``line``
    ## will be set to it.
    ##
    ## If the socket is disconnected, ``line`` will be set to ``""``.
    ##
    ## If the socket is disconnected in the middle of a line (before ``\r\L``
    ## is read) then line will be set to ``""``.
    ## The partial line **will be lost**. This includes a disconnect that
    ## happens right after ``\r`` and before the following ``\L``.
    ##
    ## **Warning**: This assumes that lines are delimited by ``\r\L``.
    ##
    ## **Note**: This procedure is mostly used for testing. You likely want to
    ## use ``asyncnet.recvLine`` instead.
    ##
    ## **Deprecated since version 0.15.0**: Use ``asyncnet.recvLine()`` instead.

    template addNLIfEmpty(): typed =
      # An otherwise empty line is reported as the delimiter itself so that
      # callers can tell it apart from a disconnect (which yields "").
      if result.len == 0:
        result.add("\c\L")

    result = ""
    var c = ""
    while true:
      c = await recv(socket, 1)
      if c.len == 0:
        # Disconnected: drop whatever was collected so far.
        return ""
      if c == "\r":
        c = await recv(socket, 1)
        if c.len == 0:
          # Disconnected between ``\r`` and ``\L``: this is still "in the
          # middle of a line", so report "" instead of tripping the assert.
          return ""
        assert c == "\l"
        addNLIfEmpty()
        return
      elif c == "\L":
        addNLIfEmpty()
        return
      add(result, c)
  proc callSoon(cbproc: proc ()) =
    ## Schedules `cbproc` to be called as soon as possible, i.e. when control
    ## returns to the event loop.
    let dispatcher = getGlobalDispatcher()
    # Appended at the tail of the dispatcher's callback queue.
    dispatcher.callbacks.addLast(cbproc)
  proc runForever*() =
    ## Starts a global dispatcher poll loop that never terminates on its own.
    # Each iteration calls ``poll`` with its default timeout.
    while true: poll()
  proc waitFor*[T](fut: Future[T]): T =
    ## **Blocks** the current thread, polling the dispatcher, until ``fut``
    ## has finished; then yields the result of reading ``fut``.
    while not finished(fut):
      poll()
    # Left as a trailing expression so it also works when ``T`` is ``void``.
    read(fut)
  proc setEvent*(ev: AsyncEvent) {.deprecated.} =
    ## Sets the event ``ev`` to the signaled state.
    ##
    ## **Deprecated since v0.18.0:** Use ``trigger`` instead.
    trigger(ev)