_ext.py 11 KB


  1. import copy
  2. import sys
  3. import pprint
  4. import os, os.path as op
  5. from dataclasses import dataclass
  6. from datetime import date, datetime, timedelta
  7. from collections import OrderedDict
  8. from functools import partial
  9. from typing import List, Dict
  10. from urllib.parse import urlparse
  11. import yaml
  12. from natsort import natsorted, ns
  13. from pykwalify.core import Core
  14. from slugify import slugify
  15. @dataclass
  16. class Game:
  17. item: Dict
  18. meta: Dict
  19. clones: List
  20. @property
  21. def names(self) -> List[str]:
  22. return names(self.item)
  23. @property
  24. def slug(self) -> str:
  25. return slugify(self.names[0])
  26. @property
  27. def wikilink(self) -> str:
  28. try:
  29. return "https://en.wikipedia.org/wiki/" + self.meta['external']['wikipedia']
  30. except KeyError:
  31. return self.meta['external']['website']
  32. def abort(msg):
  33. sys.stderr.write(msg + '\n')
  34. sys.exit(1)
  35. def validate(item, key):
  36. for name in names(item):
  37. if not (isinstance(name, str) or
  38. (len(name) == 2 and
  39. all(isinstance(x, str) for x in name))):
  40. abort('Error: %r should be a string or a list of two strings' % name)
  41. games = item[key]
  42. if (not isinstance(games, list) or
  43. not all(isinstance(x, dict) for x in games)):
  44. print('Error: this should be a list of dicts:')
  45. abort(pprint.pformat(games))
  46. return names, games
  47. def names(item):
  48. return [item['name']] + item.get('names', [])
  49. def game_name(game):
  50. return game['name'][0] if isinstance(game['name'], list) else game['name']
  51. def parse_tag(tag):
  52. return tag.replace(' ', '-').lower()
  53. def parse_unicode(text):
  54. if isinstance(text, str):
  55. return text
  56. if isinstance(text, (list, tuple)):
  57. result = []
  58. for item in text:
  59. result.append(parse_unicode(item))
  60. return result
  61. def parse_unicode_tag(tag):
  62. return parse_tag(parse_unicode(tag))
  63. def parse_tags(entry, keys):
  64. tags = []
  65. for key in keys:
  66. if key in entry:
  67. val = entry.get(key)
  68. if isinstance(val, str):
  69. tags.append(parse_tag(val))
  70. tags.append(parse_unicode_tag(val))
  71. elif isinstance(val, list):
  72. tags += [parse_tag(v) for v in val]
  73. tags += [parse_unicode_tag(v) for v in val]
  74. else:
  75. abort('Error: %s\'s key "%s" is not valid (%s)' %
  76. (entry['name'], key, type(val).__name__))
  77. result = []
  78. for tag in tags:
  79. if tag not in result:
  80. result.append(tag)
  81. return result
  82. def parse_global_tags(site, item, tag, item_key: str):
  83. if tag in item:
  84. if not getattr(site, tag, False):
  85. setattr(site, tag, {})
  86. if isinstance(item[tag], str):
  87. item[tag] = [item[tag]]
  88. for t in item[tag]:
  89. tagObj = getattr(site, tag, False)
  90. if not tagObj.get(t, False):
  91. tagObj[t] = {'tag_count': 0, 'keys': set()}
  92. if item_key not in tagObj[t]['keys']:
  93. tagObj[t]['tag_count'] += 1
  94. tagObj[t]['keys'].add(item_key)
  95. setattr(site, tag, OrderedDict(sorted(getattr(site, tag, {}).items())))
  96. def parse_item(entry, entry_tags=[], meta={}, meta_tags=[]):
  97. updated = entry.get('updated')
  98. if isinstance(updated, str):
  99. updated = datetime.strptime(updated, "%Y-%m-%d").date()
  100. added = entry.get('added') or date.min
  101. if isinstance(added, str):
  102. added = datetime.strptime(added, "%Y-%m-%d").date()
  103. result = dict(entry,
  104. new=added == updated and (date.today() - added) < timedelta(days=30),
  105. is_updated=(date.today() - updated) < timedelta(days=30),
  106. tags=parse_tags(entry, entry_tags) + parse_tags(meta, meta_tags),
  107. updated=updated)
  108. if "repo" in result:
  109. # Try to add extra repo information, like icons, badges
  110. repo_parsed = urlparse(result["repo"])
  111. domain = repo_parsed.netloc
  112. ext = os.path.splitext(result["repo"])[1]
  113. if domain == "github.com":
  114. try:
  115. # https://github.com/<user>/<repo>
  116. _, user, repo, *_ = repo_parsed.path.split("/")
  117. except ValueError:
  118. result["repoiconname"] = "github"
  119. result["repoiconstyle"] = "fab"
  120. result["repotitle"] = "GitHub"
  121. else:
  122. result["repobadge"] = f'<img class="badge lazyload" alt="GitHub stars" data-src="https://img.shields.io/github/stars/{user}/{repo}?style=flat-square&logo=github" src="https://img.shields.io/badge/stars-%3F-blue?style=flat-square&logo=github">'
  123. elif domain == "code.google.com":
  124. result["repoiconname"] = "google"
  125. result["repoiconstyle"] = "fab"
  126. result["repotitle"] = "Google Code"
  127. elif domain == "bitbucket.org":
  128. result["repoiconname"] = "bitbucket"
  129. result["repoiconstyle"] = "fab"
  130. result["repotitle"] = "Bitbucket"
  131. elif domain == "gitlab.com":
  132. try:
  133. # https://gitlab.com/<user>/<repo>
  134. _, user, repo, *_ = repo_parsed.path.split("/")
  135. except ValueError:
  136. result["repoiconname"] = "gitlab"
  137. result["repoiconstyle"] = "fab"
  138. result["repotitle"] = "GitLab"
  139. else:
  140. result["repobadge"] = f'<img class="badge lazyload" alt="GitLab stars" src="https://img.shields.io/badge/dynamic/json?color=green&label=stars&logo=gitlab&&query=%24.star_count&url=https%3A%2F%2Fgitlab.com%2Fapi%2Fv4%2Fprojects%2F{user}%252F{repo}">'
  141. elif domain == "sourceforge.net":
  142. try:
  143. # https://sourceforge.net/projects/<repo>
  144. _, _, repo, *_ = repo_parsed.path.split("/")
  145. except ValueError:
  146. pass
  147. else:
  148. result["repobadge"] = f'<img class="badge lazyload" alt="Sourceforge downloads" data-src="https://img.shields.io/sourceforge/dt/{repo}?style=flat-square&logo=sourceforge" src="https://img.shields.io/badge/downloads-%3F-brightgreen?style=flat-square&logo=sourceforge">'
  149. elif ext in (".gz", ".zip", ".tar", ".tgz", ".tbz2", ".bz2", ".xz", ".rar"):
  150. result["repoiconname"] = "box"
  151. result["repoiconstyle"] = "fas"
  152. result["repotitle"] = "Archive"
  153. return result
  154. def parse_items(site, item, key):
  155. if not (item.get(key) and validate(item, key)):
  156. return
  157. if not getattr(site, key, False):
  158. setattr(site, key, [])
  159. meta_tags = ['genres', 'subgenres', 'themes']
  160. game_tags = [
  161. 'status',
  162. 'development',
  163. 'langs',
  164. 'frameworks',
  165. 'content',
  166. 'licenses',
  167. 'multiplayer',
  168. 'type'
  169. ]
  170. meta = item.get('meta', {})
  171. meta["names_ascii"] = parse_unicode(names(item))
  172. meta["external"] = item.get('external', {})
  173. parse_global_tags(site, meta, 'genres', item['name'])
  174. parse_global_tags(site, meta, 'subgenres', item['name'])
  175. parse_global_tags(site, meta, 'themes', item['name'])
  176. parse_fn = partial(parse_item, entry_tags=game_tags, meta=meta, meta_tags=meta_tags)
  177. for game in item[key]:
  178. parse_global_tags(site, game, 'langs', game['name'])
  179. getattr(site, key).append(Game(item, meta, [parse_fn(i) for i in item[key]]))
  180. def show_error(game_name, error_str):
  181. print(f'\033[91m {game_name}\033[0m')
  182. print(f' {error_str}')
  183. def show_errors(errors):
  184. print('\n')
  185. for error in errors:
  186. show_error(error["name"], error["error"])
  187. print(f'\n {len(errors)} errors\n')
  188. sys.exit(1)
  189. def show_validation_errors(data, validation_errors):
  190. errors = []
  191. for error in validation_errors:
  192. path = error.path.split('/')
  193. game = data[int(path[1])]
  194. name = game_name(game)
  195. errors.append({"name": name, "error": error.__repr__()})
  196. show_errors(errors)
  197. def validate_with_schema(source_data, schema_file):
  198. core = Core(source_data=source_data, schema_files=[schema_file])
  199. try:
  200. core.validate(raise_exception=True)
  201. except Exception as error:
  202. if len(core.errors) > 0:
  203. show_validation_errors(source_data, core.errors)
  204. else:
  205. raise error
  206. def parse_data(site):
  207. base = op.dirname(__file__)
  208. originals = []
  209. for fn in os.listdir(op.join(base, 'originals')):
  210. if fn.endswith('.yaml'):
  211. originals.extend(yaml.safe_load(open(op.join(base, 'originals', fn), encoding="utf-8")))
  212. def sort_key(game):
  213. name = game_name(game)
  214. # Always sort SCUMM first
  215. if name == 'SCUMM':
  216. return '0'
  217. if name.startswith('The '):
  218. return name[4:]
  219. return name
  220. originals = natsorted(originals, key=sort_key, alg=ns.IGNORECASE)
  221. print(str(len(originals)) + ' games in total')
  222. validate_with_schema(originals, 'schema/originals.yaml')
  223. clones = []
  224. for fn in sorted(os.listdir(op.join(base, 'games'))):
  225. if fn.endswith('.yaml'):
  226. clones.extend(yaml.safe_load(open(op.join(base, 'games', fn), encoding="utf-8")))
  227. print(str(len(clones)) + ' clones in total')
  228. validate_with_schema(clones, 'schema/games.yaml')
  229. errors = []
  230. originals_map = {}
  231. for item in originals:
  232. name = game_name(item)
  233. if name in originals_map:
  234. errors.append({
  235. "name": name,
  236. "error": "Duplicate original game '%s'" % name
  237. })
  238. originals_map[name] = item
  239. if len(errors) > 0:
  240. show_errors(errors)
  241. def has_no_status(clone) -> bool:
  242. return clone["type"] not in ("official", "tool") and "status" not in clone
  243. for clone in clones:
  244. if 'originals' not in clone:
  245. show_errors([{
  246. "name": clone["name"],
  247. "error": "Unable to find 'remakes' or 'clones' in game"
  248. }])
  249. for original in clone['originals']:
  250. if original not in originals_map:
  251. errors.append({
  252. "name": clone["name"],
  253. "error": "Original game '%s' not found" % original
  254. })
  255. if isinstance(clone['updated'], str):
  256. clone['updated'] = datetime.strptime(clone['updated'], "%Y-%m-%d").date()
  257. if isinstance(clone.get('added'), str):
  258. clone['added'] = datetime.strptime(clone['added'], "%Y-%m-%d").date()
  259. if clone.get('added', date.min) > clone['updated']:
  260. errors.append({
  261. "name": clone['name'],
  262. "error": "Added date is after updated date"
  263. })
  264. if has_no_status(clone):
  265. errors.append({
  266. "name": clone['name'],
  267. "error": "Has no status field"
  268. })
  269. if len(errors) > 0:
  270. show_errors(errors)
  271. for item in originals:
  272. # Recombine originals and clones
  273. combined = copy.deepcopy(item)
  274. name = game_name(combined)
  275. combined['games'] = [
  276. clone for clone in clones
  277. if name in clone['originals']
  278. ]
  279. parse_items(site, combined, 'games')
  280. # Deduplicate clones by using a dictionary
  281. site.new_games = {
  282. clone['name']: (_names, meta, clone)
  283. for (_names, meta, clone) in sorted([
  284. (game.names, game.meta, clone)
  285. for game in site.games
  286. for clone in game.clones
  287. if clone['is_updated']
  288. ], key=lambda args: args[2]['updated'], reverse=True)
  289. }