c#:ThreadPool实现并行分析,并实现线程同步结束

2023-11-19

  • 背景:

一般情况下,经常会遇到一个单线程程序时执行对CPU,MEMORY,IO利用率上不来,且速度慢下问题;那么,怎么解决这些问题呢?

据我个人经验来说有以下两种方式:

1、并行、多线程(Parallel、Task、ThreadPool)

2、多进程MultipleProcess

恰好工作中又一次遇到单线程程序性能低的问题,本次我主要想尝试使用ThreadPool来实现多线程,并且在实现多线程任务同步结束。

  • ThreadPool线程同步结束示例一:

一个ManualResetEvent结合Interlocked来实现线程同步结束。

 1  static void Main(string[] args)
 2         {
 3             using (ManualResetEvent finish = new ManualResetEvent(false))
 4             {
 5                 int maxThreadCount = 100;
 6                 for (var i = 0; i < 100; i++) {
 7                     ThreadPool.QueueUserWorkItem((Object state)=> {
 8                         Console.WriteLine("task:{0}",state);
 9 
10                         // 以原子操作的形式递减指定变量的值并存储结果。
11                         if (Interlocked.Decrement(ref maxThreadCount) == 0) {
12                             // 将事件状态设置为有信号,从而允许一个或多个等待线程继续执行。
13                             finish.Set();
14                         }                        
15                     }, i);
16                 }
17 
18                 // 阻止当前线程,直到当前 System.Threading.WaitHandle 收到信号。
19                 finish.WaitOne();
20             }
21 
22             Console.WriteLine("Complete!");
23             Console.ReadKey();

上边的代码是可行性,当系统线程数超过系统允许最大数时,线程会被在线程池中排队等待。

  • ThreadPool线程同步结束示例二: 

ManualResetEvent集合(每一个线程由集合中的唯一一个ManualResetEvent对象来实现线程的同步跟踪)结合WaitHandle.WaitAll(ManualResetEvent集合)来实现线程同步结束。

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;

namespace ThreadPoolTest
{
    class MyTask
    {
        private ManualResetEvent finish = null;
        public MyTask(ManualResetEvent finish)
        {
            this.finish = finish;
        }

        public void MyTaskThreadPoolCallback(Object state)
        {
            Console.WriteLine("task:{0}", state);

            // 将事件状态设置为有信号,从而允许一个或多个等待线程继续执行。
            this.finish.Set();
        }
    }


    class Program
    {
        static void Main(string[] args)
        {
            const int maxThreadCount = 64;
            ManualResetEvent[] finishItems = new ManualResetEvent[maxThreadCount];
            MyTask[] myTaskItems = new MyTask[maxThreadCount]
                ;
            for (var i = 0; i < maxThreadCount; i++)
            {
                finishItems[i] = new ManualResetEvent(false);

                MyTask myTask = new MyTask(finishItems[i]);
                myTaskItems[i] = myTask;

                ThreadPool.QueueUserWorkItem(myTask.MyTaskThreadPoolCallback, i);
            }

            // 等待指定数组中的所有元素都收到信号。
            WaitHandle.WaitAll(finishItems);

            Console.WriteLine("Complete!");
            Console.ReadKey();
        }


    }
}

尽管这种想法不错,但是存在一些问题:比如ManualResetEvent集合数量不允许超过系统允许的最大数量,我的计算机系统允许的最大数量是64,当我把配置超过64时(const int maxThreadCount = 65;),就会抛出异常。

 

 

  • 实现多线程时,需要注意事项:

可是一般情况下遇到这种业务的情况下,只要修改多线程,必然会遇到某个对象不允许被多个线程操作的问题。

比如:

1、多个线程同时向一个文件中写入内容,这种情况一般使用锁来包成被访问对象的安全性。比如:互斥锁(lock、Mutex)、读写锁(ReadWriteLock)、Monitor、Semaphore(信号灯)、Interlocked(内存共享)等。

2、多个线程同时修改一个非线程安全集合对象(List,Collection,Dictionary,Bag,Queue,Stack,ArrayList,Array,HashTable等)时,往往会抛出异常。针对这种情况,需要使用命名空间System.Collections.Concurrent.*下支持线程安全的集合、字典、队列、栈等对象来替代。

  • 业务场景:

我们需要对一个多行文本文件进行解析,根据具体地址解析其中的经纬度信息。如果解析过程中解析失败的行,需要记录到一个_error.txt;解析成功的记录行,记录到_result.txt。使用单线程分析过程中已经遇到了性能低问题,需求解决方案是使用ThreadPool技术。

  • 业务实现:
  1         private static int maxThreadCount = 0;
  2         private static int fakeMaxThreadCount = int.MaxValue;
  3         private static ManualResetEvent finish = new ManualResetEvent(false);
  4         private static object errorLocker = new object();
  5         private static object resultLocker = new object();
  6         private static object maxThreadCountLcker = new object();
  7 
  8         public void ParserFile(string filePath)
  9         {
 10             using (StreamWriter writerError = new StreamWriter(filePath + "_error"))
 11             {
 12                 using (StreamWriter writerResult = new StreamWriter(filePath + "_result"))
 13                 {
 14                     finish = new ManualResetEvent(false);
 15                     using (StreamReader reader = new StreamReader(filePath))
 16                     {
 17                         string line = reader.ReadLine();
 18                         while (line != null)
 19                         {
 20                             maxThreadCount++;
 21                             ThreadPool.QueueUserWorkItem(DoWork, new object[] { line, writerError, writerResult
 22 });
 23 
 24                             line = reader.ReadLine();
 25                         }
 26                     }
 27 
 28                     maxThreadCount++;
 29                     lock (maxThreadCountLcker)
 30                     {
 31                         fakeMaxThreadCount = maxThreadCount;
 32                     }
 33 
 34                     ThreadPool.QueueUserWorkItem(DoWork, new object[] { });
 35 
 36                     finish.WaitOne();
 37                     finish.Close();
 38                     finish.Dispose();
 39                 }
 40             }
 41         }
 42 
 43 
 44 
 45         private void DoWork(object state)
 46         {
 47             object[] objectItem = state as object[];
 48             if (objectItem.Length != 3)
 49             {
 50                 if (Interlocked.Decrement(ref fakeMaxThreadCount) == 0)
 51                 {
 52                     finish.Set();
 53                 }
 54                 return;
 55             }
 56             string line = objectItem[0].ToString();
 57             StreamWriter writerError = objectItem[1] as StreamWriter;
 58             StreamWriter writerResult = objectItem[2] as StreamWriter;
 59 
 60             try
 61             {
 62                 string[] fields = line.Split(new char[] { '|' });
 63 
 64                 string imsi = fields[0];
 65                 string city = fields[1];
 66                 string county = fields[2];
 67                 string address = fields[3];
 68 
 69                 // http://restapi.amap.com/v3/geocode/geo?key=7de8697669288fc848e12a08f58d995e&s=rsv3&city=**市&address=**省**市**区**路23号
 70                 string uri = " http://restapi.amap.com/v3/geocode/geo";
 71                 string parameter = string.Format("key={0}&s={1}&city={2}&address={3}", "7de8697669288fc848e12a08f58d995e", "rsv3", "**(市名称)", address);
 72 
 73                 // {"status":"1","info":"OK","infocode":"10000","count":"1","geocodes":[{"formatted_address":"***省**市**区***路|23号","province":"***","citycode":"***","city":"***市","district":"***区","township":[],"neighborhood":{"name":[],"type":[]},"building":{"name":[],"type":[]},"adcode":"330105","street":[],"number":[],"location":"120.151367,30.362293","level":"门牌号"}]}
 74                 string result = GetRequesetContext(uri, parameter);
 75                 if (string.IsNullOrEmpty(result) || result.IndexOf("location") == -1)
 76                 {
 77                     lock (errorLocker)
 78                     {
 79                         writerError.WriteLine(result);
 80                     }
 81                 }
 82                 else
 83                 {
 84                     int indexCount = 0;
 85                     List<string> lnglatItems = new List<string>();
 86                     foreach (string resultItem in result.Split(new string[] { "\",\"", ",\"" }, StringSplitOptions.RemoveEmptyEntries))
 87                     {
 88                         if (resultItem.IndexOf("location") != -1)
 89                         {
 90                             indexCount++;
 91                             lnglatItems.Add(resultItem.Split(new char[] { ':' })[1].Replace("\"", string.Empty));
 92                         }
 93                     }
 94                     if (indexCount == 1)
 95                     {
 96                         lock (resultLocker)
 97                         {
 98                             writerResult.WriteLine(address + "|" + lnglatItems[0] + "|" + imsi);
 99                         }
100                     }
101                     else
102                     {
103                         lock (resultLocker)
104                         {                            
105                             writerError.WriteLine(address + "|" + string.Join(",", lnglatItems) + "|" + imsi);
106                         }
107                     }
108                 }
109             }
110             catch (Exception ex)
111             {
112                 logger.Error("{0}\r\n{1}", ex.Message, ex.StackTrace);
113                 lock (errorLocker)
114                 {
115                     writerError.WriteLine(line);
116                 }
117             }
118             finally
119             {
120                 lock (maxThreadCountLcker)
121                 {
122                     if (Interlocked.Decrement(ref fakeMaxThreadCount) == 0)
123                     {
124                         finish.Set();
125                     }
126                 }
127             }
128         }

 备注:

关于ThreadPool线程池内最大线程控制函数:SetMaxThreads 设置可以同时处于活动状态的线程池的请求数目。 所有大于此数目的请求将保持排队状态,直到线程池线程变为可用。

[SecurityPermissionAttribute(SecurityAction.Demand, ControlThread = true)]
public static bool SetMaxThreads(
    int workerThreads,
    int completionPortThreads
)

workerThreads:线程池中辅助线程的最大数目。

completionPortThreads:线程池中异步 I/O 线程的最大数目。

但是,需要注意事项:

不能将辅助线程的数目或 I/O 完成线程的数目设置为小于计算机的处理器数目。

如果承载了公共语言运行时,例如由 Internet 信息服务 (IIS) 或 SQL Server 承载,主机可能会限制或禁止更改线程池大小。

更改线程池中的最大线程数时需谨慎。 虽然这类更改可能对您的代码有益,但对您使用的代码库可能会有不利的影响。

将线程池大小设置得太大可能导致性能问题。 如果同时执行的线程太多,任务切换开销就成为影响性能的一个主要因素。

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

c#:ThreadPool实现并行分析,并实现线程同步结束 的相关文章

  • 删除字符串 C 的第一个字符

    我试图删除字符串的第一个字符并保留其余部分 我当前的代码无法编译 我对如何修复它感到困惑 My code char newStr char charBuffer int len strlen charBuffer int i 1 char
  • VSTS 构建失败/发布无法在 bin 文件夹中找到 roslyn\csc.exe

    我们有一个网站项目 安装了以下 nuget 软件包 Microsoft CodeDom Providers DotNetCompilerPlatform 1 0 8 Microsoft Net Compilers 2 4 0 The web
  • 如何在 MFC 中调整对话框大小时移动控件?

    我已经在 MFC 中创建了对话框视图 从下图中可以清楚地看到 如滑块控件和编辑框等 当我调整对话框大小时 这些控件不会移动 在此输入图像描述 https i stack imgur com 7OxAK jpg 我想移动控件以适应对话框 但不
  • 如何“杀死”Pthread?

    我正在学习 Pthreads 并且想知道杀死这样一个对象的最佳方法是什么 在寻找类似的问题后 我无法找到 明确 的答案 但请随时向我指出任何相关问题 我正在使用一个小型客户端服务器应用程序 其中服务器主线程正在侦听套接字上的客户端连接 每次
  • C++ 非类型参数包扩展

    我正在编写由单一类型参数化的模板函数 并且具有可变数量的相同类型 而不是不同类型 的参数 它应该检查第一个值是否在其余值中 我想这样写 include
  • 返回指向 std::vector 中的对象的 a

    我有一个关于返回对向量元素的引用的非常基本的问题 有一个向量vec存储类的实例Foo 我想访问这个向量中的一个元素 不想使用向量索引 我应该如何编码该方法getFoo here include
  • 'goto *foo' 其中 foo 不是指针。这是什么?

    我正在玩标签作为值 https gcc gnu org onlinedocs gcc Labels as Values html并最终得到这段代码 int foo 0 goto foo 我的 C C 经验告诉我 foo means dere
  • 如何在 C++ 中对静态缓冲区执行字符串格式化?

    我正在处理一段对性能要求非常高的代码 我需要执行一些格式化的字符串操作 但我试图避免内存分配 甚至是内部库的内存分配 在过去 我会做类似以下的事情 假设是 C 11 constexpr int BUFFER SIZE 200 char bu
  • 正则表达式删除某些字符周围不需要的空格

    我正在尝试从 JavaScript 文件中删除一些不需要的空格 并在将文件发送到客户端之前使用 C 和 Regex 组合文件 我有一个JavascriptHandler处理 js 文件 效果很好 这是我用来 打包 JavaScript 的函
  • 如何防止字符串被截留

    我的理解 可能是错误的 是 在 C 中 当你创建一个字符串时 它会被实习到 实习生池 中 这保留了对字符串的引用 以便多个相同的字符串可以共享操作内存 但是 我正在处理很多很可能是唯一的字符串 一旦完成每个字符串 我需要将它们从操作内存中完
  • 替换 JSON 中的转义字符

    我想用空格替换 JSON 字符串中的 字符 我怎样才能做到这一点 我发现从 JSON 字符串中删除所有转义字符的最简单 最好的方法是将字符串传递到正则表达式 Unescape 方法 此方法返回一个没有转义字符的新字符串 甚至删除了 n t
  • 如何用C++解析复杂的字符串?

    我试图弄清楚如何使用 解析这个字符串sstream 和C 其格式为 string int int 我需要能够将包含 IP 地址的字符串的第一部分分配给 std string 以下是该字符串的示例 std string 127 0 0 1 1
  • C 的“char”使用什么字符集? [复制]

    这个问题在这里已经有答案了 简单的问题 我最近开始用 C 编程 有一个简单的问题 C 编程语言在其 char 类型中使用什么字符集 例如 ASCII 还是取决于软件 操作系统 char 本质上是 1 个字节 主要在所有操作系统上 所以默认情
  • System.diagnostics.process 进程在托管后无法在 IIS 上运行?

    我正在尝试从网络应用程序安装 exe 当我在本地运行应用程序 从 asp 开发服务器 时 它安装正确 但当我托管在 IIS 上时 它不起作用 我在asp net页面的Page load方法上编写了这段代码 想要在客户端计算机上安装Test
  • C 中的 N 依赖注入 - 比链接器定义的数组更好的方法?

    Given a 库模块 在下文中称为Runner 它作为可重复使用的组件 无需重新编译 即静态链接库 中应用程序分区架构的 而不是主分区 请注意 它仅包含main 出于演示目的 Given a set 顺序无关 调用的其他模块 对象Call
  • 需要使用 openssl 加密和解密文件的示例 C 代码

    我正在用 Linux C 编写代码 我需要使用以下命令来加密和解密文件 openssl 目前 我使用系统命令 des3 e nosalt k 0123456789012345 in inp file out out file 进行加密 使用
  • win32 API 和 .NET 框架之间的选择

    我必须开发一个适用于 Windows 的应用程序 该应用程序将能够通过网络摄像头识别手势来控制鼠标 我将使用 vc 2008 进行开发 但我很困惑是使用 NET 框架还是核心 win32 API 性能对于我的应用程序非常重要 根据 Ivor
  • 在类中使用 std::chrono::high_resolution_clock 播种 std::mt19937 的正确方法是什么?

    首先 大家好 这是我在这里提出的第一个问题 所以我希望我没有搞砸 在写这篇文章之前我用谷歌搜索了很多 我对编码 C 很陌生 我正在自学 考虑到有人告诉我 只为任何随机引擎播种一次是一个很好的做法 我在这里可能是错的 什么是正确 最佳 更有效
  • 致命错误 C1001:编译器中发生内部错误(编译器文件“msc1.cpp”,第 1325 行)

    当我编译代码时 错误指向以下类 该错误在两行上突出显示 如下所示 tm validFrom tm validUntil struct t SslCertData final struct t Contact TCHAR Organizati
  • 具有多种类型的 C# 泛型类型推断

    我有以下通用方法 用于将一种类型的输入对象序列化为超类型 如下所示 public string SerialiseAs

随机推荐

  • Pycharm官网下载安装

    下载链接 pycharm官网 https www jetbrains com pycharm 然后来到这个界面 点击Download 下载按钮 然后点击开源版本 Community 下载安装就好了 接下来就创建项目 点击Create 这样就
  • FISCO BCOS 2.0新特性解读

    FISCO BCOS是完全开源的联盟区块链底层技术平台 由金融区块链合作联盟 深圳 简称金链盟 成立开源工作组通力打造 开源工作组成员包括博彦科技 华为 深证通 神州数码 四方精创 腾讯 微众银行 亦笔科技和越秀金科等金链盟成员机构 代码仓
  • Nacos、ZooKeeper和Dubbo的区别

    Nacos ZooKeeper和Dubbo是三个不同的分布式系统组件 它们之间有以下几点区别 功能定位 Nacos主要提供服务发现 配置管理和服务治理等功能 而ZooKeeper主要是分布式协调服务 提供了分布式锁 分布式队列等原语 Dub
  • 本地部署LLaMA-中文LoRA部署详细说明

    在Ubuntu18 04 部署中文LLaMA模型 环境准备 硬件环境 AMD 5950X 128GB RAM RTX 3090 24G VRAM 操作系统 Ubuntu 18 04 编译环境 可选 llama cpp 编译 cd llama
  • GoJS学习

    简介 GoJS是一个可视化JavaScript库 用于浏览器中创建交互图形 比如流程图 树图 关系图 力导图等等 GoJS不依赖于任何JS库或框架 例如bootstrap jquery等 可与任何HTML或JS框架配合工作 甚至可以不用框架
  • Cuda 代码中的 函数前缀 device global host 使用

    众所周知 CUDA并行可以使代码加速很多倍 其文件类型为 cu 结尾 在编写cu 文件时 常用的函数前缀关键字有 device global host host C或者C 中相同 是由CPU调用 由CPU执行的函数 global 表示一个内
  • 为啥国内互联网公司都用centos而不是ubuntu?

    一直以来都很好奇ubuntu和centos有啥区别 上学时接触的都是ubuntu 自己每次装virtual box的时候都会下个ubuntu 但是公司的服务器上装的都是centos 今天查了下知乎网友的精彩回答 呵呵 简单总结下主要有几个原
  • 解释执行与编译执行语言有什么区别?

    一 主体不同 1 编译执行 由编译程序将目标代码一次性编译成目标程序 再由机器运行目标程序 2 解释执行 将源语言直接作为源程序输入 解释执行 解释一句后就提交计算机执行一句 并不形成目标程序 二 优势不同 1 编译执行 相比解释执行编译执
  • 常用的偏微分方程

    偏微分方程通常包含两个以上的自变量 若自变量同时间相关 或者无关 称其为发展型 或者稳态 的 下面 我们罗列出一些典型的偏微分方程 如 热传导方程 一阶双曲守恒律方程 二阶波动方程 椭圆型偏微分方程等 抛物型偏微分方程通常刻画 个物理系统的
  • 前端学科面试题大全

    作用域和值类型引用类型的传递 变量作用域 作用域变量访问区域 变量值存在栈中 变量赋值相当于值赋值 值传递与引用传递有哪些区别 函数内部 变量会先声明 形式参数变量声明提升 整个函数体有var声明的变量 如果没有访问全局定义的num2 函数
  • 服务器环境初始化配置

    工程实践经验积累 服务器环境初始化配置 1 新建环境 新建环境 是为了使自己的程序在一个相对独立的环境中运行 不影响服务器上其他用户 并不受其他用户影响 新建环境的语句为 conda create n your env name pytho
  • 【Unity Shader】屏幕后处理1.0:调整亮度/饱和度/对比度

    1 Unity中实现屏幕特效的基本步骤 什么叫屏幕后处理 Screen post processing effects 渲染完整个场景得到屏幕图像后对图像进行一系列操作 实现各种屏幕特效 这一步我们可以添加很多例如景深 Depth of F
  • session销毁

    session invalidate session invalidate的销毁是把这个session所带的用户彻底的销毁 这个session跟用户已经紧密联合在一起 所以就一起销毁了 这样就算换了个session 也是登陆不了的 以前我的
  • 设计模式之享元模式

    享元模式 就是共享技术 对于系统中存在大量相同的对象 把他们抽取成一个对象放在缓存中进行使用 这样可以大大节省系统资源 例如 围棋棋盘上有两种棋子 一个是黑子 一个是白子 如果在下棋的时候每下一个棋子就要new一个棋子对象 那么就会有大量的
  • C#比较两个list集合,两集合同时存在或A集合存在B集合中无

    using System using System Collections Generic using System Linq using System Text using System Threading using System Th
  • iOS灵动岛【电商秒杀】开发实践

    一 基本概述 名词基础知识 苹果在 iPhone 14 Pro 系列中增加一个灵动岛 主要目的是隐藏挖孔造型的高端 感叹号屏 通过动画的视觉差异 用户找不到原来的挖孔屏 灵动岛是一种巧妙的设计 模糊了软件和硬件之间的界限 它可以在锁屏的情况
  • Python: 转换文本编码

    最近在做周报的时候 需要把csv文本中的数据提取出来制作表格后生产图表 在获取csv文本内容的时候 基本上都是用with open filename encoding UTF 8 as f 来打开csv文本 但是实际使用过程中发现有些csv
  • python网络爬虫实战——实时抓取西刺免费代理ip

    参考网上高手示例程序 利用了多线程技术 Python版本为2 7 coding utf8 import urllib2 import re import threading import time rawProxyList checkedP
  • Git切换分支报错:error: you need to resolve your current index first 以及needs merge

    当想从子分支切换到dev分支时git checkout dev 报错 error you need to resolve your current index first xxx java needs merge xxx xml needs
  • c#:ThreadPool实现并行分析,并实现线程同步结束

    背景 一般情况下 经常会遇到一个单线程程序时执行对CPU MEMORY IO利用率上不来 且速度慢下问题 那么 怎么解决这些问题呢 据我个人经验来说有以下两种方式 1 并行 多线程 Parallel Task ThreadPool 2 多进