asyncnet.nim 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745
  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2017 Dominik Picheta
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. ## This module implements a high-level asynchronous sockets API based on the
  10. ## asynchronous dispatcher defined in the ``asyncdispatch`` module.
  11. ##
  12. ## Asynchronous IO in Nim
  13. ## ----------------------
  14. ##
  15. ## Async IO in Nim consists of multiple layers (from highest to lowest):
  16. ##
  17. ## * ``asyncnet`` module
  18. ##
  19. ## * Async await
  20. ##
  21. ## * ``asyncdispatch`` module (event loop)
  22. ##
  23. ## * ``selectors`` module
  24. ##
  25. ## Each builds on top of the layers below it. The selectors module is an
  26. ## abstraction for the various system ``select()`` mechanisms such as epoll or
  27. ## kqueue. If you wish you can use it directly, and some people have done so
  28. ## `successfully <http://goran.krampe.se/2014/10/25/nim-socketserver/>`_.
  29. ## But you must be aware that on Windows it only supports
  30. ## ``select()``.
  31. ##
  ## The async dispatcher implements the proactor pattern and also has an
  ## implementation of IOCP. It implements the proactor pattern for other
  ## operating systems via the selectors module. Futures are also implemented
  ## here, and indeed all the procedures return a future.
  36. ##
  37. ## The final layer is the async await transformation. This allows you to
  38. ## write asynchronous code in a synchronous style and works similar to
  39. ## C#'s await. The transformation works by converting any async procedures
  40. ## into an iterator.
  41. ##
  42. ## This is all single threaded, fully non-blocking and does give you a
  43. ## lot of control. In theory you should be able to work with any of these
  44. ## layers interchangeably (as long as you only care about non-Windows
  45. ## platforms).
  46. ##
  47. ## For most applications using ``asyncnet`` is the way to go as it builds
  48. ## over all the layers, providing some extra features such as buffering.
  49. ##
  50. ## SSL
  51. ## ----
  52. ##
  53. ## SSL can be enabled by compiling with the ``-d:ssl`` flag.
  54. ##
  55. ## You must create a new SSL context with the ``newContext`` function defined
  56. ## in the ``net`` module. You may then call ``wrapSocket`` on your socket using
  57. ## the newly created SSL context to get an SSL socket.
  58. ##
  59. ## Examples
  60. ## --------
  61. ##
  62. ## Chat server
  63. ## ^^^^^^^^^^^
  64. ##
  65. ## The following example demonstrates a simple chat server.
  66. ##
  ## .. code-block::nim
  ##
  ##   import asyncnet, asyncdispatch
  ##
  ##   var clients {.threadvar.}: seq[AsyncSocket]
  ##
  ##   proc processClient(client: AsyncSocket) {.async.} =
  ##     while true:
  ##       let line = await client.recvLine()
  ##       if line.len == 0: break
  ##       for c in clients:
  ##         await c.send(line & "\c\L")
  ##
  ##   proc serve() {.async.} =
  ##     clients = @[]
  ##     var server = newAsyncSocket()
  ##     server.setSockOpt(OptReuseAddr, true)
  ##     server.bindAddr(Port(12345))
  ##     server.listen()
  ##
  ##     while true:
  ##       let client = await server.accept()
  ##       clients.add client
  ##
  ##       asyncCheck processClient(client)
  ##
  ##   asyncCheck serve()
  ##   runForever()
  95. ##
  96. import asyncdispatch
  97. import nativesockets
  98. import net
  99. import os
  100. export SOBool
  101. # TODO: Remove duplication introduced by PR #4683.
  102. const defineSsl = defined(ssl) or defined(nimdoc)
  103. when defineSsl:
  104. import openssl
  type
    # TODO: I would prefer to just do:
    # AsyncSocket* {.borrow: `.`.} = distinct Socket. But that doesn't work.
    AsyncSocketDesc = object
      fd: SocketHandle ## handle of the underlying OS socket
      closed: bool ## whether this socket has been closed
      case isBuffered: bool ## whether reads are staged through ``buffer``
      of true:
        buffer: array[0..BufferSize, char]
        currPos: int # index of the next unread byte in ``buffer``
        bufLen: int # number of bytes currently stored in ``buffer``
      of false: nil
      case isSsl: bool ## whether the socket has been wrapped for SSL
      of true:
        # The SSL fields only exist when SSL support is compiled in.
        when defineSsl:
          sslHandle: SslPtr
          sslContext: SslContext
          bioIn: BIO # bytes received from the network are written here
          bioOut: BIO # bytes waiting to be sent to the network are read here
      of false: nil
      domain: Domain
      sockType: SockType
      protocol: Protocol
    AsyncSocket* = ref AsyncSocketDesc

  {.deprecated: [PAsyncSocket: AsyncSocket].}
  proc newAsyncSocket*(fd: AsyncFD, domain: Domain = AF_INET,
      sockType: SockType = SOCK_STREAM,
      protocol: Protocol = IPPROTO_TCP, buffered = true): AsyncSocket =
    ## Wraps the existing descriptor ``fd`` in a new ``AsyncSocket``.
    ##
    ## ``domain``, ``sockType`` and ``protocol`` are recorded on the socket
    ## as given; ``buffered`` selects whether reads go through the internal
    ## buffer. ``fd`` must not be the invalid socket handle (asserted).
    assert fd != osInvalidSocket.AsyncFD
    new(result)
    result.fd = SocketHandle(fd)
    result.domain = domain
    result.sockType = sockType
    result.protocol = protocol
    # The discriminator has to be set before the buffer fields are touched.
    result.isBuffered = buffered
    if buffered:
      result.currPos = 0
  proc newAsyncSocket*(domain: Domain = AF_INET, sockType: SockType = SOCK_STREAM,
      protocol: Protocol = IPPROTO_TCP, buffered = true): AsyncSocket =
    ## Creates a new asynchronous socket.
    ##
    ## A brand new file descriptor is created for the socket via
    ## ``newAsyncNativeSocket``.
    let fd = newAsyncNativeSocket(domain, sockType, protocol)
    result = newAsyncSocket(fd, domain, sockType, protocol, buffered)
  proc newAsyncSocket*(domain, sockType, protocol: cint,
      buffered = true): AsyncSocket =
    ## Creates a new asynchronous socket from raw ``cint`` socket parameters.
    ##
    ## A brand new file descriptor is created for the socket via
    ## ``newAsyncNativeSocket``; the raw values are converted to ``Domain``,
    ## ``SockType`` and ``Protocol`` for storage on the socket.
    let fd = newAsyncNativeSocket(domain, sockType, protocol)
    result = newAsyncSocket(fd, Domain(domain), SockType(sockType),
                            Protocol(protocol), buffered)
  when defineSsl:
    proc getSslError(handle: SslPtr, err: cint): cint =
      ## Maps the negative result ``err`` of an OpenSSL call on ``handle`` to
      ## an ``SSL_ERROR_*`` code.
      ##
      ## The four "want" codes (connect, accept, read, write) are returned to
      ## the caller so it can service them; every other code raises an SSL
      ## error.
      assert err < 0
      var ret = SSLGetError(handle, err.cint)
      case ret
      of SSL_ERROR_ZERO_RETURN:
        raiseSSLError("TLS/SSL connection failed to initiate, socket closed prematurely.")
      of SSL_ERROR_WANT_CONNECT, SSL_ERROR_WANT_ACCEPT:
        return ret
      of SSL_ERROR_WANT_WRITE, SSL_ERROR_WANT_READ:
        return ret
      of SSL_ERROR_WANT_X509_LOOKUP:
        raiseSSLError("Function for x509 lookup has been called.")
      of SSL_ERROR_SYSCALL, SSL_ERROR_SSL:
        raiseSSLError()
      else: raiseSSLError("Unknown Error")

    proc sendPendingSslData(socket: AsyncSocket,
        flags: set[SocketFlag]) {.async.} =
      ## Drains whatever OpenSSL has queued in the socket's outgoing BIO and
      ## sends it over the underlying file descriptor. Does nothing when no
      ## data is pending.
      let pending = bioCtrlPending(socket.bioOut)
      if pending > 0:
        # The string must really be ``pending`` bytes long: ``newStringOfCap``
        # yields a zero-length string, so ``data[0]`` would index an empty
        # string and the later ``setLen`` would grow (and may clear) the
        # region ``bioRead`` has just filled.
        var data = newString(pending)
        let read = bioRead(socket.bioOut, addr data[0], pending)
        assert read != 0
        if read < 0:
          raiseSslError()
        # Shrink to the number of bytes actually read.
        data.setLen(read)
        await socket.fd.AsyncFd.send(data, flags)

    proc appeaseSsl(socket: AsyncSocket, flags: set[SocketFlag],
        sslError: cint): Future[bool] {.async.} =
      ## Services the "want" condition ``sslError`` reported by OpenSSL.
      ##
      ## Returns ``true`` if ``socket`` is still connected, otherwise ``false``.
      ## Any ``sslError`` other than want-read/want-write raises an SSL error.
      result = true
      case sslError
      of SSL_ERROR_WANT_WRITE:
        await sendPendingSslData(socket, flags)
      of SSL_ERROR_WANT_READ:
        # Feed freshly received network bytes into OpenSSL's incoming BIO.
        var data = await recv(socket.fd.AsyncFD, BufferSize, flags)
        let length = len(data)
        if length > 0:
          let ret = bioWrite(socket.bioIn, addr data[0], data.len.cint)
          if ret < 0:
            raiseSSLError()
        elif length == 0:
          # connection not properly closed by remote side or connection dropped
          SSL_set_shutdown(socket.sslHandle, SSL_RECEIVED_SHUTDOWN)
          result = false
      else:
        raiseSSLError("Cannot appease SSL.")

    template sslLoop(socket: AsyncSocket, flags: set[SocketFlag],
        op: untyped) =
      ## Repeats the OpenSSL call ``op`` until it stops returning a negative
      ## value, servicing pending reads/writes between attempts.
      ##
      ## Injects ``opResult`` (the last value returned by ``op``) into the
      ## caller's scope. Uses ``yield``, so it must be expanded inside an
      ## async proc. When the peer disconnects the loop ends silently if
      ## ``SafeDisconn`` is in ``flags`` and raises an SSL error otherwise.
      var opResult {.inject.} = -1.cint
      while opResult < 0:
        # Call the desired operation.
        opResult = op
        # Bit hackish here.
        # TODO: Introduce an async template transformation pragma?

        # Send any remaining pending SSL data.
        yield sendPendingSslData(socket, flags)

        # If the operation failed, try to see if SSL has some data to read
        # or write.
        if opResult < 0:
          let err = getSslError(socket.sslHandle, opResult.cint)
          let fut = appeaseSsl(socket, flags, err.cint)
          yield fut
          if not fut.read():
            # Socket disconnected.
            if SocketFlag.SafeDisconn in flags:
              break
            else:
              raiseSSLError("Socket has been disconnected")
  proc dial*(address: string, port: Port, protocol = IPPROTO_TCP,
      buffered = true): Future[AsyncSocket] {.async.} =
    ## Establishes a connection to the ``address``:``port`` pair via the
    ## specified protocol by delegating to ``asyncdispatch.dial``, which
    ## iterates through the possible resolutions of ``address`` until one
    ## succeeds (so both IPv4 and IPv6 work).
    ##
    ## Returns an ``AsyncSocket`` wrapping the connected descriptor, ready to
    ## send or receive data.
    let connectedFd = await asyncdispatch.dial(address, port, protocol)
    # The domain is taken from the descriptor that actually connected.
    result = newAsyncSocket(connectedFd,
                            getSockDomain(connectedFd.SocketHandle),
                            protocol.toSockType(), protocol, buffered)
  proc connect*(socket: AsyncSocket, address: string, port: Port) {.async.} =
    ## Connects ``socket`` to the server at ``address:port``.
    ##
    ## The returned ``Future`` completes when the connection succeeds or an
    ## error occurs. For SSL sockets the client handshake is run before the
    ## future completes.
    await connect(socket.fd.AsyncFD, address, port, socket.domain)
    if socket.isSsl:
      when defineSsl:
        if not isIpAddress(address):
          # Set the SNI address for this connection. This call can fail if
          # we're not using TLSv1+.
          discard SSL_set_tlsext_host_name(socket.sslHandle, address)

        let handshakeFlags = {SocketFlag.SafeDisconn}
        sslSetConnectState(socket.sslHandle)
        sslLoop(socket, handshakeFlags, sslDoHandshake(socket.sslHandle))
  template readInto(buf: pointer, size: int, socket: AsyncSocket,
      flags: set[SocketFlag]): int =
    ## Reads **up to** ``size`` bytes from ``socket`` into ``buf`` and
    ## evaluates to the number of bytes read.
    ##
    ## This is a template, not a proc: it contains ``yield`` and therefore
    ## has to be expanded inside an async proc.
    var bytesRead = 0
    if socket.isSsl:
      when defineSsl:
        # SSL mode: ``sslLoop`` injects ``opResult``.
        sslLoop(socket, flags,
          sslRead(socket.sslHandle, cast[cstring](buf), size.cint))
        bytesRead = opResult
    else:
      # Not in SSL mode: read straight from the file descriptor.
      var pendingRead = asyncdispatch.recvInto(socket.fd.AsyncFD, buf, size,
                                               flags)
      yield pendingRead
      bytesRead = pendingRead.read()
    bytesRead
  template readIntoBuf(socket: AsyncSocket,
      flags: set[SocketFlag]): int =
    ## Refills the socket's internal buffer with up to ``BufferSize`` bytes,
    ## rewinds the read cursor to the start of the buffer and evaluates to
    ## the number of bytes now held in it.
    var filled = readInto(addr socket.buffer[0], BufferSize, socket, flags)
    socket.currPos = 0
    socket.bufLen = filled
    filled
  proc recvInto*(socket: AsyncSocket, buf: pointer, size: int,
      flags = {SocketFlag.SafeDisconn}): Future[int] {.async.} =
    ## Reads **up to** ``size`` bytes from ``socket`` into ``buf`` and
    ## completes with the number of bytes stored.
    ##
    ## Buffered sockets try to deliver all ``size`` bytes, refilling the
    ## internal buffer in ``BufferSize`` chunks as needed. Unbuffered sockets
    ## make no such effort and return whatever a single read yields.
    ##
    ## If the socket disconnects during the operation the future may complete
    ## with only part of the requested data; if it is disconnected and no
    ## data is available the future completes with ``0``.
    ##
    ## With ``SocketFlag.Peek`` on a buffered socket only data already held
    ## in the buffer is returned and the buffer cursor is left unchanged.
    if socket.isBuffered:
      let savedPos = socket.currPos
      # Buffer refills always consume from the OS, even when peeking.
      let fillFlags = flags - {SocketFlag.Peek}

      if socket.bufLen == 0:
        let got = readIntoBuf(socket, fillFlags)
        if got == 0:
          return 0

      var copied = 0
      var dest = cast[cstring](buf)
      while copied < size:
        if socket.currPos >= socket.bufLen:
          if SocketFlag.Peek in flags:
            # We don't want to get another buffer if we're peeking.
            break
          let got = readIntoBuf(socket, fillFlags)
          if got == 0:
            break

        let count = min(socket.bufLen - socket.currPos, size - copied)
        copyMem(addr(dest[copied]), addr(socket.buffer[socket.currPos]), count)
        copied.inc(count)
        socket.currPos.inc(count)

      if SocketFlag.Peek in flags:
        # Restore old buffer cursor position.
        socket.currPos = savedPos
      result = copied
    else:
      result = readInto(buf, size, socket, flags)
  proc recv*(socket: AsyncSocket, size: int,
      flags = {SocketFlag.SafeDisconn}): Future[string] {.async.} =
    ## Reads **up to** ``size`` bytes from ``socket`` and completes with them
    ## as a string.
    ##
    ## Buffered sockets try to deliver all ``size`` bytes, refilling the
    ## internal buffer in ``BufferSize`` chunks as needed. Unbuffered sockets
    ## make no such effort and return whatever a single read yields.
    ##
    ## If the socket disconnects during the operation the future may complete
    ## with only part of the requested data; if it is disconnected and no
    ## data is available the future completes with ``""``.
    ##
    ## With ``SocketFlag.Peek`` on a buffered socket only data already held
    ## in the buffer is returned and the buffer cursor is left unchanged.
    if socket.isBuffered:
      result = newString(size)
      shallow(result)
      let savedPos = socket.currPos
      # Buffer refills always consume from the OS, even when peeking.
      let fillFlags = flags - {SocketFlag.Peek}

      if socket.bufLen == 0:
        let got = readIntoBuf(socket, fillFlags)
        if got == 0:
          result.setLen(0)
          return

      var copied = 0
      while copied < size:
        if socket.currPos >= socket.bufLen:
          if SocketFlag.Peek in flags:
            # We don't want to get another buffer if we're peeking.
            break
          let got = readIntoBuf(socket, fillFlags)
          if got == 0:
            break

        let count = min(socket.bufLen - socket.currPos, size - copied)
        copyMem(addr(result[copied]), addr(socket.buffer[socket.currPos]),
                count)
        copied.inc(count)
        socket.currPos.inc(count)

      if SocketFlag.Peek in flags:
        # Restore old buffer cursor position.
        socket.currPos = savedPos
      # Trim the preallocated string down to what was actually received.
      result.setLen(copied)
    else:
      result = newString(size)
      let received = readInto(addr result[0], size, socket, flags)
      result.setLen(received)
  proc send*(socket: AsyncSocket, buf: pointer, size: int,
      flags = {SocketFlag.SafeDisconn}) {.async.} =
    ## Sends ``size`` bytes starting at ``buf`` to ``socket``. The returned
    ## future completes once all data has been sent.
    assert socket != nil
    if not socket.isSsl:
      await send(socket.fd.AsyncFD, buf, size, flags)
    else:
      when defineSsl:
        # Let OpenSSL encrypt the payload, then flush what it produced.
        sslLoop(socket, flags,
          sslWrite(socket.sslHandle, cast[cstring](buf), size.cint))
        await sendPendingSslData(socket, flags)
  proc send*(socket: AsyncSocket, data: string,
      flags = {SocketFlag.SafeDisconn}) {.async.} =
    ## Sends ``data`` to ``socket``. The returned future completes once all
    ## data has been sent.
    assert socket != nil
    if not socket.isSsl:
      await send(socket.fd.AsyncFD, data, flags)
    else:
      when defineSsl:
        # A mutable copy is needed so that its address can be taken.
        var copy = data
        sslLoop(socket, flags,
          sslWrite(socket.sslHandle, addr copy[0], copy.len.cint))
        await sendPendingSslData(socket, flags)
  proc acceptAddr*(socket: AsyncSocket, flags = {SocketFlag.SafeDisconn}):
        Future[tuple[address: string, client: AsyncSocket]] =
    ## Accepts a new connection. The returned future completes, once the
    ## connection has been accepted, with the remote address of the client
    ## and the client socket for that connection.
    ##
    ## The client socket inherits the domain, socket type, protocol and
    ## buffering mode of ``socket``.
    var retFuture = newFuture[tuple[address: string, client: AsyncSocket]](
      "asyncnet.acceptAddr")
    var rawAccept = acceptAddr(socket.fd.AsyncFD, flags)
    rawAccept.callback =
      proc (future: Future[tuple[address: string, client: AsyncFD]]) =
        assert future.finished
        if future.failed:
          retFuture.fail(future.readError)
        else:
          # Wrap the raw descriptor before handing it to the caller.
          let accepted = future.read
          let clientSocket = newAsyncSocket(accepted.client, socket.domain,
              socket.sockType, socket.protocol, socket.isBuffered)
          retFuture.complete((accepted.address, clientSocket))
    return retFuture
  proc accept*(socket: AsyncSocket,
      flags = {SocketFlag.SafeDisconn}): Future[AsyncSocket] =
    ## Accepts a new connection. The returned future completes, once the
    ## connection has been accepted, with the client socket for that
    ## connection (the remote address reported by ``acceptAddr`` is dropped).
    var retFut = newFuture[AsyncSocket]("asyncnet.accept")
    var withAddr = acceptAddr(socket, flags)
    withAddr.callback =
      proc (future: Future[tuple[address: string, client: AsyncSocket]]) =
        assert future.finished
        if not future.failed:
          retFut.complete(future.read.client)
        else:
          retFut.fail(future.readError)
    return retFut
  proc recvLineInto*(socket: AsyncSocket, resString: FutureVar[string],
      flags = {SocketFlag.SafeDisconn}, maxLength = MaxLineLength) {.async.} =
    ## Reads a line of data from ``socket`` and appends it to the string held
    ## by ``resString``, then completes ``resString``.
    ##
    ## The line terminator is not stored; however, if nothing has been stored
    ## by the time the terminator is seen, the string is set to ``\r\L``.
    ##
    ## If the socket disconnects while a line is being read the string is
    ## truncated to ``""`` and the partial line **is lost**. On a buffered
    ## socket whose very first buffer fill returns nothing, ``resString`` is
    ## completed without being modified.
    ##
    ## ``maxLength`` bounds the amount of data accumulated: reading stops as
    ## soon as the string grows longer than ``maxLength``.
    ##
    ## The string inside ``resString`` must already be initialised (asserted).
    ##
    ## **Warning**: The ``Peek`` flag is not yet implemented.
    ##
    ## **Warning**: ``recvLineInto`` on unbuffered sockets assumes that the
    ## protocol uses ``\r\L`` to delimit a new line.
    assert SocketFlag.Peek notin flags ## TODO:
    assert(not resString.mget.isNil(),
           "String inside resString future needs to be initialised")
    result = newFuture[void]("asyncnet.recvLineInto")

    # TODO: Make the async transformation check for FutureVar params and complete
    # them when the result future is completed.
    # Can we replace the result future with the FutureVar?

    template addNLIfEmpty(): untyped =
      # An otherwise empty line is reported as the terminator itself.
      if resString.mget.len == 0:
        resString.mget.add("\c\L")

    if socket.isBuffered:
      if socket.bufLen == 0:
        let got = socket.readIntoBuf(flags)
        if got == 0:
          resString.complete()
          return

      var sawCR = false
      while true:
        if socket.currPos >= socket.bufLen:
          let got = socket.readIntoBuf(flags)
          if got == 0:
            # Disconnected mid-line: drop what was collected so far.
            resString.mget.setLen(0)
            resString.complete()
            return

        case socket.buffer[socket.currPos]
        of '\r':
          sawCR = true
          addNLIfEmpty()
        of '\L':
          addNLIfEmpty()
          socket.currPos.inc()
          resString.complete()
          return
        else:
          if sawCR:
            # NOTE(review): the character following a lone '\r' is skipped
            # here without being stored - confirm this is intended.
            socket.currPos.inc()
            resString.complete()
            return
          else:
            resString.mget.add socket.buffer[socket.currPos]
        socket.currPos.inc()

        # Verify that this isn't a DOS attack: #3847.
        if resString.mget.len > maxLength: break
    else:
      var ch = ""
      while true:
        ch = await recv(socket, 1, flags)
        if ch.len == 0:
          # Disconnected mid-line: drop what was collected so far.
          resString.mget.setLen(0)
          resString.complete()
          return
        if ch == "\r":
          ch = await recv(socket, 1, flags) # Skip \L
          assert ch == "\L"
          addNLIfEmpty()
          resString.complete()
          return
        elif ch == "\L":
          addNLIfEmpty()
          resString.complete()
          return
        resString.mget.add ch

        # Verify that this isn't a DOS attack: #3847.
        if resString.mget.len > maxLength: break
    # Reached only when the length limit stopped one of the loops above.
    resString.complete()
  proc recvLine*(socket: AsyncSocket,
      flags = {SocketFlag.SafeDisconn},
      maxLength = MaxLineLength): Future[string] {.async.} =
    ## Reads a line of data from ``socket`` by delegating to ``recvLineInto``
    ## with an initially empty string. The returned future completes once
    ## ``recvLineInto`` has finished.
    ##
    ## The line terminator is not part of the result; however, a line
    ## consisting solely of the terminator is returned as ``\r\L``.
    ##
    ## If the socket disconnects the result is ``""``; a partially read line
    ## **is lost**.
    ##
    ## ``maxLength`` bounds the amount of data accumulated for one line.
    ##
    ## **Warning**: The ``Peek`` flag is not yet implemented.
    ##
    ## **Warning**: ``recvLine`` on unbuffered sockets assumes that the protocol
    ## uses ``\r\L`` to delimit a new line.
    assert SocketFlag.Peek notin flags ## TODO:

    # TODO: Optimise this
    var lineVar = newFutureVar[string]("asyncnet.recvLine")
    lineVar.mget() = ""
    await socket.recvLineInto(lineVar, flags, maxLength)
    result = lineVar.mget()
  proc listen*(socket: AsyncSocket, backlog = SOMAXCONN) {.tags: [ReadIOEffect].} =
    ## Marks ``socket`` as accepting connections. ``backlog`` specifies the
    ## maximum length of the queue of pending connections.
    ##
    ## Raises an OS error upon failure.
    let status = listen(socket.fd, backlog)
    if status < 0'i32:
      raiseOSError(osLastError())
  proc bindAddr*(socket: AsyncSocket, port = Port(0), address = "") {.
    tags: [ReadIOEffect].} =
    ## Binds ``address``:``port`` to the socket.
    ##
    ## If ``address`` is "" then ADDR_ANY will be bound: ``"::"`` for
    ## ``AF_INET6`` sockets and ``"0.0.0.0"`` for ``AF_INET`` sockets.
    ##
    ## Raises ``ValueError`` when no address is given and the socket's
    ## domain is neither of those, and an OS error when the bind itself fails.
    var realaddr = address
    if realaddr == "":
      case socket.domain
      of AF_INET6: realaddr = "::"
      of AF_INET: realaddr = "0.0.0.0"
      else:
        raise newException(ValueError,
          "Unknown socket address family and no address specified to bindAddr")

    var aiList = getAddrInfo(realaddr, port, socket.domain)
    if bindAddr(socket.fd, aiList.ai_addr, aiList.ai_addrlen.Socklen) < 0'i32:
      # Read the error code before releasing the address list so that the
      # cleanup call cannot overwrite the reason the bind failed.
      let bindError = osLastError()
      freeAddrInfo(aiList)
      raiseOSError(bindError)
    freeAddrInfo(aiList)
  proc close*(socket: AsyncSocket) =
    ## Closes the socket.
    ##
    ## Calling ``close`` on a socket that is already closed is a no-op, so
    ## the SSL handle is never freed twice and the descriptor is never closed
    ## twice. For SSL sockets an SSL shutdown is attempted first; an SSL
    ## error is raised when it reports a failure, but the descriptor is still
    ## closed in that case.
    if socket.closed: return
    # Mark the socket closed up front so that the flag is set even when the
    # SSL shutdown below raises.
    socket.closed = true # TODO: Add extra debugging checks for this.

    defer:
      # Runs on every exit path, including when the SSL shutdown raises.
      socket.fd.AsyncFD.closeSocket()

    when defineSsl:
      if socket.isSSL:
        let res = SslShutdown(socket.sslHandle)
        SSLFree(socket.sslHandle)
        # 0 and 1 are both accepted as non-error results of the shutdown.
        if res == 0:
          discard
        elif res != 1:
          raiseSslError()
  when defineSsl:
    proc wrapSocket*(ctx: SslContext, socket: AsyncSocket) =
      ## Wraps a socket in an SSL context. This function effectively turns
      ## ``socket`` into an SSL socket: it creates a new SSL handle from
      ## ``ctx`` and attaches two in-memory BIOs to it.
      ##
      ## Raises an SSL error if the SSL handle cannot be created.
      ##
      ## **Disclaimer**: This code is not well tested, may be very unsafe and
      ## prone to security vulnerabilities.
      socket.isSsl = true
      socket.sslContext = ctx
      socket.sslHandle = SSLNew(socket.sslContext.context)
      if socket.sslHandle == nil:
        raiseSslError()

      # Memory BIOs: the async layer moves bytes between them and the socket.
      socket.bioIn = bioNew(bio_s_mem())
      socket.bioOut = bioNew(bio_s_mem())
      sslSetBio(socket.sslHandle, socket.bioIn, socket.bioOut)

    proc wrapConnectedSocket*(ctx: SslContext, socket: AsyncSocket,
        handshake: SslHandshakeType,
        hostname: string = nil) =
      ## Wraps a connected socket in an SSL context. This function effectively
      ## turns ``socket`` into an SSL socket and puts its SSL handle into
      ## client (connect) or server (accept) state according to ``handshake``.
      ##
      ## ``hostname`` should be specified so that the client knows which
      ## hostname the server certificate should be validated against. It is
      ## only used for client handshakes, and only when it is not an IP
      ## address.
      ##
      ## This should be called on a connected socket.
      ##
      ## **Disclaimer**: This code is not well tested, may be very unsafe and
      ## prone to security vulnerabilities.
      wrapSocket(ctx, socket)
      case handshake
      of handshakeAsClient:
        let useSni = not hostname.isNil and not isIpAddress(hostname)
        if useSni:
          # Set the SNI address for this connection. This call can fail if
          # we're not using TLSv1+.
          discard SSL_set_tlsext_host_name(socket.sslHandle, hostname)
        sslSetConnectState(socket.sslHandle)
      of handshakeAsServer:
        sslSetAcceptState(socket.sslHandle)
  proc getSockOpt*(socket: AsyncSocket, opt: SOBool, level = SOL_SOCKET): bool {.
    tags: [ReadIOEffect].} =
    ## Retrieves option ``opt`` as a boolean value: ``true`` when the
    ## underlying integer option is non-zero.
    let raw = getSockOptInt(socket.fd, cint(level), toCInt(opt))
    result = raw != 0
  proc setSockOpt*(socket: AsyncSocket, opt: SOBool, value: bool,
      level = SOL_SOCKET) {.tags: [WriteIOEffect].} =
    ## Sets option ``opt`` to the boolean ``value`` (stored as ``1`` or ``0``).
    let asInt =
      if value: cint(1)
      else: cint(0)
    setSockOptInt(socket.fd, cint(level), toCInt(opt), asInt)
  proc isSsl*(socket: AsyncSocket): bool =
    ## Reports whether ``socket`` is an SSL socket.
    result = socket.isSsl
  proc getFd*(socket: AsyncSocket): SocketHandle =
    ## Gives access to the file descriptor backing ``socket``.
    result = socket.fd
  proc isClosed*(socket: AsyncSocket): bool =
    ## Reports whether the socket has been closed.
    result = socket.closed
  when not defined(testing) and isMainModule:
    # Manual smoke tests; ``test`` selects which scenario is compiled in.
    # NOTE(review): the nesting below was reconstructed from syntax because
    # the original indentation was lost - confirm against the upstream file.
    type
      TestCases = enum
        HighClient, LowClient, LowServer

    const test = HighClient

    when test == HighClient:
      # High-level client: async/await loop printing every received line.
      proc main() {.async.} =
        var sock = newAsyncSocket()
        await sock.connect("irc.freenode.net", Port(6667))
        while true:
          let line = await sock.recvLine()
          if line != "":
            echo("Got line: ", line)
          else:
            echo("Disconnected")
            break
      asyncCheck main()
    elif test == LowClient:
      # Low-level client: the same idea expressed with raw future callbacks.
      var sock = newAsyncSocket()
      var f = connect(sock, "irc.freenode.net", Port(6667))
      f.callback =
        proc (future: Future[void]) =
          echo("Connected in future!")
          for i in 0 .. 50:
            var recvF = recv(sock, 10)
            recvF.callback =
              proc (future: Future[string]) =
                echo("Read ", future.read.len, ": ", future.read.repr)
    elif test == LowServer:
      # Low-level server: greets each client, then closes the connection.
      var sock = newAsyncSocket()
      sock.bindAddr(Port(6667))
      sock.listen()
      proc onAccept(future: Future[AsyncSocket]) =
        let client = future.read
        echo "Accepted ", client.fd.cint
        var t = send(client, "test\c\L")
        t.callback =
          proc (future: Future[void]) =
            echo("Send")
            client.close()

        # Re-arm the listener for the next incoming connection.
        var f = accept(sock)
        f.callback = onAccept

      var f = accept(sock)
      f.callback = onAccept
    runForever()