twinasyncrw.nim 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. discard """
  2. output: "5000"
  3. """
  4. when defined(windows):
  5. import asyncdispatch, nativesockets, net, strutils, os, winlean
  const
    swarmSize = 50        # number of client connections the swarm opens
    messagesToSend = 100  # messages written on each client connection

  var
    msgCount = 0     # lines starting with "Message " seen by the server
    clientCount = 0  # client connections the server has read to the end
  proc winConnect*(socket: AsyncFD, address: string, port: Port,
                   domain = Domain.AF_INET): Future[void] =
    ## Connects ``socket`` to ``address``:``port``.
    ##
    ## The addresses returned by ``getAddrInfo`` are tried in order until a
    ## connect call either succeeds immediately or reports
    ## ``WSAEWOULDBLOCK``. In the latter case the future is resolved later by
    ## a write callback that inspects ``SO_ERROR``. If every address is
    ## rejected, the future fails with an ``OSError`` built from the last
    ## error code.
    var retFuture = newFuture[void]("winConnect")

    proc cb(fd: AsyncFD): bool =
      # Registered through ``addWrite``; ``SO_ERROR`` tells whether the
      # pending connect succeeded. Always returns true.
      result = true
      let ret = SocketHandle(fd).getSockOptInt(cint(SOL_SOCKET),
                                               cint(SO_ERROR))
      if ret == 0:
        # We have connected.
        retFuture.complete()
      else:
        retFuture.fail(newException(OSError, osErrorMsg(OSErrorCode(ret))))

    let aiList = getAddrInfo(address, port, domain)
    var success = false
    var lastError = OSErrorCode(0)
    var it = aiList
    while it != nil:
      let ret = nativesockets.connect(socket.SocketHandle, it.ai_addr,
                                      it.ai_addrlen.Socklen)
      if ret == 0:
        # Request to connect completed immediately.
        success = true
        retFuture.complete()
        break
      lastError = osLastError()
      if lastError.int32 == WSAEWOULDBLOCK:
        # Connect is in progress; the write callback finishes the future.
        success = true
        addWrite(socket, cb)
        break
      it = it.ai_next

    # The list comes from getAddrInfo, so release it with the matching
    # freeAddrInfo instead of the deprecated ``dealloc`` overload.
    freeAddrInfo(aiList)
    if not success:
      retFuture.fail(newException(OSError, osErrorMsg(lastError)))
    return retFuture
  proc winRecv*(socket: AsyncFD, size: int,
                flags = {SocketFlag.SafeDisconn}): Future[string] =
    ## Reads up to ``size`` bytes from ``socket`` with a single ``recv`` call
    ## made from a read callback.
    ##
    ## The future completes with the received data, or with ``""`` when the
    ## peer has disconnected (``recv`` returned 0, or it failed with an error
    ## that ``flags`` classify as a disconnection). Any other error fails the
    ## future with ``OSError``.
    # The label matches the proc name, like the other win* procs here.
    var retFuture = newFuture[string]("winRecv")
    var readBuffer = newString(size)

    proc cb(sock: AsyncFD): bool =
      result = true
      let res = recv(sock.SocketHandle, addr readBuffer[0], size.cint,
                     flags.toOSFlags())
      if res < 0:
        let lastError = osLastError()
        if flags.isDisconnectionError(lastError):
          retFuture.complete("")
        else:
          retFuture.fail(newException(OSError, osErrorMsg(lastError)))
      elif res == 0:
        # Disconnected
        retFuture.complete("")
      else:
        # Trim the buffer to the number of bytes actually received.
        readBuffer.setLen(res)
        retFuture.complete(readBuffer)

    # TODO: The following causes a massive slowdown.
    #if not cb(socket):
    addRead(socket, cb)
    return retFuture
  proc winRecvInto*(socket: AsyncFD, buf: cstring, size: int,
                    flags = {SocketFlag.SafeDisconn}): Future[int] =
    ## Receives up to ``size`` bytes from ``socket`` into ``buf`` with a
    ## single ``recv`` call made from a read callback.
    ##
    ## The future completes with the value returned by ``recv``, or with 0
    ## when ``recv`` fails with an error that ``flags`` classify as a
    ## disconnection. Any other error fails the future with ``OSError``.
    let pending = newFuture[int]("winRecvInto")

    proc onReadable(fd: AsyncFD): bool =
      # One attempt only: the callback always reports itself as done.
      let received = nativesockets.recv(fd.SocketHandle, buf, size.cint,
                                        flags.toOSFlags())
      if received >= 0:
        pending.complete(received)
      else:
        let err = osLastError()
        if flags.isDisconnectionError(err):
          pending.complete(0)
        else:
          pending.fail(newException(OSError, osErrorMsg(err)))
      return true

    # TODO: The following causes a massive slowdown.
    #if not onReadable(socket):
    addRead(socket, onReadable)
    return pending
  proc winSend*(socket: AsyncFD, data: string,
                flags = {SocketFlag.SafeDisconn}): Future[void] =
    ## Writes all of ``data`` to ``socket`` from a write callback.
    ##
    ## The callback stays registered (returns false) while part of ``data``
    ## is still unsent. The future completes once everything has been sent,
    ## or when ``send`` fails with an error that ``flags`` classify as a
    ## disconnection; any other error fails it with ``OSError``.
    let done = newFuture[void]("winSend")
    var written = 0  # bytes of ``data`` already handed to ``send``

    proc onWritable(fd: AsyncFD): bool =
      let remaining = data.len - written
      var raw = data.cstring
      let sent = nativesockets.send(fd.SocketHandle, addr raw[written],
                                    remaining.cint, 0)
      if sent < 0:
        let err = osLastError()
        if flags.isDisconnectionError(err):
          done.complete()
        else:
          done.fail(newException(OSError, osErrorMsg(err)))
        return true

      written.inc(sent)
      if sent != remaining:
        return false # We still have data to send.
      done.complete()
      return true

    # TODO: The following causes crashes.
    #if not onWritable(socket):
    addWrite(socket, onWritable)
    return done
  proc winAcceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn}):
      Future[tuple[address: string, client: AsyncFD]] =
    ## Accepts one connection on ``socket`` from a read callback.
    ##
    ## The future completes with the peer address string and the accepted
    ## client socket, or fails with ``OSError`` when ``accept`` returns an
    ## invalid socket. ``flags`` is accepted but not consulted here.
    let pending = newFuture[tuple[address: string,
                                  client: AsyncFD]]("winAcceptAddr")

    proc onReadable(fd: AsyncFD): bool =
      if pending.finished:
        return true

      var peer = Sockaddr()
      var peerLen = sizeof(peer).Socklen
      let accepted = nativesockets.accept(fd.SocketHandle,
                                          cast[ptr SockAddr](addr(peer)),
                                          addr(peerLen))
      if accepted != osInvalidSocket:
        let peerName = getAddrString(cast[ptr SockAddr](addr peer))
        pending.complete((peerName, accepted.AsyncFD))
      else:
        pending.fail(newException(OSError, osErrorMsg(osLastError())))
      return true

    addRead(socket, onReadable)
    return pending
  proc winAccept*(socket: AsyncFD,
                  flags = {SocketFlag.SafeDisconn}): Future[AsyncFD] =
    ## Accepts a new connection on ``socket``.
    ##
    ## This wraps ``winAcceptAddr`` and drops the address: the returned
    ## future completes with the client socket, or fails with the error of
    ## the underlying accept.
    let accepted = newFuture[AsyncFD]("winAccept")
    let withAddr = winAcceptAddr(socket, flags)

    withAddr.callback =
      proc (finishedFut: Future[tuple[address: string, client: AsyncFD]]) =
        assert finishedFut.finished
        if not finishedFut.failed:
          accepted.complete(finishedFut.read.client)
        else:
          accepted.fail(finishedFut.error)

    return accepted
  proc winRecvLine*(socket: AsyncFD): Future[string] {.async.} =
    ## Reads a line of data from ``socket``, one byte at a time. The returned
    ## future completes once a full line is read or an error occurs.
    ##
    ## The line terminator is not included in the result, except that an
    ## empty line is returned as ``"\r\L"`` so it can be told apart from a
    ## disconnect.
    ##
    ## If the socket is disconnected the result is ``""``. This also holds
    ## when the disconnect happens in the middle of a line, including
    ## between ``\r`` and ``\L``. The partial line **will be lost**.
    ##
    ## **Warning**: This assumes that lines are delimited by ``\r\L``.
    ##
    ## **Note**: This procedure is mostly used for testing. You likely want to
    ## use ``asyncnet.recvLine`` instead.
    template addNLIfEmpty() =
      if result.len == 0:
        result.add("\c\L")

    result = ""
    var c = ""
    while true:
      c = await winRecv(socket, 1)
      if c.len == 0:
        return ""
      if c == "\r":
        c = await winRecv(socket, 1)
        if c.len == 0:
          # Peer went away between "\r" and "\L": report a disconnect
          # instead of tripping the assertion below.
          return ""
        assert c == "\L"
        addNLIfEmpty()
        return
      elif c == "\L":
        addNLIfEmpty()
        return
      add(result, c)
  proc sendMessages(client: AsyncFD) {.async.} =
    ## Sends ``messagesToSend`` lines of the form ``Message <i>\c\L`` over
    ## ``client``, awaiting each send before starting the next.
    # ``..<`` replaces the deprecated ``.. <`` (unary ``<``) spelling.
    for i in 0 ..< messagesToSend:
      await winSend(client, "Message " & $i & "\c\L")
  proc launchSwarm(port: Port) {.async.} =
    ## Opens ``swarmSize`` client connections to ``localhost``:``port``, one
    ## after another. Each client sends its messages and is then closed
    ## before the next one is created.
    # ``..<`` replaces the deprecated ``.. <`` (unary ``<``) spelling.
    for i in 0 ..< swarmSize:
      let sock = newNativeSocket()
      setBlocking(sock, false)

      await winConnect(AsyncFD(sock), "localhost", port)
      await sendMessages(AsyncFD(sock))
      discard closeSocket(sock)
  proc readMessages(client: AsyncFD) {.async.} =
    ## Reads lines from ``client`` until ``winRecvLine`` yields ``""``.
    ##
    ## Every line starting with ``"Message "`` bumps ``msgCount``; any other
    ## non-empty line fails a ``doAssert``. On the empty line the socket is
    ## closed and ``clientCount`` is bumped.
    while true:
      let received = await winRecvLine(client)
      if received == "":
        closeSocket(client)
        clientCount.inc
        break
      elif received.startsWith("Message "):
        msgCount.inc
      else:
        doAssert false
  proc createServer(port: Port) {.async.} =
    ## Binds a non-blocking listening socket to ``INADDR_ANY``:``port`` and
    ## then accepts connections forever, starting ``readMessages`` on each
    ## accepted client.
    ##
    ## Raises ``OSError`` if binding or listening fails.
    let server = newNativeSocket()
    setBlocking(server, false)
    block:
      var name = Sockaddr_in()
      name.sin_family = toInt(Domain.AF_INET).uint16
      name.sin_port = htons(uint16(port))
      name.sin_addr.s_addr = htonl(INADDR_ANY)
      if bindAddr(server, cast[ptr SockAddr](addr(name)),
                  sizeof(name).Socklen) < 0'i32:
        raiseOSError(osLastError())

    # Check listen the same way as bindAddr: a silent failure here would
    # leave the accept loop below waiting forever.
    if server.listen() < 0'i32:
      raiseOSError(osLastError())

    while true:
      asyncCheck readMessages(await winAccept(AsyncFD(server)))
  # Start the server and the client swarm on the same port, then drive the
  # dispatcher until the server has read every client to the end.
  asyncCheck createServer(Port(10335))
  asyncCheck launchSwarm(Port(10335))
  while true:
    poll()
    if clientCount == swarmSize: break

  # doAssert keeps this check alive in builds that disable assertions.
  doAssert msgCount == swarmSize * messagesToSend
  echo msgCount
else:
  # Other platforms skip the test and print the expected output directly.
  echo(5000)