123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236 |
- import errno
- from subprocess import Popen, PIPE, STDOUT
- import re
class Error(Exception):
    """Base class for every exception defined by this module."""
class UnknownOption(Error):
    """Raised when unar rejects a command-line option it does not know."""
class ArchiveNotFound(Error):
    """Raised when unar reports that it could not open the archive."""
class StreamError(Error):
    """Raised when recognising, parsing or extracting the archive failed."""
class PasswordError(Error):
    """Raised when the archive cannot be unpacked without a password."""
class Unar:
    """Build and run a ``unar`` command line.

    Configure an instance through its properties (``file``, ``output``,
    ``password``, ...) and then call :meth:`run`.  Each boolean property maps
    to one unar switch; the switch is only emitted when the property is truthy.
    """

    # Substrings looked for in unar's output after a failed run, mapped to the
    # exception type raised when the substring is found.
    # grep -rnw -e "XADException describeXADError" ./unarchiver
    EXCEPTION_PATTERNS = {
        "Unknown option": UnknownOption,
        "Couldn't open archive.": ArchiveNotFound,
        "Couldn't recognize the archive format": StreamError,
        "Extraction to current directory failed!": StreamError,
        "Archive parsing failed": StreamError,
        "Extraction to directory \"\" failed": StreamError,  # TODO
        "This archive requires a password to unpack": PasswordError,
    }

    def __init__(self):
        """Create a wrapper for ``/usr/bin/unar`` with every option unset."""
        self._path = '/usr/bin'
        self._command = 'unar'
        self._args = []  # unar command arguments
        # Entries to extract (names/wildcards); empty means "everything".
        # Ignored when ``indexes`` is set.
        self._files = []
        self._file = ''
        # Options
        self._output = ""
        self._forceOverwrite = False
        self._forceRename = False
        self._forceSkip = False
        self._forceDirectory = False
        self._noDirectory = False
        self._password = ""
        self._encoding = None  # TODO
        self._passwordEncoding = None  # TODO
        self._indexes = []
        self._noRecursion = False
        self._copyTime = False
        self._forks = False
        self._quiet = False
        # Last value returned by the ``version`` property; None until queried.
        self._version = None

    # TODO:
    #   List all supported encodings: unar -e list
    #   grep -rnw 'TODO' unar.py

    def run(self, cb=None):
        """Run unar with the configured options and wait for it to finish.

        The decoded stdout, the raw stderr bytes and the exit code are printed
        to this process's stdout.

        Args:
            cb: Optional callable; invoked with the exit code once unar has
                finished and no known error pattern was detected.

        Raises:
            UnknownOption, ArchiveNotFound, StreamError, PasswordError:
                unar exited with a non-zero status and one of its output lines
                contains the matching key of ``EXCEPTION_PATTERNS``.  The
                offending line is the exception message.
        """
        p = Popen(self._compileCmd(), stdout=PIPE, stderr=PIPE)
        # communicate() drains both pipes and waits for the child.  It is
        # called unconditionally: guarding it with poll() would skip it (and
        # lose all output) whenever the child has already exited.
        out, err = p.communicate()
        # unar may echo file names in arbitrary encodings; never fail on them.
        log = out.decode('UTF-8', errors='replace')
        errLog = err.decode('UTF-8', errors='replace')
        print("m: {}".format(log))
        print(err)
        exitCode = p.returncode
        print("Return code: {0}".format(exitCode))
        if exitCode != 0:
            # Diagnostics may arrive on either stream, so scan both.
            lines = log.split('\n') + errLog.split('\n')
            for pattern, exception in self.EXCEPTION_PATTERNS.items():
                for line in lines:
                    if pattern in line:
                        raise exception(line)
        if cb:
            cb(exitCode)

    def _compileCmd(self):
        """Return the argument list (argv) for the configured invocation.

        Layout: executable, option switches, archive path, then the entries
        to extract (indexes when ``indexes`` is set, otherwise ``files``).
        """
        # full command path + command
        args = [self.fullCommandPath]
        # options
        if self.output:
            args += ['-o', self.output]
        if self.forceOverwrite:
            args.append("-f")
        if self.forceRename:
            args.append("-r")
        if self.forceSkip:
            args.append("-s")
        if self.forceDirectory:
            args.append("-d")
        if self.noDirectory:
            args.append("-D")
        if self.password:
            args += ['-p', self.password]
        if self.encoding:
            args += ['-e', self.encoding]
        if self.passwordEncoding:
            args += ['-E', self.passwordEncoding]
        if self.indexes:
            # -i takes no value: it switches the trailing entry arguments
            # from names to indexes.
            args.append('-i')
        if self.noRecursion:
            args.append("-nr")
        if self.copyTime:
            args.append("-t")
        if self.forks and isinstance(self.forks, str):
            # -k needs one of <visible|hidden|skip>; a non-string truthy value
            # carries no mode, so the switch is left out in that case.
            args += ['-k', self.forks]
        # archive path, followed by the optional entries to extract
        if self.file:
            args.append(self.file)
            if self.indexes:
                if isinstance(self.indexes, str):
                    entries = self.indexes.split()
                else:
                    entries = [str(i) for i in self.indexes]
                args.extend(entries)
            elif self._files:
                args.extend(self._files)
        return args

    ########################

    @property
    def path(self):
        """Directory that contains the unar executable."""
        return self._path

    @path.setter
    def path(self, path):
        self._path = path

    @property
    def command(self):
        """File name of the unar executable."""
        return self._command

    @command.setter
    def command(self, cmd):
        self._command = cmd

    @property
    def fullCommandPath(self):
        """``path`` and ``command`` joined with a ``/``.

        Assign a ``(path, command)`` pair to set both at once.
        """
        return "{0}/{1}".format(self._path, self._command)

    @fullCommandPath.setter
    def fullCommandPath(self, tupleValue):
        self._path = tupleValue[0]
        self._command = tupleValue[1]

    @property
    def file(self):
        """Path of the archive to extract."""
        return self._file

    @file.setter
    def file(self, path):
        self._file = path

    @property
    def files(self):
        """Entries to extract, passed after the archive path.

        Ignored when ``indexes`` is set.
        """
        return self._files

    @files.setter
    def files(self, files):
        self._files = files

    # Options

    # -output-directory (-o) <string>
    @property
    def output(self):
        """Value passed with ``-o``; the option is omitted when empty."""
        return self._output

    @output.setter
    def output(self, path):
        self._output = path

    # -force-overwrite (-f)
    @property
    def forceOverwrite(self):
        """Emit ``-f`` when truthy."""
        return self._forceOverwrite

    @forceOverwrite.setter
    def forceOverwrite(self, state):
        self._forceOverwrite = state

    # -force-rename (-r)
    @property
    def forceRename(self):
        """Emit ``-r`` when truthy."""
        return self._forceRename

    @forceRename.setter
    def forceRename(self, state):
        self._forceRename = state

    # -force-skip (-s)
    @property
    def forceSkip(self):
        """Emit ``-s`` when truthy."""
        return self._forceSkip

    @forceSkip.setter
    def forceSkip(self, state):
        self._forceSkip = state

    # -force-directory (-d)
    @property
    def forceDirectory(self):
        """Emit ``-d`` when truthy."""
        return self._forceDirectory

    @forceDirectory.setter
    def forceDirectory(self, state):
        self._forceDirectory = state

    # -no-directory (-D)
    @property
    def noDirectory(self):
        """Emit ``-D`` when truthy."""
        return self._noDirectory

    @noDirectory.setter
    def noDirectory(self, state):
        self._noDirectory = state

    # -password (-p) <string>
    @property
    def password(self):
        """Value passed with ``-p``; the option is omitted when empty."""
        return self._password

    @password.setter
    def password(self, password):
        self._password = password

    # -encoding (-e) <encoding name>
    @property
    def encoding(self):
        """Value passed with ``-e``; the option is omitted when unset."""
        return self._encoding

    @encoding.setter
    def encoding(self, encoding):
        self._encoding = encoding

    # -password-encoding (-E) <name>
    @property
    def passwordEncoding(self):
        """Value passed with ``-E``; the option is omitted when unset."""
        return self._passwordEncoding

    @passwordEncoding.setter
    def passwordEncoding(self, encoding):
        self._passwordEncoding = encoding

    # -indexes (-i)
    @property
    def indexes(self):
        """Indexes of the entries to extract.

        When non-empty, ``-i`` is emitted and the indexes are appended after
        the archive path, one argument each.  A string is split on whitespace.
        """
        return self._indexes

    @indexes.setter
    def indexes(self, indexes):
        self._indexes = indexes

    # -no-recursion (-nr)
    @property
    def noRecursion(self):
        """Emit ``-nr`` when truthy."""
        return self._noRecursion

    @noRecursion.setter
    def noRecursion(self, state):
        self._noRecursion = state

    # -copy-time (-t)
    @property
    def copyTime(self):
        """Emit ``-t`` when truthy."""
        return self._copyTime

    @copyTime.setter
    def copyTime(self, state):
        self._copyTime = state

    # -forks (-k) <visible|hidden|skip>
    @property
    def forks(self):
        """Mode string passed with ``-k``; omitted unless a non-empty string."""
        return self._forks

    @forks.setter
    def forks(self, option):
        self._forks = option

    # -version (-v)
    @property
    def version(self):
        """Output of ``<fullCommandPath> -v``, stripped of surrounding whitespace.

        The executable is run on every access, so a changed ``path`` or
        ``command`` is always honoured.  Launching it may raise ``OSError``
        (e.g. when the executable does not exist).
        """
        p = Popen([self.fullCommandPath, '-v'], stdout=PIPE, stderr=STDOUT)
        out = p.communicate()[0]
        self._version = out.decode('UTF-8', errors='replace').strip()
        return self._version
|