12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190 |
- // License: GPLv3 Copyright: 2023, Kovid Goyal, <kovid at kovidgoyal.net>
- package transfer
- import (
- "bytes"
- "compress/zlib"
- "errors"
- "fmt"
- "io"
- "io/fs"
- "os"
- "path/filepath"
- "slices"
- "strconv"
- "strings"
- "time"
- "kitty"
- "kitty/kittens/unicode_input"
- "kitty/tools/cli/markup"
- "kitty/tools/rsync"
- "kitty/tools/tui"
- "kitty/tools/tui/loop"
- "kitty/tools/utils"
- "kitty/tools/utils/humanize"
- "kitty/tools/wcswidth"
- "golang.org/x/sys/unix"
- )
// Reference fmt unconditionally so the import remains in use.
var _ = fmt.Print

// state identifies the phase of the receive protocol the manager is in.
type state int

const (
	// The reply to the initial receive request has not been processed yet.
	state_waiting_for_permission state = iota
	// Permission was granted and file metadata is being collected.
	state_waiting_for_file_metadata
	// Metadata collection finished; file data is being received.
	state_transferring
	// The transfer was aborted from this side.
	state_canceled
)
// output_file is the destination that received file data is written to.
type output_file interface {
	// write stores data and reports how many bytes were accepted.
	write([]byte) (int, error)
	// close finishes the output and releases its resources.
	close() error
	// tell reports the current size/offset of the output.
	tell() (int64, error)
}
// filesystem_file is an output_file that stores data directly in an
// ordinary file on disk.
type filesystem_file struct {
	f *os.File
}

// tell reports the current offset within the underlying file.
func (ff *filesystem_file) tell() (int64, error) {
	offset, err := ff.f.Seek(0, io.SeekCurrent)
	return offset, err
}

// close closes the underlying file.
func (ff *filesystem_file) close() error {
	return ff.f.Close()
}

// write writes data to the underlying file. A write that stores fewer bytes
// than requested without reporting an error yields io.ErrShortWrite.
func (ff *filesystem_file) write(data []byte) (int, error) {
	written, err := ff.f.Write(data)
	if err != nil {
		return written, err
	}
	if written < len(data) {
		return written, io.ErrShortWrite
	}
	return written, nil
}
- type patch_file struct {
- path string
- src, temp *os.File
- p *rsync.Patcher
- }
- func (pf *patch_file) tell() (int64, error) {
- if pf.temp == nil {
- s, err := os.Stat(pf.path)
- return s.Size(), err
- }
- return pf.temp.Seek(0, io.SeekCurrent)
- }
- func (pf *patch_file) close() (err error) {
- if pf.p == nil {
- return
- }
- err = pf.p.FinishDelta()
- pf.src.Close()
- pf.temp.Close()
- if err == nil {
- err = os.Rename(pf.temp.Name(), pf.src.Name())
- }
- pf.src = nil
- pf.temp = nil
- pf.p = nil
- return
- }
- func (pf *patch_file) write(data []byte) (int, error) {
- if err := pf.p.UpdateDelta(data); err == nil {
- return len(data), nil
- } else {
- return 0, err
- }
- }
- func new_patch_file(path string, p *rsync.Patcher) (ans *patch_file, err error) {
- ans = &patch_file{p: p, path: path}
- var f *os.File
- if f, err = os.Open(path); err != nil {
- return
- } else {
- ans.src = f
- }
- if f, err = os.CreateTemp(filepath.Dir(path), ""); err != nil {
- ans.src.Close()
- return
- } else {
- ans.temp = f
- }
- ans.p.StartDelta(ans.temp, ans.src)
- return
- }
- type remote_file struct {
- expected_size int64
- expect_diff bool
- patcher *rsync.Patcher
- transmit_started_at, done_at time.Time
- written_bytes int64
- received_bytes int64
- sent_bytes int64
- ftype FileType
- mtime time.Duration
- spec_id int
- permissions fs.FileMode
- remote_path string
- display_name string
- remote_id, remote_target string
- parent string
- expanded_local_path string
- file_id string
- decompressor utils.StreamDecompressor
- compression_type Compression
- remote_symlink_value string
- actual_file output_file
- }
- func (self *remote_file) close() (err error) {
- if self.decompressor != nil {
- err = self.decompressor(nil, true)
- self.decompressor = nil
- }
- if self.actual_file != nil {
- af := self.actual_file
- self.actual_file = nil
- cerr := af.close()
- if err == nil {
- err = cerr
- }
- }
- return
- }
- func (self *remote_file) Write(data []byte) (n int, err error) {
- switch self.ftype {
- default:
- return 0, fmt.Errorf("Cannot write data to files of type: %s", self.ftype)
- case FileType_symlink:
- self.remote_symlink_value += string(data)
- return len(data), nil
- case FileType_regular:
- if self.actual_file == nil {
- parent := filepath.Dir(self.expanded_local_path)
- if parent != "" {
- if err = os.MkdirAll(parent, 0o755); err != nil {
- return 0, err
- }
- }
- if self.expect_diff {
- if pf, err := new_patch_file(self.expanded_local_path, self.patcher); err != nil {
- return 0, err
- } else {
- self.actual_file = pf
- }
- } else {
- if ff, err := os.Create(self.expanded_local_path); err != nil {
- return 0, err
- } else {
- f := filesystem_file{f: ff}
- self.actual_file = &f
- }
- }
- }
- return self.actual_file.write(data)
- }
- }
- func (self *remote_file) write_data(data []byte, is_last bool) (amt_written int64, err error) {
- self.received_bytes += int64(len(data))
- var base, pos int64
- defer func() {
- if err != nil {
- err = fmt.Errorf("Failed writing to %s with error: %w", self.expanded_local_path, err)
- }
- }()
- if self.actual_file != nil {
- base, err = self.actual_file.tell()
- if err != nil {
- return 0, err
- }
- }
- err = self.decompressor(data, is_last)
- if is_last {
- self.decompressor = nil
- }
- if self.actual_file != nil && err == nil {
- pos, err = self.actual_file.tell()
- if err != nil {
- return 0, err
- }
- } else {
- pos = base
- }
- amt_written = pos - base
- if is_last && self.actual_file != nil {
- cerr := self.actual_file.close()
- if err == nil {
- err = cerr
- }
- self.actual_file = nil
- }
- return
- }
- func syscall_mode(i os.FileMode) (o uint32) {
- o |= uint32(i.Perm())
- if i&os.ModeSetuid != 0 {
- o |= unix.S_ISUID
- }
- if i&os.ModeSetgid != 0 {
- o |= unix.S_ISGID
- }
- if i&os.ModeSticky != 0 {
- o |= unix.S_ISVTX
- }
- // No mapping for Go's ModeTemporary (plan9 only).
- return
- }
- func (self *remote_file) apply_metadata() {
- t := unix.NsecToTimespec(int64(self.mtime))
- for {
- if err := unix.UtimesNanoAt(unix.AT_FDCWD, self.expanded_local_path, []unix.Timespec{t, t}, unix.AT_SYMLINK_NOFOLLOW); err == nil || !(errors.Is(err, unix.EINTR) || errors.Is(err, unix.EAGAIN)) {
- break
- }
- }
- if self.ftype == FileType_symlink {
- for {
- if err := unix.Fchmodat(unix.AT_FDCWD, self.expanded_local_path, syscall_mode(self.permissions), unix.AT_SYMLINK_NOFOLLOW); err == nil || !(errors.Is(err, unix.EINTR) || errors.Is(err, unix.EAGAIN)) {
- break
- }
- }
- } else {
- _ = os.Chmod(self.expanded_local_path, self.permissions)
- }
- }
- func new_remote_file(opts *Options, ftc *FileTransmissionCommand, file_id uint64) (*remote_file, error) {
- spec_id, err := strconv.Atoi(ftc.File_id)
- if err != nil {
- return nil, err
- }
- ans := &remote_file{
- expected_size: ftc.Size, ftype: ftc.Ftype, mtime: ftc.Mtime, spec_id: spec_id, file_id: strconv.FormatUint(file_id, 10),
- permissions: ftc.Permissions, remote_path: ftc.Name, display_name: wcswidth.StripEscapeCodes(ftc.Name),
- remote_id: ftc.Status, remote_target: string(ftc.Data), parent: ftc.Parent,
- }
- compression_capable := ftc.Ftype == FileType_regular && ftc.Size > 4096 && should_be_compressed(ftc.Name, opts.Compress)
- if compression_capable {
- ans.decompressor = utils.NewStreamDecompressor(zlib.NewReader, ans)
- ans.compression_type = Compression_zlib
- } else {
- ans.decompressor = utils.NewStreamDecompressor(nil, ans)
- ans.compression_type = Compression_none
- }
- return ans, nil
- }
- type receive_progress_tracker struct {
- total_size_of_all_files int64
- total_bytes_to_transfer int64
- total_transferred int64
- transfered_stats_amt int64
- transfered_stats_interval time.Duration
- started_at time.Time
- transfers []Transfer
- active_file *remote_file
- done_files []*remote_file
- }
- func (self *receive_progress_tracker) change_active_file(nf *remote_file) {
- now := time.Now()
- self.active_file = nf
- nf.transmit_started_at = now
- }
- func (self *receive_progress_tracker) start_transfer() {
- self.started_at = time.Now()
- self.transfers = append(self.transfers, Transfer{at: time.Now()})
- }
- func (self *receive_progress_tracker) file_written(af *remote_file, amt int64, is_done bool) {
- if self.active_file != af {
- self.change_active_file(af)
- }
- af.written_bytes += amt
- self.total_transferred += amt
- now := time.Now()
- self.transfers = append(self.transfers, Transfer{amt: amt, at: now})
- for len(self.transfers) > 2 && self.transfers[0].is_too_old(now) {
- utils.ShiftLeft(self.transfers, 1)
- }
- self.transfered_stats_interval = now.Sub(self.transfers[0].at)
- self.transfered_stats_amt = 0
- for _, t := range self.transfers {
- self.transfered_stats_amt += t.amt
- }
- if is_done {
- af.done_at = now
- self.done_files = append(self.done_files, af)
- }
- }
- type manager struct {
- request_id string
- file_id_counter uint64
- cli_opts *Options
- spec []string
- dest string
- bypass string
- use_rsync bool
- failed_specs map[int]string
- spec_counts map[int]int
- remote_home string
- prefix, suffix string
- transfer_done bool
- files []*remote_file
- files_to_be_transferred map[string]*remote_file
- state state
- progress_tracker receive_progress_tracker
- }
- type transmit_iterator = func(queue_write func(string) loop.IdType) (loop.IdType, error)
- type sigwriter struct {
- wid loop.IdType
- file_id, prefix, suffix string
- q func(string) loop.IdType
- amt int64
- b bytes.Buffer
- }
- func (self *sigwriter) Write(b []byte) (int, error) {
- self.b.Write(b)
- if self.b.Len() > 4000 {
- self.flush()
- }
- return len(b), nil
- }
- func (self *sigwriter) flush() {
- frame := len(self.prefix) + len(self.suffix)
- split_for_transfer(self.b.Bytes(), self.file_id, false, func(ftc *FileTransmissionCommand) {
- self.q(self.prefix)
- data := ftc.Serialize(false)
- self.q(data)
- self.wid = self.q(self.suffix)
- self.amt += int64(frame + len(data))
- })
- self.b.Reset()
- }
// files_done is returned by the transmit iterator once there are no more
// files to request.
var files_done = errors.New("files done")
- func (self *manager) request_files() transmit_iterator {
- pos := 0
- return func(queue_write func(string) loop.IdType) (last_write_id loop.IdType, err error) {
- var f *remote_file
- for pos < len(self.files) {
- f = self.files[pos]
- pos++
- if f.ftype == FileType_directory || (f.ftype == FileType_link && f.remote_target != "") {
- f = nil
- } else {
- break
- }
- }
- if f == nil {
- return 0, files_done
- }
- read_signature := self.use_rsync && f.ftype == FileType_regular
- if read_signature {
- if s, err := os.Lstat(f.expanded_local_path); err == nil {
- read_signature = s.Size() > 4096
- } else {
- read_signature = false
- }
- }
- last_write_id = self.send(FileTransmissionCommand{
- Action: Action_file, Name: f.remote_path, File_id: f.file_id, Ttype: utils.IfElse(
- read_signature, TransmissionType_rsync, TransmissionType_simple), Compression: f.compression_type,
- }, queue_write)
- if read_signature {
- fsf, err := os.Open(f.expanded_local_path)
- if err != nil {
- return 0, err
- }
- defer fsf.Close()
- f.expect_diff = true
- f.patcher = rsync.NewPatcher(f.expected_size)
- output := sigwriter{q: queue_write, file_id: f.file_id, prefix: self.prefix, suffix: self.suffix}
- s_it := f.patcher.CreateSignatureIterator(fsf, &output)
- for {
- err = s_it()
- if err == io.EOF {
- break
- } else if err != nil {
- return 0, err
- }
- }
- output.flush()
- f.sent_bytes += output.amt
- last_write_id = self.send(FileTransmissionCommand{Action: Action_end_data, File_id: f.file_id}, queue_write)
- }
- return
- }
- }
- type handler struct {
- lp *loop.Loop
- progress_update_timer loop.IdType
- spinner *tui.Spinner
- cli_opts *Options
- ctx *markup.Context
- manager manager
- quit_after_write_code int
- check_paths_printed bool
- transmit_started bool
- progress_drawn bool
- max_name_length int
- transmit_iterator transmit_iterator
- last_data_write_id loop.IdType
- }
- func (self *manager) send(c FileTransmissionCommand, send func(string) loop.IdType) loop.IdType {
- send(self.prefix)
- send(c.Serialize(false))
- return send(self.suffix)
- }
- func (self *manager) start_transfer(send func(string) loop.IdType) {
- self.send(FileTransmissionCommand{Action: Action_receive, Bypass: self.bypass, Size: int64(len(self.spec))}, send)
- for i, x := range self.spec {
- self.send(FileTransmissionCommand{Action: Action_file, File_id: strconv.Itoa(i), Name: x}, send)
- }
- self.progress_tracker.start_transfer()
- }
- func (self *handler) print_err(err error) {
- self.lp.Println(self.ctx.BrightRed(err.Error()))
- }
- func (self *handler) abort_with_error(err error, delay ...time.Duration) {
- if err != nil {
- self.print_err(err)
- }
- var d time.Duration = 5 * time.Second
- if len(delay) > 0 {
- d = delay[0]
- }
- self.lp.Println(`Waiting to ensure terminal cancels transfer, will quit in no more than`, d)
- self.manager.send(FileTransmissionCommand{Action: Action_cancel}, self.lp.QueueWriteString)
- self.manager.state = state_canceled
- _, _ = self.lp.AddTimer(d, false, self.do_error_quit)
- }
- func (self *handler) do_error_quit(loop.IdType) error {
- self.lp.Quit(1)
- return nil
- }
- func (self *manager) finalize_transfer() (err error) {
- self.transfer_done = true
- rid_map := make(map[string]*remote_file)
- for _, f := range self.files {
- rid_map[f.remote_id] = f
- }
- for _, f := range self.files {
- switch f.ftype {
- case FileType_directory:
- if err = os.MkdirAll(f.expanded_local_path, 0o755); err != nil {
- return fmt.Errorf("Failed to create directory with error: %w", err)
- }
- case FileType_link:
- tgt, found := rid_map[f.remote_target]
- if !found {
- return fmt.Errorf(`Hard link with remote id: {%s} not found`, f.remote_target)
- }
- if err = os.MkdirAll(filepath.Dir(f.expanded_local_path), 0o755); err == nil {
- os.Remove(f.expanded_local_path)
- err = os.Link(tgt.expanded_local_path, f.expanded_local_path)
- }
- if err != nil {
- return fmt.Errorf(`Failed to create link with error: %w`, err)
- }
- case FileType_symlink:
- lt := f.remote_symlink_value
- if f.remote_target != "" {
- tgt, found := rid_map[f.remote_target]
- if !found {
- return fmt.Errorf(`Symbolic link with remote id: {%s} not found`, f.remote_target)
- }
- lt = tgt.expanded_local_path
- if !strings.HasPrefix(f.remote_symlink_value, "/") {
- if lt, err = filepath.Rel(filepath.Dir(f.expanded_local_path), lt); err != nil {
- return fmt.Errorf(`Could not make symlink relative with error: %w`, err)
- }
- }
- }
- if lt == "" {
- return fmt.Errorf("Symlink %s sent without target", f.expanded_local_path)
- }
- os.Remove(f.expanded_local_path)
- if err = os.MkdirAll(filepath.Dir(f.expanded_local_path), 0o755); err != nil {
- return fmt.Errorf("Failed to create directory with error: %w", err)
- }
- if err = os.Symlink(lt, f.expanded_local_path); err != nil {
- return fmt.Errorf(`Failed to create symlink with error: %w`, err)
- }
- }
- f.apply_metadata()
- }
- return
- }
- func (self *manager) on_file_transfer_response(ftc *FileTransmissionCommand) (err error) {
- switch self.state {
- case state_waiting_for_permission:
- if ftc.Action == Action_status {
- if ftc.Status == `OK` {
- self.state = state_waiting_for_file_metadata
- } else {
- return unicode_input.ErrCanceledByUser
- }
- } else {
- return fmt.Errorf(`Unexpected response from terminal: %s`, ftc.String())
- }
- case state_waiting_for_file_metadata:
- switch ftc.Action {
- case Action_status:
- if ftc.File_id != "" {
- fid, err := strconv.Atoi(ftc.File_id)
- if err != nil {
- return fmt.Errorf(`Unexpected response from terminal (non-integer file_id): %s`, ftc.String())
- }
- if fid < 0 || fid >= len(self.spec) {
- return fmt.Errorf(`Unexpected response from terminal (out-of-range file_id): %s`, ftc.String())
- }
- self.failed_specs[fid] = ftc.Status
- } else {
- if ftc.Status == `OK` {
- self.state = state_transferring
- self.remote_home = ftc.Name
- return
- }
- return fmt.Errorf("%s", ftc.Status)
- }
- case Action_file:
- fid, err := strconv.Atoi(ftc.File_id)
- if err != nil {
- return fmt.Errorf(`Unexpected response from terminal (non-integer file_id): %s`, ftc.String())
- }
- if fid < 0 || fid >= len(self.spec) {
- return fmt.Errorf(`Unexpected response from terminal (out-of-range file_id): %s`, ftc.String())
- }
- self.spec_counts[fid] += 1
- self.file_id_counter++
- if rf, err := new_remote_file(self.cli_opts, ftc, self.file_id_counter); err == nil {
- self.files = append(self.files, rf)
- } else {
- return err
- }
- default:
- return fmt.Errorf(`Unexpected response from terminal (invalid action): %s`, ftc.String())
- }
- case state_transferring:
- if ftc.Action == Action_data || ftc.Action == Action_end_data {
- f, found := self.files_to_be_transferred[ftc.File_id]
- if !found {
- return fmt.Errorf(`Got data for unknown file id: %s`, ftc.File_id)
- }
- is_last := ftc.Action == Action_end_data
- if amt_written, err := f.write_data(ftc.Data, is_last); err != nil {
- return err
- } else {
- self.progress_tracker.file_written(f, amt_written, is_last)
- }
- if is_last {
- delete(self.files_to_be_transferred, ftc.File_id)
- if len(self.files_to_be_transferred) == 0 {
- return self.finalize_transfer()
- }
- }
- }
- }
- return
- }
- type tree_node struct {
- entry *remote_file
- added_files map[string]*tree_node
- }
- func (self *tree_node) add_child(f *remote_file) *tree_node {
- if x, found := self.added_files[f.remote_id]; found {
- return x
- }
- c := tree_node{entry: f, added_files: make(map[string]*tree_node)}
- f.expanded_local_path = filepath.Join(self.entry.expanded_local_path, filepath.Base(f.remote_path))
- self.added_files[f.remote_id] = &c
- return &c
- }
- func walk_tree(root *tree_node, cb func(*tree_node) error) error {
- for _, c := range root.added_files {
- if err := cb(c); err != nil {
- return err
- }
- if err := walk_tree(c, cb); err != nil {
- return err
- }
- }
- return nil
- }
- func ensure_parent(f *remote_file, node_map map[string]*tree_node, fid_map map[string]*remote_file) *tree_node {
- if ans := node_map[f.parent]; ans != nil {
- return ans
- }
- parent := fid_map[f.parent]
- gp := ensure_parent(parent, node_map, fid_map)
- node := gp.add_child(parent)
- node_map[parent.remote_id] = node
- return node
- }
- func make_tree(all_files []*remote_file, local_base string) (root_node *tree_node) {
- fid_map := make(map[string]*remote_file, len(all_files))
- node_map := make(map[string]*tree_node, len(all_files))
- for _, f := range all_files {
- if f.remote_id != "" {
- fid_map[f.remote_id] = f
- }
- }
- root_node = &tree_node{entry: &remote_file{expanded_local_path: local_base}, added_files: make(map[string]*tree_node)}
- node_map[""] = root_node
- for _, f := range all_files {
- if f.remote_id != "" {
- p := ensure_parent(f, node_map, fid_map)
- p.add_child(f)
- }
- }
- return
- }
// isdir reports whether path can be stat'ed and refers to a directory.
func isdir(path string) bool {
	info, err := os.Stat(path)
	return err == nil && info.IsDir()
}
- func files_for_receive(opts *Options, dest string, files []*remote_file, remote_home string, specs []string) (ans []*remote_file, err error) {
- spec_map := make(map[int][]*remote_file)
- for _, f := range files {
- spec_map[f.spec_id] = append(spec_map[f.spec_id], f)
- }
- spec_paths := make([]string, len(specs))
- for i := range specs {
- // use the shortest path as the path for the spec
- slices.SortStableFunc(spec_map[i], func(a, b *remote_file) int { return len(a.remote_path) - len(b.remote_path) })
- spec_paths[i] = spec_map[i][0].remote_path
- }
- if opts.Mode == "mirror" {
- common_path := utils.Commonpath(spec_paths...)
- home := strings.TrimRight(remote_home, "/")
- if strings.HasPrefix(common_path, home+"/") {
- for i, x := range spec_paths {
- b, err := filepath.Rel(home, x)
- if err != nil {
- return nil, err
- }
- spec_paths[i] = filepath.Join("~", b)
- }
- }
- for spec_id, files_for_spec := range spec_map {
- spec := spec_paths[spec_id]
- tree := make_tree(files_for_spec, filepath.Dir(expand_home(spec)))
- if err = walk_tree(tree, func(x *tree_node) error {
- ans = append(ans, x.entry)
- return nil
- }); err != nil {
- return nil, err
- }
- }
- } else {
- number_of_source_files := 0
- for _, x := range spec_map {
- number_of_source_files += len(x)
- }
- dest_is_dir := strings.HasSuffix(dest, "/") || number_of_source_files > 1 || isdir(dest)
- for _, files_for_spec := range spec_map {
- if dest_is_dir {
- dest_path := filepath.Join(dest, filepath.Base(files_for_spec[0].remote_path))
- tree := make_tree(files_for_spec, filepath.Dir(expand_home(dest_path)))
- if err = walk_tree(tree, func(x *tree_node) error {
- ans = append(ans, x.entry)
- return nil
- }); err != nil {
- return nil, err
- }
- } else {
- f := files_for_spec[0]
- f.expanded_local_path = expand_home(dest)
- ans = append(ans, f)
- }
- }
- }
- return
- }
- func (self *manager) collect_files() (err error) {
- if self.files, err = files_for_receive(self.cli_opts, self.dest, self.files, self.remote_home, self.spec); err != nil {
- return err
- }
- self.progress_tracker.total_size_of_all_files = 0
- for _, f := range self.files {
- if f.ftype != FileType_directory && f.ftype != FileType_link {
- self.files_to_be_transferred[f.file_id] = f
- self.progress_tracker.total_size_of_all_files += utils.Max(0, f.expected_size)
- }
- }
- self.progress_tracker.total_bytes_to_transfer = self.progress_tracker.total_size_of_all_files
- return nil
- }
- func (self *handler) print_continue_msg() {
- self.lp.Println(`Press`, self.ctx.Green(`y`), `to continue or`, self.ctx.BrightRed(`n`), `to abort`)
- }
// lexists reports whether path exists, without following a final symlink.
func lexists(path string) bool {
	if _, err := os.Lstat(path); err != nil {
		return false
	}
	return true
}
- func (self *handler) print_check_paths() {
- if self.check_paths_printed {
- return
- }
- self.check_paths_printed = true
- self.lp.Println(`The following file transfers will be performed. A red destination means an existing file will be overwritten.`)
- for _, df := range self.manager.files {
- self.lp.QueueWriteString(self.ctx.Prettify(fmt.Sprintf(":%s:`%s` ", df.ftype.Color(), df.ftype.ShortText())))
- self.lp.QueueWriteString(" ")
- lpath := df.expanded_local_path
- if lexists(lpath) {
- lpath = self.ctx.Prettify(self.ctx.BrightRed(lpath) + " ")
- }
- self.lp.Println(df.display_name, "→", lpath)
- }
- self.lp.Println(fmt.Sprintf(`Transferring %d file(s) of total size: %s`, len(self.manager.files), humanize.Size(self.manager.progress_tracker.total_size_of_all_files)))
- self.print_continue_msg()
- }
- func (self *handler) confirm_paths() {
- self.print_check_paths()
- }
- func (self *handler) transmit_one() {
- if self.transmit_iterator == nil {
- return
- }
- wid, err := self.transmit_iterator(self.lp.QueueWriteString)
- if err != nil {
- if err == files_done {
- self.transmit_iterator = nil
- } else {
- self.abort_with_error(err)
- return
- }
- } else {
- self.last_data_write_id = wid
- }
- }
- func (self *handler) start_transfer() {
- self.transmit_started = true
- n := len(self.manager.files)
- msg := `Transmitting signature of`
- if self.manager.use_rsync {
- msg = `Queueing transfer of`
- }
- msg += ` `
- if n == 1 {
- msg += `one file`
- } else {
- msg += fmt.Sprintf(`%d files`, n)
- }
- self.lp.Println(msg)
- self.max_name_length = 0
- for _, f := range self.manager.files {
- self.max_name_length = utils.Max(6, self.max_name_length, wcswidth.Stringwidth(f.display_name))
- }
- self.transmit_iterator = self.manager.request_files()
- self.transmit_one()
- }
- func (self *handler) on_file_transfer_response(ftc *FileTransmissionCommand) (err error) {
- if ftc.Id != self.manager.request_id {
- return
- }
- if ftc.Action == Action_status && ftc.Status == "CANCELED" {
- self.lp.Quit(1)
- return
- }
- if self.quit_after_write_code > -1 || self.manager.state == state_canceled {
- return
- }
- transfer_started := self.manager.state == state_transferring
- if merr := self.manager.on_file_transfer_response(ftc); merr != nil {
- if merr == unicode_input.ErrCanceledByUser {
- // terminal will not respond to cancel request
- return fmt.Errorf("Permission denied by user")
- }
- self.abort_with_error(merr)
- return
- }
- if !transfer_started && self.manager.state == state_transferring {
- if len(self.manager.failed_specs) > 0 {
- self.print_err(fmt.Errorf(`Failed to process some sources:`))
- for spec_id, msg := range self.manager.failed_specs {
- spec := self.manager.spec[spec_id]
- if strings.HasPrefix(msg, `ENOENT:`) {
- msg = `File not found`
- }
- self.lp.Println(fmt.Sprintf(` %s: %s`, spec, msg))
- }
- self.abort_with_error(nil)
- return
- }
- zero_specs := make([]string, 0, len(self.manager.spec_counts))
- for k, v := range self.manager.spec_counts {
- if v == 0 {
- zero_specs = append(zero_specs, self.manager.spec[k])
- }
- }
- if len(zero_specs) > 0 {
- self.abort_with_error(fmt.Errorf(`No matches found for: %s`, strings.Join(zero_specs, ", ")))
- return
- }
- if merr := self.manager.collect_files(); merr != nil {
- self.abort_with_error(merr)
- return
- }
- if self.cli_opts.ConfirmPaths {
- self.confirm_paths()
- } else {
- self.start_transfer()
- }
- }
- if self.manager.transfer_done {
- self.manager.send(FileTransmissionCommand{Action: Action_finish}, self.lp.QueueWriteString)
- self.quit_after_write_code = 0
- if err = self.refresh_progress(0); err != nil {
- return err
- }
- } else if self.transmit_started {
- if err = self.refresh_progress(0); err != nil {
- return err
- }
- }
- return
- }
- func (self *handler) on_writing_finished(msg_id loop.IdType, has_pending_writes bool) (err error) {
- if self.quit_after_write_code > -1 {
- self.lp.Quit(self.quit_after_write_code)
- } else if msg_id == self.last_data_write_id {
- self.transmit_one()
- }
- return nil
- }
- func (self *handler) on_interrupt() (handled bool, err error) {
- handled = true
- if self.quit_after_write_code > -1 {
- return
- }
- if self.manager.state == state_canceled {
- self.lp.Println(`Waiting for canceled acknowledgement from terminal, will abort in a few seconds if no response received`)
- return
- }
- self.abort_with_error(fmt.Errorf(`Interrupt requested, cancelling transfer, transferred files are in undefined state.`))
- return
- }
- func (self *handler) on_sigterm() (handled bool, err error) {
- handled = true
- if self.quit_after_write_code > -1 {
- return
- }
- self.abort_with_error(fmt.Errorf(`Terminate requested, cancelling transfer, transferred files are in undefined state.`), 2*time.Second)
- return
- }
- func (self *handler) erase_progress() {
- if self.progress_drawn {
- self.lp.MoveCursorVertically(-2)
- self.lp.QueueWriteString("\r")
- self.lp.ClearToEndOfScreen()
- self.progress_drawn = false
- }
- }
- func (self *handler) render_progress(name string, p Progress) {
- if p.is_complete {
- p.bytes_so_far = p.total_bytes
- }
- ss, _ := self.lp.ScreenSize()
- self.lp.QueueWriteString(render_progress_in_width(name, p, int(ss.WidthCells), self.ctx))
- }
- func (self *handler) draw_progress_for_current_file(af *remote_file, spinner_char string, is_complete bool) {
- p := &self.manager.progress_tracker
- now := time.Now()
- secs := utils.IfElse(af.done_at.IsZero(), now, af.done_at)
- self.render_progress(af.display_name, Progress{
- spinner_char: spinner_char, is_complete: is_complete,
- bytes_so_far: af.written_bytes, total_bytes: af.expected_size,
- secs_so_far: secs.Sub(af.transmit_started_at).Seconds(),
- bytes_per_sec: safe_divide(p.transfered_stats_amt, p.transfered_stats_interval),
- })
- }
- func (self *handler) draw_files() {
- tick := self.ctx.Green(`✔`)
- var sc string
- for _, df := range self.manager.progress_tracker.done_files {
- sc = tick
- if df.ftype == FileType_regular {
- self.draw_progress_for_current_file(df, sc, true)
- } else {
- self.lp.QueueWriteString(fmt.Sprintf("%s %s %s", sc, df.display_name, self.ctx.Italic(self.ctx.Dim(df.ftype.String()))))
- }
- self.lp.Println()
- self.manager.progress_tracker.done_files = nil
- }
- is_complete := self.quit_after_write_code > -1
- if is_complete {
- sc = utils.IfElse(self.quit_after_write_code == 0, tick, self.ctx.Red(`✘`))
- } else {
- sc = self.spinner.Tick()
- }
- p := &self.manager.progress_tracker
- ss, _ := self.lp.ScreenSize()
- if is_complete {
- tui.RepeatChar(`─`, int(ss.WidthCells))
- } else {
- af := p.active_file
- if af != nil {
- self.draw_progress_for_current_file(af, sc, false)
- }
- }
- self.lp.Println()
- if p.total_transferred > 0 {
- self.render_progress(`Total`, Progress{
- spinner_char: sc, bytes_so_far: p.total_transferred, total_bytes: p.total_bytes_to_transfer,
- secs_so_far: time.Since(p.started_at).Seconds(), is_complete: is_complete,
- bytes_per_sec: safe_divide(p.transfered_stats_amt, p.transfered_stats_interval.Abs().Seconds()),
- })
- self.lp.Println()
- } else {
- self.lp.Println(`File data transfer has not yet started`)
- }
- }
- func (self *handler) schedule_progress_update(delay time.Duration) {
- if self.progress_update_timer != 0 {
- self.lp.RemoveTimer(self.progress_update_timer)
- self.progress_update_timer = 0
- }
- timer_id, err := self.lp.AddTimer(delay, false, self.refresh_progress)
- if err == nil {
- self.progress_update_timer = timer_id
- }
- }
- func (self *handler) draw_progress() {
- if self.manager.state == state_canceled {
- return
- }
- self.lp.AllowLineWrapping(false)
- defer self.lp.AllowLineWrapping(true)
- self.draw_files()
- self.schedule_progress_update(self.spinner.Interval())
- self.progress_drawn = true
- }
- func (self *handler) refresh_progress(loop.IdType) error {
- self.lp.StartAtomicUpdate()
- defer self.lp.EndAtomicUpdate()
- self.erase_progress()
- self.draw_progress()
- return nil
- }
- func (self *handler) on_text(text string, from_key_event, in_bracketed_paste bool) error {
- if self.quit_after_write_code > -1 {
- return nil
- }
- if self.check_paths_printed && !self.transmit_started {
- switch strings.ToLower(text) {
- case "y":
- self.start_transfer()
- return nil
- case "n":
- self.abort_with_error(fmt.Errorf(`Canceled by user`))
- return nil
- }
- self.print_continue_msg()
- }
- return nil
- }
- func (self *handler) on_key_event(ev *loop.KeyEvent) error {
- if self.quit_after_write_code > -1 {
- return nil
- }
- if ev.MatchesPressOrRepeat("esc") {
- ev.Handled = true
- if self.check_paths_printed && !self.transmit_started {
- self.abort_with_error(fmt.Errorf(`Canceled by user`))
- } else {
- if _, err := self.on_interrupt(); err != nil {
- return err
- }
- }
- } else if ev.MatchesPressOrRepeat("ctrl+c") {
- ev.Handled = true
- if _, err := self.on_interrupt(); err != nil {
- return err
- }
- }
- return nil
- }
- func receive_loop(opts *Options, spec []string, dest string) (err error, rc int) {
- lp, err := loop.New(loop.NoAlternateScreen, loop.NoRestoreColors)
- if err != nil {
- return err, 1
- }
- handler := handler{
- lp: lp, quit_after_write_code: -1, cli_opts: opts, spinner: tui.NewSpinner("dots"),
- ctx: markup.New(true),
- manager: manager{
- request_id: random_id(), spec: spec, dest: dest, bypass: opts.PermissionsBypass, use_rsync: opts.TransmitDeltas,
- failed_specs: make(map[int]string, len(spec)), spec_counts: make(map[int]int, len(spec)),
- suffix: "\x1b\\", cli_opts: opts, files_to_be_transferred: make(map[string]*remote_file),
- },
- }
- for i := range spec {
- handler.manager.spec_counts[i] = 0
- }
- handler.manager.prefix = fmt.Sprintf("\x1b]%d;id=%s;", kitty.FileTransferCode, handler.manager.request_id)
- if handler.manager.bypass != `` {
- if handler.manager.bypass, err = encode_bypass(handler.manager.request_id, handler.manager.bypass); err != nil {
- return err, 1
- }
- }
- lp.OnInitialize = func() (string, error) {
- lp.SetCursorVisible(false)
- lp.Println("Scanning files…")
- handler.manager.start_transfer(lp.QueueWriteString)
- return "", nil
- }
- lp.OnFinalize = func() string {
- lp.SetCursorVisible(true)
- return ""
- }
- lp.OnSIGINT = handler.on_interrupt
- lp.OnSIGTERM = handler.on_sigterm
- lp.OnWriteComplete = handler.on_writing_finished
- lp.OnText = handler.on_text
- lp.OnKeyEvent = handler.on_key_event
- lp.OnResize = func(old_sz, new_sz loop.ScreenSize) error {
- if handler.progress_drawn {
- return handler.refresh_progress(0)
- }
- return nil
- }
- ftc_code := strconv.Itoa(kitty.FileTransferCode)
- lp.OnEscapeCode = func(et loop.EscapeCodeType, payload []byte) error {
- if et == loop.OSC {
- if idx := bytes.IndexByte(payload, ';'); idx > 0 {
- if utils.UnsafeBytesToString(payload[:idx]) == ftc_code {
- ftc, err := NewFileTransmissionCommand(utils.UnsafeBytesToString(payload[idx+1:]))
- if err != nil {
- return fmt.Errorf("Received invalid FileTransmissionCommand from terminal with error: %w", err)
- }
- return handler.on_file_transfer_response(ftc)
- }
- }
- }
- return nil
- }
- err = lp.Run()
- defer func() {
- for _, f := range handler.manager.files {
- f.close()
- }
- }()
- if err != nil {
- return err, 1
- }
- if lp.DeathSignalName() != "" {
- lp.KillIfSignalled()
- return
- }
- if lp.ExitCode() != 0 {
- rc = lp.ExitCode()
- }
- var tsf, dsz, ssz int64
- for _, f := range handler.manager.files {
- if rc == 0 { // no error has yet occurred report errors closing files
- if cerr := f.close(); cerr != nil {
- return cerr, 1
- }
- }
- if f.expect_diff {
- tsf += f.expected_size
- dsz += f.received_bytes
- ssz += f.sent_bytes
- }
- }
- if tsf > 0 && dsz+ssz > 0 && rc == 0 {
- print_rsync_stats(tsf, dsz, ssz)
- }
- return
- }
- func receive_main(opts *Options, args []string) (err error, rc int) {
- spec := args
- var dest string
- switch opts.Mode {
- case "mirror":
- if len(args) < 1 {
- return fmt.Errorf("Must specify at least one file to transfer"), 1
- }
- case "normal":
- if len(args) < 2 {
- return fmt.Errorf("Must specify at least one source and a destination file to transfer"), 1
- }
- dest = args[len(args)-1]
- spec = args[:len(args)-1]
- }
- return receive_loop(opts, spec, dest)
- }
|