# track.py (5.1 KB)

  1. import hashlib
  2. import pyaes
  3. import blowfish
  4. import binascii
  5. from urllib.request import urlopen
  6. import demixed.utils
  7. from generic.track import Track
  8. import demixed.artist
  9. import demixed.album
  10. import demixed.track_quality
  11. class DeezerTrack(Track):
  12. def set_raw_data(self, song_data):
  13. try:
  14. date_value = song_data["PHYSICAL_RELEASE_DATE"]
  15. except:
  16. date_value = None
  17. self.set_values(
  18. title = song_data["SNG_TITLE"],
  19. song_id = song_data["SNG_ID"],
  20. artists = list(map(
  21. lambda a: demixed.artist.DeezerArtist.grab_or_create(
  22. a["ART_ID"],
  23. artist_id = a["ART_ID"],
  24. name = a["ART_NAME"],
  25. picture_url = demixed.utils.ARTIST_PICTURES_URL % a["ART_PICTURE"]
  26. ),
  27. song_data["ARTISTS"]
  28. )),
  29. album = demixed.album.DeezerAlbum.grab_or_create(
  30. song_data["ALB_ID"],
  31. album_id = song_data["ALB_ID"],
  32. title = song_data["ALB_TITLE"],
  33. date = date_value,
  34. picture_url = demixed.utils.ALBUM_PICTURES_URL % song_data["ALB_PICTURE"],
  35. picture_thumbnail_url = demixed.utils.ALBUM_PICTURES_URL_THUMBNAIL % song_data["ALB_PICTURE"]
  36. ),
  37. md5_origin = song_data["MD5_ORIGIN"],
  38. media_version = song_data["MEDIA_VERSION"],
  39. qualities = demixed.track_quality.DeezerTrackQuality.from_raw_data(song_data),
  40. disk_number = song_data["DISK_NUMBER"],
  41. track_number = song_data["TRACK_NUMBER"],
  42. isrc = song_data["ISRC"],
  43. is_full = True
  44. )
  45. def set_values(
  46. self,
  47. song_id = None,
  48. title = None,
  49. artists = None,
  50. album = None,
  51. md5_origin = None,
  52. media_version = None,
  53. qualities = None,
  54. is_full = False, # if the album is fully loaded
  55. disk_number = None,
  56. track_number = None,
  57. isrc = None,
  58. raw_data = None
  59. ):
  60. if raw_data != None:
  61. self.set_raw_data(raw_data)
  62. return
  63. self.song_id = song_id
  64. self.title = title
  65. self.artists = artists
  66. self.album = album
  67. self.md5_origin = md5_origin
  68. self.media_version = media_version
  69. self.qualities = qualities
  70. self.disk_number = disk_number
  71. self.track_number = track_number
  72. self.isrc = isrc
  73. self.is_full = is_full
  74. def get_full_data(self, deezer_instance):
  75. if not self.is_full:
  76. raw_data = deezer_instance.get_track_data(self.song_id)
  77. self.set_raw_data(raw_data)
  78. if self.artists != None:
  79. for artist in self.artists:
  80. if not artist.is_full:
  81. artist.get_full_data(deezer_instance)
  82. if self.album != None:
  83. if not album.is_full:
  84. self.album.get_full_data(deezer_instance)
  85. def stream(self, stream, quality = None, progress_callback = None):
  86. if quality == None:
  87. quality = self.guess_quality()
  88. quality_to_number = {"FLAC": "9", "MP3_320": "3", "MP3_128": "1"}
  89. try:
  90. quality = quality_to_number[quality]
  91. except:
  92. quality = quality_to_number[self.guess_quality()]
  93. file_format = demixed.track_quality.DeezerTrackQuality.get_file_format(quality)
  94. # get track url
  95. # - step 1: put a bunch of info together and hash it
  96. step1 = b'\xa4'.join(map(
  97. lambda s: s.encode(), [
  98. self.md5_origin,
  99. quality,
  100. str(self.song_id),
  101. self.media_version
  102. ]
  103. ))
  104. hash = hashlib.new("md5")
  105. hash.update(step1)
  106. step1_hash = hash.hexdigest()
  107. # - step 2: hash + info + add padding
  108. step2 = str.encode(step1_hash) + b'\xa4' + step1 + b'\xa4'
  109. while len(step2) % 16 > 0:
  110. step2 += b'.'
  111. # - step 3: AES encryption to get url
  112. # it will encrypt in parts of 16
  113. step3 = b''
  114. aes = pyaes.AESModeOfOperationECB(b'jo6aey6haid2Teih')
  115. for index in range(int(len(step2) / 16)):
  116. block = step2[(index * 16):(index * 16) + 16]
  117. step3 += binascii.hexlify(aes.encrypt(block))
  118. # - step 4: make url
  119. url = "http://e-cdn-proxy-{}.deezer.com/mobile/1/{}".format(
  120. self.md5_origin[0], # first char of md5origin is cdn to use
  121. step3.decode('utf-8')
  122. )
  123. # get blowfish key
  124. hash = hashlib.new("md5")
  125. hash.update(str(self.song_id).encode())
  126. hash = hash.hexdigest()
  127. key = ""
  128. for i in range(16):
  129. key += chr(ord(hash[i]) ^ ord(hash[i+16]) ^ ord(demixed.utils.BLOWFISH_SECRET[i]))
  130. response = urlopen(url)
  131. cipher = blowfish.Cipher(key.encode())
  132. length = int(response.getheader('Content-Length'))
  133. i = 0
  134. while True:
  135. chunk = response.read(2048)
  136. if progress_callback != None:
  137. if (i * 2048) > length:
  138. progress_callback(1)
  139. else:
  140. progress_callback((i * 2048) / length)
  141. if not chunk:
  142. break
  143. if (i % 3) == 0 and len(chunk) == 2048:
  144. chunk = b"".join(
  145. cipher.decrypt_cbc(chunk, b"\x00\x01\x02\x03\x04\x05\x06\x07")
  146. )
  147. stream.write(chunk)
  148. i += 1
  149. def download(self,
  150. to_file = None,
  151. quality = None,
  152. print_progress = False):
  153. if quality == None:
  154. quality = self.guess_quality()
  155. super().download(
  156. file_format = demixed.track_quality.DeezerTrackQuality.get_file_format(quality),
  157. to_file = to_file,
  158. quality = quality,
  159. print_progress = print_progress
  160. )