import { SubscriberFunction, Subscription, SubscriptionLike} from "./Subscription.ts";import { Observer } from "./Observer.ts";import { isAsyncIterable, isIterable, isObservableLike} from "./symbol.ts";import { isObserver, assertIsObserver } from "./utils.ts";
export interface ObservableLike<Value = unknown> { [Symbol.observable](): Observable<Value>;}
export class Observable<Value = unknown> implements ObservableLike<Value> { private _subscriber: SubscriberFunction<Value>; constructor(subscriber: SubscriberFunction<Value>) { if (!(this instanceof Observable)) { throw new TypeError("Observable cannot be called as a function"); } if (typeof subscriber !== "function") { throw new TypeError("Observable initializer must be a function"); } this._subscriber = subscriber; }
subscribe(observer: Observer<Value>): SubscriptionLike; subscribe( onNext: (value: Value) => void, onError?: (error: unknown) => void, onComplete?: () => void, ): SubscriptionLike; subscribe( observerOrOnNext: Observer<Value> | ((value: Value) => void), onError?: (error: unknown) => void, onComplete?: () => void, ): SubscriptionLike { const observer = isObserver(observerOrOnNext) ? observerOrOnNext : { next: observerOrOnNext, error: onError, complete: onComplete, }; assertIsObserver(observer); return new Subscription(observer, {subscriber: this._subscriber}); }
[Symbol.observable](): Observable<Value> { return this; }
static of<Value = unknown>(...items: Value[]): Observable<Value> { return new Observable<Value>((observer) => { for (const iterator of items) observer.next(iterator); observer.complete(); }); }
static from<Value = unknown>( observable: ObservableLike<Value>, ): Observable<Value>; static from<Value = unknown>( iterable: Iterable<Value>, ): Observable<Value>; static from<Value = unknown>( iterable: AsyncIterable<Value>, ): Observable<Value>; static from<Value = unknown>( argument: | Iterable<Value> | AsyncIterable<Value> | ObservableLike<Value>, ): Observable<Value> { if (isObservableLike(argument)) { const observable = argument[Symbol.observable](); return new Observable<Value>((observer) => observable.subscribe(observer) ); } if (isIterable(argument)) { return new Observable<Value>((observer) => { for (const value of argument) observer.next(value); observer.complete(); }); } if (isAsyncIterable(argument)) { return new Observable<Value>((observe) => { (async () => { for await (const value of argument) observe.next(value); })(); observe.complete(); }); } throw new TypeError( "Observable.from expects to receive an ObservableLike, Iterable or AsyncIterable", ); }}