Bull队列:当作业失败时,如何停止队列处理剩余作业?

2023-12-10

我在用bull队列来处理一些作业。在当前场景中,每个作业都是某种操作。因此,每当队列中的操作列表中的操作(作业)失败时,队列就必须停止处理剩余的作业(操作)。

到目前为止我尝试过什么?

所以我尝试在特定作业失败时暂停队列。接下来,队列在耗尽时恢复。现在,当恢复时,队列不会从失败的作业开始,而是拾取下一个作业。

var Queue = require('bull');

let redisOptions = {
  redis: { port: 6379, host: '127.0.0.1' },
  limiter: { max: 1, duration: 1000 }
}
var myQueue = new Queue('Linear-Queue', redisOptions);

myQueue.process('Type-1', function (job, done) {
  setTimeout(() => {
    done(job.data.error);
  }, job.data.time);
});

let options = {
  attempts: 3,
  removeOnComplete: false, // removes job from queue on success
  removeOnFail: false // removes job from queue on failure
}

setTimeout(() => {
  myQueue.add('Type-1', { time: 10000, description: "Type-1 One", error: false }, options);
}, 1 * 1000);

setTimeout(() => {
  myQueue.add('Type-1', { time: 5000, description: "Type-1 two", error: true }, options);
}, 2 * 1000);

setTimeout(() => {
  myQueue.add('Type-1', { time: 3000, description: "Type-1 three", error: false }, options);
}, 3 * 1000);


myQueue.on('completed', function (job, result) {
  console.log("Completed: " + job.data.description);
});

myQueue.on('failed', async function (job, error) {
  console.log("Failed: " + job.data.description);
  try {
    await myQueue.pause();
  } catch (error) {
    console.log(error);
  }
});

myQueue.on('drained', async function () {
  try {
    await myQueue.resume();
  } catch (error) {
    console.log(error);
  }
});

电流输出:

Current result

预期输出:如果Type-1 two第三次尝试成功完成。

Completed: Type-1 One
Failed: Type-1 two
Failed: Type-1 two
Completed: Type-1 two
Completed: Type-1 three

预期输出:如果Type-1 two第三次尝试也失败了。

Completed: Type-1 One
Failed: Type-1 two
Failed: Type-1 two
Failed: Type-1 two

我想要的只是队列必须停止处理新作业,直到当前作业完成且没有任何失败。如果发生任何故障,失败的作业必须运行一些x时间数。在x+1尝试它必须清除(删除所有作业)队列。那么如何在队列中实现这种线性行为。


In bull不可能在失败后立即重复相同的作业,然后再拾取队列中的下一个作业。

解决方案:

  1. 创建新作业并将其优先级设置为小于当前作业类型的值。
  2. 释放失败的作业(resolve() or done())
  3. 这项新工作将立即由bull进行加工。

示例代码:在下面的代码中Job-3将会失败并创造新的工作,依此类推,直到“工作的目的”在某个时间点成功。

var Queue = require('bull');

let redisOptions = {
  redis: { port: 6379, host: '127.0.0.1' }
}
var myQueue = new Queue('Linear-Queue', redisOptions);

myQueue.process('Type-1', function (job, done) {
  console.log(`Processing Job-${job.id} Attempt: ${job.attemptsMade}`);
  downloadFile(job, async function (error) {
    if (error) {
      await repeatSameJob(job, done);
    } else {
      done();
    }
  });
});

async function repeatSameJob(job, done) {
  let newJob = await myQueue.add('Type-1', job.data, { ...{ priority: 1 }, ...job.opts });
  console.log(`Job-${job.id} failed. Creating new Job-${newJob.id} with highest priority for same data.`);
  done(true);
}

function downloadFile(job, done) {
  setTimeout(async () => {
    done(job.data.error)
  }, job.data.time);
}

myQueue.on('completed', function (job, result) {
  console.log("Completed: Job-" + job.id);
});

myQueue.on('failed', async function (job, error) {
  console.log("Failed: Job-" + job.id);
});

let options = {
  removeOnComplete: true, // removes job from queue on success
  removeOnFail: true // removes job from queue on failure
}

for (let i = 1; i <= 5; i++) {
  let error = false;
  if (i == 3) { error = true; }

  setTimeout(i => {
    let jobData = {
      time: i * 2000,
      error: error,
      description: `Job-${i}`
    }
    myQueue.add('Type-1', jobData, options);
  }, i * 2000, i);
}

Output:

Output

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Bull队列:当作业失败时,如何停止队列处理剩余作业? 的相关文章

随机推荐

  • 如何在 PHP 中通过 SFTP 获取文件的最后几行

    我需要登录生产服务器检索文件并使用该文件中的数据更新我的数据库 由于这是一个生产数据库 我不想每 5 分钟获取一次整个文件 因为该文件可能很大 这可能会影响服务器 我需要每 5 分钟间隔获取该文件的最后 30 行 并尽可能减少影响 以下是我
  • 使用gson反序列化json数组?

    我的 json 为 status 200 data catId 638 catName Helena Bonham Carter catUniqueName helena bonham carter catSlug 我的类别模型为 publ
  • 如何在 R 的 apply 中取消类型转换(bit64 示例)

    我在一些 R 代码中使用 bit64 包 我创建了一个向量 64位整数 然后尝试使用sapply迭代这些 向量中的整数 这是一个例子 v c as integer64 1 as integer64 2 as integer64 3 sapp
  • F#:检查一个值是否是字符串数组、字符串数组的数组或字符串

    我需要使用match检查一个值是字符串数组还是字符串 我尝试过一些徒劳的事情 string gt string gt array
  • 二维数组,推入一个数组会出现在所有数组中吗?

    我是红宝石新手 所以我显然误解了一些东西 我打算创建一个大小为 2 的数组 其中每个元素本身就是一个数组 然后将项目推送到一个或另一个子数组 usr bin env ruby arr Array new 2 Array new puts d
  • 如何更新 Web Profiler 工具栏以显示有关 ajax 请求的数据

    我目前正在构建一个完全支持 ajax 页面加载的应用程序 初始页面加载后 浏览网站仅加载内容 而不加载标题或菜单 整个应用程序运行良好 但我想刷新 Web Profiler 工具栏以显示最后的 ajax 请求信息 我从响应标头中获取了 xd
  • python 将 pandas 数据帧、参数和函数传递给 scipy.optimize.minimize

    我正在尝试使用 SciPy 的 scipy optimize minimize 函数来最小化我创建的函数 然而 我试图优化的函数本身是由其他函数构建的 这些函数基于 pandas DataFrame 执行计算 据我所知 SciPy 的最小化
  • 编译 C++ 代码时出现 ios::nocreate 错误

    同时 在 RHEL 5 0 上编译一个用 C 编写的包 我收到以下错误 gt 错误 nocreate不是以下成员std ios 源代码对应于 ifstream tempStr argv 4 ios in ios nocreate 我努力了
  • 停止应用程序洞察

    我们为 Azure 门户中托管的 Web 应用程序安装了 Application Insights 现在它发送报告等 应用程序刚刚启动 我们不需要所有这些数据 有没有办法完全停止 禁用 Application Insights 的所有数据收
  • Node js 在复制之前检查文件是否打开

    在将文件复制到另一个位置之前 我需要检查文件是否已打开 下面的代码告诉您打开文件时是否显示错误代码 EBUSY 但如果文件未打开 它将删除文件的内容 有没有更好的方法来获取这些信息 fs open my file dwg w functio
  • 如何估计 R 中图形线上的点的坐标?

    假设我有数据 x lt c 1900 1930 1944 1950 1970 1980 1983 1984 y lt c 100 300 500 1500 2500 3500 4330 6703 然后 我绘制这些数据并在已知的 x 和 y
  • Ivy 无法解析依赖关系的范围,该依赖关系是传递依赖关系的依赖关系

    我向 ivy xml 添加一个依赖项 让我们将其命名为 A 该文件在 Maven 中心有一个 pom 文件 Ivy 使用 ibiblio 来解决 Maven 依赖关系 添加到ivy xml 的依赖项 A 具有传递依赖项 B 到目前为止一切顺
  • $_Session 登录和注销 php 的“复杂性”

    我正在为大学做一个数据库项目 我在这里遇到了一个问题 我试图在没有会话时显示 登录 在有会话时显示 注销 但实际上即使在我登录后它仍然显示我 登录 并且我尝试 print r SESSION 它告诉我该变量未定义 我有一个注销文件 和一个
  • PHP 发送 UTF-8 邮件而不使用 PEAR::Mail PEAR::Mail_Mime

    我希望能够使用包含 8 位字符 的 PHP mail 发送电子邮件 它们将用在主题 消息和 发件人 标题中 在不使用 PEAR 包的情况下如何做到这一点 如果您不介意对不需要的单词进行编码 最简单的解决方案是将所有内容放入 base64 R
  • 上传文件之前生成 Firebase 存储下载 url

    Firebase Storage 有没有办法生成一个指向任何内容的下载网址 以便稍后将文件上传到该网址 类似的东西 在 Kotlin 中 fun generateItemPhotoUrl id String storageRef child
  • 调用链内的空合并

    如果我有一长串对象 每个对象都有可能在 Linq where 子句中返回 null 例如 SomeSource Where srcItem gt srcItem DataMembers SomeText Connection Connect
  • Realm 返回空对象列表

    我想从 Farmer 对象中获取 CropDataList 当我获取 Farmer 对象时 它不为空 但与 Farmer 对象关联的作物数据列表返回空 我可以通过 Stetho 查看数据库条目 并且列表中有一个条目 这是我的代码 publi
  • 使用 LINQ(赛程列表)形成锦标赛表

    I have an array of players string and now I need to get an array of pairs representing games playerN playerM to orginize
  • httr POST 请求 API 返回 400 错误

    我正在使用 R 中的 httr 包来尝试查询 postcode io API http postcodes io docs 我可以按照说明成功查询单个邮政编码 sample4 lt GET api postcodes io postcode
  • Bull队列:当作业失败时,如何停止队列处理剩余作业?

    我在用bull队列来处理一些作业 在当前场景中 每个作业都是某种操作 因此 每当队列中的操作列表中的操作 作业 失败时 队列就必须停止处理剩余的作业 操作 到目前为止我尝试过什么 所以我尝试在特定作业失败时暂停队列 接下来 队列在耗尽时恢复