深入理解node的web stream模块
- 提示:需要掌握node传统的流以及事件机制
- node环境:v16.5.0+
- 一下内容全部以node v18.12.0实验为基础
- 如果观看期间发现了一些不认识的api,那就是我在用node18的api,可以自行观看node官方文档,很简单的!😄
专业术语
-
内置队列或缓存:可以理解为node根据流输入的数据,用一个链表数据结构建立的缓存,读取、写出的内容都需要经过缓存。(参考专业说法:内置队列MDN)
-
highWaterMark水平线或阈值:内置队列需要设立上限,否则会突破node的内存限制大小,从而成为一种攻击手段
https://nodejs.org/docs/latest-v18.x/api/stream.html#writablewritechunk-encoding-callback
攻击手段:A给node服务发送world,让它帮A转换html文件,A扮演着发送流,也扮演着接受流;此时A只发送,但是决绝接受,一旦这个文件超过node内存限制,也就意味着这个node服务将会内存泄露,从而宕机,hack成功!
- ⚠️highWaterMark注意点:传统stream中highWaterMark只是一个警示作用,而不是强制行为,也就是意味着超过了highWaterMark将任然可以继续往内置队列里填充数据,直到超过内存限制
核心知识点
node
能使用的内存大小?(为什么不说web,虽然没有刻意去了解web,V8内存管理和node一致;但其他GUI渲染内存加上去绝对和node内存不一样)
这个相信大家都知道,新生代(32 位系统分配 16M 的内存空间,64 位系统翻倍 32M),老生代(64位系统下约为1.4GB,32位系统下约为0.7GB),也就是我们能用V8进行内存管理js堆内存只有1.4G;所以如果有大量缓存数据,最好的办法是移除node之外,使用redis
处理;如果有1个G的文件需要给前端下载怎么办呢?流式永远是最好的解决方案,对于node,不,对于所有后台开发来说,节省内存最好的办法就是流式,流的作用就是读多少传多少,读1M数据传1M数据给前端,大大减轻了V8内存的负担
该方案,确实是一个解决方案;但是V8的各种垃圾回收算法同时也会降低效率(虽然底层会并发清理,但大内存空间消耗的时间一定是成正比的),本文不会对V8垃圾回收机制展开讲解,感兴趣的同学可以搜相关的只是:新生代的Scavenge算法(from-to通过空间换时间)
,老生代的Mark-Sweep(标记扫除)
、Mark-Compact(标记压缩)时间换空间做法
,V8确认一个数据需要被垃圾回收而又不影响其他堆数据的使用三色标记法(增量标记、强三原色、写屏障这些来保证一个数据被回收而不影响应用正常运行)
为什么node有了传统的stream又弄出一个web stream模块
- 此web的含义对应的是前端,而不是web TCP双工流
- 该流行为与前端(即浏览器内的流行为、api一致)
- MDN参考前端流:https://developer.mozilla.org/zh-CN/docs/Web/API/Streams_API
readable可读流
const { ReadableStream } = require("node:stream/web");
const { setInterval, setTimeout: timer } = require("node:timers/promises");
const { performance } = require("node:perf_hooks");
const { Buffer } = require("node:buffer");
const readable = new ReadableStream({
async start(controller) {
console.log("start.");
},
async pull(controller) {
await timer(100);
const val = performance.now();
controller.enqueue(val);
console.log("队列剩余容量", controller.desiredSize);
},
cancel(reason) {
console.log(reason);
},
},
{
highWaterMark: 5,
size(chunk) {
return 1;
},
});
(async () => {
const reader = readable.getReader();
for (let index = 1; index <= 5; index++) {
console.log(await reader.read());
}
setTimeout(async () => {
console.log(await reader.read());
console.log(await reader.read());
console.log(await reader.read());
}, 2000);
})();
writeable可写流
const { ReadableStream, WritableStream } = require("node:stream/web");
const { setInterval, setTimeout: timer } = require("node:timers/promises");
const { performance } = require("node:perf_hooks");
const { Buffer } = require("node:buffer");
const readable = new ReadableStream(
{
async pull(controller) {
await timer(500);
const val = performance.now();
controller.enqueue(val);
console.log("队列剩余容量", controller.desiredSize);
},
},
{
highWaterMark: 5,
size(chunk) {
return 1;
},
},
);
const writeable = new WritableStream({
write(chunk) {
console.log("写入流接收到的数据", chunk);
},
});
(async () => {
const writer = writeable.getWriter();
for await (const value of readable) {
writer.write(value);
}
})();
MDNfor await of异步迭代器:https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Reference/Statements/for-await…of
ReadableStream支持异步迭代器:https://nodejs.org/docs/latest-v18.x/api/webstreams.html#async-iteration
readable结合writeable测试背压
- 内置了背压,到达阈值生产者会停止读取,等待消费者消费结束
const { ReadableStream, WritableStream } = require("node:stream/web");
const { setTimeout: timer } = require("node:timers/promises");
const { performance } = require("node:perf_hooks");
const readable = new ReadableStream(
{
async pull(controller) {
await timer(100);
const val = performance.now();
controller.enqueue(val);
console.log("队列剩余容量", controller.desiredSize);
},
},
{
highWaterMark: 5,
size(chunk) {
return 1;
},
},
);
const writeable = new WritableStream({
async write(chunk, controller) {
await timer(1000);
console.log("写入流接收到的数据", chunk);
},
});
(async () => {
const writer = writeable.getWriter();
for await (const value of readable) {
await writer.write(value);
}
})();
Transform双工转换流
- 这个玩意就比较牛了,不仅是个双工流(即能读也能写如TCP网络流),还可以进行转换
const { TransformStream } = require("node:stream/web");
const transform = new TransformStream(
{
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
flush(controller) {
console.log("写入流关闭!");
},
},
{
highWaterMark: 5,
size() {
return 1;
},
},
{
highWaterMark: 5,
size() {
return 1;
},
},
);
(async () => {
const writer = transform.writable.getWriter();
const reader = transform.readable.getReader();
await writer.write("abc");
const value = await reader.read();
console.log(value);
writer.close();
})();
web stream与传统的流的区别
- highwatermark水平线:web stream对于阈值强制要求,小于等于阈值 <= 内置队列容量将停止读取,反之继续读取
相较于传统的stream,web stream内置了背压机制
- 如果不用
pipe()
管道,也无需刻意注意背压机制,web stream底层已经帮我们处理,即highwatermark为强制背压的水平线 - 将可读流与可写流定义好后,使用管道、或者手动写入读取,这块业务部分与流内部工作
解耦
了 - 与前端web流式相兼容,前后端统一
官方提供了一些工具流
- 这些工具流在我的mac node 18.12.0上不起作用,猜测原因可能是未开发完毕且这些工具流完全可以自己用Transform双工转换流来实现,无伤大雅。
参考文献
-
MDN流API文档:https://developer.mozilla.org/zh-CN/docs/Web/API/Streams_API
-
Nodejs官方web stream API文档:https://nodejs.org/docs/latest-v16.x/api/webstreams.html#class-readablestream
-
内置队列MDN:https://developer.mozilla.org/zh-CN/docs/Web/API/Streams_API/Concepts#%E5%86%85%E7%BD%AE%E9%98%9F%E5%88%97%E5%92%8C%E9%98%9F%E5%88%97%E7%AD%96%E7%95%A5
-
Node通过传统流的攻击手段:https://nodejs.org/docs/latest-v18.x/api/stream.html#writablewritechunk-encoding-callback
-
MDNfor await of异步迭代器:https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Reference/Statements/for-await…of
-
ReadableStream支持异步迭代器:[https://nodejs.org/docs/latest-v18.x/api/webstreams.html#async-iteration](
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)