trivial.go 1.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. package rx
  2. type TrivialScheduler struct {
  3. EventLoop *EventLoop
  4. }
  5. func (sched TrivialScheduler) dispatch(ev event) {
  6. sched.EventLoop.dispatch(ev)
  7. }
  8. func (sched TrivialScheduler) run(effect Effect, ob *observer) {
  9. var terminated = false
  10. effect.action(sched, &observer {
  11. context: ob.context,
  12. next: func(x Object) {
  13. if !terminated && !ob.context.disposed {
  14. ob.next(x)
  15. }
  16. },
  17. error: func(e Object) {
  18. if !terminated && !ob.context.disposed {
  19. terminated = true
  20. ob.error(e)
  21. }
  22. },
  23. complete: func() {
  24. if !terminated && !ob.context.disposed {
  25. terminated = true
  26. ob.complete()
  27. }
  28. },
  29. })
  30. }
  31. func (sched TrivialScheduler) RunTopLevel(e Effect, r Receiver) {
  32. sched.EventLoop.commit(func() {
  33. sched.run(e, &observer {
  34. context: r.Context,
  35. next: func(x Object) {
  36. if r.Values != nil {
  37. r.Values <- x
  38. }
  39. },
  40. error: func(e Object) {
  41. if r.Error != nil {
  42. r.Error <- e
  43. close(r.Error)
  44. }
  45. },
  46. complete: func() {
  47. if r.Values != nil {
  48. close(r.Values)
  49. }
  50. },
  51. })
  52. })
  53. }