深入理解node的web stream模块

2023-05-16

深入理解node的web stream模块

  • 提示:需要掌握node传统的流以及事件机制
  • node环境:v16.5.0+
  • 一下内容全部以node v18.12.0实验为基础
  • 如果观看期间发现了一些不认识的api,那就是我在用node18的api,可以自行观看node官方文档,很简单的!😄

专业术语

  • 内置队列或缓存:可以理解为node根据流输入的数据,用一个链表数据结构建立的缓存,读取、写出的内容都需要经过缓存。(参考专业说法:内置队列MDN)

  • highWaterMark水平线或阈值:内置队列需要设立上限,否则会突破node的内存限制大小,从而成为一种攻击手段https://nodejs.org/docs/latest-v18.x/api/stream.html#writablewritechunk-encoding-callback

攻击手段:A给node服务发送world,让它帮A转换html文件,A扮演着发送流,也扮演着接受流;此时A只发送,但是决绝接受,一旦这个文件超过node内存限制,也就意味着这个node服务将会内存泄露,从而宕机,hack成功!

  • ⚠️highWaterMark注意点:传统stream中highWaterMark只是一个警示作用,而不是强制行为,也就是意味着超过了highWaterMark将任然可以继续往内置队列里填充数据,直到超过内存限制

核心知识点

  • node能使用的内存大小?(为什么不说web,虽然没有刻意去了解web,V8内存管理和node一致;但其他GUI渲染内存加上去绝对和node内存不一样)

这个相信大家都知道,新生代(32 位系统分配 16M 的内存空间,64 位系统翻倍 32M),老生代(64位系统下约为1.4GB,32位系统下约为0.7GB),也就是我们能用V8进行内存管理js堆内存只有1.4G;所以如果有大量缓存数据,最好的办法是移除node之外,使用redis处理;如果有1个G的文件需要给前端下载怎么办呢?流式永远是最好的解决方案,对于node,不,对于所有后台开发来说,节省内存最好的办法就是流式,流的作用就是读多少传多少,读1M数据传1M数据给前端,大大减轻了V8内存的负担

  • 为什么不将V8内存设置很大?

该方案,确实是一个解决方案;但是V8的各种垃圾回收算法同时也会降低效率(虽然底层会并发清理,但大内存空间消耗的时间一定是成正比的),本文不会对V8垃圾回收机制展开讲解,感兴趣的同学可以搜相关的只是:新生代的Scavenge算法(from-to通过空间换时间),老生代的Mark-Sweep(标记扫除)Mark-Compact(标记压缩)时间换空间做法,V8确认一个数据需要被垃圾回收而又不影响其他堆数据的使用三色标记法(增量标记、强三原色、写屏障这些来保证一个数据被回收而不影响应用正常运行)

为什么node有了传统的stream又弄出一个web stream模块

  • 此web的含义对应的是前端,而不是web TCP双工流
  • 该流行为与前端(即浏览器内的流行为、api一致)
  • MDN参考前端流:https://developer.mozilla.org/zh-CN/docs/Web/API/Streams_API

readable可读流

  • 基本使用
const { ReadableStream } = require("node:stream/web");
const { setInterval, setTimeout: timer } = require("node:timers/promises");
const { performance } = require("node:perf_hooks");
const { Buffer } = require("node:buffer");

const readable = new ReadableStream({
  // 开始事件
  async start(controller) {
    console.log("start.");
  },

  // 当内置队列未满时,一直读取,如果为异步则等待异步完成后再次调用
  async pull(controller) {
    await timer(100); // 500ms 读取一次
    const val = performance.now();
    controller.enqueue(val);
    console.log("队列剩余容量", controller.desiredSize);
  },

  // 取消事件 可以通过reader.cancel()方法取消流pull读取事件
  cancel(reason) {
    console.log(reason);
  },
},
{
  highWaterMark: 5, // 水平线
  // 根据返回的number大小,水平线 - size返回的大小 = 当前剩余容量(controller.desiredSize)
  size(chunk) {
    return 1;
  },
});

(async () => {
  // 消费5次
  const reader = readable.getReader(); // 默认的reader实例,允许js值(如:对象...)
  for (let index = 1; index <= 5; index++) {
    console.log(await reader.read());
  }

  // 2s后消费3次
  setTimeout(async () => {
    console.log(await reader.read());
    console.log(await reader.read());
    console.log(await reader.read());
  }, 2000);
})();

/*
// 开始事件
start.

// 这块生产消费同时在进行所以,内置队列大小没变
队列剩余容量 5
{ value: 201.76770899817348, done: false }
队列剩余容量 5
{ value: 304.59966699779034, done: false }
队列剩余容量 5
{ value: 406.3125419989228, done: false }
队列剩余容量 5
{ value: 508.1209169998765, done: false }
队列剩余容量 5
{ value: 611.398583997041, done: false }

// 一直读取中
队列剩余容量 4
队列剩余容量 3
队列剩余容量 2
队列剩余容量 1
队列剩余容量 0
// 读取完毕到达阈值(内置队列容量为0)

// 定时器2s,开始消费
{ value: 655.7073750011623, done: false }
{ value: 757.9737910032272, done: false }
{ value: 859.5705410018563, done: false }

// 消费了3个自然要读取3个
队列剩余容量 2
队列剩余容量 1
队列剩余容量 0
*/

writeable可写流

  • 将可读流与可写流连通
const { ReadableStream, WritableStream } = require("node:stream/web");
const { setInterval, setTimeout: timer } = require("node:timers/promises");
const { performance } = require("node:perf_hooks");
const { Buffer } = require("node:buffer");

// 可读流
const readable = new ReadableStream(
  {
    async pull(controller) {
      await timer(500); // 500ms 读取一次
      const val = performance.now();
      controller.enqueue(val);
      console.log("队列剩余容量", controller.desiredSize);
    },
  },
  {
    highWaterMark: 5,
    size(chunk) {
      return 1;
    },
  },
);

// 可写流
const writeable = new WritableStream({
  write(chunk) {
    console.log("写入流接收到的数据", chunk);
  },
});

(async () => {
  const writer = writeable.getWriter();
  // 不使用Reader读取器消费,可以使用for await来进行消费,将读取到的数据写入到写入流里
  for await (const value of readable) {
    writer.write(value);
  }
})();

/*
队列剩余容量 5
写入流接收到的数据 539.5047909989953
队列剩余容量 5
写入流接收到的数据 1051.5886659994721
队列剩余容量 5
写入流接收到的数据 1553.0724160000682
队列剩余容量 5
写入流接收到的数据 2055.640707999468
队列剩余容量 5
写入流接收到的数据 2558.0102079994977
... // 一边生产一边消费
*/

MDNfor await of异步迭代器:https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Reference/Statements/for-await…of

ReadableStream支持异步迭代器:https://nodejs.org/docs/latest-v18.x/api/webstreams.html#async-iteration

readable结合writeable测试背压

  • 内置了背压,到达阈值生产者会停止读取,等待消费者消费结束
const { ReadableStream, WritableStream } = require("node:stream/web");
const { setTimeout: timer } = require("node:timers/promises");
const { performance } = require("node:perf_hooks");

// 可读流
const readable = new ReadableStream(
  {
    async pull(controller) {
      await timer(100); // 100ms 读取一次
      const val = performance.now();
      controller.enqueue(val);
      console.log("队列剩余容量", controller.desiredSize);
    },
  },
  {
    highWaterMark: 5,
    size(chunk) {
      return 1;
    },
  },
);

// 可写流 1s钟读取一次
const writeable = new WritableStream({
  async write(chunk, controller) {
    await timer(1000);
    console.log("写入流接收到的数据", chunk);
  },
});

(async () => {
  // 效果:当reader读完满内置队列之后,writer只有写入完成后,reader才会继续读,强制当水平线
  const writer = writeable.getWriter();
  for await (const value of readable) {
    await writer.write(value);
  }
})();

/*
// 生产者读取的很快
队列剩余容量 5
队列剩余容量 4
队列剩余容量 3
队列剩余容量 2
队列剩余容量 1
队列剩余容量 0
// 生产者读取到达阈值,停止读取
写入流接收到的数据 141.93745799735188 // 消费成功
队列剩余容量 0 // 消费一个,读一个
写入流接收到的数据 249.89816699922085
队列剩余容量 0
写入流接收到的数据 351.6370829977095
队列剩余容量 0
写入流接收到的数据 453.2700829990208
队列剩余容量 0
...
*/

Transform双工转换流

  • 这个玩意就比较牛了,不仅是个双工流(即能读也能写如TCP网络流),还可以进行转换
const { TransformStream } = require("node:stream/web");

const transform = new TransformStream(
  {
    // 可写流写入出发转换过程
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },

    // 写入流关闭执行
    flush(controller) {
      console.log("写入流关闭!");
    },
  },
  // 可写流阈值配置
  {
    highWaterMark: 5,
    size() {
      return 1;
    },
  },
  // 可读流阈值配置
  {
    highWaterMark: 5,
    size() {
      return 1;
    },
  },
);

(async () => {
  const writer = transform.writable.getWriter();
  const reader = transform.readable.getReader();
  await writer.write("abc");
  const value = await reader.read();
  console.log(value);
  writer.close();
})();

/*
{ value: 'ABC', done: false }
写入流关闭!
*/

web stream与传统的流的区别

  • highwatermark水平线:web stream对于阈值强制要求,小于等于阈值 <= 内置队列容量将停止读取,反之继续读取

相较于传统的stream,web stream内置了背压机制

  • 如果不用pipe()管道,也无需刻意注意背压机制,web stream底层已经帮我们处理,即highwatermark为强制背压的水平线
  • 将可读流与可写流定义好后,使用管道、或者手动写入读取,这块业务部分与流内部工作解耦
  • 与前端web流式相兼容,前后端统一

官方提供了一些工具流

  • 这些工具流在我的mac node 18.12.0上不起作用,猜测原因可能是未开发完毕且这些工具流完全可以自己用Transform双工转换流来实现,无伤大雅。

参考文献

  • MDN流API文档:https://developer.mozilla.org/zh-CN/docs/Web/API/Streams_API

  • Nodejs官方web stream API文档:https://nodejs.org/docs/latest-v16.x/api/webstreams.html#class-readablestream

  • 内置队列MDN:https://developer.mozilla.org/zh-CN/docs/Web/API/Streams_API/Concepts#%E5%86%85%E7%BD%AE%E9%98%9F%E5%88%97%E5%92%8C%E9%98%9F%E5%88%97%E7%AD%96%E7%95%A5

  • Node通过传统流的攻击手段:https://nodejs.org/docs/latest-v18.x/api/stream.html#writablewritechunk-encoding-callback

  • MDNfor await of异步迭代器:https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Reference/Statements/for-await…of

  • ReadableStream支持异步迭代器:[https://nodejs.org/docs/latest-v18.x/api/webstreams.html#async-iteration](

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

深入理解node的web stream模块 的相关文章

  • 媒体播放器准备时的进度条

    我试图弄清楚如何在我的媒体播放器准备流文件时显示 正在加载 请稍候 的进度条 现在发生的事情是在歌曲准备好后显示 我怎样才能解决这个问题 mediaPlayerLoadingBar ProgressDialog show PlaylistA
  • 在 FFmpeg 中使用 -filter_complex amerge 时混合流

    我目前遇到 ffmpeg 及其过滤器之一的问题 我正在尝试将视频的 2 个音频流合并为一个 为此我尝试了这个命令 ffmpeg i home maniaplanet Videos ManiaPlanet 2014 08 21 20 09 1
  • 函数默认参数有些问题?

    看到这个 let foo outer function bar func x gt foo let foo inner console log func bar outer 我想知道为什么输出是 外部 而不是 内部 我知道 JavaScri
  • Electron 应用程序可以与 java 代码集成吗?

    由于node js仍然缺乏Java中存在的重要功能 因此我想使用Java而不是node js 并使用Web语言 html js css 创建客户端 Electron 是跨平台的 java 也是跨平台的 因此似乎有一个能够两全其美的解决方案
  • 计算流数据的直方图 - 在线直方图计算

    我正在寻找一种算法来生成大量流数据的直方图 最大值和最小值事先未知 但标准差和平均值在特定范围内 我很欣赏你的想法 Cheers 我刚刚找到了一个解决方案 秒 从流式并行决策树算法构建在线直方图 论文的 2 2 该算法由 Hive 项目中的
  • 在 html 中创建子页面 [关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 假设我有一个网站http www example com http www example com 如何为此页面创建更多子页面 即 w
  • 使用节点http代理转发http代理

    我正在使用 node http proxy 库来创建转发代理服务器 我最终计划使用一些中间件来动态修改 html 代码 这就是我的代理服务器代码的样子 var httpProxy require http proxy httpProxy c
  • Ruby On Rails - 在控制器中使用关注点

    可能的菜鸟警告 刚接触 RoR 我正在尝试在 RoR 中使用关注点 现在我只写了一个非常简单的问题 app controllers concerns foo rb module Foo extend ActiveSupport Concer
  • 如何在 iOS 中使用 AVPlayer 缓冲音频?

    我想播放来自互联网的流音频 我编写了播放流的代码 但它没有任何缓冲区 因此如果信号较弱 应用程序将停止播放音频 这是我的代码 import UIKit import AVFoundation import MediaPlayer impor
  • C# - 捕获 RTP 流并发送到语音识别

    我正在努力实现的目标 在 C 中捕获 RTP 流 将该流转发到 System Speech SpeechRecognitionEngine 我正在创建一个基于 Linux 的机器人 它将接受麦克风输入 将其发送给 Windows 机器 Wi
  • 从 Harp.js 中的 EJS 模板调用另一个文件上的 javascript 函数

    尝试使用 Harp js 制作一个网站 我使用 ejs 模板 并希望将一些有用的 javascript 函数存储在中央文件中 我怎么做 我尝试使用 但它不起作用 似乎js文件没有被解析 有任何想法吗 谢谢 尽管有多种方法 有时 可以实现这一
  • asp.NET 2.0网站无法访问App_Code中的类

    将我的网站部署到服务器后 我在访问课程时遇到问题 请注意 这是一个网络Site不是网络应用 错误是 编译器错误消息 CS0246 找不到类型或命名空间名称 Order 是否缺少 using 指令或程序集引用 版本信息 Microsoft N
  • 如何保护我的网站免遭 HTTrack 或其他软件的翻录?

    我最近获得了批准的网站模板主题森林 http themeforest net 我的网站流量过多 并注意到我在 Themeforest 上的演示被 HTTrack 等某些软件破坏 如果这种情况持续下去 该产品的销量最终可能会下降 那么 有什么
  • 为什么要使用除 div 以外的任何东西? [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • 在 Tomcat 上部署 Java Web 项目,无需 WAR 或 EAR

    我有一个 Java Web 项目 Struts Spring 在我的本地主机上完美运行 我必须将其部署在我的网站上 但虚拟主机提供的 Tomcat Manager 界面显示 由于安全原因 它无法上传 WAR 文件 当联系技术支持时 我被告知
  • Java selenium - 如何在 TimeoutException 之后正确刷新网页?

    ChromeOptions options new ChromeOptions options addExtensions new File extension 6 2 5 0 crx ZenMate options addExtensio
  • 烧瓶 - 404 未找到

    我是烧瓶开发的新手 这是我在烧瓶中的第一个程序 但它向我显示了这个错误 在服务器上找不到请求的 URL 如果您输入了网址 请手动检查拼写并重试 这是我的代码 from flask import Flask app Flask name ap
  • 重新加载更新的 javascript> 代码而不完全重新加载 html 页面

    我正在开发一个单页 Web 应用程序 它具有许多不同的功能和形式 当开发一个深度 我的意思是主页上没有的 功能时 我会经历这个循环 开发代码 编辑类和函数 刷新整个页面 一路点击 直到到达我需要测试的部分 有时加起来大约一分钟 测试新代码
  • powershell Invoke-WebRequest WebSession 不起作用

    我无法让以下代码工作 它似乎已登录 但随后返回带有 response 的登录页面 我猜这与回发有关 有办法解决这个问题吗 谢谢 login Invoke WebRequest Uri http www sqlpass org UserLog
  • 如何从文件系统访问 api window.showDirectoryPicker() 获取选定的目录路径

    当我选择一个文件夹时 我确实得到了 dirHandle 但无法弄清楚什么属性或方法将为我提供完整路径 const dirHandle await window showDirectoryPicker 所以类似 let path dirHan

随机推荐