test_event_bus.py 572 B

123456789101112131415161718192021222324252627
  1. import asyncio
  2. import pytest
  3. from openfreebuds.utils.event_bus import Subscription
  4. @pytest.mark.asyncio
  5. async def test_event_bus_base():
  6. bus = Subscription()
  7. async def _recv():
  8. sub_id = await bus.subscribe()
  9. while True:
  10. async with asyncio.timeout(0.5):
  11. event = await bus.wait_for_event(sub_id)
  12. print("recv", event)
  13. t = asyncio.create_task(_recv())
  14. for i in range(4):
  15. await asyncio.sleep(0.25)
  16. print("send")
  17. await bus.send_message("Test", "message")
  18. t.cancel()