123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221 |
- package rx
- type KeyTrackedActionVector struct {
- HasKey func(key string) bool
- IterateKeys func(func(string))
- CloneKeys func() ([] string)
- GetAction func(key string, index_source Observable) Observable
- }
- func (e Observable) WithLatestFrom(source Observable) Observable {
- return Observable { func(sched Scheduler, ob *observer) {
- var ctx, dispose = ob.context.create_disposable_child()
- var c = new_collector(ob, dispose)
- var current Optional
- c.new_child()
- sched.run(source, &observer {
- context: ctx,
- next: func(value Object) {
- current = Optional { true, value }
- },
- error: func(e Object) {
- c.throw(e)
- },
- complete: func() {
- c.delete_child()
- },
- })
- c.new_child()
- sched.run(e, &observer {
- context: ctx,
- next: func(obj Object) {
- c.pass(Pair { obj, current })
- },
- error: func(e Object) {
- c.throw(e)
- },
- complete: func() {
- c.delete_child()
- },
- })
- c.parent_complete()
- } }
- }
- func CombineLatest(actions ([] Observable)) Observable {
- if len(actions) == 0 {
- return NewYield([] Optional {})
- }
- return Observable { func(sched Scheduler, ob *observer) {
- var ctx, dispose = ob.context.create_disposable_child()
- var c = new_collector(ob, dispose)
- var values = make([] Optional, len(actions))
- for i_, e := range actions {
- var i = i_
- c.new_child()
- sched.run(e, &observer {
- context: ctx,
- next: func(obj Object) {
- var has_saved = &(values[i].HasValue)
- var saved_latest = &(values[i].Value)
- *saved_latest = obj
- *has_saved = true
- var values_clone = make([] Optional, len(values))
- copy(values_clone, values)
- c.pass(values_clone)
- },
- error: func(e Object) {
- c.throw(e)
- },
- complete: func() {
- c.delete_child()
- },
- })
- }
- c.parent_complete()
- } }
- }
- func CombineLatestWaitReady(actions ([] Observable)) Observable {
- return CombineLatest(actions).ConcatMap(func(values_ Object) Observable {
- var values = values_.([] Optional)
- var ready_values = make([] Object, len(values))
- var ok = true
- for i := 0; i < len(values); i += 1 {
- var opt = values[i]
- ready_values[i] = opt.Value
- if !(opt.HasValue) {
- ok = false
- }
- }
- if ok {
- return NewSyncSequence(func(next func(Object)) (bool, Object) {
- next(ready_values)
- return true, nil
- })
- } else {
- return NewSyncSequence(func(next func(Object)) (bool, Object) {
- return true, nil
- })
- }
- })
- }
- type ListCacheProvider struct {
- CreateMap func() Object
- MapLength func(*Object) uint
- MapLookup func(*Object, Object) (Object, bool)
- MapInsert func(*Object, Object, Object)
- MapDelete func(*Object, Object) // no-op when entry not present
- MapIterate func(*Object, func(Object,Object))
- CreateList func(uint) Object
- ListFill func(*Object, uint, Object)
- ListIterate func(*Object, func(uint,Object))
- }
- func (e Observable) ListMap(p *ListCacheProvider, f func(Object)(Observable)) Observable {
- return Observable { func(sched Scheduler, ob *observer) {
- var ctx, dispose = ob.context.create_disposable_child()
- var c = new_collector(ob, dispose)
- var running = p.CreateMap()
- var values = p.CreateMap()
- var keys = p.CreateMap()
- var first = true
- var emit_if_all_ready = func() {
- var ready_values = p.CreateList(p.MapLength(&keys))
- var all_ready = true
- p.MapIterate(&keys, func(key Object, i Object) {
- var index = i.(uint)
- var v, exists = p.MapLookup(&values, key)
- if exists {
- p.ListFill(&ready_values, index, v)
- } else {
- all_ready = false
- }
- })
- if all_ready {
- c.pass(ready_values)
- } else {
- // do nothing
- }
- }
- sched.run(e, &observer {
- context: ctx,
- next: func(new_keys_list Object) {
- var old_keys = keys
- var new_keys = p.CreateMap()
- p.ListIterate(&new_keys_list, func(index uint, key Object) {
- p.MapInsert(&new_keys, key, index)
- })
- var keys_changed = false
- p.MapIterate(&old_keys, func(key Object, _ Object) {
- var _, preserved = p.MapLookup(&new_keys, key)
- if !(preserved) {
- keys_changed = true
- var dispose_this, _ = p.MapLookup(&running, key)
- dispose_this.(disposeFunc)()
- p.MapDelete(&running, key)
- p.MapDelete(&values, key)
- }
- })
- var new_subscriptions = make([] func(), 0)
- p.MapIterate(&new_keys, func(key Object, index_ Object) {
- var index = index_.(uint)
- var old_index_, is_existing = p.MapLookup(&old_keys, key)
- if is_existing {
- var old_index = old_index_.(uint)
- if index != old_index {
- keys_changed = true
- }
- } else {
- keys_changed = true
- var this_ctx, this_dispose = ctx.create_disposable_child()
- p.MapInsert(&running, key, this_dispose)
- c.new_child()
- var this_action = f(key)
- var run = func() {
- sched.run(this_action, &observer {
- context: this_ctx,
- next: func(obj Object) {
- p.MapInsert(&values, key, obj)
- emit_if_all_ready()
- },
- error: func(e Object) {
- c.throw(e)
- },
- complete: func() {
- c.delete_child()
- },
- })
- }
- new_subscriptions = append(new_subscriptions, run)
- }
- })
- if keys_changed {
- keys = new_keys
- if len(new_subscriptions) == 0 {
- emit_if_all_ready()
- }
- } else {
- if first {
- c.pass(p.CreateList(0))
- }
- }
- first = false
- for _, subscribe := range new_subscriptions {
- // subscription should happen after `keys` updated
- subscribe()
- }
- },
- error: func(e Object) {
- c.throw(e)
- },
- complete: func() {
- c.parent_complete()
- },
- })
- } }
- }
|