shareReplay

function stable operator

Share source and replay specified number of emissions on subscription.

shareReplay<T>(configOrBufferSize?: number | ShareReplayConfig, windowTime?: number, scheduler?: SchedulerLike): MonoTypeOperatorFunction<T>

Parameters

configOrBufferSize number | ShareReplayConfig

Optional. Default is undefined.

Either the maximum number of values to replay (the buffer size) or a ShareReplayConfig object.

windowTime number

Optional. Default is undefined.

Maximum time length of the replay buffer in milliseconds.

scheduler SchedulerLike

Optional. Default is undefined.

Scheduler on which connected observers within the selector function are invoked.

Returns

MonoTypeOperatorFunction<T>: A function that returns an Observable sequence that contains the elements of a sequence produced by multicasting the source sequence within a selector function.
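
The effect of windowTime can be sketched deterministically by passing a VirtualTimeScheduler as the scheduler argument, so that time only advances when the scheduler is flushed. The buffer size of 10, the window of 100 and the names source$, early and late are illustrative choices, not part of the API.

```typescript
import { Subject, VirtualTimeScheduler, shareReplay } from 'rxjs';

// Virtual clock: time advances only while flush() runs the scheduled actions.
const scheduler = new VirtualTimeScheduler();
const source$ = new Subject<number>();

// Replay at most 10 values, each for at most 100 virtual milliseconds.
const shared$ = source$.pipe(shareReplay(10, 100, scheduler));

const early: number[] = [];
const late: number[] = [];

// The first subscriber connects shareReplay to the source.
shared$.subscribe(v => early.push(v));

scheduler.schedule(() => { source$.next(1); }, 0);   // replayable until t = 100
scheduler.schedule(() => { source$.next(2); }, 50);  // replayable until t = 150
scheduler.schedule(() => {
  // At t = 120 the first value has left the replay window.
  shared$.subscribe(v => late.push(v));
}, 120);

scheduler.flush();

console.log(early); // [1, 2]
console.log(late);  // [2]
```

The late subscriber only sees the value that is still inside the time window, even though the buffer size would have allowed both.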

Description

This operator is a specialization of replay that connects to a source observable and multicasts through a ReplaySubject constructed with the specified arguments. A successfully completed source will stay cached in the shareReplayed observable forever, but an errored source can be retried.
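
A minimal sketch of the caching and retry behavior described above, assuming a hypothetical source flaky$ that fails on its first subscription and succeeds afterwards. The attempts counter and the events array exist only to make the behavior observable.

```typescript
import { defer, of, throwError, shareReplay } from 'rxjs';

let attempts = 0;

// Hypothetical source: errors on the first subscription, succeeds afterwards.
const flaky$ = defer(() => {
  attempts += 1;
  return attempts === 1
    ? throwError(() => new Error('first attempt failed'))
    : of('ok');
});

const shared$ = flaky$.pipe(shareReplay(1));

const events: string[] = [];
const observer = {
  next: (value: string) => { events.push(`next:${value}`); },
  error: (err: Error) => { events.push(`error:${err.message}`); },
};

shared$.subscribe(observer); // attempt 1 errors; the error is not cached
shared$.subscribe(observer); // attempt 2 resubscribes to the source and completes
shared$.subscribe(observer); // served from the cache; the source is not touched

console.log(attempts); // 2
console.log(events);   // ['error:first attempt failed', 'next:ok', 'next:ok']
```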

Why use shareReplay?

You generally want to use shareReplay when you have side effects or expensive computations that should not be repeated for each subscriber. It is also useful when you know that late subscribers will need access to previously emitted values. This ability to replay values on subscription is what differentiates shareReplay from share.
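
As a rough illustration of the first use case, the sketch below stands in for an expensive computation with a hypothetical expensive$ observable that counts how often it is executed. The names expensive$, cached$ and executions are assumptions made for this example.

```typescript
import { defer, of, shareReplay } from 'rxjs';

let executions = 0;

// Hypothetical expensive work; the counter stands in for a side effect.
const expensive$ = defer(() => {
  executions += 1;
  return of(42);
});

const cached$ = expensive$.pipe(shareReplay(1));

const received: number[] = [];
cached$.subscribe(value => received.push(value)); // runs the computation
cached$.subscribe(value => received.push(value)); // replayed from the buffer

console.log(executions); // 1
console.log(received);   // [42, 42]
```

Without shareReplay, each subscription to expensive$ would run the deferred factory again.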

Reference counting

By default, shareReplay uses a refCount of false, meaning that it will not unsubscribe from the source when the reference counter drops to zero, i.e. the inner ReplaySubject will not be unsubscribed (and can potentially run forever). This is the default because shareReplay is often used to keep expensive-to-set-up observables running instead of repeating the expensive setup.

As of RxJS version 6.4.0, a new overload signature allows manual control over what happens when the operator's internal reference counter drops to zero. If refCount is true, the source is unsubscribed from once the reference count drops to zero, i.e. the inner ReplaySubject is unsubscribed. Any new subscribers then receive values from a new ReplaySubject, which in turn causes a new subscription to the source observable.
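
The difference between the two settings can be sketched with a synchronous source that counts its own subscriptions and teardowns. The helper countSourceSubscriptions is hypothetical and exists only for this comparison.

```typescript
import { Observable, shareReplay } from 'rxjs';

// Hypothetical helper: subscribes and unsubscribes twice, then reports how
// often the source itself was subscribed to and torn down.
function countSourceSubscriptions(refCount: boolean) {
  let subscribes = 0;
  let unsubscribes = 0;

  // A source that emits once and never completes.
  const source$ = new Observable<number>(subscriber => {
    subscribes += 1;
    subscriber.next(subscribes);
    return () => { unsubscribes += 1; };
  });

  const shared$ = source$.pipe(shareReplay({ bufferSize: 1, refCount }));

  shared$.subscribe().unsubscribe(); // reference count goes 1 -> 0
  shared$.subscribe().unsubscribe(); // reference count goes 1 -> 0 again

  return { subscribes, unsubscribes };
}

const withRefCount = countSourceSubscriptions(true);
const withoutRefCount = countSourceSubscriptions(false);

console.log(withRefCount);    // { subscribes: 2, unsubscribes: 2 }
console.log(withoutRefCount); // { subscribes: 1, unsubscribes: 0 }
```

With refCount set to true, every drop to zero tears the source down and the next subscriber starts it again. With refCount set to false, the single source subscription is kept alive.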

Examples

Example with a third subscriber coming late to the party

import { interval, take, shareReplay } from 'rxjs';

const shared$ = interval(2000).pipe(
  take(6),
  shareReplay(3)
);

shared$.subscribe(x => console.log('sub A: ', x));
shared$.subscribe(y => console.log('sub B: ', y));

setTimeout(() => {
  shared$.subscribe(y => console.log('sub C: ', y));
}, 11000);

// Logs:
// (after ~2000 ms)
// sub A: 0
// sub B: 0
// (after ~4000 ms)
// sub A: 1
// sub B: 1
// (after ~6000 ms)
// sub A: 2
// sub B: 2
// (after ~8000 ms)
// sub A: 3
// sub B: 3
// (after ~10000 ms)
// sub A: 4
// sub B: 4
// (after ~11000 ms, sub C gets the last 3 values)
// sub C: 2
// sub C: 3
// sub C: 4
// (after ~12000 ms)
// sub A: 5
// sub B: 5
// sub C: 5

Example for refCount usage

import { Observable, tap, interval, shareReplay, take } from 'rxjs';

const log = <T>(name: string, source: Observable<T>) => source.pipe(
  tap({
    subscribe: () => console.log(`${ name }: subscribed`),
    next: value => console.log(`${ name }: ${ value }`),
    complete: () => console.log(`${ name }: completed`),
    finalize: () => console.log(`${ name }: unsubscribed`)
  })
);

const obs$ = log('source', interval(1000));

const shared$ = log('shared', obs$.pipe(
  shareReplay({ bufferSize: 1, refCount: true }),
  take(2)
));

shared$.subscribe(x => console.log('sub A: ', x));
shared$.subscribe(y => console.log('sub B: ', y));

// PRINTS:
// shared: subscribed <-- reference count = 1
// source: subscribed
// shared: subscribed <-- reference count = 2
// source: 0
// shared: 0
// sub A: 0
// shared: 0
// sub B: 0
// source: 1
// shared: 1
// sub A: 1
// shared: completed <-- take(2) completes the subscription for sub A
// shared: unsubscribed <-- reference count = 1
// shared: 1
// sub B: 1
// shared: completed <-- take(2) completes the subscription for sub B
// shared: unsubscribed <-- reference count = 0
// source: unsubscribed <-- the ReplaySubject unsubscribes from the source observable because the reference count dropped to 0 and refCount is true

// If refCount were false, unsubscribe would never be called on the source, and the source would keep emitting
// even with no subscribers listening:
// source: 2
// source: 3
// source: 4
// ...

© 2015–2022 Google, Inc., Netflix, Inc., Microsoft Corp. and contributors.
Code licensed under an Apache-2.0 License. Documentation licensed under CC BY 4.0.
https://rxjs.dev/api/operators/shareReplay