asyncnet.nim 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754
  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2017 Dominik Picheta
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. ## This module implements a high-level asynchronous sockets API based on the
  10. ## asynchronous dispatcher defined in the ``asyncdispatch`` module.
  11. ##
  12. ## Asynchronous IO in Nim
  13. ## ----------------------
  14. ##
  15. ## Async IO in Nim consists of multiple layers (from highest to lowest):
  16. ##
  17. ## * ``asyncnet`` module
  18. ##
  19. ## * Async await
  20. ##
  21. ## * ``asyncdispatch`` module (event loop)
  22. ##
  23. ## * ``selectors`` module
  24. ##
  25. ## Each builds on top of the layers below it. The selectors module is an
  26. ## abstraction for the various system ``select()`` mechanisms such as epoll or
  27. ## kqueue. If you wish you can use it directly, and some people have done so
  28. ## `successfully <http://goran.krampe.se/2014/10/25/nim-socketserver/>`_.
  29. ## But you must be aware that on Windows it only supports
  30. ## ``select()``.
  31. ##
  32. ## The async dispatcher implements the proactor pattern. It contains an
  33. ## implementation based on IOCP, and implements the same pattern for other
  34. ## operating systems via the selectors module. Futures are also implemented
  35. ## here, and indeed all of its procedures return a future.
  36. ##
  37. ## The final layer is the async await transformation. This allows you to
  38. ## write asynchronous code in a synchronous style and works similar to
  39. ## C#'s await. The transformation works by converting any async procedures
  40. ## into an iterator.
  41. ##
  42. ## This is all single threaded, fully non-blocking and does give you a
  43. ## lot of control. In theory you should be able to work with any of these
  44. ## layers interchangeably (as long as you only care about non-Windows
  45. ## platforms).
  46. ##
  47. ## For most applications using ``asyncnet`` is the way to go as it builds
  48. ## over all the layers, providing some extra features such as buffering.
  49. ##
  50. ## SSL
  51. ## ----
  52. ##
  53. ## SSL can be enabled by compiling with the ``-d:ssl`` flag.
  54. ##
  55. ## You must create a new SSL context with the ``newContext`` function defined
  56. ## in the ``net`` module. You may then call ``wrapSocket`` on your socket using
  57. ## the newly created SSL context to get an SSL socket.
  58. ##
  59. ## Examples
  60. ## --------
  61. ##
  62. ## Chat server
  63. ## ^^^^^^^^^^^
  64. ##
  65. ## The following example demonstrates a simple chat server.
  66. ##
  67. ## .. code-block::nim
  68. ##
  69. ## import asyncnet, asyncdispatch
  70. ##
  71. ## var clients {.threadvar.}: seq[AsyncSocket]
  72. ##
  73. ## proc processClient(client: AsyncSocket) {.async.} =
  74. ## while true:
  75. ## let line = await client.recvLine()
  76. ## if line.len == 0: break
  77. ## for c in clients:
  78. ## await c.send(line & "\c\L")
  79. ##
  80. ## proc serve() {.async.} =
  81. ## clients = @[]
  82. ## var server = newAsyncSocket()
  83. ## server.setSockOpt(OptReuseAddr, true)
  84. ## server.bindAddr(Port(12345))
  85. ## server.listen()
  86. ##
  87. ## while true:
  88. ## let client = await server.accept()
  89. ## clients.add client
  90. ##
  91. ## asyncCheck processClient(client)
  92. ##
  93. ## asyncCheck serve()
  94. ## runForever()
  95. ##
  96. import asyncdispatch
  97. import nativesockets
  98. import net
  99. import os
  100. export SOBool
  101. # TODO: Remove duplication introduced by PR #4683.
  102. const defineSsl = defined(ssl) or defined(nimdoc)
  103. when defineSsl:
  104. import openssl
  type
    # TODO: Ideally this would simply be declared as
    # AsyncSocket* {.borrow: `.`.} = distinct Socket, but that doesn't work.
    AsyncSocketDesc = object
      fd: SocketHandle        ## the underlying OS socket handle
      closed: bool            ## whether this socket has been closed
      case isBuffered: bool   ## whether reads go through ``buffer``
      of true:
        buffer: array[0..BufferSize, char]
        currPos: int          # current read index into ``buffer``
        bufLen: int           # number of bytes currently held in ``buffer``
      of false: discard
      case isSsl: bool        ## whether this socket has been wrapped with SSL
      of true:
        when defineSsl:
          sslHandle: SslPtr
          sslContext: SslContext
          bioIn: BIO
          bioOut: BIO
      of false: discard
      domain: Domain
      sockType: SockType
      protocol: Protocol

    AsyncSocket* = ref AsyncSocketDesc
  proc newAsyncSocket*(fd: AsyncFD, domain: Domain = AF_INET,
      sockType: SockType = SOCK_STREAM,
      protocol: Protocol = IPPROTO_TCP, buffered = true): AsyncSocket =
    ## Wraps the already existing descriptor ``fd`` in a new ``AsyncSocket``
    ## that records the given ``domain``, ``sockType`` and ``protocol``.
    ##
    ## ``fd`` is switched to non-blocking mode as part of this call.
    ##
    ## **Note**: ``fd`` is **NOT** registered with the global async
    ## dispatcher by this procedure; that has to be done manually. A
    ## descriptor obtained from ``newAsyncNativeSocket`` is already
    ## registered.
    assert fd != osInvalidSocket.AsyncFD
    let handle = fd.SocketHandle
    new(result)
    result.fd = handle
    handle.setBlocking(false)
    result.isBuffered = buffered
    result.domain = domain
    result.sockType = sockType
    result.protocol = protocol
    if buffered:
      # Reading starts at the beginning of the receive buffer.
      result.currPos = 0
  proc newAsyncSocket*(domain: Domain = AF_INET, sockType: SockType = SOCK_STREAM,
      protocol: Protocol = IPPROTO_TCP, buffered = true): AsyncSocket =
    ## Creates a new asynchronous socket, together with a brand new file
    ## descriptor for it.
    ##
    ## Raises an OSError if the descriptor could not be created.
    let nativeFd = createAsyncNativeSocket(domain, sockType, protocol)
    if nativeFd.SocketHandle == osInvalidSocket:
      raiseOSError(osLastError())
    newAsyncSocket(nativeFd, domain, sockType, protocol, buffered)
  proc newAsyncSocket*(domain, sockType, protocol: cint,
      buffered = true): AsyncSocket =
    ## Creates a new asynchronous socket from raw ``cint`` values, together
    ## with a brand new file descriptor for it.
    ##
    ## Raises an OSError if the descriptor could not be created.
    let nativeFd = createAsyncNativeSocket(domain, sockType, protocol)
    if nativeFd.SocketHandle == osInvalidSocket:
      raiseOSError(osLastError())
    newAsyncSocket(nativeFd, Domain(domain), SockType(sockType),
                   Protocol(protocol), buffered)
  when defineSsl:
    proc getSslError(handle: SslPtr, err: cint): cint =
      ## Looks up the SSL error code behind the negative return value ``err``.
      ## The "want" codes are returned to the caller; every other code is
      ## raised as an SSL error.
      assert err < 0
      let code = SSLGetError(handle, err.cint)
      case code
      of SSL_ERROR_WANT_CONNECT, SSL_ERROR_WANT_ACCEPT,
         SSL_ERROR_WANT_WRITE, SSL_ERROR_WANT_READ:
        result = code
      of SSL_ERROR_ZERO_RETURN:
        raiseSSLError("TLS/SSL connection failed to initiate, socket closed prematurely.")
      of SSL_ERROR_WANT_X509_LOOKUP:
        raiseSSLError("Function for x509 lookup has been called.")
      of SSL_ERROR_SYSCALL, SSL_ERROR_SSL:
        raiseSSLError()
      else:
        raiseSSLError("Unknown Error")

    proc sendPendingSslData(socket: AsyncSocket,
        flags: set[SocketFlag]) {.async.} =
      ## Drains whatever is pending in the outgoing BIO of ``socket`` and
      ## sends it over the underlying descriptor.
      let pending = bioCtrlPending(socket.bioOut)
      if pending <= 0:
        return
      var outgoing = newString(pending)
      let got = bioRead(socket.bioOut, addr outgoing[0], pending)
      assert got != 0
      if got < 0:
        raiseSslError()
      outgoing.setLen(got)
      await socket.fd.AsyncFd.send(outgoing, flags)

    proc appeaseSsl(socket: AsyncSocket, flags: set[SocketFlag],
                    sslError: cint): Future[bool] {.async.} =
      ## Performs the I/O that ``sslError`` asks for.
      ##
      ## Returns ``true`` if ``socket`` is still connected, otherwise
      ## ``false``.
      result = true
      case sslError
      of SSL_ERROR_WANT_WRITE:
        await sendPendingSslData(socket, flags)
      of SSL_ERROR_WANT_READ:
        var incoming = await recv(socket.fd.AsyncFD, BufferSize, flags)
        if incoming.len == 0:
          # connection not properly closed by remote side or connection dropped
          SSL_set_shutdown(socket.sslHandle, SSL_RECEIVED_SHUTDOWN)
          result = false
        else:
          let written = bioWrite(socket.bioIn, addr incoming[0],
                                 incoming.len.cint)
          if written < 0:
            raiseSSLError()
      else:
        raiseSSLError("Cannot appease SSL.")

    template sslLoop(socket: AsyncSocket, flags: set[SocketFlag],
                     op: untyped) =
      ## Repeats ``op`` until it stops returning a negative value. The last
      ## return value is left in the injected variable ``opResult``.
      var opResult {.inject.} = -1.cint
      while opResult < 0:
        # Run the requested SSL operation.
        opResult = op
        # Bit hackish: the futures are yielded directly.
        # TODO: Introduce an async template transformation pragma?
        # Flush any SSL data that is waiting to be sent.
        yield sendPendingSslData(socket, flags)
        # On failure, check whether SSL wants data to be read or written.
        if opResult < 0:
          let sslErr = getSslError(socket.sslHandle, opResult.cint)
          let appeased = appeaseSsl(socket, flags, sslErr.cint)
          yield appeased
          if not appeased.read():
            # Socket disconnected.
            if SocketFlag.SafeDisconn notin flags:
              raiseSSLError("Socket has been disconnected")
            break
  proc dial*(address: string, port: Port, protocol = IPPROTO_TCP,
             buffered = true): Future[AsyncSocket] {.async.} =
    ## Connects to the ``address``:``port`` pair using ``protocol``.
    ## Possible resolutions of ``address`` are tried in turn until one
    ## succeeds, so this works seamlessly with both IPv4 and IPv6.
    ##
    ## Returns an ``AsyncSocket`` that is ready to send or receive data.
    let connectedFd = await asyncdispatch.dial(address, port, protocol)
    let kind = toSockType(protocol)
    let family = getSockDomain(connectedFd.SocketHandle)
    result = newAsyncSocket(connectedFd, family, kind, protocol, buffered)
  proc connect*(socket: AsyncSocket, address: string, port: Port) {.async.} =
    ## Connects ``socket`` to the server at ``address:port``.
    ##
    ## The returned ``Future`` completes once the connection has been
    ## established or an error has occurred. For SSL sockets the handshake
    ## is run as part of connecting.
    await connect(socket.fd.AsyncFD, address, port, socket.domain)
    if socket.isSsl:
      when defineSsl:
        let isHostName = not isIpAddress(address)
        if isHostName:
          # Set the SNI address for this connection. This call can fail if
          # we're not using TLSv1+.
          discard SSL_set_tlsext_host_name(socket.sslHandle, address)
        let handshakeFlags = {SocketFlag.SafeDisconn}
        sslSetConnectState(socket.sslHandle)
        sslLoop(socket, handshakeFlags, sslDoHandshake(socket.sslHandle))
  template readInto(buf: pointer, size: int, socket: AsyncSocket,
                    flags: set[SocketFlag]): int =
    ## Reads **up to** ``size`` bytes from ``socket`` into ``buf`` and
    ## evaluates to the number of bytes read. Note that this is a template
    ## and not a proc.
    assert(not socket.closed, "Cannot `recv` on a closed socket")
    var bytesRead = 0
    if not socket.isSsl:
      # Plain mode: read straight from the descriptor.
      var pendingRecv = asyncdispatch.recvInto(socket.fd.AsyncFD, buf, size,
                                               flags)
      yield pendingRecv
      bytesRead = pendingRecv.read()
    else:
      when defineSsl:
        # SSL mode: ``sslLoop`` leaves its outcome in ``opResult``.
        sslLoop(socket, flags,
          sslRead(socket.sslHandle, cast[cstring](buf), size.cint))
        bytesRead = opResult
    bytesRead
  template readIntoBuf(socket: AsyncSocket,
      flags: set[SocketFlag]): int =
    ## Refills the internal buffer of ``socket`` with up to ``BufferSize``
    ## bytes, rewinds the read cursor and evaluates to the number of bytes
    ## now held in the buffer.
    var filled = readInto(addr socket.buffer[0], BufferSize, socket, flags)
    socket.bufLen = filled
    socket.currPos = 0
    filled
  proc recvInto*(socket: AsyncSocket, buf: pointer, size: int,
             flags = {SocketFlag.SafeDisconn}): Future[int] {.async.} =
    ## Reads **up to** ``size`` bytes from ``socket`` into ``buf`` and
    ## returns the number of bytes stored.
    ##
    ## Buffered sockets attempt to read all of the requested data, pulling
    ## it in ``BufferSize`` chunks.
    ##
    ## Unbuffered sockets make no such effort: whatever the operating system
    ## hands over is returned.
    ##
    ## If the socket is disconnected during the operation the future may
    ## complete with only part of the requested data.
    ##
    ## If the socket is disconnected and there is no data to be read the
    ## future completes with a value of ``0``.
    if not socket.isBuffered:
      result = readInto(buf, size, socket, flags)
    else:
      let peeking = SocketFlag.Peek in flags
      # The buffer itself is always filled without the ``Peek`` flag.
      let fillFlags = flags - {SocketFlag.Peek}
      let savedPos = socket.currPos

      if socket.bufLen == 0:
        let filled = socket.readIntoBuf(fillFlags)
        if filled == 0:
          return 0

      var copied = 0
      var dest = cast[cstring](buf)
      while copied < size:
        if socket.currPos >= socket.bufLen:
          if peeking:
            # We don't want to get another buffer if we're peeking.
            break
          let refilled = socket.readIntoBuf(fillFlags)
          if refilled == 0:
            break
        let available = socket.bufLen - socket.currPos
        let chunk = min(available, size - copied)
        copyMem(addr(dest[copied]), addr(socket.buffer[socket.currPos]), chunk)
        copied.inc(chunk)
        socket.currPos.inc(chunk)

      if peeking:
        # Put the buffer cursor back where it was.
        socket.currPos = savedPos
      result = copied
  proc recv*(socket: AsyncSocket, size: int,
             flags = {SocketFlag.SafeDisconn}): Future[string] {.async.} =
    ## Reads **up to** ``size`` bytes from ``socket`` and returns them as a
    ## string.
    ##
    ## Buffered sockets attempt to read all of the requested data, pulling
    ## it in ``BufferSize`` chunks.
    ##
    ## Unbuffered sockets make no such effort: whatever the operating system
    ## hands over is returned.
    ##
    ## If the socket is disconnected during the operation the future may
    ## complete with only part of the requested data.
    ##
    ## If the socket is disconnected and there is no data to be read the
    ## future completes with a value of ``""``.
    if not socket.isBuffered:
      result = newString(size)
      let got = readInto(addr result[0], size, socket, flags)
      result.setLen(got)
    else:
      result = newString(size)
      shallow(result)
      let peeking = SocketFlag.Peek in flags
      # The buffer itself is always filled without the ``Peek`` flag.
      let fillFlags = flags - {SocketFlag.Peek}
      let savedPos = socket.currPos

      if socket.bufLen == 0:
        let filled = socket.readIntoBuf(fillFlags)
        if filled == 0:
          result.setLen(0)
          return

      var copied = 0
      while copied < size:
        if socket.currPos >= socket.bufLen:
          if peeking:
            # We don't want to get another buffer if we're peeking.
            break
          let refilled = socket.readIntoBuf(fillFlags)
          if refilled == 0:
            break
        let available = socket.bufLen - socket.currPos
        let chunk = min(available, size - copied)
        copyMem(addr(result[copied]), addr(socket.buffer[socket.currPos]),
                chunk)
        copied.inc(chunk)
        socket.currPos.inc(chunk)

      if peeking:
        # Put the buffer cursor back where it was.
        socket.currPos = savedPos
      result.setLen(copied)
  proc send*(socket: AsyncSocket, buf: pointer, size: int,
              flags = {SocketFlag.SafeDisconn}) {.async.} =
    ## Sends ``size`` bytes starting at ``buf`` to ``socket``. The returned
    ## future completes once all of the data has been sent.
    assert socket != nil
    assert(not socket.closed, "Cannot `send` on a closed socket")
    if not socket.isSsl:
      await send(socket.fd.AsyncFD, buf, size, flags)
    else:
      when defineSsl:
        # Hand the data to SSL, then flush what it produced to the wire.
        sslLoop(socket, flags,
                sslWrite(socket.sslHandle, cast[cstring](buf), size.cint))
        await sendPendingSslData(socket, flags)
  proc send*(socket: AsyncSocket, data: string,
             flags = {SocketFlag.SafeDisconn}) {.async.} =
    ## Sends ``data`` to ``socket``. The returned future will complete once
    ## all data has been sent.
    ##
    ## On an SSL socket an empty ``data`` is not passed to the SSL write
    ## call; only SSL data that is already pending is flushed.
    assert socket != nil
    if socket.isSsl:
      when defineSsl:
        if data.len > 0:
          # ``addr copy[0]`` needs at least one byte, and a zero-length SSL
          # write is not a well-defined operation, so empty payloads skip it.
          var copy = data
          sslLoop(socket, flags,
            sslWrite(socket.sslHandle, addr copy[0], copy.len.cint))
        await sendPendingSslData(socket, flags)
    else:
      await send(socket.fd.AsyncFD, data, flags)
  proc acceptAddr*(socket: AsyncSocket, flags = {SocketFlag.SafeDisconn}):
        Future[tuple[address: string, client: AsyncSocket]] =
    ## Accepts a new connection. The returned future holds the remote
    ## address of the client together with the client socket for that
    ## connection, and completes once the connection has been accepted.
    ##
    ## The client socket inherits the domain, socket type, protocol and
    ## buffering mode of ``socket``.
    let retFuture = newFuture[tuple[address: string, client: AsyncSocket]]("asyncnet.acceptAddr")
    let pending = acceptAddr(socket.fd.AsyncFD, flags)
    pending.callback =
      proc (future: Future[tuple[address: string, client: AsyncFD]]) =
        assert future.finished
        if future.failed:
          retFuture.fail(future.readError)
          return
        let accepted = future.read
        let wrapped = newAsyncSocket(accepted.client, socket.domain,
                                     socket.sockType, socket.protocol,
                                     socket.isBuffered)
        retFuture.complete((accepted.address, wrapped))
    result = retFuture
  proc accept*(socket: AsyncSocket,
      flags = {SocketFlag.SafeDisconn}): Future[AsyncSocket] =
    ## Accepts a new connection. The returned future holds the client socket
    ## for that connection and completes once the connection has been
    ## accepted.
    let retFut = newFuture[AsyncSocket]("asyncnet.accept")
    let pending = acceptAddr(socket, flags)
    pending.callback =
      proc (future: Future[tuple[address: string, client: AsyncSocket]]) =
        assert future.finished
        if not future.failed:
          retFut.complete(future.read.client)
        else:
          retFut.fail(future.readError)
    result = retFut
  proc recvLineInto*(socket: AsyncSocket, resString: FutureVar[string],
         flags = {SocketFlag.SafeDisconn}, maxLength = MaxLineLength) {.async.} =
    ## Reads a line of data from ``socket`` into ``resString``.
    ##
    ## If a full line is read ``\r\L`` is not
    ## added to ``line``, however if solely ``\r\L`` is read then ``line``
    ## will be set to it.
    ##
    ## If the socket is disconnected, ``line`` will be set to ``""``.
    ##
    ## If the socket is disconnected in the middle of a line (before ``\r\L``
    ## is read) then line will be set to ``""``.
    ## The partial line **will be lost**.
    ##
    ## The ``maxLength`` parameter determines the maximum amount of characters
    ## that can be read. ``resString`` will be truncated after that.
    ##
    ## On buffered sockets a line that ends in a lone ``\r`` leaves the byte
    ## following it in the buffer, so it is returned by the next read.
    ##
    ## **Warning**: The ``Peek`` flag is not yet implemented.
    ##
    ## **Warning**: ``recvLineInto`` on unbuffered sockets assumes that the
    ## protocol uses ``\r\L`` to delimit a new line.
    assert SocketFlag.Peek notin flags ## TODO:
    result = newFuture[void]("asyncnet.recvLineInto")

    # TODO: Make the async transformation check for FutureVar params and complete
    # them when the result future is completed.
    # Can we replace the result future with the FutureVar?

    template addNLIfEmpty(): untyped =
      # An otherwise empty line is reported as "\c\L" so that it can be told
      # apart from the "" that signals a disconnect.
      if resString.mget.len == 0:
        resString.mget.add("\c\L")

    if socket.isBuffered:
      if socket.bufLen == 0:
        let res = socket.readIntoBuf(flags)
        if res == 0:
          resString.complete()
          return

      var lastR = false
      while true:
        if socket.currPos >= socket.bufLen:
          let res = socket.readIntoBuf(flags)
          if res == 0:
            # Disconnected in the middle of a line: the partial line is dropped.
            resString.mget.setLen(0)
            resString.complete()
            return

        case socket.buffer[socket.currPos]
        of '\r':
          lastR = true
          addNLIfEmpty()
        of '\L':
          addNLIfEmpty()
          socket.currPos.inc()
          resString.complete()
          return
        else:
          if lastR:
            # The line ended with a lone '\r'. The current byte already
            # belongs to the next line, so it must stay in the buffer;
            # advancing past it here would silently drop it.
            resString.complete()
            return
          else:
            resString.mget.add socket.buffer[socket.currPos]
        socket.currPos.inc()

        # Verify that this isn't a DOS attack: #3847.
        if resString.mget.len > maxLength: break
    else:
      var c = ""
      while true:
        c = await recv(socket, 1, flags)
        if c.len == 0:
          # Disconnected in the middle of a line: the partial line is dropped.
          resString.mget.setLen(0)
          resString.complete()
          return
        if c == "\r":
          c = await recv(socket, 1, flags) # Skip \L
          assert c == "\L"
          addNLIfEmpty()
          resString.complete()
          return
        elif c == "\L":
          addNLIfEmpty()
          resString.complete()
          return
        resString.mget.add c

        # Verify that this isn't a DOS attack: #3847.
        if resString.mget.len > maxLength: break
    resString.complete()
  proc recvLine*(socket: AsyncSocket,
      flags = {SocketFlag.SafeDisconn},
      maxLength = MaxLineLength): Future[string] {.async.} =
    ## Reads one line of data from ``socket``. The returned future completes
    ## once a full line has been read or an error has occurred.
    ##
    ## The terminating ``\r\L`` is not included in the returned line, unless
    ## the line consists solely of ``\r\L``, in which case ``\r\L`` itself is
    ## returned.
    ##
    ## If the socket is disconnected the result is ``""``.
    ##
    ## If the socket is disconnected in the middle of a line (before
    ## ``\r\L`` has been read) the result is ``""`` as well and the partial
    ## line **will be lost**.
    ##
    ## ``maxLength`` determines the maximum amount of characters that can be
    ## read; the result is truncated after that.
    ##
    ## **Warning**: The ``Peek`` flag is not yet implemented.
    ##
    ## **Warning**: ``recvLine`` on unbuffered sockets assumes that the
    ## protocol uses ``\r\L`` to delimit a new line.
    assert SocketFlag.Peek notin flags ## TODO:

    # TODO: Optimise this
    var lineVar = newFutureVar[string]("asyncnet.recvLine")
    lineVar.mget() = ""
    await recvLineInto(socket, lineVar, flags, maxLength)
    result = lineVar.mget()
  proc listen*(socket: AsyncSocket, backlog = SOMAXCONN) {.tags: [ReadIOEffect].} =
    ## Marks ``socket`` as accepting connections. ``backlog`` is the maximum
    ## length of the queue of pending connections.
    ##
    ## Raises an OSError error upon failure.
    let status = listen(socket.fd, backlog)
    if status < 0'i32:
      raiseOSError(osLastError())
  proc bindAddr*(socket: AsyncSocket, port = Port(0), address = "") {.
    tags: [ReadIOEffect].} =
    ## Binds ``address``:``port`` to the socket.
    ##
    ## If ``address`` is "" then ADDR_ANY will be bound.
    ##
    ## Raises a ValueError if ``address`` is "" and the socket's domain is
    ## neither ``AF_INET`` nor ``AF_INET6``, and an OSError if the bind
    ## itself fails.
    var realaddr = address
    if realaddr == "":
      case socket.domain
      of AF_INET6: realaddr = "::"
      of AF_INET: realaddr = "0.0.0.0"
      else:
        raise newException(ValueError,
          "Unknown socket address family and no address specified to bindAddr")

    var aiList = getAddrInfo(realaddr, port, socket.domain)
    if bindAddr(socket.fd, aiList.ai_addr, aiList.ai_addrlen.Socklen) < 0'i32:
      # Read the error code before releasing the address list: the cleanup
      # call could otherwise replace the code left behind by the failed bind.
      let bindError = osLastError()
      freeAddrInfo(aiList)
      raiseOSError(bindError)
    freeAddrInfo(aiList)
  proc close*(socket: AsyncSocket) =
    ## Closes the socket.
    ##
    ## Calling ``close`` on a socket that is already closed does nothing.
    ##
    ## For SSL sockets the SSL session is shut down and its handle freed
    ## first; an SSL error is raised if the shutdown reports a failure. The
    ## underlying descriptor is closed even in that case.
    if socket.closed:
      # Closing again would free the SSL handle and the descriptor twice.
      return
    # Mark the socket up front so that the flag is accurate even when the SSL
    # shutdown below raises (the descriptor is still closed by the ``defer``).
    socket.closed = true # TODO: Add extra debugging checks for this.

    defer:
      socket.fd.AsyncFD.closeSocket()

    when defineSsl:
      if socket.isSSL:
        let res = SslShutdown(socket.sslHandle)
        SSLFree(socket.sslHandle)
        if res == 0:
          discard
        elif res != 1:
          raiseSslError()
  when defineSsl:
    proc wrapSocket*(ctx: SslContext, socket: AsyncSocket) =
      ## Wraps ``socket`` in the SSL context ``ctx``, effectively turning it
      ## into an SSL socket backed by a pair of in-memory BIOs.
      ##
      ## Raises an SSL error if no SSL handle could be created.
      ##
      ## **Disclaimer**: This code is not well tested, may be very unsafe and
      ## prone to security vulnerabilities.
      socket.isSsl = true
      socket.sslContext = ctx
      socket.sslHandle = SSLNew(socket.sslContext.context)
      if socket.sslHandle.isNil:
        raiseSslError()

      # Incoming and outgoing SSL data is exchanged through memory BIOs.
      socket.bioIn = bioNew(bio_s_mem())
      socket.bioOut = bioNew(bio_s_mem())
      sslSetBio(socket.sslHandle, socket.bioIn, socket.bioOut)

    proc wrapConnectedSocket*(ctx: SslContext, socket: AsyncSocket,
                              handshake: SslHandshakeType,
                              hostname: string = "") =
      ## Wraps an already connected ``socket`` in the SSL context ``ctx`` and
      ## puts it into client or server handshake state according to
      ## ``handshake``.
      ##
      ## ``hostname`` should be given for client sockets so that the client
      ## knows which hostname the server certificate should be validated
      ## against. It is used for SNI unless it is empty or an IP address.
      ##
      ## Only the handshake state is set here; no handshake call is made by
      ## this procedure itself.
      ##
      ## **Disclaimer**: This code is not well tested, may be very unsafe and
      ## prone to security vulnerabilities.
      wrapSocket(ctx, socket)

      case handshake
      of handshakeAsServer:
        sslSetAcceptState(socket.sslHandle)
      of handshakeAsClient:
        let useSni = hostname.len > 0 and not isIpAddress(hostname)
        if useSni:
          # Set the SNI address for this connection. This call can fail if
          # we're not using TLSv1+.
          discard SSL_set_tlsext_host_name(socket.sslHandle, hostname)
        sslSetConnectState(socket.sslHandle)
  proc getSockOpt*(socket: AsyncSocket, opt: SOBool, level = SOL_SOCKET): bool {.
    tags: [ReadIOEffect].} =
    ## Reads the socket option ``opt`` at ``level`` and returns it as a
    ## boolean: ``true`` for any non-zero value.
    getSockOptInt(socket.fd, cint(level), toCInt(opt)) != 0
  proc setSockOpt*(socket: AsyncSocket, opt: SOBool, value: bool,
      level = SOL_SOCKET) {.tags: [WriteIOEffect].} =
    ## Sets the socket option ``opt`` at ``level`` to the boolean ``value``,
    ## passed to the OS as ``1`` or ``0``.
    let asInt = cint(ord(value))
    setSockOptInt(socket.fd, cint(level), toCInt(opt), asInt)
  proc isSsl*(socket: AsyncSocket): bool =
    ## Tells whether ``socket`` is an SSL socket.
    result = socket.isSsl
  proc getFd*(socket: AsyncSocket): SocketHandle =
    ## Gives access to the file descriptor of ``socket``.
    result = socket.fd
  proc isClosed*(socket: AsyncSocket): bool =
    ## Tells whether ``socket`` has been closed.
    result = socket.closed
  when not defined(testing) and isMainModule:
    # Small manual demos; ``test`` selects which one is compiled in.
    type
      TestCases = enum
        HighClient, LowClient, LowServer

    const test = HighClient

    when test == HighClient:
      # Client written with async/await: prints every received line.
      proc main() {.async.} =
        var sock = newAsyncSocket()
        await sock.connect("irc.freenode.net", Port(6667))
        while true:
          let line = await sock.recvLine()
          if line == "":
            echo("Disconnected")
            break
          echo("Got line: ", line)
      asyncCheck main()
    elif test == LowClient:
      # Client written with raw future callbacks.
      var sock = newAsyncSocket()
      var connecting = connect(sock, "irc.freenode.net", Port(6667))
      connecting.callback =
        proc (future: Future[void]) =
          echo("Connected in future!")
          for i in 0 .. 50:
            var receiving = recv(sock, 10)
            receiving.callback =
              proc (future: Future[string]) =
                echo("Read ", future.read.len, ": ", future.read.repr)
    elif test == LowServer:
      # Server written with raw future callbacks.
      var sock = newAsyncSocket()
      sock.bindAddr(Port(6667))
      sock.listen()
      proc onAccept(future: Future[AsyncSocket]) =
        let client = future.read
        echo "Accepted ", client.fd.cint
        var sending = send(client, "test\c\L")
        sending.callback =
          proc (future: Future[void]) =
            echo("Send")
            client.close()

        # Wait for the next client.
        var nextAccept = accept(sock)
        nextAccept.callback = onAccept

      var firstAccept = accept(sock)
      firstAccept.callback = onAccept
    runForever()