asyncfile.nim 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538
  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2015 Dominik Picheta
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
## This module implements asynchronous file reading and writing.
##
## ```Nim
## import std/[asyncfile, asyncdispatch, os]
##
## proc main() {.async.} =
##   var file = openAsync(getTempDir() / "foobar.txt", fmReadWrite)
##   await file.write("test")
##   file.setFilePos(0)
##   let data = await file.readAll()
##   doAssert data == "test"
##   file.close()
##
## waitFor main()
## ```
  24. import std/[asyncdispatch, os]
  25. when defined(nimPreviewSlimSystem):
  26. import std/[assertions, syncio]
  27. when defined(windows) or defined(nimdoc):
  28. import std/widestrs
  29. # TODO: Fix duplication introduced by PR #4683.
  30. when defined(windows) or defined(nimdoc):
  31. import std/winlean
  32. else:
  33. import std/posix
  34. type
  35. AsyncFile* = ref object
  36. fd: AsyncFD
  37. offset: int64
  38. when defined(windows) or defined(nimdoc):
  39. proc getDesiredAccess(mode: FileMode): int32 =
  40. case mode
  41. of fmRead:
  42. result = GENERIC_READ
  43. of fmWrite, fmAppend:
  44. result = GENERIC_WRITE
  45. of fmReadWrite, fmReadWriteExisting:
  46. result = GENERIC_READ or GENERIC_WRITE
  47. proc getCreationDisposition(mode: FileMode, filename: string): int32 =
  48. case mode
  49. of fmRead, fmReadWriteExisting:
  50. OPEN_EXISTING
  51. of fmReadWrite, fmWrite:
  52. CREATE_ALWAYS
  53. of fmAppend:
  54. OPEN_ALWAYS
  55. else:
  56. proc getPosixFlags(mode: FileMode): cint =
  57. case mode
  58. of fmRead:
  59. result = O_RDONLY
  60. of fmWrite:
  61. result = O_WRONLY or O_CREAT or O_TRUNC
  62. of fmAppend:
  63. result = O_WRONLY or O_CREAT or O_APPEND
  64. of fmReadWrite:
  65. result = O_RDWR or O_CREAT or O_TRUNC
  66. of fmReadWriteExisting:
  67. result = O_RDWR
  68. result = result or O_NONBLOCK
  69. proc getFileSize*(f: AsyncFile): int64 =
  70. ## Retrieves the specified file's size.
  71. when defined(windows) or defined(nimdoc):
  72. var high: DWORD
  73. let low = getFileSize(f.fd.Handle, addr high)
  74. if low == INVALID_FILE_SIZE:
  75. raiseOSError(osLastError())
  76. result = (high shl 32) or low
  77. else:
  78. let curPos = lseek(f.fd.cint, 0, SEEK_CUR)
  79. result = lseek(f.fd.cint, 0, SEEK_END)
  80. f.offset = lseek(f.fd.cint, curPos, SEEK_SET)
  81. assert(f.offset == curPos)
  82. proc newAsyncFile*(fd: AsyncFD): AsyncFile =
  83. ## Creates `AsyncFile` with a previously opened file descriptor `fd`.
  84. new result
  85. result.fd = fd
  86. register(fd)
  87. proc openAsync*(filename: string, mode = fmRead): AsyncFile =
  88. ## Opens a file specified by the path in `filename` using
  89. ## the specified FileMode `mode` asynchronously.
  90. when defined(windows) or defined(nimdoc):
  91. let flags = FILE_FLAG_OVERLAPPED or FILE_ATTRIBUTE_NORMAL
  92. let desiredAccess = getDesiredAccess(mode)
  93. let creationDisposition = getCreationDisposition(mode, filename)
  94. let fd = createFileW(newWideCString(filename), desiredAccess,
  95. FILE_SHARE_READ,
  96. nil, creationDisposition, flags, 0)
  97. if fd == INVALID_HANDLE_VALUE:
  98. raiseOSError(osLastError())
  99. result = newAsyncFile(fd.AsyncFD)
  100. if mode == fmAppend:
  101. result.offset = getFileSize(result)
  102. else:
  103. let flags = getPosixFlags(mode)
  104. # RW (Owner), RW (Group), R (Other)
  105. let perm = S_IRUSR or S_IWUSR or S_IRGRP or S_IWGRP or S_IROTH
  106. let fd = open(filename, flags, perm)
  107. if fd == -1:
  108. raiseOSError(osLastError())
  109. result = newAsyncFile(fd.AsyncFD)
  110. proc readBuffer*(f: AsyncFile, buf: pointer, size: int): Future[int] =
  111. ## Read `size` bytes from the specified file asynchronously starting at
  112. ## the current position of the file pointer.
  113. ##
  114. ## If the file pointer is past the end of the file then zero is returned
  115. ## and no bytes are read into `buf`
  116. var retFuture = newFuture[int]("asyncfile.readBuffer")
  117. when defined(windows) or defined(nimdoc):
  118. var ol = newCustom()
  119. ol.data = CompletionData(fd: f.fd, cb:
  120. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  121. if not retFuture.finished:
  122. if errcode == OSErrorCode(-1):
  123. assert bytesCount > 0
  124. assert bytesCount <= size
  125. f.offset.inc bytesCount
  126. retFuture.complete(bytesCount)
  127. else:
  128. if errcode.int32 == ERROR_HANDLE_EOF:
  129. retFuture.complete(0)
  130. else:
  131. retFuture.fail(newOSError(errcode))
  132. )
  133. ol.offset = DWORD(f.offset and 0xffffffff)
  134. ol.offsetHigh = DWORD(f.offset shr 32)
  135. # According to MSDN we're supposed to pass nil to lpNumberOfBytesRead.
  136. let ret = readFile(f.fd.Handle, buf, size.int32, nil,
  137. cast[POVERLAPPED](ol))
  138. if not ret.bool:
  139. let err = osLastError()
  140. if err.int32 != ERROR_IO_PENDING:
  141. GC_unref(ol)
  142. if err.int32 == ERROR_HANDLE_EOF:
  143. # This happens in Windows Server 2003
  144. retFuture.complete(0)
  145. else:
  146. retFuture.fail(newOSError(err))
  147. else:
  148. # Request completed immediately.
  149. var bytesRead: DWORD
  150. let overlappedRes = getOverlappedResult(f.fd.Handle,
  151. cast[POVERLAPPED](ol), bytesRead, false.WINBOOL)
  152. if not overlappedRes.bool:
  153. let err = osLastError()
  154. if err.int32 == ERROR_HANDLE_EOF:
  155. retFuture.complete(0)
  156. else:
  157. retFuture.fail(newOSError(osLastError()))
  158. else:
  159. assert bytesRead > 0
  160. assert bytesRead <= size
  161. f.offset.inc bytesRead
  162. retFuture.complete(bytesRead)
  163. else:
  164. proc cb(fd: AsyncFD): bool =
  165. result = true
  166. let res = read(fd.cint, cast[cstring](buf), size.cint)
  167. if res < 0:
  168. let lastError = osLastError()
  169. if lastError.int32 != EAGAIN:
  170. retFuture.fail(newOSError(lastError))
  171. else:
  172. result = false # We still want this callback to be called.
  173. elif res == 0:
  174. # EOF
  175. retFuture.complete(0)
  176. else:
  177. f.offset.inc(res)
  178. retFuture.complete(res)
  179. if not cb(f.fd):
  180. addRead(f.fd, cb)
  181. return retFuture
  182. proc read*(f: AsyncFile, size: int): Future[string] =
  183. ## Read `size` bytes from the specified file asynchronously starting at
  184. ## the current position of the file pointer. `size` should be greater than zero.
  185. ##
  186. ## If the file pointer is past the end of the file then an empty string is
  187. ## returned.
  188. assert size > 0
  189. var retFuture = newFuture[string]("asyncfile.read")
  190. when defined(windows) or defined(nimdoc):
  191. var buffer = alloc0(size)
  192. var ol = newCustom()
  193. ol.data = CompletionData(fd: f.fd, cb:
  194. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  195. if not retFuture.finished:
  196. if errcode == OSErrorCode(-1):
  197. assert bytesCount > 0
  198. assert bytesCount <= size
  199. var data = newString(bytesCount)
  200. copyMem(addr data[0], buffer, bytesCount)
  201. f.offset.inc bytesCount
  202. retFuture.complete($data)
  203. else:
  204. if errcode.int32 == ERROR_HANDLE_EOF:
  205. retFuture.complete("")
  206. else:
  207. retFuture.fail(newOSError(errcode))
  208. if buffer != nil:
  209. dealloc buffer
  210. buffer = nil
  211. )
  212. ol.offset = DWORD(f.offset and 0xffffffff)
  213. ol.offsetHigh = DWORD(f.offset shr 32)
  214. # According to MSDN we're supposed to pass nil to lpNumberOfBytesRead.
  215. let ret = readFile(f.fd.Handle, buffer, size.int32, nil,
  216. cast[POVERLAPPED](ol))
  217. if not ret.bool:
  218. let err = osLastError()
  219. if err.int32 != ERROR_IO_PENDING:
  220. if buffer != nil:
  221. dealloc buffer
  222. buffer = nil
  223. GC_unref(ol)
  224. if err.int32 == ERROR_HANDLE_EOF:
  225. # This happens in Windows Server 2003
  226. retFuture.complete("")
  227. else:
  228. retFuture.fail(newOSError(err))
  229. else:
  230. # Request completed immediately.
  231. var bytesRead: DWORD
  232. let overlappedRes = getOverlappedResult(f.fd.Handle,
  233. cast[POVERLAPPED](ol), bytesRead, false.WINBOOL)
  234. if not overlappedRes.bool:
  235. let err = osLastError()
  236. if err.int32 == ERROR_HANDLE_EOF:
  237. retFuture.complete("")
  238. else:
  239. retFuture.fail(newOSError(osLastError()))
  240. else:
  241. assert bytesRead > 0
  242. assert bytesRead <= size
  243. var data = newString(bytesRead)
  244. copyMem(addr data[0], buffer, bytesRead)
  245. f.offset.inc bytesRead
  246. retFuture.complete($data)
  247. else:
  248. var readBuffer = newString(size)
  249. proc cb(fd: AsyncFD): bool =
  250. result = true
  251. let res = read(fd.cint, addr readBuffer[0], size.cint)
  252. if res < 0:
  253. let lastError = osLastError()
  254. if lastError.int32 != EAGAIN:
  255. retFuture.fail(newOSError(lastError))
  256. else:
  257. result = false # We still want this callback to be called.
  258. elif res == 0:
  259. # EOF
  260. f.offset = lseek(fd.cint, 0, SEEK_CUR)
  261. retFuture.complete("")
  262. else:
  263. readBuffer.setLen(res)
  264. f.offset.inc(res)
  265. retFuture.complete(readBuffer)
  266. if not cb(f.fd):
  267. addRead(f.fd, cb)
  268. return retFuture
  269. proc readLine*(f: AsyncFile): Future[string] {.async.} =
  270. ## Reads a single line from the specified file asynchronously.
  271. result = ""
  272. while true:
  273. var c = await read(f, 1)
  274. if c.len == 0:
  275. break
  276. if c[0] == '\c':
  277. c = await read(f, 1)
  278. break
  279. if c[0] == '\L' or c == "":
  280. break
  281. else:
  282. result.add(c)
  283. proc getFilePos*(f: AsyncFile): int64 =
  284. ## Retrieves the current position of the file pointer that is
  285. ## used to read from the specified file. The file's first byte has the
  286. ## index zero.
  287. f.offset
  288. proc setFilePos*(f: AsyncFile, pos: int64) =
  289. ## Sets the position of the file pointer that is used for read/write
  290. ## operations. The file's first byte has the index zero.
  291. f.offset = pos
  292. when not defined(windows) and not defined(nimdoc):
  293. let ret = lseek(f.fd.cint, pos.Off, SEEK_SET)
  294. if ret == -1:
  295. raiseOSError(osLastError())
  296. proc readAll*(f: AsyncFile): Future[string] {.async.} =
  297. ## Reads all data from the specified file.
  298. result = ""
  299. while true:
  300. let data = await read(f, 4000)
  301. if data.len == 0:
  302. return
  303. result.add data
  304. proc writeBuffer*(f: AsyncFile, buf: pointer, size: int): Future[void] =
  305. ## Writes `size` bytes from `buf` to the file specified asynchronously.
  306. ##
  307. ## The returned Future will complete once all data has been written to the
  308. ## specified file.
  309. var retFuture = newFuture[void]("asyncfile.writeBuffer")
  310. when defined(windows) or defined(nimdoc):
  311. var ol = newCustom()
  312. ol.data = CompletionData(fd: f.fd, cb:
  313. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  314. if not retFuture.finished:
  315. if errcode == OSErrorCode(-1):
  316. assert bytesCount == size.int32
  317. retFuture.complete()
  318. else:
  319. retFuture.fail(newOSError(errcode))
  320. )
  321. # passing -1 here should work according to MSDN, but doesn't. For more
  322. # information see
  323. # http://stackoverflow.com/questions/33650899/does-asynchronous-file-
  324. # appending-in-windows-preserve-order
  325. ol.offset = DWORD(f.offset and 0xffffffff)
  326. ol.offsetHigh = DWORD(f.offset shr 32)
  327. f.offset.inc(size)
  328. # According to MSDN we're supposed to pass nil to lpNumberOfBytesWritten.
  329. let ret = writeFile(f.fd.Handle, buf, size.int32, nil,
  330. cast[POVERLAPPED](ol))
  331. if not ret.bool:
  332. let err = osLastError()
  333. if err.int32 != ERROR_IO_PENDING:
  334. GC_unref(ol)
  335. retFuture.fail(newOSError(err))
  336. else:
  337. # Request completed immediately.
  338. var bytesWritten: DWORD
  339. let overlappedRes = getOverlappedResult(f.fd.Handle,
  340. cast[POVERLAPPED](ol), bytesWritten, false.WINBOOL)
  341. if not overlappedRes.bool:
  342. retFuture.fail(newOSError(osLastError()))
  343. else:
  344. assert bytesWritten == size.int32
  345. retFuture.complete()
  346. else:
  347. var written = 0
  348. proc cb(fd: AsyncFD): bool =
  349. result = true
  350. let remainderSize = size - written
  351. var cbuf = cast[cstring](buf)
  352. let res = write(fd.cint, addr cbuf[written], remainderSize.cint)
  353. if res < 0:
  354. let lastError = osLastError()
  355. if lastError.int32 != EAGAIN:
  356. retFuture.fail(newOSError(lastError))
  357. else:
  358. result = false # We still want this callback to be called.
  359. else:
  360. written.inc res
  361. f.offset.inc res
  362. if res != remainderSize:
  363. result = false # We still have data to write.
  364. else:
  365. retFuture.complete()
  366. if not cb(f.fd):
  367. addWrite(f.fd, cb)
  368. return retFuture
  369. proc write*(f: AsyncFile, data: string): Future[void] =
  370. ## Writes `data` to the file specified asynchronously.
  371. ##
  372. ## The returned Future will complete once all data has been written to the
  373. ## specified file.
  374. var retFuture = newFuture[void]("asyncfile.write")
  375. var copy = data
  376. when defined(windows) or defined(nimdoc):
  377. var buffer = alloc0(data.len)
  378. copyMem(buffer, copy.cstring, data.len)
  379. var ol = newCustom()
  380. ol.data = CompletionData(fd: f.fd, cb:
  381. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  382. if not retFuture.finished:
  383. if errcode == OSErrorCode(-1):
  384. assert bytesCount == data.len.int32
  385. retFuture.complete()
  386. else:
  387. retFuture.fail(newOSError(errcode))
  388. if buffer != nil:
  389. dealloc buffer
  390. buffer = nil
  391. )
  392. ol.offset = DWORD(f.offset and 0xffffffff)
  393. ol.offsetHigh = DWORD(f.offset shr 32)
  394. f.offset.inc(data.len)
  395. # According to MSDN we're supposed to pass nil to lpNumberOfBytesWritten.
  396. let ret = writeFile(f.fd.Handle, buffer, data.len.int32, nil,
  397. cast[POVERLAPPED](ol))
  398. if not ret.bool:
  399. let err = osLastError()
  400. if err.int32 != ERROR_IO_PENDING:
  401. if buffer != nil:
  402. dealloc buffer
  403. buffer = nil
  404. GC_unref(ol)
  405. retFuture.fail(newOSError(err))
  406. else:
  407. # Request completed immediately.
  408. var bytesWritten: DWORD
  409. let overlappedRes = getOverlappedResult(f.fd.Handle,
  410. cast[POVERLAPPED](ol), bytesWritten, false.WINBOOL)
  411. if not overlappedRes.bool:
  412. retFuture.fail(newOSError(osLastError()))
  413. else:
  414. assert bytesWritten == data.len.int32
  415. retFuture.complete()
  416. else:
  417. var written = 0
  418. proc cb(fd: AsyncFD): bool =
  419. result = true
  420. let remainderSize = data.len - written
  421. let res =
  422. if data.len == 0:
  423. write(fd.cint, copy.cstring, 0)
  424. else:
  425. write(fd.cint, addr copy[written], remainderSize.cint)
  426. if res < 0:
  427. let lastError = osLastError()
  428. if lastError.int32 != EAGAIN:
  429. retFuture.fail(newOSError(lastError))
  430. else:
  431. result = false # We still want this callback to be called.
  432. else:
  433. written.inc res
  434. f.offset.inc res
  435. if res != remainderSize:
  436. result = false # We still have data to write.
  437. else:
  438. retFuture.complete()
  439. if not cb(f.fd):
  440. addWrite(f.fd, cb)
  441. return retFuture
  442. proc setFileSize*(f: AsyncFile, length: int64) =
  443. ## Set a file length.
  444. when defined(windows) or defined(nimdoc):
  445. var
  446. high = (length shr 32).DWORD
  447. let
  448. low = (length and 0xffffffff).DWORD
  449. status = setFilePointer(f.fd.Handle, low, addr high, 0)
  450. lastErr = osLastError()
  451. if (status == INVALID_SET_FILE_POINTER and lastErr.int32 != NO_ERROR) or
  452. (setEndOfFile(f.fd.Handle) == 0):
  453. raiseOSError(osLastError())
  454. else:
  455. # will truncate if Off is a 32-bit type!
  456. if ftruncate(f.fd.cint, length.Off) == -1:
  457. raiseOSError(osLastError())
  458. proc close*(f: AsyncFile) =
  459. ## Closes the file specified.
  460. unregister(f.fd)
  461. when defined(windows) or defined(nimdoc):
  462. if not closeHandle(f.fd.Handle).bool:
  463. raiseOSError(osLastError())
  464. else:
  465. if close(f.fd.cint) == -1:
  466. raiseOSError(osLastError())
  467. proc writeFromStream*(f: AsyncFile, fs: FutureStream[string]) {.async.} =
  468. ## Reads data from the specified future stream until it is completed.
  469. ## The data which is read is written to the file immediately and
  470. ## freed from memory.
  471. ##
  472. ## This procedure is perfect for saving streamed data to a file without
  473. ## wasting memory.
  474. while true:
  475. let (hasValue, value) = await fs.read()
  476. if hasValue:
  477. await f.write(value)
  478. else:
  479. break
  480. proc readToStream*(f: AsyncFile, fs: FutureStream[string]) {.async.} =
  481. ## Writes data to the specified future stream as the file is read.
  482. while true:
  483. let data = await read(f, 4000)
  484. if data.len == 0:
  485. break
  486. await fs.write(data)
  487. fs.complete()