# shadacat.py (2.6 KB)
#!/usr/bin/env python3
import codecs
import os
import sys
from datetime import datetime
from enum import Enum
from functools import reduce

import msgpack
  9. class EntryTypes(Enum):
  10. Unknown = -1
  11. Missing = 0
  12. Header = 1
  13. SearchPattern = 2
  14. SubString = 3
  15. HistoryEntry = 4
  16. Register = 5
  17. Variable = 6
  18. GlobalMark = 7
  19. Jump = 8
  20. BufferList = 9
  21. LocalMark = 10
  22. Change = 11
  23. def strtrans_errors(e):
  24. if not isinstance(e, UnicodeDecodeError):
  25. raise NotImplementedError('don’t know how to handle {0} error'.format(
  26. e.__class__.__name__))
  27. return '<{0:x}>'.format(reduce((lambda a, b: a*0x100+b),
  28. list(e.object[e.start:e.end]))), e.end
  29. codecs.register_error('strtrans', strtrans_errors)
  30. def idfunc(o):
  31. return o
  32. class CharInt(int):
  33. def __repr__(self):
  34. return super(CharInt, self).__repr__() + ' (\'%s\')' % chr(self)
  35. ctable = {
  36. bytes: lambda s: s.decode('utf-8', 'strtrans'),
  37. dict: lambda d: dict((mnormalize(k), mnormalize(v)) for k, v in d.items()),
  38. list: lambda l: list(mnormalize(i) for i in l),
  39. int: lambda n: CharInt(n) if 0x20 <= n <= 0x7E else n,
  40. }
  41. def mnormalize(o):
  42. return ctable.get(type(o), idfunc)(o)
  43. fname = sys.argv[1]
  44. try:
  45. filt = sys.argv[2]
  46. except IndexError:
  47. def filt(entry): return True
  48. else:
  49. _filt = filt
  50. def filt(entry): return eval(_filt, globals(), {'entry': entry})
  51. poswidth = len(str(os.stat(fname).st_size or 1000))
  52. class FullEntry(dict):
  53. def __init__(self, val):
  54. self.__dict__.update(val)
  55. with open(fname, 'rb') as fp:
  56. unpacker = msgpack.Unpacker(file_like=fp, read_size=1)
  57. max_type = max(typ.value for typ in EntryTypes)
  58. while True:
  59. try:
  60. pos = fp.tell()
  61. typ = unpacker.unpack()
  62. except msgpack.OutOfData:
  63. break
  64. else:
  65. timestamp = unpacker.unpack()
  66. time = datetime.fromtimestamp(timestamp)
  67. length = unpacker.unpack()
  68. if typ > max_type:
  69. entry = fp.read(length)
  70. typ = EntryTypes.Unknown
  71. else:
  72. entry = unpacker.unpack()
  73. typ = EntryTypes(typ)
  74. full_entry = FullEntry({
  75. 'value': entry,
  76. 'timestamp': timestamp,
  77. 'time': time,
  78. 'length': length,
  79. 'pos': pos,
  80. 'type': typ,
  81. })
  82. if not filt(full_entry):
  83. continue
  84. print('%*u %13s %s %5u %r' % (
  85. poswidth, pos, typ.name, time.isoformat(), length, mnormalize(entry)))