123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378 |
- package rx
// Object is an alias for the empty interface. It is the type of every value
// and error payload handed to an observer's next and error callbacks.
type Object = any
- type Observable struct {
- effect func(Scheduler, *observer)
- }
- type Scheduler interface {
- dispatch(event)
- commit(task)
- run(Observable, *observer)
- }
- type observer struct {
- context *Context
- next func(Object)
- error func(Object)
- complete func()
- }
// Context is a node in a tree of disposable scopes.
type Context struct {
	// children is the set of child contexts registered under this one.
	children map[*Context]struct{}
	// disposed is set once the context has been disposed.
	disposed bool
	// dispose is closed when the context is disposed; it is nil for the
	// background context.
	dispose chan struct{}
	// cleaners are callbacks registered through append_cleaner.
	cleaners []func()
}
// disposeFunc is the type of the function returned by
// create_disposable_child for disposing the created child context.
type disposeFunc func()
- var background = &Context {
- children: make(map[*Context] struct{}),
- disposed: false,
- dispose: nil,
- }
- func Background() *Context {
- return background
- }
- func (ctx *Context) disposable() bool {
- return (ctx != background)
- }
- func (ctx *Context) dispose_recursively() {
- if !(ctx.disposable()) { panic("something went wrong") }
- if !(ctx.disposed) {
- ctx.disposed = true
- for child, _ := range ctx.children {
- child.dispose_recursively()
- }
- close(ctx.dispose)
- }
- }
- func (ctx *Context) create_disposable_child() (*Context, disposeFunc) {
- var child = &Context {
- children: make(map[*Context] struct{}),
- disposed: false,
- dispose: make(chan struct{}),
- }
- if ctx.disposable() && ctx.disposed {
- child.dispose_recursively()
- return child, func() {}
- }
- ctx.children[child] = struct{}{}
- return child, func() {
- if !(child.disposed) {
- delete(ctx.children, child)
- child.dispose_recursively()
- for len(child.cleaners) > 0 {
- var l = len(child.cleaners)
- var last = (l - 1)
- child.cleaners[last]()
- child.cleaners[last] = nil
- child.cleaners = child.cleaners[:last]
- }
- }
- }
- }
- func (ctx *Context) DisposeSignal() (<- chan struct{}) {
- if ctx.disposable() {
- return ctx.dispose
- } else {
- return nil
- }
- }
- func (ctx *Context) AlreadyDisposed() bool {
- if ctx.disposable() {
- select {
- case <- ctx.dispose:
- return true
- default:
- return false
- }
- } else {
- return false
- }
- }
- func (ctx *Context) WaitDispose(k func()) {
- if ctx.disposable() {
- select {
- case <- ctx.dispose:
- k()
- }
- }
- }
- func (ctx *Context) append_cleaner(h func()) {
- if !(ctx.disposable()) { return }
- if !(ctx.disposed) {
- ctx.cleaners = append(ctx.cleaners, h)
- }
- }
- func WithCanceller(k func(cancel Observable) Observable) Observable {
- return Observable { func(sched Scheduler, ob *observer) {
- var ctx, dispose = ob.context.create_disposable_child()
- var cancel = NewSync(func() (Object, bool) {
- // TODO: should throw an error when cancellation is triggered
- dispose()
- return nil, true
- })
- var e = k(cancel)
- sched.run(e, &observer {
- context: ctx,
- next: ob.next,
- error: ob.error,
- complete: ob.complete,
- })
- } }
- }
- type Sender struct {
- ob *observer
- sched Scheduler
- }
- type Receiver struct {
- Context *Context
- Values chan <- Object
- Error chan <- Object
- Terminate chan <- bool
- }
- func (s Sender) Context() *Context {
- return s.ob.context
- }
- func (s Sender) Scheduler() Scheduler {
- return s.sched
- }
- func (s Sender) Next(x Object) {
- s.sched.dispatch(event {
- kind: ev_next,
- payload: x,
- observer: s.ob,
- })
- }
- func (s Sender) Error(e Object) {
- s.sched.dispatch(event {
- kind: ev_error,
- payload: e,
- observer: s.ob,
- })
- }
- func (s Sender) Complete() {
- s.sched.dispatch(event {
- kind: ev_complete,
- payload: nil,
- observer: s.ob,
- })
- }
- func Schedule(action Observable, sched Scheduler, r Receiver) {
- sched.commit(func() {
- sched.run(action, &observer {
- context: r.Context,
- next: func(x Object) {
- if r.Values != nil {
- r.Values <- x
- }
- },
- error: func(e Object) {
- if r.Error != nil {
- r.Error <- e
- close(r.Error)
- }
- if r.Terminate != nil {
- r.Terminate <- false
- }
- },
- complete: func() {
- if r.Values != nil {
- close(r.Values)
- }
- if r.Terminate != nil {
- r.Terminate <- true
- }
- },
- })
- })
- }
- func ScheduleBackground(action Observable, sched Scheduler) {
- Schedule(action, sched, Receiver {
- Context: Background(),
- })
- }
- func ScheduleBackgroundWaitTerminate(action Observable, sched Scheduler) bool {
- var wait = make(chan bool)
- Schedule(action, sched, Receiver {
- Context: Background(),
- Terminate: wait,
- })
- return <- wait
- }
- func Noop() Observable {
- return Observable { func(sched Scheduler, ob *observer) {
- ob.complete()
- } }
- }
- func NewGoroutine(action func(Sender)) Observable {
- return Observable { func(sched Scheduler, ob *observer) {
- go action(Sender { sched: sched, ob: ob })
- } }
- }
- func NewGoroutineSingle(action func(ctx *Context)(Object,bool)) Observable {
- return Observable { func(sched Scheduler, ob *observer) {
- var sender = Sender { sched: sched, ob: ob }
- go (func() {
- var result, ok = action(sender.Context())
- if ok {
- sender.Next(result)
- sender.Complete()
- } else {
- sender.Error(result)
- }
- })()
- }}
- }
- func NewQueuedWithContext(w *Worker, action func(*Context)(Object,bool)) Observable {
- return Observable { func(sched Scheduler, ob *observer) {
- var sender = Sender { sched: sched, ob: ob }
- w.Do(func() {
- var result, ok = action(ob.context)
- if ok {
- sender.Next(result)
- sender.Complete()
- } else {
- sender.Error(result)
- }
- })
- } }
- }
- func NewQueued(w *Worker, action func()(Object,bool)) Observable {
- return NewQueuedWithContext(w, func(_ *Context) (Object, bool) {
- return action()
- })
- }
- func NewQueuedNoValue(w *Worker, action func()(bool,Object)) Observable {
- return Observable { func(sched Scheduler, ob *observer) {
- var sender = Sender { sched: sched, ob: ob }
- w.Do(func() {
- var ok, err = action()
- if ok {
- sender.Complete()
- } else {
- sender.Error(err)
- }
- })
- } }
- }
- func NewCallback(action func(func(Object))) Observable {
- return Observable { func(sched Scheduler, ob *observer) {
- var sender = Sender { sched: sched, ob: ob }
- action(func(value Object) {
- sender.Next(value)
- sender.Complete()
- })
- }}
- }
- func NewSubscription(action func(func(Object))(func())) Observable {
- return Observable { func(sched Scheduler, ob *observer) {
- var h = action(ob.next)
- if h != nil {
- ob.context.append_cleaner(h)
- }
- } }
- }
- func NewSubscriptionWithSender(action func(Sender)(func())) Observable {
- return Observable { func(sched Scheduler, ob *observer) {
- var h = action(Sender { sched: sched, ob: ob })
- if h != nil {
- ob.context.append_cleaner(h)
- }
- } }
- }
- func NewSync(action func()(Object,bool)) Observable {
- return Observable { func(sched Scheduler, ob *observer) {
- var result, ok = action()
- if ok {
- ob.next(result)
- ob.complete()
- } else {
- ob.error(result)
- }
- } }
- }
- func NewSyncSequence(action func(func(Object))(bool,Object)) Observable {
- return Observable { func(sched Scheduler, ob *observer) {
- var ok, err = action(ob.next)
- if ok {
- ob.complete()
- } else {
- ob.error(err)
- }
- } }
- }
- func NewSyncWithSender(action func(Sender)) Observable {
- return Observable { func(sched Scheduler, ob *observer) {
- action(Sender { sched: sched, ob: ob })
- } }
- }
- func NewYield(values... Object) Observable {
- return Observable { func(sched Scheduler, ob *observer) {
- for _, value := range values {
- ob.next(value)
- }
- ob.complete()
- } }
- }
- func NewYieldEach[T any] (forEach func(func(T))) Observable {
- return Observable { func(sched Scheduler, ob *observer) {
- forEach(func(value T) {
- ob.next(value)
- })
- ob.complete()
- } }
- }
- func NewPersistent(value Object) Observable {
- return Observable { func(sched Scheduler, ob *observer) {
- ob.next(value)
- } }
- }
- func NewPersistentThunk(thunk func()(Object)) Observable {
- return Observable { func(sched Scheduler, ob *observer) {
- ob.next(thunk())
- } }
- }
|