asyncdispatch.nim 76 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066
  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2015 Dominik Picheta
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. ## This module implements asynchronous IO. This includes a dispatcher,
  10. ## a `Future` type implementation, and an `async` macro which allows
  11. ## asynchronous code to be written in a synchronous style with the `await`
  12. ## keyword.
  13. ##
  14. ## The dispatcher acts as a kind of event loop. You must call `poll` on it
  15. ## (or a function which does so for you such as `waitFor` or `runForever`)
  16. ## in order to poll for any outstanding events. The underlying implementation
  17. ## is based on epoll on Linux, IO Completion Ports on Windows and select on
  18. ## other operating systems.
  19. ##
  20. ## The `poll` function will not, on its own, return any events. Instead
  21. ## an appropriate `Future` object will be completed. A `Future` is a
  22. ## type which holds a value which is not yet available, but which *may* be
  23. ## available in the future. You can check whether a future is finished
  24. ## by using the `finished` function. When a future is finished it means that
  25. ## either the value that it holds is now available or it holds an error instead.
  26. ## The latter situation occurs when the operation to complete a future fails
  27. ## with an exception. You can distinguish between the two situations with the
  28. ## `failed` function.
  29. ##
  30. ## Future objects can also store a callback procedure which will be called
  31. ## automatically once the future completes.
  32. ##
  33. ## Futures therefore can be thought of as an implementation of the proactor
  34. ## pattern. In this
  35. ## pattern you make a request for an action, and once that action is fulfilled
  36. ## a future is completed with the result of that action. Requests can be
  37. ## made by calling the appropriate functions. For example: calling the `recv`
  38. ## function will create a request for some data to be read from a socket. The
  39. ## future which the `recv` function returns will then complete once the
  40. ## requested amount of data is read **or** an exception occurs.
  41. ##
  42. ## Code to read some data from a socket may look something like this:
  43. ## ```Nim
  44. ## var future = socket.recv(100)
  45. ## future.addCallback(
  46. ##   proc () =
  47. ##     echo(future.read)
  48. ## )
  49. ## ```
  50. ##
  51. ## Asynchronous functions returning a `Future` never block. They do not,
  52. ## however, return instantly: an asynchronous function first runs the
  53. ## code that precedes its asynchronous request, and in most cases this
  54. ## code sets up the request.
  55. ##
  56. ## In the above example, the `recv` function will return a brand new
  57. ## `Future` instance once the request for data to be read from the socket
  58. ## is made. This `Future` instance will complete once the requested amount
  59. ## of data is read, in this case it is 100 bytes. The second line sets a
  60. ## callback on this future which will be called once the future completes.
  61. ## All the callback does is write the data stored in the future to `stdout`.
  62. ## The `read` function is used for this and it checks whether the future
  63. ## completes with an error for you (if it did, it will simply raise the
  64. ## error), if there is no error, however, it returns the value of the future.
  65. ##
  66. ## Asynchronous procedures
  67. ## =======================
  68. ##
  69. ## Asynchronous procedures remove the pain of working with callbacks. They do
  70. ## this by allowing you to write asynchronous code the same way as you would
  71. ## write synchronous code.
  72. ##
  73. ## An asynchronous procedure is marked using the `{.async.}` pragma.
  74. ## When marking a procedure with the `{.async.}` pragma it must have a
  75. ## `Future[T]` return type or no return type at all. If you do not specify
  76. ## a return type then `Future[void]` is assumed.
  77. ##
  78. ## Inside asynchronous procedures `await` can be used to call any
  79. ## procedures which return a
  80. ## `Future`; this includes asynchronous procedures. When a procedure is
  81. ## "awaited", the asynchronous procedure it is awaited in will
  82. ## suspend its execution
  83. ## until the awaited procedure's Future completes. At which point the
  84. ## asynchronous procedure will resume its execution. During the period
  85. ## when an asynchronous procedure is suspended other asynchronous procedures
  86. ## will be run by the dispatcher.
  87. ##
  88. ## The `await` call may be used in many contexts. It can be used on the right
  89. ## hand side of a variable declaration: `var data = await socket.recv(100)`,
  90. ## in which case the variable will be set to the value of the future
  91. ## automatically. It can be used to await a `Future` object, and it can
  92. ## be used to await a procedure returning a `Future[void]`:
  93. ## `await socket.send("foobar")`.
  94. ##
  95. ## If an awaited future completes with an error, then `await` will re-raise
  96. ## this error. To avoid this, you can use the `yield` keyword instead of
  97. ## `await`. The following section shows different ways that you can handle
  98. ## exceptions in async procs.
  99. ##
  100. ## .. caution::
  101. ## Procedures marked {.async.} do not support mutable parameters such
  102. ## as `var int`. References such as `ref int` should be used instead.
  103. ##
  104. ## Handling Exceptions
  105. ## -------------------
  106. ##
  107. ## You can handle exceptions in the same way as in ordinary Nim code;
  108. ## by using the try statement:
  109. ##
  110. ## ```Nim
  111. ## try:
  112. ##   let data = await sock.recv(100)
  113. ##   echo("Received ", data)
  114. ## except:
  115. ##   # Handle exception
  116. ## ```
  117. ##
  118. ## An alternative approach to handling exceptions is to use `yield` on a future
  119. ## then check the future's `failed` property. For example:
  120. ##
  121. ## ```Nim
  122. ## var future = sock.recv(100)
  123. ## yield future
  124. ## if future.failed:
  125. ##   # Handle exception
  126. ## ```
  127. ##
  128. ##
  129. ## Discarding futures
  130. ## ==================
  131. ##
  132. ## Futures should **never** be discarded directly because they may contain
  133. ## errors. If you do not care for the result of a Future then you should use
  134. ## the `asyncCheck` procedure instead of the `discard` keyword. Note that this
  135. ## does not wait for completion, and you should use `waitFor` or `await` for that purpose.
  136. ##
  137. ## .. note:: `await` also checks if the future fails, so you can safely discard
  138. ## its result.
  139. ##
  140. ## Handling futures
  141. ## ================
  142. ##
  143. ## There are many different operations that apply to a future.
  144. ## The three primary high-level operations are `asyncCheck`,
  145. ## `waitFor`, and `await`.
  146. ##
  147. ## * `asyncCheck`: Raises an exception if the future fails. It neither waits
  148. ## for the future to finish nor returns the result of the future.
  149. ## * `waitFor`: Polls the event loop and blocks the current thread until the
  150. ## future finishes. This is often used to call an async procedure from a
  151. ## synchronous context and should never be used in an `async` proc.
  152. ## * `await`: Pauses execution in the current async procedure until the future
  153. ## finishes. While the current procedure is paused, other async procedures will
  154. ## continue running. Should be used instead of `waitFor` in an async
  155. ## procedure.
  156. ##
  157. ## Here is a handy quick reference chart showing their high-level differences:
  158. ## ==============     =====================   =======================
  159. ## Procedure          Context                 Blocking
  160. ## ==============     =====================   =======================
  161. ## `asyncCheck`       non-async and async     non-blocking
  162. ## `waitFor`          non-async               blocks current thread
  163. ## `await`            async                   suspends current proc
  164. ## ==============     =====================   =======================
  165. ##
  166. ## Examples
  167. ## ========
  168. ##
  169. ## For examples take a look at the documentation for the modules implementing
  170. ## asynchronous IO. A good place to start is the
  171. ## `asyncnet module <asyncnet.html>`_.
  172. ##
  173. ## Investigating pending futures
  174. ## =============================
  175. ##
  176. ## It's possible to get into a situation where an async proc, or more accurately
  177. ## a `Future[T]` gets stuck and
  178. ## never completes. This can happen for various reasons and can cause serious
  179. ## memory leaks. When this occurs it's hard to identify the procedure that is
  180. ## stuck.
  181. ##
  182. ## Thankfully there is a mechanism which tracks the count of each pending future.
  183. ## All you need to do to enable it is compile with `-d:futureLogging` and
  184. ## use the `getFuturesInProgress` procedure to get the list of pending futures
  185. ## together with the stack traces to the moment of their creation.
  186. ##
  187. ## You may also find it useful to use this
  188. ## `prometheus package <https://github.com/dom96/prometheus>`_ which will log
  189. ## the pending futures into prometheus, allowing you to analyse them via a nice
  190. ## graph.
  191. ##
  192. ##
  193. ##
  194. ## Limitations/Bugs
  195. ## ================
  196. ##
  197. ## * The effect system (`raises: []`) does not work with async procedures.
  198. ## * Mutable parameters are not supported by async procedures.
  199. ##
  200. ##
  201. ## Multiple async backend support
  202. ## ==============================
  203. ##
  204. ## Thanks to its powerful macro support, Nim allows ``async``/``await`` to be
  205. ## implemented in libraries with only minimal support from the language - as
  206. ## such, multiple ``async`` libraries exist, including ``asyncdispatch`` and
  207. ## ``chronos``, and more may come to be developed in the future.
  208. ##
  209. ## Libraries built on top of async/await may wish to support multiple async
  210. ## backends - the best way to do so is to create separate modules for each backend
  211. ## that may be imported side-by-side.
  212. ##
  213. ## An alternative way is to select backend using a global compile flag - this
  214. ## method makes it difficult to compose applications that use both backends as may
  215. ## happen with transitive dependencies, but may be appropriate in some cases -
  216. ## libraries choosing this path should call the flag `asyncBackend`, allowing
  217. ## applications to choose the backend with `-d:asyncBackend=<backend_name>`.
  218. ##
  219. ## Known `async` backends include:
  220. ##
  221. ## * `-d:asyncBackend=none`: disable `async` support completely
  222. ## * `-d:asyncBackend=asyncdispatch`: https://nim-lang.org/docs/asyncdispatch.html
  223. ## * `-d:asyncBackend=chronos`: https://github.com/status-im/nim-chronos/
  224. ##
  225. ## ``none`` can be used when a library supports both a synchronous and
  226. ## asynchronous API, to disable the latter.
  227. import std/[os, tables, strutils, times, heapqueue, options, asyncstreams]
  228. import std/[math, monotimes]
  229. import std/asyncfutures except callSoon
  230. import std/[nativesockets, net, deques]
  231. when defined(nimPreviewSlimSystem):
  232. import std/[assertions, syncio]
  233. export Port, SocketFlag
  234. export asyncfutures except callSoon
  235. export asyncstreams
  236. # TODO: Check if yielded future is nil and throw a more meaningful exception
type
  # Base state shared by the dispatcher implementations: the pending timers
  # and the queue of callbacks waiting to be run by the event loop.
  PDispatcherBase = ref object of RootRef
    # Pending timers as (deadline, future) pairs. `processTimers` completes
    # `fut` once the monotonic clock has reached `finishAt`.
    timers*: HeapQueue[tuple[finishAt: MonoTime, fut: Future[void]]]
    # Pending callbacks; `processPendingCallbacks` pops them from the front
    # and invokes them in that order.
    callbacks*: Deque[proc () {.gcsafe.}]
  241. proc processTimers(
  242. p: PDispatcherBase, didSomeWork: var bool
  243. ): Option[int] {.inline.} =
  244. # Pop the timers in the order in which they will expire (smaller `finishAt`).
  245. var count = p.timers.len
  246. let t = getMonoTime()
  247. while count > 0 and t >= p.timers[0].finishAt:
  248. p.timers.pop().fut.complete()
  249. dec count
  250. didSomeWork = true
  251. # Return the number of milliseconds in which the next timer will expire.
  252. if p.timers.len == 0: return
  253. let millisecs = (p.timers[0].finishAt - getMonoTime()).inMilliseconds
  254. return some(millisecs.int + 1)
  255. proc processPendingCallbacks(p: PDispatcherBase; didSomeWork: var bool) =
  256. while p.callbacks.len > 0:
  257. var cb = p.callbacks.popFirst()
  258. cb()
  259. didSomeWork = true
  260. proc adjustTimeout(
  261. p: PDispatcherBase, pollTimeout: int, nextTimer: Option[int]
  262. ): int {.inline.} =
  263. if p.callbacks.len != 0:
  264. return 0
  265. if nextTimer.isNone() or pollTimeout == -1:
  266. return pollTimeout
  267. result = max(nextTimer.get(), 0)
  268. result = min(pollTimeout, result)
# Forward declaration; the platform-specific body is supplied further down
# in the file. `timeout` is in milliseconds.
proc runOnce(timeout: int): bool {.gcsafe.}

# Forward declaration; `initCallSoonProc` installs this proc as the
# `asyncfutures` call-soon hook.
proc callSoon*(cbproc: proc () {.gcsafe.}) {.gcsafe.}
  ## Schedule `cbproc` to be called as soon as possible.
  ## The callback is called when control returns to the event loop.
  273. proc initCallSoonProc =
  274. if asyncfutures.getCallSoonProc().isNil:
  275. asyncfutures.setCallSoonProc(callSoon)
  276. template implementSetInheritable() {.dirty.} =
  277. when declared(setInheritable):
  278. proc setInheritable*(fd: AsyncFD, inheritable: bool): bool =
  279. ## Control whether a file handle can be inherited by child processes.
  280. ## Returns `true` on success.
  281. ##
  282. ## This procedure is not guaranteed to be available for all platforms.
  283. ## Test for availability with `declared() <system.html#declared,untyped>`_.
  284. fd.FileHandle.setInheritable(inheritable)
  285. when defined(windows) or defined(nimdoc):
  286. import std/[winlean, sets, hashes]
type
  # Completion key handed to the IO completion port; `register` passes the
  # `AsyncFD` itself, cast to this type.
  CompletionKey = ULONG_PTR

  # Data attached to every overlapped operation.
  CompletionData* = object
    fd*: AsyncFD # TODO: Rename this.
    # Invoked by `runOnce` when the operation is dequeued. `errcode` is
    # `OSErrorCode(-1)` on success, otherwise the OS error of the failure.
    cb*: owned(proc (fd: AsyncFD, bytesTransferred: DWORD,
                errcode: OSErrorCode) {.closure, gcsafe.})
    cell*: ForeignCell # we need this `cell` to protect our `cb` environment,
                       # when using RegisterWaitForSingleObject, because
                       # waiting is done in different thread.

  # Windows dispatcher: the shared base state plus the IO completion port
  # and the set of handles registered with it.
  PDispatcher* = ref object of PDispatcherBase
    ioPort: Handle # created by `newDispatcher` via `createIoCompletionPort`
    handles*: HashSet[AsyncFD] # Export handles so that an external library can register them.

  # OVERLAPPED structure extended with our completion data. Instances are
  # passed to the OS cast to `POVERLAPPED` and handed back to `runOnce` by
  # `getQueuedCompletionStatus`.
  CustomObj = object of OVERLAPPED
    data*: CompletionData

  CustomRef* = ref CustomObj

  # Descriptor type used by the async API; `register` converts it to `Handle`
  # and the socket procs convert it to `SocketHandle`.
  AsyncFD* = distinct int

  # NOTE(review): the next four types are not used in this part of the file;
  # the `cell` comment above suggests they belong to the
  # RegisterWaitForSingleObject-based waiting code -- confirm where they are
  # used.
  PostCallbackData = object
    ioPort: Handle
    handleFd: AsyncFD
    waitFd: Handle
    ovl: owned CustomRef
  PostCallbackDataPtr = ptr PostCallbackData

  AsyncEventImpl = object
    hEvent: Handle
    hWaiter: Handle
    pcd: PostCallbackDataPtr
  AsyncEvent* = ptr AsyncEventImpl

  # User callback type taking the descriptor and returning a bool.
  # NOTE(review): the meaning of the return value is defined by code outside
  # this section.
  Callback* = proc (fd: AsyncFD): bool {.closure, gcsafe.}
  315. proc hash(x: AsyncFD): Hash {.borrow.}
  316. proc `==`*(x: AsyncFD, y: AsyncFD): bool {.borrow.}
  317. proc newDispatcher*(): owned PDispatcher =
  318. ## Creates a new Dispatcher instance.
  319. new result
  320. result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
  321. result.handles = initHashSet[AsyncFD]()
  322. result.timers.clear()
  323. result.callbacks = initDeque[proc () {.closure, gcsafe.}](64)
  324. var gDisp{.threadvar.}: owned PDispatcher ## Global dispatcher
  325. proc setGlobalDispatcher*(disp: sink PDispatcher) =
  326. if not gDisp.isNil:
  327. assert gDisp.callbacks.len == 0
  328. gDisp = disp
  329. initCallSoonProc()
  330. proc getGlobalDispatcher*(): PDispatcher =
  331. if gDisp.isNil:
  332. setGlobalDispatcher(newDispatcher())
  333. result = gDisp
  334. proc getIoHandler*(disp: PDispatcher): Handle =
  335. ## Returns the underlying IO Completion Port handle (Windows) or selector
  336. ## (Unix) for the specified dispatcher.
  337. return disp.ioPort
  338. proc register*(fd: AsyncFD) =
  339. ## Registers `fd` with the dispatcher.
  340. let p = getGlobalDispatcher()
  341. if createIoCompletionPort(fd.Handle, p.ioPort,
  342. cast[CompletionKey](fd), 1) == 0:
  343. raiseOSError(osLastError())
  344. p.handles.incl(fd)
  345. proc verifyPresence(fd: AsyncFD) =
  346. ## Ensures that file descriptor has been registered with the dispatcher.
  347. ## Raises ValueError if `fd` has not been registered.
  348. let p = getGlobalDispatcher()
  349. if fd notin p.handles:
  350. raise newException(ValueError,
  351. "Operation performed on a socket which has not been registered with" &
  352. " the dispatcher yet.")
  353. proc hasPendingOperations*(): bool =
  354. ## Returns `true` if the global dispatcher has pending operations.
  355. let p = getGlobalDispatcher()
  356. p.handles.len != 0 or p.timers.len != 0 or p.callbacks.len != 0
proc runOnce(timeout: int): bool =
  ## Runs one iteration of the event loop on the global dispatcher: expires
  ## due timers, waits for at most one IO completion packet and invokes its
  ## callback, then processes timers again and drains the callback queue.
  ##
  ## `timeout` is in milliseconds; `-1` is mapped to `INFINITE`. The value
  ## is adjusted by `adjustTimeout` before it reaches the OS.
  ##
  ## Returns `false` when the wait timed out and neither the final timer
  ## pass nor the callback queue did any work, `true` otherwise.
  ##
  ## Raises `ValueError` when the dispatcher has no handles, timers or
  ## callbacks, and `OSError` when the wait fails for a reason other than a
  ## timeout without an associated operation.
  let p = getGlobalDispatcher()
  if p.handles.len == 0 and p.timers.len == 0 and p.callbacks.len == 0:
    raise newException(ValueError,
      "No handles or timers registered in dispatcher.")

  result = false
  let nextTimer = processTimers(p, result)
  let at = adjustTimeout(p, timeout, nextTimer)
  var llTimeout =
    if at == -1: winlean.INFINITE
    else: at.int32

  var lpNumberOfBytesTransferred: DWORD
  var lpCompletionKey: ULONG_PTR
  var customOverlapped: CustomRef
  # The OS writes the OVERLAPPED pointer of the finished operation straight
  # into `customOverlapped`; it stays nil when nothing was dequeued.
  let res = getQueuedCompletionStatus(p.ioPort,
      addr lpNumberOfBytesTransferred, addr lpCompletionKey,
      cast[ptr POVERLAPPED](addr customOverlapped), llTimeout).bool
  # Assume work was done; only the pure-timeout branch below resets this.
  # Note that this overwrites whatever the first `processTimers` call reported.
  result = true
  # For 'gcDestructors' the destructor of 'customOverlapped' will
  # be called at the end and we are the only owner here. This means
  # We do not have to 'GC_unref(customOverlapped)' because the destructor
  # does that for us.

  # http://stackoverflow.com/a/12277264/492186
  # TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html
  if res:
    # This is useful for ensuring the reliability of the overlapped struct.
    assert customOverlapped.data.fd == lpCompletionKey.AsyncFD

    # `OSErrorCode(-1)` is the sentinel callbacks check for to detect success.
    customOverlapped.data.cb(customOverlapped.data.fd,
        lpNumberOfBytesTransferred, OSErrorCode(-1))

    # If cell.data != nil, then system.protect(rawEnv(cb)) was called,
    # so we need to dispose our `cb` environment, because it is not needed
    # anymore.
    if customOverlapped.data.cell.data != nil:
      system.dispose(customOverlapped.data.cell)

    # Balances the `GC_ref` performed by `newCustom`.
    when not defined(gcDestructors):
      GC_unref(customOverlapped)
  else:
    # Read the error code first, before any other call can overwrite it.
    let errCode = osLastError()
    if customOverlapped != nil:
      # A packet for a failed operation was dequeued: report the error to
      # its callback and release it exactly as in the success branch.
      assert customOverlapped.data.fd == lpCompletionKey.AsyncFD
      customOverlapped.data.cb(customOverlapped.data.fd,
          lpNumberOfBytesTransferred, errCode)
      if customOverlapped.data.cell.data != nil:
        system.dispose(customOverlapped.data.cell)
      when not defined(gcDestructors):
        GC_unref(customOverlapped)
    else:
      # Nothing was dequeued: either the wait timed out or it failed.
      if errCode.int32 == WAIT_TIMEOUT:
        # Timed out
        result = false
      else: raiseOSError(errCode)

  # Timer processing.
  discard processTimers(p, result)
  # Callback queue processing
  processPendingCallbacks(p, result)
# Winsock extension function pointers. They are not initialised here;
# `initAll` looks them up via `initPointer` and stores them.
var acceptEx: WSAPROC_ACCEPTEX
var connectEx: WSAPROC_CONNECTEX
var getAcceptExSockAddrs: WSAPROC_GETACCEPTEXSOCKADDRS
  415. proc initPointer(s: SocketHandle, fun: var pointer, guid: var GUID): bool =
  416. # Ref: https://github.com/powdahound/twisted/blob/master/twisted/internet/iocpreactor/iocpsupport/winsock_pointers.c
  417. var bytesRet: DWORD
  418. fun = nil
  419. result = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, addr guid,
  420. sizeof(GUID).DWORD, addr fun, sizeof(pointer).DWORD,
  421. addr bytesRet, nil, nil) == 0
  422. proc initAll() =
  423. let dummySock = createNativeSocket()
  424. if dummySock == INVALID_SOCKET:
  425. raiseOSError(osLastError())
  426. var fun: pointer = nil
  427. if not initPointer(dummySock, fun, WSAID_CONNECTEX):
  428. raiseOSError(osLastError())
  429. connectEx = cast[WSAPROC_CONNECTEX](fun)
  430. if not initPointer(dummySock, fun, WSAID_ACCEPTEX):
  431. raiseOSError(osLastError())
  432. acceptEx = cast[WSAPROC_ACCEPTEX](fun)
  433. if not initPointer(dummySock, fun, WSAID_GETACCEPTEXSOCKADDRS):
  434. raiseOSError(osLastError())
  435. getAcceptExSockAddrs = cast[WSAPROC_GETACCEPTEXSOCKADDRS](fun)
  436. close(dummySock)
  437. proc newCustom*(): CustomRef =
  438. result = CustomRef() # 0
  439. GC_ref(result) # 1 prevent destructor from doing a premature free.
  440. # destructor of newCustom's caller --> 0. This means
  441. # Windows holds a ref for us with RC == 0 (single owner).
  442. # This is passed back to us in the IO completion port.
proc recv*(socket: AsyncFD, size: int,
           flags = {SocketFlag.SafeDisconn}): owned(Future[string]) =
  ## Reads **up to** `size` bytes from `socket`. Returned future will
  ## complete once all the data requested is read, a part of the data has been
  ## read, or the socket has disconnected in which case the future will
  ## complete with a value of `""`.
  ##
  ## Raises `ValueError` if `socket` has not been registered with the
  ## dispatcher. An OS error that `isDisconnectionError` accepts for the
  ## given `flags` completes the future with `""`; any other OS error fails
  ## the future with an `OSError`.
  ##
  ## .. warning:: The `Peek` socket flag is not supported on Windows.

  # Things to note:
  #   * When WSARecv completes immediately then `bytesReceived` is very
  #     unreliable.
  #   * Still need to implement message-oriented socket disconnection,
  #     '\0' in the message currently signifies a socket disconnect. Who
  #     knows what will happen when someone sends that to our socket.
  verifyPresence(socket)
  assert SocketFlag.Peek notin flags, "Peek not supported on Windows."

  var retFuture = newFuture[string]("recv")
  var dataBuf: TWSABuf
  # Zero-initialised receive buffer, allocated manually. It is released by
  # the completion callback, or right below if WSARecv fails outright.
  dataBuf.buf = cast[cstring](alloc0(size))
  dataBuf.len = size.ULONG

  var bytesReceived: DWORD
  var flagsio = flags.toOSFlags().DWORD
  var ol = newCustom()
  ol.data = CompletionData(fd: socket, cb:
    proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
      # The future may already have been completed by the
      # immediate-completion path below; in that case only the buffer
      # cleanup at the end runs.
      if not retFuture.finished:
        if errcode == OSErrorCode(-1):
          # Success: zero bytes with an untouched (still zeroed) buffer is
          # treated as a disconnect.
          if bytesCount == 0 and dataBuf.buf[0] == '\0':
            retFuture.complete("")
          else:
            var data = newString(bytesCount)
            assert bytesCount <= size
            copyMem(addr data[0], addr dataBuf.buf[0], bytesCount)
            retFuture.complete($data)
        else:
          if flags.isDisconnectionError(errcode):
            retFuture.complete("")
          else:
            retFuture.fail(newOSError(errcode))
      # Free the buffer exactly once; the nil check guards against a
      # second release.
      if dataBuf.buf != nil:
        dealloc dataBuf.buf
        dataBuf.buf = nil
  )

  let ret = WSARecv(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived,
                    addr flagsio, cast[POVERLAPPED](ol), nil)
  if ret == -1:
    let err = osLastError()
    # ERROR_IO_PENDING means the operation was queued; the callback will
    # finish the future later. Anything else is an immediate failure.
    if err.int32 != ERROR_IO_PENDING:
      if dataBuf.buf != nil:
        dealloc dataBuf.buf
        dataBuf.buf = nil
      # Drop the reference taken by `newCustom`.
      GC_unref(ol)
      if flags.isDisconnectionError(err):
        retFuture.complete("")
      else:
        retFuture.fail(newOSError(err))
  elif ret == 0:
    # Request completed immediately.
    if bytesReceived != 0:
      var data = newString(bytesReceived)
      assert bytesReceived <= size
      copyMem(addr data[0], addr dataBuf.buf[0], bytesReceived)
      retFuture.complete($data)
    else:
      # Zero bytes and the overlapped operation is done: complete with "".
      if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)):
        retFuture.complete("")
  return retFuture
proc recvInto*(socket: AsyncFD, buf: pointer, size: int,
               flags = {SocketFlag.SafeDisconn}): owned(Future[int]) =
  ## Reads **up to** `size` bytes from `socket` into `buf`, which must
  ## at least be of that size. Returned future will complete once all the
  ## data requested is read, a part of the data has been read, or the socket
  ## has disconnected in which case the future will complete with a value of
  ## `0`.
  ##
  ## The future's value is the number of bytes received. Raises `ValueError`
  ## if `socket` has not been registered with the dispatcher. An OS error
  ## that `isDisconnectionError` accepts for the given `flags` completes the
  ## future with `0`; any other OS error fails it with an `OSError`.
  ##
  ## .. warning:: The `Peek` socket flag is not supported on Windows.

  # Things to note:
  #   * When WSARecv completes immediately then `bytesReceived` is very
  #     unreliable.
  #   * Still need to implement message-oriented socket disconnection,
  #     '\0' in the message currently signifies a socket disconnect. Who
  #     knows what will happen when someone sends that to our socket.
  verifyPresence(socket)
  assert SocketFlag.Peek notin flags, "Peek not supported on Windows."

  var retFuture = newFuture[int]("recvInto")

  #buf[] = '\0'
  var dataBuf: TWSABuf
  # The caller's buffer is used directly: nothing is allocated here and
  # nothing is freed; the pointer is only reset to nil when done.
  dataBuf.buf = cast[cstring](buf)
  dataBuf.len = size.ULONG

  var bytesReceived: DWORD
  var flagsio = flags.toOSFlags().DWORD
  var ol = newCustom()
  ol.data = CompletionData(fd: socket, cb:
    proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
      # Skipped when the immediate-completion path below has already
      # finished the future.
      if not retFuture.finished:
        if errcode == OSErrorCode(-1):
          # `OSErrorCode(-1)` is the success sentinel passed by `runOnce`.
          retFuture.complete(bytesCount)
        else:
          if flags.isDisconnectionError(errcode):
            retFuture.complete(0)
          else:
            retFuture.fail(newOSError(errcode))
      if dataBuf.buf != nil:
        dataBuf.buf = nil
  )

  let ret = WSARecv(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived,
                    addr flagsio, cast[POVERLAPPED](ol), nil)
  if ret == -1:
    let err = osLastError()
    # ERROR_IO_PENDING means the operation was queued; the callback will
    # finish the future later. Anything else is an immediate failure.
    if err.int32 != ERROR_IO_PENDING:
      if dataBuf.buf != nil:
        dataBuf.buf = nil
      # Drop the reference taken by `newCustom`.
      GC_unref(ol)
      if flags.isDisconnectionError(err):
        retFuture.complete(0)
      else:
        retFuture.fail(newOSError(err))
  elif ret == 0:
    # Request completed immediately.
    if bytesReceived != 0:
      assert bytesReceived <= size
      retFuture.complete(bytesReceived)
    else:
      # Zero bytes and the overlapped operation is done: complete with 0.
      if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)):
        retFuture.complete(bytesReceived)
  return retFuture
  proc send*(socket: AsyncFD, buf: pointer, size: int,
             flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
    ## Sends `size` bytes from `buf` to `socket`. The returned future
    ## will complete once all data has been sent.
    ##
    ## .. warning:: Use it with caution. If `buf` refers to GC'ed object,
    ##   you must use GC_ref/GC_unref calls to avoid early freeing of the buffer.
    verifyPresence(socket)
    var retFuture = newFuture[void]("send")

    var wsaBuf = TWSABuf(buf: cast[cstring](buf), len: size.ULONG)
    var bytesSent, lowFlags: DWORD

    var ol = newCustom()
    ol.data = CompletionData(fd: socket, cb:
      proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
        if retFuture.finished:
          return
        # A disconnection error that `flags` asks us to ignore counts as success.
        if errcode == OSErrorCode(-1) or flags.isDisconnectionError(errcode):
          retFuture.complete()
        else:
          retFuture.fail(newOSError(errcode))
    )

    let ret = WSASend(socket.SocketHandle, addr wsaBuf, 1, addr bytesSent,
                      lowFlags, cast[POVERLAPPED](ol), nil)
    if ret != -1:
      retFuture.complete()
      # We don't deallocate `ol` here because even though this completed
      # immediately poll will still be notified about its completion and it will
      # free `ol`.
    else:
      let err = osLastError()
      if err.int32 != ERROR_IO_PENDING:
        GC_unref(ol)
        if flags.isDisconnectionError(err):
          retFuture.complete()
        else:
          retFuture.fail(newOSError(err))
    return retFuture
  proc sendTo*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr,
               saddrLen: SockLen,
               flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
    ## Sends `size` bytes of `data` through `socket` to the destination
    ## address `saddr` (of length `saddrLen`). The returned future completes
    ## once all data has been sent and fails with an `OSError` otherwise.
    verifyPresence(socket)
    var retFuture = newFuture[void]("sendTo")

    var wsaBuf = TWSABuf(buf: cast[cstring](data), len: size.ULONG)
    var bytesSent = 0.DWORD
    var lowFlags = 0.DWORD

    # Keep a private copy of the destination address on our stack.
    var staddr: array[128, char] # SOCKADDR_STORAGE size is 128 bytes
    var stalen: cint = cint(saddrLen)
    zeroMem(addr(staddr[0]), 128)
    copyMem(addr(staddr[0]), saddr, saddrLen)

    var ol = newCustom()
    ol.data = CompletionData(fd: socket, cb:
      proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
        if retFuture.finished:
          return
        if errcode == OSErrorCode(-1):
          retFuture.complete()
        else:
          retFuture.fail(newOSError(errcode))
    )

    let ret = WSASendTo(socket.SocketHandle, addr wsaBuf, 1, addr bytesSent,
                        lowFlags, cast[ptr SockAddr](addr(staddr[0])),
                        stalen, cast[POVERLAPPED](ol), nil)
    if ret != -1:
      retFuture.complete()
      # We don't deallocate `ol` here because even though this completed
      # immediately poll will still be notified about its completion and it will
      # free `ol`.
    else:
      let err = osLastError()
      if err.int32 != ERROR_IO_PENDING:
        GC_unref(ol)
        retFuture.fail(newOSError(err))
    return retFuture
  proc recvFromInto*(socket: AsyncFD, data: pointer, size: int,
                     saddr: ptr SockAddr, saddrLen: ptr SockLen,
                     flags = {SocketFlag.SafeDisconn}): owned(Future[int]) =
    ## Receives one datagram from `socket` into `data`, which must be at
    ## least `size` bytes long. The sender's address is written to `saddr`
    ## and its length to `saddrLen`. The returned future completes with the
    ## size of the received packet.
    verifyPresence(socket)
    var retFuture = newFuture[int]("recvFromInto")

    var wsaBuf = TWSABuf(buf: cast[cstring](data), len: size.ULONG)
    var received = 0.DWORD
    var lowFlags = 0.DWORD

    var ol = newCustom()
    ol.data = CompletionData(fd: socket, cb:
      proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
        if retFuture.finished:
          return
        if errcode == OSErrorCode(-1):
          assert bytesCount <= size
          retFuture.complete(bytesCount)
        else:
          # datagram sockets don't have disconnection,
          # so we can just raise an exception
          retFuture.fail(newOSError(errcode))
    )

    let res = WSARecvFrom(socket.SocketHandle, addr wsaBuf, 1,
                          addr received, addr lowFlags,
                          saddr, cast[ptr cint](saddrLen),
                          cast[POVERLAPPED](ol), nil)
    if res == -1:
      let err = osLastError()
      if err.int32 != ERROR_IO_PENDING:
        GC_unref(ol)
        retFuture.fail(newOSError(err))
    else:
      # Request completed immediately.
      if received != 0:
        assert received <= size
        retFuture.complete(received)
      elif hasOverlappedIoCompleted(cast[POVERLAPPED](ol)):
        retFuture.complete(received)
    return retFuture
  proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn},
                   inheritable = defined(nimInheritHandles)):
      owned(Future[tuple[address: string, client: AsyncFD]]) {.gcsafe.} =
    ## Accepts a new connection. Returns a future containing the client socket
    ## corresponding to that connection and the remote address of the client.
    ## The future will complete when the connection is successfully accepted.
    ##
    ## The resulting client socket is automatically registered to the
    ## dispatcher.
    ##
    ## If `inheritable` is false (the default), the resulting client socket will
    ## not be inheritable by child processes.
    ##
    ## The `accept` call may result in an error if the connecting socket
    ## disconnects during the duration of the `accept`. If the `SafeDisconn`
    ## flag is specified then this error will not be raised and instead
    ## accept will be called again (with the same `flags` and `inheritable`).
    verifyPresence(socket)
    var retFuture = newFuture[tuple[address: string, client: AsyncFD]]("acceptAddr")

    # AcceptEx needs the client socket to be created up front.
    var clientSock = createNativeSocket(inheritable = inheritable)
    if clientSock == osInvalidSocket: raiseOSError(osLastError())

    const lpOutputLen = 1024
    var lpOutputBuf = newString(lpOutputLen)
    var dwBytesReceived: DWORD
    let dwReceiveDataLength = 0.DWORD # We don't want any data to be read.
    let dwLocalAddressLength = DWORD(sizeof(Sockaddr_in6) + 16)
    let dwRemoteAddressLength = DWORD(sizeof(Sockaddr_in6) + 16)

    template failAccept(errcode) =
      # Either retries the accept (ignorable disconnection error) or fails
      # `retFuture`. The caller is responsible for closing `clientSock` first.
      if flags.isDisconnectionError(errcode):
        # Forward `inheritable` so that the retried accept honours the
        # caller's choice instead of silently falling back to the default.
        var newAcceptFut = acceptAddr(socket, flags, inheritable)
        newAcceptFut.callback =
          proc () =
            if newAcceptFut.failed:
              retFuture.fail(newAcceptFut.readError)
            else:
              retFuture.complete(newAcceptFut.read)
      else:
        retFuture.fail(newOSError(errcode))

    template completeAccept() {.dirty.} =
      var listenSock = socket
      let setoptRet = setsockopt(clientSock, SOL_SOCKET,
          SO_UPDATE_ACCEPT_CONTEXT, addr listenSock,
          sizeof(listenSock).SockLen)
      if setoptRet != 0:
        let errcode = osLastError()
        discard clientSock.closesocket()
        failAccept(errcode)
      else:
        var localSockaddr, remoteSockaddr: ptr SockAddr
        var localLen, remoteLen: int32
        getAcceptExSockAddrs(addr lpOutputBuf[0], dwReceiveDataLength,
                             dwLocalAddressLength, dwRemoteAddressLength,
                             addr localSockaddr, addr localLen,
                             addr remoteSockaddr, addr remoteLen)
        try:
          let address = getAddrString(remoteSockaddr)
          register(clientSock.AsyncFD)
          retFuture.complete((address: address, client: clientSock.AsyncFD))
        except:
          # getAddrString may raise
          clientSock.close()
          retFuture.fail(getCurrentException())

    var ol = newCustom()
    ol.data = CompletionData(fd: socket, cb:
      proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) {.gcsafe.} =
        if not retFuture.finished:
          if errcode == OSErrorCode(-1):
            completeAccept()
          else:
            # The accept failed, so `clientSock` will never be handed to the
            # caller: close it instead of leaking the handle.
            discard clientSock.closesocket()
            failAccept(errcode)
    )

    # http://msdn.microsoft.com/en-us/library/windows/desktop/ms737524%28v=vs.85%29.aspx
    let ret = acceptEx(socket.SocketHandle, clientSock, addr lpOutputBuf[0],
                       dwReceiveDataLength,
                       dwLocalAddressLength,
                       dwRemoteAddressLength,
                       addr dwBytesReceived, cast[POVERLAPPED](ol))

    if not ret:
      let err = osLastError()
      if err.int32 != ERROR_IO_PENDING:
        # Immediate failure: release the unused client socket (the error code
        # was captured above, so closing cannot clobber it).
        discard clientSock.closesocket()
        failAccept(err)
        GC_unref(ol)
    else:
      completeAccept()
      # We don't deallocate `ol` here because even though this completed
      # immediately poll will still be notified about its completion and it will
      # free `ol`.

    return retFuture
  782. implementSetInheritable()
  proc closeSocket*(socket: AsyncFD) =
    ## Closes `socket` and removes it from the global dispatcher's set of
    ## registered handles.
    close(socket.SocketHandle)
    let dispatcher = getGlobalDispatcher()
    dispatcher.handles.excl(socket)
  proc unregister*(fd: AsyncFD) =
    ## Removes `fd` from the global dispatcher's set of registered handles.
    let dispatcher = getGlobalDispatcher()
    dispatcher.handles.excl(fd)
  proc contains*(disp: PDispatcher, fd: AsyncFD): bool =
    ## Returns `true` if `fd` is among the handles registered with `disp`.
    result = disp.handles.contains(fd)
  {.push stackTrace: off.}
  proc waitableCallback(param: pointer,
                        timerOrWaitFired: WINBOOL) {.stdcall.} =
    # Wait callback handed to `registerWaitForSingleObject`. `param` is the
    # `PostCallbackDataPtr` supplied at registration; the event is forwarded
    # to the completion port stored in it, carrying the handle as completion
    # key and the custom overlapped structure as payload.
    let pcd = cast[PostCallbackDataPtr](param)
    discard postQueuedCompletionStatus(pcd.ioPort, timerOrWaitFired.DWORD,
                                       ULONG_PTR(pcd.handleFd),
                                       cast[pointer](pcd.ovl))
  {.pop.}
  proc registerWaitableEvent(fd: AsyncFD, cb: Callback; mask: DWORD) =
    # Watches socket `fd` for the network events in `mask` through a WSA event
    # object plus a thread-pool wait, and invokes `cb` from the dispatcher
    # whenever the event is signalled. Shared by `addRead` and `addWrite`.
    let p = getGlobalDispatcher()
    var flags = (WT_EXECUTEINWAITTHREAD or WT_EXECUTEONLYONCE).DWORD
    var hEvent = wsaCreateEvent()
    if hEvent == 0:
      raiseOSError(osLastError())

    var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
    pcd.ioPort = p.ioPort
    pcd.handleFd = fd

    var ol = newCustom()
    ol.data = CompletionData(fd: fd, cb:
      proc(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) {.gcsafe.} =
        # `fd` is excluded first because cb(fd) may register its own handler
        # for this `fd`.
        p.handles.excl(fd)
        # unregisterWait() is called before the callback, because the
        # appropriate winsock function can re-enable the event.
        # https://msdn.microsoft.com/en-us/library/windows/desktop/ms741576(v=vs.85).aspx
        if unregisterWait(pcd.waitFd) == 0:
          let err = osLastError()
          if err.int32 != ERROR_IO_PENDING:
            deallocShared(cast[pointer](pcd))
            discard wsaCloseEvent(hEvent)
            raiseOSError(err)

        if cb(fd):
          # callback returned `true`, so we free all allocated resources
          deallocShared(cast[pointer](pcd))
          if not wsaCloseEvent(hEvent):
            raiseOSError(osLastError())
          # pcd.ovl will be unrefed in poll().
        elif fd in p.handles:
          # callback returned `false`, but a new callback was already
          # registered with `fd` (addRead/addWrite was called with the same
          # `fd` inside `cb`), so we free all allocated resources.
          deallocShared(cast[pointer](pcd))
          if not wsaCloseEvent(hEvent):
            raiseOSError(osLastError())
        else:
          # callback returned `false` and wants to continue: include `fd`
          # again
          p.handles.incl(fd)
          # and register WaitForSingleObject again
          if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
                                  cast[WAITORTIMERCALLBACK](waitableCallback),
                                  cast[pointer](pcd), INFINITE, flags):
            # pcd.ovl will be unrefed in poll()
            let err = osLastError()
            deallocShared(cast[pointer](pcd))
            discard wsaCloseEvent(hEvent)
            raiseOSError(err)
          else:
            # we incref `pcd.ovl` and `protect` callback one more time,
            # because it will be unrefed and disposed in `poll()` after
            # callback finishes.
            GC_ref(pcd.ovl)
            pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
    )
    # We need to protect our callback environment value, so GC will not free it
    # accidentally.
    ol.data.cell = system.protect(rawEnv(ol.data.cb))

    # The main part of this `hacky way` is WSAEventSelect: `hEvent` gets
    # signalled when one of the `mask` events is triggered.
    if wsaEventSelect(fd.SocketHandle, hEvent, mask) != 0:
      let err = osLastError()
      GC_unref(ol)
      deallocShared(cast[pointer](pcd))
      discard wsaCloseEvent(hEvent)
      raiseOSError(err)

    pcd.ovl = ol
    if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
                                  cast[WAITORTIMERCALLBACK](waitableCallback),
                                  cast[pointer](pcd), INFINITE, flags):
      let err = osLastError()
      GC_unref(ol)
      deallocShared(cast[pointer](pcd))
      discard wsaCloseEvent(hEvent)
      raiseOSError(err)
    p.handles.incl(fd)
  proc addRead*(fd: AsyncFD, cb: Callback) =
    ## Starts watching the file descriptor `fd` for read availability and
    ## calls the callback `cb` when it becomes readable.
    ##
    ## This is not a `pure` mechanism for Windows Completion Ports (IOCP),
    ## so avoid it if you can. Use `addRead` only if you really need it (the
    ## main use case is adapting unix-like libraries to be asynchronous on
    ## Windows).
    ##
    ## If you use this function, do not use asyncdispatch.recv() or
    ## asyncdispatch.accept(), because they are using IOCP; use
    ## nativesockets.recv() and nativesockets.accept() instead.
    ##
    ## Make sure `cb` returns `true` if you want to stop watching for `read`
    ## notifications, and `false` if you want to keep receiving them.
    const readMask = FD_READ or FD_ACCEPT or FD_OOB or FD_CLOSE
    registerWaitableEvent(fd, cb, readMask)
  proc addWrite*(fd: AsyncFD, cb: Callback) =
    ## Starts watching the file descriptor `fd` for write availability and
    ## calls the callback `cb` when it becomes writable.
    ##
    ## This is not a `pure` mechanism for Windows Completion Ports (IOCP),
    ## so avoid it if you can. Use `addWrite` only if you really need it (the
    ## main use case is adapting unix-like libraries to be asynchronous on
    ## Windows).
    ##
    ## If you use this function, do not use asyncdispatch.send() or
    ## asyncdispatch.connect(), because they are using IOCP; use
    ## nativesockets.send() and nativesockets.connect() instead.
    ##
    ## Make sure `cb` returns `true` if you want to stop watching for `write`
    ## notifications, and `false` if you want to keep receiving them.
    const writeMask = FD_WRITE or FD_CONNECT or FD_CLOSE
    registerWaitableEvent(fd, cb, writeMask)
  template registerWaitableHandle(p, hEvent, flags, pcd, timeout,
                                  handleCallback) =
    # Fills in `pcd`, attaches `handleCallback` to a fresh custom overlapped
    # structure and registers a thread-pool wait on `hEvent`. On failure all
    # resources (including `hEvent`) are released and an OSError is raised;
    # on success the handle is added to `p.handles`.
    let waitableFd = AsyncFD(hEvent)
    pcd.ioPort = p.ioPort
    pcd.handleFd = waitableFd

    var ol = newCustom()
    ol.data.fd = waitableFd
    ol.data.cb = handleCallback
    # We need to protect our callback environment value, so GC will not free it
    # accidentally.
    ol.data.cell = system.protect(rawEnv(ol.data.cb))
    pcd.ovl = ol

    let registered = registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
                                  cast[WAITORTIMERCALLBACK](waitableCallback),
                                  cast[pointer](pcd), timeout.DWORD, flags)
    if not registered:
      let err = osLastError()
      GC_unref(ol)
      deallocShared(cast[pointer](pcd))
      discard closeHandle(hEvent)
      raiseOSError(err)
    p.handles.incl(waitableFd)
  template closeWaitable(handle: untyped) =
    # Tears down a waitable registration. Relies on `pcd`, `p` and `fd` being
    # visible at the expansion site. Frees `pcd`, removes `fd` from the
    # dispatcher, cancels the wait and finally closes `handle`.
    let pendingWait = pcd.waitFd # must be read before `pcd` is freed
    deallocShared(cast[pointer](pcd))
    p.handles.excl(fd)
    if unregisterWait(pendingWait) == 0:
      let err = osLastError()
      # ERROR_IO_PENDING is tolerated here; anything else is fatal.
      if err.int32 != ERROR_IO_PENDING:
        discard closeHandle(handle)
        raiseOSError(err)
    if closeHandle(handle) == 0:
      raiseOSError(osLastError())
  proc addTimer*(timeout: int, oneshot: bool, cb: Callback) =
    ## Registers callback `cb` to be called when timer expired.
    ##
    ## Parameters:
    ##
    ## * `timeout` - timeout value in milliseconds.
    ## * `oneshot`
    ##   * `true` - generate only one timeout event
    ##   * `false` - generate timeout events periodically
    ##
    ## Raises `OSError` if the underlying event object cannot be created or
    ## the wait cannot be registered.

    doAssert(timeout > 0)
    let p = getGlobalDispatcher()

    var hEvent = createEvent(nil, 1, 0, nil)
    # CreateEvent reports failure by returning NULL, not INVALID_HANDLE_VALUE;
    # the latter is still rejected for safety.
    if hEvent == Handle(0) or hEvent == INVALID_HANDLE_VALUE:
      raiseOSError(osLastError())

    var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
    var flags = WT_EXECUTEINWAITTHREAD.DWORD
    if oneshot: flags = flags or WT_EXECUTEONLYONCE

    proc timercb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
      let res = cb(fd)
      if res or oneshot:
        # The timer is finished: release the wait, `pcd` and the event.
        closeWaitable(hEvent)
      else:
        # if callback returned `false`, then it wants to be called again, so
        # we need to ref and protect `pcd.ovl` again, because it will be
        # unrefed and disposed in `poll()`.
        GC_ref(pcd.ovl)
        pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))

    registerWaitableHandle(p, hEvent, flags, pcd, timeout, timercb)
  proc addProcess*(pid: int, cb: Callback) =
    ## Registers callback `cb` to be called when the process with process ID
    ## `pid` has exited. Raises `OSError` if the process cannot be opened.
    const NULL = Handle(0)
    let p = getGlobalDispatcher()

    # Only the right to wait on the process handle is requested.
    let accessRights = SYNCHRONIZE
    var hProcess = openProcess(accessRights, 0, pid.DWORD)
    if hProcess == NULL:
      raiseOSError(osLastError())

    var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
    var flags = WT_EXECUTEINWAITTHREAD.DWORD or WT_EXECUTEONLYONCE.DWORD

    proc proccb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
      # Release the registration first; the result of `cb` is ignored.
      closeWaitable(hProcess)
      discard cb(fd)

    registerWaitableHandle(p, hProcess, flags, pcd, INFINITE, proccb)
  proc newAsyncEvent*(): AsyncEvent =
    ## Creates a new thread-safe `AsyncEvent` object.
    ##
    ## New `AsyncEvent` object is not automatically registered with
    ## dispatcher like `AsyncSocket`.
    ##
    ## Raises `OSError` if the underlying event object cannot be created.
    var sa = SECURITY_ATTRIBUTES(
      nLength: sizeof(SECURITY_ATTRIBUTES).cint,
      bInheritHandle: 1
    )
    var event = createEvent(addr(sa), 0'i32, 0'i32, nil)
    # CreateEvent reports failure by returning NULL, not INVALID_HANDLE_VALUE;
    # the latter is still rejected for safety.
    if event == Handle(0) or event == INVALID_HANDLE_VALUE:
      raiseOSError(osLastError())
    result = cast[AsyncEvent](allocShared0(sizeof(AsyncEventImpl)))
    result.hEvent = event
  proc trigger*(ev: AsyncEvent) =
    ## Puts event `ev` into the signaled state. Raises `OSError` on failure.
    let signalled = setEvent(ev.hEvent) != 0
    if not signalled:
      raiseOSError(osLastError())
  proc unregister*(ev: AsyncEvent) =
    ## Unregisters event `ev` from the global dispatcher. The event must
    ## currently be registered.
    doAssert(ev.hWaiter != 0, "Event is not registered in the queue!")
    let dispatcher = getGlobalDispatcher()
    dispatcher.handles.excl(AsyncFD(ev.hEvent))
    if unregisterWait(ev.hWaiter) == 0:
      let err = osLastError()
      # ERROR_IO_PENDING is tolerated here; anything else is raised.
      if err.int32 != ERROR_IO_PENDING:
        raiseOSError(err)
    # Mark the event as no longer registered.
    ev.hWaiter = 0
  proc close*(ev: AsyncEvent) =
    ## Closes event `ev` and frees its memory. Raises `OSError` if the
    ## underlying handle could not be closed.
    let closed = closeHandle(ev.hEvent) != 0
    # The memory is released even when closing the handle failed.
    deallocShared(cast[pointer](ev))
    if not closed:
      raiseOSError(osLastError())
  proc addEvent*(ev: AsyncEvent, cb: Callback) =
    ## Registers callback `cb` to be called whenever `ev` gets signaled.
    ## `ev` must not already be registered.
    doAssert(ev.hWaiter == 0, "Event is already registered in the queue!")

    let p = getGlobalDispatcher()
    let hEvent = ev.hEvent

    var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
    var flags = WT_EXECUTEINWAITTHREAD.DWORD

    proc eventcb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
      if ev.hWaiter == 0:
        # The event was unregistered before the `poll()` call.
        deallocShared(cast[pointer](pcd))
      elif cb(fd):
        deallocShared(cast[pointer](pcd))
        # This check avoids an exception if `unregister(event)` was already
        # called inside the callback.
        if ev.hWaiter != 0:
          unregister(ev)
      else:
        # if callback returned `false`, then it wants to be called again, so
        # we need to ref and protect `pcd.ovl` again, because it will be
        # unrefed and disposed in `poll()`.
        GC_ref(pcd.ovl)
        pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))

    registerWaitableHandle(p, hEvent, flags, pcd, INFINITE, eventcb)
    # A non-zero `hWaiter` marks the event as registered.
    ev.hWaiter = pcd.waitFd
  1048. initAll()
  1049. else:
  1050. import std/selectors
  1051. from std/posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK,
  1052. MSG_NOSIGNAL
  1053. when declared(posix.accept4):
  1054. from std/posix import accept4, SOCK_CLOEXEC
  1055. when defined(genode):
  1056. import genode/env # get the implicit Genode env
  1057. import genode/signals
  const
    # Initial capacity of the per-descriptor (file/socket) callback sequences.
    InitCallbackListSize = 4
    # Initial capacity of the delayed callbacks queue.
    InitDelayedCallbackListSize = 64
  type
    AsyncFD* = distinct cint
    Callback* = proc (fd: AsyncFD): bool {.closure, gcsafe.}

    AsyncData = object
      # Per-descriptor callback lists stored inside the selector.
      readList: seq[Callback]   # callbacks added through `addRead`
      writeList: seq[Callback]  # callbacks added through `addWrite`

    AsyncEvent* = distinct SelectEvent

    PDispatcher* = ref object of PDispatcherBase
      selector: Selector[AsyncData]
      when defined(genode):
        signalHandler: SignalHandler
  # Equality for the distinct handle types is borrowed from their base types.
  proc `==`*(x, y: AsyncFD): bool {.borrow.}
  proc `==`*(x, y: AsyncEvent): bool {.borrow.}
  template newAsyncData(): AsyncData =
    # Fresh `AsyncData` whose (empty) callback lists are pre-sized to
    # `InitCallbackListSize`.
    AsyncData(readList: newSeqOfCap[Callback](InitCallbackListSize),
              writeList: newSeqOfCap[Callback](InitCallbackListSize))
  proc newDispatcher*(): owned(PDispatcher) =
    ## Creates a new dispatcher with its own selector, an empty timer set and
    ## an empty queue of delayed callbacks.
    new result
    result.selector = newSelector[AsyncData]()
    result.timers.clear()
    result.callbacks =
      initDeque[proc () {.closure, gcsafe.}](InitDelayedCallbackListSize)
    when defined(genode):
      # On Genode a signal handler drives the dispatcher via `runOnce(0)`.
      let entrypoint = ep(cast[GenodeEnv](runtimeEnv))
      result.signalHandler = newSignalHandler(entrypoint):
        discard runOnce(0)
  var gDisp{.threadvar.}: owned PDispatcher ## Global (per-thread) dispatcher

  when defined(nuttx):
    import std/exitprocs

    proc cleanDispatcher() {.noconv.} =
      # Exit handler: drops the reference to the global dispatcher.
      gDisp = nil

    proc addFinalyzer() =
      # Arranges for `cleanDispatcher` to run at process exit.
      addExitProc(cleanDispatcher)
  proc setGlobalDispatcher*(disp: owned PDispatcher) =
    ## Installs `disp` as the global dispatcher. A dispatcher that is being
    ## replaced must not have any queued callbacks left.
    if gDisp != nil:
      assert gDisp.callbacks.len == 0
    gDisp = disp
    initCallSoonProc()
  proc getGlobalDispatcher*(): PDispatcher =
    ## Returns the global dispatcher, creating and installing a new one on
    ## first use.
    if gDisp == nil:
      setGlobalDispatcher(newDispatcher())
      when defined(nuttx):
        addFinalyzer()
    return gDisp
  proc getIoHandler*(disp: PDispatcher): Selector[AsyncData] =
    ## Returns the selector used by dispatcher `disp`.
    result = disp.selector
  proc register*(fd: AsyncFD) =
    ## Registers `fd` with the global dispatcher's selector. No events are
    ## watched until a callback is added with `addRead`/`addWrite`.
    let dispatcher = getGlobalDispatcher()
    var callbacks = newAsyncData()
    dispatcher.selector.registerHandle(fd.SocketHandle, {}, callbacks)
  proc unregister*(fd: AsyncFD) =
    ## Removes `fd` from the global dispatcher's selector.
    let dispatcher = getGlobalDispatcher()
    dispatcher.selector.unregister(fd.SocketHandle)
  proc unregister*(ev: AsyncEvent) =
    ## Removes event `ev` from the global dispatcher's selector.
    let dispatcher = getGlobalDispatcher()
    dispatcher.selector.unregister(SelectEvent(ev))
  proc contains*(disp: PDispatcher, fd: AsyncFD): bool =
    ## Returns `true` if `fd` is registered with the selector of `disp`.
    result = disp.selector.contains(fd.SocketHandle)
  proc addRead*(fd: AsyncFD, cb: Callback) =
    ## Queues `cb` to be called when `fd` becomes readable. Raises
    ## `ValueError` if `fd` is not registered with the dispatcher.
    let dispatcher = getGlobalDispatcher()
    var wanted = {Event.Read}
    withData(dispatcher.selector, fd.SocketHandle, fdData) do:
      fdData.readList.add(cb)
      # Keep watching for writes while write callbacks are still queued.
      if fdData.writeList.len != 0:
        wanted.incl(Event.Write)
    do:
      raise newException(ValueError, "File descriptor not registered.")
    dispatcher.selector.updateHandle(fd.SocketHandle, wanted)
  proc addWrite*(fd: AsyncFD, cb: Callback) =
    ## Queues `cb` to be called when `fd` becomes writable. Raises
    ## `ValueError` if `fd` is not registered with the dispatcher.
    let dispatcher = getGlobalDispatcher()
    var wanted = {Event.Write}
    withData(dispatcher.selector, fd.SocketHandle, fdData) do:
      fdData.writeList.add(cb)
      # Keep watching for reads while read callbacks are still queued.
      if fdData.readList.len != 0:
        wanted.incl(Event.Read)
    do:
      raise newException(ValueError, "File descriptor not registered.")
    dispatcher.selector.updateHandle(fd.SocketHandle, wanted)
  proc hasPendingOperations*(): bool =
    ## Returns `true` if the global dispatcher still has registered handles,
    ## timers or queued callbacks.
    let dispatcher = getGlobalDispatcher()
    result = not dispatcher.selector.isEmpty() or
             dispatcher.timers.len != 0 or
             dispatcher.callbacks.len != 0
  proc prependSeq(dest: var seq[Callback]; src: sink seq[Callback]) =
    # Puts the elements of `src` in front of the current contents of `dest`.
    var previous = move dest
    dest = src
    for idx in 0 ..< previous.len:
      dest.add(move previous[idx])
  proc processBasicCallbacks(
    fd: AsyncFD, event: Event
  ): tuple[readCbListCount, writeCbListCount: int] =
    # Process pending descriptor and AsyncEvent callbacks.
    #
    # Every callback stored in the list selected by `event` is invoked until
    # one returns `false` (meaning it wants to stay alive). That callback and
    # all callbacks after it are put back into the list, in their original
    # insertion order.
    #
    # The list associated with the descriptor MUST BE emptied before
    # dispatching callbacks (see https://github.com/nim-lang/Nim/issues/5128),
    # otherwise it is possible to fall into an endless cycle.
    #
    # Returns the resulting lengths of the read and write lists, or `-1` for
    # both if the descriptor was unregistered by one of the callbacks.
    var pending: seq[Callback]

    let selector = getGlobalDispatcher().selector
    withData(selector, fd.int, fdData):
      case event
      of Event.Read:
        pending = move fdData.readList
        fdData.readList = newSeqOfCap[Callback](InitCallbackListSize)
      of Event.Write:
        pending = move fdData.writeList
        fdData.writeList = newSeqOfCap[Callback](InitCallbackListSize)
      else:
        assert false, "Cannot process callbacks for " & $event

    var requeue = newSeqOfCap[Callback](max(len(pending), InitCallbackListSize))

    # Once a callback has returned `false` (EAGAIN), the remaining callbacks
    # are not invoked: they all wait for the same event on the same fd. They
    # still have to be kept so that they are called next time.
    var eventsExtinguished = false
    for cb in pending:
      if eventsExtinguished or not cb(fd):
        requeue.add(cb)
        eventsExtinguished = true

    withData(selector, fd.int, fdData) do:
      # Descriptor is still present in the queue.
      case event
      of Event.Read: prependSeq(fdData.readList, requeue)
      of Event.Write: prependSeq(fdData.writeList, requeue)
      else:
        assert false, "Cannot process callbacks for " & $event

      result.readCbListCount = len(fdData.readList)
      result.writeCbListCount = len(fdData.writeList)
    do:
      # Descriptor was unregistered in callback via `unregister()`.
      result.readCbListCount = -1
      result.writeCbListCount = -1
  proc processCustomCallbacks(p: PDispatcher; fd: AsyncFD) =
    # Process pending custom event callbacks. Custom events are
    # {Event.Timer, Event.Signal, Event.Process, Event.Vnode}.
    # There can be only one callback registered with one descriptor,
    # so there is no need to iterate over list.
    var curList: seq[Callback]
    withData(p.selector, fd.int, adata) do:
      curList = move adata.readList
      adata.readList = newSeqOfCap[Callback](InitCallbackListSize)

    var newList = newSeqOfCap[Callback](len(curList))

    # `curList` stays empty when the descriptor is no longer registered (for
    # example it was unregistered by a callback that ran earlier in the same
    # batch) or has no callback; indexing it blindly would raise IndexDefect.
    if curList.len > 0:
      let cb = curList[0]
      if not cb(fd):
        # Callback wants to be called again.
        newList.add(cb)

    withData(p.selector, fd.int, adata) do:
      # descriptor still present in queue.
      adata.readList = newList & adata.readList
      if len(adata.readList) == 0:
        # if no callbacks registered with descriptor, unregister it.
        p.selector.unregister(fd.int)
    do:
      # descriptor was unregistered in callback via `unregister()`.
      discard
  1225. implementSetInheritable()
  proc closeSocket*(sock: AsyncFD) =
    ## Unregisters and closes `sock`, then runs the read and write callbacks
    ## that were still queued for it. Raises `ValueError` if `sock` is not
    ## registered, or if one of those callbacks asks to be called again.
    let selector = getGlobalDispatcher().selector
    if not selector.contains(sock.SocketHandle):
      raise newException(ValueError, "File descriptor not registered.")

    # Take a copy of the callback lists before the descriptor goes away.
    let data = selector.getData(sock.SocketHandle)
    sock.unregister()
    sock.SocketHandle.close()

    # We need to unblock the read and write callbacks which could still be
    # waiting for the socket to become readable and/or writeable.
    let waiting = data.readList & data.writeList
    for cb in waiting:
      if not cb(sock):
        raise newException(
          ValueError, "Expecting async operations to stop when fd has closed."
        )
  proc runOnce(timeout: int): bool =
    # Runs one iteration of the event loop: expired timers, then ready
    # descriptors (waiting up to the adjusted `timeout`), then timers again
    # and finally the queue of pending callbacks. Returns `true` if any
    # work was processed. Raises `ValueError` when nothing is registered.
    let p = getGlobalDispatcher()
    if p.selector.isEmpty() and p.timers.len == 0 and p.callbacks.len == 0:
      when defined(genode):
        if timeout == 0: return
      raise newException(ValueError,
        "No handles or timers registered in dispatcher.")

    result = false
    var readyKeys: array[64, ReadyKey]
    let nextTimer = processTimers(p, result)
    let waitTime = adjustTimeout(p, timeout, nextTimer)
    let readyCount = p.selector.selectInto(waitTime, readyKeys)

    for i in 0 ..< readyCount:
      let fd = readyKeys[i].fd.AsyncFD
      let events = readyKeys[i].events
      let errorOnly = events == {Event.Error}
      var (readCbListCount, writeCbListCount) = (0, 0)

      if Event.Read in events or errorOnly:
        (readCbListCount, writeCbListCount) =
          processBasicCallbacks(fd, Event.Read)
        result = true

      if Event.Write in events or errorOnly:
        (readCbListCount, writeCbListCount) =
          processBasicCallbacks(fd, Event.Write)
        result = true

      var isCustomEvent = false
      if Event.User in events:
        (readCbListCount, writeCbListCount) =
          processBasicCallbacks(fd, Event.Read)
        isCustomEvent = true
        if readCbListCount == 0:
          p.selector.unregister(fd.int)
        result = true

      when ioselSupportedPlatform:
        const customSet = {Event.Timer, Event.Signal, Event.Process,
                           Event.Vnode}
        if (customSet * events) != {}:
          isCustomEvent = true
          processCustomCallbacks(p, fd)
          result = true

      # Because the descriptor's state can be modified inside a callback, the
      # watched events are refreshed from the currently registered callbacks.
      # A count of -1 means the descriptor was unregistered in a callback.
      let stillRegistered = readCbListCount != -1 and writeCbListCount != -1
      if not isCustomEvent and stillRegistered:
        var newEvents: set[Event] = {}
        if readCbListCount > 0: incl(newEvents, Event.Read)
        if writeCbListCount > 0: incl(newEvents, Event.Write)
        p.selector.updateHandle(SocketHandle(fd), newEvents)

    # Timer processing.
    discard processTimers(p, result)
    # Callback queue processing
    processPendingCallbacks(p, result)
  proc recv*(socket: AsyncFD, size: int,
             flags = {SocketFlag.SafeDisconn}): owned(Future[string]) =
    ## Reads up to `size` bytes from `socket`.
    ##
    ## The returned future completes with the received data, or with `""`
    ## when the peer has disconnected (a zero-length read, or an error that
    ## `isDisconnectionError` accepts for `flags`). Any other error other
    ## than `EINTR`/`EWOULDBLOCK`/`EAGAIN` fails the future with an `OSError`.
    var retFuture = newFuture[string]("recv")
    var readBuffer = newString(size)

    proc onReadable(sock: AsyncFD): bool =
      # `true` means the operation is finished; `false` asks the dispatcher
      # to call us again.
      let received = recv(sock.SocketHandle, addr readBuffer[0], size.cint,
                          flags.toOSFlags())
      if received > 0:
        readBuffer.setLen(received)
        retFuture.complete(readBuffer)
        return true
      if received == 0:
        # Disconnected
        retFuture.complete("")
        return true

      let lastError = osLastError()
      let code = lastError.int32
      if code == EINTR or code == EWOULDBLOCK or code == EAGAIN:
        return false # We still want this callback to be called.
      if flags.isDisconnectionError(lastError):
        retFuture.complete("")
      else:
        retFuture.fail(newOSError(lastError))
      return true

    # TODO: The following causes a massive slowdown.
    #if not onReadable(socket):
    addRead(socket, onReadable)
    return retFuture
  proc recvInto*(socket: AsyncFD, buf: pointer, size: int,
                 flags = {SocketFlag.SafeDisconn}): owned(Future[int]) =
    ## Reads up to `size` bytes from `socket` into the memory at `buf`.
    ##
    ## The returned future completes with the value returned by the
    ## underlying `recv` call, or with `0` when the error is one that
    ## `isDisconnectionError` accepts for `flags`. Other errors (except
    ## `EINTR`/`EWOULDBLOCK`/`EAGAIN`, which retry) fail the future with an
    ## `OSError`.
    var retFuture = newFuture[int]("recvInto")

    proc onReadable(sock: AsyncFD): bool =
      let received = recv(sock.SocketHandle, buf, size.cint,
                          flags.toOSFlags())
      if received >= 0:
        retFuture.complete(received)
        return true

      let lastError = osLastError()
      let code = lastError.int32
      if code == EINTR or code == EWOULDBLOCK or code == EAGAIN:
        return false # We still want this callback to be called.
      if flags.isDisconnectionError(lastError):
        retFuture.complete(0)
      else:
        retFuture.fail(newOSError(lastError))
      return true

    # TODO: The following causes a massive slowdown.
    #if not onReadable(socket):
    addRead(socket, onReadable)
    return retFuture
  proc send*(socket: AsyncFD, buf: pointer, size: int,
             flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
    ## Sends `size` bytes starting at `buf` to `socket`.
    ##
    ## The write callback is re-run until every byte has been handed to the
    ## OS, at which point the returned future completes. The system call is
    ## always issued with `MSG_NOSIGNAL`; `flags` is only consulted to decide
    ## whether an error counts as a disconnection (future completes) or
    ## fails the future with an `OSError`.
    var retFuture = newFuture[void]("send")
    var written = 0

    proc onWritable(sock: AsyncFD): bool =
      let remaining = size - written
      var bytes = cast[cstring](buf)
      let sent = send(sock.SocketHandle, addr bytes[written], remaining.cint,
                      MSG_NOSIGNAL)
      if sent >= 0:
        written.inc(sent)
        if sent != remaining:
          return false # We still have data to send.
        retFuture.complete()
        return true

      let lastError = osLastError()
      let code = lastError.int32
      if code == EINTR or code == EWOULDBLOCK or code == EAGAIN:
        return false # We still want this callback to be called.
      if flags.isDisconnectionError(lastError):
        retFuture.complete()
      else:
        retFuture.fail(newOSError(lastError))
      return true

    # TODO: The following causes crashes.
    #if not onWritable(socket):
    addWrite(socket, onWritable)
    return retFuture
  proc sendTo*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr,
               saddrLen: SockLen,
               flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
    ## Sends `data` of size `size` in bytes to the specified destination
    ## (`saddr` of size `saddrLen` in bytes), using socket `socket`.
    ## The returned future completes once the `sendto` call has succeeded.
    ##
    ## The future fails with `ValueError` when `saddrLen` exceeds the 128
    ## bytes reserved for the address copy, and with `OSError` when the
    ## system call reports an error other than `EINTR`, `EWOULDBLOCK` or
    ## `EAGAIN` (those make the dispatcher retry).
    var retFuture = newFuture[void]("sendTo")

    # Keep a private copy of the destination address; it is captured by the
    # callback below, so the caller's `saddr` is not needed afterwards.
    var staddr: array[128, char] # SOCKADDR_STORAGE size is 128 bytes
    var stalen = saddrLen

    # Guard the copy: an oversized `saddrLen` would write past `staddr`.
    if saddrLen.int > sizeof(staddr):
      retFuture.fail(newException(ValueError,
        "saddrLen exceeds the size of a socket address storage (128 bytes)."))
      return retFuture

    zeroMem(addr(staddr[0]), sizeof(staddr))
    copyMem(addr(staddr[0]), saddr, saddrLen)

    proc cb(sock: AsyncFD): bool =
      result = true
      let res = sendto(sock.SocketHandle, data, size, MSG_NOSIGNAL,
                       cast[ptr SockAddr](addr(staddr[0])), stalen)
      if res < 0:
        let lastError = osLastError()
        if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and
           lastError.int32 != EAGAIN:
          retFuture.fail(newOSError(lastError))
        else:
          result = false # We still want this callback to be called.
      else:
        retFuture.complete()

    addWrite(socket, cb)
    return retFuture
  proc recvFromInto*(socket: AsyncFD, data: pointer, size: int,
                     saddr: ptr SockAddr, saddrLen: ptr SockLen,
                     flags = {SocketFlag.SafeDisconn}): owned(Future[int]) =
    ## Receives one datagram from `socket` into `data`, which must provide
    ## room for at least `size` bytes. The sender's address is written to
    ## `saddr` and its length to `saddrLen`.
    ##
    ## The returned future completes with the size of the received packet.
    ## Errors other than `EINTR`, `EWOULDBLOCK` and `EAGAIN` fail it with an
    ## `OSError`; those three make the dispatcher retry.
    var retFuture = newFuture[int]("recvFromInto")

    proc onReadable(sock: AsyncFD): bool =
      let received = recvfrom(sock.SocketHandle, data, size.cint,
                              flags.toOSFlags(), saddr, saddrLen)
      if received >= 0:
        retFuture.complete(received)
        return true

      let lastError = osLastError()
      let code = lastError.int32
      if code == EINTR or code == EWOULDBLOCK or code == EAGAIN:
        return false
      retFuture.fail(newOSError(lastError))
      return true

    addRead(socket, onReadable)
    return retFuture
  proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn},
                   inheritable = defined(nimInheritHandles)):
                   owned(Future[tuple[address: string, client: AsyncFD]]) =
    ## Accepts a new connection on `socket`.
    ##
    ## The returned future completes with the peer's address string and the
    ## client descriptor, which is registered with the dispatcher before the
    ## future completes. `inheritable` is applied through `accept4` where
    ## that is declared, otherwise through `setInheritable`.
    ##
    ## `EINTR` and errors that `isDisconnectionError` accepts for `flags`
    ## make the dispatcher retry; any other failure fails the future.
    var retFuture = newFuture[tuple[address: string,
        client: AsyncFD]]("acceptAddr")

    proc cb(sock: AsyncFD): bool {.gcsafe.} =
      # `true` removes this callback from the dispatcher, `false` keeps it.
      result = true
      var sockAddress: Sockaddr_storage
      var addrLen = sizeof(sockAddress).SockLen
      var client =
        when declared(accept4):
          accept4(sock.SocketHandle, cast[ptr SockAddr](addr(sockAddress)),
                  addr(addrLen), if inheritable: 0 else: SOCK_CLOEXEC)
        else:
          accept(sock.SocketHandle, cast[ptr SockAddr](addr(sockAddress)),
                 addr(addrLen))
      when declared(setInheritable) and not declared(accept4):
        if client != osInvalidSocket and not setInheritable(client, inheritable):
          # Set failure first because close() itself can fail,
          # altering osLastError().
          retFuture.fail(newOSError(osLastError()))
          close client
          # The future is finished, so this callback must not run again:
          # a later invocation would touch an already-failed future.
          return true

      if client == osInvalidSocket:
        let lastError = osLastError()
        assert lastError.int32 != EWOULDBLOCK and lastError.int32 != EAGAIN
        if lastError.int32 == EINTR:
          return false
        else:
          if flags.isDisconnectionError(lastError):
            return false
          else:
            retFuture.fail(newOSError(lastError))
      else:
        try:
          let address = getAddrString(cast[ptr SockAddr](addr sockAddress))
          register(client.AsyncFD)
          retFuture.complete((address, client.AsyncFD))
        except:
          # getAddrString may raise
          client.close()
          retFuture.fail(getCurrentException())

    addRead(socket, cb)
    return retFuture
  when ioselSupportedPlatform:

    proc addTimer*(timeout: int, oneshot: bool, cb: Callback) =
      ## Registers a timer with the global dispatcher and calls `cb` when it
      ## expires.
      ##
      ## `timeout` - time in milliseconds,
      ## `oneshot` - if `true` only one event will be dispatched,
      ## if `false` continuous events every `timeout` milliseconds.
      let disp = getGlobalDispatcher()
      var cbData = newAsyncData()
      cbData.readList.add(cb)
      disp.selector.registerTimer(timeout, oneshot, cbData)

    proc addSignal*(signal: int, cb: Callback) =
      ## Registers signal `signal` with the global dispatcher and calls `cb`
      ## when the signal appears.
      let disp = getGlobalDispatcher()
      var cbData = newAsyncData()
      cbData.readList.add(cb)
      disp.selector.registerSignal(signal, cbData)

    proc addProcess*(pid: int, cb: Callback) =
      ## Registers the process with pid `pid` with the global dispatcher and
      ## calls `cb` when that process exits.
      let disp = getGlobalDispatcher()
      var cbData = newAsyncData()
      cbData.readList.add(cb)
      disp.selector.registerProcess(pid, cbData)

    proc newAsyncEvent*(): AsyncEvent =
      ## Creates a new `AsyncEvent` wrapping a fresh `SelectEvent`.
      AsyncEvent(newSelectEvent())

    proc trigger*(ev: AsyncEvent) =
      ## Puts the `AsyncEvent` into the signaled state.
      SelectEvent(ev).trigger()

    proc close*(ev: AsyncEvent) =
      ## Closes the `AsyncEvent`.
      SelectEvent(ev).close()

    proc addEvent*(ev: AsyncEvent, cb: Callback) =
      ## Registers `ev` with the global dispatcher and calls `cb` when `ev`
      ## is set to the signaled state.
      let disp = getGlobalDispatcher()
      var cbData = newAsyncData()
      cbData.readList.add(cb)
      disp.selector.registerEvent(SelectEvent(ev), cbData)
  proc drain*(timeout = 500) =
    ## Waits for completion of **all** events and processes them. In contrast
    ## to `poll` this processes as many events as are available until
    ## `timeout` milliseconds have elapsed.
    ##
    ## The dispatcher is only run while `hasPendingOperations()` is true, so
    ## the call returns immediately when nothing is pending.
    ##
    ## The remaining budget is recomputed from the monotonic clock after
    ## every iteration, so each pass waits only for what is left of `timeout`.
    let start = getMonoTime()
    var remaining = timeout
    while hasPendingOperations():
      discard runOnce(remaining)
      # Derive the budget from the original timeout. Subtracting the total
      # elapsed time from an already reduced value would count earlier
      # iterations more than once.
      remaining = timeout - (getMonoTime() - start).inMilliseconds.int
      if remaining < 0:
        break
  proc poll*(timeout = 500) =
    ## Runs a single dispatcher iteration, waiting up to `timeout`
    ## milliseconds for completion events and processing them.
    ##
    ## Raises `ValueError` if there are no pending operations. The underlying
    ## OS `epoll`:idx: or `kqueue`:idx: primitive is run only once.
    let dispatched = runOnce(timeout)
    discard dispatched
  template createAsyncNativeSocketImpl(domain, sockType, protocol: untyped,
                                       inheritable = defined(nimInheritHandles)) =
    # Shared body of the `createAsyncNativeSocket` overloads: creates the
    # native socket, makes it non-blocking, stores it in the caller's
    # `result` and registers it with the dispatcher. On creation failure the
    # calling proc returns `osInvalidSocket.AsyncFD` right away.
    let nativeHandle = createNativeSocket(domain, sockType, protocol, inheritable)
    if nativeHandle == osInvalidSocket:
      return osInvalidSocket.AsyncFD
    nativeHandle.setBlocking(false)
    when defined(macosx) and not defined(nimdoc):
      # Sets SO_NOSIGPIPE on macOS. The send paths here pass MSG_NOSIGNAL;
      # presumably that is unavailable on macOS -- confirm.
      nativeHandle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1)
    result = nativeHandle.AsyncFD
    register(result)
  proc createAsyncNativeSocket*(domain: cint, sockType: cint,
                                protocol: cint,
                                inheritable = defined(nimInheritHandles)): AsyncFD =
    ## Creates a non-blocking native socket from raw `cint` domain, socket
    ## type and protocol values and registers it with the dispatcher.
    ## Returns `osInvalidSocket.AsyncFD` if the socket cannot be created.
    createAsyncNativeSocketImpl(domain, sockType, protocol, inheritable)
  proc createAsyncNativeSocket*(domain: Domain = Domain.AF_INET,
                                sockType: SockType = SOCK_STREAM,
                                protocol: Protocol = IPPROTO_TCP,
                                inheritable = defined(nimInheritHandles)): AsyncFD =
    ## Creates a non-blocking native socket from the enum-typed `domain`,
    ## `sockType` and `protocol` (an IPv4 TCP stream socket by default) and
    ## registers it with the dispatcher.
    ## Returns `osInvalidSocket.AsyncFD` if the socket cannot be created.
    createAsyncNativeSocketImpl(domain, sockType, protocol, inheritable)
  when defined(windows) or defined(nimdoc):
    proc bindToDomain(handle: SocketHandle, domain: Domain) =
      # Extracted into a separate proc, because connect() on Windows requires
      # the socket to be initially bound.
      # Binds `handle` to a zero-initialised address of the given family and
      # raises `OSError` if the bind fails.
      template bindOrRaise(sa) =
        let rc = bindAddr(handle, cast[ptr SockAddr](addr(sa)),
                          sizeof(sa).SockLen)
        if rc < 0'i32:
          raiseOSError(osLastError())

      let family = uint16(toInt(domain))
      if domain == Domain.AF_INET6:
        var addr6: Sockaddr_in6
        addr6.sin6_family = family
        bindOrRaise(addr6)
      else:
        var addr4: Sockaddr_in
        addr4.sin_family = family
        bindOrRaise(addr4)

    proc doConnect(socket: AsyncFD, addrInfo: ptr AddrInfo): owned(Future[void]) =
      # Starts an overlapped `connectEx` to `addrInfo` and returns a future
      # that is completed or failed once the outcome is known.
      let retFuture = newFuture[void]("doConnect")
      result = retFuture

      var ol = newCustom()
      ol.data = CompletionData(fd: socket, cb:
        proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
          # Nothing to do when the future was already finished below.
          if retFuture.finished:
            return
          if errcode == OSErrorCode(-1):
            const SO_UPDATE_CONNECT_CONTEXT = 0x7010
            socket.SocketHandle.setSockOptInt(SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, 1) # 15022
            retFuture.complete()
          else:
            retFuture.fail(newOSError(errcode))
      )

      let connectedNow = connectEx(socket.SocketHandle, addrInfo.ai_addr,
                                   cint(addrInfo.ai_addrlen), nil, 0, nil,
                                   cast[POVERLAPPED](ol))
      if connectedNow:
        # Request to connect completed immediately.
        retFuture.complete()
        # We don't deallocate `ol` here because even though this completed
        # immediately poll will still be notified about its completion and it
        # will free `ol`.
      else:
        let lastError = osLastError()
        if lastError.int32 != ERROR_IO_PENDING:
          # With ERROR_IO_PENDING `ol` will be deallocated in `poll`,
          # and the future will be completed/failed there, too.
          GC_unref(ol)
          retFuture.fail(newOSError(lastError))
  else:
    proc doConnect(socket: AsyncFD, addrInfo: ptr AddrInfo): owned(Future[void]) =
      # Starts a `connect` to `addrInfo` and returns a future that is
      # completed or failed once the outcome is known.
      let retFuture = newFuture[void]("doConnect")
      result = retFuture

      proc onWritable(fd: AsyncFD): bool =
        # Reads the socket's SO_ERROR value to learn how the connect ended.
        let soError = SocketHandle(fd).getSockOptInt(
          cint(SOL_SOCKET), cint(SO_ERROR))
        if soError == EINTR:
          # interrupted, keep waiting
          return false
        if soError == 0:
          # We have connected.
          retFuture.complete()
        else:
          retFuture.fail(newOSError(OSErrorCode(soError)))
        return true

      let rc = connect(socket.SocketHandle,
                       addrInfo.ai_addr,
                       addrInfo.ai_addrlen.SockLen)
      if rc == 0:
        # Request to connect completed immediately.
        retFuture.complete()
      else:
        let lastError = osLastError()
        let inFlight = lastError.int32 == EINTR or
                       lastError.int32 == EINPROGRESS
        if inFlight:
          addWrite(socket, onWritable)
        else:
          retFuture.fail(newOSError(lastError))
  template asyncAddrInfoLoop(addrInfo: ptr AddrInfo, fd: untyped,
                             protocol: Protocol = IPPROTO_RAW) =
    ## Walks the `addrInfo` linked list asynchronously, trying `doConnect`
    ## on each entry with a known domain until one connection succeeds.
    ##
    ## Relies on `retFuture` and `address` being in scope at the
    ## instantiation site. When `fd` is not a declared symbol there, sockets
    ## are created here (at most one per domain) and `retFuture` is completed
    ## with the connected descriptor; otherwise `fd` is used for every
    ## attempt and `retFuture` is completed without a value.
    const shouldCreateFd = not declared(fd)

    when shouldCreateFd:
      let sockType = protocol.toSockType()

      # One lazily created socket per address family.
      var fdPerDomain: array[low(Domain).ord..high(Domain).ord, AsyncFD]
      for i in low(fdPerDomain)..high(fdPerDomain):
        fdPerDomain[i] = osInvalidSocket.AsyncFD
      template closeUnusedFds(domainToKeep = -1) {.dirty.} =
        for i, fd in fdPerDomain:
          if fd != osInvalidSocket.AsyncFD and i != domainToKeep:
            fd.closeSocket()

    var lastErr: ref Exception
    var nextInfo = addrInfo
    var domain: Domain
    when shouldCreateFd:
      var curFd: AsyncFD
    else:
      var curFd = fd

    proc tryNextAddrInfo(fut: Future[void]) {.gcsafe.} =
      # `fut` is nil on the initial call and the finished `doConnect`
      # future on every later call.
      if fut != nil and not fut.failed:
        # The previous attempt connected: release everything else.
        freeAddrInfo(addrInfo)
        when shouldCreateFd:
          closeUnusedFds(ord(domain))
          retFuture.complete(curFd)
        else:
          retFuture.complete()
        return

      if fut != nil:
        lastErr = fut.readError()

      # Skip entries whose address family is not a known `Domain`.
      while nextInfo != nil:
        let domainOpt = nextInfo.ai_family.toKnownDomain()
        if domainOpt.isSome:
          domain = domainOpt.unsafeGet()
          break
        nextInfo = nextInfo.ai_next

      if nextInfo == nil:
        # The list is exhausted: report the last failure, if any.
        freeAddrInfo(addrInfo)
        when shouldCreateFd:
          closeUnusedFds()
        if lastErr != nil:
          retFuture.fail(lastErr)
        else:
          retFuture.fail(newException(
                         IOError, "Couldn't resolve address: " & address))
        return

      when shouldCreateFd:
        curFd = fdPerDomain[ord(domain)]
        if curFd == osInvalidSocket.AsyncFD:
          try:
            curFd = createAsyncNativeSocket(domain, sockType, protocol)
          except:
            freeAddrInfo(addrInfo)
            closeUnusedFds()
            raise getCurrentException()
          when defined(windows):
            curFd.SocketHandle.bindToDomain(domain)
          fdPerDomain[ord(domain)] = curFd

      doConnect(curFd, nextInfo).callback = tryNextAddrInfo
      nextInfo = nextInfo.ai_next

    tryNextAddrInfo(nil)
  proc dial*(address: string, port: Port,
             protocol: Protocol = IPPROTO_TCP): owned(Future[AsyncFD]) =
    ## Establishes a connection to the `address`:`port` pair using
    ## `protocol`. Every resolution of `address` is tried in turn until one
    ## succeeds, so both IPv4 and IPv6 targets work transparently.
    ##
    ## The future yields the async file descriptor, registered in the
    ## dispatcher of the current thread, ready to send or receive data.
    let retFuture = newFuture[AsyncFD]("dial")
    result = retFuture

    # Resolve for any address family; `noFD` is deliberately not a declared
    # symbol, which makes the loop template create the sockets itself.
    let resolved = getAddrInfo(address, port, Domain.AF_UNSPEC,
                               protocol.toSockType(), protocol)
    asyncAddrInfoLoop(resolved, noFD, protocol)
  proc connect*(socket: AsyncFD, address: string, port: Port,
                domain = Domain.AF_INET): owned(Future[void]) =
    ## Connects the existing `socket` to `address`:`port`, trying each
    ## resolution of `address` within `domain` until one succeeds.
    ##
    ## On Windows the socket's presence is verified and it is bound to
    ## `domain` first; elsewhere an assertion checks that the socket's
    ## domain matches `domain`.
    let retFuture = newFuture[void]("connect")
    result = retFuture

    when defined(windows):
      verifyPresence(socket)
    else:
      assert getSockDomain(socket.SocketHandle) == domain

    let resolved = getAddrInfo(address, port, domain)
    when defined(windows):
      socket.SocketHandle.bindToDomain(domain)
    asyncAddrInfoLoop(resolved, socket)
  proc sleepAsync*(ms: int | float): owned(Future[void]) =
    ## Suspends the execution of the current async procedure for the next
    ## `ms` milliseconds.
    ##
    ## An `int` is taken as whole milliseconds; a `float` is converted to
    ## nanoseconds so fractional milliseconds are kept.
    var retFuture = newFuture[void]("sleepAsync")
    let disp = getGlobalDispatcher()
    when ms is int:
      let deadline = getMonoTime() + initDuration(milliseconds = ms)
      disp.timers.push((deadline, retFuture))
    elif ms is float:
      let nanos = (ms * 1_000_000).int64
      let deadline = getMonoTime() + initDuration(nanoseconds = nanos)
      disp.timers.push((deadline, retFuture))
    return retFuture
  proc withTimeout*[T](fut: Future[T], timeout: int): owned(Future[bool]) =
    ## Returns a future which will complete once `fut` completes or after
    ## `timeout` milliseconds has elapsed.
    ##
    ## The result is `true` when `fut` completes first and `false` when the
    ## timeout elapses first. If `fut` fails before the timeout, the
    ## returned future fails with the same error.
    var retFuture = newFuture[bool]("asyncdispatch.`withTimeout`")
    var timeoutFuture = sleepAsync(timeout)

    fut.callback =
      proc () =
        # Whichever side finishes first decides the outcome.
        if retFuture.finished:
          return
        if fut.failed:
          retFuture.fail(fut.error)
        else:
          retFuture.complete(true)

    timeoutFuture.callback =
      proc () =
        if retFuture.finished:
          return
        retFuture.complete(false)

    return retFuture
  proc accept*(socket: AsyncFD,
               flags = {SocketFlag.SafeDisconn},
               inheritable = defined(nimInheritHandles)): owned(Future[AsyncFD]) =
    ## Accepts a new connection. Returns a future containing the client
    ## socket corresponding to that connection.
    ##
    ## If `inheritable` is false (the default), the resulting client socket
    ## will not be inheritable by child processes.
    ##
    ## The future completes when the connection is successfully accepted and
    ## fails with the error reported by `acceptAddr` otherwise.
    var retFut = newFuture[AsyncFD]("accept")
    var pairFut = acceptAddr(socket, flags, inheritable)
    pairFut.callback =
      proc (done: Future[tuple[address: string, client: AsyncFD]]) =
        assert done.finished
        # Forward only the client descriptor; the address is dropped.
        if not done.failed:
          retFut.complete(done.read.client)
        else:
          retFut.fail(done.error)
    return retFut
  proc keepAlive(x: string) =
    ## Intentionally a no-op: calling it from a closure makes `x` a captured
    ## variable, which keeps the string's data alive.
    discard "mark 'x' as escaping so that it is put into a closure for us to keep the data alive"
  proc send*(socket: AsyncFD, data: string,
             flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
    ## Sends `data` to `socket`. The returned future will complete once all
    ## data has been sent, or fail with the error of the underlying send.
    ## An empty `data` completes the future immediately.
    var retFuture = newFuture[void]("send")
    if data.len == 0:
      retFuture.complete()
      return retFuture

    let sendFut = socket.send(unsafeAddr data[0], data.len, flags)
    sendFut.callback =
      proc () =
        # Referencing `data` here keeps the buffer alive until the pointer
        # based send has finished.
        keepAlive(data)
        if not sendFut.failed:
          retFuture.complete()
        else:
          retFuture.fail(sendFut.error)
    return retFuture
  1776. # -- Await Macro
  1777. import std/asyncmacro
  1778. export asyncmacro
  proc readAll*(future: FutureStream[string]): owned(Future[string]) {.async.} =
    ## Returns a future that will complete when all the string data from the
    ## specified future stream is retrieved. The chunks are concatenated in
    ## the order they are read.
    result = ""
    var more = true
    while more:
      let (hasValue, chunk) = await future.read()
      if hasValue:
        result.add(chunk)
      # A read without a value ends the loop.
      more = hasValue
  proc callSoon(cbproc: proc () {.gcsafe.}) =
    # Appends `cbproc` to the end of the global dispatcher's callback queue.
    let disp = getGlobalDispatcher()
    disp.callbacks.addLast(cbproc)
  proc runForever*() =
    ## Begins a never ending global dispatcher poll loop: `poll` is called
    ## over and over with its default timeout. The loop has no exit.
    while true:
      poll()
  proc waitFor*[T](fut: Future[T]): T =
    ## **Blocks** the current thread until the specified future completes,
    ## polling the dispatcher in the meantime, and then returns the result
    ## of reading `fut`.
    while true:
      if fut.finished:
        break
      poll()

    fut.read
  proc activeDescriptors*(): int {.inline.} =
    ## Returns the current number of active file descriptors for the current
    ## event loop. This is a cheap operation that does not involve a system
    ## call: it reads the size of the dispatcher's handle set on Windows and
    ## the selector's count elsewhere.
    let disp = getGlobalDispatcher()
    when defined(windows):
      result = disp.handles.len
    elif not defined(nimdoc):
      result = disp.selector.count
  1807. when defined(posix):
  1808. import std/posix
  when defined(linux) or defined(windows) or defined(macosx) or defined(bsd) or
         defined(solaris) or defined(zephyr) or defined(freertos) or defined(nuttx) or defined(haiku):
    proc maxDescriptors*(): int {.raises: OSError.} =
      ## Returns the maximum number of active file descriptors for the
      ## current process. Available on Windows, Linux, macOS, BSD, Solaris,
      ## Zephyr, FreeRTOS, NuttX and Haiku.
      ##
      ## Windows yields a fixed value and Zephyr/FreeRTOS yield `FD_MAX`.
      ## Every other platform queries `RLIMIT_NOFILE` with `getrlimit`
      ## (a system call) and returns the soft limit minus one; `OSError` is
      ## raised if that query fails.
      when defined(windows):
        result = 16_700_000
      elif defined(zephyr) or defined(freertos):
        result = FD_MAX
      else:
        var limits: RLimit
        let rc = getrlimit(RLIMIT_NOFILE, limits)
        if rc < 0:
          raiseOSError(osLastError())
        result = int(limits.rlim_cur) - 1
  when defined(genode):
    proc scheduleCallbacks*(): bool {.discardable.} =
      ## *Genode only.*
      ## Schedule callback processing and return immediately.
      ## Returns `false` if there is nothing to schedule.
      ## RPC servers should call this to dispatch `callSoon`
      ## bodies after retiring an RPC to its client.
      ## This is effectively a non-blocking `poll(…)` and is
      ## equivalent to scheduling a momentary no-op timeout
      ## but faster and with less overhead.
      let disp = getGlobalDispatcher()
      let hasWork = disp.callbacks.len > 0
      # Submit the dispatcher's signal handler only when callbacks are queued.
      if hasWork:
        submit(disp.signalHandler.cap)
      result = hasWork