如何将多个 api 请求中的多个可读流传输到单个可写流?

2024-05-18

- 期望的行为
- 实际行为
- 我尝试过的
- 重现步骤
- 研究


期望的行为

将从多个 api 请求接收到的多个可读流传输到单个可写流。

API 响应来自 ibm-watsontextToSpeech.synthesize() https://cloud.ibm.com/apidocs/text-to-speech?code=node#synthesize-audio method.

需要多个请求的原因是该服务有一个5KB文本输入限制。

因此一串18KB例如,需要四个请求才能完成。

实际行为

可写流文件不完整,出现乱码。

该应用程序似乎“挂起”。

当我尝试打开不完整的.mp3文件在音频播放器中,它说它已损坏。

打开和关闭文件的过程似乎会增加其文件大小 - 就像打开文件会以某种方式提示更多数据流入其中一样。

输入越大,不良行为越明显,例如四个 4000 字节或更少的字符串。

我尝试过的

我尝试了多种方法使用 npm 包将可读流传输到单个可写流或多个可写流组合流 https://www.npmjs.com/package/combined-stream, 组合流2 https://www.npmjs.com/package/combined-stream2, 多流 https://www.npmjs.com/package/multistream and archiver https://www.npmjs.com/package/archiver它们都会导致文件不完整。我的最后一次尝试没有使用任何软件包,并显示在Steps To Reproduce下面的部分。

因此,我质疑我的应用程序逻辑的每个部分:

01.Watson 文本转语音 api 请求的响应类型是什么?

The 文本转语音文档 https://cloud.ibm.com/apidocs/text-to-speech?code=node#synthesize-audio,假设 api 响应类型是:

Response type: NodeJS.ReadableStream|FileObject|Buffer

我很困惑,响应类型是三种可能的事情之一。

在我所有的尝试中,我一直假设它是readable stream.

02.我可以在一个地图函数中发出多个 api 请求吗?

03.我可以将每个请求包装在一个promise()并解决response?

04.我可以将结果数组分配给promises多变的?

05.我可以声明吗var audio_files = await Promise.all(promises)?

06.在此声明之后,所有响应都“完成”了吗?

07.如何正确地将每个响应传送到可写流?

08.如何检测所有管道何时完成,以便我可以将文件发送回客户端?

对于问题 2 - 6,我假设答案是“是”。

我认为我的失败与问题7和8有关。

重现步骤

您可以使用四个随机生成的文本字符串的数组来测试此代码,每个字符串的字节大小为3975, 3863, 3974 and 3629字节 -这是该数组的粘贴箱 https://pastebin.com/raw/JkK8ehwV.

// route handler
app.route("/api/:api_version/tts")
    .get(api_tts_get);

// route handler middleware
const api_tts_get = async (req, res) => {

    var query_parameters = req.query;

    var file_name = query_parameters.file_name;
    var text_string_array = text_string_array; // eg: https://pastebin.com/raw/JkK8ehwV

    var absolute_path = path.join(__dirname, "/src/temp_audio/", file_name);
    var relative_path = path.join("./src/temp_audio/", file_name); // path relative to server root

    // for each string in an array, send it to the watson api  
    var promises = text_string_array.map(text_string => {

        return new Promise((resolve, reject) => {

            // credentials
            var textToSpeech = new TextToSpeechV1({
                iam_apikey: iam_apikey,
                url: tts_service_url
            });

            // params  
            var synthesizeParams = {
                text: text_string,
                accept: 'audio/mp3',
                voice: 'en-US_AllisonV3Voice'
            };

            // make request  
            textToSpeech.synthesize(synthesizeParams, (err, audio) => {
                if (err) {
                    console.log("synthesize - an error occurred: ");
                    return reject(err);
                }
                resolve(audio);
            });

        });
    });

    try {
        // wait for all responses
        var audio_files = await Promise.all(promises);
        var audio_files_length = audio_files.length;

        var write_stream = fs.createWriteStream(`${relative_path}.mp3`);

        audio_files.forEach((audio, index) => {

            // if this is the last value in the array, 
            // pipe it to write_stream, 
            // when finished, the readable stream will emit 'end' 
            // then the .end() method will be called on write_stream  
            // which will trigger the 'finished' event on the write_stream    
            if (index == audio_files_length - 1) {
                audio.pipe(write_stream);
            }
            // if not the last value in the array, 
            // pipe to write_stream and leave open 
            else {
                audio.pipe(write_stream, { end: false });
            }

        });

        write_stream.on('finish', function() {

            // download the file (using absolute_path)  
            res.download(`${absolute_path}.mp3`, (err) => {
                if (err) {
                    console.log(err);
                }
                // delete the file (using relative_path)  
                fs.unlink(`${relative_path}.mp3`, (err) => {
                    if (err) {
                        console.log(err);
                    }
                });
            });

        });


    } catch (err) {
        console.log("there was an error getting tts");
        console.log(err);
    }

}

The 官方示例 https://cloud.ibm.com/apidocs/text-to-speech?code=node#synthesize-audio shows:

textToSpeech.synthesize(synthesizeParams)
  .then(audio => {
    audio.pipe(fs.createWriteStream('hello_world.mp3'));
  })
  .catch(err => {
    console.log('error:', err);
  });

据我所知,这似乎适用于单个请求,但不适用于多个请求。

Research

关于可读和可写流、可读流模式(流动和暂停)、'data'、'end'、'drain' 和 'finish' 事件、pipe()、fs.createReadStream() 和 fs.createWriteStream()


几乎所有 Node.js 应用程序,无论多么简单,都以某种方式使用流......

const server = http.createServer((req, res) => {
// `req` is an http.IncomingMessage, which is a Readable Stream
// `res` is an http.ServerResponse, which is a Writable Stream

let body = '';
// get the data as utf8 strings.
// if an encoding is not set, Buffer objects will be received.
req.setEncoding('utf8');

// readable streams emit 'data' events once a listener is added
req.on('data', (chunk) => {
body += chunk;
});

// the 'end' event indicates that the entire body has been received
req.on('end', () => {
try {
const data = JSON.parse(body);
// write back something interesting to the user:
res.write(typeof data);
res.end();
} catch (er) {
// uh oh! bad json!
res.statusCode = 400;
return res.end(`error: ${er.message}`);
}
});
});

https://nodejs.org/api/stream.html#stream_api_for_stream_consumers https://nodejs.org/api/stream.html#stream_api_for_stream_consumers


可读流有两种主要模式影响我们使用它们的方式......它们可以是paused模式或在flowing模式。默认情况下,所有可读流都以暂停模式启动,但可以轻松切换到flowing然后回到paused当需要时...只需添加一个data事件处理程序将暂停的流切换到flowing模式并删除data事件处理程序将流切换回paused模式。

https://www.freecodecamp.org/news/node-js-streams-everything-you-need-to-know-c9141306be93 https://www.freecodecamp.org/news/node-js-streams-everything-you-need-to-know-c9141306be93


以下是可与可读和可写流一起使用的重要事件和函数的列表

可读流上最重要的事件是:

The data事件,每当流将一块数据传递给消费者时就会发出该事件 这end事件,当流中不再有数据可消耗时发出该事件。

可写流上最重要的事件是:

The drainevent,这是可写流可以接收更多数据的信号。 这finish事件,当所有数据都已刷新到底层系统时发出。

https://www.freecodecamp.org/news/node-js-streams-everything-you-need-to-know-c9141306be93 https://www.freecodecamp.org/news/node-js-streams-everything-you-need-to-know-c9141306be93


.pipe()负责监听来自的“数据”和“结束”事件fs.createReadStream().

https://github.com/substack/stream-handbook#why-you-should-use-streams https://github.com/substack/stream-handbook#why-you-should-use-streams


.pipe()只是一个函数,它接受可读源流 src 并将输出挂钩到目标可写流dst

https://github.com/substack/stream-handbook#pipe https://github.com/substack/stream-handbook#pipe


返回值pipe()method 是目标流

https://flaviocopes.com/nodejs-streams/#pipe https://flaviocopes.com/nodejs-streams/#pipe


默认情况下,流.end() https://nodejs.org/api/stream.html#stream_writable_end_chunk_encoding_callback在目的地被调用Writable源时流Readable流发出'end',使得目的地不再可写。要禁用此默认行为,end选项可以传递为false,导致目标流保持打开状态:

https://nodejs.org/api/stream.html#stream_read_pipe_destination_options https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options


The 'finish'事件在之后发出stream.end()方法已被调用,所有数据已刷新到底层系统。

const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
  writer.write(`hello, #${i}!\n`);
}
writer.end('This is the end\n');
writer.on('finish', () => {
  console.log('All writes are now complete.');
});

https://nodejs.org/api/stream.html#stream_event_finish https://nodejs.org/api/stream.html#stream_event_finish


如果您尝试读取多个文件并将它们传输到可写流,则必须将每个文件传输到可写流并传递end: false这样做时,因为默认情况下,当没有更多数据可供读取时,可读流会结束可写流。这是一个例子:

var ws = fs.createWriteStream('output.pdf');

fs.createReadStream('pdf-sample1.pdf').pipe(ws, { end: false });
fs.createReadStream('pdf-sample2.pdf').pipe(ws, { end: false });
fs.createReadStream('pdf-sample3.pdf').pipe(ws);

https://stackoverflow.com/a/30916248 https://stackoverflow.com/a/30916248


您想将第二次读取添加到事件监听器中,以便第一次读取完成......

var a = fs.createReadStream('a');
var b = fs.createReadStream('b');
var c = fs.createWriteStream('c');
a.pipe(c, {end:false});
a.on('end', function() {
  b.pipe(c)
}

https://stackoverflow.com/a/28033554 https://stackoverflow.com/a/28033554


节点流简史 - 部分one https://medium.com/the-node-js-collection/a-brief-history-of-node-streams-pt-1-3401db451f21 and two https://medium.com/the-node-js-collection/a-brief-history-of-node-streams-pt-2-bcb6b1fd7468.


相关谷歌搜索:

如何将多个可读流传输到单个可写流?节点js

涉及相同或相似主题的问题,没有权威答案(或者可能“过时”):

如何将多个 ReadableStream 传输到单个 WriteStream? https://stackoverflow.com/questions/54486160/how-to-pipe-multiple-readablestreams-to-a-single-writestream

通过不同的可读流两次管道传输到相同的可写流 https://stackoverflow.com/questions/46504589/piping-to-same-writable-stream-twice-via-different-readable-stream

通过管道将多个文件传输到一个响应 https://stackoverflow.com/questions/10462649/pipe-multiple-files-to-one-response

从两个管道流创建 Node.js 流 https://stackoverflow.com/questions/17471659/creating-a-node-js-stream-from-two-piped-streams


这里要解决的核心问题是异步性。您几乎已经完成了:您发布的代码的问题是您将所有源流并行且无序地输送到目标流中。这意味着data块将从不同的音频流中随机流动 - 甚至是您的end事件将超过pipe没有end过早关闭目标流,这可能解释了为什么它在重新打开后会增加。

你想要的是按顺序管道它们 - 你甚至在引用时发布了解决方案

您想将第二次读取添加到事件监听器中,以便第一次读取完成......

或作为代码:

a.pipe(c, { end:false });
a.on('end', function() {
  b.pipe(c);
}

这会将源流按顺序传输到目标流中。

采取你的代码这意味着替换audio_files.forEach循环:

await Bluebird.mapSeries(audio_files, async (audio, index) => {  
    const isLastIndex = index == audio_files_length - 1;
    audio.pipe(write_stream, { end: isLastIndex });
    return new Promise(resolve => audio.on('end', resolve));
});

注意使用bluebird.js 地图系列 http://bluebirdjs.com/docs/api/promise.mapseries.html here.

关于您的代码的进一步建议:

  • 你应该考虑使用洛达什.js https://lodash.com/docs
  • 你应该使用const & let代替var并考虑使用camelCase
  • 当您注意到“它适用于一个事件,但无法处理多个事件”时,请始终思考:异步性、排列、竞争条件。

进一步阅读,组合本机节点流的局限性:https://github.com/nodejs/node/issues/93 https://github.com/nodejs/node/issues/93

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何将多个 api 请求中的多个可读流传输到单个可写流? 的相关文章

随机推荐

  • 哪个 Swing 布局管理器可以获得我想要的布局?

    我正在尝试按照这个模型制作一个基本的登录菜单 我决定将整个菜单放入 JPanel 中 以便在连接成功后我可以切换到另一个面板 所以我决定使用 Borderlayout 将标题放在北区 将连接按钮放在南区 我将边框布局的中心本身设置为面板 我
  • 实体框架 - 绑定 WPF 树视图控件

    在服务类别表中 ParentCategoryId 是 ServiceCategoryId 它是父类别 我的类别可以有第 n 级层次结构 因此我需要使用树视图控件来呈现它 我怎样才能做到这一点 Thanks 你可以使用分层数据模板 http
  • gem 无法访问 ruby​​gems.org

    我有一个带有 Rackspace 的服务器 用于一些 Ruby 站点 当我尝试时bundle install在一个新网站上我得到 Retrying download gem from http rubygems org due to err
  • Dart 从 UInt8List 获取扩展

    我正在使用该包图像选择器 https pub dev packages image picker接受来自用户的图像 这会产生 PickedFile 和字节数组 由于图像随后被上传 我想知道如何从字节数组中猜测 mime 类型 PickedF
  • conio.h 不包含 textcolor()?

    我一直在考虑在我用 C 编写的 DOS 程序中使用颜色 有人告诉我conio h有textcolor 函数 但是当我在代码中使用它时 编译器 链接器会向我抛出错误 说我对该函数有未定义的引用 Does conio h真的有这个功能还是有人告
  • UTF-8、PHP、Win7 - 现在是否有解决方案可以使用 php 在 Win 7 上保存 UTF-8 文件名?

    更新 只是为了不让您阅读所有内容 PHP 开头 7 1 0alpha2 在 Windows 上支持 UTF 8 文件名 感谢阿纳托尔 贝尔斯基 根据 stackoverflow 上的一些链接 我找到了部分答案 https stackover
  • 从镜像创建 Docker 容器而不启动它

    作为我的部署策略的一部分 我使用 Upstart 管理 Docker 容器 为此 我需要从注册表中提取图像并创建一个命名容器 如建议的那样 用于运行容器的新贵脚本不会管理生命周期 https stackoverflow com questi
  • 在javascript中访问函数内的实例变量?

    如何以最简单的方式访问函数内的实例变量 function MyObject Instance variables this handler Methods this enableHandler function var button doc
  • Spring3/Hibernate3/TestNG:有些测试给出 LazyInitializationException,有些则没有

    前言 我在单元测试中遇到了 LazyInitializationException 的问题 而且我很难理解它 正如你从我的问题中看到的那样Spring 中的数据库会话 https stackoverflow com questions 13
  • 获取GridView中选定行的索引

    我想使用复选框获取从 gridview 选择的行 复选框是这样的
  • 这种类型注释在没有 TypeScript 的 React 代码中如何工作?

    我在看这段代码示例 https reacttraining com react router web example auth workflow在 ReactRouter 页面上 这篇文章很有趣 const PrivateRoute com
  • 将过滤器添加到 Eclipse 中的 Project Explorer

    我想向 Project Explorer 添加一个新的过滤器 以向用户隐藏一些在 Eclipse RCP 应用程序中自动创建的项目 到目前为止我已经找到了两个扩展点 org eclipse ui ide resourceFilters 允许
  • 电报机器人预填充文本供用户编辑

    Telegram 机器人向我发送一条带有文本片段的消息 我想要编辑该文本片段 然后发送回机器人进行进一步处理 复制和粘贴需要时间 重新输入消息需要时间 理想情况下 我想按机器人消息上的内联按钮 编辑 并使消息文本出现在我的回复输入框中进行编
  • 如何用约束标记一大组“传递群”?

    在 NealB解决方案之后进行编辑 与以下解决方案相比 NealB的解决方案非常非常快任何另一个 https stackoverflow com q 18033115 answers and 提出了关于 添加约束以提高性能 的新问题 Nea
  • Java 正则表达式 - 字母数字,最多一个连字符,句点或下划线,七个字符长

    我是 Java 正则表达式工具的新手 尽管它们潜力巨大 但我很难完成这项任务 我想编写一个正则表达式来验证遵循以下语法的输入字符串 小写字母和数字的任意组合 仅一个下划线 一个破折号或一个句号 无其他特殊字符 最小长度为 5 我想出了以下解
  • 重写 jmeter.properties

    是否可以在不关闭并重新启动JMeter的情况下重新读取jmeter properties 也许有任何额外的类会触发这个过程 有时我需要使用属性的值 并且每次都重新启动 JMeter 并不方便 所以我想自动化此过程 无需从文件中重新读取 jm
  • Swagger for Micronaut 与 Maven

    我想从 Spring Boot 2 切换到 Micronaut 框架 而且我在 Swagger 设置上遇到了困难 在 Spring Boot 2 项目中 我有以下依赖项
  • magento成功页面变量

    我正在尝试捕获一些 magento 成功页面变量以传递给我们的广告公司 到目前为止 我已经得到了这个 但变量没有输出任何内容 数据需要采用以下格式 price1 price2 price3 qty1 qty2 qty3 sku1 sku2
  • 使用特定的类/函数预加载 Jupyter Notebook

    我想预加载一个笔记本 其中包含我在另一个文件中定义的特定类 函数 更具体地说 我想用 python 来做到这一点 比如加载一个配置文件 包含所有相关的类 函数 目前 我正在使用 python 生成笔记本并在服务器上自动启动它们 因为不同的
  • 如何将多个 api 请求中的多个可读流传输到单个可写流?

    期望的行为 实际行为 我尝试过的 重现步骤 研究 期望的行为 将从多个 api 请求接收到的多个可读流传输到单个可写流 API 响应来自 ibm watsontextToSpeech synthesize https cloud ibm c