asyncfile.nim 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540
  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2015 Dominik Picheta
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
## This module implements asynchronous file reading and writing.
##
## .. code-block:: Nim
##    import std/[asyncfile, asyncdispatch, os]
##
##    proc main() {.async.} =
##      var file = openAsync(getTempDir() / "foobar.txt", fmReadWrite)
##      await file.write("test")
##      file.setFilePos(0)
##      let data = await file.readAll()
##      doAssert data == "test"
##      file.close()
##
##    waitFor main()
  23. import asyncdispatch, os
  24. when defined(nimPreviewSlimSystem):
  25. import std/[assertions, syncio]
  26. when defined(windows):
  27. import std/widestrs
  28. # TODO: Fix duplication introduced by PR #4683.
  29. when defined(windows) or defined(nimdoc):
  30. import winlean
  31. else:
  32. import posix
type
  AsyncFile* = ref object
    ## A file opened for asynchronous reading and writing.
    fd: AsyncFD    ## Descriptor/handle of the underlying open file.
    offset: int64  ## Byte position used by the read and write procs.
  37. when defined(windows) or defined(nimdoc):
  38. proc getDesiredAccess(mode: FileMode): int32 =
  39. case mode
  40. of fmRead:
  41. result = GENERIC_READ
  42. of fmWrite, fmAppend:
  43. result = GENERIC_WRITE
  44. of fmReadWrite, fmReadWriteExisting:
  45. result = GENERIC_READ or GENERIC_WRITE
  46. proc getCreationDisposition(mode: FileMode, filename: string): int32 =
  47. case mode
  48. of fmRead, fmReadWriteExisting:
  49. OPEN_EXISTING
  50. of fmReadWrite, fmWrite:
  51. CREATE_ALWAYS
  52. of fmAppend:
  53. OPEN_ALWAYS
  54. else:
  55. proc getPosixFlags(mode: FileMode): cint =
  56. case mode
  57. of fmRead:
  58. result = O_RDONLY
  59. of fmWrite:
  60. result = O_WRONLY or O_CREAT or O_TRUNC
  61. of fmAppend:
  62. result = O_WRONLY or O_CREAT or O_APPEND
  63. of fmReadWrite:
  64. result = O_RDWR or O_CREAT or O_TRUNC
  65. of fmReadWriteExisting:
  66. result = O_RDWR
  67. result = result or O_NONBLOCK
  68. proc getFileSize*(f: AsyncFile): int64 =
  69. ## Retrieves the specified file's size.
  70. when defined(windows) or defined(nimdoc):
  71. var high: DWORD
  72. let low = getFileSize(f.fd.Handle, addr high)
  73. if low == INVALID_FILE_SIZE:
  74. raiseOSError(osLastError())
  75. result = (high shl 32) or low
  76. else:
  77. let curPos = lseek(f.fd.cint, 0, SEEK_CUR)
  78. result = lseek(f.fd.cint, 0, SEEK_END)
  79. f.offset = lseek(f.fd.cint, curPos, SEEK_SET)
  80. assert(f.offset == curPos)
  81. proc newAsyncFile*(fd: AsyncFD): AsyncFile =
  82. ## Creates `AsyncFile` with a previously opened file descriptor `fd`.
  83. new result
  84. result.fd = fd
  85. register(fd)
  86. proc openAsync*(filename: string, mode = fmRead): AsyncFile =
  87. ## Opens a file specified by the path in `filename` using
  88. ## the specified FileMode `mode` asynchronously.
  89. when defined(windows) or defined(nimdoc):
  90. let flags = FILE_FLAG_OVERLAPPED or FILE_ATTRIBUTE_NORMAL
  91. let desiredAccess = getDesiredAccess(mode)
  92. let creationDisposition = getCreationDisposition(mode, filename)
  93. when useWinUnicode:
  94. let fd = createFileW(newWideCString(filename), desiredAccess,
  95. FILE_SHARE_READ,
  96. nil, creationDisposition, flags, 0)
  97. else:
  98. let fd = createFileA(filename, desiredAccess,
  99. FILE_SHARE_READ,
  100. nil, creationDisposition, flags, 0)
  101. if fd == INVALID_HANDLE_VALUE:
  102. raiseOSError(osLastError())
  103. result = newAsyncFile(fd.AsyncFD)
  104. if mode == fmAppend:
  105. result.offset = getFileSize(result)
  106. else:
  107. let flags = getPosixFlags(mode)
  108. # RW (Owner), RW (Group), R (Other)
  109. let perm = S_IRUSR or S_IWUSR or S_IRGRP or S_IWGRP or S_IROTH
  110. let fd = open(filename, flags, perm)
  111. if fd == -1:
  112. raiseOSError(osLastError())
  113. result = newAsyncFile(fd.AsyncFD)
  114. proc readBuffer*(f: AsyncFile, buf: pointer, size: int): Future[int] =
  115. ## Read `size` bytes from the specified file asynchronously starting at
  116. ## the current position of the file pointer.
  117. ##
  118. ## If the file pointer is past the end of the file then zero is returned
  119. ## and no bytes are read into `buf`
  120. var retFuture = newFuture[int]("asyncfile.readBuffer")
  121. when defined(windows) or defined(nimdoc):
  122. var ol = newCustom()
  123. ol.data = CompletionData(fd: f.fd, cb:
  124. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  125. if not retFuture.finished:
  126. if errcode == OSErrorCode(-1):
  127. assert bytesCount > 0
  128. assert bytesCount <= size
  129. f.offset.inc bytesCount
  130. retFuture.complete(bytesCount)
  131. else:
  132. if errcode.int32 == ERROR_HANDLE_EOF:
  133. retFuture.complete(0)
  134. else:
  135. retFuture.fail(newException(OSError, osErrorMsg(errcode)))
  136. )
  137. ol.offset = DWORD(f.offset and 0xffffffff)
  138. ol.offsetHigh = DWORD(f.offset shr 32)
  139. # According to MSDN we're supposed to pass nil to lpNumberOfBytesRead.
  140. let ret = readFile(f.fd.Handle, buf, size.int32, nil,
  141. cast[POVERLAPPED](ol))
  142. if not ret.bool:
  143. let err = osLastError()
  144. if err.int32 != ERROR_IO_PENDING:
  145. GC_unref(ol)
  146. if err.int32 == ERROR_HANDLE_EOF:
  147. # This happens in Windows Server 2003
  148. retFuture.complete(0)
  149. else:
  150. retFuture.fail(newException(OSError, osErrorMsg(err)))
  151. else:
  152. # Request completed immediately.
  153. var bytesRead: DWORD
  154. let overlappedRes = getOverlappedResult(f.fd.Handle,
  155. cast[POVERLAPPED](ol), bytesRead, false.WINBOOL)
  156. if not overlappedRes.bool:
  157. let err = osLastError()
  158. if err.int32 == ERROR_HANDLE_EOF:
  159. retFuture.complete(0)
  160. else:
  161. retFuture.fail(newException(OSError, osErrorMsg(osLastError())))
  162. else:
  163. assert bytesRead > 0
  164. assert bytesRead <= size
  165. f.offset.inc bytesRead
  166. retFuture.complete(bytesRead)
  167. else:
  168. proc cb(fd: AsyncFD): bool =
  169. result = true
  170. let res = read(fd.cint, cast[cstring](buf), size.cint)
  171. if res < 0:
  172. let lastError = osLastError()
  173. if lastError.int32 != EAGAIN:
  174. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  175. else:
  176. result = false # We still want this callback to be called.
  177. elif res == 0:
  178. # EOF
  179. retFuture.complete(0)
  180. else:
  181. f.offset.inc(res)
  182. retFuture.complete(res)
  183. if not cb(f.fd):
  184. addRead(f.fd, cb)
  185. return retFuture
  186. proc read*(f: AsyncFile, size: int): Future[string] =
  187. ## Read `size` bytes from the specified file asynchronously starting at
  188. ## the current position of the file pointer. `size` should be greater than zero.
  189. ##
  190. ## If the file pointer is past the end of the file then an empty string is
  191. ## returned.
  192. assert size > 0
  193. var retFuture = newFuture[string]("asyncfile.read")
  194. when defined(windows) or defined(nimdoc):
  195. var buffer = alloc0(size)
  196. var ol = newCustom()
  197. ol.data = CompletionData(fd: f.fd, cb:
  198. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  199. if not retFuture.finished:
  200. if errcode == OSErrorCode(-1):
  201. assert bytesCount > 0
  202. assert bytesCount <= size
  203. var data = newString(bytesCount)
  204. copyMem(addr data[0], buffer, bytesCount)
  205. f.offset.inc bytesCount
  206. retFuture.complete($data)
  207. else:
  208. if errcode.int32 == ERROR_HANDLE_EOF:
  209. retFuture.complete("")
  210. else:
  211. retFuture.fail(newException(OSError, osErrorMsg(errcode)))
  212. if buffer != nil:
  213. dealloc buffer
  214. buffer = nil
  215. )
  216. ol.offset = DWORD(f.offset and 0xffffffff)
  217. ol.offsetHigh = DWORD(f.offset shr 32)
  218. # According to MSDN we're supposed to pass nil to lpNumberOfBytesRead.
  219. let ret = readFile(f.fd.Handle, buffer, size.int32, nil,
  220. cast[POVERLAPPED](ol))
  221. if not ret.bool:
  222. let err = osLastError()
  223. if err.int32 != ERROR_IO_PENDING:
  224. if buffer != nil:
  225. dealloc buffer
  226. buffer = nil
  227. GC_unref(ol)
  228. if err.int32 == ERROR_HANDLE_EOF:
  229. # This happens in Windows Server 2003
  230. retFuture.complete("")
  231. else:
  232. retFuture.fail(newException(OSError, osErrorMsg(err)))
  233. else:
  234. # Request completed immediately.
  235. var bytesRead: DWORD
  236. let overlappedRes = getOverlappedResult(f.fd.Handle,
  237. cast[POVERLAPPED](ol), bytesRead, false.WINBOOL)
  238. if not overlappedRes.bool:
  239. let err = osLastError()
  240. if err.int32 == ERROR_HANDLE_EOF:
  241. retFuture.complete("")
  242. else:
  243. retFuture.fail(newException(OSError, osErrorMsg(osLastError())))
  244. else:
  245. assert bytesRead > 0
  246. assert bytesRead <= size
  247. var data = newString(bytesRead)
  248. copyMem(addr data[0], buffer, bytesRead)
  249. f.offset.inc bytesRead
  250. retFuture.complete($data)
  251. else:
  252. var readBuffer = newString(size)
  253. proc cb(fd: AsyncFD): bool =
  254. result = true
  255. let res = read(fd.cint, addr readBuffer[0], size.cint)
  256. if res < 0:
  257. let lastError = osLastError()
  258. if lastError.int32 != EAGAIN:
  259. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  260. else:
  261. result = false # We still want this callback to be called.
  262. elif res == 0:
  263. # EOF
  264. f.offset = lseek(fd.cint, 0, SEEK_CUR)
  265. retFuture.complete("")
  266. else:
  267. readBuffer.setLen(res)
  268. f.offset.inc(res)
  269. retFuture.complete(readBuffer)
  270. if not cb(f.fd):
  271. addRead(f.fd, cb)
  272. return retFuture
  273. proc readLine*(f: AsyncFile): Future[string] {.async.} =
  274. ## Reads a single line from the specified file asynchronously.
  275. result = ""
  276. while true:
  277. var c = await read(f, 1)
  278. if c[0] == '\c':
  279. c = await read(f, 1)
  280. break
  281. if c[0] == '\L' or c == "":
  282. break
  283. else:
  284. result.add(c)
  285. proc getFilePos*(f: AsyncFile): int64 =
  286. ## Retrieves the current position of the file pointer that is
  287. ## used to read from the specified file. The file's first byte has the
  288. ## index zero.
  289. f.offset
  290. proc setFilePos*(f: AsyncFile, pos: int64) =
  291. ## Sets the position of the file pointer that is used for read/write
  292. ## operations. The file's first byte has the index zero.
  293. f.offset = pos
  294. when not defined(windows) and not defined(nimdoc):
  295. let ret = lseek(f.fd.cint, pos.Off, SEEK_SET)
  296. if ret == -1:
  297. raiseOSError(osLastError())
  298. proc readAll*(f: AsyncFile): Future[string] {.async.} =
  299. ## Reads all data from the specified file.
  300. result = ""
  301. while true:
  302. let data = await read(f, 4000)
  303. if data.len == 0:
  304. return
  305. result.add data
  306. proc writeBuffer*(f: AsyncFile, buf: pointer, size: int): Future[void] =
  307. ## Writes `size` bytes from `buf` to the file specified asynchronously.
  308. ##
  309. ## The returned Future will complete once all data has been written to the
  310. ## specified file.
  311. var retFuture = newFuture[void]("asyncfile.writeBuffer")
  312. when defined(windows) or defined(nimdoc):
  313. var ol = newCustom()
  314. ol.data = CompletionData(fd: f.fd, cb:
  315. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  316. if not retFuture.finished:
  317. if errcode == OSErrorCode(-1):
  318. assert bytesCount == size.int32
  319. retFuture.complete()
  320. else:
  321. retFuture.fail(newException(OSError, osErrorMsg(errcode)))
  322. )
  323. # passing -1 here should work according to MSDN, but doesn't. For more
  324. # information see
  325. # http://stackoverflow.com/questions/33650899/does-asynchronous-file-
  326. # appending-in-windows-preserve-order
  327. ol.offset = DWORD(f.offset and 0xffffffff)
  328. ol.offsetHigh = DWORD(f.offset shr 32)
  329. f.offset.inc(size)
  330. # According to MSDN we're supposed to pass nil to lpNumberOfBytesWritten.
  331. let ret = writeFile(f.fd.Handle, buf, size.int32, nil,
  332. cast[POVERLAPPED](ol))
  333. if not ret.bool:
  334. let err = osLastError()
  335. if err.int32 != ERROR_IO_PENDING:
  336. GC_unref(ol)
  337. retFuture.fail(newException(OSError, osErrorMsg(err)))
  338. else:
  339. # Request completed immediately.
  340. var bytesWritten: DWORD
  341. let overlappedRes = getOverlappedResult(f.fd.Handle,
  342. cast[POVERLAPPED](ol), bytesWritten, false.WINBOOL)
  343. if not overlappedRes.bool:
  344. retFuture.fail(newException(OSError, osErrorMsg(osLastError())))
  345. else:
  346. assert bytesWritten == size.int32
  347. retFuture.complete()
  348. else:
  349. var written = 0
  350. proc cb(fd: AsyncFD): bool =
  351. result = true
  352. let remainderSize = size - written
  353. var cbuf = cast[cstring](buf)
  354. let res = write(fd.cint, addr cbuf[written], remainderSize.cint)
  355. if res < 0:
  356. let lastError = osLastError()
  357. if lastError.int32 != EAGAIN:
  358. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  359. else:
  360. result = false # We still want this callback to be called.
  361. else:
  362. written.inc res
  363. f.offset.inc res
  364. if res != remainderSize:
  365. result = false # We still have data to write.
  366. else:
  367. retFuture.complete()
  368. if not cb(f.fd):
  369. addWrite(f.fd, cb)
  370. return retFuture
  371. proc write*(f: AsyncFile, data: string): Future[void] =
  372. ## Writes `data` to the file specified asynchronously.
  373. ##
  374. ## The returned Future will complete once all data has been written to the
  375. ## specified file.
  376. var retFuture = newFuture[void]("asyncfile.write")
  377. var copy = data
  378. when defined(windows) or defined(nimdoc):
  379. var buffer = alloc0(data.len)
  380. copyMem(buffer, copy.cstring, data.len)
  381. var ol = newCustom()
  382. ol.data = CompletionData(fd: f.fd, cb:
  383. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  384. if not retFuture.finished:
  385. if errcode == OSErrorCode(-1):
  386. assert bytesCount == data.len.int32
  387. retFuture.complete()
  388. else:
  389. retFuture.fail(newException(OSError, osErrorMsg(errcode)))
  390. if buffer != nil:
  391. dealloc buffer
  392. buffer = nil
  393. )
  394. ol.offset = DWORD(f.offset and 0xffffffff)
  395. ol.offsetHigh = DWORD(f.offset shr 32)
  396. f.offset.inc(data.len)
  397. # According to MSDN we're supposed to pass nil to lpNumberOfBytesWritten.
  398. let ret = writeFile(f.fd.Handle, buffer, data.len.int32, nil,
  399. cast[POVERLAPPED](ol))
  400. if not ret.bool:
  401. let err = osLastError()
  402. if err.int32 != ERROR_IO_PENDING:
  403. if buffer != nil:
  404. dealloc buffer
  405. buffer = nil
  406. GC_unref(ol)
  407. retFuture.fail(newException(OSError, osErrorMsg(err)))
  408. else:
  409. # Request completed immediately.
  410. var bytesWritten: DWORD
  411. let overlappedRes = getOverlappedResult(f.fd.Handle,
  412. cast[POVERLAPPED](ol), bytesWritten, false.WINBOOL)
  413. if not overlappedRes.bool:
  414. retFuture.fail(newException(OSError, osErrorMsg(osLastError())))
  415. else:
  416. assert bytesWritten == data.len.int32
  417. retFuture.complete()
  418. else:
  419. var written = 0
  420. proc cb(fd: AsyncFD): bool =
  421. result = true
  422. let remainderSize = data.len - written
  423. let res =
  424. if data.len == 0:
  425. write(fd.cint, copy.cstring, 0)
  426. else:
  427. write(fd.cint, addr copy[written], remainderSize.cint)
  428. if res < 0:
  429. let lastError = osLastError()
  430. if lastError.int32 != EAGAIN:
  431. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  432. else:
  433. result = false # We still want this callback to be called.
  434. else:
  435. written.inc res
  436. f.offset.inc res
  437. if res != remainderSize:
  438. result = false # We still have data to write.
  439. else:
  440. retFuture.complete()
  441. if not cb(f.fd):
  442. addWrite(f.fd, cb)
  443. return retFuture
  444. proc setFileSize*(f: AsyncFile, length: int64) =
  445. ## Set a file length.
  446. when defined(windows) or defined(nimdoc):
  447. var
  448. high = (length shr 32).DWORD
  449. let
  450. low = (length and 0xffffffff).DWORD
  451. status = setFilePointer(f.fd.Handle, low, addr high, 0)
  452. lastErr = osLastError()
  453. if (status == INVALID_SET_FILE_POINTER and lastErr.int32 != NO_ERROR) or
  454. (setEndOfFile(f.fd.Handle) == 0):
  455. raiseOSError(osLastError())
  456. else:
  457. # will truncate if Off is a 32-bit type!
  458. if ftruncate(f.fd.cint, length.Off) == -1:
  459. raiseOSError(osLastError())
  460. proc close*(f: AsyncFile) =
  461. ## Closes the file specified.
  462. unregister(f.fd)
  463. when defined(windows) or defined(nimdoc):
  464. if not closeHandle(f.fd.Handle).bool:
  465. raiseOSError(osLastError())
  466. else:
  467. if close(f.fd.cint) == -1:
  468. raiseOSError(osLastError())
  469. proc writeFromStream*(f: AsyncFile, fs: FutureStream[string]) {.async.} =
  470. ## Reads data from the specified future stream until it is completed.
  471. ## The data which is read is written to the file immediately and
  472. ## freed from memory.
  473. ##
  474. ## This procedure is perfect for saving streamed data to a file without
  475. ## wasting memory.
  476. while true:
  477. let (hasValue, value) = await fs.read()
  478. if hasValue:
  479. await f.write(value)
  480. else:
  481. break
  482. proc readToStream*(f: AsyncFile, fs: FutureStream[string]) {.async.} =
  483. ## Writes data to the specified future stream as the file is read.
  484. while true:
  485. let data = await read(f, 4000)
  486. if data.len == 0:
  487. break
  488. await fs.write(data)
  489. fs.complete()