一般情况下,经常会遇到一个单线程程序时执行对CPU,MEMORY,IO利用率上不来,且速度慢下问题;那么,怎么解决这些问题呢?
据我个人经验来说有以下两种方式:
1、并行、多线程(Parallel、Task、ThreadPool)
2、多进程MultipleProcess
恰好工作中又一次遇到单线程程序性能低的问题,本次我主要想尝试使用ThreadPool来实现多线程,并且在实现多线程任务同步结束。
一个ManualResetEvent结合Interlocked来实现线程同步结束。
1 static void Main(string[] args)
2 {
3 using (ManualResetEvent finish = new ManualResetEvent(false))
4 {
5 int maxThreadCount = 100;
6 for (var i = 0; i < 100; i++) {
7 ThreadPool.QueueUserWorkItem((Object state)=> {
8 Console.WriteLine("task:{0}",state);
9
10 // 以原子操作的形式递减指定变量的值并存储结果。
11 if (Interlocked.Decrement(ref maxThreadCount) == 0) {
12 // 将事件状态设置为有信号,从而允许一个或多个等待线程继续执行。
13 finish.Set();
14 }
15 }, i);
16 }
17
18 // 阻止当前线程,直到当前 System.Threading.WaitHandle 收到信号。
19 finish.WaitOne();
20 }
21
22 Console.WriteLine("Complete!");
23 Console.ReadKey();
上边的代码是可行性,当系统线程数超过系统允许最大数时,线程会被在线程池中排队等待。
ManualResetEvent集合(每一个线程由集合中的唯一一个ManualResetEvent对象来实现线程的同步跟踪)结合WaitHandle.WaitAll(ManualResetEvent集合)来实现线程同步结束。
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
namespace ThreadPoolTest
{
class MyTask
{
private ManualResetEvent finish = null;
public MyTask(ManualResetEvent finish)
{
this.finish = finish;
}
public void MyTaskThreadPoolCallback(Object state)
{
Console.WriteLine("task:{0}", state);
// 将事件状态设置为有信号,从而允许一个或多个等待线程继续执行。
this.finish.Set();
}
}
class Program
{
static void Main(string[] args)
{
const int maxThreadCount = 64;
ManualResetEvent[] finishItems = new ManualResetEvent[maxThreadCount];
MyTask[] myTaskItems = new MyTask[maxThreadCount]
;
for (var i = 0; i < maxThreadCount; i++)
{
finishItems[i] = new ManualResetEvent(false);
MyTask myTask = new MyTask(finishItems[i]);
myTaskItems[i] = myTask;
ThreadPool.QueueUserWorkItem(myTask.MyTaskThreadPoolCallback, i);
}
// 等待指定数组中的所有元素都收到信号。
WaitHandle.WaitAll(finishItems);
Console.WriteLine("Complete!");
Console.ReadKey();
}
}
}
尽管这种想法不错,但是存在一些问题:比如ManualResetEvent集合数量不允许超过系统允许的最大数量,我的计算机系统允许的最大数量是64,当我把配置超过64时(const int maxThreadCount = 65;),就会抛出异常。
可是一般情况下遇到这种业务的情况下,只要修改多线程,必然会遇到某个对象不允许被多个线程操作的问题。
比如:
1、多个线程同时向一个文件中写入内容,这种情况一般使用锁来包成被访问对象的安全性。比如:互斥锁(lock、Mutex)、读写锁(ReadWriteLock)、Monitor、Semaphore(信号灯)、Interlocked(内存共享)等。
2、多个线程同时修改一个非线程安全集合对象(List,Collection,Dictionary,Bag,Queue,Stack,ArrayList,Array,HashTable等)时,往往会抛出异常。针对这种情况,需要使用命名空间System.Collections.Concurrent.*下支持线程安全的集合、字典、队列、栈等对象来替代。
我们需要对一个多行文本文件进行解析,根据具体地址解析其中的经纬度信息。如果解析过程中解析失败的行,需要记录到一个_error.txt;解析成功的记录行,记录到_result.txt。使用单线程分析过程中已经遇到了性能低问题,需求解决方案是使用ThreadPool技术。
1 private static int maxThreadCount = 0;
2 private static int fakeMaxThreadCount = int.MaxValue;
3 private static ManualResetEvent finish = new ManualResetEvent(false);
4 private static object errorLocker = new object();
5 private static object resultLocker = new object();
6 private static object maxThreadCountLcker = new object();
7
8 public void ParserFile(string filePath)
9 {
10 using (StreamWriter writerError = new StreamWriter(filePath + "_error"))
11 {
12 using (StreamWriter writerResult = new StreamWriter(filePath + "_result"))
13 {
14 finish = new ManualResetEvent(false);
15 using (StreamReader reader = new StreamReader(filePath))
16 {
17 string line = reader.ReadLine();
18 while (line != null)
19 {
20 maxThreadCount++;
21 ThreadPool.QueueUserWorkItem(DoWork, new object[] { line, writerError, writerResult
22 });
23
24 line = reader.ReadLine();
25 }
26 }
27
28 maxThreadCount++;
29 lock (maxThreadCountLcker)
30 {
31 fakeMaxThreadCount = maxThreadCount;
32 }
33
34 ThreadPool.QueueUserWorkItem(DoWork, new object[] { });
35
36 finish.WaitOne();
37 finish.Close();
38 finish.Dispose();
39 }
40 }
41 }
42
43
44
45 private void DoWork(object state)
46 {
47 object[] objectItem = state as object[];
48 if (objectItem.Length != 3)
49 {
50 if (Interlocked.Decrement(ref fakeMaxThreadCount) == 0)
51 {
52 finish.Set();
53 }
54 return;
55 }
56 string line = objectItem[0].ToString();
57 StreamWriter writerError = objectItem[1] as StreamWriter;
58 StreamWriter writerResult = objectItem[2] as StreamWriter;
59
60 try
61 {
62 string[] fields = line.Split(new char[] { '|' });
63
64 string imsi = fields[0];
65 string city = fields[1];
66 string county = fields[2];
67 string address = fields[3];
68
69 // http://restapi.amap.com/v3/geocode/geo?key=7de8697669288fc848e12a08f58d995e&s=rsv3&city=**市&address=**省**市**区**路23号
70 string uri = " http://restapi.amap.com/v3/geocode/geo";
71 string parameter = string.Format("key={0}&s={1}&city={2}&address={3}", "7de8697669288fc848e12a08f58d995e", "rsv3", "**(市名称)", address);
72
73 // {"status":"1","info":"OK","infocode":"10000","count":"1","geocodes":[{"formatted_address":"***省**市**区***路|23号","province":"***","citycode":"***","city":"***市","district":"***区","township":[],"neighborhood":{"name":[],"type":[]},"building":{"name":[],"type":[]},"adcode":"330105","street":[],"number":[],"location":"120.151367,30.362293","level":"门牌号"}]}
74 string result = GetRequesetContext(uri, parameter);
75 if (string.IsNullOrEmpty(result) || result.IndexOf("location") == -1)
76 {
77 lock (errorLocker)
78 {
79 writerError.WriteLine(result);
80 }
81 }
82 else
83 {
84 int indexCount = 0;
85 List<string> lnglatItems = new List<string>();
86 foreach (string resultItem in result.Split(new string[] { "\",\"", ",\"" }, StringSplitOptions.RemoveEmptyEntries))
87 {
88 if (resultItem.IndexOf("location") != -1)
89 {
90 indexCount++;
91 lnglatItems.Add(resultItem.Split(new char[] { ':' })[1].Replace("\"", string.Empty));
92 }
93 }
94 if (indexCount == 1)
95 {
96 lock (resultLocker)
97 {
98 writerResult.WriteLine(address + "|" + lnglatItems[0] + "|" + imsi);
99 }
100 }
101 else
102 {
103 lock (resultLocker)
104 {
105 writerError.WriteLine(address + "|" + string.Join(",", lnglatItems) + "|" + imsi);
106 }
107 }
108 }
109 }
110 catch (Exception ex)
111 {
112 logger.Error("{0}\r\n{1}", ex.Message, ex.StackTrace);
113 lock (errorLocker)
114 {
115 writerError.WriteLine(line);
116 }
117 }
118 finally
119 {
120 lock (maxThreadCountLcker)
121 {
122 if (Interlocked.Decrement(ref fakeMaxThreadCount) == 0)
123 {
124 finish.Set();
125 }
126 }
127 }
128 }
备注:
关于ThreadPool线程池内最大线程控制函数:SetMaxThreads 设置可以同时处于活动状态的线程池的请求数目。 所有大于此数目的请求将保持排队状态,直到线程池线程变为可用。
[SecurityPermissionAttribute(SecurityAction.Demand, ControlThread = true)]
public static bool SetMaxThreads(
int workerThreads,
int completionPortThreads
)
workerThreads:线程池中辅助线程的最大数目。
completionPortThreads:线程池中异步 I/O 线程的最大数目。
但是,需要注意事项:
不能将辅助线程的数目或 I/O 完成线程的数目设置为小于计算机的处理器数目。
如果承载了公共语言运行时,例如由 Internet 信息服务 (IIS) 或 SQL Server 承载,主机可能会限制或禁止更改线程池大小。
更改线程池中的最大线程数时需谨慎。 虽然这类更改可能对您的代码有益,但对您使用的代码库可能会有不利的影响。
将线程池大小设置得太大可能导致性能问题。 如果同时执行的线程太多,任务切换开销就成为影响性能的一个主要因素。