12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066 |
- #
- #
- # Nim's Runtime Library
- # (c) Copyright 2015 Dominik Picheta
- #
- # See the file "copying.txt", included in this
- # distribution, for details about the copyright.
- #
- ## This module implements asynchronous IO. This includes a dispatcher,
- ## a `Future` type implementation, and an `async` macro which allows
- ## asynchronous code to be written in a synchronous style with the `await`
- ## keyword.
- ##
- ## The dispatcher acts as a kind of event loop. You must call `poll` on it
- ## (or a function which does so for you such as `waitFor` or `runForever`)
- ## in order to poll for any outstanding events. The underlying implementation
- ## is based on epoll on Linux, IO Completion Ports on Windows and select on
- ## other operating systems.
- ##
- ## The `poll` function will not, on its own, return any events. Instead
- ## an appropriate `Future` object will be completed. A `Future` is a
- ## type which holds a value which is not yet available, but which *may* be
- ## available in the future. You can check whether a future is finished
- ## by using the `finished` function. When a future is finished it means that
- ## either the value that it holds is now available or it holds an error instead.
- ## The latter situation occurs when the operation to complete a future fails
- ## with an exception. You can distinguish between the two situations with the
- ## `failed` function.
- ##
- ## Future objects can also store a callback procedure which will be called
- ## automatically once the future completes.
- ##
- ## Futures therefore can be thought of as an implementation of the proactor
- ## pattern. In this
- ## pattern you make a request for an action, and once that action is fulfilled
- ## a future is completed with the result of that action. Requests can be
- ## made by calling the appropriate functions. For example: calling the `recv`
- ## function will create a request for some data to be read from a socket. The
- ## future which the `recv` function returns will then complete once the
- ## requested amount of data is read **or** an exception occurs.
- ##
- ## Code to read some data from a socket may look something like this:
- ## ```Nim
- ## var future = socket.recv(100)
- ## future.addCallback(
- ## proc () =
- ## echo(future.read)
- ## )
- ## ```
- ##
- ## All asynchronous functions returning a `Future` will not block. They
- ## will not however return immediately. An asynchronous function will have
- ## code which will be executed before an asynchronous request is made, in most
- ## cases this code sets up the request.
- ##
- ## In the above example, the `recv` function will return a brand new
- ## `Future` instance once the request for data to be read from the socket
- ## is made. This `Future` instance will complete once the requested amount
- ## of data is read, in this case it is 100 bytes. The second line sets a
- ## callback on this future which will be called once the future completes.
- ## All the callback does is write the data stored in the future to `stdout`.
- ## The `read` function is used for this and it checks whether the future
- ## completes with an error for you (if it did, it will simply raise the
- ## error), if there is no error, however, it returns the value of the future.
- ##
- ## Asynchronous procedures
- ## =======================
- ##
- ## Asynchronous procedures remove the pain of working with callbacks. They do
- ## this by allowing you to write asynchronous code the same way as you would
- ## write synchronous code.
- ##
- ## An asynchronous procedure is marked using the `{.async.}` pragma.
- ## When marking a procedure with the `{.async.}` pragma it must have a
- ## `Future[T]` return type or no return type at all. If you do not specify
- ## a return type then `Future[void]` is assumed.
- ##
- ## Inside asynchronous procedures `await` can be used to call any
- ## procedures which return a
- ## `Future`; this includes asynchronous procedures. When a procedure is
- ## "awaited", the asynchronous procedure it is awaited in will
- ## suspend its execution
- ## until the awaited procedure's Future completes. At which point the
- ## asynchronous procedure will resume its execution. During the period
- ## when an asynchronous procedure is suspended other asynchronous procedures
- ## will be run by the dispatcher.
- ##
- ## The `await` call may be used in many contexts. It can be used on the right
- ## hand side of a variable declaration: `var data = await socket.recv(100)`,
- ## in which case the variable will be set to the value of the future
- ## automatically. It can be used to await a `Future` object, and it can
- ## be used to await a procedure returning a `Future[void]`:
- ## `await socket.send("foobar")`.
- ##
- ## If an awaited future completes with an error, then `await` will re-raise
- ## this error. To avoid this, you can use the `yield` keyword instead of
- ## `await`. The following section shows different ways that you can handle
- ## exceptions in async procs.
- ##
- ## .. caution::
- ## Procedures marked {.async.} do not support mutable parameters such
- ## as `var int`. References such as `ref int` should be used instead.
- ##
- ## Handling Exceptions
- ## -------------------
- ##
- ## You can handle exceptions in the same way as in ordinary Nim code;
- ## by using the try statement:
- ##
- ## ```Nim
- ## try:
- ## let data = await sock.recv(100)
- ## echo("Received ", data)
- ## except:
- ## # Handle exception
- ## ```
- ##
- ## An alternative approach to handling exceptions is to use `yield` on a future
- ## then check the future's `failed` property. For example:
- ##
- ## ```Nim
- ## var future = sock.recv(100)
- ## yield future
- ## if future.failed:
- ## # Handle exception
- ## ```
- ##
- ##
- ## Discarding futures
- ## ==================
- ##
- ## Futures should **never** be discarded directly because they may contain
- ## errors. If you do not care for the result of a Future then you should use
- ## the `asyncCheck` procedure instead of the `discard` keyword. Note that this
- ## does not wait for completion, and you should use `waitFor` or `await` for that purpose.
- ##
- ## .. note:: `await` also checks if the future fails, so you can safely discard
- ## its result.
- ##
- ## Handling futures
- ## ================
- ##
- ## There are many different operations that apply to a future.
- ## The three primary high-level operations are `asyncCheck`,
- ## `waitFor`, and `await`.
- ##
- ## * `asyncCheck`: Raises an exception if the future fails. It neither waits
- ## for the future to finish nor returns the result of the future.
- ## * `waitFor`: Polls the event loop and blocks the current thread until the
- ## future finishes. This is often used to call an async procedure from a
- ## synchronous context and should never be used in an `async` proc.
- ## * `await`: Pauses execution in the current async procedure until the future
- ## finishes. While the current procedure is paused, other async procedures will
- ## continue running. Should be used instead of `waitFor` in an async
- ## procedure.
- ##
- ## Here is a handy quick reference chart showing their high-level differences:
- ## ============== ===================== =======================
- ## Procedure Context Blocking
- ## ============== ===================== =======================
- ## `asyncCheck` non-async and async non-blocking
- ## `waitFor` non-async blocks current thread
- ## `await` async suspends current proc
- ## ============== ===================== =======================
- ##
- ## Examples
- ## ========
- ##
- ## For examples take a look at the documentation for the modules implementing
- ## asynchronous IO. A good place to start is the
- ## `asyncnet module <asyncnet.html>`_.
- ##
- ## Investigating pending futures
- ## =============================
- ##
- ## It's possible to get into a situation where an async proc, or more accurately
- ## a `Future[T]` gets stuck and
- ## never completes. This can happen for various reasons and can cause serious
- ## memory leaks. When this occurs it's hard to identify the procedure that is
- ## stuck.
- ##
- ## Thankfully there is a mechanism which tracks the count of each pending future.
- ## All you need to do to enable it is compile with `-d:futureLogging` and
- ## use the `getFuturesInProgress` procedure to get the list of pending futures
- ## together with the stack traces to the moment of their creation.
- ##
- ## You may also find it useful to use this
- ## `prometheus package <https://github.com/dom96/prometheus>`_ which will log
- ## the pending futures into prometheus, allowing you to analyse them via a nice
- ## graph.
- ##
- ##
- ##
- ## Limitations/Bugs
- ## ================
- ##
- ## * The effect system (`raises: []`) does not work with async procedures.
- ## * Mutable parameters are not supported by async procedures.
- ##
- ##
- ## Multiple async backend support
- ## ==============================
- ##
- ## Thanks to its powerful macro support, Nim allows ``async``/``await`` to be
- ## implemented in libraries with only minimal support from the language - as
- ## such, multiple ``async`` libraries exist, including ``asyncdispatch`` and
- ## ``chronos``, and more may come to be developed in the future.
- ##
- ## Libraries built on top of async/await may wish to support multiple async
- ## backends - the best way to do so is to create separate modules for each backend
- ## that may be imported side-by-side.
- ##
- ## An alternative way is to select backend using a global compile flag - this
- ## method makes it difficult to compose applications that use both backends as may
- ## happen with transitive dependencies, but may be appropriate in some cases -
- ## libraries choosing this path should call the flag `asyncBackend`, allowing
- ## applications to choose the backend with `-d:asyncBackend=<backend_name>`.
- ##
- ## Known `async` backends include:
- ##
- ## * `-d:asyncBackend=none`: disable `async` support completely
- ## * `-d:asyncBackend=asyncdispatch`: https://nim-lang.org/docs/asyncdispatch.html
- ## * `-d:asyncBackend=chronos`: https://github.com/status-im/nim-chronos/
- ##
- ## ``none`` can be used when a library supports both a synchronous and
- ## asynchronous API, to disable the latter.
- import os, tables, strutils, times, heapqueue, options, asyncstreams
- import options, math, std/monotimes
- import asyncfutures except callSoon
- import nativesockets, net, deques
- when defined(nimPreviewSlimSystem):
- import std/[assertions, syncio]
- export Port, SocketFlag
- export asyncfutures except callSoon
- export asyncstreams
- # TODO: Check if yielded future is nil and throw a more meaningful exception
- type
- PDispatcherBase = ref object of RootRef
- timers*: HeapQueue[tuple[finishAt: MonoTime, fut: Future[void]]]
- callbacks*: Deque[proc () {.gcsafe.}]
- proc processTimers(
- p: PDispatcherBase, didSomeWork: var bool
- ): Option[int] {.inline.} =
- # Pop the timers in the order in which they will expire (smaller `finishAt`).
- var count = p.timers.len
- let t = getMonoTime()
- while count > 0 and t >= p.timers[0].finishAt:
- p.timers.pop().fut.complete()
- dec count
- didSomeWork = true
- # Return the number of milliseconds in which the next timer will expire.
- if p.timers.len == 0: return
- let millisecs = (p.timers[0].finishAt - getMonoTime()).inMilliseconds
- return some(millisecs.int + 1)
- proc processPendingCallbacks(p: PDispatcherBase; didSomeWork: var bool) =
- while p.callbacks.len > 0:
- var cb = p.callbacks.popFirst()
- cb()
- didSomeWork = true
- proc adjustTimeout(
- p: PDispatcherBase, pollTimeout: int, nextTimer: Option[int]
- ): int {.inline.} =
- if p.callbacks.len != 0:
- return 0
- if nextTimer.isNone() or pollTimeout == -1:
- return pollTimeout
- result = max(nextTimer.get(), 0)
- result = min(pollTimeout, result)
- proc runOnce(timeout: int): bool {.gcsafe.}
- proc callSoon*(cbproc: proc () {.gcsafe.}) {.gcsafe.}
- ## Schedule `cbproc` to be called as soon as possible.
- ## The callback is called when control returns to the event loop.
- proc initCallSoonProc =
- if asyncfutures.getCallSoonProc().isNil:
- asyncfutures.setCallSoonProc(callSoon)
- template implementSetInheritable() {.dirty.} =
- when declared(setInheritable):
- proc setInheritable*(fd: AsyncFD, inheritable: bool): bool =
- ## Control whether a file handle can be inherited by child processes.
- ## Returns `true` on success.
- ##
- ## This procedure is not guaranteed to be available for all platforms.
- ## Test for availability with `declared() <system.html#declared,untyped>`_.
- fd.FileHandle.setInheritable(inheritable)
- when defined(windows) or defined(nimdoc):
- import winlean, sets, hashes
- type
- CompletionKey = ULONG_PTR
- CompletionData* = object
- fd*: AsyncFD # TODO: Rename this.
- cb*: owned(proc (fd: AsyncFD, bytesTransferred: DWORD,
- errcode: OSErrorCode) {.closure, gcsafe.})
- cell*: ForeignCell # we need this `cell` to protect our `cb` environment,
- # when using RegisterWaitForSingleObject, because
- # waiting is done in different thread.
- PDispatcher* = ref object of PDispatcherBase
- ioPort: Handle
- handles*: HashSet[AsyncFD] # Export handles so that an external library can register them.
- CustomObj = object of OVERLAPPED
- data*: CompletionData
- CustomRef* = ref CustomObj
- AsyncFD* = distinct int
- PostCallbackData = object
- ioPort: Handle
- handleFd: AsyncFD
- waitFd: Handle
- ovl: owned CustomRef
- PostCallbackDataPtr = ptr PostCallbackData
- AsyncEventImpl = object
- hEvent: Handle
- hWaiter: Handle
- pcd: PostCallbackDataPtr
- AsyncEvent* = ptr AsyncEventImpl
- Callback* = proc (fd: AsyncFD): bool {.closure, gcsafe.}
- proc hash(x: AsyncFD): Hash {.borrow.}
- proc `==`*(x: AsyncFD, y: AsyncFD): bool {.borrow.}
- proc newDispatcher*(): owned PDispatcher =
- ## Creates a new Dispatcher instance.
- new result
- result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
- result.handles = initHashSet[AsyncFD]()
- result.timers.clear()
- result.callbacks = initDeque[proc () {.closure, gcsafe.}](64)
- var gDisp{.threadvar.}: owned PDispatcher ## Global dispatcher
- proc setGlobalDispatcher*(disp: sink PDispatcher) =
- if not gDisp.isNil:
- assert gDisp.callbacks.len == 0
- gDisp = disp
- initCallSoonProc()
- proc getGlobalDispatcher*(): PDispatcher =
- if gDisp.isNil:
- setGlobalDispatcher(newDispatcher())
- result = gDisp
- proc getIoHandler*(disp: PDispatcher): Handle =
- ## Returns the underlying IO Completion Port handle (Windows) or selector
- ## (Unix) for the specified dispatcher.
- return disp.ioPort
- proc register*(fd: AsyncFD) =
- ## Registers `fd` with the dispatcher.
- let p = getGlobalDispatcher()
- if createIoCompletionPort(fd.Handle, p.ioPort,
- cast[CompletionKey](fd), 1) == 0:
- raiseOSError(osLastError())
- p.handles.incl(fd)
- proc verifyPresence(fd: AsyncFD) =
- ## Ensures that file descriptor has been registered with the dispatcher.
- ## Raises ValueError if `fd` has not been registered.
- let p = getGlobalDispatcher()
- if fd notin p.handles:
- raise newException(ValueError,
- "Operation performed on a socket which has not been registered with" &
- " the dispatcher yet.")
- proc hasPendingOperations*(): bool =
- ## Returns `true` if the global dispatcher has pending operations.
- let p = getGlobalDispatcher()
- p.handles.len != 0 or p.timers.len != 0 or p.callbacks.len != 0
- proc runOnce(timeout: int): bool =
- let p = getGlobalDispatcher()
- if p.handles.len == 0 and p.timers.len == 0 and p.callbacks.len == 0:
- raise newException(ValueError,
- "No handles or timers registered in dispatcher.")
- result = false
- let nextTimer = processTimers(p, result)
- let at = adjustTimeout(p, timeout, nextTimer)
- var llTimeout =
- if at == -1: winlean.INFINITE
- else: at.int32
- var lpNumberOfBytesTransferred: DWORD
- var lpCompletionKey: ULONG_PTR
- var customOverlapped: CustomRef
- let res = getQueuedCompletionStatus(p.ioPort,
- addr lpNumberOfBytesTransferred, addr lpCompletionKey,
- cast[ptr POVERLAPPED](addr customOverlapped), llTimeout).bool
- result = true
- # For 'gcDestructors' the destructor of 'customOverlapped' will
- # be called at the end and we are the only owner here. This means
- # We do not have to 'GC_unref(customOverlapped)' because the destructor
- # does that for us.
- # http://stackoverflow.com/a/12277264/492186
- # TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html
- if res:
- # This is useful for ensuring the reliability of the overlapped struct.
- assert customOverlapped.data.fd == lpCompletionKey.AsyncFD
- customOverlapped.data.cb(customOverlapped.data.fd,
- lpNumberOfBytesTransferred, OSErrorCode(-1))
- # If cell.data != nil, then system.protect(rawEnv(cb)) was called,
- # so we need to dispose our `cb` environment, because it is not needed
- # anymore.
- if customOverlapped.data.cell.data != nil:
- system.dispose(customOverlapped.data.cell)
- when not defined(gcDestructors):
- GC_unref(customOverlapped)
- else:
- let errCode = osLastError()
- if customOverlapped != nil:
- assert customOverlapped.data.fd == lpCompletionKey.AsyncFD
- customOverlapped.data.cb(customOverlapped.data.fd,
- lpNumberOfBytesTransferred, errCode)
- if customOverlapped.data.cell.data != nil:
- system.dispose(customOverlapped.data.cell)
- when not defined(gcDestructors):
- GC_unref(customOverlapped)
- else:
- if errCode.int32 == WAIT_TIMEOUT:
- # Timed out
- result = false
- else: raiseOSError(errCode)
- # Timer processing.
- discard processTimers(p, result)
- # Callback queue processing
- processPendingCallbacks(p, result)
- var acceptEx: WSAPROC_ACCEPTEX
- var connectEx: WSAPROC_CONNECTEX
- var getAcceptExSockAddrs: WSAPROC_GETACCEPTEXSOCKADDRS
- proc initPointer(s: SocketHandle, fun: var pointer, guid: var GUID): bool =
- # Ref: https://github.com/powdahound/twisted/blob/master/twisted/internet/iocpreactor/iocpsupport/winsock_pointers.c
- var bytesRet: DWORD
- fun = nil
- result = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, addr guid,
- sizeof(GUID).DWORD, addr fun, sizeof(pointer).DWORD,
- addr bytesRet, nil, nil) == 0
- proc initAll() =
- let dummySock = createNativeSocket()
- if dummySock == INVALID_SOCKET:
- raiseOSError(osLastError())
- var fun: pointer = nil
- if not initPointer(dummySock, fun, WSAID_CONNECTEX):
- raiseOSError(osLastError())
- connectEx = cast[WSAPROC_CONNECTEX](fun)
- if not initPointer(dummySock, fun, WSAID_ACCEPTEX):
- raiseOSError(osLastError())
- acceptEx = cast[WSAPROC_ACCEPTEX](fun)
- if not initPointer(dummySock, fun, WSAID_GETACCEPTEXSOCKADDRS):
- raiseOSError(osLastError())
- getAcceptExSockAddrs = cast[WSAPROC_GETACCEPTEXSOCKADDRS](fun)
- close(dummySock)
- proc newCustom*(): CustomRef =
- result = CustomRef() # 0
- GC_ref(result) # 1 prevent destructor from doing a premature free.
- # destructor of newCustom's caller --> 0. This means
- # Windows holds a ref for us with RC == 0 (single owner).
- # This is passed back to us in the IO completion port.
- proc recv*(socket: AsyncFD, size: int,
- flags = {SocketFlag.SafeDisconn}): owned(Future[string]) =
- ## Reads **up to** `size` bytes from `socket`. Returned future will
- ## complete once all the data requested is read, a part of the data has been
- ## read, or the socket has disconnected in which case the future will
- ## complete with a value of `""`.
- ##
- ## .. warning:: The `Peek` socket flag is not supported on Windows.
- # Things to note:
- # * When WSARecv completes immediately then `bytesReceived` is very
- # unreliable.
- # * Still need to implement message-oriented socket disconnection,
- # '\0' in the message currently signifies a socket disconnect. Who
- # knows what will happen when someone sends that to our socket.
- verifyPresence(socket)
- assert SocketFlag.Peek notin flags, "Peek not supported on Windows."
- var retFuture = newFuture[string]("recv")
- var dataBuf: TWSABuf
- dataBuf.buf = cast[cstring](alloc0(size))
- dataBuf.len = size.ULONG
- var bytesReceived: DWORD
- var flagsio = flags.toOSFlags().DWORD
- var ol = newCustom()
- ol.data = CompletionData(fd: socket, cb:
- proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
- if not retFuture.finished:
- if errcode == OSErrorCode(-1):
- if bytesCount == 0 and dataBuf.buf[0] == '\0':
- retFuture.complete("")
- else:
- var data = newString(bytesCount)
- assert bytesCount <= size
- copyMem(addr data[0], addr dataBuf.buf[0], bytesCount)
- retFuture.complete($data)
- else:
- if flags.isDisconnectionError(errcode):
- retFuture.complete("")
- else:
- retFuture.fail(newException(OSError, osErrorMsg(errcode)))
- if dataBuf.buf != nil:
- dealloc dataBuf.buf
- dataBuf.buf = nil
- )
- let ret = WSARecv(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived,
- addr flagsio, cast[POVERLAPPED](ol), nil)
- if ret == -1:
- let err = osLastError()
- if err.int32 != ERROR_IO_PENDING:
- if dataBuf.buf != nil:
- dealloc dataBuf.buf
- dataBuf.buf = nil
- GC_unref(ol)
- if flags.isDisconnectionError(err):
- retFuture.complete("")
- else:
- retFuture.fail(newException(OSError, osErrorMsg(err)))
- elif ret == 0:
- # Request completed immediately.
- if bytesReceived != 0:
- var data = newString(bytesReceived)
- assert bytesReceived <= size
- copyMem(addr data[0], addr dataBuf.buf[0], bytesReceived)
- retFuture.complete($data)
- else:
- if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)):
- retFuture.complete("")
- return retFuture
- proc recvInto*(socket: AsyncFD, buf: pointer, size: int,
- flags = {SocketFlag.SafeDisconn}): owned(Future[int]) =
- ## Reads **up to** `size` bytes from `socket` into `buf`, which must
- ## at least be of that size. Returned future will complete once all the
- ## data requested is read, a part of the data has been read, or the socket
- ## has disconnected in which case the future will complete with a value of
- ## `0`.
- ##
- ## .. warning:: The `Peek` socket flag is not supported on Windows.
- # Things to note:
- # * When WSARecv completes immediately then `bytesReceived` is very
- # unreliable.
- # * Still need to implement message-oriented socket disconnection,
- # '\0' in the message currently signifies a socket disconnect. Who
- # knows what will happen when someone sends that to our socket.
- verifyPresence(socket)
- assert SocketFlag.Peek notin flags, "Peek not supported on Windows."
- var retFuture = newFuture[int]("recvInto")
- #buf[] = '\0'
- var dataBuf: TWSABuf
- dataBuf.buf = cast[cstring](buf)
- dataBuf.len = size.ULONG
- var bytesReceived: DWORD
- var flagsio = flags.toOSFlags().DWORD
- var ol = newCustom()
- ol.data = CompletionData(fd: socket, cb:
- proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
- if not retFuture.finished:
- if errcode == OSErrorCode(-1):
- retFuture.complete(bytesCount)
- else:
- if flags.isDisconnectionError(errcode):
- retFuture.complete(0)
- else:
- retFuture.fail(newException(OSError, osErrorMsg(errcode)))
- if dataBuf.buf != nil:
- dataBuf.buf = nil
- )
- let ret = WSARecv(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived,
- addr flagsio, cast[POVERLAPPED](ol), nil)
- if ret == -1:
- let err = osLastError()
- if err.int32 != ERROR_IO_PENDING:
- if dataBuf.buf != nil:
- dataBuf.buf = nil
- GC_unref(ol)
- if flags.isDisconnectionError(err):
- retFuture.complete(0)
- else:
- retFuture.fail(newException(OSError, osErrorMsg(err)))
- elif ret == 0:
- # Request completed immediately.
- if bytesReceived != 0:
- assert bytesReceived <= size
- retFuture.complete(bytesReceived)
- else:
- if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)):
- retFuture.complete(bytesReceived)
- return retFuture
- proc send*(socket: AsyncFD, buf: pointer, size: int,
- flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
- ## Sends `size` bytes from `buf` to `socket`. The returned future
- ## will complete once all data has been sent.
- ##
- ## .. warning:: Use it with caution. If `buf` refers to GC'ed object,
- ## you must use GC_ref/GC_unref calls to avoid early freeing of the buffer.
- verifyPresence(socket)
- var retFuture = newFuture[void]("send")
- var dataBuf: TWSABuf
- dataBuf.buf = cast[cstring](buf)
- dataBuf.len = size.ULONG
- var bytesReceived, lowFlags: DWORD
- var ol = newCustom()
- ol.data = CompletionData(fd: socket, cb:
- proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
- if not retFuture.finished:
- if errcode == OSErrorCode(-1):
- retFuture.complete()
- else:
- if flags.isDisconnectionError(errcode):
- retFuture.complete()
- else:
- retFuture.fail(newOSError(errcode))
- )
- let ret = WSASend(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived,
- lowFlags, cast[POVERLAPPED](ol), nil)
- if ret == -1:
- let err = osLastError()
- if err.int32 != ERROR_IO_PENDING:
- GC_unref(ol)
- if flags.isDisconnectionError(err):
- retFuture.complete()
- else:
- retFuture.fail(newException(OSError, osErrorMsg(err)))
- else:
- retFuture.complete()
- # We don't deallocate `ol` here because even though this completed
- # immediately poll will still be notified about its completion and it will
- # free `ol`.
- return retFuture
- proc sendTo*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr,
- saddrLen: SockLen,
- flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
- ## Sends `data` to specified destination `saddr`, using
- ## socket `socket`. The returned future will complete once all data
- ## has been sent.
- verifyPresence(socket)
- var retFuture = newFuture[void]("sendTo")
- var dataBuf: TWSABuf
- dataBuf.buf = cast[cstring](data)
- dataBuf.len = size.ULONG
- var bytesSent = 0.DWORD
- var lowFlags = 0.DWORD
- # we will preserve address in our stack
- var staddr: array[128, char] # SOCKADDR_STORAGE size is 128 bytes
- var stalen: cint = cint(saddrLen)
- zeroMem(addr(staddr[0]), 128)
- copyMem(addr(staddr[0]), saddr, saddrLen)
- var ol = newCustom()
- ol.data = CompletionData(fd: socket, cb:
- proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
- if not retFuture.finished:
- if errcode == OSErrorCode(-1):
- retFuture.complete()
- else:
- retFuture.fail(newException(OSError, osErrorMsg(errcode)))
- )
- let ret = WSASendTo(socket.SocketHandle, addr dataBuf, 1, addr bytesSent,
- lowFlags, cast[ptr SockAddr](addr(staddr[0])),
- stalen, cast[POVERLAPPED](ol), nil)
- if ret == -1:
- let err = osLastError()
- if err.int32 != ERROR_IO_PENDING:
- GC_unref(ol)
- retFuture.fail(newException(OSError, osErrorMsg(err)))
- else:
- retFuture.complete()
- # We don't deallocate `ol` here because even though this completed
- # immediately poll will still be notified about its completion and it will
- # free `ol`.
- return retFuture
- proc recvFromInto*(socket: AsyncFD, data: pointer, size: int,
- saddr: ptr SockAddr, saddrLen: ptr SockLen,
- flags = {SocketFlag.SafeDisconn}): owned(Future[int]) =
- ## Receives a datagram data from `socket` into `buf`, which must
- ## be at least of size `size`, address of datagram's sender will be
- ## stored into `saddr` and `saddrLen`. Returned future will complete
- ## once one datagram has been received, and will return size of packet
- ## received.
- verifyPresence(socket)
- var retFuture = newFuture[int]("recvFromInto")
- var dataBuf = TWSABuf(buf: cast[cstring](data), len: size.ULONG)
- var bytesReceived = 0.DWORD
- var lowFlags = 0.DWORD
- var ol = newCustom()
- ol.data = CompletionData(fd: socket, cb:
- proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
- if not retFuture.finished:
- if errcode == OSErrorCode(-1):
- assert bytesCount <= size
- retFuture.complete(bytesCount)
- else:
- # datagram sockets don't have disconnection,
- # so we can just raise an exception
- retFuture.fail(newException(OSError, osErrorMsg(errcode)))
- )
- let res = WSARecvFrom(socket.SocketHandle, addr dataBuf, 1,
- addr bytesReceived, addr lowFlags,
- saddr, cast[ptr cint](saddrLen),
- cast[POVERLAPPED](ol), nil)
- if res == -1:
- let err = osLastError()
- if err.int32 != ERROR_IO_PENDING:
- GC_unref(ol)
- retFuture.fail(newException(OSError, osErrorMsg(err)))
- else:
- # Request completed immediately.
- if bytesReceived != 0:
- assert bytesReceived <= size
- retFuture.complete(bytesReceived)
- else:
- if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)):
- retFuture.complete(bytesReceived)
- return retFuture
- proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn},
- inheritable = defined(nimInheritHandles)):
- owned(Future[tuple[address: string, client: AsyncFD]]) {.gcsafe.} =
- ## Accepts a new connection. Returns a future containing the client socket
- ## corresponding to that connection and the remote address of the client.
- ## The future will complete when the connection is successfully accepted.
- ##
- ## The resulting client socket is automatically registered to the
- ## dispatcher.
- ##
- ## If `inheritable` is false (the default), the resulting client socket will
- ## not be inheritable by child processes.
- ##
- ## The `accept` call may result in an error if the connecting socket
- ## disconnects during the duration of the `accept`. If the `SafeDisconn`
- ## flag is specified then this error will not be raised and instead
- ## accept will be called again.
- verifyPresence(socket)
- var retFuture = newFuture[tuple[address: string, client: AsyncFD]]("acceptAddr")
- var clientSock = createNativeSocket(inheritable = inheritable)
- if clientSock == osInvalidSocket: raiseOSError(osLastError())
- const lpOutputLen = 1024
- var lpOutputBuf = newString(lpOutputLen)
- var dwBytesReceived: DWORD
- let dwReceiveDataLength = 0.DWORD # We don't want any data to be read.
- let dwLocalAddressLength = DWORD(sizeof(Sockaddr_in6) + 16)
- let dwRemoteAddressLength = DWORD(sizeof(Sockaddr_in6) + 16)
- template failAccept(errcode) =
- if flags.isDisconnectionError(errcode):
- var newAcceptFut = acceptAddr(socket, flags)
- newAcceptFut.callback =
- proc () =
- if newAcceptFut.failed:
- retFuture.fail(newAcceptFut.readError)
- else:
- retFuture.complete(newAcceptFut.read)
- else:
- retFuture.fail(newException(OSError, osErrorMsg(errcode)))
- template completeAccept() {.dirty.} =
- var listenSock = socket
- let setoptRet = setsockopt(clientSock, SOL_SOCKET,
- SO_UPDATE_ACCEPT_CONTEXT, addr listenSock,
- sizeof(listenSock).SockLen)
- if setoptRet != 0:
- let errcode = osLastError()
- discard clientSock.closesocket()
- failAccept(errcode)
- else:
- var localSockaddr, remoteSockaddr: ptr SockAddr
- var localLen, remoteLen: int32
- getAcceptExSockAddrs(addr lpOutputBuf[0], dwReceiveDataLength,
- dwLocalAddressLength, dwRemoteAddressLength,
- addr localSockaddr, addr localLen,
- addr remoteSockaddr, addr remoteLen)
- try:
- let address = getAddrString(remoteSockaddr)
- register(clientSock.AsyncFD)
- retFuture.complete((address: address, client: clientSock.AsyncFD))
- except:
- # getAddrString may raise
- clientSock.close()
- retFuture.fail(getCurrentException())
- var ol = newCustom()
- ol.data = CompletionData(fd: socket, cb:
- proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) {.gcsafe.} =
- if not retFuture.finished:
- if errcode == OSErrorCode(-1):
- completeAccept()
- else:
- failAccept(errcode)
- )
- # http://msdn.microsoft.com/en-us/library/windows/desktop/ms737524%28v=vs.85%29.aspx
- let ret = acceptEx(socket.SocketHandle, clientSock, addr lpOutputBuf[0],
- dwReceiveDataLength,
- dwLocalAddressLength,
- dwRemoteAddressLength,
- addr dwBytesReceived, cast[POVERLAPPED](ol))
- if not ret:
- let err = osLastError()
- if err.int32 != ERROR_IO_PENDING:
- failAccept(err)
- GC_unref(ol)
- else:
- completeAccept()
- # We don't deallocate `ol` here because even though this completed
- # immediately poll will still be notified about its completion and it will
- # free `ol`.
- return retFuture
- implementSetInheritable()
- proc closeSocket*(socket: AsyncFD) =
- ## Closes a socket and ensures that it is unregistered.
- socket.SocketHandle.close()
- getGlobalDispatcher().handles.excl(socket)
- proc unregister*(fd: AsyncFD) =
- ## Unregisters `fd`.
- getGlobalDispatcher().handles.excl(fd)
- proc contains*(disp: PDispatcher, fd: AsyncFD): bool =
- return fd in disp.handles
- {.push stackTrace: off.}
- proc waitableCallback(param: pointer,
- timerOrWaitFired: WINBOOL) {.stdcall.} =
- var p = cast[PostCallbackDataPtr](param)
- discard postQueuedCompletionStatus(p.ioPort, timerOrWaitFired.DWORD,
- ULONG_PTR(p.handleFd),
- cast[pointer](p.ovl))
- {.pop.}
- proc registerWaitableEvent(fd: AsyncFD, cb: Callback; mask: DWORD) =
- let p = getGlobalDispatcher()
- var flags = (WT_EXECUTEINWAITTHREAD or WT_EXECUTEONLYONCE).DWORD
- var hEvent = wsaCreateEvent()
- if hEvent == 0:
- raiseOSError(osLastError())
- var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
- pcd.ioPort = p.ioPort
- pcd.handleFd = fd
- var ol = newCustom()
- ol.data = CompletionData(fd: fd, cb:
- proc(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) {.gcsafe.} =
- # we excluding our `fd` because cb(fd) can register own handler
- # for this `fd`
- p.handles.excl(fd)
- # unregisterWait() is called before callback, because appropriate
- # winsockets function can re-enable event.
- # https://msdn.microsoft.com/en-us/library/windows/desktop/ms741576(v=vs.85).aspx
- if unregisterWait(pcd.waitFd) == 0:
- let err = osLastError()
- if err.int32 != ERROR_IO_PENDING:
- deallocShared(cast[pointer](pcd))
- discard wsaCloseEvent(hEvent)
- raiseOSError(err)
- if cb(fd):
- # callback returned `true`, so we free all allocated resources
- deallocShared(cast[pointer](pcd))
- if not wsaCloseEvent(hEvent):
- raiseOSError(osLastError())
- # pcd.ovl will be unrefed in poll().
- else:
- # callback returned `false` we need to continue
- if p.handles.contains(fd):
- # new callback was already registered with `fd`, so we free all
- # allocated resources. This happens because in callback `cb`
- # addRead/addWrite was called with same `fd`.
- deallocShared(cast[pointer](pcd))
- if not wsaCloseEvent(hEvent):
- raiseOSError(osLastError())
- else:
- # we need to include `fd` again
- p.handles.incl(fd)
- # and register WaitForSingleObject again
- if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
- cast[WAITORTIMERCALLBACK](waitableCallback),
- cast[pointer](pcd), INFINITE, flags):
- # pcd.ovl will be unrefed in poll()
- let err = osLastError()
- deallocShared(cast[pointer](pcd))
- discard wsaCloseEvent(hEvent)
- raiseOSError(err)
- else:
- # we incref `pcd.ovl` and `protect` callback one more time,
- # because it will be unrefed and disposed in `poll()` after
- # callback finishes.
- GC_ref(pcd.ovl)
- pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
- )
- # We need to protect our callback environment value, so GC will not free it
- # accidentally.
- ol.data.cell = system.protect(rawEnv(ol.data.cb))
- # This is main part of `hacky way` is using WSAEventSelect, so `hEvent`
- # will be signaled when appropriate `mask` events will be triggered.
- if wsaEventSelect(fd.SocketHandle, hEvent, mask) != 0:
- let err = osLastError()
- GC_unref(ol)
- deallocShared(cast[pointer](pcd))
- discard wsaCloseEvent(hEvent)
- raiseOSError(err)
- pcd.ovl = ol
- if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
- cast[WAITORTIMERCALLBACK](waitableCallback),
- cast[pointer](pcd), INFINITE, flags):
- let err = osLastError()
- GC_unref(ol)
- deallocShared(cast[pointer](pcd))
- discard wsaCloseEvent(hEvent)
- raiseOSError(err)
- p.handles.incl(fd)
- proc addRead*(fd: AsyncFD, cb: Callback) =
- ## Start watching the file descriptor for read availability and then call
- ## the callback `cb`.
- ##
- ## This is not `pure` mechanism for Windows Completion Ports (IOCP),
- ## so if you can avoid it, please do it. Use `addRead` only if really
- ## need it (main usecase is adaptation of unix-like libraries to be
- ## asynchronous on Windows).
- ##
- ## If you use this function, you don't need to use asyncdispatch.recv()
- ## or asyncdispatch.accept(), because they are using IOCP, please use
- ## nativesockets.recv() and nativesockets.accept() instead.
- ##
- ## Be sure your callback `cb` returns `true`, if you want to remove
- ## watch of `read` notifications, and `false`, if you want to continue
- ## receiving notifications.
- registerWaitableEvent(fd, cb, FD_READ or FD_ACCEPT or FD_OOB or FD_CLOSE)
- proc addWrite*(fd: AsyncFD, cb: Callback) =
- ## Start watching the file descriptor for write availability and then call
- ## the callback `cb`.
- ##
- ## This is not `pure` mechanism for Windows Completion Ports (IOCP),
- ## so if you can avoid it, please do it. Use `addWrite` only if really
- ## need it (main usecase is adaptation of unix-like libraries to be
- ## asynchronous on Windows).
- ##
- ## If you use this function, you don't need to use asyncdispatch.send()
- ## or asyncdispatch.connect(), because they are using IOCP, please use
- ## nativesockets.send() and nativesockets.connect() instead.
- ##
- ## Be sure your callback `cb` returns `true`, if you want to remove
- ## watch of `write` notifications, and `false`, if you want to continue
- ## receiving notifications.
- registerWaitableEvent(fd, cb, FD_WRITE or FD_CONNECT or FD_CLOSE)
- template registerWaitableHandle(p, hEvent, flags, pcd, timeout,
- handleCallback) =
- let handleFD = AsyncFD(hEvent)
- pcd.ioPort = p.ioPort
- pcd.handleFd = handleFD
- var ol = newCustom()
- ol.data.fd = handleFD
- ol.data.cb = handleCallback
- # We need to protect our callback environment value, so GC will not free it
- # accidentally.
- ol.data.cell = system.protect(rawEnv(ol.data.cb))
- pcd.ovl = ol
- if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
- cast[WAITORTIMERCALLBACK](waitableCallback),
- cast[pointer](pcd), timeout.DWORD, flags):
- let err = osLastError()
- GC_unref(ol)
- deallocShared(cast[pointer](pcd))
- discard closeHandle(hEvent)
- raiseOSError(err)
- p.handles.incl(handleFD)
- template closeWaitable(handle: untyped) =
- let waitFd = pcd.waitFd
- deallocShared(cast[pointer](pcd))
- p.handles.excl(fd)
- if unregisterWait(waitFd) == 0:
- let err = osLastError()
- if err.int32 != ERROR_IO_PENDING:
- discard closeHandle(handle)
- raiseOSError(err)
- if closeHandle(handle) == 0:
- raiseOSError(osLastError())
- proc addTimer*(timeout: int, oneshot: bool, cb: Callback) =
- ## Registers callback `cb` to be called when timer expired.
- ##
- ## Parameters:
- ##
- ## * `timeout` - timeout value in milliseconds.
- ## * `oneshot`
- ## * `true` - generate only one timeout event
- ## * `false` - generate timeout events periodically
- doAssert(timeout > 0)
- let p = getGlobalDispatcher()
- var hEvent = createEvent(nil, 1, 0, nil)
- if hEvent == INVALID_HANDLE_VALUE:
- raiseOSError(osLastError())
- var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
- var flags = WT_EXECUTEINWAITTHREAD.DWORD
- if oneshot: flags = flags or WT_EXECUTEONLYONCE
- proc timercb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
- let res = cb(fd)
- if res or oneshot:
- closeWaitable(hEvent)
- else:
- # if callback returned `false`, then it wants to be called again, so
- # we need to ref and protect `pcd.ovl` again, because it will be
- # unrefed and disposed in `poll()`.
- GC_ref(pcd.ovl)
- pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
- registerWaitableHandle(p, hEvent, flags, pcd, timeout, timercb)
- proc addProcess*(pid: int, cb: Callback) =
- ## Registers callback `cb` to be called when process with process ID
- ## `pid` exited.
- const NULL = Handle(0)
- let p = getGlobalDispatcher()
- let procFlags = SYNCHRONIZE
- var hProcess = openProcess(procFlags, 0, pid.DWORD)
- if hProcess == NULL:
- raiseOSError(osLastError())
- var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
- var flags = WT_EXECUTEINWAITTHREAD.DWORD or WT_EXECUTEONLYONCE.DWORD
- proc proccb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
- closeWaitable(hProcess)
- discard cb(fd)
- registerWaitableHandle(p, hProcess, flags, pcd, INFINITE, proccb)
- proc newAsyncEvent*(): AsyncEvent =
- ## Creates a new thread-safe `AsyncEvent` object.
- ##
- ## New `AsyncEvent` object is not automatically registered with
- ## dispatcher like `AsyncSocket`.
- var sa = SECURITY_ATTRIBUTES(
- nLength: sizeof(SECURITY_ATTRIBUTES).cint,
- bInheritHandle: 1
- )
- var event = createEvent(addr(sa), 0'i32, 0'i32, nil)
- if event == INVALID_HANDLE_VALUE:
- raiseOSError(osLastError())
- result = cast[AsyncEvent](allocShared0(sizeof(AsyncEventImpl)))
- result.hEvent = event
- proc trigger*(ev: AsyncEvent) =
- ## Set event `ev` to signaled state.
- if setEvent(ev.hEvent) == 0:
- raiseOSError(osLastError())
- proc unregister*(ev: AsyncEvent) =
- ## Unregisters event `ev`.
- doAssert(ev.hWaiter != 0, "Event is not registered in the queue!")
- let p = getGlobalDispatcher()
- p.handles.excl(AsyncFD(ev.hEvent))
- if unregisterWait(ev.hWaiter) == 0:
- let err = osLastError()
- if err.int32 != ERROR_IO_PENDING:
- raiseOSError(err)
- ev.hWaiter = 0
- proc close*(ev: AsyncEvent) =
- ## Closes event `ev`.
- let res = closeHandle(ev.hEvent)
- deallocShared(cast[pointer](ev))
- if res == 0:
- raiseOSError(osLastError())
- proc addEvent*(ev: AsyncEvent, cb: Callback) =
- ## Registers callback `cb` to be called when `ev` will be signaled
- doAssert(ev.hWaiter == 0, "Event is already registered in the queue!")
- let p = getGlobalDispatcher()
- let hEvent = ev.hEvent
- var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
- var flags = WT_EXECUTEINWAITTHREAD.DWORD
- proc eventcb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
- if ev.hWaiter != 0:
- if cb(fd):
- # we need this check to avoid exception, if `unregister(event)` was
- # called in callback.
- deallocShared(cast[pointer](pcd))
- if ev.hWaiter != 0:
- unregister(ev)
- else:
- # if callback returned `false`, then it wants to be called again, so
- # we need to ref and protect `pcd.ovl` again, because it will be
- # unrefed and disposed in `poll()`.
- GC_ref(pcd.ovl)
- pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
- else:
- # if ev.hWaiter == 0, then event was unregistered before `poll()` call.
- deallocShared(cast[pointer](pcd))
- registerWaitableHandle(p, hEvent, flags, pcd, INFINITE, eventcb)
- ev.hWaiter = pcd.waitFd
- initAll()
- else:
- import selectors
- from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK,
- MSG_NOSIGNAL
- when declared(posix.accept4):
- from posix import accept4, SOCK_CLOEXEC
- when defined(genode):
- import genode/env # get the implicit Genode env
- import genode/signals
- const
- InitCallbackListSize = 4 # initial size of callbacks sequence,
- # associated with file/socket descriptor.
- InitDelayedCallbackListSize = 64 # initial size of delayed callbacks
- # queue.
- type
- AsyncFD* = distinct cint
- Callback* = proc (fd: AsyncFD): bool {.closure, gcsafe.}
- AsyncData = object
- readList: seq[Callback]
- writeList: seq[Callback]
- AsyncEvent* = distinct SelectEvent
- PDispatcher* = ref object of PDispatcherBase
- selector: Selector[AsyncData]
- when defined(genode):
- signalHandler: SignalHandler
- proc `==`*(x, y: AsyncFD): bool {.borrow.}
- proc `==`*(x, y: AsyncEvent): bool {.borrow.}
- template newAsyncData(): AsyncData =
- AsyncData(
- readList: newSeqOfCap[Callback](InitCallbackListSize),
- writeList: newSeqOfCap[Callback](InitCallbackListSize)
- )
- proc newDispatcher*(): owned(PDispatcher) =
- new result
- result.selector = newSelector[AsyncData]()
- result.timers.clear()
- result.callbacks = initDeque[proc () {.closure, gcsafe.}](InitDelayedCallbackListSize)
- when defined(genode):
- let entrypoint = ep(cast[GenodeEnv](runtimeEnv))
- result.signalHandler = newSignalHandler(entrypoint):
- discard runOnce(0)
- var gDisp{.threadvar.}: owned PDispatcher ## Global dispatcher
- when defined(nuttx):
- import std/exitprocs
- proc cleanDispatcher() {.noconv.} =
- gDisp = nil
- proc addFinalyzer() =
- addExitProc(cleanDispatcher)
- proc setGlobalDispatcher*(disp: owned PDispatcher) =
- if not gDisp.isNil:
- assert gDisp.callbacks.len == 0
- gDisp = disp
- initCallSoonProc()
- proc getGlobalDispatcher*(): PDispatcher =
- if gDisp.isNil:
- setGlobalDispatcher(newDispatcher())
- when defined(nuttx):
- addFinalyzer()
- result = gDisp
- proc getIoHandler*(disp: PDispatcher): Selector[AsyncData] =
- return disp.selector
- proc register*(fd: AsyncFD) =
- let p = getGlobalDispatcher()
- var data = newAsyncData()
- p.selector.registerHandle(fd.SocketHandle, {}, data)
- proc unregister*(fd: AsyncFD) =
- getGlobalDispatcher().selector.unregister(fd.SocketHandle)
- proc unregister*(ev: AsyncEvent) =
- getGlobalDispatcher().selector.unregister(SelectEvent(ev))
- proc contains*(disp: PDispatcher, fd: AsyncFD): bool =
- return fd.SocketHandle in disp.selector
- proc addRead*(fd: AsyncFD, cb: Callback) =
- let p = getGlobalDispatcher()
- var newEvents = {Event.Read}
- withData(p.selector, fd.SocketHandle, adata) do:
- adata.readList.add(cb)
- newEvents.incl(Event.Read)
- if len(adata.writeList) != 0: newEvents.incl(Event.Write)
- do:
- raise newException(ValueError, "File descriptor not registered.")
- p.selector.updateHandle(fd.SocketHandle, newEvents)
- proc addWrite*(fd: AsyncFD, cb: Callback) =
- let p = getGlobalDispatcher()
- var newEvents = {Event.Write}
- withData(p.selector, fd.SocketHandle, adata) do:
- adata.writeList.add(cb)
- newEvents.incl(Event.Write)
- if len(adata.readList) != 0: newEvents.incl(Event.Read)
- do:
- raise newException(ValueError, "File descriptor not registered.")
- p.selector.updateHandle(fd.SocketHandle, newEvents)
- proc hasPendingOperations*(): bool =
- let p = getGlobalDispatcher()
- not p.selector.isEmpty() or p.timers.len != 0 or p.callbacks.len != 0
- proc prependSeq(dest: var seq[Callback]; src: sink seq[Callback]) =
- var old = move dest
- dest = src
- for i in 0..high(old):
- dest.add(move old[i])
- proc processBasicCallbacks(
- fd: AsyncFD, event: Event
- ): tuple[readCbListCount, writeCbListCount: int] =
- # Process pending descriptor and AsyncEvent callbacks.
- #
- # Invoke every callback stored in `rwlist`, until one
- # returns `false` (which means callback wants to stay
- # alive). In such case all remaining callbacks will be added
- # to `rwlist` again, in the order they have been inserted.
- #
- # `rwlist` associated with file descriptor MUST BE emptied before
- # dispatching callback (See https://github.com/nim-lang/Nim/issues/5128),
- # or it can be possible to fall into endless cycle.
- var curList: seq[Callback]
- let selector = getGlobalDispatcher().selector
- withData(selector, fd.int, fdData):
- case event
- of Event.Read:
- #shallowCopy(curList, fdData.readList)
- curList = move fdData.readList
- fdData.readList = newSeqOfCap[Callback](InitCallbackListSize)
- of Event.Write:
- #shallowCopy(curList, fdData.writeList)
- curList = move fdData.writeList
- fdData.writeList = newSeqOfCap[Callback](InitCallbackListSize)
- else:
- assert false, "Cannot process callbacks for " & $event
- let newLength = max(len(curList), InitCallbackListSize)
- var newList = newSeqOfCap[Callback](newLength)
- var eventsExtinguished = false
- for cb in curList:
- if eventsExtinguished:
- newList.add(cb)
- elif not cb(fd):
- # Callback wants to be called again.
- newList.add(cb)
- # This callback has returned with EAGAIN, so we don't need to
- # call any other callbacks as they are all waiting for the same event
- # on the same fd.
- # We do need to ensure they are called again though.
- eventsExtinguished = true
- withData(selector, fd.int, fdData) do:
- # Descriptor is still present in the queue.
- case event
- of Event.Read: prependSeq(fdData.readList, newList)
- of Event.Write: prependSeq(fdData.writeList, newList)
- else:
- assert false, "Cannot process callbacks for " & $event
- result.readCbListCount = len(fdData.readList)
- result.writeCbListCount = len(fdData.writeList)
- do:
- # Descriptor was unregistered in callback via `unregister()`.
- result.readCbListCount = -1
- result.writeCbListCount = -1
- proc processCustomCallbacks(p: PDispatcher; fd: AsyncFD) =
- # Process pending custom event callbacks. Custom events are
- # {Event.Timer, Event.Signal, Event.Process, Event.Vnode}.
- # There can be only one callback registered with one descriptor,
- # so there is no need to iterate over list.
- var curList: seq[Callback]
- withData(p.selector, fd.int, adata) do:
- curList = move adata.readList
- adata.readList = newSeqOfCap[Callback](InitCallbackListSize)
- let newLength = len(curList)
- var newList = newSeqOfCap[Callback](newLength)
- var cb = curList[0]
- if not cb(fd):
- newList.add(cb)
- withData(p.selector, fd.int, adata) do:
- # descriptor still present in queue.
- adata.readList = newList & adata.readList
- if len(adata.readList) == 0:
- # if no callbacks registered with descriptor, unregister it.
- p.selector.unregister(fd.int)
- do:
- # descriptor was unregistered in callback via `unregister()`.
- discard
- implementSetInheritable()
- proc closeSocket*(sock: AsyncFD) =
- let selector = getGlobalDispatcher().selector
- if sock.SocketHandle notin selector:
- raise newException(ValueError, "File descriptor not registered.")
- let data = selector.getData(sock.SocketHandle)
- sock.unregister()
- sock.SocketHandle.close()
- # We need to unblock the read and write callbacks which could still be
- # waiting for the socket to become readable and/or writeable.
- for cb in data.readList & data.writeList:
- if not cb(sock):
- raise newException(
- ValueError, "Expecting async operations to stop when fd has closed."
- )
- proc runOnce(timeout: int): bool =
- let p = getGlobalDispatcher()
- if p.selector.isEmpty() and p.timers.len == 0 and p.callbacks.len == 0:
- when defined(genode):
- if timeout == 0: return
- raise newException(ValueError,
- "No handles or timers registered in dispatcher.")
- result = false
- var keys: array[64, ReadyKey]
- let nextTimer = processTimers(p, result)
- var count =
- p.selector.selectInto(adjustTimeout(p, timeout, nextTimer), keys)
- for i in 0..<count:
- let fd = keys[i].fd.AsyncFD
- let events = keys[i].events
- var (readCbListCount, writeCbListCount) = (0, 0)
- if Event.Read in events or events == {Event.Error}:
- (readCbListCount, writeCbListCount) =
- processBasicCallbacks(fd, Event.Read)
- result = true
- if Event.Write in events or events == {Event.Error}:
- (readCbListCount, writeCbListCount) =
- processBasicCallbacks(fd, Event.Write)
- result = true
- var isCustomEvent = false
- if Event.User in events:
- (readCbListCount, writeCbListCount) =
- processBasicCallbacks(fd, Event.Read)
- isCustomEvent = true
- if readCbListCount == 0:
- p.selector.unregister(fd.int)
- result = true
- when ioselSupportedPlatform:
- const customSet = {Event.Timer, Event.Signal, Event.Process,
- Event.Vnode}
- if (customSet * events) != {}:
- isCustomEvent = true
- processCustomCallbacks(p, fd)
- result = true
- # because state `data` can be modified in callback we need to update
- # descriptor events with currently registered callbacks.
- if not isCustomEvent and (readCbListCount != -1 and writeCbListCount != -1):
- var newEvents: set[Event] = {}
- if readCbListCount > 0: incl(newEvents, Event.Read)
- if writeCbListCount > 0: incl(newEvents, Event.Write)
- p.selector.updateHandle(SocketHandle(fd), newEvents)
- # Timer processing.
- discard processTimers(p, result)
- # Callback queue processing
- processPendingCallbacks(p, result)
- proc recv*(socket: AsyncFD, size: int,
- flags = {SocketFlag.SafeDisconn}): owned(Future[string]) =
- var retFuture = newFuture[string]("recv")
- var readBuffer = newString(size)
- proc cb(sock: AsyncFD): bool =
- result = true
- let res = recv(sock.SocketHandle, addr readBuffer[0], size.cint,
- flags.toOSFlags())
- if res < 0:
- let lastError = osLastError()
- if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and
- lastError.int32 != EAGAIN:
- if flags.isDisconnectionError(lastError):
- retFuture.complete("")
- else:
- retFuture.fail(newException(OSError, osErrorMsg(lastError)))
- else:
- result = false # We still want this callback to be called.
- elif res == 0:
- # Disconnected
- retFuture.complete("")
- else:
- readBuffer.setLen(res)
- retFuture.complete(readBuffer)
- # TODO: The following causes a massive slowdown.
- #if not cb(socket):
- addRead(socket, cb)
- return retFuture
- proc recvInto*(socket: AsyncFD, buf: pointer, size: int,
- flags = {SocketFlag.SafeDisconn}): owned(Future[int]) =
- var retFuture = newFuture[int]("recvInto")
- proc cb(sock: AsyncFD): bool =
- result = true
- let res = recv(sock.SocketHandle, buf, size.cint,
- flags.toOSFlags())
- if res < 0:
- let lastError = osLastError()
- if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and
- lastError.int32 != EAGAIN:
- if flags.isDisconnectionError(lastError):
- retFuture.complete(0)
- else:
- retFuture.fail(newException(OSError, osErrorMsg(lastError)))
- else:
- result = false # We still want this callback to be called.
- else:
- retFuture.complete(res)
- # TODO: The following causes a massive slowdown.
- #if not cb(socket):
- addRead(socket, cb)
- return retFuture
- proc send*(socket: AsyncFD, buf: pointer, size: int,
- flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
- var retFuture = newFuture[void]("send")
- var written = 0
- proc cb(sock: AsyncFD): bool =
- result = true
- let netSize = size-written
- var d = cast[cstring](buf)
- let res = send(sock.SocketHandle, addr d[written], netSize.cint,
- MSG_NOSIGNAL)
- if res < 0:
- let lastError = osLastError()
- if lastError.int32 != EINTR and
- lastError.int32 != EWOULDBLOCK and
- lastError.int32 != EAGAIN:
- if flags.isDisconnectionError(lastError):
- retFuture.complete()
- else:
- retFuture.fail(newOSError(lastError))
- else:
- result = false # We still want this callback to be called.
- else:
- written.inc(res)
- if res != netSize:
- result = false # We still have data to send.
- else:
- retFuture.complete()
- # TODO: The following causes crashes.
- #if not cb(socket):
- addWrite(socket, cb)
- return retFuture
- proc sendTo*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr,
- saddrLen: SockLen,
- flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
- ## Sends `data` of size `size` in bytes to specified destination
- ## (`saddr` of size `saddrLen` in bytes, using socket `socket`.
- ## The returned future will complete once all data has been sent.
- var retFuture = newFuture[void]("sendTo")
- # we will preserve address in our stack
- var staddr: array[128, char] # SOCKADDR_STORAGE size is 128 bytes
- var stalen = saddrLen
- zeroMem(addr(staddr[0]), 128)
- copyMem(addr(staddr[0]), saddr, saddrLen)
- proc cb(sock: AsyncFD): bool =
- result = true
- let res = sendto(sock.SocketHandle, data, size, MSG_NOSIGNAL,
- cast[ptr SockAddr](addr(staddr[0])), stalen)
- if res < 0:
- let lastError = osLastError()
- if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and
- lastError.int32 != EAGAIN:
- retFuture.fail(newException(OSError, osErrorMsg(lastError)))
- else:
- result = false # We still want this callback to be called.
- else:
- retFuture.complete()
- addWrite(socket, cb)
- return retFuture
- proc recvFromInto*(socket: AsyncFD, data: pointer, size: int,
- saddr: ptr SockAddr, saddrLen: ptr SockLen,
- flags = {SocketFlag.SafeDisconn}): owned(Future[int]) =
- ## Receives a datagram data from `socket` into `data`, which must
- ## be at least of size `size` in bytes, address of datagram's sender
- ## will be stored into `saddr` and `saddrLen`. Returned future will
- ## complete once one datagram has been received, and will return size
- ## of packet received.
- var retFuture = newFuture[int]("recvFromInto")
- proc cb(sock: AsyncFD): bool =
- result = true
- let res = recvfrom(sock.SocketHandle, data, size.cint, flags.toOSFlags(),
- saddr, saddrLen)
- if res < 0:
- let lastError = osLastError()
- if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and
- lastError.int32 != EAGAIN:
- retFuture.fail(newException(OSError, osErrorMsg(lastError)))
- else:
- result = false
- else:
- retFuture.complete(res)
- addRead(socket, cb)
- return retFuture
- proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn},
- inheritable = defined(nimInheritHandles)):
- owned(Future[tuple[address: string, client: AsyncFD]]) =
- var retFuture = newFuture[tuple[address: string,
- client: AsyncFD]]("acceptAddr")
- proc cb(sock: AsyncFD): bool {.gcsafe.} =
- result = true
- var sockAddress: Sockaddr_storage
- var addrLen = sizeof(sockAddress).SockLen
- var client =
- when declared(accept4):
- accept4(sock.SocketHandle, cast[ptr SockAddr](addr(sockAddress)),
- addr(addrLen), if inheritable: 0 else: SOCK_CLOEXEC)
- else:
- accept(sock.SocketHandle, cast[ptr SockAddr](addr(sockAddress)),
- addr(addrLen))
- when declared(setInheritable) and not declared(accept4):
- if client != osInvalidSocket and not setInheritable(client, inheritable):
- # Set failure first because close() itself can fail,
- # altering osLastError().
- retFuture.fail(newOSError(osLastError()))
- close client
- return false
- if client == osInvalidSocket:
- let lastError = osLastError()
- assert lastError.int32 != EWOULDBLOCK and lastError.int32 != EAGAIN
- if lastError.int32 == EINTR:
- return false
- else:
- if flags.isDisconnectionError(lastError):
- return false
- else:
- retFuture.fail(newException(OSError, osErrorMsg(lastError)))
- else:
- try:
- let address = getAddrString(cast[ptr SockAddr](addr sockAddress))
- register(client.AsyncFD)
- retFuture.complete((address, client.AsyncFD))
- except:
- # getAddrString may raise
- client.close()
- retFuture.fail(getCurrentException())
- addRead(socket, cb)
- return retFuture
- when ioselSupportedPlatform:
- proc addTimer*(timeout: int, oneshot: bool, cb: Callback) =
- ## Start watching for timeout expiration, and then call the
- ## callback `cb`.
- ## `timeout` - time in milliseconds,
- ## `oneshot` - if `true` only one event will be dispatched,
- ## if `false` continuous events every `timeout` milliseconds.
- let p = getGlobalDispatcher()
- var data = newAsyncData()
- data.readList.add(cb)
- p.selector.registerTimer(timeout, oneshot, data)
- proc addSignal*(signal: int, cb: Callback) =
- ## Start watching signal `signal`, and when signal appears, call the
- ## callback `cb`.
- let p = getGlobalDispatcher()
- var data = newAsyncData()
- data.readList.add(cb)
- p.selector.registerSignal(signal, data)
- proc addProcess*(pid: int, cb: Callback) =
- ## Start watching for process exit with pid `pid`, and then call
- ## the callback `cb`.
- let p = getGlobalDispatcher()
- var data = newAsyncData()
- data.readList.add(cb)
- p.selector.registerProcess(pid, data)
- proc newAsyncEvent*(): AsyncEvent =
- ## Creates new `AsyncEvent`.
- result = AsyncEvent(newSelectEvent())
- proc trigger*(ev: AsyncEvent) =
- ## Sets new `AsyncEvent` to signaled state.
- trigger(SelectEvent(ev))
- proc close*(ev: AsyncEvent) =
- ## Closes `AsyncEvent`
- close(SelectEvent(ev))
- proc addEvent*(ev: AsyncEvent, cb: Callback) =
- ## Start watching for event `ev`, and call callback `cb`, when
- ## ev will be set to signaled state.
- let p = getGlobalDispatcher()
- var data = newAsyncData()
- data.readList.add(cb)
- p.selector.registerEvent(SelectEvent(ev), data)
- proc drain*(timeout = 500) =
- ## Waits for completion of **all** events and processes them. Raises `ValueError`
- ## if there are no pending operations. In contrast to `poll` this
- ## processes as many events as are available until the timeout has elapsed.
- var curTimeout = timeout
- let start = now()
- while hasPendingOperations():
- discard runOnce(curTimeout)
- curTimeout -= (now() - start).inMilliseconds.int
- if curTimeout < 0:
- break
- proc poll*(timeout = 500) =
- ## Waits for completion events and processes them. Raises `ValueError`
- ## if there are no pending operations. This runs the underlying OS
- ## `epoll`:idx: or `kqueue`:idx: primitive only once.
- discard runOnce(timeout)
- template createAsyncNativeSocketImpl(domain, sockType, protocol: untyped,
- inheritable = defined(nimInheritHandles)) =
- let handle = createNativeSocket(domain, sockType, protocol, inheritable)
- if handle == osInvalidSocket:
- return osInvalidSocket.AsyncFD
- handle.setBlocking(false)
- when defined(macosx) and not defined(nimdoc):
- handle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1)
- result = handle.AsyncFD
- register(result)
- proc createAsyncNativeSocket*(domain: cint, sockType: cint,
- protocol: cint,
- inheritable = defined(nimInheritHandles)): AsyncFD =
- createAsyncNativeSocketImpl(domain, sockType, protocol, inheritable)
- proc createAsyncNativeSocket*(domain: Domain = Domain.AF_INET,
- sockType: SockType = SOCK_STREAM,
- protocol: Protocol = IPPROTO_TCP,
- inheritable = defined(nimInheritHandles)): AsyncFD =
- createAsyncNativeSocketImpl(domain, sockType, protocol, inheritable)
- when defined(windows) or defined(nimdoc):
- proc bindToDomain(handle: SocketHandle, domain: Domain) =
- # Extracted into a separate proc, because connect() on Windows requires
- # the socket to be initially bound.
- template doBind(saddr) =
- if bindAddr(handle, cast[ptr SockAddr](addr(saddr)),
- sizeof(saddr).SockLen) < 0'i32:
- raiseOSError(osLastError())
- if domain == Domain.AF_INET6:
- var saddr: Sockaddr_in6
- saddr.sin6_family = uint16(toInt(domain))
- doBind(saddr)
- else:
- var saddr: Sockaddr_in
- saddr.sin_family = uint16(toInt(domain))
- doBind(saddr)
- proc doConnect(socket: AsyncFD, addrInfo: ptr AddrInfo): owned(Future[void]) =
- let retFuture = newFuture[void]("doConnect")
- result = retFuture
- var ol = newCustom()
- ol.data = CompletionData(fd: socket, cb:
- proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
- if not retFuture.finished:
- if errcode == OSErrorCode(-1):
- const SO_UPDATE_CONNECT_CONTEXT = 0x7010
- socket.SocketHandle.setSockOptInt(SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, 1) # 15022
- retFuture.complete()
- else:
- retFuture.fail(newException(OSError, osErrorMsg(errcode)))
- )
- let ret = connectEx(socket.SocketHandle, addrInfo.ai_addr,
- cint(addrInfo.ai_addrlen), nil, 0, nil,
- cast[POVERLAPPED](ol))
- if ret:
- # Request to connect completed immediately.
- retFuture.complete()
- # We don't deallocate `ol` here because even though this completed
- # immediately poll will still be notified about its completion and it
- # will free `ol`.
- else:
- let lastError = osLastError()
- if lastError.int32 != ERROR_IO_PENDING:
- # With ERROR_IO_PENDING `ol` will be deallocated in `poll`,
- # and the future will be completed/failed there, too.
- GC_unref(ol)
- retFuture.fail(newException(OSError, osErrorMsg(lastError)))
- else:
- proc doConnect(socket: AsyncFD, addrInfo: ptr AddrInfo): owned(Future[void]) =
- let retFuture = newFuture[void]("doConnect")
- result = retFuture
- proc cb(fd: AsyncFD): bool =
- let ret = SocketHandle(fd).getSockOptInt(
- cint(SOL_SOCKET), cint(SO_ERROR))
- if ret == 0:
- # We have connected.
- retFuture.complete()
- return true
- elif ret == EINTR:
- # interrupted, keep waiting
- return false
- else:
- retFuture.fail(newException(OSError, osErrorMsg(OSErrorCode(ret))))
- return true
- let ret = connect(socket.SocketHandle,
- addrInfo.ai_addr,
- addrInfo.ai_addrlen.SockLen)
- if ret == 0:
- # Request to connect completed immediately.
- retFuture.complete()
- else:
- let lastError = osLastError()
- if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS:
- addWrite(socket, cb)
- else:
- retFuture.fail(newException(OSError, osErrorMsg(lastError)))
- template asyncAddrInfoLoop(addrInfo: ptr AddrInfo, fd: untyped,
- protocol: Protocol = IPPROTO_RAW) =
- ## Iterates through the AddrInfo linked list asynchronously
- ## until the connection can be established.
- const shouldCreateFd = not declared(fd)
- when shouldCreateFd:
- let sockType = protocol.toSockType()
- var fdPerDomain: array[low(Domain).ord..high(Domain).ord, AsyncFD]
- for i in low(fdPerDomain)..high(fdPerDomain):
- fdPerDomain[i] = osInvalidSocket.AsyncFD
- template closeUnusedFds(domainToKeep = -1) {.dirty.} =
- for i, fd in fdPerDomain:
- if fd != osInvalidSocket.AsyncFD and i != domainToKeep:
- fd.closeSocket()
- var lastException: ref Exception
- var curAddrInfo = addrInfo
- var domain: Domain
- when shouldCreateFd:
- var curFd: AsyncFD
- else:
- var curFd = fd
- proc tryNextAddrInfo(fut: Future[void]) {.gcsafe.} =
- if fut == nil or fut.failed:
- if fut != nil:
- lastException = fut.readError()
- while curAddrInfo != nil:
- let domainOpt = curAddrInfo.ai_family.toKnownDomain()
- if domainOpt.isSome:
- domain = domainOpt.unsafeGet()
- break
- curAddrInfo = curAddrInfo.ai_next
- if curAddrInfo == nil:
- freeAddrInfo(addrInfo)
- when shouldCreateFd:
- closeUnusedFds()
- if lastException != nil:
- retFuture.fail(lastException)
- else:
- retFuture.fail(newException(
- IOError, "Couldn't resolve address: " & address))
- return
- when shouldCreateFd:
- curFd = fdPerDomain[ord(domain)]
- if curFd == osInvalidSocket.AsyncFD:
- try:
- curFd = createAsyncNativeSocket(domain, sockType, protocol)
- except:
- freeAddrInfo(addrInfo)
- closeUnusedFds()
- raise getCurrentException()
- when defined(windows):
- curFd.SocketHandle.bindToDomain(domain)
- fdPerDomain[ord(domain)] = curFd
- doConnect(curFd, curAddrInfo).callback = tryNextAddrInfo
- curAddrInfo = curAddrInfo.ai_next
- else:
- freeAddrInfo(addrInfo)
- when shouldCreateFd:
- closeUnusedFds(ord(domain))
- retFuture.complete(curFd)
- else:
- retFuture.complete()
- tryNextAddrInfo(nil)
- proc dial*(address: string, port: Port,
- protocol: Protocol = IPPROTO_TCP): owned(Future[AsyncFD]) =
- ## Establishes connection to the specified `address`:`port` pair via the
- ## specified protocol. The procedure iterates through possible
- ## resolutions of the `address` until it succeeds, meaning that it
- ## seamlessly works with both IPv4 and IPv6.
- ## Returns the async file descriptor, registered in the dispatcher of
- ## the current thread, ready to send or receive data.
- let retFuture = newFuture[AsyncFD]("dial")
- result = retFuture
- let sockType = protocol.toSockType()
- let aiList = getAddrInfo(address, port, Domain.AF_UNSPEC, sockType, protocol)
- asyncAddrInfoLoop(aiList, noFD, protocol)
- proc connect*(socket: AsyncFD, address: string, port: Port,
- domain = Domain.AF_INET): owned(Future[void]) =
- let retFuture = newFuture[void]("connect")
- result = retFuture
- when defined(windows):
- verifyPresence(socket)
- else:
- assert getSockDomain(socket.SocketHandle) == domain
- let aiList = getAddrInfo(address, port, domain)
- when defined(windows):
- socket.SocketHandle.bindToDomain(domain)
- asyncAddrInfoLoop(aiList, socket)
- proc sleepAsync*(ms: int | float): owned(Future[void]) =
- ## Suspends the execution of the current async procedure for the next
- ## `ms` milliseconds.
- var retFuture = newFuture[void]("sleepAsync")
- let p = getGlobalDispatcher()
- when ms is int:
- p.timers.push((getMonoTime() + initDuration(milliseconds = ms), retFuture))
- elif ms is float:
- let ns = (ms * 1_000_000).int64
- p.timers.push((getMonoTime() + initDuration(nanoseconds = ns), retFuture))
- return retFuture
- proc withTimeout*[T](fut: Future[T], timeout: int): owned(Future[bool]) =
- ## Returns a future which will complete once `fut` completes or after
- ## `timeout` milliseconds has elapsed.
- ##
- ## If `fut` completes first the returned future will hold true,
- ## otherwise, if `timeout` milliseconds has elapsed first, the returned
- ## future will hold false.
- var retFuture = newFuture[bool]("asyncdispatch.`withTimeout`")
- var timeoutFuture = sleepAsync(timeout)
- fut.callback =
- proc () =
- if not retFuture.finished:
- if fut.failed:
- retFuture.fail(fut.error)
- else:
- retFuture.complete(true)
- timeoutFuture.callback =
- proc () =
- if not retFuture.finished: retFuture.complete(false)
- return retFuture
- proc accept*(socket: AsyncFD,
- flags = {SocketFlag.SafeDisconn},
- inheritable = defined(nimInheritHandles)): owned(Future[AsyncFD]) =
- ## Accepts a new connection. Returns a future containing the client socket
- ## corresponding to that connection.
- ##
- ## If `inheritable` is false (the default), the resulting client socket
- ## will not be inheritable by child processes.
- ##
- ## The future will complete when the connection is successfully accepted.
- var retFut = newFuture[AsyncFD]("accept")
- var fut = acceptAddr(socket, flags, inheritable)
- fut.callback =
- proc (future: Future[tuple[address: string, client: AsyncFD]]) =
- assert future.finished
- if future.failed:
- retFut.fail(future.error)
- else:
- retFut.complete(future.read.client)
- return retFut
- proc keepAlive(x: string) =
- discard "mark 'x' as escaping so that it is put into a closure for us to keep the data alive"
- proc send*(socket: AsyncFD, data: string,
- flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
- ## Sends `data` to `socket`. The returned future will complete once all
- ## data has been sent.
- var retFuture = newFuture[void]("send")
- if data.len > 0:
- let sendFut = socket.send(unsafeAddr data[0], data.len, flags)
- sendFut.callback =
- proc () =
- keepAlive(data)
- if sendFut.failed:
- retFuture.fail(sendFut.error)
- else:
- retFuture.complete()
- else:
- retFuture.complete()
- return retFuture
- # -- Await Macro
- import asyncmacro
- export asyncmacro
- proc readAll*(future: FutureStream[string]): owned(Future[string]) {.async.} =
- ## Returns a future that will complete when all the string data from the
- ## specified future stream is retrieved.
- result = ""
- while true:
- let (hasValue, value) = await future.read()
- if hasValue:
- result.add(value)
- else:
- break
- proc callSoon(cbproc: proc () {.gcsafe.}) =
- getGlobalDispatcher().callbacks.addLast(cbproc)
- proc runForever*() =
- ## Begins a never ending global dispatcher poll loop.
- while true:
- poll()
- proc waitFor*[T](fut: Future[T]): T =
- ## **Blocks** the current thread until the specified future completes.
- while not fut.finished:
- poll()
- fut.read
- proc activeDescriptors*(): int {.inline.} =
- ## Returns the current number of active file descriptors for the current
- ## event loop. This is a cheap operation that does not involve a system call.
- when defined(windows):
- result = getGlobalDispatcher().handles.len
- elif not defined(nimdoc):
- result = getGlobalDispatcher().selector.count
- when defined(posix):
- import posix
- when defined(linux) or defined(windows) or defined(macosx) or defined(bsd) or
- defined(solaris) or defined(zephyr) or defined(freertos) or defined(nuttx):
- proc maxDescriptors*(): int {.raises: OSError.} =
- ## Returns the maximum number of active file descriptors for the current
- ## process. This involves a system call. For now `maxDescriptors` is
- ## supported on the following OSes: Windows, Linux, OSX, BSD, Solaris.
- when defined(windows):
- result = 16_700_000
- elif defined(zephyr) or defined(freertos):
- result = FD_MAX
- else:
- var fdLim: RLimit
- if getrlimit(RLIMIT_NOFILE, fdLim) < 0:
- raiseOSError(osLastError())
- result = int(fdLim.rlim_cur) - 1
- when defined(genode):
- proc scheduleCallbacks*(): bool {.discardable.} =
- ## *Genode only.*
- ## Schedule callback processing and return immediately.
- ## Returns `false` if there is nothing to schedule.
- ## RPC servers should call this to dispatch `callSoon`
- ## bodies after retiring an RPC to its client.
- ## This is effectively a non-blocking `poll(…)` and is
- ## equivalent to scheduling a momentary no-op timeout
- ## but faster and with less overhead.
- let dis = getGlobalDispatcher()
- result = dis.callbacks.len > 0
- if result: submit(dis.signalHandler.cap)
|