123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051 |
- package rx
- func (e Observable) ExhaustMap(f func(Object) Observable) Observable {
- return Observable { func(sched Scheduler, ob *observer) {
- var ctx, ctx_dispose = ob.context.create_disposable_child()
- var idle = true
- var no_more_items = false
- sched.run(e, &observer {
- context: ctx,
- next: func(x Object) {
- if idle {
- idle = false
- var item = f(x)
- sched.run(item, &observer {
- context: ctx,
- next: func(x Object) {
- ob.next(x)
- },
- error: func(e Object) {
- ctx_dispose()
- ob.error(e)
- },
- complete: func() {
- idle = true
- if no_more_items {
- ctx_dispose()
- ob.complete()
- }
- },
- })
- }
- },
- error: func(e Object) {
- ctx_dispose()
- ob.error(e)
- },
- complete: func() {
- if idle {
- ctx_dispose()
- ob.complete()
- } else {
- no_more_items = true
- }
- },
- })
- } }
- }
|