fromObservable.ts
Purpose
Convert an RxJS Observable into a reactive Atom.
Overview
fromObservable is the counterpart to observe. It provides synchronous reactive access to values produced by an Observable by subscribing to it. The returned Atom always holds the most recent value emitted by the Observable. If the Observable emits an error, the Atom will throw that error when accessed.
The returned Atom is "destroyable", meaning it can be unsubscribed from the source Observable when it is no longer needed.
Conceptual Architecture
Internally, fromObservable uses createAtomSubject from @nrgyjs/core. It subscribes to the source Observable and forwards all emitted values (next) and errors (error) to the internal AtomSubject.
When the returned Atom is destroyed (e.g., via a Scope), it automatically unsubscribes from the source Observable to prevent memory leaks. This is handled via the onDestroy hook provided to createAtomSubject.
Public API Description
fromObservable<T>(source: Observable<T> | Subscribable<T>): DestroyableAtom<T | undefined>
Subscribes to the source and returns an Atom containing the latest value. Since no initial value is provided, the Atom starts with undefined.
source: The RxJSObservableorSubscribableto subscribe to.
fromObservable<T>(source: Observable<T> | Subscribable<T>, initialValue: T): DestroyableAtom<T>
Subscribes to the source and returns an Atom starting with initialValue.
source: The RxJSObservableorSubscribableto subscribe to.initialValue: Initial value for the atom.
Usage Examples
Basic usage
import { fromObservable } from '@nrgyjs/rxjs';
import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject(1);
const count = fromObservable(subject);
console.log(count()); // 1
subject.next(2);
console.log(count()); // 2With initial value
import { fromObservable } from '@nrgyjs/rxjs';
import { Subject } from 'rxjs';
const subject = new Subject<number>();
const count = fromObservable(subject, 0);
console.log(count()); // 0
subject.next(10);
console.log(count()); // 10Managing lifecycle with Scope
import { createScope } from '@nrgyjs/core';
import { fromObservable } from '@nrgyjs/rxjs';
import { interval } from 'rxjs';
const scope = createScope();
const timer = fromObservable(interval(1000), 0);
scope.add(timer);
// Later, when scope is destroyed, the subscription to interval is also closed.
scope.destroy();