1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859 |
- package rx
- type TrivialScheduler struct {
- EventLoop *EventLoop
- }
- func (sched TrivialScheduler) dispatch(ev event) {
- sched.EventLoop.dispatch(ev)
- }
- func (sched TrivialScheduler) run(effect Effect, ob *observer) {
- var terminated = false
- effect.action(sched, &observer {
- context: ob.context,
- next: func(x Object) {
- if !terminated && !ob.context.disposed {
- ob.next(x)
- }
- },
- error: func(e Object) {
- if !terminated && !ob.context.disposed {
- terminated = true
- ob.error(e)
- }
- },
- complete: func() {
- if !terminated && !ob.context.disposed {
- terminated = true
- ob.complete()
- }
- },
- })
- }
- func (sched TrivialScheduler) RunTopLevel(e Effect, r Receiver) {
- sched.EventLoop.commit(func() {
- sched.run(e, &observer {
- context: r.Context,
- next: func(x Object) {
- if r.Values != nil {
- r.Values <- x
- }
- },
- error: func(e Object) {
- if r.Error != nil {
- r.Error <- e
- close(r.Error)
- }
- },
- complete: func() {
- if r.Values != nil {
- close(r.Values)
- }
- },
- })
- })
- }
|