asyncfile.nim 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531
  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2015 Dominik Picheta
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. ## This module implements asynchronous file reading and writing.
  10. ##
  11. ## .. code-block:: Nim
  12. ## import asyncfile, asyncdispatch, os
  13. ##
  14. ## proc main() {.async.} =
  15. ## var file = openAsync(getTempDir() / "foobar.txt", fmReadWrite)
  16. ## await file.write("test")
  17. ## file.setFilePos(0)
  18. ## let data = await file.readAll()
  19. ## doAssert data == "test"
  20. ## file.close()
  21. ##
  22. ## waitFor main()
  23. import asyncdispatch, os
  24. # TODO: Fix duplication introduced by PR #4683.
  25. when defined(windows) or defined(nimdoc):
  26. import winlean
  27. else:
  28. import posix
  29. type
  30. AsyncFile* = ref object
  31. fd: AsyncFd
  32. offset: int64
  33. when defined(windows) or defined(nimdoc):
  34. proc getDesiredAccess(mode: FileMode): int32 =
  35. case mode
  36. of fmRead:
  37. result = GENERIC_READ
  38. of fmWrite, fmAppend:
  39. result = GENERIC_WRITE
  40. of fmReadWrite, fmReadWriteExisting:
  41. result = GENERIC_READ or GENERIC_WRITE
  42. proc getCreationDisposition(mode: FileMode, filename: string): int32 =
  43. case mode
  44. of fmRead, fmReadWriteExisting:
  45. OPEN_EXISTING
  46. of fmReadWrite, fmWrite:
  47. CREATE_ALWAYS
  48. of fmAppend:
  49. OPEN_ALWAYS
  50. else:
  51. proc getPosixFlags(mode: FileMode): cint =
  52. case mode
  53. of fmRead:
  54. result = O_RDONLY
  55. of fmWrite:
  56. result = O_WRONLY or O_CREAT or O_TRUNC
  57. of fmAppend:
  58. result = O_WRONLY or O_CREAT or O_APPEND
  59. of fmReadWrite:
  60. result = O_RDWR or O_CREAT or O_TRUNC
  61. of fmReadWriteExisting:
  62. result = O_RDWR
  63. result = result or O_NONBLOCK
  64. proc getFileSize*(f: AsyncFile): int64 =
  65. ## Retrieves the specified file's size.
  66. when defined(windows) or defined(nimdoc):
  67. var high: DWord
  68. let low = getFileSize(f.fd.Handle, addr high)
  69. if low == INVALID_FILE_SIZE:
  70. raiseOSError(osLastError())
  71. result = (high shl 32) or low
  72. else:
  73. let curPos = lseek(f.fd.cint, 0, SEEK_CUR)
  74. result = lseek(f.fd.cint, 0, SEEK_END)
  75. f.offset = lseek(f.fd.cint, curPos, SEEK_SET)
  76. assert(f.offset == curPos)
  77. proc newAsyncFile*(fd: AsyncFd): AsyncFile =
  78. ## Creates `AsyncFile` with a previously opened file descriptor `fd`.
  79. new result
  80. result.fd = fd
  81. register(fd)
  82. proc openAsync*(filename: string, mode = fmRead): AsyncFile =
  83. ## Opens a file specified by the path in ``filename`` using
  84. ## the specified FileMode ``mode`` asynchronously.
  85. when defined(windows) or defined(nimdoc):
  86. let flags = FILE_FLAG_OVERLAPPED or FILE_ATTRIBUTE_NORMAL
  87. let desiredAccess = getDesiredAccess(mode)
  88. let creationDisposition = getCreationDisposition(mode, filename)
  89. when useWinUnicode:
  90. let fd = createFileW(newWideCString(filename), desiredAccess,
  91. FILE_SHARE_READ,
  92. nil, creationDisposition, flags, 0)
  93. else:
  94. let fd = createFileA(filename, desiredAccess,
  95. FILE_SHARE_READ,
  96. nil, creationDisposition, flags, 0)
  97. if fd == INVALID_HANDLE_VALUE:
  98. raiseOSError(osLastError())
  99. result = newAsyncFile(fd.AsyncFd)
  100. if mode == fmAppend:
  101. result.offset = getFileSize(result)
  102. else:
  103. let flags = getPosixFlags(mode)
  104. # RW (Owner), RW (Group), R (Other)
  105. let perm = S_IRUSR or S_IWUSR or S_IRGRP or S_IWGRP or S_IROTH
  106. let fd = open(filename, flags, perm)
  107. if fd == -1:
  108. raiseOSError(osLastError())
  109. result = newAsyncFile(fd.AsyncFd)
  110. proc readBuffer*(f: AsyncFile, buf: pointer, size: int): Future[int] =
  111. ## Read ``size`` bytes from the specified file asynchronously starting at
  112. ## the current position of the file pointer.
  113. ##
  114. ## If the file pointer is past the end of the file then zero is returned
  115. ## and no bytes are read into ``buf``
  116. var retFuture = newFuture[int]("asyncfile.readBuffer")
  117. when defined(windows) or defined(nimdoc):
  118. var ol = PCustomOverlapped()
  119. GC_ref(ol)
  120. ol.data = CompletionData(fd: f.fd, cb:
  121. proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
  122. if not retFuture.finished:
  123. if errcode == OSErrorCode(-1):
  124. assert bytesCount > 0
  125. assert bytesCount <= size
  126. f.offset.inc bytesCount
  127. retFuture.complete(bytesCount)
  128. else:
  129. if errcode.int32 == ERROR_HANDLE_EOF:
  130. retFuture.complete(0)
  131. else:
  132. retFuture.fail(newException(OSError, osErrorMsg(errcode)))
  133. )
  134. ol.offset = DWord(f.offset and 0xffffffff)
  135. ol.offsetHigh = DWord(f.offset shr 32)
  136. # According to MSDN we're supposed to pass nil to lpNumberOfBytesRead.
  137. let ret = readFile(f.fd.Handle, buf, size.int32, nil,
  138. cast[POVERLAPPED](ol))
  139. if not ret.bool:
  140. let err = osLastError()
  141. if err.int32 != ERROR_IO_PENDING:
  142. GC_unref(ol)
  143. if err.int32 == ERROR_HANDLE_EOF:
  144. # This happens in Windows Server 2003
  145. retFuture.complete(0)
  146. else:
  147. retFuture.fail(newException(OSError, osErrorMsg(err)))
  148. else:
  149. # Request completed immediately.
  150. var bytesRead: DWord
  151. let overlappedRes = getOverlappedResult(f.fd.Handle,
  152. cast[POverlapped](ol), bytesRead, false.WinBool)
  153. if not overlappedRes.bool:
  154. let err = osLastError()
  155. if err.int32 == ERROR_HANDLE_EOF:
  156. retFuture.complete(0)
  157. else:
  158. retFuture.fail(newException(OSError, osErrorMsg(osLastError())))
  159. else:
  160. assert bytesRead > 0
  161. assert bytesRead <= size
  162. f.offset.inc bytesRead
  163. retFuture.complete(bytesRead)
  164. else:
  165. proc cb(fd: AsyncFD): bool =
  166. result = true
  167. let res = read(fd.cint, cast[cstring](buf), size.cint)
  168. if res < 0:
  169. let lastError = osLastError()
  170. if lastError.int32 != EAGAIN:
  171. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  172. else:
  173. result = false # We still want this callback to be called.
  174. elif res == 0:
  175. # EOF
  176. retFuture.complete(0)
  177. else:
  178. f.offset.inc(res)
  179. retFuture.complete(res)
  180. if not cb(f.fd):
  181. addRead(f.fd, cb)
  182. return retFuture
  183. proc read*(f: AsyncFile, size: int): Future[string] =
  184. ## Read ``size`` bytes from the specified file asynchronously starting at
  185. ## the current position of the file pointer.
  186. ##
  187. ## If the file pointer is past the end of the file then an empty string is
  188. ## returned.
  189. var retFuture = newFuture[string]("asyncfile.read")
  190. when defined(windows) or defined(nimdoc):
  191. var buffer = alloc0(size)
  192. var ol = PCustomOverlapped()
  193. GC_ref(ol)
  194. ol.data = CompletionData(fd: f.fd, cb:
  195. proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
  196. if not retFuture.finished:
  197. if errcode == OSErrorCode(-1):
  198. assert bytesCount > 0
  199. assert bytesCount <= size
  200. var data = newString(bytesCount)
  201. copyMem(addr data[0], buffer, bytesCount)
  202. f.offset.inc bytesCount
  203. retFuture.complete($data)
  204. else:
  205. if errcode.int32 == ERROR_HANDLE_EOF:
  206. retFuture.complete("")
  207. else:
  208. retFuture.fail(newException(OSError, osErrorMsg(errcode)))
  209. if buffer != nil:
  210. dealloc buffer
  211. buffer = nil
  212. )
  213. ol.offset = DWord(f.offset and 0xffffffff)
  214. ol.offsetHigh = DWord(f.offset shr 32)
  215. # According to MSDN we're supposed to pass nil to lpNumberOfBytesRead.
  216. let ret = readFile(f.fd.Handle, buffer, size.int32, nil,
  217. cast[POVERLAPPED](ol))
  218. if not ret.bool:
  219. let err = osLastError()
  220. if err.int32 != ERROR_IO_PENDING:
  221. if buffer != nil:
  222. dealloc buffer
  223. buffer = nil
  224. GC_unref(ol)
  225. if err.int32 == ERROR_HANDLE_EOF:
  226. # This happens in Windows Server 2003
  227. retFuture.complete("")
  228. else:
  229. retFuture.fail(newException(OSError, osErrorMsg(err)))
  230. else:
  231. # Request completed immediately.
  232. var bytesRead: DWord
  233. let overlappedRes = getOverlappedResult(f.fd.Handle,
  234. cast[POverlapped](ol), bytesRead, false.WinBool)
  235. if not overlappedRes.bool:
  236. let err = osLastError()
  237. if err.int32 == ERROR_HANDLE_EOF:
  238. retFuture.complete("")
  239. else:
  240. retFuture.fail(newException(OSError, osErrorMsg(osLastError())))
  241. else:
  242. assert bytesRead > 0
  243. assert bytesRead <= size
  244. var data = newString(bytesRead)
  245. copyMem(addr data[0], buffer, bytesRead)
  246. f.offset.inc bytesRead
  247. retFuture.complete($data)
  248. else:
  249. var readBuffer = newString(size)
  250. proc cb(fd: AsyncFD): bool =
  251. result = true
  252. let res = read(fd.cint, addr readBuffer[0], size.cint)
  253. if res < 0:
  254. let lastError = osLastError()
  255. if lastError.int32 != EAGAIN:
  256. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  257. else:
  258. result = false # We still want this callback to be called.
  259. elif res == 0:
  260. # EOF
  261. f.offset = lseek(fd.cint, 0, SEEK_CUR)
  262. retFuture.complete("")
  263. else:
  264. readBuffer.setLen(res)
  265. f.offset.inc(res)
  266. retFuture.complete(readBuffer)
  267. if not cb(f.fd):
  268. addRead(f.fd, cb)
  269. return retFuture
  270. proc readLine*(f: AsyncFile): Future[string] {.async.} =
  271. ## Reads a single line from the specified file asynchronously.
  272. result = ""
  273. while true:
  274. var c = await read(f, 1)
  275. if c[0] == '\c':
  276. c = await read(f, 1)
  277. break
  278. if c[0] == '\L' or c == "":
  279. break
  280. else:
  281. result.add(c)
  282. proc getFilePos*(f: AsyncFile): int64 =
  283. ## Retrieves the current position of the file pointer that is
  284. ## used to read from the specified file. The file's first byte has the
  285. ## index zero.
  286. f.offset
  287. proc setFilePos*(f: AsyncFile, pos: int64) =
  288. ## Sets the position of the file pointer that is used for read/write
  289. ## operations. The file's first byte has the index zero.
  290. f.offset = pos
  291. when not defined(windows) and not defined(nimdoc):
  292. let ret = lseek(f.fd.cint, pos.Off, SEEK_SET)
  293. if ret == -1:
  294. raiseOSError(osLastError())
  295. proc readAll*(f: AsyncFile): Future[string] {.async.} =
  296. ## Reads all data from the specified file.
  297. result = ""
  298. while true:
  299. let data = await read(f, 4000)
  300. if data.len == 0:
  301. return
  302. result.add data
  303. proc writeBuffer*(f: AsyncFile, buf: pointer, size: int): Future[void] =
  304. ## Writes ``size`` bytes from ``buf`` to the file specified asynchronously.
  305. ##
  306. ## The returned Future will complete once all data has been written to the
  307. ## specified file.
  308. var retFuture = newFuture[void]("asyncfile.writeBuffer")
  309. when defined(windows) or defined(nimdoc):
  310. var ol = PCustomOverlapped()
  311. GC_ref(ol)
  312. ol.data = CompletionData(fd: f.fd, cb:
  313. proc (fd: AsyncFD, bytesCount: DWord, errcode: OSErrorCode) =
  314. if not retFuture.finished:
  315. if errcode == OSErrorCode(-1):
  316. assert bytesCount == size.int32
  317. retFuture.complete()
  318. else:
  319. retFuture.fail(newException(OSError, osErrorMsg(errcode)))
  320. )
  321. # passing -1 here should work according to MSDN, but doesn't. For more
  322. # information see
  323. # http://stackoverflow.com/questions/33650899/does-asynchronous-file-
  324. # appending-in-windows-preserve-order
  325. ol.offset = DWord(f.offset and 0xffffffff)
  326. ol.offsetHigh = DWord(f.offset shr 32)
  327. f.offset.inc(size)
  328. # According to MSDN we're supposed to pass nil to lpNumberOfBytesWritten.
  329. let ret = writeFile(f.fd.Handle, buf, size.int32, nil,
  330. cast[POVERLAPPED](ol))
  331. if not ret.bool:
  332. let err = osLastError()
  333. if err.int32 != ERROR_IO_PENDING:
  334. GC_unref(ol)
  335. retFuture.fail(newException(OSError, osErrorMsg(err)))
  336. else:
  337. # Request completed immediately.
  338. var bytesWritten: DWord
  339. let overlappedRes = getOverlappedResult(f.fd.Handle,
  340. cast[POverlapped](ol), bytesWritten, false.WinBool)
  341. if not overlappedRes.bool:
  342. retFuture.fail(newException(OSError, osErrorMsg(osLastError())))
  343. else:
  344. assert bytesWritten == size.int32
  345. retFuture.complete()
  346. else:
  347. var written = 0
  348. proc cb(fd: AsyncFD): bool =
  349. result = true
  350. let remainderSize = size-written
  351. var cbuf = cast[cstring](buf)
  352. let res = write(fd.cint, addr cbuf[written], remainderSize.cint)
  353. if res < 0:
  354. let lastError = osLastError()
  355. if lastError.int32 != EAGAIN:
  356. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  357. else:
  358. result = false # We still want this callback to be called.
  359. else:
  360. written.inc res
  361. f.offset.inc res
  362. if res != remainderSize:
  363. result = false # We still have data to write.
  364. else:
  365. retFuture.complete()
  366. if not cb(f.fd):
  367. addWrite(f.fd, cb)
  368. return retFuture
  369. proc write*(f: AsyncFile, data: string): Future[void] =
  370. ## Writes ``data`` to the file specified asynchronously.
  371. ##
  372. ## The returned Future will complete once all data has been written to the
  373. ## specified file.
  374. var retFuture = newFuture[void]("asyncfile.write")
  375. var copy = data
  376. when defined(windows) or defined(nimdoc):
  377. var buffer = alloc0(data.len)
  378. copyMem(buffer, addr copy[0], data.len)
  379. var ol = PCustomOverlapped()
  380. GC_ref(ol)
  381. ol.data = CompletionData(fd: f.fd, cb:
  382. proc (fd: AsyncFD, bytesCount: DWord, errcode: OSErrorCode) =
  383. if not retFuture.finished:
  384. if errcode == OSErrorCode(-1):
  385. assert bytesCount == data.len.int32
  386. retFuture.complete()
  387. else:
  388. retFuture.fail(newException(OSError, osErrorMsg(errcode)))
  389. if buffer != nil:
  390. dealloc buffer
  391. buffer = nil
  392. )
  393. ol.offset = DWord(f.offset and 0xffffffff)
  394. ol.offsetHigh = DWord(f.offset shr 32)
  395. f.offset.inc(data.len)
  396. # According to MSDN we're supposed to pass nil to lpNumberOfBytesWritten.
  397. let ret = writeFile(f.fd.Handle, buffer, data.len.int32, nil,
  398. cast[POVERLAPPED](ol))
  399. if not ret.bool:
  400. let err = osLastError()
  401. if err.int32 != ERROR_IO_PENDING:
  402. if buffer != nil:
  403. dealloc buffer
  404. buffer = nil
  405. GC_unref(ol)
  406. retFuture.fail(newException(OSError, osErrorMsg(err)))
  407. else:
  408. # Request completed immediately.
  409. var bytesWritten: DWord
  410. let overlappedRes = getOverlappedResult(f.fd.Handle,
  411. cast[POverlapped](ol), bytesWritten, false.WinBool)
  412. if not overlappedRes.bool:
  413. retFuture.fail(newException(OSError, osErrorMsg(osLastError())))
  414. else:
  415. assert bytesWritten == data.len.int32
  416. retFuture.complete()
  417. else:
  418. var written = 0
  419. proc cb(fd: AsyncFD): bool =
  420. result = true
  421. let remainderSize = data.len-written
  422. let res = write(fd.cint, addr copy[written], remainderSize.cint)
  423. if res < 0:
  424. let lastError = osLastError()
  425. if lastError.int32 != EAGAIN:
  426. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  427. else:
  428. result = false # We still want this callback to be called.
  429. else:
  430. written.inc res
  431. f.offset.inc res
  432. if res != remainderSize:
  433. result = false # We still have data to write.
  434. else:
  435. retFuture.complete()
  436. if not cb(f.fd):
  437. addWrite(f.fd, cb)
  438. return retFuture
  439. proc setFileSize*(f: AsyncFile, length: int64) =
  440. ## Set a file length.
  441. when defined(windows) or defined(nimdoc):
  442. var
  443. high = (length shr 32).Dword
  444. let
  445. low = (length and 0xffffffff).Dword
  446. status = setFilePointer(f.fd.Handle, low, addr high, 0)
  447. lastErr = osLastError()
  448. if (status == INVALID_SET_FILE_POINTER and lastErr.int32 != NO_ERROR) or
  449. (setEndOfFile(f.fd.Handle) == 0):
  450. raiseOSError(osLastError())
  451. else:
  452. # will truncate if Off is a 32-bit type!
  453. if ftruncate(f.fd.cint, length.Off) == -1:
  454. raiseOSError(osLastError())
  455. proc close*(f: AsyncFile) =
  456. ## Closes the file specified.
  457. unregister(f.fd)
  458. when defined(windows) or defined(nimdoc):
  459. if not closeHandle(f.fd.Handle).bool:
  460. raiseOSError(osLastError())
  461. else:
  462. if close(f.fd.cint) == -1:
  463. raiseOSError(osLastError())
  464. proc writeFromStream*(f: AsyncFile, fs: FutureStream[string]) {.async.} =
  465. ## Reads data from the specified future stream until it is completed.
  466. ## The data which is read is written to the file immediately and
  467. ## freed from memory.
  468. ##
  469. ## This procedure is perfect for saving streamed data to a file without
  470. ## wasting memory.
  471. while true:
  472. let (hasValue, value) = await fs.read()
  473. if hasValue:
  474. await f.write(value)
  475. else:
  476. break
  477. proc readToStream*(f: AsyncFile, fs: FutureStream[string]) {.async.} =
  478. ## Writes data to the specified future stream as the file is read.
  479. while true:
  480. let data = await read(f, 4000)
  481. if data.len == 0:
  482. break
  483. await fs.write(data)
  484. fs.complete()