asyncfile.nim 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534
  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2015 Dominik Picheta
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
## This module implements asynchronous file reading and writing.
##
## .. code-block:: Nim
##    import asyncfile, asyncdispatch, os
##
##    proc main() {.async.} =
##      var file = openAsync(getTempDir() / "foobar.txt", fmReadWrite)
##      await file.write("test")
##      file.setFilePos(0)
##      let data = await file.readAll()
##      doAssert data == "test"
##      file.close()
##
##    waitFor main()
  23. import asyncdispatch, os
  24. # TODO: Fix duplication introduced by PR #4683.
  25. when defined(windows) or defined(nimdoc):
  26. import winlean
  27. else:
  28. import posix
  29. type
  30. AsyncFile* = ref object
  31. fd: AsyncFD
  32. offset: int64
  33. when defined(windows) or defined(nimdoc):
  34. proc getDesiredAccess(mode: FileMode): int32 =
  35. case mode
  36. of fmRead:
  37. result = GENERIC_READ
  38. of fmWrite, fmAppend:
  39. result = GENERIC_WRITE
  40. of fmReadWrite, fmReadWriteExisting:
  41. result = GENERIC_READ or GENERIC_WRITE
  42. proc getCreationDisposition(mode: FileMode, filename: string): int32 =
  43. case mode
  44. of fmRead, fmReadWriteExisting:
  45. OPEN_EXISTING
  46. of fmReadWrite, fmWrite:
  47. CREATE_ALWAYS
  48. of fmAppend:
  49. OPEN_ALWAYS
  50. else:
  51. proc getPosixFlags(mode: FileMode): cint =
  52. case mode
  53. of fmRead:
  54. result = O_RDONLY
  55. of fmWrite:
  56. result = O_WRONLY or O_CREAT or O_TRUNC
  57. of fmAppend:
  58. result = O_WRONLY or O_CREAT or O_APPEND
  59. of fmReadWrite:
  60. result = O_RDWR or O_CREAT or O_TRUNC
  61. of fmReadWriteExisting:
  62. result = O_RDWR
  63. result = result or O_NONBLOCK
  64. proc getFileSize*(f: AsyncFile): int64 =
  65. ## Retrieves the specified file's size.
  66. when defined(windows) or defined(nimdoc):
  67. var high: DWORD
  68. let low = getFileSize(f.fd.Handle, addr high)
  69. if low == INVALID_FILE_SIZE:
  70. raiseOSError(osLastError())
  71. result = (high shl 32) or low
  72. else:
  73. let curPos = lseek(f.fd.cint, 0, SEEK_CUR)
  74. result = lseek(f.fd.cint, 0, SEEK_END)
  75. f.offset = lseek(f.fd.cint, curPos, SEEK_SET)
  76. assert(f.offset == curPos)
  77. proc newAsyncFile*(fd: AsyncFD): AsyncFile =
  78. ## Creates `AsyncFile` with a previously opened file descriptor `fd`.
  79. new result
  80. result.fd = fd
  81. register(fd)
  82. proc openAsync*(filename: string, mode = fmRead): AsyncFile =
  83. ## Opens a file specified by the path in ``filename`` using
  84. ## the specified FileMode ``mode`` asynchronously.
  85. when defined(windows) or defined(nimdoc):
  86. let flags = FILE_FLAG_OVERLAPPED or FILE_ATTRIBUTE_NORMAL
  87. let desiredAccess = getDesiredAccess(mode)
  88. let creationDisposition = getCreationDisposition(mode, filename)
  89. when useWinUnicode:
  90. let fd = createFileW(newWideCString(filename), desiredAccess,
  91. FILE_SHARE_READ,
  92. nil, creationDisposition, flags, 0)
  93. else:
  94. let fd = createFileA(filename, desiredAccess,
  95. FILE_SHARE_READ,
  96. nil, creationDisposition, flags, 0)
  97. if fd == INVALID_HANDLE_VALUE:
  98. raiseOSError(osLastError())
  99. result = newAsyncFile(fd.AsyncFD)
  100. if mode == fmAppend:
  101. result.offset = getFileSize(result)
  102. else:
  103. let flags = getPosixFlags(mode)
  104. # RW (Owner), RW (Group), R (Other)
  105. let perm = S_IRUSR or S_IWUSR or S_IRGRP or S_IWGRP or S_IROTH
  106. let fd = open(filename, flags, perm)
  107. if fd == -1:
  108. raiseOSError(osLastError())
  109. result = newAsyncFile(fd.AsyncFD)
  110. proc readBuffer*(f: AsyncFile, buf: pointer, size: int): Future[int] =
  111. ## Read ``size`` bytes from the specified file asynchronously starting at
  112. ## the current position of the file pointer.
  113. ##
  114. ## If the file pointer is past the end of the file then zero is returned
  115. ## and no bytes are read into ``buf``
  116. var retFuture = newFuture[int]("asyncfile.readBuffer")
  117. when defined(windows) or defined(nimdoc):
  118. var ol = newCustom()
  119. ol.data = CompletionData(fd: f.fd, cb:
  120. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  121. if not retFuture.finished:
  122. if errcode == OSErrorCode(-1):
  123. assert bytesCount > 0
  124. assert bytesCount <= size
  125. f.offset.inc bytesCount
  126. retFuture.complete(bytesCount)
  127. else:
  128. if errcode.int32 == ERROR_HANDLE_EOF:
  129. retFuture.complete(0)
  130. else:
  131. retFuture.fail(newException(OSError, osErrorMsg(errcode)))
  132. )
  133. ol.offset = DWORD(f.offset and 0xffffffff)
  134. ol.offsetHigh = DWORD(f.offset shr 32)
  135. # According to MSDN we're supposed to pass nil to lpNumberOfBytesRead.
  136. let ret = readFile(f.fd.Handle, buf, size.int32, nil,
  137. cast[POVERLAPPED](ol))
  138. if not ret.bool:
  139. let err = osLastError()
  140. if err.int32 != ERROR_IO_PENDING:
  141. GC_unref(ol)
  142. if err.int32 == ERROR_HANDLE_EOF:
  143. # This happens in Windows Server 2003
  144. retFuture.complete(0)
  145. else:
  146. retFuture.fail(newException(OSError, osErrorMsg(err)))
  147. else:
  148. # Request completed immediately.
  149. var bytesRead: DWORD
  150. let overlappedRes = getOverlappedResult(f.fd.Handle,
  151. cast[POVERLAPPED](ol), bytesRead, false.WINBOOL)
  152. if not overlappedRes.bool:
  153. let err = osLastError()
  154. if err.int32 == ERROR_HANDLE_EOF:
  155. retFuture.complete(0)
  156. else:
  157. retFuture.fail(newException(OSError, osErrorMsg(osLastError())))
  158. else:
  159. assert bytesRead > 0
  160. assert bytesRead <= size
  161. f.offset.inc bytesRead
  162. retFuture.complete(bytesRead)
  163. else:
  164. proc cb(fd: AsyncFD): bool =
  165. result = true
  166. let res = read(fd.cint, cast[cstring](buf), size.cint)
  167. if res < 0:
  168. let lastError = osLastError()
  169. if lastError.int32 != EAGAIN:
  170. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  171. else:
  172. result = false # We still want this callback to be called.
  173. elif res == 0:
  174. # EOF
  175. retFuture.complete(0)
  176. else:
  177. f.offset.inc(res)
  178. retFuture.complete(res)
  179. if not cb(f.fd):
  180. addRead(f.fd, cb)
  181. return retFuture
  182. proc read*(f: AsyncFile, size: int): Future[string] =
  183. ## Read ``size`` bytes from the specified file asynchronously starting at
  184. ## the current position of the file pointer.
  185. ##
  186. ## If the file pointer is past the end of the file then an empty string is
  187. ## returned.
  188. var retFuture = newFuture[string]("asyncfile.read")
  189. when defined(windows) or defined(nimdoc):
  190. var buffer = alloc0(size)
  191. var ol = newCustom()
  192. ol.data = CompletionData(fd: f.fd, cb:
  193. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  194. if not retFuture.finished:
  195. if errcode == OSErrorCode(-1):
  196. assert bytesCount > 0
  197. assert bytesCount <= size
  198. var data = newString(bytesCount)
  199. copyMem(addr data[0], buffer, bytesCount)
  200. f.offset.inc bytesCount
  201. retFuture.complete($data)
  202. else:
  203. if errcode.int32 == ERROR_HANDLE_EOF:
  204. retFuture.complete("")
  205. else:
  206. retFuture.fail(newException(OSError, osErrorMsg(errcode)))
  207. if buffer != nil:
  208. dealloc buffer
  209. buffer = nil
  210. )
  211. ol.offset = DWORD(f.offset and 0xffffffff)
  212. ol.offsetHigh = DWORD(f.offset shr 32)
  213. # According to MSDN we're supposed to pass nil to lpNumberOfBytesRead.
  214. let ret = readFile(f.fd.Handle, buffer, size.int32, nil,
  215. cast[POVERLAPPED](ol))
  216. if not ret.bool:
  217. let err = osLastError()
  218. if err.int32 != ERROR_IO_PENDING:
  219. if buffer != nil:
  220. dealloc buffer
  221. buffer = nil
  222. GC_unref(ol)
  223. if err.int32 == ERROR_HANDLE_EOF:
  224. # This happens in Windows Server 2003
  225. retFuture.complete("")
  226. else:
  227. retFuture.fail(newException(OSError, osErrorMsg(err)))
  228. else:
  229. # Request completed immediately.
  230. var bytesRead: DWORD
  231. let overlappedRes = getOverlappedResult(f.fd.Handle,
  232. cast[POVERLAPPED](ol), bytesRead, false.WINBOOL)
  233. if not overlappedRes.bool:
  234. let err = osLastError()
  235. if err.int32 == ERROR_HANDLE_EOF:
  236. retFuture.complete("")
  237. else:
  238. retFuture.fail(newException(OSError, osErrorMsg(osLastError())))
  239. else:
  240. assert bytesRead > 0
  241. assert bytesRead <= size
  242. var data = newString(bytesRead)
  243. copyMem(addr data[0], buffer, bytesRead)
  244. f.offset.inc bytesRead
  245. retFuture.complete($data)
  246. else:
  247. var readBuffer = newString(size)
  248. proc cb(fd: AsyncFD): bool =
  249. result = true
  250. let res = read(fd.cint, addr readBuffer[0], size.cint)
  251. if res < 0:
  252. let lastError = osLastError()
  253. if lastError.int32 != EAGAIN:
  254. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  255. else:
  256. result = false # We still want this callback to be called.
  257. elif res == 0:
  258. # EOF
  259. f.offset = lseek(fd.cint, 0, SEEK_CUR)
  260. retFuture.complete("")
  261. else:
  262. readBuffer.setLen(res)
  263. f.offset.inc(res)
  264. retFuture.complete(readBuffer)
  265. if not cb(f.fd):
  266. addRead(f.fd, cb)
  267. return retFuture
  268. proc readLine*(f: AsyncFile): Future[string] {.async.} =
  269. ## Reads a single line from the specified file asynchronously.
  270. result = ""
  271. while true:
  272. var c = await read(f, 1)
  273. if c[0] == '\c':
  274. c = await read(f, 1)
  275. break
  276. if c[0] == '\L' or c == "":
  277. break
  278. else:
  279. result.add(c)
  280. proc getFilePos*(f: AsyncFile): int64 =
  281. ## Retrieves the current position of the file pointer that is
  282. ## used to read from the specified file. The file's first byte has the
  283. ## index zero.
  284. f.offset
  285. proc setFilePos*(f: AsyncFile, pos: int64) =
  286. ## Sets the position of the file pointer that is used for read/write
  287. ## operations. The file's first byte has the index zero.
  288. f.offset = pos
  289. when not defined(windows) and not defined(nimdoc):
  290. let ret = lseek(f.fd.cint, pos.Off, SEEK_SET)
  291. if ret == -1:
  292. raiseOSError(osLastError())
  293. proc readAll*(f: AsyncFile): Future[string] {.async.} =
  294. ## Reads all data from the specified file.
  295. result = ""
  296. while true:
  297. let data = await read(f, 4000)
  298. if data.len == 0:
  299. return
  300. result.add data
  301. proc writeBuffer*(f: AsyncFile, buf: pointer, size: int): Future[void] =
  302. ## Writes ``size`` bytes from ``buf`` to the file specified asynchronously.
  303. ##
  304. ## The returned Future will complete once all data has been written to the
  305. ## specified file.
  306. var retFuture = newFuture[void]("asyncfile.writeBuffer")
  307. when defined(windows) or defined(nimdoc):
  308. var ol = newCustom()
  309. ol.data = CompletionData(fd: f.fd, cb:
  310. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  311. if not retFuture.finished:
  312. if errcode == OSErrorCode(-1):
  313. assert bytesCount == size.int32
  314. retFuture.complete()
  315. else:
  316. retFuture.fail(newException(OSError, osErrorMsg(errcode)))
  317. )
  318. # passing -1 here should work according to MSDN, but doesn't. For more
  319. # information see
  320. # http://stackoverflow.com/questions/33650899/does-asynchronous-file-
  321. # appending-in-windows-preserve-order
  322. ol.offset = DWORD(f.offset and 0xffffffff)
  323. ol.offsetHigh = DWORD(f.offset shr 32)
  324. f.offset.inc(size)
  325. # According to MSDN we're supposed to pass nil to lpNumberOfBytesWritten.
  326. let ret = writeFile(f.fd.Handle, buf, size.int32, nil,
  327. cast[POVERLAPPED](ol))
  328. if not ret.bool:
  329. let err = osLastError()
  330. if err.int32 != ERROR_IO_PENDING:
  331. GC_unref(ol)
  332. retFuture.fail(newException(OSError, osErrorMsg(err)))
  333. else:
  334. # Request completed immediately.
  335. var bytesWritten: DWORD
  336. let overlappedRes = getOverlappedResult(f.fd.Handle,
  337. cast[POVERLAPPED](ol), bytesWritten, false.WINBOOL)
  338. if not overlappedRes.bool:
  339. retFuture.fail(newException(OSError, osErrorMsg(osLastError())))
  340. else:
  341. assert bytesWritten == size.int32
  342. retFuture.complete()
  343. else:
  344. var written = 0
  345. proc cb(fd: AsyncFD): bool =
  346. result = true
  347. let remainderSize = size - written
  348. var cbuf = cast[cstring](buf)
  349. let res = write(fd.cint, addr cbuf[written], remainderSize.cint)
  350. if res < 0:
  351. let lastError = osLastError()
  352. if lastError.int32 != EAGAIN:
  353. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  354. else:
  355. result = false # We still want this callback to be called.
  356. else:
  357. written.inc res
  358. f.offset.inc res
  359. if res != remainderSize:
  360. result = false # We still have data to write.
  361. else:
  362. retFuture.complete()
  363. if not cb(f.fd):
  364. addWrite(f.fd, cb)
  365. return retFuture
  366. proc write*(f: AsyncFile, data: string): Future[void] =
  367. ## Writes ``data`` to the file specified asynchronously.
  368. ##
  369. ## The returned Future will complete once all data has been written to the
  370. ## specified file.
  371. var retFuture = newFuture[void]("asyncfile.write")
  372. var copy = data
  373. when defined(windows) or defined(nimdoc):
  374. var buffer = alloc0(data.len)
  375. copyMem(buffer, copy.cstring, data.len)
  376. var ol = newCustom()
  377. ol.data = CompletionData(fd: f.fd, cb:
  378. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  379. if not retFuture.finished:
  380. if errcode == OSErrorCode(-1):
  381. assert bytesCount == data.len.int32
  382. retFuture.complete()
  383. else:
  384. retFuture.fail(newException(OSError, osErrorMsg(errcode)))
  385. if buffer != nil:
  386. dealloc buffer
  387. buffer = nil
  388. )
  389. ol.offset = DWORD(f.offset and 0xffffffff)
  390. ol.offsetHigh = DWORD(f.offset shr 32)
  391. f.offset.inc(data.len)
  392. # According to MSDN we're supposed to pass nil to lpNumberOfBytesWritten.
  393. let ret = writeFile(f.fd.Handle, buffer, data.len.int32, nil,
  394. cast[POVERLAPPED](ol))
  395. if not ret.bool:
  396. let err = osLastError()
  397. if err.int32 != ERROR_IO_PENDING:
  398. if buffer != nil:
  399. dealloc buffer
  400. buffer = nil
  401. GC_unref(ol)
  402. retFuture.fail(newException(OSError, osErrorMsg(err)))
  403. else:
  404. # Request completed immediately.
  405. var bytesWritten: DWORD
  406. let overlappedRes = getOverlappedResult(f.fd.Handle,
  407. cast[POVERLAPPED](ol), bytesWritten, false.WINBOOL)
  408. if not overlappedRes.bool:
  409. retFuture.fail(newException(OSError, osErrorMsg(osLastError())))
  410. else:
  411. assert bytesWritten == data.len.int32
  412. retFuture.complete()
  413. else:
  414. var written = 0
  415. proc cb(fd: AsyncFD): bool =
  416. result = true
  417. let remainderSize = data.len - written
  418. let res =
  419. if data.len == 0:
  420. write(fd.cint, copy.cstring, 0)
  421. else:
  422. write(fd.cint, addr copy[written], remainderSize.cint)
  423. if res < 0:
  424. let lastError = osLastError()
  425. if lastError.int32 != EAGAIN:
  426. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  427. else:
  428. result = false # We still want this callback to be called.
  429. else:
  430. written.inc res
  431. f.offset.inc res
  432. if res != remainderSize:
  433. result = false # We still have data to write.
  434. else:
  435. retFuture.complete()
  436. if not cb(f.fd):
  437. addWrite(f.fd, cb)
  438. return retFuture
  439. proc setFileSize*(f: AsyncFile, length: int64) =
  440. ## Set a file length.
  441. when defined(windows) or defined(nimdoc):
  442. var
  443. high = (length shr 32).DWORD
  444. let
  445. low = (length and 0xffffffff).DWORD
  446. status = setFilePointer(f.fd.Handle, low, addr high, 0)
  447. lastErr = osLastError()
  448. if (status == INVALID_SET_FILE_POINTER and lastErr.int32 != NO_ERROR) or
  449. (setEndOfFile(f.fd.Handle) == 0):
  450. raiseOSError(osLastError())
  451. else:
  452. # will truncate if Off is a 32-bit type!
  453. if ftruncate(f.fd.cint, length.Off) == -1:
  454. raiseOSError(osLastError())
  455. proc close*(f: AsyncFile) =
  456. ## Closes the file specified.
  457. unregister(f.fd)
  458. when defined(windows) or defined(nimdoc):
  459. if not closeHandle(f.fd.Handle).bool:
  460. raiseOSError(osLastError())
  461. else:
  462. if close(f.fd.cint) == -1:
  463. raiseOSError(osLastError())
  464. proc writeFromStream*(f: AsyncFile, fs: FutureStream[string]) {.async.} =
  465. ## Reads data from the specified future stream until it is completed.
  466. ## The data which is read is written to the file immediately and
  467. ## freed from memory.
  468. ##
  469. ## This procedure is perfect for saving streamed data to a file without
  470. ## wasting memory.
  471. while true:
  472. let (hasValue, value) = await fs.read()
  473. if hasValue:
  474. await f.write(value)
  475. else:
  476. break
  477. proc readToStream*(f: AsyncFile, fs: FutureStream[string]) {.async.} =
  478. ## Writes data to the specified future stream as the file is read.
  479. while true:
  480. let data = await read(f, 4000)
  481. if data.len == 0:
  482. break
  483. await fs.write(data)
  484. fs.complete()