sync.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. package rx
  2. const sync_did_not_complete = "An action that assumed synchronous did not complete synchronously"
  3. func runSync(action Observable, sched Scheduler, error func(Object)) (Object,bool) {
  4. var returned = Optional {}
  5. var exception = Optional {}
  6. var completed = false
  7. sched.run(action, &observer {
  8. context: Background(), // chained sync action cannot be interrupted
  9. next: func(x Object) {
  10. if returned.HasValue {
  11. panic(single_multiple_return)
  12. }
  13. returned.HasValue = true
  14. returned.Value = x
  15. },
  16. error: func(e Object) {
  17. if returned.HasValue {
  18. panic(single_unexpected_exception)
  19. }
  20. exception.HasValue = true
  21. exception.Value = e
  22. },
  23. complete: func() {
  24. if !(returned.HasValue) {
  25. panic(single_zero_return)
  26. }
  27. completed = true
  28. },
  29. })
  30. if exception.HasValue {
  31. error(exception.Value)
  32. return nil, false
  33. } else if !(completed) {
  34. panic(sync_did_not_complete)
  35. } else if !(returned.HasValue) {
  36. panic("something went wrong")
  37. } else {
  38. return returned.Value, true
  39. }
  40. }
  41. func (e Observable) SyncThen(f func(Object)(Observable)) Observable {
  42. return Observable { func(sched Scheduler, ob *observer) {
  43. var x, ok = runSync(e, sched, ob.error)
  44. if ok {
  45. var next = f(x)
  46. sched.run(next, ob)
  47. }
  48. } }
  49. }
  50. func (e Observable) ChainSync(f func(Object)(Observable)) Observable {
  51. return Observable { func(sched Scheduler, ob *observer) {
  52. var x, ok = runSync(e, sched, ob.error)
  53. if ok {
  54. var next = f(x)
  55. var y, ok = runSync(next, sched, ob.error)
  56. if ok {
  57. ob.next(y)
  58. ob.complete()
  59. }
  60. }
  61. } }
  62. }
  63. func (e Observable) TakeOneAsSingleAssumeSync() Observable {
  64. return Observable { func(sched Scheduler, ob *observer) {
  65. var completed = false
  66. sched.run(e.TakeOneAsSingle(), &observer {
  67. context: ob.context,
  68. next: ob.next,
  69. error: ob.error,
  70. complete: func() {
  71. ob.complete()
  72. completed = true
  73. },
  74. })
  75. if !(completed) {
  76. panic(sync_did_not_complete)
  77. }
  78. } }
  79. }