asyncio.nim 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712
  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2012 Andreas Rumpf, Dominik Picheta
  5. # See the file "copying.txt", included in this
  6. # distribution, for details about the copyright.
  7. #
  8. include "system/inclrtl"
  9. import sockets, os
  10. ##
  11. ## **Warning:** This module is deprecated since version 0.10.2.
  12. ## Use the brand new `asyncdispatch <asyncdispatch.html>`_ module together
  13. ## with the `asyncnet <asyncnet.html>`_ module.
  14. ## This module implements an asynchronous event loop together with asynchronous
  15. ## sockets which use this event loop.
  16. ## It is akin to Python's asyncore module. Many modules that use sockets
  17. ## have an implementation for this module, those modules should all have a
  18. ## ``register`` function which you should use to add the desired objects to a
  19. ## dispatcher which you created so
  20. ## that you can receive the events associated with that module's object.
  21. ##
  22. ## Once everything is registered in a dispatcher, you need to call the ``poll``
  23. ## function in a while loop.
  24. ##
  25. ## **Note:** Most modules have tasks which need to be run regularly, this is
  26. ## why you should not call ``poll`` with an infinite timeout, or even a
  27. ## very long one. In most cases the default timeout is fine.
  28. ##
  29. ## **Note:** This module currently only supports select(), this is limited by
  30. ## FD_SETSIZE, which is usually 1024. So you may only be able to use 1024
  31. ## sockets at a time.
  32. ##
  33. ## Most (if not all) modules that use asyncio provide a userArg which is passed
  34. ## on with the events. The type that you set userArg to must be inheriting from
  35. ## ``RootObj``!
  36. ##
  37. ## **Note:** If you want to provide async ability to your module please do not
  38. ## use the ``Delegate`` object, instead use ``AsyncSocket``. It is possible
  39. ## that in the future this type's fields will not be exported therefore breaking
  40. ## your code.
  41. ##
  42. ## **Warning:** The API of this module is unstable, and therefore is subject
  43. ## to change.
  44. ##
  45. ## Asynchronous sockets
  46. ## ====================
  47. ##
  48. ## For most purposes you do not need to worry about the ``Delegate`` type. The
  49. ## ``AsyncSocket`` is what you are after. It's a reference to
  50. ## the ``AsyncSocketObj`` object. This object defines events which you should
  51. ## overwrite by your own procedures.
  52. ##
  53. ## For server sockets the only event you need to worry about is the ``handleAccept``
  54. ## event, in your handleAccept proc you should call ``accept`` on the server
  55. ## socket which will give you the client which is connecting. You should then
  56. ## set any events that you want to use on that client and add it to your dispatcher
  57. ## using the ``register`` procedure.
  58. ##
  59. ## An example ``handleAccept`` follows:
  60. ##
  61. ## .. code-block:: nim
  62. ##
  63. ##   var disp = newDispatcher()
  64. ##   ...
  65. ##   proc handleAccept(s: AsyncSocket) =
  66. ##     echo("Accepted client.")
  67. ##     var client: AsyncSocket
  68. ##     new(client)
  69. ##     s.accept(client)
  70. ##     client.handleRead = ...
  71. ##     disp.register(client)
  72. ##     ...
  73. ##
  74. ## For client sockets you should only be interested in the ``handleRead`` and
  75. ## ``handleConnect`` events. The former gets called whenever the socket has
  76. ## received messages and can be read from and the latter gets called whenever
  77. ## the socket has established a connection to a server socket; from that point
  78. ## it can be safely written to.
  79. ##
  80. ## Getting a blocking client from an AsyncSocket
  81. ## =============================================
  82. ##
  83. ## If you need an asynchronous server socket but you wish to process the clients
  84. ## synchronously then you can use the ``getSocket`` converter to get
  85. ## a ``Socket`` from the ``AsyncSocket`` object, this can then be combined
  86. ## with ``accept`` like so:
  87. ##
  88. ## .. code-block:: nim
  89. ##
  90. ##   proc handleAccept(s: AsyncSocket) =
  91. ##     var client: Socket
  92. ##     getSocket(s).accept(client)
  93. {.deprecated.}
  94. when defined(windows):
  95. from winlean import TimeVal, SocketHandle, FD_SET, FD_ZERO, TFdSet,
  96. FD_ISSET, select
  97. else:
  98. from posix import TimeVal, Time, Suseconds, SocketHandle, FD_SET, FD_ZERO,
  99. TFdSet, FD_ISSET, select
  100. type
  101. DelegateObj* = object ## Low-level event source watched by ``poll``.
  102. fd*: SocketHandle ## Descriptor placed into the fd sets passed to ``select``.
  103. deleVal*: RootRef ## Value handed to every callback of this delegate.
  104. handleRead*: proc (h: RootRef) {.nimcall, gcsafe.} ## Called by ``poll`` when ``fd`` is readable or ``hasDataBuffered`` returned true.
  105. handleWrite*: proc (h: RootRef) {.nimcall, gcsafe.} ## Called by ``poll`` when ``fd`` is writeable and ``mode`` is not ``fmRead``.
  106. handleError*: proc (h: RootRef) {.nimcall, gcsafe.} ## Called by ``poll`` when ``select`` flags ``fd`` in the exception set.
  107. hasDataBuffered*: proc (h: RootRef): bool {.nimcall, gcsafe.} ## Queried by ``poll`` before ``select``; true triggers ``handleRead``.
  108. open*: bool ## Delegates with ``open == false`` are dropped from the dispatcher by ``poll``.
  109. task*: proc (h: RootRef) {.nimcall, gcsafe.} ## Run by ``poll`` for every delegate after the ``select`` step.
  110. mode*: FileMode ## Selects which fd sets the delegate joins in ``poll``.
  111. Delegate* = ref DelegateObj
  112. Dispatcher* = ref DispatcherObj
  113. DispatcherObj = object
  114. delegates: seq[Delegate] ## Delegates added through ``register``.
  115. AsyncSocket* = ref AsyncSocketObj
  116. AsyncSocketObj* = object of RootObj
  117. socket: Socket ## Underlying socket; exposed through the ``getSocket`` converter.
  118. info: SocketStatus ## Current state, updated by connect/listen/bindAddr/close/accept.
  119. handleRead*: proc (s: AsyncSocket) {.closure, gcsafe.} ## Called on read events unless the socket is listening or connecting.
  120. handleWrite: proc (s: AsyncSocket) {.closure, gcsafe.} ## Not exported; managed with ``setHandleWrite`` / ``delHandleWrite``.
  121. handleConnect*: proc (s: AsyncSocket) {.closure, gcsafe.} ## Called on the first write event while the state is ``SockConnecting``.
  122. handleAccept*: proc (s: AsyncSocket) {.closure, gcsafe.} ## Called on read events while the state is ``SockListening``.
  123. handleTask*: proc (s: AsyncSocket) {.closure, gcsafe.} ## Called from the delegate's ``task`` callback.
  124. lineBuffer: TaintedString ## Temporary storage for ``readLine``
  125. sendBuffer: string ## Temporary storage for ``send``
  126. sslNeedAccept: bool ## Set by ``acceptAddr`` when the SSL accept returned ``AcceptNoHandshake``.
  127. proto: Protocol ## Protocol chosen at creation; ``bindAddr`` checks it for UDP.
  128. deleg: Delegate ## Assigned by ``toDelegate`` (i.e. on ``register``); nil before that.
  129. SocketStatus* = enum ## States an ``AsyncSocket`` can be in.
  130. SockIdle, SockConnecting, SockConnected, SockListening, SockClosed,
  131. SockUDPBound
  proc newDelegate*(): Delegate =
    ## Allocates a delegate with harmless defaults: every event callback and
    ## the task do nothing, ``hasDataBuffered`` always answers ``false`` and
    ## the mode is ``fmRead``.
    proc ignoreEvent(h: RootRef) = discard
    proc nothingBuffered(h: RootRef): bool = false

    new(result)
    result.mode = fmRead
    result.hasDataBuffered = nothingBuffered
    result.handleRead = ignoreEvent
    result.handleWrite = ignoreEvent
    result.handleError = ignoreEvent
    result.task = ignoreEvent
  proc newAsyncSocket(): AsyncSocket =
    ## Allocates an idle ``AsyncSocket`` with empty buffers. All exported
    ## handlers start out as no-ops; ``handleWrite`` starts out unset (nil).
    proc ignoreEvent(s: AsyncSocket) = discard

    new(result)
    result.info = SockIdle
    result.lineBuffer = "".TaintedString
    result.sendBuffer = ""
    result.handleWrite = nil
    result.handleRead = ignoreEvent
    result.handleConnect = ignoreEvent
    result.handleAccept = ignoreEvent
    result.handleTask = ignoreEvent
  proc asyncSocket*(domain: Domain = AF_INET, typ: SockType = SOCK_STREAM,
                    protocol: Protocol = IPPROTO_TCP,
                    buffered = true): AsyncSocket =
    ## Creates a new non-blocking ``AsyncSocket`` for the given domain, socket
    ## type and protocol.
    ##
    ## Raises ``OSError`` when the underlying socket cannot be created.
    result = newAsyncSocket()
    result.proto = protocol
    result.socket = socket(domain, typ, protocol, buffered)
    if result.socket == invalidSocket:
      raiseOSError(osLastError())
    setBlocking(result.socket, false)
  proc toAsyncSocket*(sock: Socket, state: SocketStatus = SockConnected): AsyncSocket =
    ## Wraps an existing, already initialised ``Socket`` in an ``AsyncSocket``
    ## so that it can take part in asyncio's event loop.
    ##
    ## ``state`` describes the condition ``sock`` is currently in. The default
    ## assumes a connected TCP client socket; for any other kind of socket the
    ## caller has to pass the matching value:
    ##
    ## ================  ================================================================
    ## Value             Meaning
    ## ================  ================================================================
    ##  SockIdle          Socket has only just been initialised, not connected or closed.
    ##  SockConnected     Socket is connected to a server.
    ##  SockConnecting    Socket is in the process of connecting to a server.
    ##  SockListening     Socket is a server socket and is listening for connections.
    ##  SockClosed        Socket has been closed.
    ##  SockUDPBound      Socket is a UDP socket which is listening for data.
    ## ================  ================================================================
    ##
    ## **Warning**: A wrong ``state`` may leave the resulting ``AsyncSocket``
    ## unable to work properly.
    ##
    ## **Note**: ``sock`` is switched to non-blocking mode.
    result = newAsyncSocket()
    result.socket = sock
    # Only a UDP-bound socket is treated as UDP; everything else counts as TCP.
    if state == SockUDPBound:
      result.proto = IPPROTO_UDP
    else:
      result.proto = IPPROTO_TCP
    setBlocking(result.socket, false)
    result.info = state
  proc asyncSockHandleRead(h: RootRef) =
    ## Delegate read callback for async sockets. A readable listening socket
    ## means a pending client (``handleAccept``); a connecting socket ignores
    ## read events; every other state is forwarded to ``handleRead``.
    let sock = AsyncSocket(h)
    when defined(ssl):
      # No user events until the SSL handshake has completed.
      if sock.socket.isSSL and not sock.socket.gotHandshake:
        return
    case sock.info
    of SockListening:
      sock.handleAccept(sock)
    of SockConnecting:
      discard
    else:
      sock.handleRead(sock)
  # Forward declaration: ``asyncSockHandleWrite`` closes the socket when a
  # buffered send fails; the implementation follows further down.
  proc close*(sock: AsyncSocket) {.gcsafe.}

  proc asyncSockHandleWrite(h: RootRef) =
    ## Delegate write callback for async sockets.
    ##
    ## * While connecting, the first write event signals an established
    ##   connection: ``handleConnect`` runs and the state becomes
    ##   ``SockConnected``.
    ## * Otherwise pending ``sendBuffer`` data is flushed and the user's
    ##   ``handleWrite`` (if any) is invoked.
    let sock = AsyncSocket(h)
    when defined(ssl):
      # No user events until the SSL handshake has completed.
      if sock.socket.isSSL and not sock.socket.gotHandshake:
        return

    if sock.info == SockConnecting:
      sock.handleConnect(sock)
      sock.info = SockConnected
      # Keep receiving write events only if somebody listens for them.
      sock.deleg.mode =
        if sock.handleWrite == nil: fmRead
        else: fmReadWrite
    elif sock.sendBuffer != "":
      try:
        let sent = sock.socket.sendAsync(sock.sendBuffer)
        if sent == sock.sendBuffer.len:
          sock.sendBuffer = ""
        elif sent != 0:
          # Partial write: keep the unsent tail for the next write event.
          sock.sendBuffer = sock.sendBuffer[sent .. ^1]
        # sent == 0: select reported the socket writeable but nothing could
        # be written. This used to be an assert; now it is simply ignored.

        if sock.handleWrite != nil:
          sock.handleWrite(sock)
      except OSError:
        # Most likely the socket closed before the full buffer could be sent.
        sock.close() # TODO: Provide a handleError for users?
    elif sock.handleWrite != nil:
      sock.handleWrite(sock)
    else:
      # Nothing to send and nobody listening: stop asking for write events.
      sock.deleg.mode = fmRead
  when defined(ssl):
    proc asyncSockDoHandshake(h: RootRef) {.gcsafe.} =
      ## Advances a pending SSL handshake. Does nothing for non-SSL sockets
      ## or once the handshake is done.
      let sock = AsyncSocket(h)
      if not sock.socket.isSSL or sock.socket.gotHandshake:
        return
      if sock.sslNeedAccept:
        # Server side: retry the SSL accept on the already accepted socket.
        var ignoredAddr = ""
        let outcome = sock.socket.acceptAddrSSL(sock.socket, ignoredAddr)
        assert outcome != AcceptNoClient
        if outcome == AcceptSuccess:
          sock.info = SockConnected
      else:
        # handshake will set socket's ``sslNoHandshake`` field.
        discard sock.socket.handshake()
  proc asyncSockTask(h: RootRef) =
    ## Delegate task callback: advances a pending SSL handshake (SSL builds
    ## only) and then runs the socket's ``handleTask``.
    when defined(ssl):
      asyncSockDoHandshake(h)
    let sock = AsyncSocket(h)
    sock.handleTask(sock)
  proc toDelegate(sock: AsyncSocket): Delegate =
    ## Builds the ``Delegate`` that connects ``sock`` to a dispatcher and
    ## stores it in ``sock.deleg``.
    result = newDelegate()
    result.deleVal = sock
    result.fd = getFD(sock.socket)
    # We need this to get write events, just to know when the socket connects.
    result.mode = fmReadWrite
    result.task = asyncSockTask
    result.handleRead = asyncSockHandleRead
    result.handleWrite = asyncSockHandleWrite
    # TODO: Errors?
    #result.handleError = (proc (h: PObject) = assert(false))
    result.hasDataBuffered =
      proc (h: RootRef): bool {.nimcall.} =
        AsyncSocket(h).socket.hasDataBuffered()
    sock.deleg = result
    # Idle and closed sockets are not watched by the dispatcher.
    sock.deleg.open = sock.info notin {SockIdle, SockClosed}
  proc connect*(sock: AsyncSocket, name: string, port = Port(0),
                af: Domain = AF_INET) =
    ## Starts a non-blocking connect of ``sock`` to ``name``:``port`` and
    ## moves the socket into the ``SockConnecting`` state.
    connectAsync(sock.socket, name, port, af)
    sock.info = SockConnecting
    let dg = sock.deleg
    if dg != nil:
      # Already registered: make the dispatcher watch the socket.
      dg.open = true
  proc close*(sock: AsyncSocket) =
    ## Closes ``sock``, ending any current connection, and marks it as
    ## ``SockClosed``.
    close(sock.socket)
    sock.info = SockClosed
    let dg = sock.deleg
    if dg != nil:
      # ``poll`` removes delegates that are no longer open.
      dg.open = false
  proc bindAddr*(sock: AsyncSocket, port = Port(0), address = "") =
    ## Binds ``sock`` like ``sockets.bindAddr``. A UDP socket additionally
    ## enters the ``SockUDPBound`` state and, if registered, is opened for
    ## the dispatcher.
    bindAddr(sock.socket, port, address)
    if sock.proto != IPPROTO_UDP:
      return
    sock.info = SockUDPBound
    if sock.deleg != nil:
      sock.deleg.open = true
  proc listen*(sock: AsyncSocket) =
    ## Puts ``sock`` into listening mode like ``sockets.listen`` and records
    ## the ``SockListening`` state.
    listen(sock.socket)
    sock.info = SockListening
    let dg = sock.deleg
    if dg != nil:
      dg.open = true
  proc acceptAddr*(server: AsyncSocket, client: var AsyncSocket,
                   address: var string) =
    ## Accepts the pending connection on ``server``, like
    ## ``sockets.acceptAddr``. Call it exactly once, from a ``handleAccept``
    ## event handler.
    ##
    ## ``client`` must be non-nil on entry; it is replaced with a freshly
    ## created ``AsyncSocket`` wrapping the accepted connection. ``address``
    ## receives the peer address.
    ##
    ## **Note**: ``client`` needs to be initialised.
    assert(client != nil)
    client = newAsyncSocket()
    var accepted: Socket
    new(accepted)

    var plainAccept = true
    when defined(ssl):
      if server.socket.isSSL:
        plainAccept = false
        let outcome = server.socket.acceptAddrSSL(accepted, address)
        # The following shouldn't happen because when this function is called
        # it is guaranteed that there is a client waiting.
        # (This should be called in handleAccept)
        assert(outcome != AcceptNoClient)
        client.sslNeedAccept = outcome == AcceptNoHandshake
        if not client.sslNeedAccept:
          client.info = SockConnected
    if plainAccept:
      server.socket.acceptAddr(accepted, address)
      client.sslNeedAccept = false
      client.info = SockConnected

    if accepted == invalidSocket: raiseSocketError(server.socket)
    accepted.setBlocking(false) # TODO: Needs to be tested.

    # deleg.open is set in ``toDelegate``.
    client.socket = accepted
    client.lineBuffer = "".TaintedString
    client.sendBuffer = ""
    client.info = SockConnected
  proc accept*(server: AsyncSocket, client: var AsyncSocket) =
    ## Like ``sockets.accept``: accepts the pending connection into
    ## ``client`` and discards the peer address.
    var ignoredAddress = ""
    acceptAddr(server, client, ignoredAddress)
  proc acceptAddr*(server: AsyncSocket): tuple[sock: AsyncSocket,
                                               address: string] {.deprecated.} =
    ## Like ``sockets.acceptAddr``; returns the accepted client together
    ## with its address.
    ##
    ## **Deprecated since version 0.9.0:** Please use the function above.
    var
      accepted = newAsyncSocket()
      peer: string = ""
    server.acceptAddr(accepted, peer)
    result = (accepted, peer)
  proc accept*(server: AsyncSocket): AsyncSocket {.deprecated.} =
    ## Like ``sockets.accept``; returns the accepted client.
    ##
    ## **Deprecated since version 0.9.0:** Please use the function above.
    var ignoredAddress = ""
    # ``acceptAddr`` asserts that the socket passed in is not nil.
    new(result)
    acceptAddr(server, result, ignoredAddress)
  proc newDispatcher*(): Dispatcher =
    ## Creates a dispatcher with no registered delegates.
    result = Dispatcher(delegates: @[])
  proc register*(d: Dispatcher, deleg: Delegate) =
    ## Adds the delegate ``deleg`` to the dispatcher ``d``.
    add(d.delegates, deleg)
  proc register*(d: Dispatcher, sock: AsyncSocket): Delegate {.discardable.} =
    ## Creates a delegate for the async socket ``sock``, adds it to the
    ## dispatcher ``d`` and returns it.
    result = toDelegate(sock)
    register(d, result)
  proc unregister*(d: Dispatcher, deleg: Delegate) =
    ## Removes the first occurrence of ``deleg`` from the dispatcher ``d``.
    ##
    ## Raises ``IndexError`` when ``deleg`` is not registered with ``d``.
    let position = d.delegates.find(deleg)
    if position < 0:
      raise newException(IndexError, "Could not find delegate.")
    d.delegates.del(position)
  proc isWriteable*(s: AsyncSocket): bool =
    ## Reports whether socket ``s`` can be written to right now.
    var candidates = @[s.socket]
    if selectWrite(candidates, 1) == 0:
      return false
    # The socket counts as writeable when the call dropped it from the list.
    result = s.socket notin candidates
  converter getSocket*(s: AsyncSocket): Socket =
    ## Gives access to the ``Socket`` wrapped by ``s``.
    result = s.socket
  proc isConnected*(s: AsyncSocket): bool =
    ## True when ``s`` is in the ``SockConnected`` state.
    result = s.info == SockConnected
  proc isListening*(s: AsyncSocket): bool =
    ## True when ``s`` is listening for incoming connections.
    result = s.info == SockListening
  proc isConnecting*(s: AsyncSocket): bool =
    ## True while a connect started on ``s`` has not completed yet.
    result = s.info == SockConnecting
  proc isClosed*(s: AsyncSocket): bool =
    ## True when ``s`` has been closed.
    result = s.info == SockClosed
  proc isSendDataBuffered*(s: AsyncSocket): bool =
    ## True when the send buffer of ``s`` still holds data that has not been
    ## written to the socket.
    result = s.sendBuffer.len > 0
  proc setHandleWrite*(s: AsyncSocket,
                       handleWrite: proc (s: AsyncSocket) {.closure, gcsafe.}) =
    ## Setter for the ``handleWrite`` event.
    ##
    ## To remove this event you should use the ``delHandleWrite`` function.
    ## It is advised to use that function instead of just setting the event to
    ## ``proc (s: AsyncSocket) = nil`` as that would mean that that function
    ## would be called constantly.
    ##
    ## May be called before ``s`` is registered with a dispatcher: the
    ## delegate created on registration starts out in ``fmReadWrite`` mode.
    if s.deleg != nil:
      # Already registered: request write events from now on.
      s.deleg.mode = fmReadWrite
    s.handleWrite = handleWrite
  proc delHandleWrite*(s: AsyncSocket) =
    ## Clears the ``handleWrite`` event handler of ``s``.
    s.handleWrite = nil
  {.push warning[deprecated]: off.}
  proc recvLine*(s: AsyncSocket, line: var TaintedString): bool {.deprecated.} =
    ## Non-blocking counterpart of ``sockets.recvLine``. ``line`` is only
    ## filled once a complete line has arrived; partial data is kept in an
    ## internal buffer and prepended when the rest of the line is received.
    ##
    ## Returns ``true`` for a full line (an empty line is reported as
    ## ``"\c\L"``) and on disconnect (``line`` stays empty); returns ``false``
    ## while only part of a line is available.
    ##
    ## Unlike ``sockets.recvLine`` this function will raise an OSError or
    ## SslError exception if an error occurs.
    ##
    ## **Deprecated since version 0.9.2**: This function has been deprecated in
    ## favour of readLine.
    line.string.setLen(0)
    var chunk = "".TaintedString
    let status = s.socket.recvLineAsync(chunk)
    case status
    of RecvFail:
      s.raiseSocketError(async = true)
      result = false
    of RecvDisconnected:
      result = true
    of RecvPartialLine:
      # Remember the fragment until the remainder of the line shows up.
      s.lineBuffer.string.add(chunk.string)
      result = false
    of RecvFullLine:
      if s.lineBuffer.len > 0:
        line.string.add(s.lineBuffer.string)
        s.lineBuffer.string.setLen(0)
      line.string.add(chunk.string)
      if line.string == "":
        line = "\c\L".TaintedString
      result = true
  {.pop.}
  proc readLine*(s: AsyncSocket, line: var TaintedString): bool =
    ## Non-blocking counterpart of ``sockets.readLine``. ``line`` is only
    ## filled once a complete line has arrived; partial data is kept in an
    ## internal buffer and prepended when the rest of the line is received.
    ##
    ## Returns ``true`` when a full line was retrieved (an empty line is
    ## reported as ``"\c\L"``) or when the socket has been disconnected, in
    ## which case ``line`` is ``""``. Returns ``false`` when no data or only
    ## part of a line is available.
    ##
    ## This function will raise an OSError exception when a socket error occurs.
    line.string.setLen(0)
    var chunk = "".TaintedString
    let status = s.socket.readLineAsync(chunk)
    case status
    of ReadNone:
      result = false
    of ReadDisconnected:
      result = true
    of ReadPartialLine:
      # Remember the fragment until the remainder of the line shows up.
      s.lineBuffer.string.add(chunk.string)
      result = false
    of ReadFullLine:
      if s.lineBuffer.len > 0:
        line.string.add(s.lineBuffer.string)
        s.lineBuffer.string.setLen(0)
      line.string.add(chunk.string)
      if line.string == "":
        line = "\c\L".TaintedString
      result = true
  proc send*(sock: AsyncSocket, data: string) =
    ## Sends ``data`` to socket ``sock``. This is basically a nicer implementation
    ## of ``sockets.sendAsync``.
    ##
    ## If ``data`` cannot be sent immediately it will be buffered and sent
    ## when ``sock`` becomes writeable (during the ``handleWrite`` event).
    ## It's possible that only a part of ``data`` will be sent immediately, while
    ## the rest of it will be buffered and sent later.
    ##
    ## May be called before ``sock`` is registered with a dispatcher; buffered
    ## data is then flushed once the socket has been registered.
    if sock.sendBuffer.len != 0:
      # Earlier data is still queued; append to keep the byte order intact.
      sock.sendBuffer.add(data)
      return
    let bytesSent = sock.socket.sendAsync(data)
    assert bytesSent >= 0
    if bytesSent == 0 or bytesSent != data.len:
      # Queue whatever could not be written (all of ``data`` when nothing
      # was sent) and ask for write events so it gets flushed later.
      sock.sendBuffer.add(data[bytesSent .. ^1])
      if sock.deleg != nil:
        sock.deleg.mode = fmReadWrite
  proc timeValFromMilliseconds(timeout = 500): Timeval =
    ## Converts ``timeout`` (milliseconds) into a ``Timeval``. A timeout of
    ## ``-1`` yields the zero-initialised value.
    if timeout == -1:
      return
    let wholeSeconds = timeout div 1000
    let leftoverMicros = (timeout mod 1000) * 1000
    when defined(posix):
      result.tv_sec = wholeSeconds.Time
      result.tv_usec = leftoverMicros.Suseconds
    else:
      result.tv_sec = wholeSeconds.int32
      result.tv_usec = leftoverMicros.int32
  proc createFdSet(fd: var TFdSet, s: seq[Delegate], m: var int) =
    ## Clears ``fd`` and adds the descriptor of every delegate in ``s`` to
    ## it. ``m`` is raised to the largest descriptor value encountered.
    FD_ZERO(fd)
    for deleg in s:
      let handle = deleg.fd
      if int(handle) > m:
        m = int(handle)
      FD_SET(handle, fd)
  proc pruneSocketSet(s: var seq[Delegate], fd: var TFdSet) =
    ## Removes from ``s`` every delegate whose descriptor is set in ``fd``,
    ## so that only delegates NOT flagged in ``fd`` remain. The order of the
    ## remaining entries is not preserved.
    var kept = s.len
    var idx = 0
    while idx < kept:
      if FD_ISSET(s[idx].fd, fd) == 0'i32:
        inc(idx)
      else:
        # Overwrite the flagged entry with the last live one and re-check it.
        dec(kept)
        s[idx] = s[kept]
    s.setLen(kept)
  proc select(readfds, writefds, exceptfds: var seq[Delegate],
              timeout = 500): int =
    ## Runs the OS ``select`` over the descriptors of the given delegates and
    ## returns its result. ``timeout`` is in milliseconds; ``-1`` passes no
    ## timeout at all.
    ##
    ## On return, delegates flagged by ``select`` have been REMOVED from the
    ## corresponding sequence; what remains are the delegates without an event.
    var tv {.noInit.}: Timeval = timeValFromMilliseconds(timeout)
    var tvPtr: ptr Timeval = nil
    if timeout != -1:
      tvPtr = addr(tv)

    var rd, wr, ex: TFdSet
    var highest = 0
    createFdSet(rd, readfds, highest)
    createFdSet(wr, writefds, highest)
    createFdSet(ex, exceptfds, highest)

    result = int(select(cint(highest+1), addr(rd), addr(wr), addr(ex), tvPtr))

    pruneSocketSet(readfds, rd)
    pruneSocketSet(writefds, wr)
    pruneSocketSet(exceptfds, ex)
  proc poll*(d: Dispatcher, timeout: int = 500): bool =
    ## Checks for events on all the delegates registered with the
    ## ``Dispatcher`` ``d`` and calls the matching event handlers.
    ## ``timeout`` is given in milliseconds.
    ##
    ## This function returns ``True`` if there are file descriptors that are still
    ## open, otherwise ``False``. File descriptors that have been
    ## closed are immediately removed from the dispatcher automatically.
    ##
    ## **Note:** Each delegate has a task associated with it. This gets called
    ## after each select() call, if you set timeout to ``-1`` the tasks will
    ## only be executed after one or more file descriptors becomes readable or
    ## writeable.
    result = true
    var readDg, writeDg, errorDg: seq[Delegate] = @[]
    var live = d.delegates.len
    var dc = 0
    while dc < live:
      let deleg = d.delegates[dc]
      if deleg.open:
        # Write-only and append-only delegates are not watched for reads.
        if deleg.mode notin {fmWrite, fmAppend}:
          readDg.add(deleg)
        # Read-only delegates are not watched for writes.
        if deleg.mode != fmRead:
          writeDg.add(deleg)
        errorDg.add(deleg)
        inc dc
      else:
        # File/socket has been closed. Remove it from dispatcher by moving
        # the last live entry into its slot; the slot is re-examined next.
        d.delegates[dc] = d.delegates[live-1]
        dec live

    d.delegates.setLen(live)

    # Serve delegates that already hold buffered input before blocking.
    var hasDataBufferedCount = 0
    for deleg in d.delegates:
      if deleg.hasDataBuffered(deleg.deleVal):
        hasDataBufferedCount.inc()
        deleg.handleRead(deleg.deleVal)
    if hasDataBufferedCount > 0: return true

    if readDg.len() == 0 and writeDg.len() == 0:
      # TODO: Perhaps this shouldn't return if errorDg has something?
      return false

    if select(readDg, writeDg, errorDg, timeout) != 0:
      # ``select`` removed every delegate with an event from its sequence,
      # so "not in the sequence" means "has an event".
      for i in 0..len(d.delegates)-1:
        if i > len(d.delegates)-1: break # One delegate might've been removed.
        let deleg = d.delegates[i]
        if not deleg.open: continue # This delegate might've been closed.
        if deleg.mode notin {fmWrite, fmAppend} and deleg notin readDg:
          deleg.handleRead(deleg.deleVal)
        if deleg.mode != fmRead and deleg notin writeDg:
          deleg.handleWrite(deleg.deleVal)
        if deleg notin errorDg:
          deleg.handleError(deleg.deleVal)

    # Execute tasks
    for deleg in items(d.delegates):
      deleg.task(deleg.deleVal)
  proc len*(disp: Dispatcher): int =
    ## Number of delegates currently held by ``disp``.
    result = disp.delegates.len
  when not defined(testing) and isMainModule:
    # Manual demo: connects to an IRC server as a client and, in parallel,
    # accepts connections on port 5555, echoing every line received.

    proc testConnect(s: AsyncSocket, no: int) =
      ## Reports an established connection.
      echo("Connected! " & $no)

    proc testRead(s: AsyncSocket, no: int) =
      ## Prints one full line read from ``s``; an empty line means the peer
      ## disconnected, in which case the socket is closed.
      echo("Reading! " & $no)
      var received = ""
      if not s.readLine(received):
        return
      if received == "":
        echo("Closing connection. " & $no)
        s.close()
      echo(received)
      echo("Finished reading! " & $no)

    proc testAccept(s: AsyncSocket, disp: Dispatcher, no: int) =
      ## Accepts a pending client on ``s`` and registers it with ``disp``.
      echo("Accepting client! " & $no)
      var peer = ""
      var client: AsyncSocket
      new(client)
      s.acceptAddr(client, peer)
      echo("Accepted ", peer)
      client.handleRead =
        proc (s: AsyncSocket) =
          testRead(s, 2)
      disp.register(client)

    proc main =
      let dispatcher = newDispatcher()

      # Outgoing client connection.
      let outgoing = asyncSocket()
      outgoing.connect("amber.tenthbit.net", Port(6667))
      outgoing.handleConnect =
        proc (s: AsyncSocket) =
          testConnect(s, 1)
      outgoing.handleRead =
        proc (s: AsyncSocket) =
          testRead(s, 1)
      dispatcher.register(outgoing)

      # Local listening server.
      let server = asyncSocket()
      server.handleAccept =
        proc (s: AsyncSocket) =
          testAccept(s, dispatcher, 78)
      server.bindAddr(Port(5555))
      server.listen()
      dispatcher.register(server)

      while dispatcher.poll(-1):
        discard

    main()