event_bus.py 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. import asyncio
  2. from asyncio import Queue, Task
  3. from uuid import uuid4
  4. from openfreebuds.utils.logger import create_logger
  5. from openfreebuds.utils.stupid_rpc import rpc
  # Module-level logger; Subscription uses it to report subscribers being added and leaving.
  6. log = create_logger("EventBus")
  7. class Subscription:
  8. def __init__(self):
  9. self._callbacks: dict[str, tuple[list[str] | None, Queue]] = {}
  10. self._child_subs: dict[str, Task] = {}
  11. self.role: str = "standalone"
  12. @rpc
  13. async def send_message(self, kind, *args):
  14. for kind_filters, queue in self._callbacks.values():
  15. if kind_filters is not None and kind not in kind_filters:
  16. continue
  17. await queue.put((kind, *args))
  18. def include_subscription(self, callback_id: str, subscription):
  19. # Include another subscription into them, transfer their messages into us
  20. if callback_id in self._child_subs:
  21. self._child_subs[callback_id].cancel()
  22. async def _handler():
  23. # noinspection PyProtectedMember
  24. queue = subscription._new_queue(callback_id, None)
  25. while True:
  26. await self.send_message(*(await queue.get()))
  27. t = asyncio.create_task(_handler())
  28. self._child_subs[callback_id] = t
  29. def _new_queue(self, member_id, kind_filters):
  30. q = Queue()
  31. self._callbacks[member_id] = (kind_filters, q)
  32. return q
  33. @rpc
  34. async def subscribe(
  35. self,
  36. member_id: str = None,
  37. kind_filters: list[str] | None = None,
  38. ) -> str:
  39. if member_id is None:
  40. member_id = str(uuid4())
  41. self._new_queue(member_id, kind_filters)
  42. log.info(f"Add subscriber {member_id}")
  43. return member_id
  44. @rpc
  45. async def wait_for_event(self, member_id: str):
  46. return await self._callbacks[member_id][1].get()
  47. @rpc
  48. async def unsubscribe(self, member_id: str):
  49. log.info(f"Leave subscriber {member_id}")
  50. del self._callbacks[member_id]