操作符实战
1.工具方法型
count
import { range } from 'rxjs';
import { count } from 'rxjs/operators';
// Count how many of the values emitted by range(1, 7) are odd.
const isOdd = (value) => value % 2 === 1;
const numbers = range(1, 7);
const result = numbers.pipe(
  count(isOdd),
);
result.subscribe((total) => {
  console.log(total);
});
// Results in:
// 4
reduce
import { fromEvent, interval } from 'rxjs';
import { reduce, takeUntil, mapTo } from 'rxjs/operators';
// Count the document clicks that arrive before interval(5000) first fires:
// every click is mapped to 1 and the ones are summed up with reduce().
const documentClicks$ = fromEvent(document, 'click');
const fiveSecondTimer$ = interval(5000);
const clicksInFiveSeconds = documentClicks$.pipe(takeUntil(fiveSecondTimer$));
const ones = clicksInFiveSeconds.pipe(mapTo(1));
const seed = 0;
const add = (total, increment) => total + increment;
const count = ones.pipe(
  reduce(add, seed),
);
count.subscribe((total) => {
  console.log(total);
});
max / min
import { of,merge } from 'rxjs';
import { max,min,tap } from 'rxjs/operators';
// Compute the largest and the smallest value of the same source, logging each
// result through tap() before it reaches the subscriber.
const obs$ = of(5, 4, 7, 2, 8);
const largest$ = obs$.pipe(max());
const smallest$ = obs$.pipe(min());
const logResult = (val) => {
  console.log("result....", val);
};
merge(largest$, smallest$)
  .pipe(tap(logResult))
  .subscribe(console.log);
//output
result.... 8
8
result.... 2
2
tap
日志输出
of 会把传入的参数逐个(同步)发出;tap 在每个值到达订阅者之前先执行
import { of,merge } from 'rxjs';
import { max,min,tap } from 'rxjs/operators';
// Log every value with tap() as it passes through the pipeline; the
// subscriber then prints the same value.
const obs$ = of(5, 4, 7, 2, 8);
const loggingObserver = {
  next: (val) => {
    console.log("val", val);
  },
  // No side effects on error or completion.
  error: () => {},
  complete: () => {},
};
obs$.pipe(tap(loggingObserver)).subscribe(console.log);
//output
val 5
5
val 4
4
val 7
7
val 2
2
val 8
8
repeat
重复 === 多次订阅
// RxJS v6+
import { of } from 'rxjs';
import { delay, repeat, tap } from 'rxjs/operators';

// Emits 'delayed value' two seconds after each subscription.
const delayedThing = of('delayed value').pipe(
  tap(() => {
    // Logged before the delay is applied.
    console.log("time..1.", new Date().toLocaleTimeString());
  }),
  delay(2000),
);

// repeat(3) re-subscribes to the source each time it completes, so the whole
// pipeline (both taps and the delay) runs three times, one run after another.
delayedThing
  .pipe(
    tap(() => {
      // Logged after the delay, when the value is delivered.
      console.log("time...2", new Date().toLocaleTimeString());
    }),
    repeat(3),
  )
  .subscribe(console.log);
//output
time..1. 4:42:45 PM
time...2 4:42:47 PM
delayed value
time..1. 4:42:47 PM
time...2 4:42:49 PM
delayed value
time..1. 4:42:49 PM
time...2 4:42:51 PM
delayed value
subscribeOn, observeOn
import { of, merge } from 'rxjs';
// Merge two sources created with of(); all values of `a` are printed before
// any value of `b`.
const a = of(1, 2, 3, 4);
const b = of(5, 6, 7, 8, 9);
const merged$ = merge(a, b);
merged$.subscribe(console.log);
// 1 2 3 4 5 6 7 8 9
import { of, merge, asyncScheduler } from 'rxjs';
import { subscribeOn } from 'rxjs/operators';
// With subscribeOn(asyncScheduler) the subscription to `a` is deferred, so
// the values of `b` are printed first.
const subscribeAsync = subscribeOn(asyncScheduler);
const a = of(1, 2, 3, 4).pipe(subscribeAsync);
const b = of(5, 6, 7, 8, 9);
const merged$ = merge(a, b);
merged$.subscribe(console.log);
// 5 6 7 8 9 1 2 3 4
// animationFrameScheduler must be imported too: it is used below and would
// otherwise throw a ReferenceError.
import { animationFrameScheduler, interval } from 'rxjs';
import { observeOn } from 'rxjs/operators';

// Intervals are scheduled with the async scheduler by default...
const intervals = interval(10);

intervals
  .pipe(
    // ...but we observe on the animationFrame scheduler to ensure smooth animation.
    observeOn(animationFrameScheduler),
  )
  .subscribe((val) => {
    // NOTE(review): `someDiv` is not defined in this snippet; it is assumed to
    // be an existing DOM element — confirm before running.
    someDiv.style.height = `${val}px`;
  });
materialize
- materialize 把每个通知(next / error / complete)包装(装箱)成 Notification 对象,作为普通值发出;dematerialize 则把它们还原(拆箱)
import { of } from 'rxjs';
import { materialize, map } from 'rxjs/operators';
// Upper-case each letter, then wrap every notification in a Notification
// object via materialize() and print it.
const letters = of('a', 'b', '13', 'd');
const toUpperCase = (letter) => letter.toUpperCase();
const upperCase = letters.pipe(map(toUpperCase));
const materialized = upperCase.pipe(materialize());
materialized.subscribe((notification) => {
  console.log(notification);
});
//output
Notification { kind: 'N', value: 'A', error: undefined, hasValue: true }
Notification { kind: 'N', value: 'B', error: undefined, hasValue: true }
Notification { kind: 'N', value: '13', error: undefined, hasValue: true }
Notification { kind: 'N', value: 'D', error: undefined, hasValue: true }
Notification { kind: 'C', value: undefined, error: undefined, hasValue: false }
timestamp
import { of } from 'rxjs';
import { materialize, map, timestamp, tap } from 'rxjs/operators';
// Pair every emitted value with the time at which it was emitted and print
// the resulting Timestamp objects.
const letters = of('a', 'b', '13', 'd');
const times = letters.pipe(timestamp());
const logStamped = (res) => {
  console.log("res...", res);
};
times.subscribe(logStamped);
//output
res... Timestamp { value: 'a', timestamp: 1594074567694 }
res... Timestamp { value: 'b', timestamp: 1594074567700 }
res... Timestamp { value: '13', timestamp: 1594074567700 }
res... Timestamp { value: 'd', timestamp: 1594074567700 }
toArray
toArray 会在 source 完成(complete)时,把逐个发出的值收集成一个数组一次性发出;适用于经过 map、filter 或来自 interval 等逐个产生值的流(无限流需先用 take 等操作符使其结束)
import { interval } from 'rxjs';
import { toArray, take } from 'rxjs/operators';
// Take the first ten ticks of a one-second interval and collect them into a
// single array with toArray().
const source = interval(1000);
const firstTen = take(10);
const example = source.pipe(firstTen, toArray());
const printValue = (val) => {
  console.log(val);
};
// The value returned by subscribe() is kept in `subscribe`.
const subscribe = example.subscribe(printValue);
// output: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]