magic.py 25 KB


# Found on a Russian Zope mailing list, and modified to fix bugs in parsing
# the magic file and in string making.
# -- Daniel Berlin <dberlin@dberlin.org>
  4. import sys, struct, time, re, exceptions, pprint, stat, os, pwd, grp
  5. _mew = 0
  6. # _magic='/tmp/magic'
  7. # _magic='/usr/share/magic.mime'
  8. _magic='/usr/share/magic.mime'
  9. mime = 1
  10. _ldate_adjust = lambda x: time.mktime( time.gmtime(x) )
  11. BUFFER_SIZE = 1024 * 128 # 128K should be enough...
  12. class MagicError(exceptions.Exception): pass
  13. def _handle(fmt='@x',adj=None): return fmt, struct.calcsize(fmt), adj
  14. KnownTypes = {
  15. # 'byte':_handle('@b'),
  16. 'byte':_handle('@B'),
  17. 'ubyte':_handle('@B'),
  18. 'string':('s',0,None),
  19. 'pstring':_handle('p'),
  20. # 'short':_handle('@h'),
  21. # 'beshort':_handle('>h'),
  22. # 'leshort':_handle('<h'),
  23. 'short':_handle('@H'),
  24. 'beshort':_handle('>H'),
  25. 'leshort':_handle('<H'),
  26. 'ushort':_handle('@H'),
  27. 'ubeshort':_handle('>H'),
  28. 'uleshort':_handle('<H'),
  29. 'long':_handle('@l'),
  30. 'belong':_handle('>l'),
  31. 'lelong':_handle('<l'),
  32. 'ulong':_handle('@L'),
  33. 'ubelong':_handle('>L'),
  34. 'ulelong':_handle('<L'),
  35. 'date':_handle('=l'),
  36. 'bedate':_handle('>l'),
  37. 'ledate':_handle('<l'),
  38. 'ldate':_handle('=l',_ldate_adjust),
  39. 'beldate':_handle('>l',_ldate_adjust),
  40. 'leldate':_handle('<l',_ldate_adjust),
  41. }
  42. _mew_cnt = 0
  43. def mew(x):
  44. global _mew_cnt
  45. if _mew :
  46. if x=='.' :
  47. _mew_cnt += 1
  48. if _mew_cnt % 64 == 0 : sys.stderr.write( '\n' )
  49. sys.stderr.write( '.' )
  50. else:
  51. sys.stderr.write( '\b'+x )
  52. def has_format(s):
  53. n = 0
  54. l = None
  55. for c in s :
  56. if c == '%' :
  57. if l == '%' : n -= 1
  58. else : n += 1
  59. l = c
  60. return n
  61. def read_asciiz(file,size=None,pos=None):
  62. s = []
  63. if pos :
  64. mew('s')
  65. file.seek( pos, 0 )
  66. mew('z')
  67. if size is not None :
  68. s = [file.read( size ).split('\0')[0]]
  69. else:
  70. while 1 :
  71. c = file.read(1)
  72. if (not c) or (ord(c)==0) or (c=='\n') : break
  73. s.append (c)
  74. mew('Z')
  75. return ''.join(s)
  76. def a2i(v,base=0):
  77. if v[-1:] in 'lL' : v = v[:-1]
  78. return int( v, base )
  79. _cmap = {
  80. '\\' : '\\',
  81. '0' : '\0',
  82. }
  83. for c in range(ord('a'),ord('z')+1) :
  84. try : e = eval('"\\%c"' % chr(c))
  85. except ValueError : pass
  86. else : _cmap[chr(c)] = e
  87. else:
  88. del c
  89. del e
  90. def make_string(s):
  91. return eval( '"'+s.replace('"','\\"')+'"')
  92. class MagicTestError(MagicError): pass
  93. class MagicTest:
  94. def __init__(self,offset,mtype,test,message,line=None,level=None):
  95. self.line, self.level = line, level
  96. self.mtype = mtype
  97. self.mtest = test
  98. self.subtests = []
  99. self.mask = None
  100. self.smod = None
  101. self.nmod = None
  102. self.offset, self.type, self.test, self.message = \
  103. offset,mtype,test,message
  104. if self.mtype == 'true' : return # XXX hack to enable level skips
  105. if test[-1:]=='\\' and test[-2:]!='\\\\' :
  106. self.test += 'n' # looks like someone wanted EOL to match?
  107. if mtype[:6]=='string' :
  108. if '/' in mtype : # for strings
  109. self.type, self.smod = \
  110. mtype[:mtype.find('/')], mtype[mtype.find('/')+1:]
  111. else:
  112. for nm in '&+-' :
  113. if nm in mtype : # for integer-based
  114. self.nmod, self.type, self.mask = (
  115. nm,
  116. mtype[:mtype.find(nm)],
  117. # convert mask to int, autodetect base
  118. int( mtype[mtype.find(nm)+1:], 0 )
  119. )
  120. break
  121. self.struct, self.size, self.cast = KnownTypes[ self.type ]
  122. def __str__(self):
  123. return '%s %s %s %s' % (
  124. self.offset, self.mtype, self.mtest, self.message
  125. )
  126. def __repr__(self):
  127. return 'MagicTest(%s,%s,%s,%s,line=%s,level=%s,subtests=\n%s%s)' % (
  128. `self.offset`, `self.mtype`, `self.mtest`, `self.message`,
  129. `self.line`, `self.level`,
  130. '\t'*self.level, pprint.pformat(self.subtests)
  131. )
  132. def run(self,file):
  133. result = ''
  134. do_close = 0
  135. try:
  136. if type(file) == type('x') :
  137. file = open( file, 'r', BUFFER_SIZE )
  138. do_close = 1
  139. # else:
  140. # saved_pos = file.tell()
  141. if self.mtype != 'true' :
  142. data = self.read(file)
  143. last = file.tell()
  144. else:
  145. data = last = None
  146. if self.check( data ) :
  147. result = self.message+' '
  148. if has_format( result ) : result %= data
  149. for test in self.subtests :
  150. m = test.run(file)
  151. if m is not None : result += m
  152. return make_string( result )
  153. finally:
  154. if do_close :
  155. file.close()
  156. # else:
  157. # file.seek( saved_pos, 0 )
  158. def get_mod_and_value(self):
  159. if self.type[-6:] == 'string' :
  160. # "something like\tthis\n"
  161. if self.test[0] in '=<>' :
  162. mod, value = self.test[0], make_string( self.test[1:] )
  163. else:
  164. mod, value = '=', make_string( self.test )
  165. else:
  166. if self.test[0] in '=<>&^' :
  167. mod, value = self.test[0], a2i(self.test[1:])
  168. elif self.test[0] == 'x':
  169. mod = self.test[0]
  170. value = 0
  171. else:
  172. mod, value = '=', a2i(self.test)
  173. return mod, value
  174. def read(self,file):
  175. mew( 's' )
  176. file.seek( self.offset(file), 0 ) # SEEK_SET
  177. mew( 'r' )
  178. try:
  179. data = rdata = None
  180. # XXX self.size might be 0 here...
  181. if self.size == 0 :
  182. # this is an ASCIIZ string...
  183. size = None
  184. if self.test != '>\\0' : # magic's hack for string read...
  185. value = self.get_mod_and_value()[1]
  186. size = (value=='\0') and None or len(value)
  187. rdata = data = read_asciiz( file, size=size )
  188. else:
  189. rdata = file.read( self.size )
  190. if not rdata or (len(rdata)!=self.size) : return None
  191. data = struct.unpack( self.struct, rdata )[0] # XXX hack??
  192. except:
  193. print >>sys.stderr, self
  194. print >>sys.stderr, '@%s struct=%s size=%d rdata=%s' % (
  195. self.offset, `self.struct`, self.size,`rdata`)
  196. raise
  197. mew( 'R' )
  198. if self.cast : data = self.cast( data )
  199. if self.mask :
  200. try:
  201. if self.nmod == '&' : data &= self.mask
  202. elif self.nmod == '+' : data += self.mask
  203. elif self.nmod == '-' : data -= self.mask
  204. else: raise MagicTestError(self.nmod)
  205. except:
  206. print >>sys.stderr,'data=%s nmod=%s mask=%s' % (
  207. `data`, `self.nmod`, `self.mask`
  208. )
  209. raise
  210. return data
  211. def check(self,data):
  212. mew('.')
  213. if self.mtype == 'true' :
  214. return '' # not None !
  215. mod, value = self.get_mod_and_value()
  216. if self.type[-6:] == 'string' :
  217. # "something like\tthis\n"
  218. if self.smod :
  219. xdata = data
  220. if 'b' in self.smod : # all blanks are optional
  221. xdata = ''.join( data.split() )
  222. value = ''.join( value.split() )
  223. if 'c' in self.smod : # all blanks are optional
  224. xdata = xdata.upper()
  225. value = value.upper()
  226. # if 'B' in self.smod : # compact blanks
  227. ### XXX sorry, i don't understand this :-(
  228. # data = ' '.join( data.split() )
  229. # if ' ' not in data : return None
  230. else:
  231. xdata = data
  232. try:
  233. if mod == '=' : result = data == value
  234. elif mod == '<' : result = data < value
  235. elif mod == '>' : result = data > value
  236. elif mod == '&' : result = data & value
  237. elif mod == '^' : result = (data & (~value)) == 0
  238. elif mod == 'x' : result = 1
  239. else : raise MagicTestError(self.test)
  240. if result :
  241. zdata, zval = `data`, `value`
  242. if self.mtype[-6:]!='string' :
  243. try: zdata, zval = hex(data), hex(value)
  244. except: zdata, zval = `data`, `value`
  245. if 0 : print >>sys.stderr, '%s @%s %s:%s %s %s => %s (%s)' % (
  246. '>'*self.level, self.offset,
  247. zdata, self.mtype, `mod`, zval, `result`,
  248. self.message
  249. )
  250. return result
  251. except:
  252. print >>sys.stderr,'mtype=%s data=%s mod=%s value=%s' % (
  253. `self.mtype`, `data`, `mod`, `value`
  254. )
  255. raise
  256. def add(self,mt):
  257. if not isinstance(mt,MagicTest) :
  258. raise MagicTestError((mt,'incorrect subtest type %s'%(type(mt),)))
  259. if mt.level == self.level+1 :
  260. self.subtests.append( mt )
  261. elif self.subtests :
  262. self.subtests[-1].add( mt )
  263. elif mt.level > self.level+1 :
  264. # it's possible to get level 3 just after level 1 !!! :-(
  265. level = self.level + 1
  266. while level < mt.level :
  267. xmt = MagicTest(None,'true','x','',line=self.line,level=level)
  268. self.add( xmt )
  269. level += 1
  270. else:
  271. self.add( mt ) # retry...
  272. else:
  273. raise MagicTestError((mt,'incorrect subtest level %s'%(`mt.level`,)))
  274. def last_test(self):
  275. return self.subtests[-1]
  276. #end class MagicTest
  277. class OffsetError(MagicError): pass
  278. class Offset:
  279. pos_format = {'b':'<B','B':'>B','s':'<H','S':'>H','l':'<I','L':'>I',}
  280. pattern0 = re.compile(r''' # mere offset
  281. ^
  282. &? # possible ampersand
  283. ( 0 # just zero
  284. | [1-9]{1,1}[0-9]* # decimal
  285. | 0[0-7]+ # octal
  286. | 0x[0-9a-f]+ # hex
  287. )
  288. $
  289. ''', re.X|re.I
  290. )
  291. pattern1 = re.compile(r''' # indirect offset
  292. ^\(
  293. (?P<base>&?0 # just zero
  294. |&?[1-9]{1,1}[0-9]* # decimal
  295. |&?0[0-7]* # octal
  296. |&?0x[0-9A-F]+ # hex
  297. )
  298. (?P<type>
  299. \. # this dot might be alone
  300. [BSL]? # one of this chars in either case
  301. )?
  302. (?P<sign>
  303. [-+]{0,1}
  304. )?
  305. (?P<off>0 # just zero
  306. |[1-9]{1,1}[0-9]* # decimal
  307. |0[0-7]* # octal
  308. |0x[0-9a-f]+ # hex
  309. )?
  310. \)$''', re.X|re.I
  311. )
  312. def __init__(self,s):
  313. self.source = s
  314. self.value = None
  315. self.relative = 0
  316. self.base = self.type = self.sign = self.offs = None
  317. m = Offset.pattern0.match( s )
  318. if m : # just a number
  319. if s[0] == '&' :
  320. self.relative, self.value = 1, int( s[1:], 0 )
  321. else:
  322. self.value = int( s, 0 )
  323. return
  324. m = Offset.pattern1.match( s )
  325. if m : # real indirect offset
  326. try:
  327. self.base = m.group('base')
  328. if self.base[0] == '&' :
  329. self.relative, self.base = 1, int( self.base[1:], 0 )
  330. else:
  331. self.base = int( self.base, 0 )
  332. if m.group('type') : self.type = m.group('type')[1:]
  333. self.sign = m.group('sign')
  334. if m.group('off') : self.offs = int( m.group('off'), 0 )
  335. if self.sign == '-' : self.offs = 0 - self.offs
  336. except:
  337. print >>sys.stderr, '$$', m.groupdict()
  338. raise
  339. return
  340. raise OffsetError(`s`)
  341. def __call__(self,file=None):
  342. if self.value is not None : return self.value
  343. pos = file.tell()
  344. try:
  345. if not self.relative : file.seek( self.offset, 0 )
  346. frmt = Offset.pos_format.get( self.type, 'I' )
  347. size = struct.calcsize( frmt )
  348. data = struct.unpack( frmt, file.read( size ) )
  349. if self.offs : data += self.offs
  350. return data
  351. finally:
  352. file.seek( pos, 0 )
  353. def __str__(self): return self.source
  354. def __repr__(self): return 'Offset(%s)' % `self.source`
  355. #end class Offset
  356. class MagicFileError(MagicError): pass
  357. class MagicFile:
  358. def __init__(self,filename=_magic):
  359. self.file = None
  360. self.tests = []
  361. self.total_tests = 0
  362. self.load( filename )
  363. self.ack_tests = None
  364. self.nak_tests = None
  365. def __del__(self):
  366. self.close()
  367. def load(self,filename=None):
  368. self.open( filename )
  369. self.parse()
  370. self.close()
  371. def open(self,filename=None):
  372. self.close()
  373. if filename is not None :
  374. self.filename = filename
  375. self.file = open( self.filename, 'r', BUFFER_SIZE )
  376. def close(self):
  377. if self.file :
  378. self.file.close()
  379. self.file = None
  380. def parse(self):
  381. line_no = 0
  382. for line in self.file.xreadlines() :
  383. line_no += 1
  384. if not line or line[0]=='#' : continue
  385. line = line.lstrip().rstrip('\r\n')
  386. if not line or line[0]=='#' : continue
  387. try:
  388. x = self.parse_line( line )
  389. if x is None :
  390. print >>sys.stderr, '#[%04d]#'%line_no, line
  391. continue
  392. except:
  393. print >>sys.stderr, '###[%04d]###'%line_no, line
  394. raise
  395. self.total_tests += 1
  396. level, offset, mtype, test, message = x
  397. new_test = MagicTest(offset,mtype,test,message,
  398. line=line_no,level=level)
  399. try:
  400. if level == 0 :
  401. self.tests.append( new_test )
  402. else:
  403. self.tests[-1].add( new_test )
  404. except:
  405. if 1 :
  406. print >>sys.stderr, 'total tests=%s' % (
  407. `self.total_tests`,
  408. )
  409. print >>sys.stderr, 'level=%s' % (
  410. `level`,
  411. )
  412. print >>sys.stderr, 'tests=%s' % (
  413. pprint.pformat(self.tests),
  414. )
  415. raise
  416. else:
  417. while self.tests[-1].level > 0 :
  418. self.tests.pop()
  419. def parse_line(self,line):
  420. # print >>sys.stderr, 'line=[%s]' % line
  421. if (not line) or line[0]=='#' : return None
  422. level = 0
  423. offset = mtype = test = message = ''
  424. mask = None
  425. # get optional level (count leading '>')
  426. while line and line[0]=='>' :
  427. line, level = line[1:], level+1
  428. # get offset
  429. while line and not line[0].isspace() :
  430. offset, line = offset+line[0], line[1:]
  431. try:
  432. offset = Offset(offset)
  433. except:
  434. print >>sys.stderr, 'line=[%s]' % line
  435. raise
  436. # skip spaces
  437. line = line.lstrip()
  438. # get type
  439. c = None
  440. while line :
  441. last_c, c, line = c, line[0], line[1:]
  442. if last_c!='\\' and c.isspace() :
  443. break # unescaped space - end of field
  444. else:
  445. mtype += c
  446. if last_c == '\\' :
  447. c = None # don't fuck my brain with sequential backslashes
  448. # skip spaces
  449. line = line.lstrip()
  450. # get test
  451. c = None
  452. while line :
  453. last_c, c, line = c, line[0], line[1:]
  454. if last_c!='\\' and c.isspace() :
  455. break # unescaped space - end of field
  456. else:
  457. test += c
  458. if last_c == '\\' :
  459. c = None # don't fuck my brain with sequential backslashes
  460. # skip spaces
  461. line = line.lstrip()
  462. # get message
  463. message = line
  464. if mime and line.find("\t") != -1:
  465. message=line[0:line.find("\t")]
  466. #
  467. # print '>>', level, offset, mtype, test, message
  468. return level, offset, mtype, test, message
  469. def detect(self,file):
  470. self.ack_tests = 0
  471. self.nak_tests = 0
  472. answers = []
  473. for test in self.tests :
  474. message = test.run( file )
  475. if message :
  476. self.ack_tests += 1
  477. answers.append( message )
  478. else:
  479. self.nak_tests += 1
  480. if answers :
  481. return '; '.join( answers )
  482. #end class MagicFile
  483. def username(uid):
  484. try:
  485. return pwd.getpwuid( uid )[0]
  486. except:
  487. return '#%s'%uid
  488. def groupname(gid):
  489. try:
  490. return grp.getgrgid( gid )[0]
  491. except:
  492. return '#%s'%gid
  493. def get_file_type(fname,follow):
  494. t = None
  495. if not follow :
  496. try:
  497. st = os.lstat( fname ) # stat that entry, don't follow links!
  498. except os.error, why :
  499. pass
  500. else:
  501. if stat.S_ISLNK(st[stat.ST_MODE]) :
  502. t = 'symbolic link'
  503. try:
  504. lnk = os.readlink( fname )
  505. except:
  506. t += ' (unreadable)'
  507. else:
  508. t += ' to '+lnk
  509. if t is None :
  510. try:
  511. st = os.stat( fname )
  512. except os.error, why :
  513. return "can't stat `%s' (%s)." % (why.filename,why.strerror)
  514. dmaj, dmin = (st.st_rdev>>8)&0x0FF, st.st_rdev&0x0FF
  515. if 0 : pass
  516. elif stat.S_ISSOCK(st.st_mode) : t = 'socket'
  517. elif stat.S_ISLNK (st.st_mode) : t = follow and 'symbolic link' or t
  518. elif stat.S_ISREG (st.st_mode) : t = 'file'
  519. elif stat.S_ISBLK (st.st_mode) : t = 'block special (%d/%d)'%(dmaj,dmin)
  520. elif stat.S_ISDIR (st.st_mode) : t = 'directory'
  521. elif stat.S_ISCHR (st.st_mode) : t = 'character special (%d/%d)'%(dmaj,dmin)
  522. elif stat.S_ISFIFO(st.st_mode) : t = 'pipe'
  523. else: t = '<unknown>'
  524. if st.st_mode & stat.S_ISUID :
  525. t = 'setuid(%d=%s) %s'%(st.st_uid,username(st.st_uid),t)
  526. if st.st_mode & stat.S_ISGID :
  527. t = 'setgid(%d=%s) %s'%(st.st_gid,groupname(st.st_gid),t)
  528. if st.st_mode & stat.S_ISVTX :
  529. t = 'sticky '+t
  530. return t
  531. HELP = '''%s [options] [files...]
  532. Options:
  533. -?, --help -- this help
  534. -m, --magic=<file> -- use this magic <file> instead of %s
  535. -f, --files=<namefile> -- read filenames for <namefile>
  536. * -C, --compile -- write "compiled" magic file
  537. -b, --brief -- don't prepend filenames to output lines
  538. + -c, --check -- check the magic file
  539. -i, --mime -- output MIME types
  540. * -k, --keep-going -- don't stop st the first match
  541. -n, --flush -- flush stdout after each line
  542. -v, --verson -- print version and exit
  543. * -z, --compressed -- try to look inside compressed files
  544. -L, --follow -- follow symlinks
  545. -s, --special -- don't skip special files
  546. * -- not implemented so far ;-)
  547. + -- implemented, but in another way...
  548. '''
  549. def main():
  550. import getopt
  551. global _magic
  552. try:
  553. brief = 0
  554. flush = 0
  555. follow= 0
  556. mime = 0
  557. check = 0
  558. special=0
  559. try:
  560. opts, args = getopt.getopt(
  561. sys.argv[1:],
  562. '?m:f:CbciknvzLs',
  563. ( 'help',
  564. 'magic=',
  565. 'names=',
  566. 'compile',
  567. 'brief',
  568. 'check',
  569. 'mime',
  570. 'keep-going',
  571. 'flush',
  572. 'version',
  573. 'compressed',
  574. 'follow',
  575. 'special',
  576. )
  577. )
  578. except getopt.error, why:
  579. print >>sys.stderr, sys.argv[0], why
  580. return 1
  581. else:
  582. files = None
  583. for o,v in opts :
  584. if o in ('-?','--help'):
  585. print HELP % (
  586. sys.argv[0],
  587. _magic,
  588. )
  589. return 0
  590. elif o in ('-f','--files='):
  591. files = v
  592. elif o in ('-m','--magic='):
  593. _magic = v[:]
  594. elif o in ('-C','--compile'):
  595. pass
  596. elif o in ('-b','--brief'):
  597. brief = 1
  598. elif o in ('-c','--check'):
  599. check = 1
  600. elif o in ('-i','--mime'):
  601. mime = 1
  602. if os.path.exists( _magic+'.mime' ) :
  603. _magic += '.mime'
  604. print >>sys.stderr,sys.argv[0]+':',\
  605. "Using regular magic file `%s'" % _magic
  606. elif o in ('-k','--keep-going'):
  607. pass
  608. elif o in ('-n','--flush'):
  609. flush = 1
  610. elif o in ('-v','--version'):
  611. print 'VERSION'
  612. return 0
  613. elif o in ('-z','--compressed'):
  614. pass
  615. elif o in ('-L','--follow'):
  616. follow = 1
  617. elif o in ('-s','--special'):
  618. special = 1
  619. else:
  620. if files :
  621. files = map(lambda x: x.strip(), v.split(','))
  622. if '-' in files and '-' in args :
  623. error( 1, 'cannot use STDIN simultaneously for file list and data' )
  624. for file in files :
  625. for name in (
  626. (file=='-')
  627. and sys.stdin
  628. or open(file,'r',BUFFER_SIZE)
  629. ).xreadlines():
  630. name = name.strip()
  631. if name not in args :
  632. args.append( name )
  633. try:
  634. if check : print >>sys.stderr, 'Loading magic database...'
  635. t0 = time.time()
  636. m = MagicFile(_magic)
  637. t1 = time.time()
  638. if check :
  639. print >>sys.stderr, \
  640. m.total_tests, 'tests loaded', \
  641. 'for', '%.2f' % (t1-t0), 'seconds'
  642. print >>sys.stderr, len(m.tests), 'tests at top level'
  643. return 0 # XXX "shortened" form ;-)
  644. mlen = max( map(len, args) )+1
  645. for arg in args :
  646. if not brief : print (arg + ':').ljust(mlen),
  647. ftype = get_file_type( arg, follow )
  648. if (special and ftype.find('special')>=0) \
  649. or ftype[-4:] == 'file' :
  650. t0 = time.time()
  651. try:
  652. t = m.detect( arg )
  653. except (IOError,os.error), why:
  654. t = "can't read `%s' (%s)" % (why.filename,why.strerror)
  655. if ftype[-4:] == 'file' : t = ftype[:-4] + t
  656. t1 = time.time()
  657. print t and t or 'data'
  658. if 0 : print \
  659. '#\t%d tests ok, %d tests failed for %.2f seconds'%\
  660. (m.ack_tests, m.nak_tests, t1-t0)
  661. else:
  662. print mime and 'application/x-not-regular-file' or ftype
  663. if flush : sys.stdout.flush()
  664. # print >>sys.stderr, 'DONE'
  665. except:
  666. if check : return 1
  667. raise
  668. else:
  669. return 0
  670. finally:
  671. pass
  672. if __name__ == '__main__' :
  673. sys.exit( main() )
  674. # vim:ai
  675. # EOF #