长时间运行的任务被取消后如何正确清理

2023-11-21

我创建了一个类,其目的是抽象出对队列的并发访问的控制。

该类设计为在单个线程上实例化,由多个线程写入,然后从后续的单个线程中读取。

我在类中生成了一个长时间运行的任务,它将执行阻塞循环,并在项目成功出列时触发一个事件。

我的问题是:我是否实现了取消长时间运行的任务以及随后的清理/重置的正确使用CancellationTokenSource object?

理想情况下,我希望能够停止和重新启动活动对象,同时保持添加到队列的可用性。

我以 Peter Bromberg 的文章为基础:C# 4.0 中的生产者/消费者队列和 BlockingCollection

代码如下:

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Test
{
    public delegate void DeliverNextQueuedItemHandler<T>(T item);

public sealed class SOQueueManagerT<T> 
{

    ConcurrentQueue<T> _multiQueue;
    BlockingCollection<T> _queue;
    CancellationTokenSource _canceller;
    Task _listener = null;

    public event DeliverNextQueuedItemHandler<T> OnNextItem;

    public bool IsRunning { get; private set; }
    public int QueueSize
    {
        get
        {
            if (_queue != null)
                return _queue.Count;
            return -1;
        }
    }

    public CancellationTokenSource CancellationTokenSource
    {
        get
        {
            if (_canceller == null)
                _canceller = new CancellationTokenSource();

            return _canceller;
        }
    }

    public SOQueueManagerT()
    {
        _multiQueue = new ConcurrentQueue<T>();
        _queue = new BlockingCollection<T>(_multiQueue);

        IsRunning = false;
    }

    public void Start()
    {
        if (_listener == null)
        {


            IsRunning = true;

            _listener = Task.Factory.StartNew(() =>
            {

                while (!CancellationTokenSource.Token.IsCancellationRequested)
                {
                    T item;
                    if (_queue.TryTake(out item, 100))
                    {
                        if (OnNextItem != null)
                        {

                            OnNextItem(item);
                        }
                    }

                }
            },
            CancellationTokenSource.Token,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
        }
    }

    public void Stop()
    {
        if (_listener != null)
        {
            CancellationTokenSource.Cancel();
            CleanUp();
        }
    }

    public void Add(T item)
    {
        _queue.Add(item);
    }

    private void CleanUp()
    {
        _listener.Wait(2000);
        if (_listener.IsCompleted)
        {
            IsRunning = false;
            _listener = null;
            _canceller = null;
        }
    }


 }
}

UPDATE这就是我最终采取的做法。它并不完美,但到目前为止正在完成这项工作。

public sealed class TaskQueueManager<T> 
{
    ConcurrentQueue<T> _multiQueue;
    BlockingCollection<T> _queue;
    CancellationTokenSource _canceller;
    Task _listener = null;

    public event DeliverNextQueuedItemHandler<T> OnNextItem;

    public bool IsRunning
    {
        get
        {
            if (_listener == null)
                return false;
            else if (_listener.Status == TaskStatus.Running ||
                _listener.Status == TaskStatus.Created ||
                _listener.Status == TaskStatus.WaitingForActivation ||
                _listener.Status == TaskStatus.WaitingToRun ||
                _listener.IsCanceled)
                return true;
            else
                return false;
        }
    }
    public int QueueSize
    {
        get
        {
            if (_queue != null)
                return _queue.Count;
            return -1;
        }
    }

    public TaskQueueManager()
    {
        _multiQueue = new ConcurrentQueue<T>();
        _queue = new BlockingCollection<T>(_multiQueue);
    }

    public void Start()
    {
        if (_listener == null)
        {
            _canceller = new CancellationTokenSource();

            _listener = Task.Factory.StartNew(() =>
            {
                while (!_canceller.Token.IsCancellationRequested)
                {
                    T item;
                    if (_queue.TryTake(out item, 100))
                    {
                        if (OnNextItem != null)
                        {
                            try
                            {
                                OnNextItem(item);
                            }
                            catch (Exception e)
                            {
                                //log or call an event
                            }
                        }
                    }
                }
            },
            _canceller.Token,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
        }
    }

    public void Stop()
    {
        if (_listener != null)
        {
            _canceller.Cancel();

            if (_listener.IsCanceled && !_listener.IsCompleted)
                _listener.Wait();

            _listener = null;
            _canceller = null;
        }
    }

    public void Add(T item)
    {
        if (item != null)
        {
            _queue.Add(item);
        }
        else
        {
            throw new ArgumentNullException("TaskQueueManager<" + typeof(T).Name + ">.Add item is null");
        }
    }
}

仔细的编程是唯一能够解决问题的方法。即使您取消操作,您也可能有一个未在流行时间内完成的待处理操作。这很可能是一个陷入僵局的阻塞操作。在这种情况下,您的程序实际上不会终止。

例如,如果我多次调用 CleanUp 方法或者没有先调用 Start,我就会感觉它会崩溃。

清理期间的 2 秒超时,感觉比计划的更随意,而且我实际上会尽可能确保事物正确关闭或崩溃/挂起(你永远不想让并发事物处于未知状态)。

另外,IsRunning是显式设置的,而不是从对象的状态推断出来的。

为了获得灵感,我希望您看看我最近编写的一个类似的类,它是一个生产者/消费者模式,在后台线程中工作。您可以在以下位置找到该源代码CodePlex。不过,这是为了解决一个非常具体的问题而设计的。

在这里,取消是通过查询只有工作线程识别的特定类型来解决的,从而开始关闭。这也确保我永远不会取消待处理的工作,只考虑整个工作单元。

为了稍微改善这种情况,您可以为当前工作设置一个单独的计时器,并在取消时中止或回滚未完成的工作。现在,实施一个交易类似的行为需要一些尝试和错误,因为您需要查看每个可能的极端情况并问自己,如果程序在这里崩溃会发生什么?理想情况下,所有这些代码路径都会导致可恢复或已知状态,您可以从中恢复工作。但正如我想您已经猜到的那样,这将需要仔细的编程和大量的测试。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

长时间运行的任务被取消后如何正确清理 的相关文章

随机推荐

  • 将 varchar 转换为数据类型 numeric 时出现算术溢出错误。 ‘10’ <= 9.00

    下面是我正在使用的表结构和数据的子集 CREATE TABLE Test Val varchar 5 Type varchar 5 INSERT Test VALUES Yes Text INSERT Test VALUES 10 Int
  • Quicklook/QLPreviewController 委托方法在 iOS 10 Xcode 8 中不调用

    目前我正在 iOS10 中测试我当前的版本 我在用Xcode 8beta 6 用于测试 这里Quicklook QLPreviewController 委托方法未调用 该代码集适用于 XCode 7 和 iOS 9 3 版本 我在苹果开发者
  • Jetpack Compose 具有动态操作的 TopAppBar

    Composable fun TopAppBar title Composable gt Unit modifier Modifier Modifier navigationIcon Composable gt Unit null acti
  • 如何在python2和python3中运行程序

    我的机器上当前安装了 python 2 6 6 和 python 3 1 3 Windows Vista 64 位 我的路径变量包括两个版本的目录 如何指定要在哪个 python 中运行程序 例如 如果我想在 python 3 中运行程序
  • 静态类变量存储在内存中的什么位置?

    这是一个后续问题静态数组如何存储在Java内存中 所以C C 中的全局变量存储在内存的静态数据段中 但是 Java C 中的静态类变量又如何呢 它不能是静态数据段 因为您不知道在整个程序持续时间内将引用什么 多少个类 由于反射 这绝对不是堆
  • iOS 9 上的 GIDSignIn 白屏

    我实现了 Google 登录 并且在 iOS 8 上一切正常 但是当我在 iOS 9 上调用此行时 GIDSignIn sharedInstance signIn 我可以第一次登录 但如果我取消 下次尝试登录时 它会显示一个白色屏幕 其中是
  • r - data.table 和 testthat 包

    我正在构建一个与 data table 一起使用的包 并且应该使用包 testthat 对其进行测试 虽然从命令行调用时代码工作正常 但从测试用例调用时遇到了问题 运行测试时似乎使用了基础包中的 函数 即 data frames 的函数 我
  • 将 HTTP POST 请求重定向到 HTTPS POST 请求

    我最近刚刚将我的服务器设置为使用 SSL 证书通过 HTTPS 运行 该网站是一个图像托管服务 ShareX 的开发人员已将我的网站包含在他们的应用程序中 我的问题是 所有 HTTP 请求都会自动重定向到 HTTPS 该网站运行良好 但 S
  • HtmlAgilityPack.HtmlDocument Cookie

    这与脚本内 可能在脚本标签内 设置的cookie有关 System Windows Forms HtmlDocument执行这些脚本和cookies集 比如document cookie etc 可以通过其检索Cookies财产 我假设Ht
  • 为什么随机访问迭代器的算术运算符接受/返回 int 而不是 size_t?

    由于大多数操作都在std vector要求 返回size t 这就是我用于索引的类型 但现在我已经启用了所有编译器警告来修复一些我知道的签名 未签名转换问题 这条消息让我感到惊讶 警告 C4365 参数 从 size t 转换为 w64 i
  • Java的notify()在wait()之前运行?

    public class ThreadA public static void main String args ThreadB b new ThreadB b start synchronized b try System out pri
  • 以编程方式确定在另一个进程中加载​​哪些模块? (操作系统)

    我觉得我想做的事情非常简单 我只是不确定到底该怎么做 具体来说 我只想获取在另一个进程中加载 的模块 共享 动态库 的列表 以及获取该模块在给定进程中所在位置的起始地址 使用 GDB 获取这些信息非常简单 您只需连接到该进程 然后输入 信息
  • 如何在 Excel 中表示日期时间

    代表一个事物的最佳方式是什么DateTime在 Excel 中 我们使用同步融合基本 XlsIO将值输出到 Excel 文档 效果很好 但我不知道如何显示DateTime在一列中 当我自己直接在 Excel 中执行此操作时也不会 难道不可能
  • codeigniter 获取所有声明的路由

    如何获取codeigniter中所有声明的路由 像前一样 print r 路线 因为这是问题所在 如果客户将其用户名注册为 facebook 他将被路由到帐户 facebook login 而不是他的个人资料 如果我更改路由顺序 所有链接将
  • Common Lisp:#+nil 是什么?

    前几天 也许是昨天 我对此感到很困惑 nil读取时间条件发现于https github com billstclair defperson blob master defperson lisp L289 经过一番深入思考 我得出的结论是 这
  • 为什么从 python 游标执行 sql 查询时需要使用 3 个引号?

    我遇到过一些连接到 MySQL 数据库的 Python 程序 在代码中 我看到了查询execute 函数用 3 个引号括起来 我想知道其中的原因 我还注意到 仅在创建 插入和更新表时使用 3 个引号 而不是在选择行时使用 cursor ex
  • 如果饼图中的值为 0%,如何删除线条

    我正在制作饼图 为此 我正在使用MPAndroid图表库中 任何数据或多个数据的值可能包含 0 并且我使用饼图之外的方式显示值setYValuePosition PieDataSet ValuePosition OUTSIDE SLICE
  • 双引号之间的 Razor 代码

    在 Razor View Engine 模板中 我想要执行以下操作 我想要放置一些codehtml 的双引号之间属性 问题是我想要插入的代码片段本身包含一些双引号 a href Url Action page a 你可以很容易地看到事情是如
  • 在 Keras 中使用自定义损失函数时的批量大小问题

    我正在通过定义自定义损失函数对标准神经网络进行轻微修改 自定义损失函数不仅取决于 y true 和 y pred 还取决于训练数据 我使用描述的包装解决方案实现了它here 具体来说 我想定义一个自定义损失函数 它是标准 mse 加上输入与
  • 长时间运行的任务被取消后如何正确清理

    我创建了一个类 其目的是抽象出对队列的并发访问的控制 该类设计为在单个线程上实例化 由多个线程写入 然后从后续的单个线程中读取 我在类中生成了一个长时间运行的任务 它将执行阻塞循环 并在项目成功出列时触发一个事件 我的问题是 我是否实现了取