123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- package awsuploader
- import (
- "os"
- "path/filepath"
- "time"
- "github.com/cloudflare/cloudflared/logger"
- )
- // DirectoryUploadManager is used to manage file uploads on an interval from a directory
- type DirectoryUploadManager struct {
- logger logger.Service
- uploader Uploader
- rootDirectory string
- sweepInterval time.Duration
- ticker *time.Ticker
- shutdownC chan struct{}
- workQueue chan string
- }
- // NewDirectoryUploadManager create a new DirectoryUploadManager
- // uploader is an Uploader to use as an actual uploading engine
- // directory is the directory to sweep for files to upload
- // sweepInterval is how often to iterate the directory and upload the files within
- func NewDirectoryUploadManager(logger logger.Service, uploader Uploader, directory string, sweepInterval time.Duration, shutdownC chan struct{}) *DirectoryUploadManager {
- workerCount := 10
- manager := &DirectoryUploadManager{
- logger: logger,
- uploader: uploader,
- rootDirectory: directory,
- sweepInterval: sweepInterval,
- shutdownC: shutdownC,
- workQueue: make(chan string, workerCount),
- }
- //start workers
- for i := 0; i < workerCount; i++ {
- go manager.worker()
- }
- return manager
- }
- // Upload a file using the uploader
- // This is useful for "out of band" uploads that need to be triggered immediately instead of waiting for the sweep
- func (m *DirectoryUploadManager) Upload(filepath string) error {
- return m.uploader.Upload(filepath)
- }
- // Start the upload ticker to walk the directories
- func (m *DirectoryUploadManager) Start() {
- m.ticker = time.NewTicker(m.sweepInterval)
- go m.run()
- }
- func (m *DirectoryUploadManager) run() {
- for {
- select {
- case <-m.shutdownC:
- m.ticker.Stop()
- return
- case <-m.ticker.C:
- m.sweep()
- }
- }
- }
- // sweep the directory and kick off uploads
- func (m *DirectoryUploadManager) sweep() {
- filepath.Walk(m.rootDirectory, func(path string, info os.FileInfo, err error) error {
- if err != nil || info.IsDir() {
- return nil
- }
- //30 days ago
- retentionTime := 30 * (time.Hour * 24)
- checkTime := time.Now().Add(-time.Duration(retentionTime))
- //delete the file it is stale
- if info.ModTime().Before(checkTime) {
- os.Remove(path)
- return nil
- }
- //add the upload to the work queue
- go func() {
- m.workQueue <- path
- }()
- return nil
- })
- }
- // worker handles upload requests
- func (m *DirectoryUploadManager) worker() {
- for {
- select {
- case <-m.shutdownC:
- return
- case filepath := <-m.workQueue:
- if err := m.Upload(filepath); err != nil {
- m.logger.Errorf("Cannot upload file to s3 bucket: %s", err)
- } else {
- os.Remove(filepath)
- }
- }
- }
- }
|