123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319 |
- import collections
- import errno
- import os
- import time
- import tempfile
- import threading
- import shutil
- import logging
- from lvc import execute
- from lvc.utils import line_reader
- from lvc.video import get_thumbnail_synchronous
- from lvc.widgets import get_conversion_directory
- logger = logging.getLogger(__name__)
- class Conversion(object):
- def __init__(self, video, converter, manager, output_dir=None):
- self.video = video
- self.manager = manager
- if output_dir is None:
- output_dir = get_conversion_directory()
- self.output_dir = output_dir
- self.lines = []
- self.thread = None
- self.popen = None
- self.status = 'initialized'
- self.temp_output = None
- self.error = None
- self.started_at = None
- self.duration = None
- self.progress = None
- self.progress_percent = None
- self.create_thumbnail = False
- self.eta = None
- self.listeners = set()
- self.set_converter(converter)
- logger.info('created %r', self)
- def set_converter(self, converter):
- if self.status != 'initialized':
- raise RuntimeError("can't change converter after starting")
- self.converter = converter
- self.output = os.path.join(self.output_dir,
- converter.get_output_filename(self.video))
- def __repr__(self):
- return unicode(self)
- def __str__(self):
- return unicode(self).encode('utf8')
- def __unicode__(self):
- return u'<Conversion (%s) %r -> %r>' % (
- self.converter.name, self.video.filename, self.output)
- def listen(self, f):
- self.listeners.add(f)
- def unlisten(self, f):
- self.listeners.remove(f)
- def notify_listeners(self):
- self.manager.notify_queue.add(self)
- def run(self):
- logger.info('starting %r', self)
- try:
- self.temp_output = tempfile.mktemp(
- dir=os.path.dirname(self.output))
- except EnvironmentError as e:
- logger.exception('while creating temp file for %r',
- self.output)
- self.error = str(e)
- self.finalize()
- return
- logger.info('commandline: %r', ' '.join(
- self.get_subprocess_arguments(self.temp_output)))
- self.thread = threading.Thread(target=self._thread,
- name="Thread:%s" % (self,))
- self.thread.setDaemon(True)
- self.thread.start()
- def stop(self):
- logger.info('stopping %r', self)
- self.error = 'manually stopped'
- if self.popen is None:
- status = 'canceled'
- try:
- self.manager.remove(self)
- except ValueError:
- status = 'failed'
- logger.exception('not running and not waiting %s' % (self,))
- self.status = status
- return
- else:
- try:
- self.popen.kill()
- self.popen.wait()
- # set the status transition last, if we had hit an exception
- # then we will transition the next state to 'failed' in
- # finalize()
- self.status = 'canceled'
- except EnvironmentError as e:
- logger.exception('while stopping %s' % (self,))
- self.error = str(e)
- self.popen = None
- self.manager.conversion_finished(self)
- def _thread(self):
- try:
- commandline = self.get_subprocess_arguments(self.temp_output)
- self.popen = execute.Popen(commandline, bufsize=1)
- self.process_output()
- if self.popen:
- # if we stop the thread, we can get here after `.stop()`
- # finishes.
- self.popen.wait()
- except OSError as e:
- if e.errno == errno.ENOENT:
- self.error = '%r does not exist' % (
- self.converter.get_executable(),)
- else:
- logger.exception('OSError in %s' % (self.thread.name,))
- self.error = str(e)
- except Exception as e:
- logger.exception('in %s' % (self.thread.name,))
- self.error = str(e)
- if self.create_thumbnail:
- self.write_thumbnail_file()
- self.finalize()
- def write_thumbnail_file(self):
- try:
- self._write_thumbnail_file()
- except StandardError:
- logging.warn("Error writing thumbnail", exc_info=True)
- def _write_thumbnail_file(self):
- if self.video.audio_only:
- logging.warning("write_thumbnail_file: audio_only=True "
- "not writing thumbnail %s", self.video.filename)
- return
- output_basename = os.path.splitext(os.path.basename(self.output))[0]
- logging.info("td: %s ob: %s", self._get_thumbnail_dir(),
- output_basename)
- thumbnail_path = os.path.join(self._get_thumbnail_dir(),
- output_basename + '.png')
- logging.info("creating thumbnail: %s", thumbnail_path)
- width, height = self.converter.get_target_size(self.video)
- get_thumbnail_synchronous(self.video.filename, width, height,
- thumbnail_path)
- if os.path.exists(thumbnail_path):
- logging.info("thumbnail successful: %s", thumbnail_path)
- else:
- logging.warning("get_thumbnail_synchronous() succeeded, but the "
- "thumbnail file is missing!")
- def _get_thumbnail_dir(self):
- """Get the directory to store thumbnails in it.
- This method will create the directory if it doesn't exist
- """
- thumbnail_dir = os.path.join(self.output_dir, 'thumbnails')
- if not os.path.exists(thumbnail_dir):
- os.mkdir(thumbnail_dir)
- return thumbnail_dir
- def calc_progress_percent(self):
- if not self.duration:
- return 0.0
- if self.create_thumbnail:
- # assume that thumbnail creation takes as long as 2 seconds of
- # video processing
- effective_duration = self.duration + 2.0
- else:
- effective_duration = self.duration
- return self.progress / effective_duration
- def process_output(self):
- self.started_at = time.time()
- self.status = 'converting'
- # We use line_reader, rather than just iterating over the file object,
- # because iterating over the file object gives us all the lines when
- # the process ends, and we're looking for real-time updates.
- for line in line_reader(self.popen.stdout):
- self.lines.append(line) # for debugging, if needed
- try:
- status = self.converter.process_status_line(self.video, line)
- except StandardError:
- logging.warn("error in process_status_line()", exc_info=True)
- continue
- if status is None:
- continue
- updated = set()
- if 'finished' in status:
- self.error = status.get('error', None)
- break
- if 'duration' in status:
- updated.update(('duration', 'progress'))
- self.duration = float(status['duration'])
- if self.progress is None:
- self.progress = 0.0
- if 'progress' in status:
- updated.add('progress')
- self.progress = min(float(status['progress']),
- self.duration)
- if 'eta' in status:
- updated.add('eta')
- self.eta = float(status['eta'])
- if updated:
- self.progress_percent = self.calc_progress_percent()
- if 'eta' not in updated:
- if self.duration and 0 < self.progress_percent < 1.0:
- progress = self.progress_percent * 100
- elapsed = time.time() - self.started_at
- time_per_percent = elapsed / progress
- self.eta = float(
- time_per_percent * (100 - progress))
- else:
- self.eta = 0.0
- self.notify_listeners()
- def finalize(self):
- self.progress = self.duration
- self.progress_percent = 1.0
- self.eta = 0
- if self.error is None:
- self.status = 'staging'
- self.notify_listeners()
- try:
- self.converter.finalize(self.temp_output, self.output)
- except EnvironmentError as e:
- logger.exception('while trying to move %r to %r after %s',
- self.temp_output, self.output, self)
- self.error = str(e)
- self.status = 'failed'
- else:
- self.status = 'finished'
- else:
- if self.temp_output is not None:
- try:
- os.unlink(self.temp_output)
- except EnvironmentError:
- pass
- '''
- ignore errors removing temp files;
- they may not have been created
- '''
- if self.status != 'canceled':
- self.status = 'failed'
- if self.status != 'canceled':
- self.notify_listeners()
- logger.info('finished %r; status: %s', self, self.status)
- def get_subprocess_arguments(self, output):
- return ([self.converter.get_executable()] +
- list(self.converter.get_arguments(self.video, output)))
- class ConversionManager(object):
- def __init__(self, simultaneous=None):
- self.notify_queue = set()
- self.in_progress = set()
- self.waiting = collections.deque()
- self.simultaneous = simultaneous
- self.running = False
- self.create_thumbnails = False
- def get_conversion(self, video, converter, **kwargs):
- return Conversion(video, converter, self, **kwargs)
- def remove(self, conversion):
- self.waiting.remove(conversion)
- def start_conversion(self, video, converter):
- return self.run_conversion(self.get_conversion(video, converter))
- def run_conversion(self, conversion):
- if self.simultaneous is not None and len(self.in_progress) \
- >= self.simultaneous:
- self.waiting.append(conversion)
- else:
- self._start_conversion(conversion)
- self.running = True
- return conversion
- def _start_conversion(self, conversion):
- self.in_progress.add(conversion)
- conversion.create_thumbnail = self.create_thumbnails
- conversion.run()
- def check_notifications(self):
- if not self.running:
- # don't bother checking if we're not running
- return
- self.notify_queue, changed = set(), self.notify_queue
- for conversion in changed:
- if conversion.status in ('canceled', 'finished', 'failed'):
- self.conversion_finished(conversion)
- for listener in conversion.listeners:
- listener(conversion)
- def conversion_finished(self, conversion):
- self.in_progress.discard(conversion)
- while (self.waiting and self.simultaneous is not None and
- len(self.in_progress) < self.simultaneous):
- c = self.waiting.popleft()
- self._start_conversion(c)
- if not self.in_progress:
- self.running = False
|