123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173 |
- import hashlib
- import pyaes
- import blowfish
- import binascii
- from urllib.request import urlopen
- import demixed.utils
- from generic.track import Track
- import demixed.artist
- import demixed.album
- import demixed.track_quality
class DeezerTrack(Track):
    """A track populated from Deezer song data that can be streamed or downloaded.

    Instances hold the metadata set by ``set_values`` / ``set_raw_data``
    (ids, title, artists, album, qualities, ...) plus the ``md5_origin`` and
    ``media_version`` values needed to build the encrypted stream URL.
    """

    # Quality names mapped to the format codes embedded in the stream URL.
    _QUALITY_TO_NUMBER = {"FLAC": "9", "MP3_320": "3", "MP3_128": "1"}
    # Separator placed between the fields of the URL payload.
    _FIELD_SEPARATOR = b"\xa4"
    # AES-ECB key used to encrypt the URL payload.
    _URL_AES_KEY = b"jo6aey6haid2Teih"
    # Initialisation vector used for every Blowfish-CBC encrypted chunk.
    _BLOWFISH_IV = b"\x00\x01\x02\x03\x04\x05\x06\x07"
    # The audio stream is read and (partially) decrypted in chunks of this size.
    _CHUNK_SIZE = 2048

    def set_raw_data(self, song_data):
        """Populate this track from a raw Deezer song dictionary.

        Args:
            song_data: Mapping with the ``SNG_*``, ``ALB_*``, ``ARTISTS``,
                ``MD5_ORIGIN``, ``MEDIA_VERSION``, ``DISK_NUMBER``,
                ``TRACK_NUMBER`` and ``ISRC`` keys read below.
                ``PHYSICAL_RELEASE_DATE`` is optional.

        Raises:
            KeyError: If a required key is missing from ``song_data``.
        """
        # The release date is the only optional field; default to None.
        release_date = song_data.get("PHYSICAL_RELEASE_DATE")

        artists = [
            demixed.artist.DeezerArtist.grab_or_create(
                entry["ART_ID"],
                artist_id=entry["ART_ID"],
                name=entry["ART_NAME"],
                picture_url=demixed.utils.ARTIST_PICTURES_URL % entry["ART_PICTURE"],
            )
            for entry in song_data["ARTISTS"]
        ]
        album = demixed.album.DeezerAlbum.grab_or_create(
            song_data["ALB_ID"],
            album_id=song_data["ALB_ID"],
            title=song_data["ALB_TITLE"],
            date=release_date,
            picture_url=demixed.utils.ALBUM_PICTURES_URL % song_data["ALB_PICTURE"],
            picture_thumbnail_url=(
                demixed.utils.ALBUM_PICTURES_URL_THUMBNAIL % song_data["ALB_PICTURE"]
            ),
        )

        self.set_values(
            title=song_data["SNG_TITLE"],
            song_id=song_data["SNG_ID"],
            artists=artists,
            album=album,
            md5_origin=song_data["MD5_ORIGIN"],
            media_version=song_data["MEDIA_VERSION"],
            qualities=demixed.track_quality.DeezerTrackQuality.from_raw_data(song_data),
            disk_number=song_data["DISK_NUMBER"],
            track_number=song_data["TRACK_NUMBER"],
            isrc=song_data["ISRC"],
            is_full=True,
        )

    def set_values(
        self,
        song_id=None,
        title=None,
        artists=None,
        album=None,
        md5_origin=None,
        media_version=None,
        qualities=None,
        is_full=False,  # True once the track was populated by set_raw_data
        disk_number=None,
        track_number=None,
        isrc=None,
        raw_data=None,
    ):
        """Assign the track's attributes.

        When ``raw_data`` is given it takes precedence: the track is filled
        via ``set_raw_data(raw_data)`` and every other argument is ignored.
        Otherwise each argument is stored on the attribute of the same name.
        """
        if raw_data is not None:
            self.set_raw_data(raw_data)
            return

        self.song_id = song_id
        self.title = title
        self.artists = artists
        self.album = album
        self.md5_origin = md5_origin
        self.media_version = media_version
        self.qualities = qualities
        self.disk_number = disk_number
        self.track_number = track_number
        self.isrc = isrc
        self.is_full = is_full

    def get_full_data(self, deezer_instance):
        """Load any data still missing for this track, its artists and its album.

        Args:
            deezer_instance: Object providing ``get_track_data(song_id)``; it
                is also forwarded to the artists' and album's own
                ``get_full_data``.
        """
        if not self.is_full:
            self.set_raw_data(deezer_instance.get_track_data(self.song_id))

        # NOTE(review): the artist/album completion below is treated as
        # independent of the track's own is_full state — confirm this matches
        # the intended nesting.
        if self.artists is not None:
            for artist in self.artists:
                if not artist.is_full:
                    artist.get_full_data(deezer_instance)

        # Fixed: the check previously read the undefined name `album`
        # (NameError) instead of `self.album`.
        if self.album is not None and not self.album.is_full:
            self.album.get_full_data(deezer_instance)

    def _stream_url(self, quality_number):
        """Build the encrypted CDN URL for this track at ``quality_number``."""
        separator = self._FIELD_SEPARATOR

        # Step 1: join the identifying fields and hash them.
        fields = separator.join(
            part.encode()
            for part in (
                self.md5_origin,
                quality_number,
                str(self.song_id),
                self.media_version,
            )
        )
        digest = hashlib.md5(fields).hexdigest()

        # Step 2: hash + fields, padded with '.' to a multiple of 16 bytes.
        payload = digest.encode() + separator + fields + separator
        payload += b"." * (-len(payload) % 16)

        # Step 3: AES-ECB encrypt 16 bytes at a time and hex-encode the result.
        aes = pyaes.AESModeOfOperationECB(self._URL_AES_KEY)
        encrypted = b"".join(
            binascii.hexlify(aes.encrypt(payload[offset:offset + 16]))
            for offset in range(0, len(payload), 16)
        )

        # Step 4: the first character of md5_origin selects the CDN to use.
        return "http://e-cdn-proxy-{}.deezer.com/mobile/1/{}".format(
            self.md5_origin[0],
            encrypted.decode("utf-8"),
        )

    def _blowfish_key(self):
        """Derive the 16-byte Blowfish key for this track from its song id."""
        digest = hashlib.md5(str(self.song_id).encode()).hexdigest()
        # XOR both halves of the hex digest with the shared secret.
        key = "".join(
            chr(ord(low) ^ ord(high) ^ ord(secret))
            for low, high, secret in zip(
                digest[:16], digest[16:32], demixed.utils.BLOWFISH_SECRET
            )
        )
        return key.encode()

    def stream(self, stream, quality=None, progress_callback=None):
        """Fetch the track audio, decrypt it and write it to ``stream``.

        Args:
            stream: Writable binary object; decrypted audio is passed to its
                ``write`` method chunk by chunk.
            quality: One of ``"FLAC"``, ``"MP3_320"`` or ``"MP3_128"``. When
                omitted or not recognised, ``self.guess_quality()`` is used
                (not defined in this class; presumably provided by ``Track``).
            progress_callback: Optional callable invoked before each chunk
                with the fraction (0.0 to 1.0) of the download read so far,
                and once more with 1.0 at end of data. If the server sends no
                usable ``Content-Length`` only the final call is made.
        """
        if quality is None:
            quality = self.guess_quality()
        try:
            quality_number = self._QUALITY_TO_NUMBER[quality]
        except (KeyError, TypeError):
            # Unknown (or unhashable) quality: fall back to the guessed one.
            quality_number = self._QUALITY_TO_NUMBER[self.guess_quality()]

        url = self._stream_url(quality_number)
        cipher = blowfish.Cipher(self._blowfish_key())
        chunk_size = self._CHUNK_SIZE

        # The context manager guarantees the HTTP response is closed.
        with urlopen(url) as response:
            length_header = response.getheader("Content-Length")
            total = int(length_header) if length_header else 0

            index = 0
            while True:
                chunk = response.read(chunk_size)

                if progress_callback is not None:
                    if total > 0:
                        progress_callback(min(1.0, (index * chunk_size) / total))
                    elif not chunk:
                        # Size unknown: only completion can be reported.
                        progress_callback(1.0)

                if not chunk:
                    break

                # Only every third chunk is encrypted, and only if it is a
                # full chunk; everything else is written through as-is.
                if index % 3 == 0 and len(chunk) == chunk_size:
                    chunk = b"".join(cipher.decrypt_cbc(chunk, self._BLOWFISH_IV))

                stream.write(chunk)
                index += 1

    def download(self, to_file=None, quality=None, print_progress=False):
        """Download the track by delegating to the parent class's ``download``.

        Args:
            to_file: Forwarded unchanged to the parent implementation.
            quality: Quality name; defaults to ``self.guess_quality()``. It
                also determines the ``file_format`` passed to the parent.
            print_progress: Forwarded unchanged to the parent implementation.
        """
        if quality is None:
            quality = self.guess_quality()

        super().download(
            file_format=demixed.track_quality.DeezerTrackQuality.get_file_format(quality),
            to_file=to_file,
            quality=quality,
            print_progress=print_progress,
        )
|