123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177 |
- # coding=utf-8
- # !/usr/bin/python
- # by嗷呜
- import json
- import random
- import sys
- import time
- from base64 import b64decode
- from Crypto.Cipher import AES
- from Crypto.Hash import MD5
- from Crypto.Util.Padding import unpad
- sys.path.append('..')
- from base.spider import Spider
- class Spider(Spider):
- def getName(self):
- return "小红书"
- def init(self, extend=""):
- self.did = self.random_str(32)
- self.token,self.phost = self.gettoken()
- pass
- def isVideoFormat(self, url):
- pass
- def manualVideoCheck(self):
- pass
- def destroy(self):
- pass
- def random_str(self,length=16):
- hex_chars = '0123456789abcdef'
- return ''.join(random.choice(hex_chars) for _ in range(length))
- def md5(self, text: str) -> str:
- h = MD5.new()
- h.update(text.encode('utf-8'))
- return h.hexdigest()
- def homeContent(self, filter):
- data = self.fetch(f'{self.host}/api/video/queryClassifyList?mark=4', headers=self.headers()).json()['encData']
- data1 = self.aes(data)
- result = {}
- classes = []
- for k in data1['data']:
- classes.append({'type_name': k['classifyTitle'], 'type_id': k['classifyId']})
- result['class'] = classes
- return result
- def homeVideoContent(self):
- pass
- def categoryContent(self, tid, pg, filter, extend):
- path=f'/api/short/video/getShortVideos?classifyId={tid}&videoMark=4&page={pg}&pageSize=20'
- result = {}
- videos = []
- data=self.fetch(f'{self.host}{path}', headers=self.headers()).json()['encData']
- vdata=self.aes(data)
- for k in vdata['data']:
- videos.append({"vod_id": k['videoId'], 'vod_name': k.get('title'), 'vod_pic': self.getProxyUrl() + '&url=' + k['coverImg'],
- 'vod_remarks': self.dtim(k.get('playTime'))})
- result["list"] = videos
- result["page"] = pg
- result["pagecount"] = 9999
- result["limit"] = 90
- result["total"] = 999999
- return result
- def detailContent(self, ids):
- path = f'/api/video/getVideoById?videoId={ids[0]}'
- data = self.fetch(f'{self.host}{path}', headers=self.headers()).json()['encData']
- v = self.aes(data)
- d=f'{v["title"]}$auth_key={v["authKey"]}&path={v["videoUrl"]}'
- vod = {'vod_name': v["title"], 'type_name': ''.join(v.get('tagTitles',[])),'vod_play_from': v.get('nickName') or "小红书官方", 'vod_play_url': d}
- result = {"list": [vod]}
- return result
- def searchContent(self, key, quick, pg='1'):
- pass
- def playerContent(self, flag, id, vipFlags):
- h=self.headers()
- h['Authorization'] = h.pop('aut')
- del h['deviceid']
- result = {"parse": 0, "url": f"{self.host}/api/m3u8/decode/authPath?{id}", "header": h}
- return result
- def localProxy(self, param):
- return self.action(param)
- def aes(self, word):
- key = b64decode("SmhiR2NpT2lKSVV6STFOaQ==")
- iv = key
- cipher = AES.new(key, AES.MODE_CBC, iv)
- decrypted = unpad(cipher.decrypt(b64decode(word)), AES.block_size)
- return json.loads(decrypted.decode('utf-8'))
- def dtim(self, seconds):
- try:
- seconds = int(seconds)
- hours = seconds // 3600
- remaining_seconds = seconds % 3600
- minutes = remaining_seconds // 60
- remaining_seconds = remaining_seconds % 60
- formatted_minutes = str(minutes).zfill(2)
- formatted_seconds = str(remaining_seconds).zfill(2)
- if hours > 0:
- formatted_hours = str(hours).zfill(2)
- return f"{formatted_hours}:{formatted_minutes}:{formatted_seconds}"
- else:
- return f"{formatted_minutes}:{formatted_seconds}"
- except:
- return ''
- def getsign(self):
- t=str(int(time.time() * 1000))
- return self.md5(t[3:8])
- def gettoken(self):
- url = f'{self.host}/api/user/traveler'
- headers = {
- 'User-Agent': 'Mozilla/5.0 (Linux; Android 11; M2012K10C Build/RP1A.200720.011; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/87.0.4280.141 Mobile Safari/537.36;SuiRui/xhs/ver=1.2.6',
- 'deviceid': self.did, 't': str(int(time.time() * 1000)), 's': self.getsign(), }
- data = {'deviceId': self.did, 'tt': 'U', 'code': '', 'chCode': 'dafe13'}
- data1 = self.post(url, json=data, headers=headers).json()
- data2 = data1['data']
- return data2['token'], data2['imgDomain']
- host = 'https://jhfkdnov21vfd.fhoumpjjih.work'
- def headers(self):
- henda = {
- 'User-Agent': 'Mozilla/5.0 (Linux; Android 11; M2012K10C Build/RP1A.200720.011; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/87.0.4280.141 Mobile Safari/537.36;SuiRui/xhs/ver=1.2.6',
- 'deviceid': self.did, 't': str(int(time.time() * 1000)), 's': self.getsign(), 'aut': self.token}
- return henda
- def action(self, param):
- headers = {
- 'User-Agent': 'Dalvik/2.1.0 (Linux; U; Android 11; M2012K10C Build/RP1A.200720.011)'}
- data = self.fetch(f'{self.phost}{param["url"]}', headers=headers)
- type=data.headers.get('Content-Type').split(';')[0]
- base64_data = self.img(data.content, 100, '2020-zq3-888')
- return [200, type, base64_data]
- def img(self, data: bytes, length: int, key: str):
- GIF = b'\x47\x49\x46'
- JPG = b'\xFF\xD8\xFF'
- PNG = b'\x89\x50\x4E\x47\x0D\x0A\x1A\x0A'
- def is_dont_need_decode_for_gif(data):
- return len(data) > 2 and data[:3] == GIF
- def is_dont_need_decode_for_jpg(data):
- return len(data) > 7 and data[:3] == JPG
- def is_dont_need_decode_for_png(data):
- return len(data) > 7 and data[1:8] == PNG[1:8]
- if is_dont_need_decode_for_png(data):
- return data
- elif is_dont_need_decode_for_gif(data):
- return data
- elif is_dont_need_decode_for_jpg(data):
- return data
- else:
- key_bytes = key.encode('utf-8')
- result = bytearray(data)
- for i in range(length):
- result[i] ^= key_bytes[i % len(key_bytes)]
- return bytes(result)