Rxjs 操作符实践指南





  • 统计总数
import { range } from 'rxjs';
import { count } from 'rxjs/operators';

const numbers = range(1, 7);
const result = numbers.pipe(count(i => i % 2 === 1));
result.subscribe(x => console.log(x));
// Results in:
// 4


  • 累计
import { fromEvent, interval } from 'rxjs';
import { reduce, takeUntil, mapTo } from 'rxjs/operators';

const clicksInFiveSeconds = fromEvent(document, 'click').pipe(
const ones = clicksInFiveSeconds.pipe(mapTo(1));
const seed = 0;
const count = ones.pipe(reduce((acc, one) => acc + one, seed));
count.subscribe(x => console.log(x));


import { of,merge } from 'rxjs';
import { max,min,tap } from 'rxjs/operators';

const obs$ = of(5, 4, 7, 2, 8);
).pipe(tap((val) => {

result.... 8
result.... 2


of 是一个个输出的

import { of,merge } from 'rxjs';
import { max,min,tap } from 'rxjs/operators';

const obs$ = of(5, 4, 7, 2, 8);
    next:(val) => {
    error:() => {

    complete:() => {


val 5
val 4
val 7
val 2


重复 === 多次订阅

import { tap } from 'rxjs/operators';
// RxJS v6+
import { repeat, delay } from 'rxjs/operators';
import { of } from 'rxjs';

const delayedThing = of('delayed value').pipe(
        tap(() => {
            console.log("time..1.",new Date().toLocaleTimeString());

    tap(() => {
        console.log("time...2",new Date().toLocaleTimeString());

time..1. 4:42:45 PM
time...2 4:42:47 PM
delayed value
time..1. 4:42:47 PM
time...2 4:42:49 PM
delayed value
time..1. 4:42:49 PM
time...2 4:42:51 PM
delayed value

subscribeOn, observeOn

  • 调整执行时机,
import { of, merge } from 'rxjs';

const a = of(1, 2, 3, 4);
const b = of(5, 6, 7, 8, 9);
merge(a, b).subscribe(console.log);
// 1 2 3 4 5 6 7 8 9

import { of, merge, asyncScheduler } from 'rxjs';
import { subscribeOn } from 'rxjs/operators';

const a = of(1, 2, 3, 4).pipe(subscribeOn(asyncScheduler));
const b = of(5, 6, 7, 8, 9);
merge(a, b).subscribe(console.log);
//5 6 7 8 9 1 2 3 4

import { interval } from 'rxjs';
import { observeOn } from 'rxjs/operators';

const intervals = interval(10);                // Intervals are scheduled
                                              // with async scheduler by default...
  observeOn(animationFrameScheduler),          // ...but we will observe on animationFrame
)                                              // scheduler to ensure smooth animation.
.subscribe(val => {
  someDiv.style.height = val + 'px';


  • 用默认对象包箱, dematerialize 开箱
import { of } from 'rxjs';
import { materialize, map } from 'rxjs/operators';

const letters = of('a', 'b', '13', 'd');
const upperCase = letters.pipe(map(x => x.toUpperCase()));
const materialized = upperCase.pipe(materialize());
materialized.subscribe(x => console.log(x));

Notification { kind: 'N', value: 'A', error: undefined, hasValue: true }
Notification { kind: 'N', value: 'B', error: undefined, hasValue: true }
Notification { kind: 'N', value: '13', error: undefined, hasValue: true }
Notification { kind: 'N', value: 'D', error: undefined, hasValue: true }
Notification { kind: 'C', value: undefined, error: undefined, hasValue: false }


  • 添加时间戳
 import { of } from 'rxjs';
 import { materialize, map, timestamp, tap } from 'rxjs/operators';

 const letters = of('a', 'b', '13', 'd');

 const times = letters.pipe(timestamp());
 times.subscribe(res => {

 res... Timestamp { value: 'a', timestamp: 1594074567694 }
 res... Timestamp { value: 'b', timestamp: 1594074567700 }
 res... Timestamp { value: '13', timestamp: 1594074567700 }
 res... Timestamp { value: 'd', timestamp: 1594074567700 }



 import { interval } from 'rxjs';
 import { toArray, take } from 'rxjs/operators';

 const source = interval(1000);
 const example = source.pipe(

 const subscribe = example.subscribe(val => console.log(val));

 // output: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

