_reference_finder.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. import gc as _gc
  2. import inspect
  3. import logging
  4. import types as _types
  5. import typing
  6. logger = logging.getLogger(__name__)
  7. def proxy0(data):
  8. def proxy1():
  9. return data
  10. return proxy1
  11. _CELLTYPE = type(proxy0(None).__closure__[0])
  12. def replace_all_refs(replace_from: typing.Any, replace_to: typing.Any) -> typing.Any:
  13. """
  14. :summary: Uses the :mod:`gc` module to replace all references to obj
  15. :attr:`replace_from` with :attr:`replace_to` (it tries it's best,
  16. anyway).
  17. :param replace_from: The obj you want to replace.
  18. :param replace_to: The new objject you want in place of the old one.
  19. :returns: The replace_from
  20. """
  21. # https://github.com/cart0113/pyjack/blob/dd1f9b70b71f48335d72f53ee0264cf70dbf4e28/pyjack.py
  22. _gc.collect()
  23. hit = False
  24. for referrer in _gc.get_referrers(replace_from):
  25. # FRAMES -- PASS THEM UP
  26. if isinstance(referrer, _types.FrameType):
  27. continue
  28. # DICTS
  29. if isinstance(referrer, dict):
  30. cls = None
  31. # THIS CODE HERE IS TO DEAL WITH DICTPROXY TYPES
  32. if "__dict__" in referrer and "__weakref__" in referrer:
  33. for cls in _gc.get_referrers(referrer):
  34. if inspect.isclass(cls) and cls.__dict__ == referrer:
  35. break
  36. for key, value in referrer.items():
  37. # REMEMBER TO REPLACE VALUES ...
  38. if value is replace_from:
  39. hit = True
  40. value = replace_to
  41. referrer[key] = value
  42. if cls: # AGAIN, CLEANUP DICTPROXY PROBLEM
  43. setattr(cls, key, replace_to)
  44. # AND KEYS.
  45. if key is replace_from:
  46. hit = True
  47. del referrer[key]
  48. referrer[replace_to] = value
  49. elif isinstance(referrer, list):
  50. for i, value in enumerate(referrer):
  51. if value is replace_from:
  52. hit = True
  53. referrer[i] = replace_to
  54. elif isinstance(referrer, set):
  55. referrer.remove(replace_from)
  56. referrer.add(replace_to)
  57. hit = True
  58. elif isinstance(
  59. referrer,
  60. (
  61. tuple,
  62. frozenset,
  63. ),
  64. ):
  65. new_tuple = []
  66. for obj in referrer:
  67. if obj is replace_from:
  68. new_tuple.append(replace_to)
  69. else:
  70. new_tuple.append(obj)
  71. replace_all_refs(referrer, type(referrer)(new_tuple))
  72. elif isinstance(referrer, _CELLTYPE):
  73. def _proxy0(data):
  74. def proxy1():
  75. return data
  76. return proxy1
  77. proxy = _proxy0(replace_to)
  78. newcell = proxy.__closure__[0]
  79. replace_all_refs(referrer, newcell)
  80. elif isinstance(referrer, _types.FunctionType):
  81. localsmap = {}
  82. for key in ["code", "globals", "name", "defaults", "closure"]:
  83. orgattr = getattr(referrer, f"__{key}__")
  84. localsmap[key] = replace_to if orgattr is replace_from else orgattr
  85. localsmap["argdefs"] = localsmap["defaults"]
  86. del localsmap["defaults"]
  87. newfn = _types.FunctionType(**localsmap)
  88. replace_all_refs(referrer, newfn)
  89. else:
  90. logger.debug("%s is not supported.", referrer)
  91. if hit is False:
  92. raise AttributeError(f"Object '{replace_from}' not found")
  93. return replace_from