rx.go 7.4 KB


  1. package rx
  2. type Object = interface{}
  3. type Observable struct {
  4. effect func(Scheduler, *observer)
  5. }
  6. type Scheduler interface {
  7. dispatch(event)
  8. commit(task)
  9. run(Observable, *observer)
  10. }
  11. type observer struct {
  12. context *Context
  13. next func(Object)
  14. error func(Object)
  15. complete func()
  16. }
  17. type Context struct {
  18. children map[*Context] struct{}
  19. disposed bool
  20. dispose chan struct{}
  21. cleaners [] func()
  22. }
  23. type disposeFunc func()
  24. var background = &Context {
  25. children: make(map[*Context] struct{}),
  26. disposed: false,
  27. dispose: nil,
  28. }
  29. func Background() *Context {
  30. return background
  31. }
  32. func (ctx *Context) disposable() bool {
  33. return (ctx != background)
  34. }
  35. func (ctx *Context) dispose_recursively() {
  36. if !(ctx.disposable()) { panic("something went wrong") }
  37. if !(ctx.disposed) {
  38. ctx.disposed = true
  39. for child, _ := range ctx.children {
  40. child.dispose_recursively()
  41. }
  42. close(ctx.dispose)
  43. }
  44. }
  45. func (ctx *Context) create_disposable_child() (*Context, disposeFunc) {
  46. var child = &Context {
  47. children: make(map[*Context] struct{}),
  48. disposed: false,
  49. dispose: make(chan struct{}),
  50. }
  51. if ctx.disposable() && ctx.disposed {
  52. child.dispose_recursively()
  53. return child, func() {}
  54. }
  55. ctx.children[child] = struct{}{}
  56. return child, func() {
  57. if !(child.disposed) {
  58. delete(ctx.children, child)
  59. child.dispose_recursively()
  60. for len(child.cleaners) > 0 {
  61. var l = len(child.cleaners)
  62. var last = (l - 1)
  63. child.cleaners[last]()
  64. child.cleaners[last] = nil
  65. child.cleaners = child.cleaners[:last]
  66. }
  67. }
  68. }
  69. }
  70. func (ctx *Context) DisposeSignal() (<- chan struct{}) {
  71. if ctx.disposable() {
  72. return ctx.dispose
  73. } else {
  74. return nil
  75. }
  76. }
  77. func (ctx *Context) AlreadyDisposed() bool {
  78. if ctx.disposable() {
  79. select {
  80. case <- ctx.dispose:
  81. return true
  82. default:
  83. return false
  84. }
  85. } else {
  86. return false
  87. }
  88. }
  89. func (ctx *Context) WaitDispose(k func()) {
  90. if ctx.disposable() {
  91. select {
  92. case <- ctx.dispose:
  93. k()
  94. }
  95. }
  96. }
  97. func (ctx *Context) append_cleaner(h func()) {
  98. if !(ctx.disposable()) { return }
  99. if !(ctx.disposed) {
  100. ctx.cleaners = append(ctx.cleaners, h)
  101. }
  102. }
  103. func WithCanceller(k func(cancel Observable) Observable) Observable {
  104. return Observable { func(sched Scheduler, ob *observer) {
  105. var ctx, dispose = ob.context.create_disposable_child()
  106. var cancel = NewSync(func() (Object, bool) {
  107. // TODO: should throw an error when cancellation is triggered
  108. dispose()
  109. return nil, true
  110. })
  111. var e = k(cancel)
  112. sched.run(e, &observer {
  113. context: ctx,
  114. next: ob.next,
  115. error: ob.error,
  116. complete: ob.complete,
  117. })
  118. } }
  119. }
  120. type Sender struct {
  121. ob *observer
  122. sched Scheduler
  123. }
  124. type Receiver struct {
  125. Context *Context
  126. Values chan <- Object
  127. Error chan <- Object
  128. Terminate chan <- bool
  129. }
  130. func (s Sender) Context() *Context {
  131. return s.ob.context
  132. }
  133. func (s Sender) Scheduler() Scheduler {
  134. return s.sched
  135. }
  136. func (s Sender) Next(x Object) {
  137. s.sched.dispatch(event {
  138. kind: ev_next,
  139. payload: x,
  140. observer: s.ob,
  141. })
  142. }
  143. func (s Sender) Error(e Object) {
  144. s.sched.dispatch(event {
  145. kind: ev_error,
  146. payload: e,
  147. observer: s.ob,
  148. })
  149. }
  150. func (s Sender) Complete() {
  151. s.sched.dispatch(event {
  152. kind: ev_complete,
  153. payload: nil,
  154. observer: s.ob,
  155. })
  156. }
  157. func Schedule(action Observable, sched Scheduler, r Receiver) {
  158. sched.commit(func() {
  159. sched.run(action, &observer {
  160. context: r.Context,
  161. next: func(x Object) {
  162. if r.Values != nil {
  163. r.Values <- x
  164. }
  165. },
  166. error: func(e Object) {
  167. if r.Error != nil {
  168. r.Error <- e
  169. close(r.Error)
  170. }
  171. if r.Terminate != nil {
  172. r.Terminate <- false
  173. }
  174. },
  175. complete: func() {
  176. if r.Values != nil {
  177. close(r.Values)
  178. }
  179. if r.Terminate != nil {
  180. r.Terminate <- true
  181. }
  182. },
  183. })
  184. })
  185. }
  186. func ScheduleBackground(action Observable, sched Scheduler) {
  187. Schedule(action, sched, Receiver {
  188. Context: Background(),
  189. })
  190. }
  191. func ScheduleBackgroundWaitTerminate(action Observable, sched Scheduler) bool {
  192. var wait = make(chan bool)
  193. Schedule(action, sched, Receiver {
  194. Context: Background(),
  195. Terminate: wait,
  196. })
  197. return <- wait
  198. }
  199. func Noop() Observable {
  200. return Observable { func(sched Scheduler, ob *observer) {
  201. ob.complete()
  202. } }
  203. }
  204. func NewGoroutine(action func(Sender)) Observable {
  205. return Observable { func(sched Scheduler, ob *observer) {
  206. go action(Sender { sched: sched, ob: ob })
  207. } }
  208. }
  209. func NewGoroutineSingle(action func(ctx *Context)(Object,bool)) Observable {
  210. return Observable { func(sched Scheduler, ob *observer) {
  211. var sender = Sender { sched: sched, ob: ob }
  212. go (func() {
  213. var result, ok = action(sender.Context())
  214. if ok {
  215. sender.Next(result)
  216. sender.Complete()
  217. } else {
  218. sender.Error(result)
  219. }
  220. })()
  221. }}
  222. }
  223. func NewQueuedWithContext(w *Worker, action func(*Context)(Object,bool)) Observable {
  224. return Observable { func(sched Scheduler, ob *observer) {
  225. var sender = Sender { sched: sched, ob: ob }
  226. w.Do(func() {
  227. var result, ok = action(ob.context)
  228. if ok {
  229. sender.Next(result)
  230. sender.Complete()
  231. } else {
  232. sender.Error(result)
  233. }
  234. })
  235. } }
  236. }
  237. func NewQueued(w *Worker, action func()(Object,bool)) Observable {
  238. return NewQueuedWithContext(w, func(_ *Context) (Object, bool) {
  239. return action()
  240. })
  241. }
  242. func NewQueuedNoValue(w *Worker, action func()(bool,Object)) Observable {
  243. return Observable { func(sched Scheduler, ob *observer) {
  244. var sender = Sender { sched: sched, ob: ob }
  245. w.Do(func() {
  246. var ok, err = action()
  247. if ok {
  248. sender.Complete()
  249. } else {
  250. sender.Error(err)
  251. }
  252. })
  253. } }
  254. }
  255. func NewCallback(action func(func(Object))) Observable {
  256. return Observable { func(sched Scheduler, ob *observer) {
  257. var sender = Sender { sched: sched, ob: ob }
  258. action(func(value Object) {
  259. sender.Next(value)
  260. sender.Complete()
  261. })
  262. }}
  263. }
  264. func NewSubscription(action func(func(Object))(func())) Observable {
  265. return Observable { func(sched Scheduler, ob *observer) {
  266. var h = action(ob.next)
  267. if h != nil {
  268. ob.context.append_cleaner(h)
  269. }
  270. } }
  271. }
  272. func NewSubscriptionWithSender(action func(Sender)(func())) Observable {
  273. return Observable { func(sched Scheduler, ob *observer) {
  274. var h = action(Sender { sched: sched, ob: ob })
  275. if h != nil {
  276. ob.context.append_cleaner(h)
  277. }
  278. } }
  279. }
  280. func NewSync(action func()(Object,bool)) Observable {
  281. return Observable { func(sched Scheduler, ob *observer) {
  282. var result, ok = action()
  283. if ok {
  284. ob.next(result)
  285. ob.complete()
  286. } else {
  287. ob.error(result)
  288. }
  289. } }
  290. }
  291. func NewSyncSequence(action func(func(Object))(bool,Object)) Observable {
  292. return Observable { func(sched Scheduler, ob *observer) {
  293. var ok, err = action(ob.next)
  294. if ok {
  295. ob.complete()
  296. } else {
  297. ob.error(err)
  298. }
  299. } }
  300. }
  301. func NewSyncWithSender(action func(Sender)) Observable {
  302. return Observable { func(sched Scheduler, ob *observer) {
  303. action(Sender { sched: sched, ob: ob })
  304. } }
  305. }
  306. func NewYield(values... Object) Observable {
  307. return Observable { func(sched Scheduler, ob *observer) {
  308. for _, value := range values {
  309. ob.next(value)
  310. }
  311. ob.complete()
  312. } }
  313. }
  314. func NewYieldEach[T any] (forEach func(func(T))) Observable {
  315. return Observable { func(sched Scheduler, ob *observer) {
  316. forEach(func(value T) {
  317. ob.next(value)
  318. })
  319. ob.complete()
  320. } }
  321. }
  322. func NewPersistent(value Object) Observable {
  323. return Observable { func(sched Scheduler, ob *observer) {
  324. ob.next(value)
  325. } }
  326. }
  327. func NewPersistentThunk(thunk func()(Object)) Observable {
  328. return Observable { func(sched Scheduler, ob *observer) {
  329. ob.next(thunk())
  330. } }
  331. }