在 C# 中实现生产者/消费者模式

2024-02-26

我该如何实施生产者/消费者C# 中的模式使用活动和代表?使用这些设计模式时,在资源方面需要注意什么?有什么我需要注意的边缘情况吗?


我知道这个线程有点老了,但由于我有时在搜索中遇到它,我决定为那些想知道如何实现简单的通用生产者-消费者作业队列的人分享这个生产者-消费者代码。

The Job类用于以委托的形式“存储”对象的方法调用。然后在处理作业时调用委托。任何相关参数也存储在该 Job 类中。

通过这个简单的模式,可以在入队和出队过程中实现多线程。实际上这只是最简单的部分:多线程给您的代码带来了新的挑战,您稍后会注意到它们;-)

我最初在此发布了这段代码thread https://stackoverflow.com/questions/46862475/changing-thread-context-in-c-sharp-console-application/46863615#46863615.

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;

// Compiled and tested in: Visual Studio 2017, DotNET 4.6.1

namespace MyNamespace
{
    public class Program
    {
        public static void Main(string[] args)
        {
            MyApplication app = new MyApplication();
            app.Run();
        }
    }

    public class MyApplication
    {
        private BlockingCollection<Job> JobQueue = new BlockingCollection<Job>();
        private CancellationTokenSource JobCancellationTokenSource = new CancellationTokenSource();
        private CancellationToken JobCancellationToken;
        private Timer Timer;
        private Thread UserInputThread;



        public void Run()
        {
            // Give a name to the main thread:
            Thread.CurrentThread.Name = "Main";

            // Fires a Timer thread:
            Timer = new Timer(new TimerCallback(TimerCallback), null, 1000, 2000);

            // Fires a thread to read user inputs:
            UserInputThread = new Thread(new ThreadStart(ReadUserInputs))
            {
                Name = "UserInputs",
                IsBackground = true
            };
            UserInputThread.Start();

            // Prepares a token to cancel the job queue:
            JobCancellationToken = JobCancellationTokenSource.Token;

            // Start processing jobs:
            ProcessJobs();

            // Clean up:
            JobQueue.Dispose();
            Timer.Dispose();
            UserInputThread.Abort();

            Console.WriteLine("Done.");
        }



        private void ProcessJobs()
        {
            try
            {
                // Checks if the blocking collection is still up for dequeueing:
                while (!JobQueue.IsCompleted)
                {
                    // The following line blocks the thread until a job is available or throws an exception in case the token is cancelled:
                    JobQueue.Take(JobCancellationToken).Run();
                }
            }
            catch { }
        }



        private void ReadUserInputs()
        {
            // User input thread is running here.
            ConsoleKey key = ConsoleKey.Enter;

            // Reads user inputs and queue them for processing until the escape key is pressed:
            while ((key = Console.ReadKey(true).Key) != ConsoleKey.Escape)
            {
                Job userInputJob = new Job("UserInput", this, new Action<ConsoleKey>(ProcessUserInputs), key);
                JobQueue.Add(userInputJob);
            }
            // Stops processing the JobQueue:
            JobCancellationTokenSource.Cancel();
        }

        private void ProcessUserInputs(ConsoleKey key)
        {
            // Main thread is running here.
            Console.WriteLine($"You just typed '{key}'. (Thread: {Thread.CurrentThread.Name})");
        }



        private void TimerCallback(object param)
        {
            // Timer thread is running here.
            Job job = new Job("TimerJob", this, new Action<string>(ProcessTimer), "A job from timer callback was processed.");
            JobQueue.TryAdd(job); // Just enqueues the job for later processing
        }

        private void ProcessTimer(string message)
        {
            // Main thread is running here.
            Console.WriteLine($"{message} (Thread: {Thread.CurrentThread.Name})");
        }
    }



    /// <summary>
    /// The Job class wraps an object's method call, with or without arguments. This method is called later, during the Job execution.
    /// </summary>
    public class Job
    {
        public string Name { get; }
        private object TargetObject;
        private Delegate TargetMethod;
        private object[] Arguments;

        public Job(string name, object obj, Delegate method, params object[] args)
        {
            Name = name;
            TargetObject = obj;
            TargetMethod = method;
            Arguments = args;
        }

        public void Run()
        {
            try
            {
                TargetMethod.Method.Invoke(TargetObject, Arguments);
            }
            catch(Exception ex)
            {
                Debug.WriteLine($"Unexpected error running job '{Name}': {ex}");
            }
        }

    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 C# 中实现生产者/消费者模式 的相关文章

随机推荐

  • 输入二进制而不是十六进制[重复]

    这个问题在这里已经有答案了 可能的重复 我可以在 C 或 C 中使用二进制文字吗 https stackoverflow com questions 2611764 can i use a binary literal in c or c
  • Gradle 和多项目结构

    我试图了解应该如何进行以下项目设置 Top Android Project Project 1 Pure Java Modules Module A1 Module B1 Module Z1 Project 2 Android Librar
  • 如何在rails中后台运行rake任务

    这是我的命令 bundle exec rake resque work QUEUE trace 我想在我的服务器上作为后台进程运行此命令 请帮我 我经常使用的一个方法是 nohup bundle exec rake resque work
  • 如何创建 gzip 压缩的 HTTP::Response?

    我需要使用压缩数据创建 HTTP Response 我该如何对内容进行压缩 我是否只需添加适当的标头并使用 Compress Zlib 自行压缩它 或者 LWP 模块是否提供了处理此问题的方法 这是你需要的吗 您对数据进行压缩 设置内容编码
  • 从 Python 中打开的 Excel 文件中读取

    我有一个脚本每隔几个小时从 Excel 文件中提取数据 但是 如果这些 Excel 文件之一打开 我仍然希望能够读取它 在 python openpyxl 中 当我尝试执行此操作时 出现权限错误 以下是我尝试过的 我认为我可以捕获异常并创建
  • 暂停交易是什么意思?

    如果我们使用 Propagation Requires new 那么它会挂起现有事务并创建一个新事务 那么这意味着什么suspends a transaction 暂停的交易会怎样 幕后究竟发生了什么 update 暂停的事务持有的资源会发
  • scala 列表地图与mapConserve

    我试图理解mapConserve 据说 像xs map f 但如果函数f将所有元素映射到自身 则返回xs不变 来自List http www scala lang org api current index html scala colle
  • Java 8 - 无法在数组类型 Enum[] 上调用stream() [重复]

    这个问题在这里已经有答案了 为什么我无法调用stream 关于数组类型Enum DummyEnum array DummyEnum values array stream Compile Error ENUM public enum Dum
  • 如何使用 Java 获取我的电脑中可用串行端口的列表?

    我只是运行一些代码来获取计算机上的可用端口列表 当我有 3 个空闲的 com 端口时 它返回 false 我该如何解决这个问题 我的代码 public static void main String args SerialParameter
  • JavaScript 让 Firefox 开发者工具调试视图中的块作用域

    我正在调查该机构的工作情况let 块作用域在 JavaScript 中 特别是浏览器的调试视图如何显示信息 Using let在一个for循环创建一个块作用域 其中回调function timeoutHandler 可以访问 一切都很好 I
  • 格式化复数

    对于我的一个课程中的一个项目 我们必须输出最多五位小数的数字 输出可能是一个复数 而我无法弄清楚如何输出具有五位小数的复数 对于花车我知道它只是 print 0 5f variable name 复数有类似的东西吗 您可以使用如下所示的方法
  • 使用 Gradle 调用 powershell 脚本

    我是 Gradle 新手 所以请耐心等待 我只是想调用 ps1 文件来使用 gradle 执行 我将如何设置 build gradle 文件来执行同一目录中的 ps1 文件 提前致谢 你可以使用gradleExec https docs g
  • python in 和比较的运算符优先级

    以下比较产生True gt gt gt 1 in 11 True gt gt gt 1 in 11 True True 如果使用括号 我会得到一个 TypeError gt gt gt 1 in 11 True Traceback most
  • IBM Integration 总线 mqsicreatebar 及参考

    我对在我的环境中使用 mqsicreatebar 感到有点困惑 例如 我有以下文件结构 root Libraries Library1 Apps App1 project App1 是参考图书馆1 我想运行 mqsicreatebar 使其
  • 应用程序中的各个阶段在 Spark 中并行运行吗?

    我有一个疑问 阶段如何在 Spark 应用程序中执行 程序员可以定义的阶段执行是否具有一致性 或者是否由 Spark 引擎导出 检查这张图中的实体 阶段 分区 图片来源 http alvincjin blogspot in 2014 12
  • 显示宏选项

    在 Excel VBA 中 可以使用 MacroOptions 函数定义与宏或函数相关的一些信息 通过 VBA 输入后是否可以访问此类信息 谢谢 我已经搜索了一段时间但没有发现什么很棒的东西 我发现的唯一解决方法是使用 Chip Pears
  • 通过 URL 发布到 Reddit

    是否可以通过 URL 发布 Reddit 链接 例如对于 Facebook 你可以这样做 a href Share Stackoverflow on your profile a Reddit 是否有一个可以让我共享 URL 的等效端点 有
  • python 管道中的特征选择:如何确定特征名称?

    我使用 pipeline 和 grid search 选择最佳参数 然后使用这些参数来拟合最佳管道 best pipe 然而 由于 feature selection SelectKBest 处于管道中 因此尚未对 SelectKBest
  • 如何在 Android 上动态更新 ListView [关闭]

    很难说出这里问的是什么 这个问题是含糊的 模糊的 不完整的 过于宽泛的或修辞性的 无法以目前的形式得到合理的回答 如需帮助澄清此问题以便重新打开 访问帮助中心 help reopen questions 在 Android 上 我怎样才能L
  • 在 C# 中实现生产者/消费者模式

    我该如何实施生产者 消费者C 中的模式使用活动和代表 使用这些设计模式时 在资源方面需要注意什么 有什么我需要注意的边缘情况吗 我知道这个线程有点老了 但由于我有时在搜索中遇到它 我决定为那些想知道如何实现简单的通用生产者 消费者作业队列的