receive.go 33 KB


  1. // License: GPLv3 Copyright: 2023, Kovid Goyal, <kovid at kovidgoyal.net>
  2. package transfer
  3. import (
  4. "bytes"
  5. "compress/zlib"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "io/fs"
  10. "os"
  11. "path/filepath"
  12. "slices"
  13. "strconv"
  14. "strings"
  15. "time"
  16. "kitty"
  17. "kitty/kittens/unicode_input"
  18. "kitty/tools/cli/markup"
  19. "kitty/tools/rsync"
  20. "kitty/tools/tui"
  21. "kitty/tools/tui/loop"
  22. "kitty/tools/utils"
  23. "kitty/tools/utils/humanize"
  24. "kitty/tools/wcswidth"
  25. "golang.org/x/sys/unix"
  26. )
  27. var _ = fmt.Print
  28. type state int
  29. const (
  30. state_waiting_for_permission state = iota
  31. state_waiting_for_file_metadata
  32. state_transferring
  33. state_canceled
  34. )
  35. type output_file interface {
  36. write([]byte) (int, error)
  37. close() error
  38. tell() (int64, error)
  39. }
  40. type filesystem_file struct {
  41. f *os.File
  42. }
  43. func (ff *filesystem_file) tell() (int64, error) {
  44. return ff.f.Seek(0, io.SeekCurrent)
  45. }
  46. func (ff *filesystem_file) close() error {
  47. return ff.f.Close()
  48. }
  49. func (ff *filesystem_file) write(data []byte) (int, error) {
  50. n, err := ff.f.Write(data)
  51. if err == nil && n < len(data) {
  52. err = io.ErrShortWrite
  53. }
  54. return n, err
  55. }
  56. type patch_file struct {
  57. path string
  58. src, temp *os.File
  59. p *rsync.Patcher
  60. }
  61. func (pf *patch_file) tell() (int64, error) {
  62. if pf.temp == nil {
  63. s, err := os.Stat(pf.path)
  64. return s.Size(), err
  65. }
  66. return pf.temp.Seek(0, io.SeekCurrent)
  67. }
  68. func (pf *patch_file) close() (err error) {
  69. if pf.p == nil {
  70. return
  71. }
  72. err = pf.p.FinishDelta()
  73. pf.src.Close()
  74. pf.temp.Close()
  75. if err == nil {
  76. err = os.Rename(pf.temp.Name(), pf.src.Name())
  77. }
  78. pf.src = nil
  79. pf.temp = nil
  80. pf.p = nil
  81. return
  82. }
  83. func (pf *patch_file) write(data []byte) (int, error) {
  84. if err := pf.p.UpdateDelta(data); err == nil {
  85. return len(data), nil
  86. } else {
  87. return 0, err
  88. }
  89. }
  90. func new_patch_file(path string, p *rsync.Patcher) (ans *patch_file, err error) {
  91. ans = &patch_file{p: p, path: path}
  92. var f *os.File
  93. if f, err = os.Open(path); err != nil {
  94. return
  95. } else {
  96. ans.src = f
  97. }
  98. if f, err = os.CreateTemp(filepath.Dir(path), ""); err != nil {
  99. ans.src.Close()
  100. return
  101. } else {
  102. ans.temp = f
  103. }
  104. ans.p.StartDelta(ans.temp, ans.src)
  105. return
  106. }
  107. type remote_file struct {
  108. expected_size int64
  109. expect_diff bool
  110. patcher *rsync.Patcher
  111. transmit_started_at, done_at time.Time
  112. written_bytes int64
  113. received_bytes int64
  114. sent_bytes int64
  115. ftype FileType
  116. mtime time.Duration
  117. spec_id int
  118. permissions fs.FileMode
  119. remote_path string
  120. display_name string
  121. remote_id, remote_target string
  122. parent string
  123. expanded_local_path string
  124. file_id string
  125. decompressor utils.StreamDecompressor
  126. compression_type Compression
  127. remote_symlink_value string
  128. actual_file output_file
  129. }
  130. func (self *remote_file) close() (err error) {
  131. if self.decompressor != nil {
  132. err = self.decompressor(nil, true)
  133. self.decompressor = nil
  134. }
  135. if self.actual_file != nil {
  136. af := self.actual_file
  137. self.actual_file = nil
  138. cerr := af.close()
  139. if err == nil {
  140. err = cerr
  141. }
  142. }
  143. return
  144. }
  145. func (self *remote_file) Write(data []byte) (n int, err error) {
  146. switch self.ftype {
  147. default:
  148. return 0, fmt.Errorf("Cannot write data to files of type: %s", self.ftype)
  149. case FileType_symlink:
  150. self.remote_symlink_value += string(data)
  151. return len(data), nil
  152. case FileType_regular:
  153. if self.actual_file == nil {
  154. parent := filepath.Dir(self.expanded_local_path)
  155. if parent != "" {
  156. if err = os.MkdirAll(parent, 0o755); err != nil {
  157. return 0, err
  158. }
  159. }
  160. if self.expect_diff {
  161. if pf, err := new_patch_file(self.expanded_local_path, self.patcher); err != nil {
  162. return 0, err
  163. } else {
  164. self.actual_file = pf
  165. }
  166. } else {
  167. if ff, err := os.Create(self.expanded_local_path); err != nil {
  168. return 0, err
  169. } else {
  170. f := filesystem_file{f: ff}
  171. self.actual_file = &f
  172. }
  173. }
  174. }
  175. return self.actual_file.write(data)
  176. }
  177. }
  178. func (self *remote_file) write_data(data []byte, is_last bool) (amt_written int64, err error) {
  179. self.received_bytes += int64(len(data))
  180. var base, pos int64
  181. defer func() {
  182. if err != nil {
  183. err = fmt.Errorf("Failed writing to %s with error: %w", self.expanded_local_path, err)
  184. }
  185. }()
  186. if self.actual_file != nil {
  187. base, err = self.actual_file.tell()
  188. if err != nil {
  189. return 0, err
  190. }
  191. }
  192. err = self.decompressor(data, is_last)
  193. if is_last {
  194. self.decompressor = nil
  195. }
  196. if self.actual_file != nil && err == nil {
  197. pos, err = self.actual_file.tell()
  198. if err != nil {
  199. return 0, err
  200. }
  201. } else {
  202. pos = base
  203. }
  204. amt_written = pos - base
  205. if is_last && self.actual_file != nil {
  206. cerr := self.actual_file.close()
  207. if err == nil {
  208. err = cerr
  209. }
  210. self.actual_file = nil
  211. }
  212. return
  213. }
  214. func syscall_mode(i os.FileMode) (o uint32) {
  215. o |= uint32(i.Perm())
  216. if i&os.ModeSetuid != 0 {
  217. o |= unix.S_ISUID
  218. }
  219. if i&os.ModeSetgid != 0 {
  220. o |= unix.S_ISGID
  221. }
  222. if i&os.ModeSticky != 0 {
  223. o |= unix.S_ISVTX
  224. }
  225. // No mapping for Go's ModeTemporary (plan9 only).
  226. return
  227. }
  228. func (self *remote_file) apply_metadata() {
  229. t := unix.NsecToTimespec(int64(self.mtime))
  230. for {
  231. if err := unix.UtimesNanoAt(unix.AT_FDCWD, self.expanded_local_path, []unix.Timespec{t, t}, unix.AT_SYMLINK_NOFOLLOW); err == nil || !(errors.Is(err, unix.EINTR) || errors.Is(err, unix.EAGAIN)) {
  232. break
  233. }
  234. }
  235. if self.ftype == FileType_symlink {
  236. for {
  237. if err := unix.Fchmodat(unix.AT_FDCWD, self.expanded_local_path, syscall_mode(self.permissions), unix.AT_SYMLINK_NOFOLLOW); err == nil || !(errors.Is(err, unix.EINTR) || errors.Is(err, unix.EAGAIN)) {
  238. break
  239. }
  240. }
  241. } else {
  242. _ = os.Chmod(self.expanded_local_path, self.permissions)
  243. }
  244. }
  245. func new_remote_file(opts *Options, ftc *FileTransmissionCommand, file_id uint64) (*remote_file, error) {
  246. spec_id, err := strconv.Atoi(ftc.File_id)
  247. if err != nil {
  248. return nil, err
  249. }
  250. ans := &remote_file{
  251. expected_size: ftc.Size, ftype: ftc.Ftype, mtime: ftc.Mtime, spec_id: spec_id, file_id: strconv.FormatUint(file_id, 10),
  252. permissions: ftc.Permissions, remote_path: ftc.Name, display_name: wcswidth.StripEscapeCodes(ftc.Name),
  253. remote_id: ftc.Status, remote_target: string(ftc.Data), parent: ftc.Parent,
  254. }
  255. compression_capable := ftc.Ftype == FileType_regular && ftc.Size > 4096 && should_be_compressed(ftc.Name, opts.Compress)
  256. if compression_capable {
  257. ans.decompressor = utils.NewStreamDecompressor(zlib.NewReader, ans)
  258. ans.compression_type = Compression_zlib
  259. } else {
  260. ans.decompressor = utils.NewStreamDecompressor(nil, ans)
  261. ans.compression_type = Compression_none
  262. }
  263. return ans, nil
  264. }
  265. type receive_progress_tracker struct {
  266. total_size_of_all_files int64
  267. total_bytes_to_transfer int64
  268. total_transferred int64
  269. transfered_stats_amt int64
  270. transfered_stats_interval time.Duration
  271. started_at time.Time
  272. transfers []Transfer
  273. active_file *remote_file
  274. done_files []*remote_file
  275. }
  276. func (self *receive_progress_tracker) change_active_file(nf *remote_file) {
  277. now := time.Now()
  278. self.active_file = nf
  279. nf.transmit_started_at = now
  280. }
  281. func (self *receive_progress_tracker) start_transfer() {
  282. self.started_at = time.Now()
  283. self.transfers = append(self.transfers, Transfer{at: time.Now()})
  284. }
  285. func (self *receive_progress_tracker) file_written(af *remote_file, amt int64, is_done bool) {
  286. if self.active_file != af {
  287. self.change_active_file(af)
  288. }
  289. af.written_bytes += amt
  290. self.total_transferred += amt
  291. now := time.Now()
  292. self.transfers = append(self.transfers, Transfer{amt: amt, at: now})
  293. for len(self.transfers) > 2 && self.transfers[0].is_too_old(now) {
  294. utils.ShiftLeft(self.transfers, 1)
  295. }
  296. self.transfered_stats_interval = now.Sub(self.transfers[0].at)
  297. self.transfered_stats_amt = 0
  298. for _, t := range self.transfers {
  299. self.transfered_stats_amt += t.amt
  300. }
  301. if is_done {
  302. af.done_at = now
  303. self.done_files = append(self.done_files, af)
  304. }
  305. }
  306. type manager struct {
  307. request_id string
  308. file_id_counter uint64
  309. cli_opts *Options
  310. spec []string
  311. dest string
  312. bypass string
  313. use_rsync bool
  314. failed_specs map[int]string
  315. spec_counts map[int]int
  316. remote_home string
  317. prefix, suffix string
  318. transfer_done bool
  319. files []*remote_file
  320. files_to_be_transferred map[string]*remote_file
  321. state state
  322. progress_tracker receive_progress_tracker
  323. }
  324. type transmit_iterator = func(queue_write func(string) loop.IdType) (loop.IdType, error)
  325. type sigwriter struct {
  326. wid loop.IdType
  327. file_id, prefix, suffix string
  328. q func(string) loop.IdType
  329. amt int64
  330. b bytes.Buffer
  331. }
  332. func (self *sigwriter) Write(b []byte) (int, error) {
  333. self.b.Write(b)
  334. if self.b.Len() > 4000 {
  335. self.flush()
  336. }
  337. return len(b), nil
  338. }
  339. func (self *sigwriter) flush() {
  340. frame := len(self.prefix) + len(self.suffix)
  341. split_for_transfer(self.b.Bytes(), self.file_id, false, func(ftc *FileTransmissionCommand) {
  342. self.q(self.prefix)
  343. data := ftc.Serialize(false)
  344. self.q(data)
  345. self.wid = self.q(self.suffix)
  346. self.amt += int64(frame + len(data))
  347. })
  348. self.b.Reset()
  349. }
  350. var files_done error = errors.New("files done")
  351. func (self *manager) request_files() transmit_iterator {
  352. pos := 0
  353. return func(queue_write func(string) loop.IdType) (last_write_id loop.IdType, err error) {
  354. var f *remote_file
  355. for pos < len(self.files) {
  356. f = self.files[pos]
  357. pos++
  358. if f.ftype == FileType_directory || (f.ftype == FileType_link && f.remote_target != "") {
  359. f = nil
  360. } else {
  361. break
  362. }
  363. }
  364. if f == nil {
  365. return 0, files_done
  366. }
  367. read_signature := self.use_rsync && f.ftype == FileType_regular
  368. if read_signature {
  369. if s, err := os.Lstat(f.expanded_local_path); err == nil {
  370. read_signature = s.Size() > 4096
  371. } else {
  372. read_signature = false
  373. }
  374. }
  375. last_write_id = self.send(FileTransmissionCommand{
  376. Action: Action_file, Name: f.remote_path, File_id: f.file_id, Ttype: utils.IfElse(
  377. read_signature, TransmissionType_rsync, TransmissionType_simple), Compression: f.compression_type,
  378. }, queue_write)
  379. if read_signature {
  380. fsf, err := os.Open(f.expanded_local_path)
  381. if err != nil {
  382. return 0, err
  383. }
  384. defer fsf.Close()
  385. f.expect_diff = true
  386. f.patcher = rsync.NewPatcher(f.expected_size)
  387. output := sigwriter{q: queue_write, file_id: f.file_id, prefix: self.prefix, suffix: self.suffix}
  388. s_it := f.patcher.CreateSignatureIterator(fsf, &output)
  389. for {
  390. err = s_it()
  391. if err == io.EOF {
  392. break
  393. } else if err != nil {
  394. return 0, err
  395. }
  396. }
  397. output.flush()
  398. f.sent_bytes += output.amt
  399. last_write_id = self.send(FileTransmissionCommand{Action: Action_end_data, File_id: f.file_id}, queue_write)
  400. }
  401. return
  402. }
  403. }
  404. type handler struct {
  405. lp *loop.Loop
  406. progress_update_timer loop.IdType
  407. spinner *tui.Spinner
  408. cli_opts *Options
  409. ctx *markup.Context
  410. manager manager
  411. quit_after_write_code int
  412. check_paths_printed bool
  413. transmit_started bool
  414. progress_drawn bool
  415. max_name_length int
  416. transmit_iterator transmit_iterator
  417. last_data_write_id loop.IdType
  418. }
  419. func (self *manager) send(c FileTransmissionCommand, send func(string) loop.IdType) loop.IdType {
  420. send(self.prefix)
  421. send(c.Serialize(false))
  422. return send(self.suffix)
  423. }
  424. func (self *manager) start_transfer(send func(string) loop.IdType) {
  425. self.send(FileTransmissionCommand{Action: Action_receive, Bypass: self.bypass, Size: int64(len(self.spec))}, send)
  426. for i, x := range self.spec {
  427. self.send(FileTransmissionCommand{Action: Action_file, File_id: strconv.Itoa(i), Name: x}, send)
  428. }
  429. self.progress_tracker.start_transfer()
  430. }
  431. func (self *handler) print_err(err error) {
  432. self.lp.Println(self.ctx.BrightRed(err.Error()))
  433. }
  434. func (self *handler) abort_with_error(err error, delay ...time.Duration) {
  435. if err != nil {
  436. self.print_err(err)
  437. }
  438. var d time.Duration = 5 * time.Second
  439. if len(delay) > 0 {
  440. d = delay[0]
  441. }
  442. self.lp.Println(`Waiting to ensure terminal cancels transfer, will quit in no more than`, d)
  443. self.manager.send(FileTransmissionCommand{Action: Action_cancel}, self.lp.QueueWriteString)
  444. self.manager.state = state_canceled
  445. _, _ = self.lp.AddTimer(d, false, self.do_error_quit)
  446. }
  447. func (self *handler) do_error_quit(loop.IdType) error {
  448. self.lp.Quit(1)
  449. return nil
  450. }
  451. func (self *manager) finalize_transfer() (err error) {
  452. self.transfer_done = true
  453. rid_map := make(map[string]*remote_file)
  454. for _, f := range self.files {
  455. rid_map[f.remote_id] = f
  456. }
  457. for _, f := range self.files {
  458. switch f.ftype {
  459. case FileType_directory:
  460. if err = os.MkdirAll(f.expanded_local_path, 0o755); err != nil {
  461. return fmt.Errorf("Failed to create directory with error: %w", err)
  462. }
  463. case FileType_link:
  464. tgt, found := rid_map[f.remote_target]
  465. if !found {
  466. return fmt.Errorf(`Hard link with remote id: {%s} not found`, f.remote_target)
  467. }
  468. if err = os.MkdirAll(filepath.Dir(f.expanded_local_path), 0o755); err == nil {
  469. os.Remove(f.expanded_local_path)
  470. err = os.Link(tgt.expanded_local_path, f.expanded_local_path)
  471. }
  472. if err != nil {
  473. return fmt.Errorf(`Failed to create link with error: %w`, err)
  474. }
  475. case FileType_symlink:
  476. lt := f.remote_symlink_value
  477. if f.remote_target != "" {
  478. tgt, found := rid_map[f.remote_target]
  479. if !found {
  480. return fmt.Errorf(`Symbolic link with remote id: {%s} not found`, f.remote_target)
  481. }
  482. lt = tgt.expanded_local_path
  483. if !strings.HasPrefix(f.remote_symlink_value, "/") {
  484. if lt, err = filepath.Rel(filepath.Dir(f.expanded_local_path), lt); err != nil {
  485. return fmt.Errorf(`Could not make symlink relative with error: %w`, err)
  486. }
  487. }
  488. }
  489. if lt == "" {
  490. return fmt.Errorf("Symlink %s sent without target", f.expanded_local_path)
  491. }
  492. os.Remove(f.expanded_local_path)
  493. if err = os.MkdirAll(filepath.Dir(f.expanded_local_path), 0o755); err != nil {
  494. return fmt.Errorf("Failed to create directory with error: %w", err)
  495. }
  496. if err = os.Symlink(lt, f.expanded_local_path); err != nil {
  497. return fmt.Errorf(`Failed to create symlink with error: %w`, err)
  498. }
  499. }
  500. f.apply_metadata()
  501. }
  502. return
  503. }
  504. func (self *manager) on_file_transfer_response(ftc *FileTransmissionCommand) (err error) {
  505. switch self.state {
  506. case state_waiting_for_permission:
  507. if ftc.Action == Action_status {
  508. if ftc.Status == `OK` {
  509. self.state = state_waiting_for_file_metadata
  510. } else {
  511. return unicode_input.ErrCanceledByUser
  512. }
  513. } else {
  514. return fmt.Errorf(`Unexpected response from terminal: %s`, ftc.String())
  515. }
  516. case state_waiting_for_file_metadata:
  517. switch ftc.Action {
  518. case Action_status:
  519. if ftc.File_id != "" {
  520. fid, err := strconv.Atoi(ftc.File_id)
  521. if err != nil {
  522. return fmt.Errorf(`Unexpected response from terminal (non-integer file_id): %s`, ftc.String())
  523. }
  524. if fid < 0 || fid >= len(self.spec) {
  525. return fmt.Errorf(`Unexpected response from terminal (out-of-range file_id): %s`, ftc.String())
  526. }
  527. self.failed_specs[fid] = ftc.Status
  528. } else {
  529. if ftc.Status == `OK` {
  530. self.state = state_transferring
  531. self.remote_home = ftc.Name
  532. return
  533. }
  534. return fmt.Errorf("%s", ftc.Status)
  535. }
  536. case Action_file:
  537. fid, err := strconv.Atoi(ftc.File_id)
  538. if err != nil {
  539. return fmt.Errorf(`Unexpected response from terminal (non-integer file_id): %s`, ftc.String())
  540. }
  541. if fid < 0 || fid >= len(self.spec) {
  542. return fmt.Errorf(`Unexpected response from terminal (out-of-range file_id): %s`, ftc.String())
  543. }
  544. self.spec_counts[fid] += 1
  545. self.file_id_counter++
  546. if rf, err := new_remote_file(self.cli_opts, ftc, self.file_id_counter); err == nil {
  547. self.files = append(self.files, rf)
  548. } else {
  549. return err
  550. }
  551. default:
  552. return fmt.Errorf(`Unexpected response from terminal (invalid action): %s`, ftc.String())
  553. }
  554. case state_transferring:
  555. if ftc.Action == Action_data || ftc.Action == Action_end_data {
  556. f, found := self.files_to_be_transferred[ftc.File_id]
  557. if !found {
  558. return fmt.Errorf(`Got data for unknown file id: %s`, ftc.File_id)
  559. }
  560. is_last := ftc.Action == Action_end_data
  561. if amt_written, err := f.write_data(ftc.Data, is_last); err != nil {
  562. return err
  563. } else {
  564. self.progress_tracker.file_written(f, amt_written, is_last)
  565. }
  566. if is_last {
  567. delete(self.files_to_be_transferred, ftc.File_id)
  568. if len(self.files_to_be_transferred) == 0 {
  569. return self.finalize_transfer()
  570. }
  571. }
  572. }
  573. }
  574. return
  575. }
  576. type tree_node struct {
  577. entry *remote_file
  578. added_files map[string]*tree_node
  579. }
  580. func (self *tree_node) add_child(f *remote_file) *tree_node {
  581. if x, found := self.added_files[f.remote_id]; found {
  582. return x
  583. }
  584. c := tree_node{entry: f, added_files: make(map[string]*tree_node)}
  585. f.expanded_local_path = filepath.Join(self.entry.expanded_local_path, filepath.Base(f.remote_path))
  586. self.added_files[f.remote_id] = &c
  587. return &c
  588. }
  589. func walk_tree(root *tree_node, cb func(*tree_node) error) error {
  590. for _, c := range root.added_files {
  591. if err := cb(c); err != nil {
  592. return err
  593. }
  594. if err := walk_tree(c, cb); err != nil {
  595. return err
  596. }
  597. }
  598. return nil
  599. }
  600. func ensure_parent(f *remote_file, node_map map[string]*tree_node, fid_map map[string]*remote_file) *tree_node {
  601. if ans := node_map[f.parent]; ans != nil {
  602. return ans
  603. }
  604. parent := fid_map[f.parent]
  605. gp := ensure_parent(parent, node_map, fid_map)
  606. node := gp.add_child(parent)
  607. node_map[parent.remote_id] = node
  608. return node
  609. }
  610. func make_tree(all_files []*remote_file, local_base string) (root_node *tree_node) {
  611. fid_map := make(map[string]*remote_file, len(all_files))
  612. node_map := make(map[string]*tree_node, len(all_files))
  613. for _, f := range all_files {
  614. if f.remote_id != "" {
  615. fid_map[f.remote_id] = f
  616. }
  617. }
  618. root_node = &tree_node{entry: &remote_file{expanded_local_path: local_base}, added_files: make(map[string]*tree_node)}
  619. node_map[""] = root_node
  620. for _, f := range all_files {
  621. if f.remote_id != "" {
  622. p := ensure_parent(f, node_map, fid_map)
  623. p.add_child(f)
  624. }
  625. }
  626. return
  627. }
  628. func isdir(path string) bool {
  629. if s, err := os.Stat(path); err == nil {
  630. return s.IsDir()
  631. }
  632. return false
  633. }
  634. func files_for_receive(opts *Options, dest string, files []*remote_file, remote_home string, specs []string) (ans []*remote_file, err error) {
  635. spec_map := make(map[int][]*remote_file)
  636. for _, f := range files {
  637. spec_map[f.spec_id] = append(spec_map[f.spec_id], f)
  638. }
  639. spec_paths := make([]string, len(specs))
  640. for i := range specs {
  641. // use the shortest path as the path for the spec
  642. slices.SortStableFunc(spec_map[i], func(a, b *remote_file) int { return len(a.remote_path) - len(b.remote_path) })
  643. spec_paths[i] = spec_map[i][0].remote_path
  644. }
  645. if opts.Mode == "mirror" {
  646. common_path := utils.Commonpath(spec_paths...)
  647. home := strings.TrimRight(remote_home, "/")
  648. if strings.HasPrefix(common_path, home+"/") {
  649. for i, x := range spec_paths {
  650. b, err := filepath.Rel(home, x)
  651. if err != nil {
  652. return nil, err
  653. }
  654. spec_paths[i] = filepath.Join("~", b)
  655. }
  656. }
  657. for spec_id, files_for_spec := range spec_map {
  658. spec := spec_paths[spec_id]
  659. tree := make_tree(files_for_spec, filepath.Dir(expand_home(spec)))
  660. if err = walk_tree(tree, func(x *tree_node) error {
  661. ans = append(ans, x.entry)
  662. return nil
  663. }); err != nil {
  664. return nil, err
  665. }
  666. }
  667. } else {
  668. number_of_source_files := 0
  669. for _, x := range spec_map {
  670. number_of_source_files += len(x)
  671. }
  672. dest_is_dir := strings.HasSuffix(dest, "/") || number_of_source_files > 1 || isdir(dest)
  673. for _, files_for_spec := range spec_map {
  674. if dest_is_dir {
  675. dest_path := filepath.Join(dest, filepath.Base(files_for_spec[0].remote_path))
  676. tree := make_tree(files_for_spec, filepath.Dir(expand_home(dest_path)))
  677. if err = walk_tree(tree, func(x *tree_node) error {
  678. ans = append(ans, x.entry)
  679. return nil
  680. }); err != nil {
  681. return nil, err
  682. }
  683. } else {
  684. f := files_for_spec[0]
  685. f.expanded_local_path = expand_home(dest)
  686. ans = append(ans, f)
  687. }
  688. }
  689. }
  690. return
  691. }
  692. func (self *manager) collect_files() (err error) {
  693. if self.files, err = files_for_receive(self.cli_opts, self.dest, self.files, self.remote_home, self.spec); err != nil {
  694. return err
  695. }
  696. self.progress_tracker.total_size_of_all_files = 0
  697. for _, f := range self.files {
  698. if f.ftype != FileType_directory && f.ftype != FileType_link {
  699. self.files_to_be_transferred[f.file_id] = f
  700. self.progress_tracker.total_size_of_all_files += utils.Max(0, f.expected_size)
  701. }
  702. }
  703. self.progress_tracker.total_bytes_to_transfer = self.progress_tracker.total_size_of_all_files
  704. return nil
  705. }
  706. func (self *handler) print_continue_msg() {
  707. self.lp.Println(`Press`, self.ctx.Green(`y`), `to continue or`, self.ctx.BrightRed(`n`), `to abort`)
  708. }
  709. func lexists(path string) bool {
  710. _, err := os.Lstat(path)
  711. return err == nil
  712. }
  713. func (self *handler) print_check_paths() {
  714. if self.check_paths_printed {
  715. return
  716. }
  717. self.check_paths_printed = true
  718. self.lp.Println(`The following file transfers will be performed. A red destination means an existing file will be overwritten.`)
  719. for _, df := range self.manager.files {
  720. self.lp.QueueWriteString(self.ctx.Prettify(fmt.Sprintf(":%s:`%s` ", df.ftype.Color(), df.ftype.ShortText())))
  721. self.lp.QueueWriteString(" ")
  722. lpath := df.expanded_local_path
  723. if lexists(lpath) {
  724. lpath = self.ctx.Prettify(self.ctx.BrightRed(lpath) + " ")
  725. }
  726. self.lp.Println(df.display_name, "→", lpath)
  727. }
  728. self.lp.Println(fmt.Sprintf(`Transferring %d file(s) of total size: %s`, len(self.manager.files), humanize.Size(self.manager.progress_tracker.total_size_of_all_files)))
  729. self.print_continue_msg()
  730. }
  731. func (self *handler) confirm_paths() {
  732. self.print_check_paths()
  733. }
  734. func (self *handler) transmit_one() {
  735. if self.transmit_iterator == nil {
  736. return
  737. }
  738. wid, err := self.transmit_iterator(self.lp.QueueWriteString)
  739. if err != nil {
  740. if err == files_done {
  741. self.transmit_iterator = nil
  742. } else {
  743. self.abort_with_error(err)
  744. return
  745. }
  746. } else {
  747. self.last_data_write_id = wid
  748. }
  749. }
  750. func (self *handler) start_transfer() {
  751. self.transmit_started = true
  752. n := len(self.manager.files)
  753. msg := `Transmitting signature of`
  754. if self.manager.use_rsync {
  755. msg = `Queueing transfer of`
  756. }
  757. msg += ` `
  758. if n == 1 {
  759. msg += `one file`
  760. } else {
  761. msg += fmt.Sprintf(`%d files`, n)
  762. }
  763. self.lp.Println(msg)
  764. self.max_name_length = 0
  765. for _, f := range self.manager.files {
  766. self.max_name_length = utils.Max(6, self.max_name_length, wcswidth.Stringwidth(f.display_name))
  767. }
  768. self.transmit_iterator = self.manager.request_files()
  769. self.transmit_one()
  770. }
  771. func (self *handler) on_file_transfer_response(ftc *FileTransmissionCommand) (err error) {
  772. if ftc.Id != self.manager.request_id {
  773. return
  774. }
  775. if ftc.Action == Action_status && ftc.Status == "CANCELED" {
  776. self.lp.Quit(1)
  777. return
  778. }
  779. if self.quit_after_write_code > -1 || self.manager.state == state_canceled {
  780. return
  781. }
  782. transfer_started := self.manager.state == state_transferring
  783. if merr := self.manager.on_file_transfer_response(ftc); merr != nil {
  784. if merr == unicode_input.ErrCanceledByUser {
  785. // terminal will not respond to cancel request
  786. return fmt.Errorf("Permission denied by user")
  787. }
  788. self.abort_with_error(merr)
  789. return
  790. }
  791. if !transfer_started && self.manager.state == state_transferring {
  792. if len(self.manager.failed_specs) > 0 {
  793. self.print_err(fmt.Errorf(`Failed to process some sources:`))
  794. for spec_id, msg := range self.manager.failed_specs {
  795. spec := self.manager.spec[spec_id]
  796. if strings.HasPrefix(msg, `ENOENT:`) {
  797. msg = `File not found`
  798. }
  799. self.lp.Println(fmt.Sprintf(` %s: %s`, spec, msg))
  800. }
  801. self.abort_with_error(nil)
  802. return
  803. }
  804. zero_specs := make([]string, 0, len(self.manager.spec_counts))
  805. for k, v := range self.manager.spec_counts {
  806. if v == 0 {
  807. zero_specs = append(zero_specs, self.manager.spec[k])
  808. }
  809. }
  810. if len(zero_specs) > 0 {
  811. self.abort_with_error(fmt.Errorf(`No matches found for: %s`, strings.Join(zero_specs, ", ")))
  812. return
  813. }
  814. if merr := self.manager.collect_files(); merr != nil {
  815. self.abort_with_error(merr)
  816. return
  817. }
  818. if self.cli_opts.ConfirmPaths {
  819. self.confirm_paths()
  820. } else {
  821. self.start_transfer()
  822. }
  823. }
  824. if self.manager.transfer_done {
  825. self.manager.send(FileTransmissionCommand{Action: Action_finish}, self.lp.QueueWriteString)
  826. self.quit_after_write_code = 0
  827. if err = self.refresh_progress(0); err != nil {
  828. return err
  829. }
  830. } else if self.transmit_started {
  831. if err = self.refresh_progress(0); err != nil {
  832. return err
  833. }
  834. }
  835. return
  836. }
  837. func (self *handler) on_writing_finished(msg_id loop.IdType, has_pending_writes bool) (err error) {
  838. if self.quit_after_write_code > -1 {
  839. self.lp.Quit(self.quit_after_write_code)
  840. } else if msg_id == self.last_data_write_id {
  841. self.transmit_one()
  842. }
  843. return nil
  844. }
  845. func (self *handler) on_interrupt() (handled bool, err error) {
  846. handled = true
  847. if self.quit_after_write_code > -1 {
  848. return
  849. }
  850. if self.manager.state == state_canceled {
  851. self.lp.Println(`Waiting for canceled acknowledgement from terminal, will abort in a few seconds if no response received`)
  852. return
  853. }
  854. self.abort_with_error(fmt.Errorf(`Interrupt requested, cancelling transfer, transferred files are in undefined state.`))
  855. return
  856. }
  857. func (self *handler) on_sigterm() (handled bool, err error) {
  858. handled = true
  859. if self.quit_after_write_code > -1 {
  860. return
  861. }
  862. self.abort_with_error(fmt.Errorf(`Terminate requested, cancelling transfer, transferred files are in undefined state.`), 2*time.Second)
  863. return
  864. }
  865. func (self *handler) erase_progress() {
  866. if self.progress_drawn {
  867. self.lp.MoveCursorVertically(-2)
  868. self.lp.QueueWriteString("\r")
  869. self.lp.ClearToEndOfScreen()
  870. self.progress_drawn = false
  871. }
  872. }
  873. func (self *handler) render_progress(name string, p Progress) {
  874. if p.is_complete {
  875. p.bytes_so_far = p.total_bytes
  876. }
  877. ss, _ := self.lp.ScreenSize()
  878. self.lp.QueueWriteString(render_progress_in_width(name, p, int(ss.WidthCells), self.ctx))
  879. }
  880. func (self *handler) draw_progress_for_current_file(af *remote_file, spinner_char string, is_complete bool) {
  881. p := &self.manager.progress_tracker
  882. now := time.Now()
  883. secs := utils.IfElse(af.done_at.IsZero(), now, af.done_at)
  884. self.render_progress(af.display_name, Progress{
  885. spinner_char: spinner_char, is_complete: is_complete,
  886. bytes_so_far: af.written_bytes, total_bytes: af.expected_size,
  887. secs_so_far: secs.Sub(af.transmit_started_at).Seconds(),
  888. bytes_per_sec: safe_divide(p.transfered_stats_amt, p.transfered_stats_interval),
  889. })
  890. }
  891. func (self *handler) draw_files() {
  892. tick := self.ctx.Green(`✔`)
  893. var sc string
  894. for _, df := range self.manager.progress_tracker.done_files {
  895. sc = tick
  896. if df.ftype == FileType_regular {
  897. self.draw_progress_for_current_file(df, sc, true)
  898. } else {
  899. self.lp.QueueWriteString(fmt.Sprintf("%s %s %s", sc, df.display_name, self.ctx.Italic(self.ctx.Dim(df.ftype.String()))))
  900. }
  901. self.lp.Println()
  902. self.manager.progress_tracker.done_files = nil
  903. }
  904. is_complete := self.quit_after_write_code > -1
  905. if is_complete {
  906. sc = utils.IfElse(self.quit_after_write_code == 0, tick, self.ctx.Red(`✘`))
  907. } else {
  908. sc = self.spinner.Tick()
  909. }
  910. p := &self.manager.progress_tracker
  911. ss, _ := self.lp.ScreenSize()
  912. if is_complete {
  913. tui.RepeatChar(`─`, int(ss.WidthCells))
  914. } else {
  915. af := p.active_file
  916. if af != nil {
  917. self.draw_progress_for_current_file(af, sc, false)
  918. }
  919. }
  920. self.lp.Println()
  921. if p.total_transferred > 0 {
  922. self.render_progress(`Total`, Progress{
  923. spinner_char: sc, bytes_so_far: p.total_transferred, total_bytes: p.total_bytes_to_transfer,
  924. secs_so_far: time.Since(p.started_at).Seconds(), is_complete: is_complete,
  925. bytes_per_sec: safe_divide(p.transfered_stats_amt, p.transfered_stats_interval.Abs().Seconds()),
  926. })
  927. self.lp.Println()
  928. } else {
  929. self.lp.Println(`File data transfer has not yet started`)
  930. }
  931. }
  932. func (self *handler) schedule_progress_update(delay time.Duration) {
  933. if self.progress_update_timer != 0 {
  934. self.lp.RemoveTimer(self.progress_update_timer)
  935. self.progress_update_timer = 0
  936. }
  937. timer_id, err := self.lp.AddTimer(delay, false, self.refresh_progress)
  938. if err == nil {
  939. self.progress_update_timer = timer_id
  940. }
  941. }
  942. func (self *handler) draw_progress() {
  943. if self.manager.state == state_canceled {
  944. return
  945. }
  946. self.lp.AllowLineWrapping(false)
  947. defer self.lp.AllowLineWrapping(true)
  948. self.draw_files()
  949. self.schedule_progress_update(self.spinner.Interval())
  950. self.progress_drawn = true
  951. }
  952. func (self *handler) refresh_progress(loop.IdType) error {
  953. self.lp.StartAtomicUpdate()
  954. defer self.lp.EndAtomicUpdate()
  955. self.erase_progress()
  956. self.draw_progress()
  957. return nil
  958. }
  959. func (self *handler) on_text(text string, from_key_event, in_bracketed_paste bool) error {
  960. if self.quit_after_write_code > -1 {
  961. return nil
  962. }
  963. if self.check_paths_printed && !self.transmit_started {
  964. switch strings.ToLower(text) {
  965. case "y":
  966. self.start_transfer()
  967. return nil
  968. case "n":
  969. self.abort_with_error(fmt.Errorf(`Canceled by user`))
  970. return nil
  971. }
  972. self.print_continue_msg()
  973. }
  974. return nil
  975. }
  976. func (self *handler) on_key_event(ev *loop.KeyEvent) error {
  977. if self.quit_after_write_code > -1 {
  978. return nil
  979. }
  980. if ev.MatchesPressOrRepeat("esc") {
  981. ev.Handled = true
  982. if self.check_paths_printed && !self.transmit_started {
  983. self.abort_with_error(fmt.Errorf(`Canceled by user`))
  984. } else {
  985. if _, err := self.on_interrupt(); err != nil {
  986. return err
  987. }
  988. }
  989. } else if ev.MatchesPressOrRepeat("ctrl+c") {
  990. ev.Handled = true
  991. if _, err := self.on_interrupt(); err != nil {
  992. return err
  993. }
  994. }
  995. return nil
  996. }
  997. func receive_loop(opts *Options, spec []string, dest string) (err error, rc int) {
  998. lp, err := loop.New(loop.NoAlternateScreen, loop.NoRestoreColors)
  999. if err != nil {
  1000. return err, 1
  1001. }
  1002. handler := handler{
  1003. lp: lp, quit_after_write_code: -1, cli_opts: opts, spinner: tui.NewSpinner("dots"),
  1004. ctx: markup.New(true),
  1005. manager: manager{
  1006. request_id: random_id(), spec: spec, dest: dest, bypass: opts.PermissionsBypass, use_rsync: opts.TransmitDeltas,
  1007. failed_specs: make(map[int]string, len(spec)), spec_counts: make(map[int]int, len(spec)),
  1008. suffix: "\x1b\\", cli_opts: opts, files_to_be_transferred: make(map[string]*remote_file),
  1009. },
  1010. }
  1011. for i := range spec {
  1012. handler.manager.spec_counts[i] = 0
  1013. }
  1014. handler.manager.prefix = fmt.Sprintf("\x1b]%d;id=%s;", kitty.FileTransferCode, handler.manager.request_id)
  1015. if handler.manager.bypass != `` {
  1016. if handler.manager.bypass, err = encode_bypass(handler.manager.request_id, handler.manager.bypass); err != nil {
  1017. return err, 1
  1018. }
  1019. }
  1020. lp.OnInitialize = func() (string, error) {
  1021. lp.SetCursorVisible(false)
  1022. lp.Println("Scanning files…")
  1023. handler.manager.start_transfer(lp.QueueWriteString)
  1024. return "", nil
  1025. }
  1026. lp.OnFinalize = func() string {
  1027. lp.SetCursorVisible(true)
  1028. return ""
  1029. }
  1030. lp.OnSIGINT = handler.on_interrupt
  1031. lp.OnSIGTERM = handler.on_sigterm
  1032. lp.OnWriteComplete = handler.on_writing_finished
  1033. lp.OnText = handler.on_text
  1034. lp.OnKeyEvent = handler.on_key_event
  1035. lp.OnResize = func(old_sz, new_sz loop.ScreenSize) error {
  1036. if handler.progress_drawn {
  1037. return handler.refresh_progress(0)
  1038. }
  1039. return nil
  1040. }
  1041. ftc_code := strconv.Itoa(kitty.FileTransferCode)
  1042. lp.OnEscapeCode = func(et loop.EscapeCodeType, payload []byte) error {
  1043. if et == loop.OSC {
  1044. if idx := bytes.IndexByte(payload, ';'); idx > 0 {
  1045. if utils.UnsafeBytesToString(payload[:idx]) == ftc_code {
  1046. ftc, err := NewFileTransmissionCommand(utils.UnsafeBytesToString(payload[idx+1:]))
  1047. if err != nil {
  1048. return fmt.Errorf("Received invalid FileTransmissionCommand from terminal with error: %w", err)
  1049. }
  1050. return handler.on_file_transfer_response(ftc)
  1051. }
  1052. }
  1053. }
  1054. return nil
  1055. }
  1056. err = lp.Run()
  1057. defer func() {
  1058. for _, f := range handler.manager.files {
  1059. f.close()
  1060. }
  1061. }()
  1062. if err != nil {
  1063. return err, 1
  1064. }
  1065. if lp.DeathSignalName() != "" {
  1066. lp.KillIfSignalled()
  1067. return
  1068. }
  1069. if lp.ExitCode() != 0 {
  1070. rc = lp.ExitCode()
  1071. }
  1072. var tsf, dsz, ssz int64
  1073. for _, f := range handler.manager.files {
  1074. if rc == 0 { // no error has yet occurred report errors closing files
  1075. if cerr := f.close(); cerr != nil {
  1076. return cerr, 1
  1077. }
  1078. }
  1079. if f.expect_diff {
  1080. tsf += f.expected_size
  1081. dsz += f.received_bytes
  1082. ssz += f.sent_bytes
  1083. }
  1084. }
  1085. if tsf > 0 && dsz+ssz > 0 && rc == 0 {
  1086. print_rsync_stats(tsf, dsz, ssz)
  1087. }
  1088. return
  1089. }
  1090. func receive_main(opts *Options, args []string) (err error, rc int) {
  1091. spec := args
  1092. var dest string
  1093. switch opts.Mode {
  1094. case "mirror":
  1095. if len(args) < 1 {
  1096. return fmt.Errorf("Must specify at least one file to transfer"), 1
  1097. }
  1098. case "normal":
  1099. if len(args) < 2 {
  1100. return fmt.Errorf("Must specify at least one source and a destination file to transfer"), 1
  1101. }
  1102. dest = args[len(args)-1]
  1103. spec = args[:len(args)-1]
  1104. }
  1105. return receive_loop(opts, spec, dest)
  1106. }