Skip to main content
Provides reactive stream utilities for handling asynchronous data flows. Used internally by @cosmjs/tendermint-rpc for WebSocket subscriptions.
npm install @cosmjs/stream

DefaultValueProducer

An xstream Producer that stores a current value and emits updates. start and stop implement the xstream Producer contract and are normally called by the stream, not application code.
MethodParametersReturns
constructorvalue: T, callbacks?: DefaultValueProducerCallsbacksDefaultValueProducer<T>
updatevalue: Tvoid
errorerror: anyvoid
startlistener: Listener<T>void
stopvoid
PropertyType
valueT
interface DefaultValueProducerCallsbacks {
  readonly onStarted: () => void;
  readonly onStop: () => void;
}
The exported identifier DefaultValueProducerCallsbacks contains a spelling typo (“Callsbacks”) that is preserved for backwards compatibility.

ValueAndUpdates

Wraps a DefaultValueProducer to expose both the current value and a MemoryStream of updates.
MethodParametersReturns
constructorproducer: DefaultValueProducer<T>ValueAndUpdates<T>
waitForsearch: SearchFunction<T> | TPromise<T>
PropertyType
valueT
updatesMemoryStream<T>
waitFor resolves with the first value matching the predicate (or strict-equal match when a value is passed).

Promise Utilities

FunctionParametersReturns
toListPromisestream: Stream<T>, count: numberPromise<readonly T[]>
firstEventstream: Stream<T>Promise<T>
fromListPromisepromise: Promise<Iterable<T>>Stream<T>
import { toListPromise, firstEvent } from "@cosmjs/stream";

const first = await firstEvent(stream);
const items = await toListPromise(stream, 5);

Stream Operators

FunctionParametersReturns
concat...streams: Array<Stream<T>>Stream<T>
dropDuplicatesvalueToKey: (x: T) => stringSameTypeStreamOperator<T>

Reducer

Utilities for applying reducer patterns to streams.
ExportKindPurpose
Reducer<T, U>classSubscribes to a stream and materializes state of type U from events of type T
ReducerFunc<T, U>type(acc: U, evt: T) => U
countStreamfunctionReturns Reducer<T, number> that counts events seen on the stream
asArrayfunctionReturns Reducer<T, readonly T[]> that accumulates all events into an array
lastValuefunctionReturns Reducer<T, T | undefined> holding the most recently emitted value
import { Reducer, countStream, asArray, lastValue } from "@cosmjs/stream";

const counter = countStream(myStream);
// ... later
counter.value(); // number of events seen so far
await counter.finished(); // resolves when stream completes