check_links.py 1.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647
  1. """
  2. Check game URLs and repos for broken links
  3. """
  4. import asyncio
  5. import httpx
  6. from scripts.utils import games
  7. async def check_link(q):
  8. while True:
  9. game, url = await q.get()
  10. resp = httpx.get(url)
  11. if not resp.is_success:
  12. print(f"{url} returned {resp.status_code} ({game['name']})")
  13. q.task_done()
  14. async def main():
  15. q = asyncio.Queue()
  16. # start workers
  17. worker_tasks = []
  18. for i in range(10):
  19. task = asyncio.create_task(check_link(q))
  20. worker_tasks.append(task)
  21. # add urls to queue
  22. for game in games():
  23. print(f"Checking {game['name']}...")
  24. if "repo" in game:
  25. await q.put((game, game["repo"]))
  26. if "url" in game:
  27. await q.put((game, game["url"]))
  28. for img in game.get("images", []):
  29. await q.put((game, img))
  30. # TODO: check videos
  31. # wait for tasks to complete
  32. print("Waiting for workers...")
  33. await q.join()
  34. # stop workers
  35. for task in worker_tasks:
  36. task.cancel()
  37. await asyncio.gather(*worker_tasks, return_exceptions=True)
  38. if __name__ == "__main__":
  39. asyncio.run(main())