merge.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. package rx
  2. func Merge(effects []Effect) Effect {
  3. return Effect { func(sched Scheduler, ob *observer) {
  4. var ctx, dispose = ob.context.CreateChild()
  5. var c = new_collector(ob, dispose)
  6. for _, item := range effects {
  7. c.new_child()
  8. sched.run(item, &observer {
  9. context: ctx,
  10. next: func(x Object) {
  11. c.pass(x)
  12. },
  13. error: func(e Object) {
  14. c.throw(e)
  15. },
  16. complete: func() {
  17. c.delete_child()
  18. },
  19. })
  20. }
  21. c.parent_complete()
  22. } }
  23. }
  24. func (e Effect) MergeMap(f func(Object)Effect) Effect {
  25. return Effect { func(sched Scheduler, ob *observer) {
  26. var ctx, dispose = ob.context.CreateChild()
  27. var c = new_collector(ob, dispose)
  28. sched.run(e, &observer {
  29. context: ctx,
  30. next: func(x Object) {
  31. var item = f(x)
  32. c.new_child()
  33. sched.run(item, &observer {
  34. context: ctx,
  35. next: func(x Object) {
  36. c.pass(x)
  37. },
  38. error: func(e Object) {
  39. c.throw(e)
  40. },
  41. complete: func() {
  42. c.delete_child()
  43. },
  44. })
  45. },
  46. error: func(e Object) {
  47. c.throw(e)
  48. },
  49. complete: func() {
  50. c.parent_complete()
  51. },
  52. })
  53. } }
  54. }
  55. type collector struct {
  56. observer *observer
  57. dispose func()
  58. num_children uint
  59. no_more_children bool
  60. }
  61. func new_collector(ob *observer, dispose func()) *collector {
  62. return &collector {
  63. observer: ob,
  64. dispose: dispose,
  65. num_children: 0,
  66. no_more_children: false,
  67. }
  68. }
  69. func (c *collector) pass(x Object) {
  70. c.observer.next(x)
  71. }
  72. func (c *collector) throw(e Object) {
  73. c.observer.error(e)
  74. c.dispose()
  75. }
  76. func (c *collector) new_child() {
  77. c.num_children += 1
  78. }
  79. func (c *collector) delete_child() {
  80. if c.num_children == 0 { panic("something went wrong") }
  81. c.num_children -= 1
  82. if c.num_children == 0 && c.no_more_children {
  83. c.observer.complete()
  84. c.dispose()
  85. }
  86. }
  87. func (c *collector) parent_complete() {
  88. c.no_more_children = true
  89. if c.num_children == 0 {
  90. c.observer.complete()
  91. c.dispose()
  92. }
  93. }