123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244 |
- when defined(windows):
- import asyncdispatch, nativesockets, net, strutils, os, winlean
- from stdtest/netutils import bindAvailablePort
- var msgCount = 0
- const
- swarmSize = 50
- messagesToSend = 100
- var clientCount = 0
- proc winConnect*(socket: AsyncFD, address: string, port: Port,
- domain = Domain.AF_INET): Future[void] =
- var retFuture = newFuture[void]("winConnect")
- proc cb(fd: AsyncFD): bool =
- var ret = SocketHandle(fd).getSockOptInt(cint(SOL_SOCKET), cint(SO_ERROR))
- if ret == 0:
- # We have connected.
- retFuture.complete()
- return true
- else:
- retFuture.fail(newException(OSError, osErrorMsg(OSErrorCode(ret))))
- return true
- var aiList = getAddrInfo(address, port, domain)
- var success = false
- var lastError: OSErrorCode = OSErrorCode(0)
- var it = aiList
- while it != nil:
- var ret = nativesockets.connect(socket.SocketHandle, it.ai_addr, it.ai_addrlen.Socklen)
- if ret == 0:
- # Request to connect completed immediately.
- success = true
- retFuture.complete()
- break
- else:
- lastError = osLastError()
- if lastError.int32 == WSAEWOULDBLOCK:
- success = true
- addWrite(socket, cb)
- break
- else:
- success = false
- it = it.ai_next
- freeAddrInfo(aiList)
- if not success:
- retFuture.fail(newException(OSError, osErrorMsg(lastError)))
- return retFuture
- proc winRecv*(socket: AsyncFD, size: int,
- flags = {SocketFlag.SafeDisconn}): Future[string] =
- var retFuture = newFuture[string]("recv")
- var readBuffer = newString(size)
- proc cb(sock: AsyncFD): bool =
- result = true
- let res = recv(sock.SocketHandle, addr readBuffer[0], size.cint,
- flags.toOSFlags())
- if res < 0:
- let lastError = osLastError()
- if flags.isDisconnectionError(lastError):
- retFuture.complete("")
- else:
- retFuture.fail(newException(OSError, osErrorMsg(lastError)))
- elif res == 0:
- # Disconnected
- retFuture.complete("")
- else:
- readBuffer.setLen(res)
- retFuture.complete(readBuffer)
- # TODO: The following causes a massive slowdown.
- #if not cb(socket):
- addRead(socket, cb)
- return retFuture
- proc winRecvInto*(socket: AsyncFD, buf: cstring, size: int,
- flags = {SocketFlag.SafeDisconn}): Future[int] =
- var retFuture = newFuture[int]("winRecvInto")
- proc cb(sock: AsyncFD): bool =
- result = true
- let res = nativesockets.recv(sock.SocketHandle, buf, size.cint,
- flags.toOSFlags())
- if res < 0:
- let lastError = osLastError()
- if flags.isDisconnectionError(lastError):
- retFuture.complete(0)
- else:
- retFuture.fail(newException(OSError, osErrorMsg(lastError)))
- else:
- retFuture.complete(res)
- # TODO: The following causes a massive slowdown.
- #if not cb(socket):
- addRead(socket, cb)
- return retFuture
- proc winSend*(socket: AsyncFD, data: string,
- flags = {SocketFlag.SafeDisconn}): Future[void] =
- var retFuture = newFuture[void]("winSend")
- var written = 0
- proc cb(sock: AsyncFD): bool =
- result = true
- let netSize = data.len-written
- var d = data.cstring
- let res = nativesockets.send(sock.SocketHandle, addr d[written], netSize.cint, 0)
- if res < 0:
- let lastError = osLastError()
- if flags.isDisconnectionError(lastError):
- retFuture.complete()
- else:
- retFuture.fail(newException(OSError, osErrorMsg(lastError)))
- else:
- written.inc(res)
- if res != netSize:
- result = false # We still have data to send.
- else:
- retFuture.complete()
- # TODO: The following causes crashes.
- #if not cb(socket):
- addWrite(socket, cb)
- return retFuture
- proc winAcceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn}):
- Future[tuple[address: string, client: AsyncFD]] =
- var retFuture = newFuture[tuple[address: string,
- client: AsyncFD]]("winAcceptAddr")
- proc cb(sock: AsyncFD): bool =
- result = true
- if not retFuture.finished:
- var sockAddress = Sockaddr()
- var addrLen = sizeof(sockAddress).Socklen
- var client = nativesockets.accept(sock.SocketHandle,
- cast[ptr SockAddr](addr(sockAddress)), addr(addrLen))
- if client == osInvalidSocket:
- retFuture.fail(newException(OSError, osErrorMsg(osLastError())))
- else:
- retFuture.complete((getAddrString(cast[ptr SockAddr](addr sockAddress)), client.AsyncFD))
- addRead(socket, cb)
- return retFuture
- proc winAccept*(socket: AsyncFD,
- flags = {SocketFlag.SafeDisconn}): Future[AsyncFD] =
- ## Accepts a new connection. Returns a future containing the client socket
- ## corresponding to that connection.
- ## The future will complete when the connection is successfully accepted.
- var retFut = newFuture[AsyncFD]("winAccept")
- var fut = winAcceptAddr(socket, flags)
- fut.callback =
- proc (future: Future[tuple[address: string, client: AsyncFD]]) =
- assert future.finished
- if future.failed:
- retFut.fail(future.error)
- else:
- retFut.complete(future.read.client)
- return retFut
- proc winRecvLine*(socket: AsyncFD): Future[string] {.async.} =
- ## Reads a line of data from ``socket``. Returned future will complete once
- ## a full line is read or an error occurs.
- ##
- ## If a full line is read ``\r\L`` is not
- ## added to ``line``, however if solely ``\r\L`` is read then ``line``
- ## will be set to it.
- ##
- ## If the socket is disconnected, ``line`` will be set to ``""``.
- ##
- ## If the socket is disconnected in the middle of a line (before ``\r\L``
- ## is read) then line will be set to ``""``.
- ## The partial line **will be lost**.
- ##
- ## **Warning**: This assumes that lines are delimited by ``\r\L``.
- ##
- ## **Note**: This procedure is mostly used for testing. You likely want to
- ## use ``asyncnet.recvLine`` instead.
- template addNLIfEmpty() =
- if result.len == 0:
- result.add("\c\L")
- result = ""
- var c = ""
- while true:
- c = await winRecv(socket, 1)
- if c.len == 0:
- return ""
- if c == "\r":
- c = await winRecv(socket, 1)
- assert c == "\l"
- addNLIfEmpty()
- return
- elif c == "\L":
- addNLIfEmpty()
- return
- add(result, c)
- proc sendMessages(client: AsyncFD) {.async.} =
- for i in 0 ..< messagesToSend:
- await winSend(client, "Message " & $i & "\c\L")
- proc launchSwarm(port: Port) {.async.} =
- for i in 0 ..< swarmSize:
- var sock = createNativeSocket()
- setBlocking(sock, false)
- await winConnect(AsyncFD(sock), "localhost", port)
- await sendMessages(AsyncFD(sock))
- discard closeSocket(sock)
- proc readMessages(client: AsyncFD) {.async.} =
- while true:
- var line = await winRecvLine(client)
- if line == "":
- closeSocket(client)
- clientCount.inc
- break
- else:
- if line.startswith("Message "):
- msgCount.inc
- else:
- doAssert false
- proc createServer(server: SocketHandle) {.async.} =
- discard server.listen()
- while true:
- asyncCheck readMessages(await winAccept(AsyncFD(server)))
- var server = createNativeSocket()
- setBlocking(server, false)
- let port = bindAvailablePort(server)
- asyncCheck createServer(server)
- asyncCheck launchSwarm(port)
- while true:
- poll()
- if clientCount == swarmSize: break
- assert msgCount == swarmSize * messagesToSend
- doAssert msgCount == 5000
|