asyncnet.nim 35 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005
  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2017 Dominik Picheta
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. ## This module implements a high-level asynchronous sockets API based on the
  10. ## asynchronous dispatcher defined in the `asyncdispatch` module.
  11. ##
  12. ## Asynchronous IO in Nim
  13. ## ======================
  14. ##
  15. ## Async IO in Nim consists of multiple layers (from highest to lowest):
  16. ##
  17. ## * `asyncnet` module
  18. ##
  19. ## * Async await
  20. ##
  21. ## * `asyncdispatch` module (event loop)
  22. ##
  23. ## * `selectors` module
  24. ##
  25. ## Each builds on top of the layers below it. The selectors module is an
  26. ## abstraction for the various system `select()` mechanisms such as epoll or
  27. ## kqueue. If you wish you can use it directly, and some people have done so
  28. ## `successfully <http://goran.krampe.se/2014/10/25/nim-socketserver/>`_.
  29. ## But you must be aware that on Windows it only supports
  30. ## `select()`.
  31. ##
  32. ## The async dispatcher implements the proactor pattern and also has an
  33. ## implementation of IOCP. It implements the proactor pattern for other
  34. ## OS' via the selectors module. Futures are also implemented here, and
  35. ## indeed all the procedures return a future.
  36. ##
  37. ## The final layer is the async await transformation. This allows you to
  38. ## write asynchronous code in a synchronous style and works similar to
  39. ## C#'s await. The transformation works by converting any async procedures
  40. ## into an iterator.
  41. ##
  42. ## This is all single threaded, fully non-blocking and does give you a
  43. ## lot of control. In theory you should be able to work with any of these
  44. ## layers interchangeably (as long as you only care about non-Windows
  45. ## platforms).
  46. ##
  47. ## For most applications using `asyncnet` is the way to go as it builds
  48. ## over all the layers, providing some extra features such as buffering.
  49. ##
  50. ## SSL
  51. ## ===
  52. ##
  53. ## SSL can be enabled by compiling with the `-d:ssl` flag.
  54. ##
  55. ## You must create a new SSL context with the `newContext` function defined
  56. ## in the `net` module. You may then call `wrapSocket` on your socket using
  57. ## the newly created SSL context to get an SSL socket.
  58. ##
  59. ## Examples
  60. ## ========
  61. ##
  62. ## Chat server
  63. ## -----------
  64. ##
  65. ## The following example demonstrates a simple chat server.
  66. ##
  67. ## ```Nim
  68. ## import std/[asyncnet, asyncdispatch]
  69. ##
  70. ## var clients {.threadvar.}: seq[AsyncSocket]
  71. ##
  72. ## proc processClient(client: AsyncSocket) {.async.} =
  73. ## while true:
  74. ## let line = await client.recvLine()
  75. ## if line.len == 0: break
  76. ## for c in clients:
  77. ## await c.send(line & "\c\L")
  78. ##
  79. ## proc serve() {.async.} =
  80. ## clients = @[]
  81. ## var server = newAsyncSocket()
  82. ## server.setSockOpt(OptReuseAddr, true)
  83. ## server.bindAddr(Port(12345))
  84. ## server.listen()
  85. ##
  86. ## while true:
  87. ## let client = await server.accept()
  88. ## clients.add client
  89. ##
  90. ## asyncCheck processClient(client)
  91. ##
  92. ## asyncCheck serve()
  93. ## runForever()
  94. ## ```
  95. import std/private/since
  96. when defined(nimPreviewSlimSystem):
  97. import std/[assertions, syncio]
  98. import std/[asyncdispatch, nativesockets, net, os]
  99. export SOBool
  100. # TODO: Remove duplication introduced by PR #4683.
  101. const defineSsl = defined(ssl) or defined(nimdoc)
  102. const useNimNetLite = defined(nimNetLite) or defined(freertos) or defined(zephyr) or
  103. defined(nuttx)
  104. when defineSsl:
  105. import std/openssl
  106. type
  107. # TODO: I would prefer to just do:
  108. # AsyncSocket* {.borrow: `.`.} = distinct Socket. But that doesn't work.
  109. AsyncSocketDesc = object
  110. fd: SocketHandle
  111. closed: bool ## determines whether this socket has been closed
  112. isBuffered: bool ## determines whether this socket is buffered.
  113. buffer: array[0..BufferSize, char]
  114. currPos: int # current index in buffer
  115. bufLen: int # current length of buffer
  116. isSsl: bool
  117. when defineSsl:
  118. sslHandle: SslPtr
  119. sslContext: SslContext
  120. bioIn: BIO
  121. bioOut: BIO
  122. sslNoShutdown: bool
  123. domain: Domain
  124. sockType: SockType
  125. protocol: Protocol
  126. AsyncSocket* = ref AsyncSocketDesc
  127. proc newAsyncSocket*(fd: AsyncFD, domain: Domain = AF_INET,
  128. sockType: SockType = SOCK_STREAM,
  129. protocol: Protocol = IPPROTO_TCP,
  130. buffered = true,
  131. inheritable = defined(nimInheritHandles)): owned(AsyncSocket) =
  132. ## Creates a new `AsyncSocket` based on the supplied params.
  133. ##
  134. ## The supplied `fd`'s non-blocking state will be enabled implicitly.
  135. ##
  136. ## If `inheritable` is false (the default), the supplied `fd` will not
  137. ## be inheritable by child processes.
  138. ##
  139. ## **Note**: This procedure will **NOT** register `fd` with the global
  140. ## async dispatcher. You need to do this manually. If you have used
  141. ## `newAsyncNativeSocket` to create `fd` then it's already registered.
  142. assert fd != osInvalidSocket.AsyncFD
  143. new(result)
  144. result.fd = fd.SocketHandle
  145. fd.SocketHandle.setBlocking(false)
  146. if not fd.SocketHandle.setInheritable(inheritable):
  147. raiseOSError(osLastError())
  148. result.isBuffered = buffered
  149. result.domain = domain
  150. result.sockType = sockType
  151. result.protocol = protocol
  152. if buffered:
  153. result.currPos = 0
  154. proc newAsyncSocket*(domain: Domain = AF_INET, sockType: SockType = SOCK_STREAM,
  155. protocol: Protocol = IPPROTO_TCP, buffered = true,
  156. inheritable = defined(nimInheritHandles)): owned(AsyncSocket) =
  157. ## Creates a new asynchronous socket.
  158. ##
  159. ## This procedure will also create a brand new file descriptor for
  160. ## this socket.
  161. ##
  162. ## If `inheritable` is false (the default), the new file descriptor will not
  163. ## be inheritable by child processes.
  164. let fd = createAsyncNativeSocket(domain, sockType, protocol, inheritable)
  165. if fd.SocketHandle == osInvalidSocket:
  166. raiseOSError(osLastError())
  167. result = newAsyncSocket(fd, domain, sockType, protocol, buffered, inheritable)
  168. proc getLocalAddr*(socket: AsyncSocket): (string, Port) =
  169. ## Get the socket's local address and port number.
  170. ##
  171. ## This is high-level interface for `getsockname`:idx:.
  172. getLocalAddr(socket.fd, socket.domain)
  173. when not useNimNetLite:
  174. proc getPeerAddr*(socket: AsyncSocket): (string, Port) =
  175. ## Get the socket's peer address and port number.
  176. ##
  177. ## This is high-level interface for `getpeername`:idx:.
  178. getPeerAddr(socket.fd, socket.domain)
  179. proc newAsyncSocket*(domain, sockType, protocol: cint,
  180. buffered = true,
  181. inheritable = defined(nimInheritHandles)): owned(AsyncSocket) =
  182. ## Creates a new asynchronous socket.
  183. ##
  184. ## This procedure will also create a brand new file descriptor for
  185. ## this socket.
  186. ##
  187. ## If `inheritable` is false (the default), the new file descriptor will not
  188. ## be inheritable by child processes.
  189. let fd = createAsyncNativeSocket(domain, sockType, protocol, inheritable)
  190. if fd.SocketHandle == osInvalidSocket:
  191. raiseOSError(osLastError())
  192. result = newAsyncSocket(fd, Domain(domain), SockType(sockType),
  193. Protocol(protocol), buffered, inheritable)
  194. when defineSsl:
  195. proc getSslError(socket: AsyncSocket, err: cint): cint =
  196. assert socket.isSsl
  197. assert err < 0
  198. var ret = SSL_get_error(socket.sslHandle, err.cint)
  199. case ret
  200. of SSL_ERROR_ZERO_RETURN:
  201. raiseSSLError("TLS/SSL connection failed to initiate, socket closed prematurely.")
  202. of SSL_ERROR_WANT_CONNECT, SSL_ERROR_WANT_ACCEPT:
  203. return ret
  204. of SSL_ERROR_WANT_WRITE, SSL_ERROR_WANT_READ:
  205. return ret
  206. of SSL_ERROR_WANT_X509_LOOKUP:
  207. raiseSSLError("Function for x509 lookup has been called.")
  208. of SSL_ERROR_SYSCALL, SSL_ERROR_SSL:
  209. socket.sslNoShutdown = true
  210. raiseSSLError()
  211. else: raiseSSLError("Unknown Error")
  212. proc sendPendingSslData(socket: AsyncSocket,
  213. flags: set[SocketFlag]) {.async.} =
  214. let len = bioCtrlPending(socket.bioOut)
  215. if len > 0:
  216. var data = newString(len)
  217. let read = bioRead(socket.bioOut, cast[cstring](addr data[0]), len)
  218. assert read != 0
  219. if read < 0:
  220. raiseSSLError()
  221. data.setLen(read)
  222. await socket.fd.AsyncFD.send(data, flags)
  223. proc appeaseSsl(socket: AsyncSocket, flags: set[SocketFlag],
  224. sslError: cint): owned(Future[bool]) {.async.} =
  225. ## Returns `true` if `socket` is still connected, otherwise `false`.
  226. result = true
  227. case sslError
  228. of SSL_ERROR_WANT_WRITE:
  229. await sendPendingSslData(socket, flags)
  230. of SSL_ERROR_WANT_READ:
  231. var data = await recv(socket.fd.AsyncFD, BufferSize, flags)
  232. let length = len(data)
  233. if length > 0:
  234. let ret = bioWrite(socket.bioIn, cast[cstring](addr data[0]), length.cint)
  235. if ret < 0:
  236. raiseSSLError()
  237. elif length == 0:
  238. # connection not properly closed by remote side or connection dropped
  239. SSL_set_shutdown(socket.sslHandle, SSL_RECEIVED_SHUTDOWN)
  240. result = false
  241. else:
  242. raiseSSLError("Cannot appease SSL.")
  243. template sslLoop(socket: AsyncSocket, flags: set[SocketFlag],
  244. op: untyped) =
  245. var opResult {.inject.} = -1.cint
  246. while opResult < 0:
  247. ErrClearError()
  248. # Call the desired operation.
  249. opResult = op
  250. let err =
  251. if opResult < 0:
  252. getSslError(socket, opResult.cint)
  253. else:
  254. SSL_ERROR_NONE
  255. # Send any remaining pending SSL data.
  256. await sendPendingSslData(socket, flags)
  257. # If the operation failed, try to see if SSL has some data to read
  258. # or write.
  259. if opResult < 0:
  260. let fut = appeaseSsl(socket, flags, err.cint)
  261. yield fut
  262. if not fut.read():
  263. # Socket disconnected.
  264. if SocketFlag.SafeDisconn in flags:
  265. opResult = 0.cint
  266. break
  267. else:
  268. raiseSSLError("Socket has been disconnected")
  269. proc dial*(address: string, port: Port, protocol = IPPROTO_TCP,
  270. buffered = true): owned(Future[AsyncSocket]) {.async.} =
  271. ## Establishes connection to the specified `address`:`port` pair via the
  272. ## specified protocol. The procedure iterates through possible
  273. ## resolutions of the `address` until it succeeds, meaning that it
  274. ## seamlessly works with both IPv4 and IPv6.
  275. ## Returns AsyncSocket ready to send or receive data.
  276. let asyncFd = await asyncdispatch.dial(address, port, protocol)
  277. let sockType = protocol.toSockType()
  278. let domain = getSockDomain(asyncFd.SocketHandle)
  279. result = newAsyncSocket(asyncFd, domain, sockType, protocol, buffered)
  280. proc connect*(socket: AsyncSocket, address: string, port: Port) {.async.} =
  281. ## Connects `socket` to server at `address:port`.
  282. ##
  283. ## Returns a `Future` which will complete when the connection succeeds
  284. ## or an error occurs.
  285. await connect(socket.fd.AsyncFD, address, port, socket.domain)
  286. if socket.isSsl:
  287. when defineSsl:
  288. if not isIpAddress(address):
  289. # Set the SNI address for this connection. This call can fail if
  290. # we're not using TLSv1+.
  291. discard SSL_set_tlsext_host_name(socket.sslHandle, address)
  292. let flags = {SocketFlag.SafeDisconn}
  293. sslSetConnectState(socket.sslHandle)
  294. sslLoop(socket, flags, sslDoHandshake(socket.sslHandle))
  295. template readInto(buf: pointer, size: int, socket: AsyncSocket,
  296. flags: set[SocketFlag]): int =
  297. ## Reads **up to** `size` bytes from `socket` into `buf`. Note that
  298. ## this is a template and not a proc.
  299. assert(not socket.closed, "Cannot `recv` on a closed socket")
  300. var res = 0
  301. if socket.isSsl:
  302. when defineSsl:
  303. # SSL mode.
  304. sslLoop(socket, flags,
  305. sslRead(socket.sslHandle, cast[cstring](buf), size.cint))
  306. res = opResult
  307. else:
  308. # Not in SSL mode.
  309. res = await asyncdispatch.recvInto(socket.fd.AsyncFD, buf, size, flags)
  310. res
  311. template readIntoBuf(socket: AsyncSocket,
  312. flags: set[SocketFlag]): int =
  313. var size = readInto(addr socket.buffer[0], BufferSize, socket, flags)
  314. socket.currPos = 0
  315. socket.bufLen = size
  316. size
  317. proc recvInto*(socket: AsyncSocket, buf: pointer, size: int,
  318. flags = {SocketFlag.SafeDisconn}): owned(Future[int]) {.async.} =
  319. ## Reads **up to** `size` bytes from `socket` into `buf`.
  320. ##
  321. ## For buffered sockets this function will attempt to read all the requested
  322. ## data. It will read this data in `BufferSize` chunks.
  323. ##
  324. ## For unbuffered sockets this function makes no effort to read
  325. ## all the data requested. It will return as much data as the operating system
  326. ## gives it.
  327. ##
  328. ## If socket is disconnected during the
  329. ## recv operation then the future may complete with only a part of the
  330. ## requested data.
  331. ##
  332. ## If socket is disconnected and no data is available
  333. ## to be read then the future will complete with a value of `0`.
  334. if socket.isBuffered:
  335. let originalBufPos = socket.currPos
  336. if socket.bufLen == 0:
  337. let res = socket.readIntoBuf(flags - {SocketFlag.Peek})
  338. if res == 0:
  339. return 0
  340. var read = 0
  341. var cbuf = cast[cstring](buf)
  342. while read < size:
  343. if socket.currPos >= socket.bufLen:
  344. if SocketFlag.Peek in flags:
  345. # We don't want to get another buffer if we're peeking.
  346. break
  347. let res = socket.readIntoBuf(flags - {SocketFlag.Peek})
  348. if res == 0:
  349. break
  350. let chunk = min(socket.bufLen-socket.currPos, size-read)
  351. copyMem(addr(cbuf[read]), addr(socket.buffer[socket.currPos]), chunk)
  352. read.inc(chunk)
  353. socket.currPos.inc(chunk)
  354. if SocketFlag.Peek in flags:
  355. # Restore old buffer cursor position.
  356. socket.currPos = originalBufPos
  357. result = read
  358. else:
  359. result = readInto(buf, size, socket, flags)
  360. proc recv*(socket: AsyncSocket, size: int,
  361. flags = {SocketFlag.SafeDisconn}): owned(Future[string]) {.async.} =
  362. ## Reads **up to** `size` bytes from `socket`.
  363. ##
  364. ## For buffered sockets this function will attempt to read all the requested
  365. ## data. It will read this data in `BufferSize` chunks.
  366. ##
  367. ## For unbuffered sockets this function makes no effort to read
  368. ## all the data requested. It will return as much data as the operating system
  369. ## gives it.
  370. ##
  371. ## If socket is disconnected during the
  372. ## recv operation then the future may complete with only a part of the
  373. ## requested data.
  374. ##
  375. ## If socket is disconnected and no data is available
  376. ## to be read then the future will complete with a value of `""`.
  377. if socket.isBuffered:
  378. result = newString(size)
  379. when not defined(nimSeqsV2):
  380. shallow(result)
  381. let originalBufPos = socket.currPos
  382. if socket.bufLen == 0:
  383. let res = socket.readIntoBuf(flags - {SocketFlag.Peek})
  384. if res == 0:
  385. result.setLen(0)
  386. return
  387. var read = 0
  388. while read < size:
  389. if socket.currPos >= socket.bufLen:
  390. if SocketFlag.Peek in flags:
  391. # We don't want to get another buffer if we're peeking.
  392. break
  393. let res = socket.readIntoBuf(flags - {SocketFlag.Peek})
  394. if res == 0:
  395. break
  396. let chunk = min(socket.bufLen-socket.currPos, size-read)
  397. copyMem(addr(result[read]), addr(socket.buffer[socket.currPos]), chunk)
  398. read.inc(chunk)
  399. socket.currPos.inc(chunk)
  400. if SocketFlag.Peek in flags:
  401. # Restore old buffer cursor position.
  402. socket.currPos = originalBufPos
  403. result.setLen(read)
  404. else:
  405. result = newString(size)
  406. let read = readInto(addr result[0], size, socket, flags)
  407. result.setLen(read)
  408. proc send*(socket: AsyncSocket, buf: pointer, size: int,
  409. flags = {SocketFlag.SafeDisconn}) {.async.} =
  410. ## Sends `size` bytes from `buf` to `socket`. The returned future will complete once all
  411. ## data has been sent.
  412. assert socket != nil
  413. assert(not socket.closed, "Cannot `send` on a closed socket")
  414. if socket.isSsl:
  415. when defineSsl:
  416. sslLoop(socket, flags,
  417. sslWrite(socket.sslHandle, cast[cstring](buf), size.cint))
  418. await sendPendingSslData(socket, flags)
  419. else:
  420. await send(socket.fd.AsyncFD, buf, size, flags)
  421. proc send*(socket: AsyncSocket, data: string,
  422. flags = {SocketFlag.SafeDisconn}) {.async.} =
  423. ## Sends `data` to `socket`. The returned future will complete once all
  424. ## data has been sent.
  425. assert socket != nil
  426. if socket.isSsl:
  427. when defineSsl:
  428. var copy = data
  429. sslLoop(socket, flags,
  430. sslWrite(socket.sslHandle, cast[cstring](addr copy[0]), copy.len.cint))
  431. await sendPendingSslData(socket, flags)
  432. else:
  433. await send(socket.fd.AsyncFD, data, flags)
  434. proc acceptAddr*(socket: AsyncSocket, flags = {SocketFlag.SafeDisconn},
  435. inheritable = defined(nimInheritHandles)):
  436. owned(Future[tuple[address: string, client: AsyncSocket]]) =
  437. ## Accepts a new connection. Returns a future containing the client socket
  438. ## corresponding to that connection and the remote address of the client.
  439. ##
  440. ## If `inheritable` is false (the default), the resulting client socket will
  441. ## not be inheritable by child processes.
  442. ##
  443. ## The future will complete when the connection is successfully accepted.
  444. var retFuture = newFuture[tuple[address: string, client: AsyncSocket]]("asyncnet.acceptAddr")
  445. var fut = acceptAddr(socket.fd.AsyncFD, flags, inheritable)
  446. fut.callback =
  447. proc (future: Future[tuple[address: string, client: AsyncFD]]) =
  448. assert future.finished
  449. if future.failed:
  450. retFuture.fail(future.readError)
  451. else:
  452. let resultTup = (future.read.address,
  453. newAsyncSocket(future.read.client, socket.domain,
  454. socket.sockType, socket.protocol, socket.isBuffered, inheritable))
  455. retFuture.complete(resultTup)
  456. return retFuture
  457. proc accept*(socket: AsyncSocket,
  458. flags = {SocketFlag.SafeDisconn}): owned(Future[AsyncSocket]) =
  459. ## Accepts a new connection. Returns a future containing the client socket
  460. ## corresponding to that connection.
  461. ## If `inheritable` is false (the default), the resulting client socket will
  462. ## not be inheritable by child processes.
  463. ## The future will complete when the connection is successfully accepted.
  464. var retFut = newFuture[AsyncSocket]("asyncnet.accept")
  465. var fut = acceptAddr(socket, flags)
  466. fut.callback =
  467. proc (future: Future[tuple[address: string, client: AsyncSocket]]) =
  468. assert future.finished
  469. if future.failed:
  470. retFut.fail(future.readError)
  471. else:
  472. retFut.complete(future.read.client)
  473. return retFut
  474. proc recvLineInto*(socket: AsyncSocket, resString: FutureVar[string],
  475. flags = {SocketFlag.SafeDisconn}, maxLength = MaxLineLength) {.async.} =
  476. ## Reads a line of data from `socket` into `resString`.
  477. ##
  478. ## If a full line is read `\r\L` is not
  479. ## added to `line`, however if solely `\r\L` is read then `line`
  480. ## will be set to it.
  481. ##
  482. ## If the socket is disconnected, `line` will be set to `""`.
  483. ##
  484. ## If the socket is disconnected in the middle of a line (before `\r\L`
  485. ## is read) then line will be set to `""`.
  486. ## The partial line **will be lost**.
  487. ##
  488. ## The `maxLength` parameter determines the maximum amount of characters
  489. ## that can be read. `resString` will be truncated after that.
  490. ##
  491. ## .. warning:: The `Peek` flag is not yet implemented.
  492. ##
  493. ## .. warning:: `recvLineInto` on unbuffered sockets assumes that the protocol uses `\r\L` to delimit a new line.
  494. assert SocketFlag.Peek notin flags ## TODO:
  495. result = newFuture[void]("asyncnet.recvLineInto")
  496. # TODO: Make the async transformation check for FutureVar params and complete
  497. # them when the result future is completed.
  498. # Can we replace the result future with the FutureVar?
  499. template addNLIfEmpty(): untyped =
  500. if resString.mget.len == 0:
  501. resString.mget.add("\c\L")
  502. if socket.isBuffered:
  503. if socket.bufLen == 0:
  504. let res = socket.readIntoBuf(flags)
  505. if res == 0:
  506. resString.complete()
  507. return
  508. var lastR = false
  509. while true:
  510. if socket.currPos >= socket.bufLen:
  511. let res = socket.readIntoBuf(flags)
  512. if res == 0:
  513. resString.mget.setLen(0)
  514. resString.complete()
  515. return
  516. case socket.buffer[socket.currPos]
  517. of '\r':
  518. lastR = true
  519. addNLIfEmpty()
  520. of '\L':
  521. addNLIfEmpty()
  522. socket.currPos.inc()
  523. resString.complete()
  524. return
  525. else:
  526. if lastR:
  527. socket.currPos.inc()
  528. resString.complete()
  529. return
  530. else:
  531. resString.mget.add socket.buffer[socket.currPos]
  532. socket.currPos.inc()
  533. # Verify that this isn't a DOS attack: #3847.
  534. if resString.mget.len > maxLength: break
  535. else:
  536. var c = ""
  537. while true:
  538. c = await recv(socket, 1, flags)
  539. if c.len == 0:
  540. resString.mget.setLen(0)
  541. resString.complete()
  542. return
  543. if c == "\r":
  544. c = await recv(socket, 1, flags) # Skip \L
  545. assert c == "\L"
  546. addNLIfEmpty()
  547. resString.complete()
  548. return
  549. elif c == "\L":
  550. addNLIfEmpty()
  551. resString.complete()
  552. return
  553. resString.mget.add c
  554. # Verify that this isn't a DOS attack: #3847.
  555. if resString.mget.len > maxLength: break
  556. resString.complete()
  557. proc recvLine*(socket: AsyncSocket,
  558. flags = {SocketFlag.SafeDisconn},
  559. maxLength = MaxLineLength): owned(Future[string]) {.async.} =
  560. ## Reads a line of data from `socket`. Returned future will complete once
  561. ## a full line is read or an error occurs.
  562. ##
  563. ## If a full line is read `\r\L` is not
  564. ## added to `line`, however if solely `\r\L` is read then `line`
  565. ## will be set to it.
  566. ##
  567. ## If the socket is disconnected, `line` will be set to `""`.
  568. ##
  569. ## If the socket is disconnected in the middle of a line (before `\r\L`
  570. ## is read) then line will be set to `""`.
  571. ## The partial line **will be lost**.
  572. ##
  573. ## The `maxLength` parameter determines the maximum amount of characters
  574. ## that can be read. The result is truncated after that.
  575. ##
  576. ## .. warning:: The `Peek` flag is not yet implemented.
  577. ##
  578. ## .. warning:: `recvLine` on unbuffered sockets assumes that the protocol uses `\r\L` to delimit a new line.
  579. assert SocketFlag.Peek notin flags ## TODO:
  580. # TODO: Optimise this
  581. var resString = newFutureVar[string]("asyncnet.recvLine")
  582. resString.mget() = ""
  583. await socket.recvLineInto(resString, flags, maxLength)
  584. result = resString.mget()
  585. proc listen*(socket: AsyncSocket, backlog = SOMAXCONN) {.tags: [
  586. ReadIOEffect].} =
  587. ## Marks `socket` as accepting connections.
  588. ## `Backlog` specifies the maximum length of the
  589. ## queue of pending connections.
  590. ##
  591. ## Raises an OSError error upon failure.
  592. if listen(socket.fd, backlog) < 0'i32: raiseOSError(osLastError())
  593. proc bindAddr*(socket: AsyncSocket, port = Port(0), address = "") {.
  594. tags: [ReadIOEffect].} =
  595. ## Binds `address`:`port` to the socket.
  596. ##
  597. ## If `address` is "" then ADDR_ANY will be bound.
  598. var realaddr = address
  599. if realaddr == "":
  600. case socket.domain
  601. of AF_INET6: realaddr = "::"
  602. of AF_INET: realaddr = "0.0.0.0"
  603. else:
  604. raise newException(ValueError,
  605. "Unknown socket address family and no address specified to bindAddr")
  606. var aiList = getAddrInfo(realaddr, port, socket.domain)
  607. if bindAddr(socket.fd, aiList.ai_addr, aiList.ai_addrlen.SockLen) < 0'i32:
  608. freeAddrInfo(aiList)
  609. raiseOSError(osLastError())
  610. freeAddrInfo(aiList)
  611. proc hasDataBuffered*(s: AsyncSocket): bool {.since: (1, 5).} =
  612. ## Determines whether an AsyncSocket has data buffered.
  613. # xxx dedup with std/net
  614. s.isBuffered and s.bufLen > 0 and s.currPos != s.bufLen
  615. when defined(posix) and not useNimNetLite:
  616. proc connectUnix*(socket: AsyncSocket, path: string): owned(Future[void]) =
  617. ## Binds Unix socket to `path`.
  618. ## This only works on Unix-style systems: Mac OS X, BSD and Linux
  619. when not defined(nimdoc):
  620. let retFuture = newFuture[void]("connectUnix")
  621. result = retFuture
  622. proc cb(fd: AsyncFD): bool =
  623. let ret = SocketHandle(fd).getSockOptInt(cint(SOL_SOCKET), cint(SO_ERROR))
  624. if ret == 0:
  625. retFuture.complete()
  626. return true
  627. elif ret == EINTR:
  628. return false
  629. else:
  630. retFuture.fail(newOSError(OSErrorCode(ret)))
  631. return true
  632. var socketAddr = makeUnixAddr(path)
  633. let ret = socket.fd.connect(cast[ptr SockAddr](addr socketAddr),
  634. (sizeof(socketAddr.sun_family) + path.len).SockLen)
  635. if ret == 0:
  636. # Request to connect completed immediately.
  637. retFuture.complete()
  638. else:
  639. let lastError = osLastError()
  640. if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS:
  641. addWrite(AsyncFD(socket.fd), cb)
  642. else:
  643. retFuture.fail(newOSError(lastError))
  644. proc bindUnix*(socket: AsyncSocket, path: string) {.
  645. tags: [ReadIOEffect].} =
  646. ## Binds Unix socket to `path`.
  647. ## This only works on Unix-style systems: Mac OS X, BSD and Linux
  648. when not defined(nimdoc):
  649. var socketAddr = makeUnixAddr(path)
  650. if socket.fd.bindAddr(cast[ptr SockAddr](addr socketAddr),
  651. (sizeof(socketAddr.sun_family) + path.len).SockLen) != 0'i32:
  652. raiseOSError(osLastError())
  653. elif defined(nimdoc):
  654. proc connectUnix*(socket: AsyncSocket, path: string): owned(Future[void]) =
  655. ## Binds Unix socket to `path`.
  656. ## This only works on Unix-style systems: Mac OS X, BSD and Linux
  657. discard
  658. proc bindUnix*(socket: AsyncSocket, path: string) =
  659. ## Binds Unix socket to `path`.
  660. ## This only works on Unix-style systems: Mac OS X, BSD and Linux
  661. discard
  662. proc close*(socket: AsyncSocket) =
  663. ## Closes the socket.
  664. if socket.closed: return
  665. defer:
  666. socket.fd.AsyncFD.closeSocket()
  667. socket.closed = true # TODO: Add extra debugging checks for this.
  668. when defineSsl:
  669. if socket.isSsl:
  670. let res =
  671. # Don't call SSL_shutdown if the connection has not been fully
  672. # established, see:
  673. # https://github.com/openssl/openssl/issues/710#issuecomment-253897666
  674. if not socket.sslNoShutdown and SSL_in_init(socket.sslHandle) == 0:
  675. ErrClearError()
  676. SSL_shutdown(socket.sslHandle)
  677. else:
  678. 0
  679. SSL_free(socket.sslHandle)
  680. if res == 0:
  681. discard
  682. elif res != 1:
  683. raiseSSLError()
  684. when defineSsl:
  685. proc sslHandle*(self: AsyncSocket): SslPtr =
  686. ## Retrieve the ssl pointer of `socket`.
  687. ## Useful for interfacing with `openssl`.
  688. self.sslHandle
  689. proc wrapSocket*(ctx: SslContext, socket: AsyncSocket) =
  690. ## Wraps a socket in an SSL context. This function effectively turns
  691. ## `socket` into an SSL socket.
  692. ##
  693. ## **Disclaimer**: This code is not well tested, may be very unsafe and
  694. ## prone to security vulnerabilities.
  695. socket.isSsl = true
  696. socket.sslContext = ctx
  697. socket.sslHandle = SSL_new(socket.sslContext.context)
  698. if socket.sslHandle == nil:
  699. raiseSSLError()
  700. socket.bioIn = bioNew(bioSMem())
  701. socket.bioOut = bioNew(bioSMem())
  702. sslSetBio(socket.sslHandle, socket.bioIn, socket.bioOut)
  703. socket.sslNoShutdown = true
  704. proc wrapConnectedSocket*(ctx: SslContext, socket: AsyncSocket,
  705. handshake: SslHandshakeType,
  706. hostname: string = "") =
  707. ## Wraps a connected socket in an SSL context. This function effectively
  708. ## turns `socket` into an SSL socket.
  709. ## `hostname` should be specified so that the client knows which hostname
  710. ## the server certificate should be validated against.
  711. ##
  712. ## This should be called on a connected socket, and will perform
  713. ## an SSL handshake immediately.
  714. ##
  715. ## **Disclaimer**: This code is not well tested, may be very unsafe and
  716. ## prone to security vulnerabilities.
  717. wrapSocket(ctx, socket)
  718. case handshake
  719. of handshakeAsClient:
  720. if hostname.len > 0 and not isIpAddress(hostname):
  721. # Set the SNI address for this connection. This call can fail if
  722. # we're not using TLSv1+.
  723. discard SSL_set_tlsext_host_name(socket.sslHandle, hostname)
  724. sslSetConnectState(socket.sslHandle)
  725. of handshakeAsServer:
  726. sslSetAcceptState(socket.sslHandle)
  727. proc getPeerCertificates*(socket: AsyncSocket): seq[Certificate] {.since: (1, 1).} =
  728. ## Returns the certificate chain received by the peer we are connected to
  729. ## through the given socket.
  730. ## The handshake must have been completed and the certificate chain must
  731. ## have been verified successfully or else an empty sequence is returned.
  732. ## The chain is ordered from leaf certificate to root certificate.
  733. if not socket.isSsl:
  734. result = newSeq[Certificate]()
  735. else:
  736. result = getPeerCertificates(socket.sslHandle)
  737. proc getSockOpt*(socket: AsyncSocket, opt: SOBool, level = SOL_SOCKET): bool {.
  738. tags: [ReadIOEffect].} =
  739. ## Retrieves option `opt` as a boolean value.
  740. var res = getSockOptInt(socket.fd, cint(level), toCInt(opt))
  741. result = res != 0
  742. proc setSockOpt*(socket: AsyncSocket, opt: SOBool, value: bool,
  743. level = SOL_SOCKET) {.tags: [WriteIOEffect].} =
  744. ## Sets option `opt` to a boolean value specified by `value`.
  745. var valuei = cint(if value: 1 else: 0)
  746. setSockOptInt(socket.fd, cint(level), toCInt(opt), valuei)
  747. proc isSsl*(socket: AsyncSocket): bool =
  748. ## Determines whether `socket` is a SSL socket.
  749. socket.isSsl
  750. proc getFd*(socket: AsyncSocket): SocketHandle =
  751. ## Returns the socket's file descriptor.
  752. return socket.fd
  753. proc isClosed*(socket: AsyncSocket): bool =
  754. ## Determines whether the socket has been closed.
  755. return socket.closed
  756. proc sendTo*(socket: AsyncSocket, address: string, port: Port, data: string,
  757. flags = {SocketFlag.SafeDisconn}): owned(Future[void])
  758. {.async, since: (1, 3).} =
  759. ## This proc sends `data` to the specified `address`, which may be an IP
  760. ## address or a hostname. If a hostname is specified this function will try
  761. ## each IP of that hostname. The returned future will complete once all data
  762. ## has been sent.
  763. ##
  764. ## If an error occurs an OSError exception will be raised.
  765. ##
  766. ## This proc is normally used with connectionless sockets (UDP sockets).
  767. assert(socket.protocol != IPPROTO_TCP,
  768. "Cannot `sendTo` on a TCP socket. Use `send` instead")
  769. assert(not socket.closed, "Cannot `sendTo` on a closed socket")
  770. let aiList = getAddrInfo(address, port, socket.domain, socket.sockType,
  771. socket.protocol)
  772. var
  773. it = aiList
  774. success = false
  775. lastException: ref Exception
  776. while it != nil:
  777. let fut = sendTo(socket.fd.AsyncFD, cstring(data), len(data), it.ai_addr,
  778. it.ai_addrlen.SockLen, flags)
  779. yield fut
  780. if not fut.failed:
  781. success = true
  782. break
  783. lastException = fut.readError()
  784. it = it.ai_next
  785. freeAddrInfo(aiList)
  786. if not success:
  787. if lastException != nil:
  788. raise lastException
  789. else:
  790. raise newException(IOError, "Couldn't resolve address: " & address)
  791. proc recvFrom*(socket: AsyncSocket, data: FutureVar[string], size: int,
  792. address: FutureVar[string], port: FutureVar[Port],
  793. flags = {SocketFlag.SafeDisconn}): owned(Future[int])
  794. {.async, since: (1, 3).} =
  795. ## Receives a datagram data from `socket` into `data`, which must be at
  796. ## least of size `size`. The address and port of datagram's sender will be
  797. ## stored into `address` and `port`, respectively. Returned future will
  798. ## complete once one datagram has been received, and will return size of
  799. ## packet received.
  800. ##
  801. ## If an error occurs an OSError exception will be raised.
  802. ##
  803. ## This proc is normally used with connectionless sockets (UDP sockets).
  804. ##
  805. ## **Notes**
  806. ## * `data` must be initialized to the length of `size`.
  807. ## * `address` must be initialized to 46 in length.
  808. template adaptRecvFromToDomain(domain: Domain) =
  809. var lAddr = sizeof(sAddr).SockLen
  810. result = await recvFromInto(AsyncFD(getFd(socket)), cstring(data.mget()), size,
  811. cast[ptr SockAddr](addr sAddr), addr lAddr,
  812. flags)
  813. data.mget().setLen(result)
  814. data.complete()
  815. getAddrString(cast[ptr SockAddr](addr sAddr), address.mget())
  816. address.complete()
  817. when domain == AF_INET6:
  818. port.complete(ntohs(sAddr.sin6_port).Port)
  819. else:
  820. port.complete(ntohs(sAddr.sin_port).Port)
  821. assert(socket.protocol != IPPROTO_TCP,
  822. "Cannot `recvFrom` on a TCP socket. Use `recv` or `recvInto` instead")
  823. assert(not socket.closed, "Cannot `recvFrom` on a closed socket")
  824. assert(size == len(data.mget()),
  825. "`date` was not initialized correctly. `size` != `len(data.mget())`")
  826. assert(46 == len(address.mget()),
  827. "`address` was not initialized correctly. 46 != `len(address.mget())`")
  828. case socket.domain
  829. of AF_INET6:
  830. var sAddr: Sockaddr_in6
  831. adaptRecvFromToDomain(AF_INET6)
  832. of AF_INET:
  833. var sAddr: Sockaddr_in
  834. adaptRecvFromToDomain(AF_INET)
  835. else:
  836. raise newException(ValueError, "Unknown socket address family")
  837. proc recvFrom*(socket: AsyncSocket, size: int,
  838. flags = {SocketFlag.SafeDisconn}):
  839. owned(Future[tuple[data: string, address: string, port: Port]])
  840. {.async, since: (1, 3).} =
  841. ## Receives a datagram data from `socket`, which must be at least of size
  842. ## `size`. Returned future will complete once one datagram has been received
  843. ## and will return tuple with: data of packet received; and address and port
  844. ## of datagram's sender.
  845. ##
  846. ## If an error occurs an OSError exception will be raised.
  847. ##
  848. ## This proc is normally used with connectionless sockets (UDP sockets).
  849. var
  850. data = newFutureVar[string]()
  851. address = newFutureVar[string]()
  852. port = newFutureVar[Port]()
  853. data.mget().setLen(size)
  854. address.mget().setLen(46)
  855. let read = await recvFrom(socket, data, size, address, port, flags)
  856. result = (data.mget(), address.mget(), port.mget())
  857. when not defined(testing) and isMainModule:
  858. type
  859. TestCases = enum
  860. HighClient, LowClient, LowServer
  861. const test = HighClient
  862. when test == HighClient:
  863. proc main() {.async.} =
  864. var sock = newAsyncSocket()
  865. await sock.connect("irc.freenode.net", Port(6667))
  866. while true:
  867. let line = await sock.recvLine()
  868. if line == "":
  869. echo("Disconnected")
  870. break
  871. else:
  872. echo("Got line: ", line)
  873. asyncCheck main()
  874. elif test == LowClient:
  875. var sock = newAsyncSocket()
  876. var f = connect(sock, "irc.freenode.net", Port(6667))
  877. f.callback =
  878. proc (future: Future[void]) =
  879. echo("Connected in future!")
  880. for i in 0 .. 50:
  881. var recvF = recv(sock, 10)
  882. recvF.callback =
  883. proc (future: Future[string]) =
  884. echo("Read ", future.read.len, ": ", future.read.repr)
  885. elif test == LowServer:
  886. var sock = newAsyncSocket()
  887. sock.bindAddr(Port(6667))
  888. sock.listen()
  889. proc onAccept(future: Future[AsyncSocket]) =
  890. let client = future.read
  891. echo "Accepted ", client.fd.cint
  892. var t = send(client, "test\c\L")
  893. t.callback =
  894. proc (future: Future[void]) =
  895. echo("Send")
  896. client.close()
  897. var f = accept(sock)
  898. f.callback = onAccept
  899. var f = accept(sock)
  900. f.callback = onAccept
  901. runForever()