chain.js 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766
  1. pour(built_in_functions, {
  2. subscribe: f (
  3. 'subscribe',
  4. 'function subscribe (o: Observable, s: Subscriber) -> Arity<0>',
  5. (o, s) => obsv(o).subscribe(s),
  6. 'function subscribe (o: Observable, f: Arity<1>) -> Arity<0>',
  7. (o, f) => obsv(o).subscribe(new_struct(Types.Subscriber, {
  8. next: f
  9. })),
  10. 'function subscribe (o: Observable, f: Arity<0>) -> Arity<0>',
  11. (o, f) => obsv(o).subscribe(new_struct(Types.Subscriber, {
  12. next: fun (
  13. 'function callback (_: Any) -> Object',
  14. _ => call(f, [])
  15. )
  16. }))
  17. ),
  18. iter2obsv: fun (
  19. 'function iter2obsv (i: Iterable) -> Observer',
  20. i => observer(push => {
  21. for (let e of iter(i)) {
  22. push(e)
  23. }
  24. push(Complete)
  25. }),
  26. ),
  27. seq: f (
  28. 'seq',
  29. 'function seq (n: Size) -> Iterator',
  30. n => count(n),
  31. 'function seq (start: Index, amount: Size) -> Iterator',
  32. (start, amount) => map(count(amount), i => start + i)
  33. ),
  34. repeat: fun (
  35. 'function repeat (object: Any, n: Size) -> Iterator',
  36. (object, n) => (function* () {
  37. for (let i = 0; i < n; i++) {
  38. yield object
  39. }
  40. })()
  41. ),
  42. concat: f (
  43. 'concat',
  44. 'function concat (o1: Observable, o2: Observable) -> Observer',
  45. (o1, o2) => observer(push => {
  46. let unsub = obsv(o1).subscribe(subs({
  47. next: x => push(x),
  48. error: e => push(e),
  49. complete: () => {
  50. unsub = obsv(o2).subscribe(subs({
  51. next: x => push(x),
  52. error: e => push(e),
  53. complete: () => push(Complete)
  54. }))
  55. }
  56. }))
  57. return () => unsub()
  58. }),
  59. 'function concat (i1: Iterable, i2: Iterable) -> Iterator',
  60. (i1, i2) => (function* () {
  61. for (let e of iter(i1)) {
  62. yield e
  63. }
  64. for (let e of iter(i2)) {
  65. yield e
  66. }
  67. })()
  68. ),
  69. merge: fun (
  70. 'function merge (o1: Observable, o2: Observable) -> Observer',
  71. (o1, o2) => observer(push => {
  72. let f1 = false
  73. let f2 = false
  74. let nosub2 = false
  75. let unsub1 = obsv(o1).subscribe(subs({
  76. next: x => push(x),
  77. error: e => {
  78. if (typeof unsub2 == 'undefined') {
  79. unsub2()
  80. } else {
  81. nosub2 = true
  82. }
  83. push(e)
  84. },
  85. complete: () => {
  86. f1 = true
  87. if (f2) {
  88. push(Complete)
  89. }
  90. }
  91. }))
  92. let unsub2 = !nosub2? obsv(o2).subscribe(subs({
  93. next: x => push(x),
  94. error: e => {
  95. unsub1()
  96. push(e)
  97. },
  98. complete: () => {
  99. f2 = true
  100. if (f1) {
  101. push(Complete)
  102. }
  103. }
  104. })): () => Void
  105. return () => { unsub1(); unsub2() }
  106. })
  107. ),
  108. range: f (
  109. 'range',
  110. 'function range (begin: Index, end: Index) -> Iterator',
  111. (begin, end) => {
  112. ensure(begin <= end, 'invalid_range', begin, end)
  113. return (function* () {
  114. for (let i = begin; i < end; i += 1) {
  115. yield i
  116. }
  117. })()
  118. },
  119. 'function range (begin: Index, end: Index, step: Size) -> Iterator',
  120. (begin, end, step) => {
  121. ensure(begin <= end, 'invalid_range', begin, end)
  122. return (function* () {
  123. for (let i = begin; i < end; i += step) {
  124. yield i
  125. }
  126. })()
  127. }
  128. ),
  129. map: f (
  130. 'map',
  131. 'function map (o: Observable, f: Arity<2>) -> Observer',
  132. (o, f) => observer(push => {
  133. let i = 0
  134. return obsv(o).subscribe(subs({
  135. next: x => {
  136. push(call(f, [x, i]))
  137. i += 1
  138. },
  139. error: e => push(e),
  140. complete: () => push(Complete)
  141. }))
  142. }),
  143. 'function map (o: Observable, f: Arity<1>) -> Observer',
  144. (o, f) => observer(push => obsv(o).subscribe(subs({
  145. next: x => push(call(f, [x])),
  146. error: e => push(e),
  147. complete: () => push(Complete)
  148. }))),
  149. 'function map (i: Iterable, f: Arity<2>) -> Iterator',
  150. (i, f) => map(iter(i), (e, n) => call(f, [e, n])),
  151. 'function map (i: Iterable, f: Arity<1>) -> Iterator',
  152. (i, f) => map(iter(i), e => call(f, [e]))
  153. ),
  154. filter: f (
  155. 'filter',
  156. 'function filter (o: Observable, f: Arity<2>) -> Observer',
  157. (o, f) => observer(push => {
  158. let i = 0
  159. return obsv(o).subscribe(subs({
  160. next: x => {
  161. let ok = call(f, [x, i])
  162. ensure(is(ok, Types.Bool), 'filter_not_bool')
  163. if (ok) {
  164. push(x)
  165. }
  166. i += 1
  167. },
  168. error: e => push(e),
  169. complete: () => push(Complete)
  170. }))
  171. }),
  172. 'function filter (o: Observable, f: Arity<1>) -> Observer',
  173. (o, f) => observer(push => obsv(o).subscribe(subs({
  174. next: x => {
  175. let ok = call(f, [x])
  176. ensure(is(ok, Types.Bool), 'filter_not_bool')
  177. if (ok) {
  178. push(x)
  179. }
  180. },
  181. error: e => push(e),
  182. complete: () => push(Complete)
  183. }))),
  184. 'function filter (i: Iterable, f: Arity<2>) -> Iterator',
  185. (i, f) => filter(iter(i), (e, n) => {
  186. let ok = call(f, [e, n])
  187. ensure(is(ok, Types.Bool), 'filter_not_bool')
  188. return ok
  189. }),
  190. 'function filter (i: Iterable, f: Arity<1>) -> Iterator',
  191. (i, f) => filter(iter(i), e => {
  192. let ok = call(f, [e])
  193. ensure(is(ok, Types.Bool), 'filter_not_bool')
  194. return ok
  195. })
  196. ),
  197. take: f (
  198. 'take',
  199. 'function take (o: Observable, f: Arity<2>) -> Observer',
  200. (o, f) => observer(push => {
  201. let i = 0
  202. let unsub = obsv(o).subscribe(subs({
  203. next: x => {
  204. let ok = call(f, [x, i])
  205. ensure(is(ok, Types.Bool), 'filter_not_bool')
  206. if (ok) {
  207. push(x)
  208. } else {
  209. unsub()
  210. push(Complete)
  211. }
  212. i += 1
  213. },
  214. error: e => push(e),
  215. complete: () => push(Complete)
  216. }))
  217. return unsub
  218. }),
  219. 'function take (o: Observable, f: Arity<1>) -> Observer',
  220. (o, f) => observer(push => {
  221. let unsub = obsv(o).subscribe(subs({
  222. next: x => {
  223. let ok = call(f, [x])
  224. ensure(is(ok, Types.Bool), 'filter_not_bool')
  225. if (ok) {
  226. push(x)
  227. } else {
  228. unsub()
  229. push(Complete)
  230. }
  231. },
  232. error: e => push(e),
  233. complete: () => push(Complete)
  234. }))
  235. return unsub
  236. }),
  237. 'function take (i: Iterable, f: Arity<2>) -> Iterator',
  238. (i, f) => (function* () {
  239. let n = 0
  240. for (let e of iter(i)) {
  241. let ok = call(f, [e, n])
  242. ensure(is(ok, Types.Bool), 'filter_not_bool')
  243. if (ok) {
  244. yield e
  245. n += 1
  246. } else {
  247. break
  248. }
  249. }
  250. })(),
  251. 'function take (i: Iterable, f: Arity<1>) -> Iterator',
  252. (i, f) => (function* () {
  253. for (let e of iter(i)) {
  254. let ok = call(f, [e])
  255. ensure(is(ok, Types.Bool), 'filter_not_bool')
  256. if (ok) {
  257. yield e
  258. } else {
  259. break
  260. }
  261. }
  262. })()
  263. ),
  264. until: fun (
  265. 'function until (o: Observable, signal: Observable) -> Observer',
  266. (o, signal) => observer(push => {
  267. let unsub = () => Void
  268. let unsub_signal = () => Void
  269. let unsub_all = () => { unsub(); unsub_signal() }
  270. unsub = obsv(o).subscribe(subs({
  271. next: x => push(x),
  272. error: e => { unsub_signal(); push(e) },
  273. complete: () => { unsub_signal(); push(Complete) }
  274. }))
  275. unsub_signal = obsv(signal).subscribe(subs({
  276. next: _ => { unsub_all(); push(Complete) },
  277. error: e => { unsub(); push(e) },
  278. complete: () => Void
  279. }))
  280. return unsub_all
  281. })
  282. ),
  283. find: f (
  284. 'find',
  285. 'function find (i: Iterable, T: Type) -> Object',
  286. (i, T) => {
  287. let r = find(iter(i), e => call(operator_is, [e, T]))
  288. return (r !== NotFound)? r: Types.NotFound
  289. },
  290. 'function find (i: Iterable, f: Arity<2>) -> Object',
  291. (i, f) => {
  292. let r = find(iter(i), (e, n) => {
  293. let c = call(f, [e, n])
  294. ensure(is(c, Types.Bool), 'cond_not_bool')
  295. return c
  296. })
  297. return (r !== NotFound)? r: Types.NotFound
  298. },
  299. 'function find (i: Iterable, f: Arity<1>) -> Object',
  300. (i, f) => {
  301. let r = find(iter(i), e => {
  302. let c = call(f, [e])
  303. ensure(is(c, Types.Bool), 'cond_not_bool')
  304. return c
  305. })
  306. return (r !== NotFound)? r: Types.NotFound
  307. }
  308. ),
  309. count: fun (
  310. 'function count (i: Iterable) -> Size',
  311. i => fold(iter(i), 0, (_, v) => v+1)
  312. ),
  313. scan: f (
  314. 'scan',
  315. 'function scan (o: Observable, base: Any, f: Arity<3>) -> Observer',
  316. (o, base, f) => observer(push => {
  317. let v = base
  318. let n = 0
  319. return obsv(o).subscribe(subs({
  320. next: x => {
  321. v = push(call(f, [x, v, n]))
  322. n += 1
  323. },
  324. error: e => push(e),
  325. complete: () => push(Complete)
  326. }))
  327. }),
  328. 'function scan (o: Observable, base: Any, f: Arity<2>) -> Observer',
  329. (o, base, f) => observer(push => {
  330. let v = base
  331. return obsv(o).subscribe(subs({
  332. next: x => {
  333. v = push(call(f, [x, v]))
  334. },
  335. error: e => push(e),
  336. complete: () => push(Complete)
  337. }))
  338. }),
  339. 'function scan (i: Iterable, base: Any, f: Arity<3>) -> Iterator',
  340. (i, base, f) => (function* () {
  341. let v = base
  342. let n = 0
  343. for (let e of iter(i)) {
  344. v = call(f, [e, v, n])
  345. yield v
  346. n += 1
  347. }
  348. })(),
  349. 'function scan (i: Iterable, base: Any, f: Arity<2>) -> Iterator',
  350. (i, base, f) => (function* () {
  351. let v = base
  352. for (let e of iter(i)) {
  353. v = call(f, [e, v])
  354. yield v
  355. }
  356. })()
  357. ),
  358. fold: f (
  359. 'fold',
  360. 'function fold (o: Observable, base: Any, f: Arity<3>) -> Observer',
  361. (o, base, f) => observer(push => {
  362. let v = base
  363. let n = 0
  364. return obsv(o).subscribe(subs({
  365. next: x => {
  366. v = call(f, [x, v, n])
  367. n += 1
  368. },
  369. error: e => push(e),
  370. complete: () => {
  371. push(v)
  372. push(Complete)
  373. }
  374. }))
  375. }),
  376. 'function fold (o: Observable, base: Any, f: Arity<2>) -> Observer',
  377. (o, base, f) => observer(push => {
  378. let v = base
  379. return obsv(o).subscribe(subs({
  380. next: x => {
  381. v = call(f, [x, v])
  382. },
  383. error: e => push(e),
  384. complete: () => {
  385. push(v)
  386. push(Complete)
  387. }
  388. }))
  389. }),
  390. 'function fold (i: Iterable, base: Any, f: Arity<3>) -> Object',
  391. (i, base, f) => fold(iter(i), base, (e, v, n) => {
  392. return call(f, [e, v, n])
  393. }),
  394. 'function fold (i: Iterable, base: Any, f: Arity<2>) -> Object',
  395. (i, base, f) => fold(iter(i), base, (e,v) => {
  396. return call(f, [e, v])
  397. })
  398. ),
  399. every: f (
  400. 'every',
  401. 'function every (i: Iterable, f: Arity<2>) -> Bool',
  402. (i, f) => forall(iter(i), (e, i) => {
  403. let v = call(f, [e, i])
  404. ensure(is(v, Types.Bool), 'cond_not_bool')
  405. return v
  406. }),
  407. 'function every (i: Iterable, f: Arity<1>) -> Bool',
  408. (i, f) => forall(iter(i), e => {
  409. let v = call(f, [e])
  410. ensure(is(v, Types.Bool), 'cond_not_bool')
  411. return v
  412. })
  413. ),
  414. some: f (
  415. 'some',
  416. 'function some (i: Iterable, f: Arity<2>) -> Bool',
  417. (i, f) => exists(iter(i), (e, i) => {
  418. let v = call(f, [e, i])
  419. ensure(is(v, Types.Bool), 'cond_not_bool')
  420. return v
  421. }),
  422. 'function some (i: Iterable, f: Arity<1>) -> Bool',
  423. (i, f) => exists(iter(i), e => {
  424. let v = call(f, [e])
  425. ensure(is(v, Types.Bool), 'cond_not_bool')
  426. return v
  427. })
  428. ),
  429. join: fun (
  430. 'function join (i: Iterable, sep: String) -> String',
  431. (i, sep) => {
  432. let string = ''
  433. let first = true
  434. for (let e of i) {
  435. ensure(is(e, Types.String), 'element_not_string')
  436. if (first) {
  437. first = false
  438. } else {
  439. string += sep
  440. }
  441. string += e
  442. }
  443. return string
  444. }
  445. ),
  446. reversed: fun (
  447. 'function reversed (i: Iterable) -> Iterator',
  448. i => (function* () {
  449. let buf = []
  450. for (let e of iter(i)) {
  451. buf.push(e)
  452. }
  453. for (let e of rev(buf)) {
  454. yield e
  455. }
  456. })(),
  457. 'function reversed (l: List) -> Iterator',
  458. l => rev(l)
  459. ),
  460. flat: f (
  461. 'flat',
  462. 'function flat (o: Observable, limit: PosInt) -> Observer',
  463. (o, limit) => observer(push => {
  464. let unsub = new Set()
  465. let waiting = []
  466. let stopped = false
  467. let stop = () => Void
  468. let source_complete = false
  469. let unsub_source = obsv(o).subscribe(subs({
  470. next: function next_callback (x) {
  471. if (stopped) { return }
  472. let ok = is(x, Types.Observable)
  473. if (!ok) {
  474. stop()
  475. ensure(false, 'value_not_observable')
  476. }
  477. if (unsub.size < limit) {
  478. let sync = true
  479. let sync_complete = false
  480. let unsub_i = obsv(x).subscribe(subs({
  481. next: y => push(y),
  482. error: e => { stop(); push(e) },
  483. complete: () => {
  484. if (!sync) {
  485. unsub.delete(unsub_i)
  486. } else {
  487. sync_complete = true
  488. }
  489. if (waiting.length > 0) {
  490. next_callback(waiting.shift())
  491. } else if (source_complete) {
  492. if (unsub.size == 0) {
  493. push(Complete)
  494. }
  495. }
  496. }
  497. }))
  498. sync = false
  499. if (!sync_complete) {
  500. unsub.add(unsub_i)
  501. }
  502. } else {
  503. waiting.push(x)
  504. }
  505. },
  506. error: e => { stop(); push(e) },
  507. complete: () => {
  508. source_complete = true
  509. }
  510. }))
  511. stop = () => {
  512. unsub_source()
  513. foreach(unsub, u => u())
  514. unsub.clear()
  515. waiting = []
  516. stopped = true
  517. }
  518. return stop
  519. }),
  520. 'function flat (o: Observable) -> Observer',
  521. o => observer(push => {
  522. let unsub = new Set()
  523. let stopped = false
  524. let stop = () => Void
  525. let source_complete = false
  526. let unsub_source = obsv(o).subscribe(subs({
  527. next: x => {
  528. if (stopped) { return }
  529. let ok = is(x, Types.Observable)
  530. if (!ok) {
  531. stop()
  532. ensure(false, 'value_not_observable')
  533. }
  534. let sync = true
  535. let sync_complete = false
  536. let unsub_i = obsv(x).subscribe(subs({
  537. next: y => push(y),
  538. error: e => { stop(); push(e) },
  539. complete: () => {
  540. if (!sync) {
  541. unsub.delete(unsub_i)
  542. } else {
  543. sync_complete = true
  544. }
  545. if (source_complete && unsub.size == 0) {
  546. push(Complete)
  547. }
  548. }
  549. }))
  550. sync = false
  551. if (!sync_complete) {
  552. unsub.add(unsub_i)
  553. }
  554. },
  555. error: e => { stop(); push(e) },
  556. complete: () => {
  557. source_complete = true
  558. }
  559. }))
  560. stop = () => {
  561. unsub_source()
  562. foreach(unsub, u => u())
  563. unsub.clear()
  564. stopped = true
  565. }
  566. return stop
  567. }),
  568. 'function flat (i: Iterable) -> Iterator',
  569. i => (function* () {
  570. for (let e of iter(i)) {
  571. ensure(is(e, Types.Iterable), 'element_not_iterable')
  572. for (let ee of iter(e)) {
  573. yield ee
  574. }
  575. }
  576. })()
  577. ),
  578. tap: f (
  579. 'tap',
  580. 'function tap (o: Observable, f: Arity<2>) -> Observer',
  581. (o, f) => observer(push => {
  582. let i = 0
  583. return obsv(o).subscribe(subs({
  584. next: x => {
  585. call(f, [x, i])
  586. push(x)
  587. i += 1
  588. },
  589. error: e => push(e),
  590. complete: () => push(Complete)
  591. }))
  592. }),
  593. 'function tap (o: Observable, f: Arity<1>) -> Observer',
  594. (o, f) => observer(push => obsv(o).subscribe(subs({
  595. next: x => {
  596. call(f, [x])
  597. push(x)
  598. },
  599. error: e => push(e),
  600. complete: () => push(Complete)
  601. }))),
  602. 'function tap (i: Iterable, f: Arity<2>) -> Iterator',
  603. (i, f) => (function* () {
  604. let n = 0
  605. for (let e of iter(i)) {
  606. call(f, [e, n])
  607. yield e
  608. n += 1
  609. }
  610. })(),
  611. 'function tap (i: Iterable, f: Arity<1>) -> Iterator',
  612. (i, f) => (function* () {
  613. for (let e of iter(i)) {
  614. call(f, [e])
  615. yield e
  616. }
  617. })()
  618. ),
  619. zip: f (
  620. 'zip',
  621. 'function zip (o1: Observable, o2: Observable) -> Observer',
  622. (o1, o2) => observer(push => {
  623. let q1 = []
  624. let q2 = []
  625. let f1 = false
  626. let f2 = false
  627. let stopped = false
  628. let stop = () => Void
  629. let unsub1 = obsv(o1).subscribe(subs({
  630. next: x => {
  631. if (stopped) { return }
  632. if (q2.length > 0) {
  633. push([x, q2.shift()])
  634. if (q2.length == 0 && f2) {
  635. unsub1()
  636. push(Complete)
  637. }
  638. } else {
  639. q1.push(x)
  640. }
  641. },
  642. error: e => { stop(); push(e) },
  643. complete: () => {
  644. f1 = true
  645. if (f2 || q1.length == 0) {
  646. if (!f2) { unsub2(); stoppd = true; q2 = [] }
  647. push(Complete)
  648. }
  649. }
  650. }))
  651. let unsub2 = obsv(o2).subscribe(subs({
  652. next: x => {
  653. if (stopped) { return }
  654. if (q1.length > 0) {
  655. push([q1.shift(), x])
  656. if (q1.length == 0 && f1) {
  657. unsub2()
  658. push(Complete)
  659. }
  660. } else {
  661. q2.push(x)
  662. }
  663. },
  664. error: e => { stop(); push(e) },
  665. complete: () => {
  666. f2 = true
  667. if (f1 || q2.length == 0) {
  668. if (!f1) { unsub1(); stopped = true; q1 = [] }
  669. push(Complete)
  670. }
  671. }
  672. }))
  673. stop = () => {
  674. unsub1()
  675. unsub2()
  676. q1 = []
  677. q2 = []
  678. stopped = true
  679. }
  680. return stop
  681. }),
  682. 'function zip (i1: Iterable, i2: Iterable) -> Iterator',
  683. (i1, i2) => zip([i1, i2], x => x)
  684. ),
  685. debounce: f (
  686. 'debounce',
  687. 'function debounce (o: Observable, f: Arity<1>) -> Observer',
  688. (o, f) => observer(push => {
  689. let timeout = null
  690. let waiting = false
  691. let end = false
  692. let reset = () => {
  693. if (timeout !== null) {
  694. clearTimeout(timeout)
  695. }
  696. }
  697. let unsub = obsv(o).subscribe(subs({
  698. next: x => {
  699. reset()
  700. let dur = call(f, [x])
  701. if (!is(dur, Types.Size)) {
  702. if (typeof unsub != 'undefined') { unsub() }
  703. ensure(false, 'debounce_invalid_duration')
  704. }
  705. waiting = true
  706. timeout = setTimeout(() => {
  707. push(x)
  708. if (end) {
  709. push(Complete)
  710. }
  711. waiting = false
  712. }, dur)
  713. },
  714. error: e => { reset(); push(e) },
  715. complete: () => {
  716. end = true
  717. if (!waiting) {
  718. push(Complete)
  719. }
  720. }
  721. }))
  722. return unsub
  723. }),
  724. 'function debounce (o: Observable, dur: Size) -> Observer',
  725. (o, dur) => call(built_in_functions.debounce, [o, fun (
  726. 'function get_duration (_: Any) -> Size',
  727. _ => dur
  728. )])
  729. ),
  730. throttle: f (
  731. 'throttle',
  732. 'function throttle (o: Observable, f: Arity<1>) -> Observer',
  733. (o, f) => observer(push => {
  734. let lock = false
  735. let unsub = obsv(o).subscribe(subs({
  736. next: x => {
  737. if (!lock) {
  738. let dur = call(f, [x])
  739. if (!is(dur, Types.Size)) {
  740. if (typeof unsub != 'undefined') { unsub() }
  741. ensure(false, 'throttle_invalid_duration')
  742. }
  743. lock = true
  744. setTimeout(() => {
  745. lock = false
  746. }, dur)
  747. push(x)
  748. }
  749. },
  750. error: e => { push(e) },
  751. complete: () => { push(Complete) }
  752. }))
  753. return unsub
  754. }),
  755. 'function throttle (o: Observable, dur: Size) -> Observer',
  756. (o, dur) => call(built_in_functions.throttle, [o, fun (
  757. 'function get_duration (_: Any) -> Size',
  758. _ => dur
  759. )])
  760. ),
  761. collect: fun (
  762. 'function collect (i: Iterable) -> List',
  763. i => list(iter(i))
  764. )
  765. })