Provides reactive stream utilities for handling asynchronous data flows. Used internally by @cosmjs/tendermint-rpc for WebSocket subscriptions.
npm install @cosmjs/stream
DefaultValueProducer
An xstream Producer that stores a current value and emits updates. start and stop implement the xstream Producer contract and are normally called by the stream, not application code.
| Method | Parameters | Returns |
|---|
constructor | value: T, callbacks?: DefaultValueProducerCallsbacks | DefaultValueProducer<T> |
update | value: T | void |
error | error: any | void |
start | listener: Listener<T> | void |
stop | — | void |
interface DefaultValueProducerCallsbacks {
readonly onStarted: () => void;
readonly onStop: () => void;
}
The exported identifier DefaultValueProducerCallsbacks contains a spelling typo (“Callsbacks”) that is preserved for backwards compatibility.
ValueAndUpdates
Wraps a DefaultValueProducer to expose both the current value and a MemoryStream of updates.
| Method | Parameters | Returns |
|---|
constructor | producer: DefaultValueProducer<T> | ValueAndUpdates<T> |
waitFor | search: SearchFunction<T> | T | Promise<T> |
| Property | Type |
|---|
value | T |
updates | MemoryStream<T> |
waitFor resolves with the first value matching the predicate (or strict-equal match when a value is passed).
Promise Utilities
| Function | Parameters | Returns |
|---|
toListPromise | stream: Stream<T>, count: number | Promise<readonly T[]> |
firstEvent | stream: Stream<T> | Promise<T> |
fromListPromise | promise: Promise<Iterable<T>> | Stream<T> |
import { toListPromise, firstEvent } from "@cosmjs/stream";
const first = await firstEvent(stream);
const items = await toListPromise(stream, 5);
Stream Operators
| Function | Parameters | Returns |
|---|
concat | ...streams: Array<Stream<T>> | Stream<T> |
dropDuplicates | valueToKey: (x: T) => string | SameTypeStreamOperator<T> |
Reducer
Utilities for applying reducer patterns to streams.
| Export | Kind | Purpose |
|---|
Reducer<T, U> | class | Subscribes to a stream and materializes state of type U from events of type T |
ReducerFunc<T, U> | type | (acc: U, evt: T) => U |
countStream | function | Returns Reducer<T, number> that counts events seen on the stream |
asArray | function | Returns Reducer<T, readonly T[]> that accumulates all events into an array |
lastValue | function | Returns Reducer<T, T | undefined> holding the most recently emitted value |
import { Reducer, countStream, asArray, lastValue } from "@cosmjs/stream";
const counter = countStream(myStream);
// ... later
counter.value(); // number of events seen so far
await counter.finished(); // resolves when stream completes