asyncfile.nim 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536
#
#
#            Nim's Runtime Library
#        (c) Copyright 2015 Dominik Picheta
#
#    See the file "copying.txt", included in this
#    distribution, for details about the copyright.
#
## This module implements asynchronous file reading and writing.
##
## ```Nim
## import std/[asyncfile, asyncdispatch, os]
##
## proc main() {.async.} =
##   var file = openAsync(getTempDir() / "foobar.txt", fmReadWrite)
##   await file.write("test")
##   file.setFilePos(0)
##   let data = await file.readAll()
##   doAssert data == "test"
##   file.close()
##
## waitFor main()
## ```
  24. import asyncdispatch, os
  25. when defined(nimPreviewSlimSystem):
  26. import std/[assertions, syncio]
  27. when defined(windows) or defined(nimdoc):
  28. import std/widestrs
  29. # TODO: Fix duplication introduced by PR #4683.
  30. when defined(windows) or defined(nimdoc):
  31. import winlean
  32. else:
  33. import posix
  34. type
  35. AsyncFile* = ref object
  36. fd: AsyncFD
  37. offset: int64
  38. when defined(windows) or defined(nimdoc):
  39. proc getDesiredAccess(mode: FileMode): int32 =
  40. case mode
  41. of fmRead:
  42. result = GENERIC_READ
  43. of fmWrite, fmAppend:
  44. result = GENERIC_WRITE
  45. of fmReadWrite, fmReadWriteExisting:
  46. result = GENERIC_READ or GENERIC_WRITE
  47. proc getCreationDisposition(mode: FileMode, filename: string): int32 =
  48. case mode
  49. of fmRead, fmReadWriteExisting:
  50. OPEN_EXISTING
  51. of fmReadWrite, fmWrite:
  52. CREATE_ALWAYS
  53. of fmAppend:
  54. OPEN_ALWAYS
  55. else:
  56. proc getPosixFlags(mode: FileMode): cint =
  57. case mode
  58. of fmRead:
  59. result = O_RDONLY
  60. of fmWrite:
  61. result = O_WRONLY or O_CREAT or O_TRUNC
  62. of fmAppend:
  63. result = O_WRONLY or O_CREAT or O_APPEND
  64. of fmReadWrite:
  65. result = O_RDWR or O_CREAT or O_TRUNC
  66. of fmReadWriteExisting:
  67. result = O_RDWR
  68. result = result or O_NONBLOCK
  69. proc getFileSize*(f: AsyncFile): int64 =
  70. ## Retrieves the specified file's size.
  71. when defined(windows) or defined(nimdoc):
  72. var high: DWORD
  73. let low = getFileSize(f.fd.Handle, addr high)
  74. if low == INVALID_FILE_SIZE:
  75. raiseOSError(osLastError())
  76. result = (high shl 32) or low
  77. else:
  78. let curPos = lseek(f.fd.cint, 0, SEEK_CUR)
  79. result = lseek(f.fd.cint, 0, SEEK_END)
  80. f.offset = lseek(f.fd.cint, curPos, SEEK_SET)
  81. assert(f.offset == curPos)
  82. proc newAsyncFile*(fd: AsyncFD): AsyncFile =
  83. ## Creates `AsyncFile` with a previously opened file descriptor `fd`.
  84. new result
  85. result.fd = fd
  86. register(fd)
  87. proc openAsync*(filename: string, mode = fmRead): AsyncFile =
  88. ## Opens a file specified by the path in `filename` using
  89. ## the specified FileMode `mode` asynchronously.
  90. when defined(windows) or defined(nimdoc):
  91. let flags = FILE_FLAG_OVERLAPPED or FILE_ATTRIBUTE_NORMAL
  92. let desiredAccess = getDesiredAccess(mode)
  93. let creationDisposition = getCreationDisposition(mode, filename)
  94. let fd = createFileW(newWideCString(filename), desiredAccess,
  95. FILE_SHARE_READ,
  96. nil, creationDisposition, flags, 0)
  97. if fd == INVALID_HANDLE_VALUE:
  98. raiseOSError(osLastError())
  99. result = newAsyncFile(fd.AsyncFD)
  100. if mode == fmAppend:
  101. result.offset = getFileSize(result)
  102. else:
  103. let flags = getPosixFlags(mode)
  104. # RW (Owner), RW (Group), R (Other)
  105. let perm = S_IRUSR or S_IWUSR or S_IRGRP or S_IWGRP or S_IROTH
  106. let fd = open(filename, flags, perm)
  107. if fd == -1:
  108. raiseOSError(osLastError())
  109. result = newAsyncFile(fd.AsyncFD)
  110. proc readBuffer*(f: AsyncFile, buf: pointer, size: int): Future[int] =
  111. ## Read `size` bytes from the specified file asynchronously starting at
  112. ## the current position of the file pointer.
  113. ##
  114. ## If the file pointer is past the end of the file then zero is returned
  115. ## and no bytes are read into `buf`
  116. var retFuture = newFuture[int]("asyncfile.readBuffer")
  117. when defined(windows) or defined(nimdoc):
  118. var ol = newCustom()
  119. ol.data = CompletionData(fd: f.fd, cb:
  120. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  121. if not retFuture.finished:
  122. if errcode == OSErrorCode(-1):
  123. assert bytesCount > 0
  124. assert bytesCount <= size
  125. f.offset.inc bytesCount
  126. retFuture.complete(bytesCount)
  127. else:
  128. if errcode.int32 == ERROR_HANDLE_EOF:
  129. retFuture.complete(0)
  130. else:
  131. retFuture.fail(newException(OSError, osErrorMsg(errcode)))
  132. )
  133. ol.offset = DWORD(f.offset and 0xffffffff)
  134. ol.offsetHigh = DWORD(f.offset shr 32)
  135. # According to MSDN we're supposed to pass nil to lpNumberOfBytesRead.
  136. let ret = readFile(f.fd.Handle, buf, size.int32, nil,
  137. cast[POVERLAPPED](ol))
  138. if not ret.bool:
  139. let err = osLastError()
  140. if err.int32 != ERROR_IO_PENDING:
  141. GC_unref(ol)
  142. if err.int32 == ERROR_HANDLE_EOF:
  143. # This happens in Windows Server 2003
  144. retFuture.complete(0)
  145. else:
  146. retFuture.fail(newException(OSError, osErrorMsg(err)))
  147. else:
  148. # Request completed immediately.
  149. var bytesRead: DWORD
  150. let overlappedRes = getOverlappedResult(f.fd.Handle,
  151. cast[POVERLAPPED](ol), bytesRead, false.WINBOOL)
  152. if not overlappedRes.bool:
  153. let err = osLastError()
  154. if err.int32 == ERROR_HANDLE_EOF:
  155. retFuture.complete(0)
  156. else:
  157. retFuture.fail(newException(OSError, osErrorMsg(osLastError())))
  158. else:
  159. assert bytesRead > 0
  160. assert bytesRead <= size
  161. f.offset.inc bytesRead
  162. retFuture.complete(bytesRead)
  163. else:
  164. proc cb(fd: AsyncFD): bool =
  165. result = true
  166. let res = read(fd.cint, cast[cstring](buf), size.cint)
  167. if res < 0:
  168. let lastError = osLastError()
  169. if lastError.int32 != EAGAIN:
  170. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  171. else:
  172. result = false # We still want this callback to be called.
  173. elif res == 0:
  174. # EOF
  175. retFuture.complete(0)
  176. else:
  177. f.offset.inc(res)
  178. retFuture.complete(res)
  179. if not cb(f.fd):
  180. addRead(f.fd, cb)
  181. return retFuture
  182. proc read*(f: AsyncFile, size: int): Future[string] =
  183. ## Read `size` bytes from the specified file asynchronously starting at
  184. ## the current position of the file pointer. `size` should be greater than zero.
  185. ##
  186. ## If the file pointer is past the end of the file then an empty string is
  187. ## returned.
  188. assert size > 0
  189. var retFuture = newFuture[string]("asyncfile.read")
  190. when defined(windows) or defined(nimdoc):
  191. var buffer = alloc0(size)
  192. var ol = newCustom()
  193. ol.data = CompletionData(fd: f.fd, cb:
  194. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  195. if not retFuture.finished:
  196. if errcode == OSErrorCode(-1):
  197. assert bytesCount > 0
  198. assert bytesCount <= size
  199. var data = newString(bytesCount)
  200. copyMem(addr data[0], buffer, bytesCount)
  201. f.offset.inc bytesCount
  202. retFuture.complete($data)
  203. else:
  204. if errcode.int32 == ERROR_HANDLE_EOF:
  205. retFuture.complete("")
  206. else:
  207. retFuture.fail(newException(OSError, osErrorMsg(errcode)))
  208. if buffer != nil:
  209. dealloc buffer
  210. buffer = nil
  211. )
  212. ol.offset = DWORD(f.offset and 0xffffffff)
  213. ol.offsetHigh = DWORD(f.offset shr 32)
  214. # According to MSDN we're supposed to pass nil to lpNumberOfBytesRead.
  215. let ret = readFile(f.fd.Handle, buffer, size.int32, nil,
  216. cast[POVERLAPPED](ol))
  217. if not ret.bool:
  218. let err = osLastError()
  219. if err.int32 != ERROR_IO_PENDING:
  220. if buffer != nil:
  221. dealloc buffer
  222. buffer = nil
  223. GC_unref(ol)
  224. if err.int32 == ERROR_HANDLE_EOF:
  225. # This happens in Windows Server 2003
  226. retFuture.complete("")
  227. else:
  228. retFuture.fail(newException(OSError, osErrorMsg(err)))
  229. else:
  230. # Request completed immediately.
  231. var bytesRead: DWORD
  232. let overlappedRes = getOverlappedResult(f.fd.Handle,
  233. cast[POVERLAPPED](ol), bytesRead, false.WINBOOL)
  234. if not overlappedRes.bool:
  235. let err = osLastError()
  236. if err.int32 == ERROR_HANDLE_EOF:
  237. retFuture.complete("")
  238. else:
  239. retFuture.fail(newException(OSError, osErrorMsg(osLastError())))
  240. else:
  241. assert bytesRead > 0
  242. assert bytesRead <= size
  243. var data = newString(bytesRead)
  244. copyMem(addr data[0], buffer, bytesRead)
  245. f.offset.inc bytesRead
  246. retFuture.complete($data)
  247. else:
  248. var readBuffer = newString(size)
  249. proc cb(fd: AsyncFD): bool =
  250. result = true
  251. let res = read(fd.cint, addr readBuffer[0], size.cint)
  252. if res < 0:
  253. let lastError = osLastError()
  254. if lastError.int32 != EAGAIN:
  255. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  256. else:
  257. result = false # We still want this callback to be called.
  258. elif res == 0:
  259. # EOF
  260. f.offset = lseek(fd.cint, 0, SEEK_CUR)
  261. retFuture.complete("")
  262. else:
  263. readBuffer.setLen(res)
  264. f.offset.inc(res)
  265. retFuture.complete(readBuffer)
  266. if not cb(f.fd):
  267. addRead(f.fd, cb)
  268. return retFuture
  269. proc readLine*(f: AsyncFile): Future[string] {.async.} =
  270. ## Reads a single line from the specified file asynchronously.
  271. result = ""
  272. while true:
  273. var c = await read(f, 1)
  274. if c[0] == '\c':
  275. c = await read(f, 1)
  276. break
  277. if c[0] == '\L' or c == "":
  278. break
  279. else:
  280. result.add(c)
  281. proc getFilePos*(f: AsyncFile): int64 =
  282. ## Retrieves the current position of the file pointer that is
  283. ## used to read from the specified file. The file's first byte has the
  284. ## index zero.
  285. f.offset
  286. proc setFilePos*(f: AsyncFile, pos: int64) =
  287. ## Sets the position of the file pointer that is used for read/write
  288. ## operations. The file's first byte has the index zero.
  289. f.offset = pos
  290. when not defined(windows) and not defined(nimdoc):
  291. let ret = lseek(f.fd.cint, pos.Off, SEEK_SET)
  292. if ret == -1:
  293. raiseOSError(osLastError())
  294. proc readAll*(f: AsyncFile): Future[string] {.async.} =
  295. ## Reads all data from the specified file.
  296. result = ""
  297. while true:
  298. let data = await read(f, 4000)
  299. if data.len == 0:
  300. return
  301. result.add data
  302. proc writeBuffer*(f: AsyncFile, buf: pointer, size: int): Future[void] =
  303. ## Writes `size` bytes from `buf` to the file specified asynchronously.
  304. ##
  305. ## The returned Future will complete once all data has been written to the
  306. ## specified file.
  307. var retFuture = newFuture[void]("asyncfile.writeBuffer")
  308. when defined(windows) or defined(nimdoc):
  309. var ol = newCustom()
  310. ol.data = CompletionData(fd: f.fd, cb:
  311. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  312. if not retFuture.finished:
  313. if errcode == OSErrorCode(-1):
  314. assert bytesCount == size.int32
  315. retFuture.complete()
  316. else:
  317. retFuture.fail(newException(OSError, osErrorMsg(errcode)))
  318. )
  319. # passing -1 here should work according to MSDN, but doesn't. For more
  320. # information see
  321. # http://stackoverflow.com/questions/33650899/does-asynchronous-file-
  322. # appending-in-windows-preserve-order
  323. ol.offset = DWORD(f.offset and 0xffffffff)
  324. ol.offsetHigh = DWORD(f.offset shr 32)
  325. f.offset.inc(size)
  326. # According to MSDN we're supposed to pass nil to lpNumberOfBytesWritten.
  327. let ret = writeFile(f.fd.Handle, buf, size.int32, nil,
  328. cast[POVERLAPPED](ol))
  329. if not ret.bool:
  330. let err = osLastError()
  331. if err.int32 != ERROR_IO_PENDING:
  332. GC_unref(ol)
  333. retFuture.fail(newException(OSError, osErrorMsg(err)))
  334. else:
  335. # Request completed immediately.
  336. var bytesWritten: DWORD
  337. let overlappedRes = getOverlappedResult(f.fd.Handle,
  338. cast[POVERLAPPED](ol), bytesWritten, false.WINBOOL)
  339. if not overlappedRes.bool:
  340. retFuture.fail(newException(OSError, osErrorMsg(osLastError())))
  341. else:
  342. assert bytesWritten == size.int32
  343. retFuture.complete()
  344. else:
  345. var written = 0
  346. proc cb(fd: AsyncFD): bool =
  347. result = true
  348. let remainderSize = size - written
  349. var cbuf = cast[cstring](buf)
  350. let res = write(fd.cint, addr cbuf[written], remainderSize.cint)
  351. if res < 0:
  352. let lastError = osLastError()
  353. if lastError.int32 != EAGAIN:
  354. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  355. else:
  356. result = false # We still want this callback to be called.
  357. else:
  358. written.inc res
  359. f.offset.inc res
  360. if res != remainderSize:
  361. result = false # We still have data to write.
  362. else:
  363. retFuture.complete()
  364. if not cb(f.fd):
  365. addWrite(f.fd, cb)
  366. return retFuture
  367. proc write*(f: AsyncFile, data: string): Future[void] =
  368. ## Writes `data` to the file specified asynchronously.
  369. ##
  370. ## The returned Future will complete once all data has been written to the
  371. ## specified file.
  372. var retFuture = newFuture[void]("asyncfile.write")
  373. var copy = data
  374. when defined(windows) or defined(nimdoc):
  375. var buffer = alloc0(data.len)
  376. copyMem(buffer, copy.cstring, data.len)
  377. var ol = newCustom()
  378. ol.data = CompletionData(fd: f.fd, cb:
  379. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  380. if not retFuture.finished:
  381. if errcode == OSErrorCode(-1):
  382. assert bytesCount == data.len.int32
  383. retFuture.complete()
  384. else:
  385. retFuture.fail(newException(OSError, osErrorMsg(errcode)))
  386. if buffer != nil:
  387. dealloc buffer
  388. buffer = nil
  389. )
  390. ol.offset = DWORD(f.offset and 0xffffffff)
  391. ol.offsetHigh = DWORD(f.offset shr 32)
  392. f.offset.inc(data.len)
  393. # According to MSDN we're supposed to pass nil to lpNumberOfBytesWritten.
  394. let ret = writeFile(f.fd.Handle, buffer, data.len.int32, nil,
  395. cast[POVERLAPPED](ol))
  396. if not ret.bool:
  397. let err = osLastError()
  398. if err.int32 != ERROR_IO_PENDING:
  399. if buffer != nil:
  400. dealloc buffer
  401. buffer = nil
  402. GC_unref(ol)
  403. retFuture.fail(newException(OSError, osErrorMsg(err)))
  404. else:
  405. # Request completed immediately.
  406. var bytesWritten: DWORD
  407. let overlappedRes = getOverlappedResult(f.fd.Handle,
  408. cast[POVERLAPPED](ol), bytesWritten, false.WINBOOL)
  409. if not overlappedRes.bool:
  410. retFuture.fail(newException(OSError, osErrorMsg(osLastError())))
  411. else:
  412. assert bytesWritten == data.len.int32
  413. retFuture.complete()
  414. else:
  415. var written = 0
  416. proc cb(fd: AsyncFD): bool =
  417. result = true
  418. let remainderSize = data.len - written
  419. let res =
  420. if data.len == 0:
  421. write(fd.cint, copy.cstring, 0)
  422. else:
  423. write(fd.cint, addr copy[written], remainderSize.cint)
  424. if res < 0:
  425. let lastError = osLastError()
  426. if lastError.int32 != EAGAIN:
  427. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  428. else:
  429. result = false # We still want this callback to be called.
  430. else:
  431. written.inc res
  432. f.offset.inc res
  433. if res != remainderSize:
  434. result = false # We still have data to write.
  435. else:
  436. retFuture.complete()
  437. if not cb(f.fd):
  438. addWrite(f.fd, cb)
  439. return retFuture
  440. proc setFileSize*(f: AsyncFile, length: int64) =
  441. ## Set a file length.
  442. when defined(windows) or defined(nimdoc):
  443. var
  444. high = (length shr 32).DWORD
  445. let
  446. low = (length and 0xffffffff).DWORD
  447. status = setFilePointer(f.fd.Handle, low, addr high, 0)
  448. lastErr = osLastError()
  449. if (status == INVALID_SET_FILE_POINTER and lastErr.int32 != NO_ERROR) or
  450. (setEndOfFile(f.fd.Handle) == 0):
  451. raiseOSError(osLastError())
  452. else:
  453. # will truncate if Off is a 32-bit type!
  454. if ftruncate(f.fd.cint, length.Off) == -1:
  455. raiseOSError(osLastError())
  456. proc close*(f: AsyncFile) =
  457. ## Closes the file specified.
  458. unregister(f.fd)
  459. when defined(windows) or defined(nimdoc):
  460. if not closeHandle(f.fd.Handle).bool:
  461. raiseOSError(osLastError())
  462. else:
  463. if close(f.fd.cint) == -1:
  464. raiseOSError(osLastError())
  465. proc writeFromStream*(f: AsyncFile, fs: FutureStream[string]) {.async.} =
  466. ## Reads data from the specified future stream until it is completed.
  467. ## The data which is read is written to the file immediately and
  468. ## freed from memory.
  469. ##
  470. ## This procedure is perfect for saving streamed data to a file without
  471. ## wasting memory.
  472. while true:
  473. let (hasValue, value) = await fs.read()
  474. if hasValue:
  475. await f.write(value)
  476. else:
  477. break
  478. proc readToStream*(f: AsyncFile, fs: FutureStream[string]) {.async.} =
  479. ## Writes data to the specified future stream as the file is read.
  480. while true:
  481. let data = await read(f, 4000)
  482. if data.len == 0:
  483. break
  484. await fs.write(data)
  485. fs.complete()