exhaust.go 920 B

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. package rx
  2. func (e Observable) ExhaustMap(f func(Object) Observable) Observable {
  3. return Observable { func(sched Scheduler, ob *observer) {
  4. var ctx, ctx_dispose = ob.context.create_disposable_child()
  5. var idle = true
  6. var no_more_items = false
  7. sched.run(e, &observer {
  8. context: ctx,
  9. next: func(x Object) {
  10. if idle {
  11. idle = false
  12. var item = f(x)
  13. sched.run(item, &observer {
  14. context: ctx,
  15. next: func(x Object) {
  16. ob.next(x)
  17. },
  18. error: func(e Object) {
  19. ctx_dispose()
  20. ob.error(e)
  21. },
  22. complete: func() {
  23. idle = true
  24. if no_more_items {
  25. ctx_dispose()
  26. ob.complete()
  27. }
  28. },
  29. })
  30. }
  31. },
  32. error: func(e Object) {
  33. ctx_dispose()
  34. ob.error(e)
  35. },
  36. complete: func() {
  37. if idle {
  38. ctx_dispose()
  39. ob.complete()
  40. } else {
  41. no_more_items = true
  42. }
  43. },
  44. })
  45. } }
  46. }