send.go 38 KB


  1. // License: GPLv3 Copyright: 2023, Kovid Goyal, <kovid at kovidgoyal.net>
  2. package transfer
  3. import (
  4. "bytes"
  5. "compress/zlib"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "io/fs"
  10. "os"
  11. "path/filepath"
  12. "slices"
  13. "strconv"
  14. "strings"
  15. "syscall"
  16. "time"
  17. "unicode/utf8"
  18. "golang.org/x/exp/constraints"
  19. "kitty"
  20. "kitty/tools/cli/markup"
  21. "kitty/tools/rsync"
  22. "kitty/tools/tui"
  23. "kitty/tools/tui/loop"
  24. "kitty/tools/utils"
  25. "kitty/tools/utils/humanize"
  26. "kitty/tools/wcswidth"
  27. )
  28. var _ = fmt.Print
  29. type FileState int
  30. const (
  31. WAITING_FOR_START FileState = iota
  32. WAITING_FOR_DATA
  33. TRANSMITTING
  34. FINISHED
  35. ACKNOWLEDGED
  36. )
  37. type FileHash struct{ dev, inode uint64 }
  38. type Compressor interface {
  39. Compress(data []byte) []byte
  40. Flush() []byte
  41. }
  42. type IdentityCompressor struct{}
  43. func (self *IdentityCompressor) Compress(data []byte) []byte { return data }
  44. func (self *IdentityCompressor) Flush() []byte { return nil }
  45. type ZlibCompressor struct {
  46. b bytes.Buffer
  47. w zlib.Writer
  48. }
  49. func NewZlibCompressor() *ZlibCompressor {
  50. ans := ZlibCompressor{}
  51. ans.b.Grow(4096)
  52. ans.w = *zlib.NewWriter(&ans.b)
  53. return &ans
  54. }
  55. func (self *ZlibCompressor) Compress(data []byte) []byte {
  56. _, err := self.w.Write(data)
  57. if err != nil {
  58. panic(err)
  59. }
  60. defer self.b.Reset()
  61. return utils.UnsafeStringToBytes(self.b.String())
  62. }
  63. func (self *ZlibCompressor) Flush() []byte {
  64. self.w.Close()
  65. return self.b.Bytes()
  66. }
  67. type File struct {
  68. file_hash FileHash
  69. ttype TransmissionType
  70. compression Compression
  71. compressor Compressor
  72. file_type FileType
  73. file_id, hard_link_target string
  74. local_path, symbolic_link_target, expanded_local_path string
  75. stat_result fs.FileInfo
  76. state FileState
  77. display_name string
  78. mtime time.Time
  79. file_size, bytes_to_transmit int64
  80. permissions fs.FileMode
  81. remote_path string
  82. rsync_capable, compression_capable bool
  83. remote_final_path string
  84. remote_initial_size int64
  85. err_msg string
  86. actual_file *os.File
  87. transmitted_bytes, reported_progress int64
  88. transmit_started_at, transmit_ended_at, done_at time.Time
  89. differ *rsync.Differ
  90. delta_loader func() error
  91. deltabuf *bytes.Buffer
  92. }
  93. func get_remote_path(local_path string, remote_base string) string {
  94. if remote_base == "" {
  95. return filepath.ToSlash(local_path)
  96. }
  97. if strings.HasSuffix(remote_base, "/") {
  98. return filepath.Join(remote_base, filepath.Base(local_path))
  99. }
  100. return remote_base
  101. }
  102. func NewFile(opts *Options, local_path, expanded_local_path string, file_id int, stat_result fs.FileInfo, remote_base string, file_type FileType) *File {
  103. stat, ok := stat_result.Sys().(*syscall.Stat_t)
  104. if !ok {
  105. panic("This platform does not support getting file identities from stat results")
  106. }
  107. ans := File{
  108. local_path: local_path, expanded_local_path: expanded_local_path, file_id: fmt.Sprintf("%x", file_id),
  109. stat_result: stat_result, file_type: file_type, display_name: wcswidth.StripEscapeCodes(local_path),
  110. file_hash: FileHash{uint64(stat.Dev), stat.Ino}, mtime: stat_result.ModTime(),
  111. file_size: stat_result.Size(), bytes_to_transmit: stat_result.Size(),
  112. permissions: stat_result.Mode().Perm(), remote_path: filepath.ToSlash(get_remote_path(local_path, remote_base)),
  113. rsync_capable: file_type == FileType_regular && stat_result.Size() > 4096,
  114. compression_capable: file_type == FileType_regular && stat_result.Size() > 4096 && should_be_compressed(expanded_local_path, opts.Compress),
  115. remote_initial_size: -1,
  116. }
  117. return &ans
  118. }
  119. func process(opts *Options, paths []string, remote_base string, counter *int) (ans []*File, err error) {
  120. for _, x := range paths {
  121. expanded := expand_home(x)
  122. s, err := os.Lstat(expanded)
  123. if err != nil {
  124. return ans, fmt.Errorf("Failed to stat %s with error: %w", x, err)
  125. }
  126. if s.IsDir() {
  127. *counter += 1
  128. ans = append(ans, NewFile(opts, x, expanded, *counter, s, remote_base, FileType_directory))
  129. new_remote_base := remote_base
  130. if new_remote_base != "" {
  131. new_remote_base = strings.TrimRight(new_remote_base, "/") + "/" + filepath.Base(x) + "/"
  132. } else {
  133. new_remote_base = strings.TrimRight(filepath.ToSlash(x), "/") + "/"
  134. }
  135. contents, err := os.ReadDir(expanded)
  136. if err != nil {
  137. return ans, fmt.Errorf("Failed to read the directory %s with error: %w", x, err)
  138. }
  139. new_paths := make([]string, len(contents))
  140. for i, y := range contents {
  141. new_paths[i] = filepath.Join(x, y.Name())
  142. }
  143. new_ans, err := process(opts, new_paths, new_remote_base, counter)
  144. if err != nil {
  145. return ans, err
  146. }
  147. ans = append(ans, new_ans...)
  148. } else if s.Mode()&fs.ModeSymlink == fs.ModeSymlink {
  149. *counter += 1
  150. ans = append(ans, NewFile(opts, x, expanded, *counter, s, remote_base, FileType_symlink))
  151. } else if s.Mode().IsRegular() {
  152. *counter += 1
  153. ans = append(ans, NewFile(opts, x, expanded, *counter, s, remote_base, FileType_regular))
  154. }
  155. }
  156. return
  157. }
  158. func process_mirrored_files(opts *Options, args []string) (ans []*File, err error) {
  159. paths := utils.Map(func(x string) string { return abspath(expand_home(x)) }, args)
  160. home := strings.TrimRight(home_path(), string(filepath.Separator)) + string(filepath.Separator)
  161. paths = utils.Map(func(path string) string {
  162. if strings.HasPrefix(path, home) {
  163. r, _ := filepath.Rel(home, path)
  164. return filepath.Join("~", r)
  165. }
  166. return path
  167. }, paths)
  168. counter := 0
  169. return process(opts, paths, "", &counter)
  170. }
  171. func process_normal_files(opts *Options, args []string) (ans []*File, err error) {
  172. if len(args) < 2 {
  173. return ans, fmt.Errorf("Must specify at least one local path and one remote path")
  174. }
  175. args = slices.Clone(args)
  176. remote_base := filepath.ToSlash(args[len(args)-1])
  177. args = args[:len(args)-1]
  178. if len(args) > 1 && !strings.HasSuffix(remote_base, "/") {
  179. remote_base += "/"
  180. }
  181. paths := utils.Map(func(x string) string { return abspath(expand_home(x)) }, args)
  182. counter := 0
  183. return process(opts, paths, remote_base, &counter)
  184. }
  185. func files_for_send(opts *Options, args []string) (files []*File, err error) {
  186. if opts.Mode == "mirror" {
  187. files, err = process_mirrored_files(opts, args)
  188. } else {
  189. files, err = process_normal_files(opts, args)
  190. }
  191. if err != nil {
  192. return files, err
  193. }
  194. groups := make(map[FileHash][]*File, len(files))
  195. // detect hard links
  196. for _, f := range files {
  197. groups[f.file_hash] = append(groups[f.file_hash], f)
  198. }
  199. for _, group := range groups {
  200. if len(group) > 1 {
  201. for _, lf := range group[1:] {
  202. lf.file_type = FileType_link
  203. lf.hard_link_target = "fid:" + group[0].file_id
  204. }
  205. }
  206. }
  207. remove := make([]int, 0, len(files))
  208. // detect symlinks to other transferred files
  209. for i, f := range files {
  210. if f.file_type == FileType_symlink {
  211. link_dest, err := os.Readlink(f.expanded_local_path)
  212. if err != nil {
  213. remove = append(remove, i)
  214. continue
  215. }
  216. f.symbolic_link_target = "path:" + link_dest
  217. is_abs := filepath.IsAbs(link_dest)
  218. q := link_dest
  219. if !is_abs {
  220. q = filepath.Join(filepath.Dir(f.expanded_local_path), link_dest)
  221. }
  222. st, err := os.Stat(q)
  223. if err == nil {
  224. stat, ok := st.Sys().(*syscall.Stat_t)
  225. if ok {
  226. fh := FileHash{uint64(stat.Dev), stat.Ino}
  227. gr, found := groups[fh]
  228. if found {
  229. g := utils.Filter(gr, func(x *File) bool {
  230. return os.SameFile(x.stat_result, st)
  231. })
  232. if len(g) > 0 {
  233. f.symbolic_link_target = "fid"
  234. if is_abs {
  235. f.symbolic_link_target = "fid_abs"
  236. }
  237. f.symbolic_link_target += ":" + g[0].file_id
  238. }
  239. }
  240. }
  241. }
  242. }
  243. }
  244. if len(remove) > 0 {
  245. for _, idx := range utils.Reverse(remove) {
  246. files[idx] = nil
  247. files = slices.Delete(files, idx, idx+1)
  248. }
  249. }
  250. return files, nil
  251. }
  252. type SendState int
  253. const (
  254. SEND_WAITING_FOR_PERMISSION SendState = iota
  255. SEND_PERMISSION_GRANTED
  256. SEND_PERMISSION_DENIED
  257. SEND_CANCELED
  258. )
  259. type Transfer struct {
  260. amt int64
  261. at time.Time
  262. }
  263. func (self *Transfer) is_too_old(now time.Time) bool {
  264. return now.Sub(self.at) > 30*time.Second
  265. }
  266. type ProgressTracker struct {
  267. total_size_of_all_files, total_bytes_to_transfer int64
  268. active_file *File
  269. total_transferred int64
  270. transfers []*Transfer
  271. transfered_stats_amt int64
  272. transfered_stats_interval time.Duration
  273. started_at time.Time
  274. signature_bytes int
  275. total_reported_progress int64
  276. }
  277. func (self *ProgressTracker) change_active_file(nf *File) {
  278. now := time.Now()
  279. self.active_file = nf
  280. nf.transmit_started_at = now
  281. }
  282. func (self *ProgressTracker) start_transfer() {
  283. t := Transfer{at: time.Now()}
  284. self.transfers = append(self.transfers, &t)
  285. self.started_at = t.at
  286. }
  287. func (self *ProgressTracker) on_transmit(amt int64, active_file *File) {
  288. active_file.transmitted_bytes += amt
  289. self.total_transferred += amt
  290. now := time.Now()
  291. self.transfers = append(self.transfers, &Transfer{amt: amt, at: now})
  292. for len(self.transfers) > 2 && self.transfers[0].is_too_old(now) {
  293. self.transfers = self.transfers[1:]
  294. }
  295. self.transfered_stats_interval = now.Sub(self.transfers[0].at)
  296. self.transfered_stats_amt = 0
  297. for _, t := range self.transfers {
  298. self.transfered_stats_amt += t.amt
  299. }
  300. }
  301. func (self *ProgressTracker) on_file_progress(af *File, delta int64) {
  302. if delta > 0 {
  303. self.total_reported_progress += delta
  304. }
  305. }
  306. func (self *ProgressTracker) on_file_done(af *File) {
  307. af.done_at = time.Now()
  308. }
  309. type SendManager struct {
  310. request_id string
  311. state SendState
  312. files []*File
  313. bypass string
  314. use_rsync bool
  315. file_progress func(*File, int)
  316. file_done func(*File) error
  317. fid_map map[string]*File
  318. all_acknowledged, all_started, has_transmitting, has_rsync bool
  319. active_idx int
  320. prefix, suffix string
  321. last_progress_file *File
  322. progress_tracker ProgressTracker
  323. current_chunk_uncompressed_sz int64
  324. current_chunk_write_id loop.IdType
  325. current_chunk_for_file_id string
  326. }
  327. func (self *SendManager) start_transfer() string {
  328. return FileTransmissionCommand{Action: Action_send, Bypass: self.bypass}.Serialize()
  329. }
  330. func (self *SendManager) initialize() {
  331. if self.bypass != "" {
  332. q, err := encode_bypass(self.request_id, self.bypass)
  333. if err == nil {
  334. self.bypass = q
  335. } else {
  336. fmt.Fprintln(os.Stderr, "Ignoring password because of error:", err)
  337. }
  338. }
  339. self.fid_map = make(map[string]*File, len(self.files))
  340. for _, f := range self.files {
  341. self.fid_map[f.file_id] = f
  342. }
  343. self.active_idx = -1
  344. self.current_chunk_uncompressed_sz = -1
  345. self.current_chunk_for_file_id = ""
  346. self.prefix = fmt.Sprintf("\x1b]%d;id=%s;", kitty.FileTransferCode, self.request_id)
  347. self.suffix = "\x1b\\"
  348. for _, f := range self.files {
  349. if f.file_size > 0 {
  350. self.progress_tracker.total_size_of_all_files += f.file_size
  351. }
  352. }
  353. self.progress_tracker.total_bytes_to_transfer = self.progress_tracker.total_size_of_all_files
  354. }
  355. type SendHandler struct {
  356. manager *SendManager
  357. opts *Options
  358. files []*File
  359. lp *loop.Loop
  360. ctx *markup.Context
  361. transmit_started, file_metadata_sent bool
  362. quit_after_write_code int
  363. finish_cmd_write_id loop.IdType
  364. check_paths_printed bool
  365. transfer_finish_sent bool
  366. max_name_length int
  367. progress_drawn bool
  368. failed_files, done_files []*File
  369. done_file_ids *utils.Set[string]
  370. transmit_ok_checked bool
  371. progress_update_timer loop.IdType
  372. spinner *tui.Spinner
  373. }
  374. func safe_divide[A constraints.Integer | constraints.Float, B constraints.Integer | constraints.Float](a A, b B) float64 {
  375. if b == 0 {
  376. return 0
  377. }
  378. return float64(a) / float64(b)
  379. }
  380. type Progress struct {
  381. spinner_char string
  382. bytes_so_far int64
  383. total_bytes int64
  384. secs_so_far float64
  385. bytes_per_sec float64
  386. is_complete bool
  387. max_path_length int
  388. }
  389. func reduce_to_single_grapheme(text string) string {
  390. limit := utf8.RuneCountInString(text)
  391. if limit < 2 {
  392. return text
  393. }
  394. for x := 1; x < limit; x++ {
  395. tt, w := wcswidth.TruncateToVisualLengthWithWidth(text, x)
  396. if w <= x {
  397. return tt
  398. }
  399. }
  400. return text
  401. }
  402. func render_path_in_width(path string, width int) string {
  403. path = filepath.ToSlash(path)
  404. if wcswidth.Stringwidth(path) <= width {
  405. return path
  406. }
  407. parts := strings.Split(path, string(filepath.Separator))
  408. reduced := strings.Join(utils.Map(reduce_to_single_grapheme, parts[:len(parts)-1]), string(filepath.Separator))
  409. path = filepath.Join(reduced, parts[len(parts)-1])
  410. if wcswidth.Stringwidth(path) <= width {
  411. return path
  412. }
  413. return wcswidth.TruncateToVisualLength(path, width-1) + `…`
  414. }
  415. func ljust(text string, width int) string {
  416. if w := wcswidth.Stringwidth(text); w < width {
  417. text += strings.Repeat(` `, (width - w))
  418. }
  419. return text
  420. }
  421. func rjust(text string, width int) string {
  422. if w := wcswidth.Stringwidth(text); w < width {
  423. text = strings.Repeat(` `, (width-w)) + text
  424. }
  425. return text
  426. }
  427. func render_progress_in_width(path string, p Progress, width int, ctx *markup.Context) string {
  428. unit_style := ctx.Dim(`|`)
  429. sep, trail, _ := strings.Cut(unit_style, "|")
  430. var ratio, rate, eta string
  431. if p.is_complete || p.bytes_so_far >= p.total_bytes {
  432. ratio = humanize.Size(uint64(p.total_bytes), humanize.SizeOptions{Separator: sep})
  433. rate = humanize.Size(uint64(safe_divide(float64(p.total_bytes), p.secs_so_far)), humanize.SizeOptions{Separator: sep}) + `/s`
  434. eta = ctx.Green(humanize.ShortDuration(time.Duration(float64(time.Second) * p.secs_so_far)))
  435. } else {
  436. tb := humanize.Size(p.total_bytes)
  437. sval, _, _ := strings.Cut(tb, " ")
  438. val, _ := strconv.ParseFloat(sval, 64)
  439. ratio = humanize.FormatNumber(val*safe_divide(p.bytes_so_far, p.total_bytes)) + `/` + strings.ReplaceAll(tb, ` `, sep)
  440. rate = humanize.Size(p.bytes_per_sec, humanize.SizeOptions{Separator: sep}) + `/s`
  441. bytes_left := p.total_bytes - p.bytes_so_far
  442. eta_seconds := safe_divide(bytes_left, p.bytes_per_sec)
  443. eta = humanize.ShortDuration(time.Duration(float64(time.Second) * eta_seconds))
  444. }
  445. lft := p.spinner_char + ` `
  446. max_space_for_path := width/2 - wcswidth.Stringwidth(lft)
  447. max_path_length := 80
  448. w := utils.Min(max_path_length, max_space_for_path)
  449. prefix := lft + render_path_in_width(path, w)
  450. w += wcswidth.Stringwidth(lft)
  451. prefix = ljust(prefix, w)
  452. q := ratio + trail + ctx.Yellow(" @ ") + rate + trail
  453. q = rjust(q, 25) + ` `
  454. eta = ` ` + eta
  455. if extra := width - w - wcswidth.Stringwidth(q) - wcswidth.Stringwidth(eta); extra > 4 {
  456. q += tui.RenderProgressBar(safe_divide(p.bytes_so_far, p.total_bytes), extra) + eta
  457. } else {
  458. q += strings.TrimSpace(eta)
  459. }
  460. return prefix + q
  461. }
  462. func (self *SendHandler) render_progress(name string, p Progress) {
  463. if p.spinner_char == "" {
  464. p.spinner_char = " "
  465. }
  466. if p.is_complete {
  467. p.bytes_so_far = p.total_bytes
  468. }
  469. p.max_path_length = self.max_name_length
  470. sz, _ := self.lp.ScreenSize()
  471. self.lp.QueueWriteString(render_progress_in_width(name, p, int(sz.WidthCells), self.ctx))
  472. }
  473. func (self *SendHandler) draw_progress() {
  474. self.lp.AllowLineWrapping(false)
  475. defer self.lp.AllowLineWrapping(true)
  476. var sc string
  477. for _, df := range self.done_files {
  478. sc = self.ctx.Green(`✔`)
  479. if df.err_msg != "" {
  480. sc = self.ctx.Err(`✘`)
  481. }
  482. if df.file_type == FileType_regular {
  483. self.draw_progress_for_current_file(df, sc, true)
  484. } else {
  485. self.lp.QueueWriteString(sc + ` ` + df.display_name + ` ` + self.ctx.Dim(self.ctx.Italic(df.file_type.String())))
  486. }
  487. self.lp.Println()
  488. self.done_file_ids.Add(df.file_id)
  489. }
  490. self.done_files = nil
  491. is_complete := self.quit_after_write_code > -1
  492. if is_complete {
  493. sc = self.ctx.Green(`✔`)
  494. if self.quit_after_write_code != 0 {
  495. sc = self.ctx.Err(`✘`)
  496. }
  497. } else {
  498. sc = self.spinner.Tick()
  499. }
  500. now := time.Now()
  501. if is_complete {
  502. sz, _ := self.lp.ScreenSize()
  503. self.lp.QueueWriteString(tui.RepeatChar(`─`, int(sz.WidthCells)))
  504. } else {
  505. af := self.manager.last_progress_file
  506. if af == nil || self.done_file_ids.Has(af.file_id) {
  507. if !self.manager.has_transmitting && self.done_file_ids.Len() == 0 {
  508. if self.manager.has_rsync {
  509. self.lp.QueueWriteString(sc + ` Transferring rsync signatures...`)
  510. } else {
  511. self.lp.QueueWriteString(sc + ` Transferring metadata...`)
  512. }
  513. }
  514. } else {
  515. self.draw_progress_for_current_file(af, sc, false)
  516. }
  517. }
  518. self.lp.Println()
  519. if p := self.manager.progress_tracker; p.total_reported_progress > 0 {
  520. self.render_progress(`Total`, Progress{
  521. spinner_char: sc, bytes_so_far: p.total_reported_progress, total_bytes: p.total_bytes_to_transfer,
  522. secs_so_far: now.Sub(p.started_at).Seconds(), is_complete: is_complete,
  523. bytes_per_sec: safe_divide(p.transfered_stats_amt, p.transfered_stats_interval.Abs().Seconds()),
  524. })
  525. } else {
  526. self.lp.QueueWriteString(`File data transfer has not yet started`)
  527. }
  528. self.lp.Println()
  529. self.schedule_progress_update(self.spinner.Interval())
  530. self.progress_drawn = true
  531. }
  532. func (self *SendHandler) draw_progress_for_current_file(af *File, spinner_char string, is_complete bool) {
  533. p := self.manager.progress_tracker
  534. var secs_so_far time.Duration
  535. empty := File{}
  536. if af.done_at == empty.done_at {
  537. secs_so_far = time.Since(af.transmit_started_at)
  538. } else {
  539. secs_so_far = af.done_at.Sub(af.transmit_started_at)
  540. }
  541. self.render_progress(af.display_name, Progress{
  542. spinner_char: spinner_char, is_complete: is_complete,
  543. bytes_so_far: af.reported_progress, total_bytes: af.bytes_to_transmit,
  544. secs_so_far: secs_so_far.Seconds(), bytes_per_sec: safe_divide(p.transfered_stats_amt, p.transfered_stats_interval.Abs().Seconds()),
  545. })
  546. }
  547. func (self *SendHandler) erase_progress() {
  548. if self.progress_drawn {
  549. self.progress_drawn = false
  550. self.lp.MoveCursorVertically(-2)
  551. self.lp.QueueWriteString("\r")
  552. self.lp.ClearToEndOfScreen()
  553. }
  554. }
  555. func (self *SendHandler) refresh_progress(timer_id loop.IdType) (err error) {
  556. if !self.transmit_started || self.manager.state == SEND_CANCELED {
  557. return nil
  558. }
  559. if timer_id == self.progress_update_timer {
  560. self.progress_update_timer = 0
  561. }
  562. if self.manager.active_file() == nil && !self.manager.all_acknowledged && self.done_file_ids.Len() != 0 && self.done_file_ids.Len() < len(self.manager.files) {
  563. if err = self.transmit_next_chunk(); err != nil {
  564. return err
  565. }
  566. }
  567. self.lp.StartAtomicUpdate()
  568. defer self.lp.EndAtomicUpdate()
  569. self.erase_progress()
  570. self.draw_progress()
  571. return nil
  572. }
  573. func (self *SendHandler) schedule_progress_update(delay time.Duration) {
  574. if self.progress_update_timer == 0 {
  575. timer_id, err := self.lp.AddTimer(delay, false, self.refresh_progress)
  576. if err == nil {
  577. self.progress_update_timer = timer_id
  578. }
  579. }
  580. }
  581. func (self *SendHandler) on_file_progress(f *File, change int) {
  582. self.schedule_progress_update(100 * time.Millisecond)
  583. }
  584. func (self *SendHandler) on_file_done(f *File) error {
  585. self.done_files = append(self.done_files, f)
  586. if f.err_msg != "" {
  587. self.failed_files = append(self.failed_files, f)
  588. }
  589. return self.refresh_progress(0)
  590. }
  591. func (self *SendHandler) send_payload(payload string) loop.IdType {
  592. self.lp.QueueWriteString(self.manager.prefix)
  593. self.lp.QueueWriteString(payload)
  594. return self.lp.QueueWriteString(self.manager.suffix)
  595. }
  596. func (self *File) metadata_command(use_rsync bool) *FileTransmissionCommand {
  597. if use_rsync && self.rsync_capable {
  598. self.ttype = TransmissionType_rsync
  599. }
  600. if self.compression_capable {
  601. self.compression = Compression_zlib
  602. self.compressor = NewZlibCompressor()
  603. } else {
  604. self.compressor = &IdentityCompressor{}
  605. }
  606. return &FileTransmissionCommand{
  607. Action: Action_file, Compression: self.compression, Ftype: self.file_type,
  608. Name: self.remote_path, Permissions: self.permissions, Mtime: time.Duration(self.mtime.UnixNano()),
  609. File_id: self.file_id, Ttype: self.ttype,
  610. }
  611. }
  612. func (self *SendManager) send_file_metadata(send func(string) loop.IdType) {
  613. for _, f := range self.files {
  614. ftc := f.metadata_command(self.use_rsync)
  615. send(ftc.Serialize())
  616. }
  617. }
  618. func (self *SendHandler) send_file_metadata() {
  619. if !self.file_metadata_sent {
  620. self.file_metadata_sent = true
  621. self.manager.send_file_metadata(self.send_payload)
  622. }
  623. }
  624. func (self *SendManager) update_collective_statuses() {
  625. var found_not_started, found_not_done, has_rsync, has_transmitting bool
  626. for _, f := range self.files {
  627. if f.state != ACKNOWLEDGED {
  628. found_not_done = true
  629. }
  630. if f.state == WAITING_FOR_START {
  631. found_not_started = true
  632. } else if f.state == TRANSMITTING {
  633. has_transmitting = true
  634. }
  635. if f.ttype == TransmissionType_rsync {
  636. has_rsync = true
  637. }
  638. }
  639. self.all_acknowledged = !found_not_done
  640. self.all_started = !found_not_started
  641. self.has_rsync = has_rsync
  642. self.has_transmitting = has_transmitting
  643. }
  644. func (self *SendManager) on_file_status_update(ftc *FileTransmissionCommand) error {
  645. file := self.fid_map[ftc.File_id]
  646. if file == nil {
  647. return nil
  648. }
  649. switch ftc.Status {
  650. case `STARTED`:
  651. file.remote_final_path = ftc.Name
  652. file.remote_initial_size = int64(ftc.Size)
  653. if file.file_type == FileType_directory {
  654. file.state = FINISHED
  655. } else {
  656. if ftc.Ttype == TransmissionType_rsync {
  657. file.state = WAITING_FOR_DATA
  658. } else {
  659. file.state = TRANSMITTING
  660. }
  661. if file.state == WAITING_FOR_DATA {
  662. file.differ = rsync.NewDiffer()
  663. }
  664. self.update_collective_statuses()
  665. }
  666. case `PROGRESS`:
  667. self.last_progress_file = file
  668. change := int64(ftc.Size) - file.reported_progress
  669. file.reported_progress = int64(ftc.Size)
  670. self.progress_tracker.on_file_progress(file, change)
  671. self.file_progress(file, int(change))
  672. default:
  673. if ftc.Name != "" && file.remote_final_path == "" {
  674. file.remote_final_path = ftc.Name
  675. }
  676. file.state = ACKNOWLEDGED
  677. if ftc.Status == `OK` {
  678. if ftc.Size > 0 {
  679. change := int64(ftc.Size) - file.reported_progress
  680. file.reported_progress = int64(ftc.Size)
  681. self.progress_tracker.on_file_progress(file, change)
  682. self.file_progress(file, int(change))
  683. }
  684. } else {
  685. file.err_msg = ftc.Status
  686. }
  687. self.progress_tracker.on_file_done(file)
  688. if err := self.file_done(file); err != nil {
  689. return err
  690. }
  691. if self.active_idx > -1 && file == self.files[self.active_idx] {
  692. self.active_idx = -1
  693. }
  694. self.update_collective_statuses()
  695. }
  696. return nil
  697. }
  698. func (self *File) start_delta_calculation() (err error) {
  699. self.state = TRANSMITTING
  700. if self.actual_file == nil {
  701. self.actual_file, err = os.Open(self.expanded_local_path)
  702. if err != nil {
  703. return
  704. }
  705. }
  706. self.deltabuf = bytes.NewBuffer(make([]byte, 0, 32+rsync.DataSizeMultiple*self.differ.BlockSize()))
  707. self.delta_loader = self.differ.CreateDelta(self.actual_file, self.deltabuf)
  708. return nil
  709. }
  710. func (self *SendManager) on_signature_data_received(ftc *FileTransmissionCommand) error {
  711. file := self.fid_map[ftc.File_id]
  712. if file == nil || file.state != WAITING_FOR_DATA {
  713. return nil
  714. }
  715. if file.differ == nil {
  716. file.differ = rsync.NewDiffer()
  717. }
  718. if err := file.differ.AddSignatureData(ftc.Data); err != nil {
  719. return err
  720. }
  721. self.progress_tracker.signature_bytes += len(ftc.Data)
  722. if ftc.Action == Action_end_data {
  723. if err := file.differ.FinishSignatureData(); err != nil {
  724. return err
  725. }
  726. return file.start_delta_calculation()
  727. }
  728. return nil
  729. }
  730. func (self *SendManager) on_file_transfer_response(ftc *FileTransmissionCommand) error {
  731. switch ftc.Action {
  732. case Action_status:
  733. if ftc.File_id != "" {
  734. return self.on_file_status_update(ftc)
  735. }
  736. if ftc.Status == "OK" {
  737. self.state = SEND_PERMISSION_GRANTED
  738. } else {
  739. self.state = SEND_PERMISSION_DENIED
  740. }
  741. case Action_data, Action_end_data:
  742. if ftc.File_id != "" {
  743. return self.on_signature_data_received(ftc)
  744. }
  745. }
  746. return nil
  747. }
  748. func (self *SendHandler) on_file_transfer_response(ftc *FileTransmissionCommand) error {
  749. if ftc.Id != self.manager.request_id {
  750. return nil
  751. }
  752. if ftc.Action == Action_status && ftc.Status == "CANCELED" {
  753. self.lp.Quit(1)
  754. return nil
  755. }
  756. if self.quit_after_write_code > -1 || self.manager.state == SEND_CANCELED {
  757. return nil
  758. }
  759. before := self.manager.state
  760. err := self.manager.on_file_transfer_response(ftc)
  761. if err != nil {
  762. return err
  763. }
  764. if before == SEND_WAITING_FOR_PERMISSION {
  765. switch self.manager.state {
  766. case SEND_PERMISSION_DENIED:
  767. self.lp.Println(self.ctx.Err("Permission denied for this transfer"))
  768. self.lp.Quit(1)
  769. return nil
  770. case SEND_PERMISSION_GRANTED:
  771. self.lp.Println(self.ctx.Green("Permission granted for this transfer"))
  772. self.send_file_metadata()
  773. }
  774. }
  775. if !self.transmit_started {
  776. return self.check_for_transmit_ok()
  777. }
  778. if self.manager.all_acknowledged {
  779. self.transfer_finished()
  780. } else if ftc.Action == Action_end_data && ftc.File_id != "" {
  781. return self.transmit_next_chunk()
  782. }
  783. return nil
  784. }
  785. func (self *SendHandler) check_for_transmit_ok() (err error) {
  786. if self.transmit_ok_checked {
  787. return self.start_transfer()
  788. }
  789. if self.manager.state != SEND_PERMISSION_GRANTED {
  790. return
  791. }
  792. if self.opts.ConfirmPaths {
  793. if self.manager.all_started {
  794. self.print_check_paths()
  795. }
  796. return
  797. }
  798. self.transmit_ok_checked = true
  799. return self.start_transfer()
  800. }
  801. func (self *SendHandler) print_check_paths() {
  802. if self.check_paths_printed {
  803. return
  804. }
  805. self.check_paths_printed = true
  806. self.lp.Println(`The following file transfers will be performed. A red destination means an existing file will be overwritten.`)
  807. for _, df := range self.manager.files {
  808. fn := df.remote_final_path
  809. if df.remote_initial_size > -1 {
  810. fn = self.ctx.Red(fn)
  811. }
  812. self.lp.Println(
  813. self.ctx.Prettify(fmt.Sprintf(":%s:`%s` ", df.file_type.Color(), df.file_type.ShortText())),
  814. df.display_name, ` → `, fn)
  815. }
  816. hsize := humanize.Size(self.manager.progress_tracker.total_bytes_to_transfer)
  817. if n := len(self.manager.files); n == 1 {
  818. self.lp.Println(fmt.Sprintf(`Transferring %d file of total size: %s`, n, hsize))
  819. } else {
  820. self.lp.Println(fmt.Sprintf(`Transferring %d files of total size: %s`, n, hsize))
  821. }
  822. self.print_continue_msg()
  823. }
  824. func (self *SendManager) activate_next_ready_file() *File {
  825. if self.active_idx > -1 && self.active_idx < len(self.files) {
  826. self.files[self.active_idx].transmit_ended_at = time.Now()
  827. }
  828. for i, f := range self.files {
  829. if f.state == TRANSMITTING {
  830. self.active_idx = i
  831. self.update_collective_statuses()
  832. self.progress_tracker.change_active_file(f)
  833. return f
  834. }
  835. }
  836. self.active_idx = -1
  837. self.update_collective_statuses()
  838. return nil
  839. }
  840. func (self *SendManager) active_file() *File {
  841. if self.active_idx > -1 && self.active_idx < len(self.files) {
  842. return self.files[self.active_idx]
  843. }
  844. return nil
  845. }
  846. func (self *File) next_chunk() (ans string, asz int, err error) {
  847. const sz = 1024 * 1024
  848. switch self.file_type {
  849. case FileType_symlink:
  850. self.state = FINISHED
  851. ans, asz = self.symbolic_link_target, len(self.symbolic_link_target)
  852. return
  853. case FileType_link:
  854. self.state = FINISHED
  855. ans, asz = self.hard_link_target, len(self.hard_link_target)
  856. return
  857. }
  858. is_last := false
  859. var chunk []byte
  860. if self.delta_loader != nil {
  861. for !is_last && self.deltabuf.Len() < sz {
  862. if err = self.delta_loader(); err != nil {
  863. if err == io.EOF {
  864. is_last = true
  865. } else {
  866. return
  867. }
  868. }
  869. }
  870. chunk = slices.Clone(self.deltabuf.Bytes())
  871. self.deltabuf.Reset()
  872. } else {
  873. if self.actual_file == nil {
  874. self.actual_file, err = os.Open(self.expanded_local_path)
  875. if err != nil {
  876. return
  877. }
  878. }
  879. chunk = make([]byte, sz)
  880. var n int
  881. n, err = self.actual_file.Read(chunk)
  882. if err != nil && !errors.Is(err, io.EOF) {
  883. return
  884. }
  885. if n <= 0 {
  886. is_last = true
  887. } else if pos, _ := self.actual_file.Seek(0, io.SeekCurrent); pos >= self.file_size {
  888. is_last = true
  889. }
  890. chunk = chunk[:n]
  891. }
  892. uncompressed_sz := len(chunk)
  893. cchunk := self.compressor.Compress(chunk)
  894. if is_last {
  895. trail := self.compressor.Flush()
  896. if len(trail) >= 0 {
  897. cchunk = append(cchunk, trail...)
  898. }
  899. self.state = FINISHED
  900. if self.actual_file != nil {
  901. err = self.actual_file.Close()
  902. self.actual_file = nil
  903. if err != nil {
  904. return
  905. }
  906. }
  907. self.delta_loader = nil
  908. self.deltabuf = nil
  909. }
  910. ans, asz = utils.UnsafeBytesToString(cchunk), uncompressed_sz
  911. return
  912. }
  913. func (self *SendManager) next_chunks(callback func(string) loop.IdType) error {
  914. if self.active_file() == nil {
  915. self.activate_next_ready_file()
  916. }
  917. af := self.active_file()
  918. if af == nil {
  919. return nil
  920. }
  921. chunk := ""
  922. self.current_chunk_uncompressed_sz = 0
  923. for af.state != FINISHED && len(chunk) == 0 {
  924. c, usz, err := af.next_chunk()
  925. if err != nil {
  926. return err
  927. }
  928. self.current_chunk_uncompressed_sz += int64(usz)
  929. self.current_chunk_for_file_id = af.file_id
  930. chunk = c
  931. }
  932. is_last := af.state == FINISHED
  933. if len(chunk) > 0 {
  934. split_for_transfer(utils.UnsafeStringToBytes(chunk), af.file_id, is_last, func(ftc *FileTransmissionCommand) {
  935. self.current_chunk_write_id = callback(ftc.Serialize())
  936. })
  937. } else if is_last {
  938. self.current_chunk_write_id = callback(FileTransmissionCommand{Action: Action_end_data, File_id: af.file_id}.Serialize())
  939. }
  940. if is_last {
  941. self.activate_next_ready_file()
  942. if self.active_file() == nil {
  943. return nil
  944. }
  945. }
  946. return nil
  947. }
  948. func (self *SendHandler) transmit_next_chunk() (err error) {
  949. found_chunk := false
  950. for !found_chunk {
  951. if err = self.manager.next_chunks(func(chunk string) loop.IdType {
  952. found_chunk = true
  953. return self.send_payload(chunk)
  954. }); err != nil {
  955. return err
  956. }
  957. if !found_chunk {
  958. if self.manager.all_acknowledged {
  959. self.transfer_finished()
  960. return
  961. }
  962. self.manager.update_collective_statuses()
  963. if !self.manager.has_transmitting {
  964. return
  965. }
  966. }
  967. }
  968. return
  969. }
  970. func (self *SendHandler) start_transfer() (err error) {
  971. if self.manager.active_file() == nil {
  972. self.manager.activate_next_ready_file()
  973. }
  974. if self.manager.active_file() != nil {
  975. self.transmit_started = true
  976. self.manager.progress_tracker.start_transfer()
  977. if err = self.transmit_next_chunk(); err != nil {
  978. return
  979. }
  980. self.draw_progress()
  981. }
  982. return
  983. }
  984. func (self *SendHandler) initialize() error {
  985. self.manager.initialize()
  986. self.spinner = tui.NewSpinner("dots")
  987. self.ctx = markup.New(true)
  988. self.send_payload(self.manager.start_transfer())
  989. if self.opts.PermissionsBypass != "" {
  990. // dont wait for permission, not needed with a bypass and avoids a roundtrip
  991. self.send_file_metadata()
  992. }
  993. return nil
  994. }
  995. func (self *SendHandler) transfer_finished() {
  996. if self.transfer_finish_sent {
  997. return
  998. }
  999. self.transfer_finish_sent = true
  1000. self.finish_cmd_write_id = self.send_payload(FileTransmissionCommand{Action: Action_finish}.Serialize())
  1001. }
  1002. func (self *SendHandler) on_text(text string, from_key_event, in_bracketed_paste bool) error {
  1003. if self.quit_after_write_code > -1 {
  1004. return nil
  1005. }
  1006. if self.check_paths_printed && !self.transmit_started {
  1007. switch strings.ToLower(text) {
  1008. case "y":
  1009. err := self.start_transfer()
  1010. if err != nil {
  1011. return err
  1012. }
  1013. if self.manager.all_acknowledged {
  1014. if err = self.refresh_progress(0); err != nil {
  1015. return err
  1016. }
  1017. self.transfer_finished()
  1018. }
  1019. return nil
  1020. case "n":
  1021. self.failed_files = nil
  1022. self.abort_transfer()
  1023. self.lp.Println(`Sending cancel request to terminal`)
  1024. return nil
  1025. }
  1026. self.print_continue_msg()
  1027. }
  1028. return nil
  1029. }
  1030. func (self *SendHandler) print_continue_msg() {
  1031. self.lp.Println(
  1032. `Press`, self.ctx.Green(`y`), `to continue or`, self.ctx.BrightRed(`n`), `to abort`)
  1033. }
  1034. func (self *SendHandler) abort_transfer(delay ...time.Duration) {
  1035. d := 5 * time.Second
  1036. if len(delay) > 0 {
  1037. d = delay[0]
  1038. }
  1039. self.send_payload(FileTransmissionCommand{Action: Action_cancel}.Serialize())
  1040. self.manager.state = SEND_CANCELED
  1041. _, _ = self.lp.AddTimer(d, false, func(loop.IdType) error {
  1042. self.lp.Quit(1)
  1043. return nil
  1044. })
  1045. }
  1046. func (self *SendHandler) on_resize(old_size, new_size loop.ScreenSize) error {
  1047. if self.progress_drawn {
  1048. return self.refresh_progress(0)
  1049. }
  1050. return nil
  1051. }
  1052. func (self *SendHandler) on_key_event(ev *loop.KeyEvent) error {
  1053. if self.quit_after_write_code > -1 {
  1054. return nil
  1055. }
  1056. if ev.MatchesPressOrRepeat("esc") {
  1057. ev.Handled = true
  1058. if self.check_paths_printed && !self.transmit_started {
  1059. self.failed_files = nil
  1060. self.abort_transfer()
  1061. self.lp.Println(`Sending cancel request to terminal`)
  1062. return nil
  1063. } else {
  1064. self.on_interrupt()
  1065. }
  1066. } else if ev.MatchesPressOrRepeat("ctrl+c") {
  1067. self.on_interrupt()
  1068. ev.Handled = true
  1069. }
  1070. return nil
  1071. }
  1072. func (self *SendHandler) on_writing_finished(msg_id loop.IdType, has_pending_writes bool) (err error) {
  1073. chunk_transmitted := self.manager.current_chunk_uncompressed_sz >= 0 && msg_id == self.manager.current_chunk_write_id
  1074. if chunk_transmitted {
  1075. self.manager.progress_tracker.on_transmit(self.manager.current_chunk_uncompressed_sz, self.manager.fid_map[self.manager.current_chunk_for_file_id])
  1076. self.manager.current_chunk_uncompressed_sz = -1
  1077. self.manager.current_chunk_write_id = 0
  1078. self.manager.current_chunk_for_file_id = ""
  1079. }
  1080. if self.finish_cmd_write_id > 0 && msg_id == self.finish_cmd_write_id {
  1081. if len(self.failed_files) > 0 {
  1082. self.quit_after_write_code = 1
  1083. } else {
  1084. self.quit_after_write_code = 0
  1085. }
  1086. if err = self.refresh_progress(0); err != nil {
  1087. return err
  1088. }
  1089. }
  1090. if self.quit_after_write_code > -1 && !has_pending_writes {
  1091. self.lp.Quit(self.quit_after_write_code)
  1092. return
  1093. }
  1094. if self.manager.state == SEND_PERMISSION_GRANTED && !self.transmit_started {
  1095. return self.check_for_transmit_ok()
  1096. }
  1097. if chunk_transmitted {
  1098. if err = self.refresh_progress(0); err != nil {
  1099. return err
  1100. }
  1101. return self.transmit_next_chunk()
  1102. }
  1103. return
  1104. }
  1105. func (self *SendHandler) on_interrupt() {
  1106. if self.quit_after_write_code > -1 {
  1107. return
  1108. }
  1109. if self.manager.state == SEND_CANCELED {
  1110. self.lp.Println(`Waiting for canceled acknowledgement from terminal, will abort in a few seconds if no response received`)
  1111. return
  1112. }
  1113. self.lp.Println(self.ctx.BrightRed(`Interrupt requested, cancelling transfer, transferred files are in undefined state`))
  1114. self.abort_transfer()
  1115. }
  1116. func send_loop(opts *Options, files []*File) (err error, rc int) {
  1117. lp, err := loop.New(loop.NoAlternateScreen, loop.NoRestoreColors)
  1118. if err != nil {
  1119. return err, 1
  1120. }
  1121. handler := &SendHandler{
  1122. opts: opts, files: files, lp: lp, quit_after_write_code: -1,
  1123. max_name_length: utils.Max(0, utils.Map(func(f *File) int { return wcswidth.Stringwidth(f.display_name) }, files)...),
  1124. progress_drawn: true, done_file_ids: utils.NewSet[string](),
  1125. manager: &SendManager{
  1126. request_id: random_id(), files: files, bypass: opts.PermissionsBypass, use_rsync: opts.TransmitDeltas,
  1127. },
  1128. }
  1129. handler.manager.file_progress = handler.on_file_progress
  1130. handler.manager.file_done = handler.on_file_done
  1131. lp.OnInitialize = func() (string, error) {
  1132. lp.SetCursorVisible(false)
  1133. return "", handler.initialize()
  1134. }
  1135. lp.OnFinalize = func() string {
  1136. lp.SetCursorVisible(true)
  1137. return ""
  1138. }
  1139. ftc_code := strconv.Itoa(kitty.FileTransferCode)
  1140. lp.OnEscapeCode = func(et loop.EscapeCodeType, payload []byte) error {
  1141. if et == loop.OSC {
  1142. if idx := bytes.IndexByte(payload, ';'); idx > 0 {
  1143. if utils.UnsafeBytesToString(payload[:idx]) == ftc_code {
  1144. ftc, err := NewFileTransmissionCommand(utils.UnsafeBytesToString(payload[idx+1:]))
  1145. if err != nil {
  1146. return fmt.Errorf("Received invalid FileTransmissionCommand from terminal with error: %w", err)
  1147. }
  1148. return handler.on_file_transfer_response(ftc)
  1149. }
  1150. }
  1151. }
  1152. return nil
  1153. }
  1154. lp.OnText = handler.on_text
  1155. lp.OnKeyEvent = handler.on_key_event
  1156. lp.OnResize = handler.on_resize
  1157. lp.OnWriteComplete = handler.on_writing_finished
  1158. err = lp.Run()
  1159. if err != nil {
  1160. return err, 1
  1161. }
  1162. if lp.DeathSignalName() != "" {
  1163. lp.KillIfSignalled()
  1164. return
  1165. }
  1166. p := handler.manager.progress_tracker
  1167. if handler.manager.has_rsync && p.total_transferred+int64(p.signature_bytes) > 0 && lp.ExitCode() == 0 {
  1168. var tsf int64
  1169. for _, f := range files {
  1170. if f.ttype == TransmissionType_rsync {
  1171. tsf += f.file_size
  1172. }
  1173. }
  1174. if tsf > 0 {
  1175. print_rsync_stats(tsf, p.total_transferred, int64(p.signature_bytes))
  1176. }
  1177. }
  1178. if len(handler.failed_files) > 0 {
  1179. fmt.Fprintf(os.Stderr, "Transfer of %d out of %d files failed\n", len(handler.failed_files), len(handler.manager.files))
  1180. for _, f := range handler.failed_files {
  1181. fmt.Println(handler.ctx.BrightRed(f.display_name))
  1182. fmt.Println(` `, f.err_msg)
  1183. }
  1184. rc = 1
  1185. }
  1186. if lp.ExitCode() != 0 {
  1187. rc = lp.ExitCode()
  1188. }
  1189. return
  1190. }
  1191. func send_main(opts *Options, args []string) (err error, rc int) {
  1192. fmt.Println("Scanning files…")
  1193. files, err := files_for_send(opts, args)
  1194. if err != nil {
  1195. return err, 1
  1196. }
  1197. fmt.Printf("Found %d files and directories, requesting transfer permission…", len(files))
  1198. fmt.Println()
  1199. err, rc = send_loop(opts, files)
  1200. return
  1201. }