socketstreams.nim 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2021 Nim contributors
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. ## This module provides an implementation of the streams interface for sockets.
  10. ## It contains two separate implementations, a
  11. ## `ReadSocketStream <#ReadSocketStream>`_ and a
  12. ## `WriteSocketStream <#WriteSocketStream>`_.
  13. ##
  14. ## The `ReadSocketStream` only supports reading, peeking, and seeking.
  15. ## It reads into a buffer, so even by
  16. ## seeking backwards it will only read the same position a single time from the
  17. ## underlying socket. To clear the buffer and free the data read into it you
  18. ## can call `resetStream`, this will also reset the position back to 0 but
  19. ## won't do anything to the underlying socket.
  20. ##
  21. ## The `WriteSocketStream` allows both reading and writing, but it performs the
  22. ## reads on the internal buffer. So by writing to the buffer you can then read
  23. ## back what was written but without receiving anything from the socket. You
  24. ## can also set the position and overwrite parts of the buffer, and to send
  25. ## anything over the socket you need to call `flush` at which point you can't
  26. ## write anything to the buffer before the point of the flush (but it can still
  27. ## be read). Again to empty the underlying buffer you need to call
  28. ## `resetStream`.
  29. ##
  30. ## Examples
  31. ## ========
  32. ##
  33. ## ```Nim
  34. ## import std/socketstreams
  35. ##
  36. ## var
  37. ## socket = newSocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)
  38. ## stream = newReadSocketStream(socket)
  39. ## socket.sendTo("127.0.0.1", Port(12345), "SOME REQUEST")
  40. ## echo stream.readLine() # Will call `recv`
  41. ## stream.setPosition(0)
  42. ## echo stream.readLine() # Will return the read line from the buffer
  43. ## stream.resetStream() # Buffer is now empty, position is 0
  44. ## echo stream.readLine() # Will call `recv` again
  45. ## stream.close() # Closes the socket
  46. ## ```
  47. ##
  48. ## ```Nim
  49. ## import std/socketstreams
  50. ##
  51. ## var socket = newSocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)
  52. ## socket.connect("127.0.0.1", Port(12345))
  53. ## var sendStream = newWriteSocketStream(socket)
  54. ## sendStream.write "NOM"
  55. ## sendStream.setPosition(1)
  56. ## echo sendStream.peekStr(2) # OM
  57. ## sendStream.write "I"
  58. ## sendStream.setPosition(0)
  59. ## echo sendStream.readStr(3) # NIM
  60. ## echo sendStream.getPosition() # 3
  61. ## sendStream.flush() # This actually performs the writing to the socket
  62. ## sendStream.setPosition(1)
  63. ## sendStream.write "I" # Throws an error as we can't write into an already sent buffer
  64. ## ```
  65. import std/[net, streams]
  66. type
  67. ReadSocketStream* = ref ReadSocketStreamObj
  68. ReadSocketStreamObj* = object of StreamObj
  69. data: Socket
  70. pos: int
  71. buf: seq[byte]
  72. WriteSocketStream* = ref WriteSocketStreamObj
  73. WriteSocketStreamObj* = object of ReadSocketStreamObj
  74. lastFlush: int
  75. proc rsAtEnd(s: Stream): bool =
  76. return false
  77. proc rsSetPosition(s: Stream, pos: int) =
  78. var s = ReadSocketStream(s)
  79. s.pos = pos
  80. proc rsGetPosition(s: Stream): int =
  81. var s = ReadSocketStream(s)
  82. return s.pos
  83. proc rsPeekData(s: Stream, buffer: pointer, bufLen: int): int =
  84. result = 0
  85. let s = ReadSocketStream(s)
  86. if bufLen > 0:
  87. let oldLen = s.buf.len
  88. s.buf.setLen(max(s.pos + bufLen, s.buf.len))
  89. if s.pos + bufLen > oldLen:
  90. result = s.data.recv(s.buf[oldLen].addr, s.buf.len - oldLen)
  91. if result > 0:
  92. result += oldLen - s.pos
  93. else:
  94. result = bufLen
  95. copyMem(buffer, s.buf[s.pos].addr, result)
  96. proc rsReadData(s: Stream, buffer: pointer, bufLen: int): int =
  97. result = s.rsPeekData(buffer, bufLen)
  98. var s = ReadSocketStream(s)
  99. s.pos += bufLen
  100. proc rsReadDataStr(s: Stream, buffer: var string, slice: Slice[int]): int =
  101. var s = ReadSocketStream(s)
  102. result = slice.b + 1 - slice.a
  103. if result > 0:
  104. result = s.rsReadData(buffer[slice.a].addr, result)
  105. inc(s.pos, result)
  106. else:
  107. result = 0
  108. proc wsWriteData(s: Stream, buffer: pointer, bufLen: int) =
  109. var s = WriteSocketStream(s)
  110. if s.pos < s.lastFlush:
  111. raise newException(IOError, "Unable to write into buffer that has already been sent")
  112. if s.buf.len < s.pos + bufLen:
  113. s.buf.setLen(s.pos + bufLen)
  114. copyMem(s.buf[s.pos].addr, buffer, bufLen)
  115. s.pos += bufLen
  116. proc wsPeekData(s: Stream, buffer: pointer, bufLen: int): int =
  117. var s = WriteSocketStream(s)
  118. result = bufLen
  119. if result > 0:
  120. if s.pos > s.buf.len or s.pos == s.buf.len or s.pos + bufLen > s.buf.len:
  121. raise newException(IOError, "Unable to read past end of write buffer")
  122. else:
  123. copyMem(buffer, s.buf[s.pos].addr, bufLen)
  124. proc wsReadData(s: Stream, buffer: pointer, bufLen: int): int =
  125. result = s.wsPeekData(buffer, bufLen)
  126. var s = ReadSocketStream(s)
  127. s.pos += bufLen
  128. proc wsAtEnd(s: Stream): bool =
  129. var s = WriteSocketStream(s)
  130. return s.pos == s.buf.len
  131. proc wsFlush(s: Stream) =
  132. var s = WriteSocketStream(s)
  133. discard s.data.send(s.buf[s.lastFlush].addr, s.buf.len - s.lastFlush)
  134. s.lastFlush = s.buf.len
  135. proc rsClose(s: Stream) =
  136. {.cast(raises: [IOError, OSError]), cast(tags: []).}: # todo fixme maybe do something?
  137. var s = ReadSocketStream(s)
  138. s.data.close()
  139. proc newReadSocketStream*(s: Socket): owned ReadSocketStream =
  140. result = ReadSocketStream(data: s, pos: 0,
  141. closeImpl: rsClose,
  142. atEndImpl: rsAtEnd,
  143. setPositionImpl: rsSetPosition,
  144. getPositionImpl: rsGetPosition,
  145. readDataImpl: rsReadData,
  146. peekDataImpl: rsPeekData,
  147. readDataStrImpl: rsReadDataStr)
  148. proc resetStream*(s: ReadSocketStream) =
  149. s.buf = @[]
  150. s.pos = 0
  151. proc newWriteSocketStream*(s: Socket): owned WriteSocketStream =
  152. result = WriteSocketStream(data: s, pos: 0,
  153. closeImpl: rsClose,
  154. atEndImpl: wsAtEnd,
  155. setPositionImpl: rsSetPosition,
  156. getPositionImpl: rsGetPosition,
  157. writeDataImpl: wsWriteData,
  158. readDataImpl: wsReadData,
  159. peekDataImpl: wsPeekData,
  160. flushImpl: wsFlush)
  161. proc resetStream*(s: WriteSocketStream) =
  162. s.buf = @[]
  163. s.pos = 0
  164. s.lastFlush = 0