web.py

import re
import base64
import json
import time
import sys

import requests
import bs4

# HTMLParser().unescape() was removed in Python 3.9; html.unescape() replaces it.
from html import unescape as html_unescape
from config import HEADERS
def pickup_url(text):
    """Return a valid URL from a string."""
    PROTOCOLS = ["http:", "https:", "magnet:"]
    for protocol in PROTOCOLS:
        index = text.find(protocol)
        if index == -1:
            continue
        raw_text = text[index:]
        if "」" in raw_text:
            # hack for bot-forwarding
            return None
        return raw_text
    return None
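# Illustrative call (hypothetical input, not from the original bot traffic):
#   pickup_url("see https://example.org/page for details")
#   -> "https://example.org/page for details"
# Everything from the protocol marker to the end of the string is returned verbatim,
# so callers should expect possible trailing words after the URL.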
def openConnection(word):
    s = requests.Session()
    h = {}
    for item in HEADERS:
        h[item[0]] = item[1]
    if re.match(r"http:/*([^/]+)\.i2p(/|$)", word) or re.match(r"http:/*([^/]+)\.onion(/|$)", word):
        from config import I2P_USER, I2P_PASSWORD
        timeout = 60
        # Do not leak the client address to hidden services.
        h.pop("X-Forwarded-For", None)
        s.auth = (I2P_USER, I2P_PASSWORD)
        s.proxies = {"http": "http://127.0.0.1:4444"}
    else:
        timeout = 10
    try:
        return s.get(word, headers=h, timeout=timeout, stream=True, verify=True)
    except Exception as e:
        raise RuntimeError(type(e).__name__)
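# Sketch of the assumed config shape (the loop above only implies that HEADERS is an
# iterable of (name, value) pairs; the concrete values here are placeholders):
#   HEADERS = [("User-Agent", "Mozilla/5.0 ..."), ("X-Forwarded-For", "203.0.113.1")]
# .i2p and .onion hosts are fetched through the local I2P HTTP proxy on port 4444 with a
# 60-second timeout; all other URLs get a direct 10-second request.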
def readContents(h, timeout=3):
    """Read a small part of the contents."""
    contents = b""
    counter = 1
    MAX = 8192
    MAX_LENGTH = 16384
    r = h.iter_content(decode_unicode=False)
    start_time = time.time()
    while len(contents) < MAX_LENGTH and counter < MAX:
        if time.time() - start_time > timeout:
            raise RuntimeError("Request timeout.")
        following_contents = b""
        try:
            following_contents += next(r)
        except Exception:
            # StopIteration (end of stream) or any transport error ends the read.
            break
        # Hack: read more when we saw a script
        if b"<script" in following_contents:
            MAX += 1
            MAX_LENGTH += 16384
        if following_contents:
            contents += following_contents
            counter += 1
    h.close()
    return contents
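# Minimal usage sketch (example.org is a placeholder URL):
#   resp = openConnection("http://example.org/")
#   head = readContents(resp)   # only a small prefix of the body, as bytes
# The limits grow whenever "<script" is seen, so script-heavy pages yield a larger prefix.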
def lookup_magnet(magnet):
    bthash_b16 = re.findall(r'(?:\?|&|&amp;)xt=urn:btih:([0-9A-Fa-f]{40})', magnet)
    bthash_b32 = re.findall(r'(?:\?|&|&amp;)xt=urn:btih:([2-7A-Za-z]{32})', magnet)
    if bthash_b16 and bthash_b32:
        sys.stderr.write("Assertion error, both bthash! %s\n" % magnet)
    if bthash_b16:
        querystring = bthash_b16[0]
    elif bthash_b32:
        # Normalize the base32 hash to the base16 form used by the lookup services.
        querystring = base64.b16encode(base64.b32decode(bthash_b32[0].upper())).decode("ascii")
    else:
        # no bt hash, do not touch the url
        return None
    raw_info = readContents(openConnection("https://torrentproject.se/?s=%s&out=json" % querystring))
    info = json.loads(raw_info.decode("UTF-8"))
    if info["total_found"] != "0":
        title = info["1"]["title"]
        cat = info["1"]["category"]
        size = info["1"]["torrent_size"]
        return title, cat, size
    # oh, gonna try plan b
    raw_info = readContents(openConnection("https://torrentz.eu/%s" % querystring))
    page = bs4.BeautifulSoup(raw_info, "html.parser")
    try:
        div = page.find_all("div", "download", recursive=True)[0]
        firstmatch = div.find_all(rel="e")[0]
        title = firstmatch.find_all("span")[1].text
        cat = firstmatch.text.split(title)[-1].split()[0]
    except Exception:
        raise RuntimeError("404 Torrent Not Found, maybe DMCA?")
    try:
        div = page.find_all("div", "files")[0]
        size = div.div["title"].replace(",", "").replace("b", "")
    except Exception:
        size = ""
    return title, cat, size
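# Illustrative call (the info-hash below is a made-up placeholder, not a real torrent):
#   lookup_magnet("magnet:?xt=urn:btih:0123456789abcdef0123456789abcdef01234567")
# queries torrentproject.se first, falls back to torrentz.eu, and returns a
# (title, category, size) tuple, or raises RuntimeError when neither lookup parses.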
def remove_tailing_space(string):
    if not string:
        # in bs4, "<title></title>".string is not an empty string but None...
        return ""
    if "\n" not in string:
        return string
    tmp = string.split("\n")
    for idx, line in enumerate(tmp):
        tmp[idx] = line.strip()
    return " ".join(tmp).strip()
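# Example behaviour: multi-line titles collapse onto one line,
#   remove_tailing_space("  Hello\n   World \n")  ->  "Hello World"
# while a single-line string is returned unchanged, leading spaces and all.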
def web_res_info(word):
    webInfo = {
        "type": "",
        "title": None,
        "size": ""
    }

    def htmlDecode(encodedText):
        decodedText = ""
        for encoding in ("utf-8", "gbk", "gb18030", "iso-8859-1"):
            try:
                decodedText = encodedText.decode(encoding)
                break
            except UnicodeDecodeError:
                pass
        if not decodedText:
            # none of the candidate encodings decoded cleanly; fall back to a lossy decode
            decodedText = encodedText.decode("utf-8", errors="replace")
        decodedText = html_unescape(decodedText).replace("\r", "").replace("\n", " ").strip()
        return decodedText

    if word.startswith("magnet:"):
        result = lookup_magnet(word)
        if result:
            webInfo["title"], webInfo["type"], webInfo["size"] = result
        return webInfo
    h = openConnection(word)
    if "Content-Type" not in h.headers or h.headers["Content-Type"].split(";")[0] == "text/html":
        webInfo["type"] = "text/html"
        contents = readContents(h)
        # Other parsers are really naive,
        # they can't even distinguish between comments and code.
        soup = bs4.BeautifulSoup(contents, "html5lib")
        if soup.title:
            # in bs4, "<title></title>".string is not an empty string but None...
            webInfo["title"] = remove_tailing_space(soup.title.string)
    else:
        webInfo["type"] = h.headers["Content-Type"]
        if "Content-Range" in h.headers:
            webInfo["size"] = h.headers["Content-Range"].split("/")[1]
        elif "Content-Length" in h.headers:
            webInfo["size"] = h.headers["Content-Length"]
    return webInfo
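# Rough usage sketch (hypothetical caller; not part of this module):
#   url = pickup_url(incoming_message)
#   if url:
#       info = web_res_info(url)
#       # info["title"] is the page title (or None), info["type"] the MIME type,
#       # and info["size"] the reported size for non-HTML resources.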