123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312 |
- import errno
- from subprocess import Popen, PIPE, STDOUT
- import re
- import json
class Error(Exception):
    """Base class for every exception defined in this module."""


class UnknownOption(Error):
    """The command-line tool was given an option it does not know."""


class ArchiveNotFound(Error):
    """The archive could not be opened."""


class PasswordRequired(Error):
    """The archive needs a password."""


class ParseError(Error):
    """Parsing the archive failed."""


class UnarchiveError(Error):
    """Listing / unarchiving failed."""


class UnknownFormatError(Error):
    """The archive format was not recognized."""
class JsonResultItem:
    """Read-only wrapper around the mapping that describes one archive entry.

    Entry keys noted by the original author::

        XADSolidOffset
        XADCompressionName
        XADSolidObject
        XADFileName
        XADIndex
        XADPosixPermissions
        XADLastModificationDate
        XADFileSize
        XADCompressedSize
        XADSolidLength
    """

    def __init__(self, itemData):
        """Keep a reference to *itemData*; the accessors read from it."""
        self._data = itemData

    def index(self):
        """Return the ``XADIndex`` value, or None when the key is missing."""
        return self._data.get("XADIndex")

    def name(self):
        """Return the ``XADFileName`` value, or "" when the key is missing."""
        return self._data.get("XADFileName", "")
class JsonResultItemRar(JsonResultItem):
    """Archive entry wrapper exposing the ``RAR*`` keys of the entry mapping."""

    def __init__(self, itemData):
        """Delegate storage of *itemData* to the base class."""
        JsonResultItem.__init__(self, itemData)

    def attributes(self):
        """Return the ``RARAttributes`` value, or None when missing."""
        return self._data.get('RARAttributes')

    def solidIndex(self):
        """Return the ``RARSolidIndex`` value, or None when missing."""
        return self._data.get('RARSolidIndex')

    def compressionVersion(self):
        """Return the ``RARCompressionVersion`` value, or None when missing."""
        return self._data.get('RARCompressionVersion')

    def compressionMethod(self):
        """Return the ``RARCompressionMethod`` value, or None when missing."""
        return self._data.get('RARCompressionMethod')

    def OS(self):
        """Return the ``RAROS`` value, or None when missing."""
        return self._data.get('RAROS')

    def CRC32(self):
        """Return the ``RARCRC32`` value, or None when missing."""
        return self._data.get('RARCRC32')

    def flags(self):
        """Return the ``RARFlags`` value, or None when missing."""
        return self._data.get('RARFlags')

    def OSName(self):
        """Return the ``RAROSName`` value, or "" when missing."""
        return self._data.get('RAROSName', "")
class JsonResultItemTar(JsonResultItem):
    """Archive entry wrapper exposing the ``TAR*`` key of the entry mapping."""

    def __init__(self, itemData):
        """Delegate storage of *itemData* to the base class."""
        JsonResultItem.__init__(self, itemData)

    def isSparseFile(self):
        """Return the ``TARIsSparseFile`` value, or None when missing."""
        return self._data.get('TARIsSparseFile')
class JsonResultItemZip(JsonResultItem):
    """Archive entry wrapper exposing the ``Zip*`` keys of the entry mapping."""

    def __init__(self, itemData):
        """Delegate storage of *itemData* to the base class."""
        JsonResultItem.__init__(self, itemData)

    def localDate(self):
        """Return the ``ZipLocalDate`` value, or 0 when missing."""
        return self._data.get('ZipLocalDate', 0)

    def compressionMethod(self):
        """Return the ``ZipCompressionMethod`` value, or None when missing."""
        return self._data.get('ZipCompressionMethod')

    def extractVersion(self):
        """Return the ``ZipExtractVersion`` value, or None when missing."""
        return self._data.get('ZipExtractVersion')

    def OS(self):
        """Return the ``ZipOS`` value, or None when missing."""
        return self._data.get('ZipOS')

    def CRC32(self):
        """Return the ``ZipCRC32`` value, or None when missing."""
        return self._data.get('ZipCRC32')

    def fileAttributes(self):
        """Return the ``ZipFileAttributes`` value, or None when missing."""
        return self._data.get('ZipFileAttributes')

    def flags(self):
        """Return the ``ZipFlags`` value, or None when missing."""
        return self._data.get('ZipFlags')

    def OSName(self):
        """Return the ``ZipOSName`` value, or "" when missing."""
        return self._data.get('ZipOSName', "")
class JsonResult:
    """
    Parsed view of the JSON text handed to the constructor.

    The accessors read the ``lsar*`` keys of the decoded top-level object
    and the ``XAD*`` keys nested below it.
    """

    def __init__(self, data):
        """
        @type  data: string
        @param data: JSON text; it is decoded with json.loads
        """
        self._data = json.loads(data)

    @staticmethod
    def getFormatName(data):
        """
        getFormatName static method to get the archive format name

        @type  data: string
        @param data: JSON text from the lsar command (decoded with json.loads)
        @rtype:  string
        @return: Returns the 'lsarFormatName' value (tar, zip, rar, etc..),
                 or "" when the key is missing.
        """
        return json.loads(data).get('lsarFormatName', "")

    def formatName(self):
        """
        formatName get the archive format name

        @rtype:  string
        @return: Returns the 'lsarFormatName' value (tar, zip, rar, etc..),
                 or "" when the key is missing.
        """
        return self._data.get('lsarFormatName', "")

    def formatVersion(self):
        """
        formatVersion

        @return: Returns the 'lsarFormatVersion' value, or None when the
                 key is missing.
        """
        return self._data.get('lsarFormatVersion', None)

    def contents(self):
        """
        contents raw archive contents

        @rtype:  list
        @return: Returns the 'lsarContents' value (a list with dicts, every
                 dict is a file), or [] when the key is missing.
        """
        return self._data.get('lsarContents', [])

    def encoding(self):
        """
        encoding the archive's encoding (guess)

        @rtype:  string
        @return: Returns the 'lsarEncoding' value, or "" when the key is
                 missing.
        """
        return self._data.get('lsarEncoding', "")

    def confidence(self):
        """
        confidence

        @return: Returns the 'lsarConfidence' value, or None when the key
                 is missing.
        """
        return self._data.get('lsarConfidence', None)

    def properties(self):
        """
        properties

        @rtype:  dict
        @return: Returns the 'lsarProperties' value, or {} when the key is
                 missing.
        """
        return self._data.get('lsarProperties', {})

    def isEncrypted(self):
        """
        isEncrypted

        The 'XADIsEncrypted' flag is looked up on the top-level object, in
        the properties dict and on every entry of the contents list; a
        truthy value in any of them counts.

        NOTE(review): the previous implementation only consulted the
        top-level object, although every other XAD* key in this class is
        read from the properties or from the entries. Where lsar really
        emits this flag is an assumption - confirm against real output.

        @rtype:  bool
        @return: Returns True if the archive is encrypted, False if not.
        """
        if self._data.get('XADIsEncrypted', 0):
            return True
        if self.properties().get('XADIsEncrypted', 0):
            return True
        return any(
            isinstance(entry, dict) and entry.get('XADIsEncrypted', 0)
            for entry in self.contents()
        )

    def archiveName(self):
        """
        archiveName

        @rtype:  string
        @return: Returns the 'XADArchiveName' property, or "" when missing.
        """
        return self.properties().get('XADArchiveName', "")

    def volumes(self):
        """
        volumes

        @rtype:  list
        @return: Returns the 'XADVolumes' property (a list with strings,
                 the archive name(s)), or [] when missing.
        """
        return self.properties().get('XADVolumes', [])

    def contentsObjects(self):
        """
        contentsObjects

        @rtype:  list
        @return: Returns a list with JsonResultItem|JsonResultItemZip
                 |JsonResultItemRar|JsonResultItemTar objects depending
                 on the archive's format name ('Zip', 'RAR', 'Tar';
                 anything else gets the plain JsonResultItem).
        """
        formatName = self.formatName()
        if formatName == 'Zip':
            objType = JsonResultItemZip
        elif formatName == 'RAR':
            objType = JsonResultItemRar
        elif formatName == 'Tar':
            objType = JsonResultItemTar
        else:
            objType = JsonResultItem
        return [objType(item) for item in self.contents()]

    def isMultiArchive(self):
        """
        isMultiArchive

        @rtype:  bool
        @return: Returns True if the archive has more than one volume,
                 else it will return False.
        """
        return len(self.volumes()) > 1
class Lsar:
    """
    Wrapper that runs the ``lsar`` command-line tool as a subprocess.

    The executable is located through the ``path`` and ``command``
    properties (``/usr/bin`` and ``lsar`` by default) and the archive to
    inspect through the ``file`` property.
    """

    # grep -rnw -e "XADException describeXADError" ./unarchiver
    # Output fragment -> exception raised by run() when the process exits
    # with a non-zero status and an output line contains that fragment.
    EXCEPTION_PATTERNS = {
        "Unknown option": UnknownOption,
        "Couldn't open archive.": ArchiveNotFound,
        "This archive requires a password to unpack": PasswordRequired,
        "Couldn't recognize the archive format.": UnknownFormatError,
        "Archive parsing failed!": ParseError,
        "Listing failed!": UnarchiveError
    }

    def __init__(self):
        """Initialise the command location and every option to its default."""
        self._path = '/usr/bin'
        self._command = 'lsar'
        self._args = []   # unar command arguments
        self._files = []  # list of files to extract (alternative is all or use indexes with -i)
        self._file = ''

        # Options
        self._output = ""
        self._forceOverwrite = False
        self._forceRename = False
        self._forceSkip = False
        self._forceDirectory = False
        self._noDirectory = False
        self._password = ""
        self._encoding = None          # TODO
        self._passwordEncoding = None  # TODO
        self._indexes = []
        self._noRecursion = False
        self._copyTime = False
        self._forks = False
        self._quiet = False

    def run(self, cmd=None, cb=None):
        """
        Run a command and return what it wrote to stdout.

        @type  cmd: list
        @param cmd: argument list for Popen; when None the list built by
                    _compileCmd() is used.
        @type  cb: callable
        @param cb: optional callback, called with the exit code once the
                   process has finished and no exception was raised.
        @rtype:  string
        @return: The process' stdout decoded as UTF-8.
        @raise Error: the subclass mapped in EXCEPTION_PATTERNS when the
                      exit code is non-zero and a line of stdout or stderr
                      contains the matching fragment. A non-zero exit code
                      without a matching line raises nothing.
        """
        if cmd is None:
            cmd = self._compileCmd()

        process = Popen(cmd, stdout=PIPE, stderr=PIPE)
        # communicate() reads both pipes until EOF and waits for the process,
        # so a single call is enough. It used to sit inside a
        # `while p.poll() is None` loop, which skipped the call - and lost
        # all output - whenever the child had already exited.
        stdoutData, stderrData = process.communicate()
        log = stdoutData.decode('UTF-8')
        errLog = stderrData.decode('UTF-8') if stderrData else ""
        exitCode = process.returncode

        # Check for exceptions.
        if exitCode != 0:
            logLines = log.split('\n')
            errLines = errLog.split('\n')
            for pattern, exception in self.EXCEPTION_PATTERNS.items():
                # stdout lines are searched before stderr lines.
                for line in logLines + errLines:
                    if pattern in line:
                        raise exception(line)

        if cb:
            cb(exitCode)
        return log

    def _compileCmd(self):
        """
        Build the default argument list: the full command path followed by
        the archive path (when one is set).

        @rtype:  list
        @return: Argument list suitable for run().
        """
        # full command path + command
        args = [self.fullCommandPath]
        # archive path
        if self.file:
            args.append(self.file)
        # optional files to extract TODO
        # if self._files: args.append(",".join())
        return args

    ########################

    @property
    def path(self):
        """Directory that contains the executable."""
        return self._path

    @path.setter
    def path(self, path):
        self._path = path

    @property
    def command(self):
        """File name of the executable."""
        return self._command

    @command.setter
    def command(self, cmd):
        self._command = cmd

    @property
    def fullCommandPath(self):
        """
        The path and command joined with a '/'. Assigning expects a
        sequence whose first item is the path and second the command.
        """
        return "{0}/{1}".format(self._path, self._command)

    @fullCommandPath.setter
    def fullCommandPath(self, tupleValue):
        self._path = tupleValue[0]
        self._command = tupleValue[1]

    @property
    def file(self):
        """Path of the archive; assigning also resets self._data to None."""
        return self._file

    @file.setter
    def file(self, path):
        self._file = path
        self._data = None

    # Options

    # -test (-t)
    def test(self):  # TODO
        """
        Run the command with '-t' on the current file and print every
        regex match found in its output. Unfinished (see TODO): nothing
        is returned.
        """
        result = self.run(cmd=[self.fullCommandPath, '-t', self.file])
        # Raw string: same pattern as before, without invalid string escapes.
        matches = re.findall(r"(.*)(\.\.\. ([a-zA-Z]+)(\.|!))", result, re.M)
        for r in matches:
            print(r)

    def fetch(self):
        """
        Run the command with '--json' on the current file.

        @rtype:  JsonResult
        @return: A JsonResult built from the output, or None when the
                 command produced no output.
        """
        result = self.run(cmd=[self.fullCommandPath, '--json', self.file])
        if result:
            return JsonResult(result)
        return None
|