twinasyncrw.nim 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. when defined(windows):
  2. import asyncdispatch, nativesockets, net, strutils, os, winlean
  3. from stdtest/netutils import bindAvailablePort
  const
    swarmSize = 50        # number of client sockets launched by the test
    messagesToSend = 100  # lines each client sends before it closes
  var
    msgCount = 0     # incremented for every received "Message " line
    clientCount = 0  # incremented when a reader sees its client disconnect
  proc winConnect*(socket: AsyncFD, address: string, port: Port,
                   domain = Domain.AF_INET): Future[void] =
    ## Connects the non-blocking ``socket`` to ``address``:``port``.
    ##
    ## Every address returned by ``getAddrInfo`` is tried in order until one
    ## either connects immediately or reports ``WSAEWOULDBLOCK`` (connection
    ## in progress). In the latter case the returned future is resolved from a
    ## write callback that inspects ``SO_ERROR``. If no address gets that far
    ## the future fails with an ``OSError`` built from the last OS error seen.
    let connFut = newFuture[void]("winConnect")

    proc onWritable(fd: AsyncFD): bool =
      # Registered through ``addWrite`` for a pending connect; ``SO_ERROR``
      # tells whether it succeeded. Always returns true.
      result = true
      let soError = SocketHandle(fd).getSockOptInt(cint(SOL_SOCKET),
                                                   cint(SO_ERROR))
      if soError != 0:
        connFut.fail(newException(OSError, osErrorMsg(OSErrorCode(soError))))
      else:
        # We have connected.
        connFut.complete()

    let addrList = getAddrInfo(address, port, domain)
    var
      started = false
      lastErr: OSErrorCode = OSErrorCode(0)
      cur = addrList
    while cur != nil:
      let rc = nativesockets.connect(socket.SocketHandle, cur.ai_addr,
                                     cur.ai_addrlen.Socklen)
      if rc == 0:
        # The connect request completed immediately.
        started = true
        connFut.complete()
        break
      lastErr = osLastError()
      if lastErr.int32 == WSAEWOULDBLOCK:
        # Connection is in progress; finish it from the write callback.
        started = true
        addWrite(socket, onWritable)
        break
      started = false
      cur = cur.ai_next

    freeAddrInfo(addrList)
    if not started:
      connFut.fail(newException(OSError, osErrorMsg(lastErr)))
    result = connFut
  proc winRecv*(socket: AsyncFD, size: int,
                flags = {SocketFlag.SafeDisconn}): Future[string] =
    ## Receives up to ``size`` bytes from ``socket`` once it becomes readable.
    ##
    ## The future completes with the received data, or with ``""`` when the
    ## peer has disconnected (``recv`` returned 0, or it failed with an error
    ## that ``flags`` classifies as a disconnection). Any other failure fails
    ## the future with an ``OSError``.
    let recvFut = newFuture[string]("recv")
    var buffer = newString(size)

    proc onReadable(fd: AsyncFD): bool =
      # Single-shot: always returns true, whatever the outcome.
      result = true
      let nread = recv(fd.SocketHandle, addr buffer[0], size.cint,
                       flags.toOSFlags())
      if nread > 0:
        buffer.setLen(nread)
        recvFut.complete(buffer)
      elif nread == 0:
        # Disconnected
        recvFut.complete("")
      else:
        let err = osLastError()
        if flags.isDisconnectionError(err):
          recvFut.complete("")
        else:
          recvFut.fail(newException(OSError, osErrorMsg(err)))

    # TODO: The following causes a massive slowdown.
    #if not onReadable(socket):
    addRead(socket, onReadable)
    result = recvFut
  proc winRecvInto*(socket: AsyncFD, buf: cstring, size: int,
                    flags = {SocketFlag.SafeDisconn}): Future[int] =
    ## Receives up to ``size`` bytes from ``socket`` into ``buf`` once the
    ## socket becomes readable.
    ##
    ## The future completes with the value returned by ``recv``, or with 0
    ## when ``recv`` fails with an error that ``flags`` classifies as a
    ## disconnection. Any other failure fails the future with an ``OSError``.
    let recvFut = newFuture[int]("winRecvInto")

    proc onReadable(fd: AsyncFD): bool =
      # Single-shot: always returns true, whatever the outcome.
      result = true
      let nread = nativesockets.recv(fd.SocketHandle, buf, size.cint,
                                     flags.toOSFlags())
      if nread >= 0:
        recvFut.complete(nread)
      else:
        let err = osLastError()
        if flags.isDisconnectionError(err):
          recvFut.complete(0)
        else:
          recvFut.fail(newException(OSError, osErrorMsg(err)))

    # TODO: The following causes a massive slowdown.
    #if not onReadable(socket):
    addRead(socket, onReadable)
    result = recvFut
  proc winSend*(socket: AsyncFD, data: string,
                flags = {SocketFlag.SafeDisconn}): Future[void] =
    ## Sends all of ``data`` over ``socket``, writing whenever the socket is
    ## reported writable until every byte has been handed to ``send``.
    ##
    ## The future completes once everything is written, or when ``send`` fails
    ## with an error that ``flags`` classifies as a disconnection. Any other
    ## failure fails the future with an ``OSError``.
    let sendFut = newFuture[void]("winSend")
    var sent = 0  # bytes of ``data`` written so far

    proc onWritable(fd: AsyncFD): bool =
      # Returns false only after a partial write, so the callback stays
      # registered for the rest of the data.
      result = true
      let remaining = data.len - sent
      var raw = data.cstring
      let n = nativesockets.send(fd.SocketHandle, addr raw[sent],
                                 remaining.cint, 0)
      if n >= 0:
        sent.inc(n)
        if n == remaining:
          sendFut.complete()
        else:
          result = false # We still have data to send.
      else:
        let err = osLastError()
        if flags.isDisconnectionError(err):
          sendFut.complete()
        else:
          sendFut.fail(newException(OSError, osErrorMsg(err)))

    # TODO: The following causes crashes.
    #if not onWritable(socket):
    addWrite(socket, onWritable)
    result = sendFut
  proc winAcceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn}):
        Future[tuple[address: string, client: AsyncFD]] =
    ## Accepts one incoming connection on the listening ``socket`` once it
    ## becomes readable.
    ##
    ## The future completes with the peer's address string and the accepted
    ## client socket, or fails with an ``OSError`` when ``accept`` returns an
    ## invalid socket. ``flags`` is accepted for signature parity with the
    ## other helpers but is not consulted here.
    var retFuture = newFuture[tuple[address: string,
        client: AsyncFD]]("winAcceptAddr")
    proc cb(sock: AsyncFD): bool =
      result = true
      if not retFuture.finished:
        # ``Sockaddr_storage`` is large enough for any address family; a
        # plain ``Sockaddr`` is too small to receive an IPv6 peer address.
        var sockAddress: Sockaddr_storage
        var addrLen = sizeof(sockAddress).SockLen
        var client = nativesockets.accept(sock.SocketHandle,
            cast[ptr SockAddr](addr(sockAddress)), addr(addrLen))
        if client == osInvalidSocket:
          retFuture.fail(newException(OSError, osErrorMsg(osLastError())))
        else:
          retFuture.complete(
            (getAddrString(cast[ptr SockAddr](addr sockAddress)),
             client.AsyncFD))
    addRead(socket, cb)
    return retFuture
  proc winAccept*(socket: AsyncFD,
                  flags = {SocketFlag.SafeDisconn}): Future[AsyncFD] =
    ## Accepts a new connection on ``socket`` and yields only the client
    ## socket, dropping the peer address reported by ``winAcceptAddr``.
    ##
    ## The returned future mirrors the outcome of ``winAcceptAddr``: it
    ## completes with the client socket on success and fails with the same
    ## error otherwise.
    let acceptFut = newFuture[AsyncFD]("winAccept")
    let addrFut = winAcceptAddr(socket, flags)
    addrFut.callback =
      proc (done: Future[tuple[address: string, client: AsyncFD]]) =
        assert done.finished
        if not done.failed:
          acceptFut.complete(done.read.client)
        else:
          acceptFut.fail(done.error)
    result = acceptFut
  proc winRecvLine*(socket: AsyncFD): Future[string] {.async.} =
    ## Reads one line from ``socket``, one byte at a time, and returns it
    ## without its terminator.
    ##
    ## A line ends at ``\L`` or at ``\r`` followed by ``\L`` (the byte after
    ## ``\r`` is asserted to be a line feed). If the line itself is empty the
    ## result is ``"\c\L"``, so that an empty line can be told apart from a
    ## disconnect.
    ##
    ## If the socket disconnects, including in the middle of a line, the
    ## result is ``""`` and any partially read line **is lost**.
    ##
    ## **Note**: This procedure is mostly used for testing. You likely want to
    ## use ``asyncnet.recvLine`` instead.
    template addNLIfEmpty() =
      if result.len == 0:
        result.add("\c\L")

    result = ""
    var ch = ""
    while true:
      ch = await winRecv(socket, 1)
      if ch.len == 0:
        # Disconnected: discard whatever was accumulated so far.
        return ""
      if ch == "\L":
        addNLIfEmpty()
        return
      elif ch == "\r":
        # Consume the line feed expected right after the carriage return.
        ch = await winRecv(socket, 1)
        assert ch == "\l"
        addNLIfEmpty()
        return
      result.add(ch)
  proc sendMessages(client: AsyncFD) {.async.} =
    ## Sends ``messagesToSend`` numbered ``"Message <i>"`` lines, each
    ## terminated by ``\c\L``, to ``client``, awaiting every send in turn.
    for seqNo in 0 ..< messagesToSend:
      let payload = "Message " & $seqNo & "\c\L"
      await winSend(client, payload)
  proc launchSwarm(port: Port) {.async.} =
    ## Runs ``swarmSize`` clients one after another: each creates a
    ## non-blocking socket, connects to ``localhost``:``port``, sends its
    ## messages and closes the socket before the next client starts.
    for clientNo in 0 ..< swarmSize:
      let sock = createNativeSocket()
      setBlocking(sock, false)
      let fd = AsyncFD(sock)
      await winConnect(fd, "localhost", port)
      await sendMessages(fd)
      discard closeSocket(sock)
  proc readMessages(client: AsyncFD) {.async.} =
    ## Reads lines from ``client`` until an empty line signals a disconnect,
    ## counting each ``"Message "`` line in ``msgCount``. On disconnect the
    ## socket is closed and ``clientCount`` is incremented. Any other line
    ## trips a ``doAssert``.
    while true:
      let line = await winRecvLine(client)
      if line.len == 0:
        closeSocket(client)
        clientCount.inc
        break
      if not line.startsWith("Message "):
        doAssert false
      msgCount.inc
  proc createServer(server: SocketHandle) {.async.} =
    ## Puts ``server`` into listening mode and accepts connections forever,
    ## starting a ``readMessages`` task for each accepted client.
    discard server.listen()
    let listener = AsyncFD(server)
    while true:
      let client = await winAccept(listener)
      asyncCheck readMessages(client)
  # Test driver: bind a non-blocking listener to an available port, start the
  # server and the client swarm, then pump the dispatcher until every client
  # connection has been seen to close.
  let server = createNativeSocket()
  server.setBlocking(false)
  let port = server.bindAvailablePort()
  asyncCheck createServer(server)
  asyncCheck launchSwarm(port)
  while true:
    poll()
    if clientCount == swarmSize:
      break

  # Every client must have delivered all of its messages.
  assert msgCount == swarmSize * messagesToSend
  doAssert msgCount == 5000