event.go 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package rx
  2. import "runtime"
  3. const MinimumEventLoopBufferSize = 32768
  4. type event struct {
  5. kind event_kind
  6. payload Object
  7. observer *observer
  8. }
  9. type event_kind int
  10. const (
  11. ev_next event_kind = iota
  12. ev_error
  13. ev_complete
  14. )
  15. type task func()
  16. type EventLoop struct {
  17. event_channel chan event
  18. task_channel chan task
  19. }
  20. func SpawnEventLoop() *EventLoop {
  21. return SpawnEventLoopWithBufferSize(MinimumEventLoopBufferSize)
  22. }
  23. func SpawnEventLoopWithBufferSize(buf_size uint) *EventLoop {
  24. if buf_size < MinimumEventLoopBufferSize {
  25. buf_size = MinimumEventLoopBufferSize
  26. }
  27. var events = make(chan event, buf_size)
  28. var tasks = make(chan task, buf_size)
  29. go (func() {
  30. runtime.LockOSThread()
  31. for {
  32. select {
  33. case ev := <- events:
  34. switch ev.kind {
  35. case ev_next:
  36. ev.observer.next(ev.payload)
  37. case ev_error:
  38. ev.observer.error(ev.payload)
  39. case ev_complete:
  40. ev.observer.complete()
  41. }
  42. default:
  43. select {
  44. case t := <- tasks:
  45. t()
  46. default:
  47. continue
  48. }
  49. }
  50. }
  51. })()
  52. return &EventLoop {
  53. event_channel: events,
  54. task_channel: tasks,
  55. }
  56. }
  57. func (el *EventLoop) dispatch(ev event) {
  58. el.event_channel <- ev
  59. }
  60. func (el *EventLoop) commit(t task) {
  61. el.task_channel <- t
  62. }