123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005 |
- #
- #
- # Nim's Runtime Library
- # (c) Copyright 2017 Dominik Picheta
- #
- # See the file "copying.txt", included in this
- # distribution, for details about the copyright.
- #
- ## This module implements a high-level asynchronous sockets API based on the
- ## asynchronous dispatcher defined in the `asyncdispatch` module.
- ##
- ## Asynchronous IO in Nim
- ## ======================
- ##
- ## Async IO in Nim consists of multiple layers (from highest to lowest):
- ##
- ## * `asyncnet` module
- ##
- ## * Async await
- ##
- ## * `asyncdispatch` module (event loop)
- ##
- ## * `selectors` module
- ##
- ## Each builds on top of the layers below it. The selectors module is an
- ## abstraction for the various system `select()` mechanisms such as epoll or
- ## kqueue. If you wish you can use it directly, and some people have done so
- ## `successfully <http://goran.krampe.se/2014/10/25/nim-socketserver/>`_.
- ## But you must be aware that on Windows it only supports
- ## `select()`.
- ##
- ## The async dispatcher implements the proactor pattern and also has an
- ## implementation of IOCP. It implements the proactor pattern for other
- ## OS' via the selectors module. Futures are also implemented here, and
- ## indeed all the procedures return a future.
- ##
- ## The final layer is the async await transformation. This allows you to
- ## write asynchronous code in a synchronous style and works similar to
- ## C#'s await. The transformation works by converting any async procedures
- ## into an iterator.
- ##
- ## This is all single threaded, fully non-blocking and does give you a
- ## lot of control. In theory you should be able to work with any of these
- ## layers interchangeably (as long as you only care about non-Windows
- ## platforms).
- ##
- ## For most applications using `asyncnet` is the way to go as it builds
- ## over all the layers, providing some extra features such as buffering.
- ##
- ## SSL
- ## ===
- ##
- ## SSL can be enabled by compiling with the `-d:ssl` flag.
- ##
- ## You must create a new SSL context with the `newContext` function defined
- ## in the `net` module. You may then call `wrapSocket` on your socket using
- ## the newly created SSL context to get an SSL socket.
- ##
- ## Examples
- ## ========
- ##
- ## Chat server
- ## -----------
- ##
- ## The following example demonstrates a simple chat server.
- ##
- ## ```Nim
- ## import std/[asyncnet, asyncdispatch]
- ##
- ## var clients {.threadvar.}: seq[AsyncSocket]
- ##
- ## proc processClient(client: AsyncSocket) {.async.} =
- ## while true:
- ## let line = await client.recvLine()
- ## if line.len == 0: break
- ## for c in clients:
- ## await c.send(line & "\c\L")
- ##
- ## proc serve() {.async.} =
- ## clients = @[]
- ## var server = newAsyncSocket()
- ## server.setSockOpt(OptReuseAddr, true)
- ## server.bindAddr(Port(12345))
- ## server.listen()
- ##
- ## while true:
- ## let client = await server.accept()
- ## clients.add client
- ##
- ## asyncCheck processClient(client)
- ##
- ## asyncCheck serve()
- ## runForever()
- ## ```
- import std/private/since
- when defined(nimPreviewSlimSystem):
- import std/[assertions, syncio]
- import asyncdispatch, nativesockets, net, os
- export SOBool
- # TODO: Remove duplication introduced by PR #4683.
- const defineSsl = defined(ssl) or defined(nimdoc)
- const useNimNetLite = defined(nimNetLite) or defined(freertos) or defined(zephyr) or
- defined(nuttx)
- when defineSsl:
- import openssl
- type
- # TODO: I would prefer to just do:
- # AsyncSocket* {.borrow: `.`.} = distinct Socket. But that doesn't work.
- AsyncSocketDesc = object
- fd: SocketHandle
- closed: bool ## determines whether this socket has been closed
- isBuffered: bool ## determines whether this socket is buffered.
- buffer: array[0..BufferSize, char]
- currPos: int # current index in buffer
- bufLen: int # current length of buffer
- isSsl: bool
- when defineSsl:
- sslHandle: SslPtr
- sslContext: SslContext
- bioIn: BIO
- bioOut: BIO
- sslNoShutdown: bool
- domain: Domain
- sockType: SockType
- protocol: Protocol
- AsyncSocket* = ref AsyncSocketDesc
- proc newAsyncSocket*(fd: AsyncFD, domain: Domain = AF_INET,
- sockType: SockType = SOCK_STREAM,
- protocol: Protocol = IPPROTO_TCP,
- buffered = true,
- inheritable = defined(nimInheritHandles)): owned(AsyncSocket) =
- ## Creates a new `AsyncSocket` based on the supplied params.
- ##
- ## The supplied `fd`'s non-blocking state will be enabled implicitly.
- ##
- ## If `inheritable` is false (the default), the supplied `fd` will not
- ## be inheritable by child processes.
- ##
- ## **Note**: This procedure will **NOT** register `fd` with the global
- ## async dispatcher. You need to do this manually. If you have used
- ## `newAsyncNativeSocket` to create `fd` then it's already registered.
- assert fd != osInvalidSocket.AsyncFD
- new(result)
- result.fd = fd.SocketHandle
- fd.SocketHandle.setBlocking(false)
- if not fd.SocketHandle.setInheritable(inheritable):
- raiseOSError(osLastError())
- result.isBuffered = buffered
- result.domain = domain
- result.sockType = sockType
- result.protocol = protocol
- if buffered:
- result.currPos = 0
- proc newAsyncSocket*(domain: Domain = AF_INET, sockType: SockType = SOCK_STREAM,
- protocol: Protocol = IPPROTO_TCP, buffered = true,
- inheritable = defined(nimInheritHandles)): owned(AsyncSocket) =
- ## Creates a new asynchronous socket.
- ##
- ## This procedure will also create a brand new file descriptor for
- ## this socket.
- ##
- ## If `inheritable` is false (the default), the new file descriptor will not
- ## be inheritable by child processes.
- let fd = createAsyncNativeSocket(domain, sockType, protocol, inheritable)
- if fd.SocketHandle == osInvalidSocket:
- raiseOSError(osLastError())
- result = newAsyncSocket(fd, domain, sockType, protocol, buffered, inheritable)
- proc getLocalAddr*(socket: AsyncSocket): (string, Port) =
- ## Get the socket's local address and port number.
- ##
- ## This is high-level interface for `getsockname`:idx:.
- getLocalAddr(socket.fd, socket.domain)
- when not useNimNetLite:
- proc getPeerAddr*(socket: AsyncSocket): (string, Port) =
- ## Get the socket's peer address and port number.
- ##
- ## This is high-level interface for `getpeername`:idx:.
- getPeerAddr(socket.fd, socket.domain)
- proc newAsyncSocket*(domain, sockType, protocol: cint,
- buffered = true,
- inheritable = defined(nimInheritHandles)): owned(AsyncSocket) =
- ## Creates a new asynchronous socket.
- ##
- ## This procedure will also create a brand new file descriptor for
- ## this socket.
- ##
- ## If `inheritable` is false (the default), the new file descriptor will not
- ## be inheritable by child processes.
- let fd = createAsyncNativeSocket(domain, sockType, protocol, inheritable)
- if fd.SocketHandle == osInvalidSocket:
- raiseOSError(osLastError())
- result = newAsyncSocket(fd, Domain(domain), SockType(sockType),
- Protocol(protocol), buffered, inheritable)
- when defineSsl:
- proc getSslError(socket: AsyncSocket, err: cint): cint =
- assert socket.isSsl
- assert err < 0
- var ret = SSL_get_error(socket.sslHandle, err.cint)
- case ret
- of SSL_ERROR_ZERO_RETURN:
- raiseSSLError("TLS/SSL connection failed to initiate, socket closed prematurely.")
- of SSL_ERROR_WANT_CONNECT, SSL_ERROR_WANT_ACCEPT:
- return ret
- of SSL_ERROR_WANT_WRITE, SSL_ERROR_WANT_READ:
- return ret
- of SSL_ERROR_WANT_X509_LOOKUP:
- raiseSSLError("Function for x509 lookup has been called.")
- of SSL_ERROR_SYSCALL, SSL_ERROR_SSL:
- socket.sslNoShutdown = true
- raiseSSLError()
- else: raiseSSLError("Unknown Error")
- proc sendPendingSslData(socket: AsyncSocket,
- flags: set[SocketFlag]) {.async.} =
- let len = bioCtrlPending(socket.bioOut)
- if len > 0:
- var data = newString(len)
- let read = bioRead(socket.bioOut, cast[cstring](addr data[0]), len)
- assert read != 0
- if read < 0:
- raiseSSLError()
- data.setLen(read)
- await socket.fd.AsyncFD.send(data, flags)
- proc appeaseSsl(socket: AsyncSocket, flags: set[SocketFlag],
- sslError: cint): owned(Future[bool]) {.async.} =
- ## Returns `true` if `socket` is still connected, otherwise `false`.
- result = true
- case sslError
- of SSL_ERROR_WANT_WRITE:
- await sendPendingSslData(socket, flags)
- of SSL_ERROR_WANT_READ:
- var data = await recv(socket.fd.AsyncFD, BufferSize, flags)
- let length = len(data)
- if length > 0:
- let ret = bioWrite(socket.bioIn, cast[cstring](addr data[0]), length.cint)
- if ret < 0:
- raiseSSLError()
- elif length == 0:
- # connection not properly closed by remote side or connection dropped
- SSL_set_shutdown(socket.sslHandle, SSL_RECEIVED_SHUTDOWN)
- result = false
- else:
- raiseSSLError("Cannot appease SSL.")
- template sslLoop(socket: AsyncSocket, flags: set[SocketFlag],
- op: untyped) =
- var opResult {.inject.} = -1.cint
- while opResult < 0:
- ErrClearError()
- # Call the desired operation.
- opResult = op
- let err =
- if opResult < 0:
- getSslError(socket, opResult.cint)
- else:
- SSL_ERROR_NONE
- # Send any remaining pending SSL data.
- await sendPendingSslData(socket, flags)
- # If the operation failed, try to see if SSL has some data to read
- # or write.
- if opResult < 0:
- let fut = appeaseSsl(socket, flags, err.cint)
- yield fut
- if not fut.read():
- # Socket disconnected.
- if SocketFlag.SafeDisconn in flags:
- opResult = 0.cint
- break
- else:
- raiseSSLError("Socket has been disconnected")
- proc dial*(address: string, port: Port, protocol = IPPROTO_TCP,
- buffered = true): owned(Future[AsyncSocket]) {.async.} =
- ## Establishes connection to the specified `address`:`port` pair via the
- ## specified protocol. The procedure iterates through possible
- ## resolutions of the `address` until it succeeds, meaning that it
- ## seamlessly works with both IPv4 and IPv6.
- ## Returns AsyncSocket ready to send or receive data.
- let asyncFd = await asyncdispatch.dial(address, port, protocol)
- let sockType = protocol.toSockType()
- let domain = getSockDomain(asyncFd.SocketHandle)
- result = newAsyncSocket(asyncFd, domain, sockType, protocol, buffered)
- proc connect*(socket: AsyncSocket, address: string, port: Port) {.async.} =
- ## Connects `socket` to server at `address:port`.
- ##
- ## Returns a `Future` which will complete when the connection succeeds
- ## or an error occurs.
- await connect(socket.fd.AsyncFD, address, port, socket.domain)
- if socket.isSsl:
- when defineSsl:
- if not isIpAddress(address):
- # Set the SNI address for this connection. This call can fail if
- # we're not using TLSv1+.
- discard SSL_set_tlsext_host_name(socket.sslHandle, address)
- let flags = {SocketFlag.SafeDisconn}
- sslSetConnectState(socket.sslHandle)
- sslLoop(socket, flags, sslDoHandshake(socket.sslHandle))
- template readInto(buf: pointer, size: int, socket: AsyncSocket,
- flags: set[SocketFlag]): int =
- ## Reads **up to** `size` bytes from `socket` into `buf`. Note that
- ## this is a template and not a proc.
- assert(not socket.closed, "Cannot `recv` on a closed socket")
- var res = 0
- if socket.isSsl:
- when defineSsl:
- # SSL mode.
- sslLoop(socket, flags,
- sslRead(socket.sslHandle, cast[cstring](buf), size.cint))
- res = opResult
- else:
- # Not in SSL mode.
- res = await asyncdispatch.recvInto(socket.fd.AsyncFD, buf, size, flags)
- res
- template readIntoBuf(socket: AsyncSocket,
- flags: set[SocketFlag]): int =
- var size = readInto(addr socket.buffer[0], BufferSize, socket, flags)
- socket.currPos = 0
- socket.bufLen = size
- size
- proc recvInto*(socket: AsyncSocket, buf: pointer, size: int,
- flags = {SocketFlag.SafeDisconn}): owned(Future[int]) {.async.} =
- ## Reads **up to** `size` bytes from `socket` into `buf`.
- ##
- ## For buffered sockets this function will attempt to read all the requested
- ## data. It will read this data in `BufferSize` chunks.
- ##
- ## For unbuffered sockets this function makes no effort to read
- ## all the data requested. It will return as much data as the operating system
- ## gives it.
- ##
- ## If socket is disconnected during the
- ## recv operation then the future may complete with only a part of the
- ## requested data.
- ##
- ## If socket is disconnected and no data is available
- ## to be read then the future will complete with a value of `0`.
- if socket.isBuffered:
- let originalBufPos = socket.currPos
- if socket.bufLen == 0:
- let res = socket.readIntoBuf(flags - {SocketFlag.Peek})
- if res == 0:
- return 0
- var read = 0
- var cbuf = cast[cstring](buf)
- while read < size:
- if socket.currPos >= socket.bufLen:
- if SocketFlag.Peek in flags:
- # We don't want to get another buffer if we're peeking.
- break
- let res = socket.readIntoBuf(flags - {SocketFlag.Peek})
- if res == 0:
- break
- let chunk = min(socket.bufLen-socket.currPos, size-read)
- copyMem(addr(cbuf[read]), addr(socket.buffer[socket.currPos]), chunk)
- read.inc(chunk)
- socket.currPos.inc(chunk)
- if SocketFlag.Peek in flags:
- # Restore old buffer cursor position.
- socket.currPos = originalBufPos
- result = read
- else:
- result = readInto(buf, size, socket, flags)
- proc recv*(socket: AsyncSocket, size: int,
- flags = {SocketFlag.SafeDisconn}): owned(Future[string]) {.async.} =
- ## Reads **up to** `size` bytes from `socket`.
- ##
- ## For buffered sockets this function will attempt to read all the requested
- ## data. It will read this data in `BufferSize` chunks.
- ##
- ## For unbuffered sockets this function makes no effort to read
- ## all the data requested. It will return as much data as the operating system
- ## gives it.
- ##
- ## If socket is disconnected during the
- ## recv operation then the future may complete with only a part of the
- ## requested data.
- ##
- ## If socket is disconnected and no data is available
- ## to be read then the future will complete with a value of `""`.
- if socket.isBuffered:
- result = newString(size)
- when not defined(nimSeqsV2):
- shallow(result)
- let originalBufPos = socket.currPos
- if socket.bufLen == 0:
- let res = socket.readIntoBuf(flags - {SocketFlag.Peek})
- if res == 0:
- result.setLen(0)
- return
- var read = 0
- while read < size:
- if socket.currPos >= socket.bufLen:
- if SocketFlag.Peek in flags:
- # We don't want to get another buffer if we're peeking.
- break
- let res = socket.readIntoBuf(flags - {SocketFlag.Peek})
- if res == 0:
- break
- let chunk = min(socket.bufLen-socket.currPos, size-read)
- copyMem(addr(result[read]), addr(socket.buffer[socket.currPos]), chunk)
- read.inc(chunk)
- socket.currPos.inc(chunk)
- if SocketFlag.Peek in flags:
- # Restore old buffer cursor position.
- socket.currPos = originalBufPos
- result.setLen(read)
- else:
- result = newString(size)
- let read = readInto(addr result[0], size, socket, flags)
- result.setLen(read)
- proc send*(socket: AsyncSocket, buf: pointer, size: int,
- flags = {SocketFlag.SafeDisconn}) {.async.} =
- ## Sends `size` bytes from `buf` to `socket`. The returned future will complete once all
- ## data has been sent.
- assert socket != nil
- assert(not socket.closed, "Cannot `send` on a closed socket")
- if socket.isSsl:
- when defineSsl:
- sslLoop(socket, flags,
- sslWrite(socket.sslHandle, cast[cstring](buf), size.cint))
- await sendPendingSslData(socket, flags)
- else:
- await send(socket.fd.AsyncFD, buf, size, flags)
- proc send*(socket: AsyncSocket, data: string,
- flags = {SocketFlag.SafeDisconn}) {.async.} =
- ## Sends `data` to `socket`. The returned future will complete once all
- ## data has been sent.
- assert socket != nil
- if socket.isSsl:
- when defineSsl:
- var copy = data
- sslLoop(socket, flags,
- sslWrite(socket.sslHandle, cast[cstring](addr copy[0]), copy.len.cint))
- await sendPendingSslData(socket, flags)
- else:
- await send(socket.fd.AsyncFD, data, flags)
- proc acceptAddr*(socket: AsyncSocket, flags = {SocketFlag.SafeDisconn},
- inheritable = defined(nimInheritHandles)):
- owned(Future[tuple[address: string, client: AsyncSocket]]) =
- ## Accepts a new connection. Returns a future containing the client socket
- ## corresponding to that connection and the remote address of the client.
- ##
- ## If `inheritable` is false (the default), the resulting client socket will
- ## not be inheritable by child processes.
- ##
- ## The future will complete when the connection is successfully accepted.
- var retFuture = newFuture[tuple[address: string, client: AsyncSocket]]("asyncnet.acceptAddr")
- var fut = acceptAddr(socket.fd.AsyncFD, flags, inheritable)
- fut.callback =
- proc (future: Future[tuple[address: string, client: AsyncFD]]) =
- assert future.finished
- if future.failed:
- retFuture.fail(future.readError)
- else:
- let resultTup = (future.read.address,
- newAsyncSocket(future.read.client, socket.domain,
- socket.sockType, socket.protocol, socket.isBuffered, inheritable))
- retFuture.complete(resultTup)
- return retFuture
- proc accept*(socket: AsyncSocket,
- flags = {SocketFlag.SafeDisconn}): owned(Future[AsyncSocket]) =
- ## Accepts a new connection. Returns a future containing the client socket
- ## corresponding to that connection.
- ## If `inheritable` is false (the default), the resulting client socket will
- ## not be inheritable by child processes.
- ## The future will complete when the connection is successfully accepted.
- var retFut = newFuture[AsyncSocket]("asyncnet.accept")
- var fut = acceptAddr(socket, flags)
- fut.callback =
- proc (future: Future[tuple[address: string, client: AsyncSocket]]) =
- assert future.finished
- if future.failed:
- retFut.fail(future.readError)
- else:
- retFut.complete(future.read.client)
- return retFut
- proc recvLineInto*(socket: AsyncSocket, resString: FutureVar[string],
- flags = {SocketFlag.SafeDisconn}, maxLength = MaxLineLength) {.async.} =
- ## Reads a line of data from `socket` into `resString`.
- ##
- ## If a full line is read `\r\L` is not
- ## added to `line`, however if solely `\r\L` is read then `line`
- ## will be set to it.
- ##
- ## If the socket is disconnected, `line` will be set to `""`.
- ##
- ## If the socket is disconnected in the middle of a line (before `\r\L`
- ## is read) then line will be set to `""`.
- ## The partial line **will be lost**.
- ##
- ## The `maxLength` parameter determines the maximum amount of characters
- ## that can be read. `resString` will be truncated after that.
- ##
- ## .. warning:: The `Peek` flag is not yet implemented.
- ##
- ## .. warning:: `recvLineInto` on unbuffered sockets assumes that the protocol uses `\r\L` to delimit a new line.
- assert SocketFlag.Peek notin flags ## TODO:
- result = newFuture[void]("asyncnet.recvLineInto")
- # TODO: Make the async transformation check for FutureVar params and complete
- # them when the result future is completed.
- # Can we replace the result future with the FutureVar?
- template addNLIfEmpty(): untyped =
- if resString.mget.len == 0:
- resString.mget.add("\c\L")
- if socket.isBuffered:
- if socket.bufLen == 0:
- let res = socket.readIntoBuf(flags)
- if res == 0:
- resString.complete()
- return
- var lastR = false
- while true:
- if socket.currPos >= socket.bufLen:
- let res = socket.readIntoBuf(flags)
- if res == 0:
- resString.mget.setLen(0)
- resString.complete()
- return
- case socket.buffer[socket.currPos]
- of '\r':
- lastR = true
- addNLIfEmpty()
- of '\L':
- addNLIfEmpty()
- socket.currPos.inc()
- resString.complete()
- return
- else:
- if lastR:
- socket.currPos.inc()
- resString.complete()
- return
- else:
- resString.mget.add socket.buffer[socket.currPos]
- socket.currPos.inc()
- # Verify that this isn't a DOS attack: #3847.
- if resString.mget.len > maxLength: break
- else:
- var c = ""
- while true:
- c = await recv(socket, 1, flags)
- if c.len == 0:
- resString.mget.setLen(0)
- resString.complete()
- return
- if c == "\r":
- c = await recv(socket, 1, flags) # Skip \L
- assert c == "\L"
- addNLIfEmpty()
- resString.complete()
- return
- elif c == "\L":
- addNLIfEmpty()
- resString.complete()
- return
- resString.mget.add c
- # Verify that this isn't a DOS attack: #3847.
- if resString.mget.len > maxLength: break
- resString.complete()
- proc recvLine*(socket: AsyncSocket,
- flags = {SocketFlag.SafeDisconn},
- maxLength = MaxLineLength): owned(Future[string]) {.async.} =
- ## Reads a line of data from `socket`. Returned future will complete once
- ## a full line is read or an error occurs.
- ##
- ## If a full line is read `\r\L` is not
- ## added to `line`, however if solely `\r\L` is read then `line`
- ## will be set to it.
- ##
- ## If the socket is disconnected, `line` will be set to `""`.
- ##
- ## If the socket is disconnected in the middle of a line (before `\r\L`
- ## is read) then line will be set to `""`.
- ## The partial line **will be lost**.
- ##
- ## The `maxLength` parameter determines the maximum amount of characters
- ## that can be read. The result is truncated after that.
- ##
- ## .. warning:: The `Peek` flag is not yet implemented.
- ##
- ## .. warning:: `recvLine` on unbuffered sockets assumes that the protocol uses `\r\L` to delimit a new line.
- assert SocketFlag.Peek notin flags ## TODO:
- # TODO: Optimise this
- var resString = newFutureVar[string]("asyncnet.recvLine")
- resString.mget() = ""
- await socket.recvLineInto(resString, flags, maxLength)
- result = resString.mget()
- proc listen*(socket: AsyncSocket, backlog = SOMAXCONN) {.tags: [
- ReadIOEffect].} =
- ## Marks `socket` as accepting connections.
- ## `Backlog` specifies the maximum length of the
- ## queue of pending connections.
- ##
- ## Raises an OSError error upon failure.
- if listen(socket.fd, backlog) < 0'i32: raiseOSError(osLastError())
- proc bindAddr*(socket: AsyncSocket, port = Port(0), address = "") {.
- tags: [ReadIOEffect].} =
- ## Binds `address`:`port` to the socket.
- ##
- ## If `address` is "" then ADDR_ANY will be bound.
- var realaddr = address
- if realaddr == "":
- case socket.domain
- of AF_INET6: realaddr = "::"
- of AF_INET: realaddr = "0.0.0.0"
- else:
- raise newException(ValueError,
- "Unknown socket address family and no address specified to bindAddr")
- var aiList = getAddrInfo(realaddr, port, socket.domain)
- if bindAddr(socket.fd, aiList.ai_addr, aiList.ai_addrlen.SockLen) < 0'i32:
- freeAddrInfo(aiList)
- raiseOSError(osLastError())
- freeAddrInfo(aiList)
- proc hasDataBuffered*(s: AsyncSocket): bool {.since: (1, 5).} =
- ## Determines whether an AsyncSocket has data buffered.
- # xxx dedup with std/net
- s.isBuffered and s.bufLen > 0 and s.currPos != s.bufLen
- when defined(posix) and not useNimNetLite:
- proc connectUnix*(socket: AsyncSocket, path: string): owned(Future[void]) =
- ## Binds Unix socket to `path`.
- ## This only works on Unix-style systems: Mac OS X, BSD and Linux
- when not defined(nimdoc):
- let retFuture = newFuture[void]("connectUnix")
- result = retFuture
- proc cb(fd: AsyncFD): bool =
- let ret = SocketHandle(fd).getSockOptInt(cint(SOL_SOCKET), cint(SO_ERROR))
- if ret == 0:
- retFuture.complete()
- return true
- elif ret == EINTR:
- return false
- else:
- retFuture.fail(newOSError(OSErrorCode(ret)))
- return true
- var socketAddr = makeUnixAddr(path)
- let ret = socket.fd.connect(cast[ptr SockAddr](addr socketAddr),
- (sizeof(socketAddr.sun_family) + path.len).SockLen)
- if ret == 0:
- # Request to connect completed immediately.
- retFuture.complete()
- else:
- let lastError = osLastError()
- if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS:
- addWrite(AsyncFD(socket.fd), cb)
- else:
- retFuture.fail(newOSError(lastError))
- proc bindUnix*(socket: AsyncSocket, path: string) {.
- tags: [ReadIOEffect].} =
- ## Binds Unix socket to `path`.
- ## This only works on Unix-style systems: Mac OS X, BSD and Linux
- when not defined(nimdoc):
- var socketAddr = makeUnixAddr(path)
- if socket.fd.bindAddr(cast[ptr SockAddr](addr socketAddr),
- (sizeof(socketAddr.sun_family) + path.len).SockLen) != 0'i32:
- raiseOSError(osLastError())
- elif defined(nimdoc):
- proc connectUnix*(socket: AsyncSocket, path: string): owned(Future[void]) =
- ## Binds Unix socket to `path`.
- ## This only works on Unix-style systems: Mac OS X, BSD and Linux
- discard
- proc bindUnix*(socket: AsyncSocket, path: string) =
- ## Binds Unix socket to `path`.
- ## This only works on Unix-style systems: Mac OS X, BSD and Linux
- discard
- proc close*(socket: AsyncSocket) =
- ## Closes the socket.
- if socket.closed: return
- defer:
- socket.fd.AsyncFD.closeSocket()
- socket.closed = true # TODO: Add extra debugging checks for this.
- when defineSsl:
- if socket.isSsl:
- let res =
- # Don't call SSL_shutdown if the connection has not been fully
- # established, see:
- # https://github.com/openssl/openssl/issues/710#issuecomment-253897666
- if not socket.sslNoShutdown and SSL_in_init(socket.sslHandle) == 0:
- ErrClearError()
- SSL_shutdown(socket.sslHandle)
- else:
- 0
- SSL_free(socket.sslHandle)
- if res == 0:
- discard
- elif res != 1:
- raiseSSLError()
- when defineSsl:
- proc sslHandle*(self: AsyncSocket): SslPtr =
- ## Retrieve the ssl pointer of `socket`.
- ## Useful for interfacing with `openssl`.
- self.sslHandle
-
- proc wrapSocket*(ctx: SslContext, socket: AsyncSocket) =
- ## Wraps a socket in an SSL context. This function effectively turns
- ## `socket` into an SSL socket.
- ##
- ## **Disclaimer**: This code is not well tested, may be very unsafe and
- ## prone to security vulnerabilities.
- socket.isSsl = true
- socket.sslContext = ctx
- socket.sslHandle = SSL_new(socket.sslContext.context)
- if socket.sslHandle == nil:
- raiseSSLError()
- socket.bioIn = bioNew(bioSMem())
- socket.bioOut = bioNew(bioSMem())
- sslSetBio(socket.sslHandle, socket.bioIn, socket.bioOut)
- socket.sslNoShutdown = true
- proc wrapConnectedSocket*(ctx: SslContext, socket: AsyncSocket,
- handshake: SslHandshakeType,
- hostname: string = "") =
- ## Wraps a connected socket in an SSL context. This function effectively
- ## turns `socket` into an SSL socket.
- ## `hostname` should be specified so that the client knows which hostname
- ## the server certificate should be validated against.
- ##
- ## This should be called on a connected socket, and will perform
- ## an SSL handshake immediately.
- ##
- ## **Disclaimer**: This code is not well tested, may be very unsafe and
- ## prone to security vulnerabilities.
- wrapSocket(ctx, socket)
- case handshake
- of handshakeAsClient:
- if hostname.len > 0 and not isIpAddress(hostname):
- # Set the SNI address for this connection. This call can fail if
- # we're not using TLSv1+.
- discard SSL_set_tlsext_host_name(socket.sslHandle, hostname)
- sslSetConnectState(socket.sslHandle)
- of handshakeAsServer:
- sslSetAcceptState(socket.sslHandle)
- proc getPeerCertificates*(socket: AsyncSocket): seq[Certificate] {.since: (1, 1).} =
- ## Returns the certificate chain received by the peer we are connected to
- ## through the given socket.
- ## The handshake must have been completed and the certificate chain must
- ## have been verified successfully or else an empty sequence is returned.
- ## The chain is ordered from leaf certificate to root certificate.
- if not socket.isSsl:
- result = newSeq[Certificate]()
- else:
- result = getPeerCertificates(socket.sslHandle)
- proc getSockOpt*(socket: AsyncSocket, opt: SOBool, level = SOL_SOCKET): bool {.
- tags: [ReadIOEffect].} =
- ## Retrieves option `opt` as a boolean value.
- var res = getSockOptInt(socket.fd, cint(level), toCInt(opt))
- result = res != 0
- proc setSockOpt*(socket: AsyncSocket, opt: SOBool, value: bool,
- level = SOL_SOCKET) {.tags: [WriteIOEffect].} =
- ## Sets option `opt` to a boolean value specified by `value`.
- var valuei = cint(if value: 1 else: 0)
- setSockOptInt(socket.fd, cint(level), toCInt(opt), valuei)
- proc isSsl*(socket: AsyncSocket): bool =
- ## Determines whether `socket` is a SSL socket.
- socket.isSsl
- proc getFd*(socket: AsyncSocket): SocketHandle =
- ## Returns the socket's file descriptor.
- return socket.fd
- proc isClosed*(socket: AsyncSocket): bool =
- ## Determines whether the socket has been closed.
- return socket.closed
- proc sendTo*(socket: AsyncSocket, address: string, port: Port, data: string,
- flags = {SocketFlag.SafeDisconn}): owned(Future[void])
- {.async, since: (1, 3).} =
- ## This proc sends `data` to the specified `address`, which may be an IP
- ## address or a hostname. If a hostname is specified this function will try
- ## each IP of that hostname. The returned future will complete once all data
- ## has been sent.
- ##
- ## If an error occurs an OSError exception will be raised.
- ##
- ## This proc is normally used with connectionless sockets (UDP sockets).
- assert(socket.protocol != IPPROTO_TCP,
- "Cannot `sendTo` on a TCP socket. Use `send` instead")
- assert(not socket.closed, "Cannot `sendTo` on a closed socket")
- let aiList = getAddrInfo(address, port, socket.domain, socket.sockType,
- socket.protocol)
- var
- it = aiList
- success = false
- lastException: ref Exception
- while it != nil:
- let fut = sendTo(socket.fd.AsyncFD, cstring(data), len(data), it.ai_addr,
- it.ai_addrlen.SockLen, flags)
- yield fut
- if not fut.failed:
- success = true
- break
- lastException = fut.readError()
- it = it.ai_next
- freeAddrInfo(aiList)
- if not success:
- if lastException != nil:
- raise lastException
- else:
- raise newException(IOError, "Couldn't resolve address: " & address)
- proc recvFrom*(socket: AsyncSocket, data: FutureVar[string], size: int,
- address: FutureVar[string], port: FutureVar[Port],
- flags = {SocketFlag.SafeDisconn}): owned(Future[int])
- {.async, since: (1, 3).} =
- ## Receives a datagram data from `socket` into `data`, which must be at
- ## least of size `size`. The address and port of datagram's sender will be
- ## stored into `address` and `port`, respectively. Returned future will
- ## complete once one datagram has been received, and will return size of
- ## packet received.
- ##
- ## If an error occurs an OSError exception will be raised.
- ##
- ## This proc is normally used with connectionless sockets (UDP sockets).
- ##
- ## **Notes**
- ## * `data` must be initialized to the length of `size`.
- ## * `address` must be initialized to 46 in length.
- template adaptRecvFromToDomain(domain: Domain) =
- var lAddr = sizeof(sAddr).SockLen
- result = await recvFromInto(AsyncFD(getFd(socket)), cstring(data.mget()), size,
- cast[ptr SockAddr](addr sAddr), addr lAddr,
- flags)
- data.mget().setLen(result)
- data.complete()
- getAddrString(cast[ptr SockAddr](addr sAddr), address.mget())
- address.complete()
- when domain == AF_INET6:
- port.complete(ntohs(sAddr.sin6_port).Port)
- else:
- port.complete(ntohs(sAddr.sin_port).Port)
- assert(socket.protocol != IPPROTO_TCP,
- "Cannot `recvFrom` on a TCP socket. Use `recv` or `recvInto` instead")
- assert(not socket.closed, "Cannot `recvFrom` on a closed socket")
- assert(size == len(data.mget()),
- "`date` was not initialized correctly. `size` != `len(data.mget())`")
- assert(46 == len(address.mget()),
- "`address` was not initialized correctly. 46 != `len(address.mget())`")
- case socket.domain
- of AF_INET6:
- var sAddr: Sockaddr_in6
- adaptRecvFromToDomain(AF_INET6)
- of AF_INET:
- var sAddr: Sockaddr_in
- adaptRecvFromToDomain(AF_INET)
- else:
- raise newException(ValueError, "Unknown socket address family")
- proc recvFrom*(socket: AsyncSocket, size: int,
- flags = {SocketFlag.SafeDisconn}):
- owned(Future[tuple[data: string, address: string, port: Port]])
- {.async, since: (1, 3).} =
- ## Receives a datagram data from `socket`, which must be at least of size
- ## `size`. Returned future will complete once one datagram has been received
- ## and will return tuple with: data of packet received; and address and port
- ## of datagram's sender.
- ##
- ## If an error occurs an OSError exception will be raised.
- ##
- ## This proc is normally used with connectionless sockets (UDP sockets).
- var
- data = newFutureVar[string]()
- address = newFutureVar[string]()
- port = newFutureVar[Port]()
- data.mget().setLen(size)
- address.mget().setLen(46)
- let read = await recvFrom(socket, data, size, address, port, flags)
- result = (data.mget(), address.mget(), port.mget())
- when not defined(testing) and isMainModule:
- type
- TestCases = enum
- HighClient, LowClient, LowServer
- const test = HighClient
- when test == HighClient:
- proc main() {.async.} =
- var sock = newAsyncSocket()
- await sock.connect("irc.freenode.net", Port(6667))
- while true:
- let line = await sock.recvLine()
- if line == "":
- echo("Disconnected")
- break
- else:
- echo("Got line: ", line)
- asyncCheck main()
- elif test == LowClient:
- var sock = newAsyncSocket()
- var f = connect(sock, "irc.freenode.net", Port(6667))
- f.callback =
- proc (future: Future[void]) =
- echo("Connected in future!")
- for i in 0 .. 50:
- var recvF = recv(sock, 10)
- recvF.callback =
- proc (future: Future[string]) =
- echo("Read ", future.read.len, ": ", future.read.repr)
- elif test == LowServer:
- var sock = newAsyncSocket()
- sock.bindAddr(Port(6667))
- sock.listen()
- proc onAccept(future: Future[AsyncSocket]) =
- let client = future.read
- echo "Accepted ", client.fd.cint
- var t = send(client, "test\c\L")
- t.callback =
- proc (future: Future[void]) =
- echo("Send")
- client.close()
- var f = accept(sock)
- f.callback = onAccept
- var f = accept(sock)
- f.callback = onAccept
- runForever()
|