directory_upload_manager.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. package awsuploader
  2. import (
  3. "os"
  4. "path/filepath"
  5. "time"
  6. "github.com/cloudflare/cloudflared/logger"
  7. )
  8. // DirectoryUploadManager is used to manage file uploads on an interval from a directory
  9. type DirectoryUploadManager struct {
  10. logger logger.Service
  11. uploader Uploader
  12. rootDirectory string
  13. sweepInterval time.Duration
  14. ticker *time.Ticker
  15. shutdownC chan struct{}
  16. workQueue chan string
  17. }
  18. // NewDirectoryUploadManager create a new DirectoryUploadManager
  19. // uploader is an Uploader to use as an actual uploading engine
  20. // directory is the directory to sweep for files to upload
  21. // sweepInterval is how often to iterate the directory and upload the files within
  22. func NewDirectoryUploadManager(logger logger.Service, uploader Uploader, directory string, sweepInterval time.Duration, shutdownC chan struct{}) *DirectoryUploadManager {
  23. workerCount := 10
  24. manager := &DirectoryUploadManager{
  25. logger: logger,
  26. uploader: uploader,
  27. rootDirectory: directory,
  28. sweepInterval: sweepInterval,
  29. shutdownC: shutdownC,
  30. workQueue: make(chan string, workerCount),
  31. }
  32. //start workers
  33. for i := 0; i < workerCount; i++ {
  34. go manager.worker()
  35. }
  36. return manager
  37. }
  38. // Upload a file using the uploader
  39. // This is useful for "out of band" uploads that need to be triggered immediately instead of waiting for the sweep
  40. func (m *DirectoryUploadManager) Upload(filepath string) error {
  41. return m.uploader.Upload(filepath)
  42. }
  43. // Start the upload ticker to walk the directories
  44. func (m *DirectoryUploadManager) Start() {
  45. m.ticker = time.NewTicker(m.sweepInterval)
  46. go m.run()
  47. }
  48. func (m *DirectoryUploadManager) run() {
  49. for {
  50. select {
  51. case <-m.shutdownC:
  52. m.ticker.Stop()
  53. return
  54. case <-m.ticker.C:
  55. m.sweep()
  56. }
  57. }
  58. }
  59. // sweep the directory and kick off uploads
  60. func (m *DirectoryUploadManager) sweep() {
  61. filepath.Walk(m.rootDirectory, func(path string, info os.FileInfo, err error) error {
  62. if err != nil || info.IsDir() {
  63. return nil
  64. }
  65. //30 days ago
  66. retentionTime := 30 * (time.Hour * 24)
  67. checkTime := time.Now().Add(-time.Duration(retentionTime))
  68. //delete the file it is stale
  69. if info.ModTime().Before(checkTime) {
  70. os.Remove(path)
  71. return nil
  72. }
  73. //add the upload to the work queue
  74. go func() {
  75. m.workQueue <- path
  76. }()
  77. return nil
  78. })
  79. }
  80. // worker handles upload requests
  81. func (m *DirectoryUploadManager) worker() {
  82. for {
  83. select {
  84. case <-m.shutdownC:
  85. return
  86. case filepath := <-m.workQueue:
  87. if err := m.Upload(filepath); err != nil {
  88. m.logger.Errorf("Cannot upload file to s3 bucket: %s", err)
  89. } else {
  90. os.Remove(filepath)
  91. }
  92. }
  93. }
  94. }