const {
ConnectableObservable,
merge,
MonoTypeOperatorFunction,
Observable,
of,
Subject,
Subscription,
timer
} = rxjs;
const {
concatMap,
debounce,
mapTo,
publish,
startWith,
switchMap
} = rxjs.operators;
// The pipeable operator:
function waitUntilQuietButNotTooLong(
quietDuration,
tooLongDuration
) {
return source => new Observable(observer => {
let tooLongTimer;
// Debounce the source using a notifier that emits after `quietDuration`
// milliseconds since the last source emission or `tooLongDuration`
// milliseconds since the observable returned by the operator last
// emitted.
const debounced = source.pipe(
debounce(() => merge(
timer(quietDuration),
tooLongTimer
))
);
// Each time the source emits, `debounce` will subscribe to the notifier.
// Use `publish` to create a `ConnectableObservable` so that the too-long
// timer will continue independently of the subscription from `debounce`
// implementation.
tooLongTimer = debounced.pipe(
startWith(undefined),
switchMap(() => timer(tooLongDuration)),
publish()
);
// Connect the `tooLongTimer` observable and subscribe the observer to
// the `debounced` observable. Compose a subscription so that
// unsubscribing from the observable returned by the operator will
// disconnect from `tooLongTimer` and unsubscribe from `debounced`.
const subscription = new Subscription();
subscription.add(tooLongTimer.connect());
subscription.add(debounced.subscribe(observer));
return subscription;
});
}
// For a harness, create a subject and apply the operator:
const since = Date.now();
const source = new Subject();
source.pipe(
waitUntilQuietButNotTooLong(100, 500)
).subscribe(value => console.log(`received ${value} @ ${Date.now() - since} ms`));
// And create an observable that emits at a particular time and subscribe
// the subject to it:
const emissions = of(0, 50, 100, 300, 350, 400, 450, 500, 550, 600, 650, 700, 750, 800, 850, 900, 950, 1000, 1050, 1100, 1150);
emissions.pipe(
concatMap((value, index) => timer(new Date(since + value)).pipe(
mapTo(index)
))
).subscribe(source);
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>