123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766 |
- pour(built_in_functions, {
- subscribe: f (
- 'subscribe',
- 'function subscribe (o: Observable, s: Subscriber) -> Arity<0>',
- (o, s) => obsv(o).subscribe(s),
- 'function subscribe (o: Observable, f: Arity<1>) -> Arity<0>',
- (o, f) => obsv(o).subscribe(new_struct(Types.Subscriber, {
- next: f
- })),
- 'function subscribe (o: Observable, f: Arity<0>) -> Arity<0>',
- (o, f) => obsv(o).subscribe(new_struct(Types.Subscriber, {
- next: fun (
- 'function callback (_: Any) -> Object',
- _ => call(f, [])
- )
- }))
- ),
- iter2obsv: fun (
- 'function iter2obsv (i: Iterable) -> Observer',
- i => observer(push => {
- for (let e of iter(i)) {
- push(e)
- }
- push(Complete)
- }),
- ),
- seq: f (
- 'seq',
- 'function seq (n: Size) -> Iterator',
- n => count(n),
- 'function seq (start: Index, amount: Size) -> Iterator',
- (start, amount) => map(count(amount), i => start + i)
- ),
- repeat: fun (
- 'function repeat (object: Any, n: Size) -> Iterator',
- (object, n) => (function* () {
- for (let i = 0; i < n; i++) {
- yield object
- }
- })()
- ),
- concat: f (
- 'concat',
- 'function concat (o1: Observable, o2: Observable) -> Observer',
- (o1, o2) => observer(push => {
- let unsub = obsv(o1).subscribe(subs({
- next: x => push(x),
- error: e => push(e),
- complete: () => {
- unsub = obsv(o2).subscribe(subs({
- next: x => push(x),
- error: e => push(e),
- complete: () => push(Complete)
- }))
- }
- }))
- return () => unsub()
- }),
- 'function concat (i1: Iterable, i2: Iterable) -> Iterator',
- (i1, i2) => (function* () {
- for (let e of iter(i1)) {
- yield e
- }
- for (let e of iter(i2)) {
- yield e
- }
- })()
- ),
- merge: fun (
- 'function merge (o1: Observable, o2: Observable) -> Observer',
- (o1, o2) => observer(push => {
- let f1 = false
- let f2 = false
- let nosub2 = false
- let unsub1 = obsv(o1).subscribe(subs({
- next: x => push(x),
- error: e => {
- if (typeof unsub2 == 'undefined') {
- unsub2()
- } else {
- nosub2 = true
- }
- push(e)
- },
- complete: () => {
- f1 = true
- if (f2) {
- push(Complete)
- }
- }
- }))
- let unsub2 = !nosub2? obsv(o2).subscribe(subs({
- next: x => push(x),
- error: e => {
- unsub1()
- push(e)
- },
- complete: () => {
- f2 = true
- if (f1) {
- push(Complete)
- }
- }
- })): () => Void
- return () => { unsub1(); unsub2() }
- })
- ),
- range: f (
- 'range',
- 'function range (begin: Index, end: Index) -> Iterator',
- (begin, end) => {
- ensure(begin <= end, 'invalid_range', begin, end)
- return (function* () {
- for (let i = begin; i < end; i += 1) {
- yield i
- }
- })()
- },
- 'function range (begin: Index, end: Index, step: Size) -> Iterator',
- (begin, end, step) => {
- ensure(begin <= end, 'invalid_range', begin, end)
- return (function* () {
- for (let i = begin; i < end; i += step) {
- yield i
- }
- })()
- }
- ),
- map: f (
- 'map',
- 'function map (o: Observable, f: Arity<2>) -> Observer',
- (o, f) => observer(push => {
- let i = 0
- return obsv(o).subscribe(subs({
- next: x => {
- push(call(f, [x, i]))
- i += 1
- },
- error: e => push(e),
- complete: () => push(Complete)
- }))
- }),
- 'function map (o: Observable, f: Arity<1>) -> Observer',
- (o, f) => observer(push => obsv(o).subscribe(subs({
- next: x => push(call(f, [x])),
- error: e => push(e),
- complete: () => push(Complete)
- }))),
- 'function map (i: Iterable, f: Arity<2>) -> Iterator',
- (i, f) => map(iter(i), (e, n) => call(f, [e, n])),
- 'function map (i: Iterable, f: Arity<1>) -> Iterator',
- (i, f) => map(iter(i), e => call(f, [e]))
- ),
- filter: f (
- 'filter',
- 'function filter (o: Observable, f: Arity<2>) -> Observer',
- (o, f) => observer(push => {
- let i = 0
- return obsv(o).subscribe(subs({
- next: x => {
- let ok = call(f, [x, i])
- ensure(is(ok, Types.Bool), 'filter_not_bool')
- if (ok) {
- push(x)
- }
- i += 1
- },
- error: e => push(e),
- complete: () => push(Complete)
- }))
- }),
- 'function filter (o: Observable, f: Arity<1>) -> Observer',
- (o, f) => observer(push => obsv(o).subscribe(subs({
- next: x => {
- let ok = call(f, [x])
- ensure(is(ok, Types.Bool), 'filter_not_bool')
- if (ok) {
- push(x)
- }
- },
- error: e => push(e),
- complete: () => push(Complete)
- }))),
- 'function filter (i: Iterable, f: Arity<2>) -> Iterator',
- (i, f) => filter(iter(i), (e, n) => {
- let ok = call(f, [e, n])
- ensure(is(ok, Types.Bool), 'filter_not_bool')
- return ok
- }),
- 'function filter (i: Iterable, f: Arity<1>) -> Iterator',
- (i, f) => filter(iter(i), e => {
- let ok = call(f, [e])
- ensure(is(ok, Types.Bool), 'filter_not_bool')
- return ok
- })
- ),
- take: f (
- 'take',
- 'function take (o: Observable, f: Arity<2>) -> Observer',
- (o, f) => observer(push => {
- let i = 0
- let unsub = obsv(o).subscribe(subs({
- next: x => {
- let ok = call(f, [x, i])
- ensure(is(ok, Types.Bool), 'filter_not_bool')
- if (ok) {
- push(x)
- } else {
- unsub()
- push(Complete)
- }
- i += 1
- },
- error: e => push(e),
- complete: () => push(Complete)
- }))
- return unsub
- }),
- 'function take (o: Observable, f: Arity<1>) -> Observer',
- (o, f) => observer(push => {
- let unsub = obsv(o).subscribe(subs({
- next: x => {
- let ok = call(f, [x])
- ensure(is(ok, Types.Bool), 'filter_not_bool')
- if (ok) {
- push(x)
- } else {
- unsub()
- push(Complete)
- }
- },
- error: e => push(e),
- complete: () => push(Complete)
- }))
- return unsub
- }),
- 'function take (i: Iterable, f: Arity<2>) -> Iterator',
- (i, f) => (function* () {
- let n = 0
- for (let e of iter(i)) {
- let ok = call(f, [e, n])
- ensure(is(ok, Types.Bool), 'filter_not_bool')
- if (ok) {
- yield e
- n += 1
- } else {
- break
- }
- }
- })(),
- 'function take (i: Iterable, f: Arity<1>) -> Iterator',
- (i, f) => (function* () {
- for (let e of iter(i)) {
- let ok = call(f, [e])
- ensure(is(ok, Types.Bool), 'filter_not_bool')
- if (ok) {
- yield e
- } else {
- break
- }
- }
- })()
- ),
- until: fun (
- 'function until (o: Observable, signal: Observable) -> Observer',
- (o, signal) => observer(push => {
- let unsub = () => Void
- let unsub_signal = () => Void
- let unsub_all = () => { unsub(); unsub_signal() }
- unsub = obsv(o).subscribe(subs({
- next: x => push(x),
- error: e => { unsub_signal(); push(e) },
- complete: () => { unsub_signal(); push(Complete) }
- }))
- unsub_signal = obsv(signal).subscribe(subs({
- next: _ => { unsub_all(); push(Complete) },
- error: e => { unsub(); push(e) },
- complete: () => Void
- }))
- return unsub_all
- })
- ),
- find: f (
- 'find',
- 'function find (i: Iterable, T: Type) -> Object',
- (i, T) => {
- let r = find(iter(i), e => call(operator_is, [e, T]))
- return (r !== NotFound)? r: Types.NotFound
- },
- 'function find (i: Iterable, f: Arity<2>) -> Object',
- (i, f) => {
- let r = find(iter(i), (e, n) => {
- let c = call(f, [e, n])
- ensure(is(c, Types.Bool), 'cond_not_bool')
- return c
- })
- return (r !== NotFound)? r: Types.NotFound
- },
- 'function find (i: Iterable, f: Arity<1>) -> Object',
- (i, f) => {
- let r = find(iter(i), e => {
- let c = call(f, [e])
- ensure(is(c, Types.Bool), 'cond_not_bool')
- return c
- })
- return (r !== NotFound)? r: Types.NotFound
- }
- ),
- count: fun (
- 'function count (i: Iterable) -> Size',
- i => fold(iter(i), 0, (_, v) => v+1)
- ),
- scan: f (
- 'scan',
- 'function scan (o: Observable, base: Any, f: Arity<3>) -> Observer',
- (o, base, f) => observer(push => {
- let v = base
- let n = 0
- return obsv(o).subscribe(subs({
- next: x => {
- v = push(call(f, [x, v, n]))
- n += 1
- },
- error: e => push(e),
- complete: () => push(Complete)
- }))
- }),
- 'function scan (o: Observable, base: Any, f: Arity<2>) -> Observer',
- (o, base, f) => observer(push => {
- let v = base
- return obsv(o).subscribe(subs({
- next: x => {
- v = push(call(f, [x, v]))
- },
- error: e => push(e),
- complete: () => push(Complete)
- }))
- }),
- 'function scan (i: Iterable, base: Any, f: Arity<3>) -> Iterator',
- (i, base, f) => (function* () {
- let v = base
- let n = 0
- for (let e of iter(i)) {
- v = call(f, [e, v, n])
- yield v
- n += 1
- }
- })(),
- 'function scan (i: Iterable, base: Any, f: Arity<2>) -> Iterator',
- (i, base, f) => (function* () {
- let v = base
- for (let e of iter(i)) {
- v = call(f, [e, v])
- yield v
- }
- })()
- ),
- fold: f (
- 'fold',
- 'function fold (o: Observable, base: Any, f: Arity<3>) -> Observer',
- (o, base, f) => observer(push => {
- let v = base
- let n = 0
- return obsv(o).subscribe(subs({
- next: x => {
- v = call(f, [x, v, n])
- n += 1
- },
- error: e => push(e),
- complete: () => {
- push(v)
- push(Complete)
- }
- }))
- }),
- 'function fold (o: Observable, base: Any, f: Arity<2>) -> Observer',
- (o, base, f) => observer(push => {
- let v = base
- return obsv(o).subscribe(subs({
- next: x => {
- v = call(f, [x, v])
- },
- error: e => push(e),
- complete: () => {
- push(v)
- push(Complete)
- }
- }))
- }),
- 'function fold (i: Iterable, base: Any, f: Arity<3>) -> Object',
- (i, base, f) => fold(iter(i), base, (e, v, n) => {
- return call(f, [e, v, n])
- }),
- 'function fold (i: Iterable, base: Any, f: Arity<2>) -> Object',
- (i, base, f) => fold(iter(i), base, (e,v) => {
- return call(f, [e, v])
- })
- ),
- every: f (
- 'every',
- 'function every (i: Iterable, f: Arity<2>) -> Bool',
- (i, f) => forall(iter(i), (e, i) => {
- let v = call(f, [e, i])
- ensure(is(v, Types.Bool), 'cond_not_bool')
- return v
- }),
- 'function every (i: Iterable, f: Arity<1>) -> Bool',
- (i, f) => forall(iter(i), e => {
- let v = call(f, [e])
- ensure(is(v, Types.Bool), 'cond_not_bool')
- return v
- })
- ),
- some: f (
- 'some',
- 'function some (i: Iterable, f: Arity<2>) -> Bool',
- (i, f) => exists(iter(i), (e, i) => {
- let v = call(f, [e, i])
- ensure(is(v, Types.Bool), 'cond_not_bool')
- return v
- }),
- 'function some (i: Iterable, f: Arity<1>) -> Bool',
- (i, f) => exists(iter(i), e => {
- let v = call(f, [e])
- ensure(is(v, Types.Bool), 'cond_not_bool')
- return v
- })
- ),
- join: fun (
- 'function join (i: Iterable, sep: String) -> String',
- (i, sep) => {
- let string = ''
- let first = true
- for (let e of i) {
- ensure(is(e, Types.String), 'element_not_string')
- if (first) {
- first = false
- } else {
- string += sep
- }
- string += e
- }
- return string
- }
- ),
- reversed: fun (
- 'function reversed (i: Iterable) -> Iterator',
- i => (function* () {
- let buf = []
- for (let e of iter(i)) {
- buf.push(e)
- }
- for (let e of rev(buf)) {
- yield e
- }
- })(),
- 'function reversed (l: List) -> Iterator',
- l => rev(l)
- ),
- flat: f (
- 'flat',
- 'function flat (o: Observable, limit: PosInt) -> Observer',
- (o, limit) => observer(push => {
- let unsub = new Set()
- let waiting = []
- let stopped = false
- let stop = () => Void
- let source_complete = false
- let unsub_source = obsv(o).subscribe(subs({
- next: function next_callback (x) {
- if (stopped) { return }
- let ok = is(x, Types.Observable)
- if (!ok) {
- stop()
- ensure(false, 'value_not_observable')
- }
- if (unsub.size < limit) {
- let sync = true
- let sync_complete = false
- let unsub_i = obsv(x).subscribe(subs({
- next: y => push(y),
- error: e => { stop(); push(e) },
- complete: () => {
- if (!sync) {
- unsub.delete(unsub_i)
- } else {
- sync_complete = true
- }
- if (waiting.length > 0) {
- next_callback(waiting.shift())
- } else if (source_complete) {
- if (unsub.size == 0) {
- push(Complete)
- }
- }
- }
- }))
- sync = false
- if (!sync_complete) {
- unsub.add(unsub_i)
- }
- } else {
- waiting.push(x)
- }
- },
- error: e => { stop(); push(e) },
- complete: () => {
- source_complete = true
- }
- }))
- stop = () => {
- unsub_source()
- foreach(unsub, u => u())
- unsub.clear()
- waiting = []
- stopped = true
- }
- return stop
- }),
- 'function flat (o: Observable) -> Observer',
- o => observer(push => {
- let unsub = new Set()
- let stopped = false
- let stop = () => Void
- let source_complete = false
- let unsub_source = obsv(o).subscribe(subs({
- next: x => {
- if (stopped) { return }
- let ok = is(x, Types.Observable)
- if (!ok) {
- stop()
- ensure(false, 'value_not_observable')
- }
- let sync = true
- let sync_complete = false
- let unsub_i = obsv(x).subscribe(subs({
- next: y => push(y),
- error: e => { stop(); push(e) },
- complete: () => {
- if (!sync) {
- unsub.delete(unsub_i)
- } else {
- sync_complete = true
- }
- if (source_complete && unsub.size == 0) {
- push(Complete)
- }
- }
- }))
- sync = false
- if (!sync_complete) {
- unsub.add(unsub_i)
- }
- },
- error: e => { stop(); push(e) },
- complete: () => {
- source_complete = true
- }
- }))
- stop = () => {
- unsub_source()
- foreach(unsub, u => u())
- unsub.clear()
- stopped = true
- }
- return stop
- }),
- 'function flat (i: Iterable) -> Iterator',
- i => (function* () {
- for (let e of iter(i)) {
- ensure(is(e, Types.Iterable), 'element_not_iterable')
- for (let ee of iter(e)) {
- yield ee
- }
- }
- })()
- ),
- tap: f (
- 'tap',
- 'function tap (o: Observable, f: Arity<2>) -> Observer',
- (o, f) => observer(push => {
- let i = 0
- return obsv(o).subscribe(subs({
- next: x => {
- call(f, [x, i])
- push(x)
- i += 1
- },
- error: e => push(e),
- complete: () => push(Complete)
- }))
- }),
- 'function tap (o: Observable, f: Arity<1>) -> Observer',
- (o, f) => observer(push => obsv(o).subscribe(subs({
- next: x => {
- call(f, [x])
- push(x)
- },
- error: e => push(e),
- complete: () => push(Complete)
- }))),
- 'function tap (i: Iterable, f: Arity<2>) -> Iterator',
- (i, f) => (function* () {
- let n = 0
- for (let e of iter(i)) {
- call(f, [e, n])
- yield e
- n += 1
- }
- })(),
- 'function tap (i: Iterable, f: Arity<1>) -> Iterator',
- (i, f) => (function* () {
- for (let e of iter(i)) {
- call(f, [e])
- yield e
- }
- })()
- ),
- zip: f (
- 'zip',
- 'function zip (o1: Observable, o2: Observable) -> Observer',
- (o1, o2) => observer(push => {
- let q1 = []
- let q2 = []
- let f1 = false
- let f2 = false
- let stopped = false
- let stop = () => Void
- let unsub1 = obsv(o1).subscribe(subs({
- next: x => {
- if (stopped) { return }
- if (q2.length > 0) {
- push([x, q2.shift()])
- if (q2.length == 0 && f2) {
- unsub1()
- push(Complete)
- }
- } else {
- q1.push(x)
- }
- },
- error: e => { stop(); push(e) },
- complete: () => {
- f1 = true
- if (f2 || q1.length == 0) {
- if (!f2) { unsub2(); stoppd = true; q2 = [] }
- push(Complete)
- }
- }
- }))
- let unsub2 = obsv(o2).subscribe(subs({
- next: x => {
- if (stopped) { return }
- if (q1.length > 0) {
- push([q1.shift(), x])
- if (q1.length == 0 && f1) {
- unsub2()
- push(Complete)
- }
- } else {
- q2.push(x)
- }
- },
- error: e => { stop(); push(e) },
- complete: () => {
- f2 = true
- if (f1 || q2.length == 0) {
- if (!f1) { unsub1(); stopped = true; q1 = [] }
- push(Complete)
- }
- }
- }))
- stop = () => {
- unsub1()
- unsub2()
- q1 = []
- q2 = []
- stopped = true
- }
- return stop
- }),
- 'function zip (i1: Iterable, i2: Iterable) -> Iterator',
- (i1, i2) => zip([i1, i2], x => x)
- ),
- debounce: f (
- 'debounce',
- 'function debounce (o: Observable, f: Arity<1>) -> Observer',
- (o, f) => observer(push => {
- let timeout = null
- let waiting = false
- let end = false
- let reset = () => {
- if (timeout !== null) {
- clearTimeout(timeout)
- }
- }
- let unsub = obsv(o).subscribe(subs({
- next: x => {
- reset()
- let dur = call(f, [x])
- if (!is(dur, Types.Size)) {
- if (typeof unsub != 'undefined') { unsub() }
- ensure(false, 'debounce_invalid_duration')
- }
- waiting = true
- timeout = setTimeout(() => {
- push(x)
- if (end) {
- push(Complete)
- }
- waiting = false
- }, dur)
- },
- error: e => { reset(); push(e) },
- complete: () => {
- end = true
- if (!waiting) {
- push(Complete)
- }
- }
- }))
- return unsub
- }),
- 'function debounce (o: Observable, dur: Size) -> Observer',
- (o, dur) => call(built_in_functions.debounce, [o, fun (
- 'function get_duration (_: Any) -> Size',
- _ => dur
- )])
- ),
- throttle: f (
- 'throttle',
- 'function throttle (o: Observable, f: Arity<1>) -> Observer',
- (o, f) => observer(push => {
- let lock = false
- let unsub = obsv(o).subscribe(subs({
- next: x => {
- if (!lock) {
- let dur = call(f, [x])
- if (!is(dur, Types.Size)) {
- if (typeof unsub != 'undefined') { unsub() }
- ensure(false, 'throttle_invalid_duration')
- }
- lock = true
- setTimeout(() => {
- lock = false
- }, dur)
- push(x)
- }
- },
- error: e => { push(e) },
- complete: () => { push(Complete) }
- }))
- return unsub
- }),
- 'function throttle (o: Observable, dur: Size) -> Observer',
- (o, dur) => call(built_in_functions.throttle, [o, fun (
- 'function get_duration (_: Any) -> Size',
- _ => dur
- )])
- ),
- collect: fun (
- 'function collect (i: Iterable) -> List',
- i => list(iter(i))
- )
- })
|