asyncio.nim 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717
  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2012 Andreas Rumpf, Dominik Picheta
  5. # See the file "copying.txt", included in this
  6. # distribution, for details about the copyright.
  7. #
  8. include "system/inclrtl"
  9. import sockets, os
  10. ##
  11. ## **Warning:** This module is deprecated since version 0.10.2.
  12. ## Use the brand new `asyncdispatch <asyncdispatch.html>`_ module together
  13. ## with the `asyncnet <asyncnet.html>`_ module.
  14. ## This module implements an asynchronous event loop together with asynchronous
  15. ## sockets which use this event loop.
  16. ## It is akin to Python's asyncore module. Many modules that use sockets
  17. ## have an implementation for this module, those modules should all have a
  18. ## ``register`` function which you should use to add the desired objects to a
  19. ## dispatcher which you created so
  20. ## that you can receive the events associated with that module's object.
  21. ##
  22. ## Once everything is registered in a dispatcher, you need to call the ``poll``
  23. ## function in a while loop.
  24. ##
  25. ## **Note:** Most modules have tasks which need to be run regularly, this is
  26. ## why you should not call ``poll`` with an infinite timeout, or even a
  27. ## very long one. In most cases the default timeout is fine.
  28. ##
  29. ## **Note:** This module currently only supports select(), this is limited by
  30. ## FD_SETSIZE, which is usually 1024. So you may only be able to use 1024
  31. ## sockets at a time.
  32. ##
  33. ## Most (if not all) modules that use asyncio provide a userArg which is passed
  34. ## on with the events. The type that you set userArg to must be inheriting from
  35. ## ``RootObj``!
  36. ##
  37. ## **Note:** If you want to provide async ability to your module please do not
  38. ## use the ``Delegate`` object, instead use ``AsyncSocket``. It is possible
  39. ## that in the future this type's fields will not be exported therefore breaking
  40. ## your code.
  41. ##
  42. ## **Warning:** The API of this module is unstable, and therefore is subject
  43. ## to change.
  44. ##
  45. ## Asynchronous sockets
  46. ## ====================
  47. ##
  48. ## For most purposes you do not need to worry about the ``Delegate`` type. The
  49. ## ``AsyncSocket`` is what you are after. It's a reference to
  50. ## the ``AsyncSocketObj`` object. This object defines events which you should
  51. ## overwrite by your own procedures.
  52. ##
  53. ## For server sockets the only event you need to worry about is the ``handleAccept``
  54. ## event, in your handleAccept proc you should call ``accept`` on the server
  55. ## socket which will give you the client which is connecting. You should then
  56. ## set any events that you want to use on that client and add it to your dispatcher
  57. ## using the ``register`` procedure.
  58. ##
  59. ## An example ``handleAccept`` follows:
  60. ##
  61. ## .. code-block:: nim
  62. ##
  63. ## var disp = newDispatcher()
  64. ## ...
  65. ## proc handleAccept(s: AsyncSocket) =
  66. ## echo("Accepted client.")
  67. ## var client: AsyncSocket
  68. ## new(client)
  69. ## s.accept(client)
  70. ## client.handleRead = ...
  71. ## disp.register(client)
  72. ## ...
  73. ##
  74. ## For client sockets you should only be interested in the ``handleRead`` and
  75. ## ``handleConnect`` events. The former gets called whenever the socket has
  76. ## received messages and can be read from and the latter gets called whenever
  77. ## the socket has established a connection to a server socket; from that point
  78. ## it can be safely written to.
  79. ##
  80. ## Getting a blocking client from an AsyncSocket
  81. ## =============================================
  82. ##
  83. ## If you need an asynchronous server socket but you wish to process the clients
  84. ## synchronously then you can use the ``getSocket`` converter to get
  85. ## a ``Socket`` from the ``AsyncSocket`` object, this can then be combined
  86. ## with ``accept`` like so:
  87. ##
  88. ## .. code-block:: nim
  89. ##
  90. ## proc handleAccept(s: AsyncSocket) =
  91. ## var client: Socket
  92. ## getSocket(s).accept(client)
  93. {.deprecated.}
  94. when defined(windows):
  95. from winlean import TimeVal, SocketHandle, FD_SET, FD_ZERO, TFdSet,
  96. FD_ISSET, select
  97. else:
  98. from posix import TimeVal, Time, Suseconds, SocketHandle, FD_SET, FD_ZERO,
  99. TFdSet, FD_ISSET, select
  100. type
  101. DelegateObj* = object ## One event source watched by a ``Dispatcher``.
  102. fd*: SocketHandle ## Descriptor added to the select() sets by ``poll``.
  103. deleVal*: RootRef ## Value handed as ``h`` to every callback of this delegate.
  104. handleRead*: proc (h: RootRef) {.nimcall, gcsafe.} ## Called by ``poll`` when ``fd`` is reported readable or ``hasDataBuffered`` is true.
  105. handleWrite*: proc (h: RootRef) {.nimcall, gcsafe.} ## Called by ``poll`` when ``fd`` is reported writeable and ``mode`` is not ``fmRead``.
  106. handleError*: proc (h: RootRef) {.nimcall, gcsafe.} ## Called by ``poll`` when ``fd`` is reported in the exceptional set.
  107. hasDataBuffered*: proc (h: RootRef): bool {.nimcall, gcsafe.} ## Checked by ``poll`` before select(); true triggers ``handleRead`` directly.
  108. open*: bool ## Delegates with ``open == false`` are dropped from the dispatcher by ``poll``.
  109. task*: proc (h: RootRef) {.nimcall, gcsafe.} ## Run for every delegate at the end of a ``poll`` iteration.
  110. mode*: FileMode ## Selects which select() sets the delegate joins (``fmRead`` = no write events).
  111. Delegate* = ref DelegateObj
  112. Dispatcher* = ref DispatcherObj
  113. DispatcherObj = object
  114. delegates: seq[Delegate] ## Everything currently registered; pruned by ``poll``.
  115. AsyncSocket* = ref AsyncSocketObj
  116. AsyncSocketObj* = object of RootObj
  117. socket: Socket ## The wrapped non-blocking socket.
  118. info: SocketStatus ## Current connection state of ``socket``.
  119. handleRead*: proc (s: AsyncSocket) {.closure, gcsafe.} ## Fired on read events unless the socket is listening or connecting.
  120. handleWrite: proc (s: AsyncSocket) {.closure, gcsafe.} ## Optional; set via ``setHandleWrite``, nil means "no write events wanted".
  121. handleConnect*: proc (s: AsyncSocket) {.closure, gcsafe.} ## Fired on the first write event while ``info`` is ``SockConnecting``.
  122. handleAccept*: proc (s: AsyncSocket) {.closure, gcsafe.} ## Fired on read events while ``info`` is ``SockListening``.
  123. handleTask*: proc (s: AsyncSocket) {.closure, gcsafe.} ## Fired from the delegate's ``task`` callback.
  124. lineBuffer: TaintedString ## Temporary storage for ``readLine``
  125. sendBuffer: string ## Temporary storage for ``send``
  126. sslNeedAccept: bool ## Set by ``acceptAddr`` when the SSL accept returned ``AcceptNoHandshake``.
  127. proto: Protocol ## Protocol the socket was created with; ``bindAddr`` checks it for UDP.
  128. deleg: Delegate ## Delegate created by ``toDelegate``; nil until the socket is registered.
  129. SocketStatus* = enum
  130. SockIdle, SockConnecting, SockConnected, SockListening, SockClosed,
  131. SockUDPBound
  132. {.deprecated: [TDelegate: DelegateObj, PDelegate: Delegate,
  133. TInfo: SocketStatus, PAsyncSocket: AsyncSocket, TAsyncSocket: AsyncSocketObj,
  134. TDispatcher: DispatcherObj, PDispatcher: Dispatcher,
  135. ].} # Maps the legacy T-/P-prefixed type names onto the current ones.
  proc newDelegate*(): Delegate =
    ## Creates a new delegate whose callbacks all do nothing, which reports
    ## no buffered data and which starts out in ``fmRead`` mode.
    proc ignoreEvent(h: RootRef) {.nimcall.} = discard
    proc nothingBuffered(h: RootRef): bool {.nimcall.} = false

    result = Delegate(
      handleRead: ignoreEvent,
      handleWrite: ignoreEvent,
      handleError: ignoreEvent,
      hasDataBuffered: nothingBuffered,
      task: ignoreEvent,
      mode: fmRead
    )
  proc newAsyncSocket(): AsyncSocket =
    ## Allocates an idle ``AsyncSocket`` with no-op event handlers, no
    ## ``handleWrite`` handler and empty line/send buffers. The underlying
    ## ``socket`` field is left for the caller to fill in.
    let doNothing = proc (s: AsyncSocket) {.closure, gcsafe.} = discard
    result = AsyncSocket(
      info: SockIdle,
      handleRead: doNothing,
      handleWrite: nil,
      handleConnect: doNothing,
      handleAccept: doNothing,
      handleTask: doNothing,
      lineBuffer: "".TaintedString,
      sendBuffer: ""
    )
  proc asyncSocket*(domain: Domain = AF_INET, typ: SockType = SOCK_STREAM,
                    protocol: Protocol = IPPROTO_TCP,
                    buffered = true): AsyncSocket =
    ## Creates a fresh non-blocking ``AsyncSocket`` for the given domain,
    ## socket type and protocol. Raises ``OSError`` when the operating
    ## system refuses to create the socket.
    let rawSock = socket(domain, typ, protocol, buffered)
    if rawSock == invalidSocket:
      raiseOSError(osLastError())
    result = newAsyncSocket()
    result.proto = protocol
    result.socket = rawSock
    result.socket.setBlocking(false)
  proc toAsyncSocket*(sock: Socket, state: SocketStatus = SockConnected): AsyncSocket =
    ## Wraps an already initialised ``Socket`` in an ``AsyncSocket`` so that it
    ## can take part in asyncio's event loop.
    ##
    ## ``state`` describes what ``sock`` is currently doing and may be
    ## overridden. The default assumes a connected TCP client socket; for any
    ## other kind of socket ``state`` must be adjusted accordingly.
    ##
    ## ================  ================================================================
    ## Value             Meaning
    ## ================  ================================================================
    ##  SockIdle          Socket has only just been initialised, not connected or closed.
    ##  SockConnected     Socket is connected to a server.
    ##  SockConnecting    Socket is in the process of connecting to a server.
    ##  SockListening     Socket is a server socket and is listening for connections.
    ##  SockClosed        Socket has been closed.
    ##  SockUDPBound      Socket is a UDP socket which is listening for data.
    ## ================  ================================================================
    ##
    ## **Warning**: If ``state`` is set incorrectly the resulting ``AsyncSocket``
    ## object may not work properly.
    ##
    ## **Note**: This will set ``sock`` to be non-blocking.
    result = newAsyncSocket()
    result.socket = sock
    # Only a bound UDP socket is treated as UDP; everything else counts as TCP.
    if state == SockUDPBound:
      result.proto = IPPROTO_UDP
    else:
      result.proto = IPPROTO_TCP
    result.socket.setBlocking(false)
    result.info = state
  proc asyncSockHandleRead(h: RootRef) =
    ## Delegate read callback: routes a read event to ``handleAccept`` for
    ## listening sockets and to ``handleRead`` otherwise. Events arriving
    ## while still connecting, or before an SSL handshake completed, are
    ## ignored.
    let sock = AsyncSocket(h)
    when defined(ssl):
      if sock.socket.isSSL and not sock.socket.gotHandshake:
        return

    case sock.info
    of SockListening:
      sock.handleAccept(sock)
    of SockConnecting:
      discard
    else:
      sock.handleRead(sock)
  # Forward declaration: the write handler below closes the socket on errors.
  proc close*(sock: AsyncSocket) {.gcsafe.}

  proc asyncSockHandleWrite(h: RootRef) =
    ## Delegate write callback. Completes a pending connect, flushes the
    ## send buffer and forwards the event to the user's ``handleWrite``.
    ## When there is nothing to do, write events are switched off by
    ## putting the delegate back into ``fmRead`` mode.
    let sock = AsyncSocket(h)
    when defined(ssl):
      if sock.socket.isSSL and not sock.socket.gotHandshake:
        return

    if sock.info == SockConnecting:
      sock.handleConnect(sock)
      sock.info = SockConnected
      # Stop receiving write events if there is no handleWrite event.
      sock.deleg.mode = (if sock.handleWrite == nil: fmRead else: fmReadWrite)
      return

    if sock.sendBuffer == "":
      if sock.handleWrite == nil:
        sock.deleg.mode = fmRead
      else:
        sock.handleWrite(sock)
      return

    try:
      let bytesSent = sock.socket.sendAsync(sock.sendBuffer)
      if bytesSent == sock.sendBuffer.len:
        sock.sendBuffer = ""
      elif bytesSent != 0:
        sock.sendBuffer = sock.sendBuffer[bytesSent .. ^1]
      # bytesSent == 0: apparently the socket cannot be written to, even
      # though select just told us that it can be. This used to be an
      # assert; the buffer is simply kept for the next write event.

      if sock.handleWrite != nil:
        sock.handleWrite(sock)
    except OSError:
      # Most likely the socket closed before the full buffer could be sent to it.
      sock.close() # TODO: Provide a handleError for users?
  when defined(ssl):
    proc asyncSockDoHandshake(h: RootRef) {.gcsafe.} =
      ## Advances a pending SSL handshake. Accepted sockets flagged with
      ## ``sslNeedAccept`` retry the SSL accept, all other sockets perform a
      ## client handshake. Does nothing for non-SSL sockets or once the
      ## handshake is done.
      let sock = AsyncSocket(h)
      if not sock.socket.isSSL or sock.socket.gotHandshake:
        return

      if sock.sslNeedAccept:
        var peerAddr = ""
        let outcome = sock.socket.acceptAddrSSL(sock.socket, peerAddr)
        assert outcome != AcceptNoClient
        if outcome == AcceptSuccess:
          sock.info = SockConnected
      else:
        # handshake will set socket's ``sslNoHandshake`` field.
        discard sock.socket.handshake()
  proc asyncSockTask(h: RootRef) =
    ## Delegate task callback: pushes a pending SSL handshake forward (SSL
    ## builds only) and then runs the user's ``handleTask``.
    when defined(ssl):
      asyncSockDoHandshake(h)
    let sock = AsyncSocket(h)
    sock.handleTask(sock)
  proc toDelegate(sock: AsyncSocket): Delegate =
    ## Builds the ``Delegate`` that connects ``sock`` to a dispatcher and
    ## stores it in ``sock.deleg``. The delegate is open unless the socket
    ## is idle or closed.
    proc socketHasBufferedData(h: RootRef): bool {.nimcall.} =
      return AsyncSocket(h).socket.hasDataBuffered()

    let dg = newDelegate()
    dg.deleVal = sock
    dg.fd = getFD(sock.socket)
    # We need this to get write events, just to know when the socket connects.
    dg.mode = fmReadWrite
    dg.handleRead = asyncSockHandleRead
    dg.handleWrite = asyncSockHandleWrite
    dg.task = asyncSockTask
    # TODO: Errors?
    #dg.handleError = (proc (h: PObject) = assert(false))
    dg.hasDataBuffered = socketHasBufferedData
    dg.open = sock.info notin {SockIdle, SockClosed}
    sock.deleg = dg
    result = dg
  proc connect*(sock: AsyncSocket, name: string, port = Port(0),
                af: Domain = AF_INET) =
    ## Starts a non-blocking connect of ``sock`` to ``name``:``port`` and
    ## marks the socket as connecting. A delegate that already exists is
    ## opened so the dispatcher starts polling it.
    connectAsync(sock.socket, name, port, af)
    sock.info = SockConnecting
    let dg = sock.deleg
    if not dg.isNil:
      dg.open = true
  proc close*(sock: AsyncSocket) =
    ## Closes ``sock``, terminating any current connection, and marks its
    ## delegate (if there is one) as closed so the dispatcher drops it.
    sock.socket.close()
    sock.info = SockClosed
    let dg = sock.deleg
    if not dg.isNil:
      dg.open = false
  proc bindAddr*(sock: AsyncSocket, port = Port(0), address = "") =
    ## Equivalent to ``sockets.bindAddr``. A UDP socket additionally becomes
    ## ``SockUDPBound`` and its delegate, if present, is opened.
    sock.socket.bindAddr(port, address)
    if sock.proto != IPPROTO_UDP:
      return
    sock.info = SockUDPBound
    if not sock.deleg.isNil:
      sock.deleg.open = true
  proc listen*(sock: AsyncSocket) =
    ## Equivalent to ``sockets.listen``. Marks the socket as listening and
    ## opens its delegate when one exists.
    listen(sock.socket)
    sock.info = SockListening
    let dg = sock.deleg
    if not dg.isNil:
      dg.open = true
  proc acceptAddr*(server: AsyncSocket, client: var AsyncSocket,
                   address: var string) =
    ## Equivalent to ``sockets.acceptAddr``. This procedure should be called in
    ## a ``handleAccept`` event handler **only** once.
    ##
    ## ``client`` is replaced by a new connected socket and ``address``
    ## receives the peer's address.
    ##
    ## **Note**: ``client`` needs to be initialised.
    assert(client != nil)
    client = newAsyncSocket()
    var accepted: Socket
    new(accepted)

    var handshakePending = false
    when defined(ssl):
      if server.socket.isSSL:
        let outcome = server.socket.acceptAddrSSL(accepted, address)
        # The following shouldn't happen because when this function is called
        # it is guaranteed that there is a client waiting.
        # (This should be called in handleAccept)
        assert(outcome != AcceptNoClient)
        handshakePending = outcome == AcceptNoHandshake
      else:
        server.socket.acceptAddr(accepted, address)
    else:
      server.socket.acceptAddr(accepted, address)

    client.sslNeedAccept = handshakePending
    if not handshakePending:
      client.info = SockConnected

    if accepted == invalidSocket: raiseSocketError(server.socket)
    accepted.setBlocking(false) # TODO: Needs to be tested.

    # deleg.open is set in ``toDelegate``.
    client.socket = accepted
    client.lineBuffer = "".TaintedString
    client.sendBuffer = ""
    client.info = SockConnected
  proc accept*(server: AsyncSocket, client: var AsyncSocket) =
    ## Equivalent to ``sockets.accept``: like ``acceptAddr`` but the peer's
    ## address is thrown away.
    var ignoredAddress = ""
    acceptAddr(server, client, ignoredAddress)
  proc acceptAddr*(server: AsyncSocket): tuple[sock: AsyncSocket,
                                               address: string] {.deprecated.} =
    ## Equivalent to ``sockets.acceptAddr``; returns the accepted client
    ## together with its address.
    ##
    ## **Deprecated since version 0.9.0:** Please use the function above.
    result.sock = newAsyncSocket()
    result.address = ""
    acceptAddr(server, result.sock, result.address)
  proc accept*(server: AsyncSocket): AsyncSocket {.deprecated.} =
    ## Equivalent to ``sockets.accept``; returns the accepted client.
    ##
    ## **Deprecated since version 0.9.0:** Please use the function above.
    new(result)
    var ignoredAddress = ""
    acceptAddr(server, result, ignoredAddress)
  proc newDispatcher*(): Dispatcher =
    ## Creates a dispatcher with no delegates registered.
    result = Dispatcher(delegates: @[])
  proc register*(d: Dispatcher, deleg: Delegate) =
    ## Registers delegate ``deleg`` with dispatcher ``d`` by appending it to
    ## the dispatcher's delegate list.
    add(d.delegates, deleg)
  proc register*(d: Dispatcher, sock: AsyncSocket): Delegate {.discardable.} =
    ## Registers async socket ``sock`` with dispatcher ``d`` and returns the
    ## delegate that was created for it.
    let dg = toDelegate(sock)
    register(d, dg)
    result = dg
  proc unregister*(d: Dispatcher, deleg: Delegate) =
    ## Unregisters deleg ``deleg`` from dispatcher ``d``. Raises
    ## ``IndexError`` when ``deleg`` is not registered with ``d``.
    let pos = find(d.delegates, deleg)
    if pos < 0:
      raise newException(IndexError, "Could not find delegate.")
    # ``del`` moves the last element into the freed slot, so the order of
    # the remaining delegates is not preserved.
    d.delegates.del(pos)
  proc isWriteable*(s: AsyncSocket): bool =
    ## Determines whether socket ``s`` is ready to be written to.
    var candidates = @[s.socket]
    if selectWrite(candidates, 1) == 0:
      return false
    # The socket counts as ready when it is no longer in the sequence.
    result = s.socket notin candidates
  converter getSocket*(s: AsyncSocket): Socket =
    ## Gives access to the ``Socket`` wrapped by ``s``.
    s.socket
  proc isConnected*(s: AsyncSocket): bool =
    ## True when the state of ``s`` is ``SockConnected``.
    s.info == SockConnected
  proc isListening*(s: AsyncSocket): bool =
    ## True when ``s`` is listening for incoming connections.
    s.info == SockListening
  proc isConnecting*(s: AsyncSocket): bool =
    ## True while ``s`` is in the process of connecting.
    s.info == SockConnecting
  proc isClosed*(s: AsyncSocket): bool =
    ## True once ``s`` has been closed.
    s.info == SockClosed
  proc isSendDataBuffered*(s: AsyncSocket): bool =
    ## Determines whether ``s`` still has data waiting to be sent, i.e.
    ## whether this socket's sendBuffer is non-empty.
    result = s.sendBuffer.len > 0
  proc setHandleWrite*(s: AsyncSocket,
      handleWrite: proc (s: AsyncSocket) {.closure, gcsafe.}) =
    ## Setter for the ``handleWrite`` event.
    ##
    ## To remove this event you should use the ``delHandleWrite`` function.
    ## It is advised to use that function instead of just setting the event to
    ## ``proc (s: AsyncSocket) = nil`` as that would mean that that function
    ## would be called constantly.
    ##
    ## May be called before ``s`` is registered with a dispatcher: the handler
    ## is stored, and the delegate created on registration starts out in
    ## ``fmReadWrite`` mode.
    # ``deleg`` is nil until ``toDelegate`` ran; guard it like the other procs.
    if s.deleg != nil:
      s.deleg.mode = fmReadWrite
    s.handleWrite = handleWrite
  proc delHandleWrite*(s: AsyncSocket) =
    ## Removes the ``handleWrite`` event handler on ``s`` by resetting it
    ## to nil.
    s.handleWrite = nil
  {.push warning[deprecated]: off.}
  proc recvLine*(s: AsyncSocket, line: var TaintedString): bool {.deprecated.} =
    ## Behaves similar to ``sockets.recvLine``, however it handles non-blocking
    ## sockets properly. ``line`` is only ever set to a full line: partial
    ## data is stored in the socket and prepended once the rest of the line
    ## arrives.
    ##
    ## Unlike ``sockets.recvLine`` this function will raise an OSError or SslError
    ## exception if an error occurs.
    ##
    ## **Deprecated since version 0.9.2**: This function has been deprecated in
    ## favour of readLine.
    setLen(line.string, 0)
    var chunk = "".TaintedString
    case s.socket.recvLineAsync(chunk)
    of RecvFullLine:
      if s.lineBuffer.len > 0:
        # Prepend what earlier partial reads collected.
        line.string.add(s.lineBuffer.string)
        setLen(s.lineBuffer.string, 0)
      line.string.add(chunk.string)
      if line.string == "":
        # An empty line is reported as a bare line terminator.
        line = "\c\L".TaintedString
      result = true
    of RecvPartialLine:
      s.lineBuffer.string.add(chunk.string)
      result = false
    of RecvDisconnected:
      result = true
    of RecvFail:
      s.raiseSocketError(async = true)
      result = false
  {.pop.}
  proc readLine*(s: AsyncSocket, line: var TaintedString): bool =
    ## Behaves similar to ``sockets.readLine``, however it handles non-blocking
    ## sockets properly. ``line`` is only ever set to a full line: when only
    ## part of a line could be read it is stored in the socket and False is
    ## returned; the stored part is prepended once the rest arrives.
    ##
    ## True is returned only if a full line has been retrieved or the socket
    ## has been disconnected, in which case ``line`` will be set to "".
    ##
    ## This function will raise an OSError exception when a socket error occurs.
    setLen(line.string, 0)
    var chunk = "".TaintedString
    case s.socket.readLineAsync(chunk)
    of ReadFullLine:
      if s.lineBuffer.len > 0:
        # Prepend what earlier partial reads collected.
        line.string.add(s.lineBuffer.string)
        setLen(s.lineBuffer.string, 0)
      line.string.add(chunk.string)
      if line.string == "":
        # An empty line is reported as a bare line terminator.
        line = "\c\L".TaintedString
      result = true
    of ReadPartialLine:
      s.lineBuffer.string.add(chunk.string)
      result = false
    of ReadNone:
      result = false
    of ReadDisconnected:
      result = true
  proc send*(sock: AsyncSocket, data: string) =
    ## Sends ``data`` to socket ``sock``. This is basically a nicer implementation
    ## of ``sockets.sendAsync``.
    ##
    ## If ``data`` cannot be sent immediately it will be buffered and sent
    ## when ``sock`` becomes writeable (during the ``handleWrite`` event).
    ## It's possible that only a part of ``data`` will be sent immediately, while
    ## the rest of it will be buffered and sent later.
    ##
    ## Buffering also works on a socket that has not been registered with a
    ## dispatcher yet; the data is flushed once it is registered and polled.
    if sock.sendBuffer.len != 0:
      # Earlier data is still queued; keep the byte order by queueing behind it.
      sock.sendBuffer.add(data)
      return

    let bytesSent = sock.socket.sendAsync(data)
    assert bytesSent >= 0
    if bytesSent == 0 or bytesSent != data.len:
      # Queue whatever was not written (everything when ``bytesSent`` is 0).
      sock.sendBuffer.add(data[bytesSent .. ^1])
      # ``deleg`` is nil until the socket is registered; the delegate created
      # at registration starts in ``fmReadWrite`` mode anyway.
      if sock.deleg != nil:
        sock.deleg.mode = fmReadWrite
  proc timeValFromMilliseconds(timeout = 500): Timeval =
    ## Converts a timeout in milliseconds into a ``Timeval``. A timeout of
    ## ``-1`` yields the zero-initialised value.
    if timeout == -1:
      return
    let wholeSeconds = timeout div 1000
    let leftoverMicros = (timeout - wholeSeconds * 1000) * 1000
    when defined(posix):
      result.tv_sec = wholeSeconds.Time
      result.tv_usec = leftoverMicros.Suseconds
    else:
      result.tv_sec = wholeSeconds.int32
      result.tv_usec = leftoverMicros.int32
  proc createFdSet(fd: var TFdSet, s: seq[Delegate], m: var int) =
    ## Clears ``fd`` and adds the descriptor of every delegate in ``s`` to
    ## it. ``m`` is raised to the largest descriptor seen; it is never
    ## lowered.
    FD_ZERO(fd)
    for dg in s:
      let handle = int(dg.fd)
      if handle > m:
        m = handle
      FD_SET(dg.fd, fd)
  proc pruneSocketSet(s: var seq[Delegate], fd: var TFdSet) =
    ## Removes from ``s`` every delegate whose descriptor is set in ``fd``;
    ## delegates whose descriptor is not set stay. The order of the
    ## remaining elements is not preserved.
    var kept = s.len
    var idx = 0
    while idx < kept:
      if FD_ISSET(s[idx].fd, fd) == 0'i32:
        inc(idx)
      else:
        # Overwrite with the last live element and re-examine this slot.
        dec(kept)
        s[idx] = s[kept]
    setLen(s, kept)
  proc select(readfds, writefds, exceptfds: var seq[Delegate],
              timeout = 500): int =
    ## Runs the OS-level ``select`` on the descriptors of the given
    ## delegates and returns its result. ``timeout`` is in milliseconds,
    ## ``-1`` waits without a timeout. On return every delegate whose
    ## descriptor was flagged has been **removed** from its sequence.
    var tv {.noInit.}: Timeval = timeValFromMilliseconds(timeout)
    var rd, wr, ex: TFdSet
    var highest = 0
    createFdSet(rd, readfds, highest)
    createFdSet(wr, writefds, highest)
    createFdSet(ex, exceptfds, highest)

    # A nil timeval makes the OS call wait without a timeout.
    let tvPtr = if timeout != -1: addr(tv) else: nil
    result = int(select(cint(highest+1), addr(rd), addr(wr), addr(ex), tvPtr))

    pruneSocketSet(readfds, rd)
    pruneSocketSet(writefds, wr)
    pruneSocketSet(exceptfds, ex)
  proc poll*(d: Dispatcher, timeout: int = 500): bool =
    ## This function checks for events on all the delegates in the
    ## ``Dispatcher`` and calls the matching event handlers.
    ##
    ## This function returns ``True`` if there are file descriptors that are still
    ## open, otherwise ``False``. File descriptors that have been
    ## closed are immediately removed from the dispatcher automatically.
    ##
    ## Delegates in ``fmWrite`` or ``fmAppend`` mode are not polled for read
    ## events and delegates in ``fmRead`` mode are not polled for write events.
    ##
    ## **Note:** Each delegate has a task associated with it. This gets called
    ## after each select() call, if you set timeout to ``-1`` the tasks will
    ## only be executed after one or more file descriptors becomes readable or
    ## writeable.
    result = true
    var readDg, writeDg, errorDg: seq[Delegate] = @[]
    var live = d.delegates.len
    var dc = 0

    # Sort the open delegates into the select sets and drop closed ones.
    while dc < live:
      let deleg = d.delegates[dc]
      if deleg.open:
        if deleg.mode notin {fmWrite, fmAppend}:
          readDg.add(deleg)
        if deleg.mode != fmRead:
          writeDg.add(deleg)
        errorDg.add(deleg)
        inc dc
      else:
        # File/socket has been closed. Remove it from dispatcher by moving
        # the last live delegate into its slot (which is then re-examined).
        d.delegates[dc] = d.delegates[live-1]
        dec live
    d.delegates.setLen(live)

    # Delegates with buffered data are served without going through select().
    var hasDataBufferedCount = 0
    for dg in d.delegates:
      if dg.hasDataBuffered(dg.deleVal):
        hasDataBufferedCount.inc()
        dg.handleRead(dg.deleVal)
    if hasDataBufferedCount > 0: return true

    if readDg.len() == 0 and writeDg.len() == 0:
      ## TODO: Perhaps this shouldn't return if errorDg has something?
      return false

    if select(readDg, writeDg, errorDg, timeout) != 0:
      # ``select`` removed the ready delegates from the sequences, so
      # "notin" means "an event is pending".
      for i in 0..len(d.delegates)-1:
        if i > len(d.delegates)-1: break # One delegate might've been removed.
        let deleg = d.delegates[i]
        if not deleg.open: continue # This delegate might've been closed.
        if deleg.mode notin {fmWrite, fmAppend} and deleg notin readDg:
          deleg.handleRead(deleg.deleVal)
        if deleg.mode != fmRead and deleg notin writeDg:
          deleg.handleWrite(deleg.deleVal)
        if deleg notin errorDg:
          deleg.handleError(deleg.deleVal)

    # Execute tasks
    for dg in items(d.delegates):
      dg.task(dg.deleVal)
  proc len*(disp: Dispatcher): int =
    ## Retrieves the number of delegates currently held by ``disp``.
    disp.delegates.len
  when not defined(testing) and isMainModule:
    # Manual smoke test: connects to an IRC server and runs a local echo
    # server on port 5555, logging every event to stdout.

    proc testConnect(s: AsyncSocket, no: int) =
      ## Logs that the client socket ``no`` connected.
      echo("Connected! " & $no)

    proc testRead(s: AsyncSocket, no: int) =
      ## Reads one line from ``s`` and prints it; an empty line means the
      ## peer disconnected, in which case the socket is closed.
      echo("Reading! " & $no)
      var received = ""
      if not s.readLine(received):
        return
      if received == "":
        echo("Closing connection. " & $no)
        s.close()
      echo(received)
      echo("Finished reading! " & $no)

    proc testAccept(s: AsyncSocket, disp: Dispatcher, no: int) =
      ## Accepts a pending client on ``s`` and registers it with ``disp``.
      echo("Accepting client! " & $no)
      var client: AsyncSocket
      new(client)
      var peerAddress = ""
      s.acceptAddr(client, peerAddress)
      echo("Accepted ", peerAddress)
      client.handleRead =
        proc (s: AsyncSocket) =
          testRead(s, 2)
      disp.register(client)

    proc main =
      ## Sets up one outgoing and one listening socket and polls forever.
      var dispatcher = newDispatcher()

      var outgoing = asyncSocket()
      outgoing.connect("amber.tenthbit.net", Port(6667))
      outgoing.handleConnect =
        proc (s: AsyncSocket) =
          testConnect(s, 1)
      outgoing.handleRead =
        proc (s: AsyncSocket) =
          testRead(s, 1)
      dispatcher.register(outgoing)

      var server = asyncSocket()
      server.handleAccept =
        proc (s: AsyncSocket) =
          testAccept(s, dispatcher, 78)
      server.bindAddr(Port(5555))
      server.listen()
      dispatcher.register(server)

      while dispatcher.poll(-1): discard

    main()