rx.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. package rx
  2. import "context"
  // Object is an alias for the empty interface. It is the type used in this
  // file for values passed to next/error callbacks and for event payloads.
  3. type Object = interface{}
  4. type Effect struct {
  5. action func(Scheduler, *observer)
  6. }
  7. type Scheduler interface {
  8. dispatch(event)
  9. run(Effect, *observer)
  10. RunTopLevel(Effect, Receiver)
  11. }
  12. type observer struct {
  13. context *Context
  14. next func(Object)
  15. error func(Object)
  16. complete func()
  17. }
  // Context pairs a standard-library context.Context (raw) with a disposed
  // flag and the set of child contexts registered on it by CreateChild.
  // Nothing in this type performs locking.
  18. type Context struct {
  19. raw context.Context
  20. disposed bool
  21. children map[*Context] struct{}
  22. }
  // Dispose is the teardown callback returned by Context.CreateChild.
  23. type Dispose func()
  24. func Background() *Context {
  25. return &Context {
  26. raw: context.Background(),
  27. children: make(map[*Context] struct{}),
  28. }
  29. }
  30. func (ctx *Context) CreateChild() (*Context, Dispose) {
  31. var dispose_recursively func(*Context)
  32. dispose_recursively = func(ctx *Context) {
  33. ctx.disposed = true
  34. for child, _ := range ctx.children {
  35. dispose_recursively(child)
  36. }
  37. }
  38. var child_raw, cancel_raw = context.WithCancel(ctx.raw)
  39. var child = &Context {
  40. raw: child_raw,
  41. disposed: false,
  42. children: make(map[*Context] struct{}),
  43. }
  44. ctx.children[child] = struct{}{}
  45. return child, func() {
  46. if ctx.disposed { return }
  47. delete(ctx.children, child)
  48. dispose_recursively(ctx)
  49. cancel_raw()
  50. }
  51. }
  // Sender is the handle passed to an action created with CreateEffect. It
  // holds the observer to report to (raw) and the Scheduler (sched) that its
  // Next, Error and Complete methods dispatch events through.
  52. type Sender struct {
  53. raw *observer
  54. sched Scheduler
  55. }
  56. type Receiver struct {
  57. Context *Context
  58. Values chan <- Object
  59. Error chan <- Object
  60. }
  61. func (s Sender) Context() context.Context {
  62. return s.raw.context.raw
  63. }
  64. func (s Sender) Next(x Object) {
  65. s.sched.dispatch(event {
  66. kind: ev_next,
  67. payload: x,
  68. observer: s.raw,
  69. })
  70. }
  71. func (s Sender) Error(e Object) {
  72. s.sched.dispatch(event {
  73. kind: ev_error,
  74. payload: e,
  75. observer: s.raw,
  76. })
  77. }
  78. func (s Sender) Complete() {
  79. s.sched.dispatch(event {
  80. kind: ev_complete,
  81. payload: nil,
  82. observer: s.raw,
  83. })
  84. }
  85. func CreateEffect(action func(Sender)) Effect {
  86. return Effect { func (sched Scheduler, ob *observer) {
  87. go action(Sender { sched: sched, raw: ob })
  88. } }
  89. }
  90. func CreateBlockingEffect(action func()([]Object,error)) Effect {
  91. return Effect { func (sched Scheduler, ob *observer) {
  92. var result, err = action()
  93. if err != nil {
  94. ob.error(err)
  95. } else {
  96. for _, item := range result {
  97. ob.next(item)
  98. }
  99. ob.complete()
  100. }
  101. } }
  102. }