IO密集型和CPU密集型程序-概念与实现

2023-11-16


欢迎关注笔者的微信公众号


概念

在计算机科学中,有两种不同类型的程序:IO 密集型和 CPU 密集型。这两种程序的主要差别在于它们在执行任务时瓶颈所在的地方。

  • IO 密集型:这类程序主要通过读写磁盘文件、网络通信等外部设备来完成任务,因此它们大多数时间都在等待外部设备的响应。这些程序在处理等待时间方面效率较低,但对于存储和传输数据方面效率较高。
  • CPU 密集型:这类程序主要通过使用CPU完成任务,因此它们大多数时间都在计算。这些程序在处理计算任务方面效率较高,但对于读写磁盘文件、网络通信等外部设备的响应方面效率较低。

需要注意的是,大多数程序都是一种折衷的方式,即同时存在 IO 密集型和 CPU 密集型的特征。因此,在设计程序时需要适当考虑两者的平衡,以确保程序的效率和可用性。

如何设计 IO 密集型程序?

在设计 IO 密集型程序时,需要考虑如何提高程序的并发性和线程安全性,以最大化利用外部设备的性能。下面是一些建议:

  • 使用线程池:可以使用线程池技术来提高程序的并发性。线程池可以重复利用线程,从而减少线程创建和销毁的开销。
  • 采用异步 I/O:异步 I/O 技术可以有效提高程序的线程安全性。当程序读写外部设备时,异步 I/O 会立即返回,而不是等待操作完成,从而使得线程可以继续执行其他任务。
  • 使用缓存:缓存可以大大提高程序的性能,尤其是在处理频繁读写的数据时。通过将数据读入内存缓存,可以减少对磁盘的访问,从而提高程序的速度。
  • 使用预读:预读可以提前读取数据,从而减少后续读取操作的时间。当程序读取大量数据时,可以考虑使用预读技术。

下面是一个使用 Java 实现的简单 IO 密集型程序的例子:

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class IoIntensiveProgram {
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        File file = new File("data.txt");
        List<String> lines = Files.readAllLines(Paths.get(file.getAbsolutePath()), StandardCharsets.UTF_8);
        for (String line : lines) {
            executorService.submit(new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    processLine(line);
                    return null;
                }
            });
        }
        executorService.shutdown();
    }
    private static void processLine(String line) throws IOException {
        // ...
    }
}

如果把上面的例子改成 CPU 密集型程序,只需要把 processLine 函数内部改成大量的计算操作,就可以得到一个 CPU 密集型程序。

下面是一个使用 Java 实现的简单 CPU 密集型程序的例子:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CpuIntensiveProgram {
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10000; i++) {
            executorService.submit(new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    processData();
                    return null;
                }
            });
        }
        executorService.shutdown();
    }

    private static void processData() {
        double result = 0;
        for (int i = 0; i < 1000000; i++) {
            result += Math.sin(i);
        }
    }
}

当然,上面只是一个简单的例子,实际上你可以根据你的需求来实现 CPU 密集型程序。

总的来说,如果你的程序更多地依赖于 I/O 操作,那么应该使用 IO 密集型程序;如果你的程序更多地依赖于 CPU 计算,那么应该使用 CPU 密集型程序。希望以上内容能帮助你更好地设计你的程序。

下面分享一些笔者最近工作中实际遇到的问题与经验。

起因

笔者最近的一项工作是对大批数据进行分析并将分析后的数据生成离线图表文件。主要存在以下问题和难点:

  • 数据包含近四百辆车过去近三年的历史数据,总的数据量很大,总计有30多G的Excel
  • 由于单个Excel数据量有限,所以每辆车的数据拆分成多个Excel,分析前需要先将数据进行整合
  • 具体分析要求相关技术人员已经给出,但是仍涉及一些计算,例如求电压差,计算电池SOC等指标
  • 每辆车需要进行多维度分析,因此最终需要生成多个离线图表文件
  • 数据可能存在一些缺失值和异常值,需要进行判断和筛选过滤
  • 数据的体量和时间跨度都很大,分析人员要求可以按时间范围筛选和查看数据
  • 时间要求比较紧急,因为需要分析事故原因和追查隐患,必须尽快完成

首先在技术选型上笔者主要是python + plotly + numpy + pandasplotly可以生成动态的离线图表文件,pandaspython生态最常用的数据分析工具。

最开始采取的是最朴素的方案,直接遍历每辆车的数据,挨个生成文件。但是发现每辆车的处理时间都超过10分钟有的甚至需要半小时,时间上不可接受。最终的方案就是采用多线程同时处理多个文件。但如何确定最佳的线程数其实也踩了很多坑。

线程数如何设计?

如何针对不同类型的程序确定线程数网上有很多介绍,概括来说就是:

  • 对于CPU密集型程序,线程数公式:

t h r e a d _ n u m = c p u _ n u m + 1 thread\_num = cpu\_num+1 thread_num=cpu_num+1,+1是为了预防有个线程被阻塞,cpu可以调用其他线程。

  • 对于IO密集型程序,线程数的确定有两种说法:

    最佳线程数 = (1/CPU利用率) = 1 + (I/O耗时/CPU耗时);(一般为2*CPU核心数)

    最佳线程数 = CPU核心数/(1-阻塞系数),阻塞系数一般为0.8~0.9之间,也可以取0.8或者0.9。

从实际应用来看,线程数的多少没有确定公式可遵循,具体得看设备的硬件情况和程序要求。下面两张图分别是线程数=2*CPU数和线程数 = CPU数/(1-阻塞系数)的系统监控图。当线程数较小时,没有充分发挥系统能力,当然这不会有什么问题,只是可能耗费较多时间。但是当线程数过多时,系统内存占用急速上升,资源不够导致程序奔溃。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4jeTmM3F-1675827491534)(https://itbird.oss-cn-beijing.aliyuncs.com/img/2023/02/07/IO%E5%AF%86%E9%9B%86%E5%9E%8B3.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fbxU5oc3-1675827491535)(https://itbird.oss-cn-beijing.aliyuncs.com/img/2023/02/07/IO%E5%AF%86%E9%9B%86%E5%9E%8B2-16757806087952.png)]

因此,线程数的确定并不是一个固定的数值,而是需要根据系统的实际情况进行调整的。因此,最好的方法是通过测试和实验来确定线程数

  • 对于 CPU 密集型程序,线程数的确定主要考虑 CPU 核心数和 CPU 使用率。一般情况下,线程数可以等于 CPU 核心数,这样可以充分利用 CPU 的性能。当然,线程数也可以大于 CPU 核心数,但这样会增加 CPU 上下文切换的代价,从而影响程序的性能。因此,通常需要在实际测试的基础上确定线程数。
  • 对于 IO 密集型程序,线程数的确定主要考虑两个因素:I/O 设备的数量和最大并发数。一般情况下,线程数可以等于 I/O 设备的数量或者是 I/O 设备最大并发数,这取决于应用程序的需求和系统的能力。

主要代码

整合Excel

import multiprocessing
import threading
import os
import glob
import numpy as np
import pandas as pd

cpu_num = multiprocessing.cpu_count()
block_coef = 0.9
thread_num = int(np.ceil(cpu_num/(1-block_coef)))


def merge(files: list = [], is_old=True):
    if files is None or files == []:
        return
    file_name = os.path.basename(os.path.dirname(files[0]))
    if is_old:
        file_name = file_name + '-1.csv'
    else:
        file_name = file_name + '-2.csv'
    df = pd.DataFrame()
    for f in files:
        print(f)
        tmp = pd.read_excel(f)
        df = pd.concat([df, tmp])
    df.to_csv('./output/'+file_name, index=None, encoding='utf-8')


def resolve(batch: list = []):
    """
    每个线程负责处理一批数据
    """
    for b in batch:
        merge(b)


if __name__ == '__main__':
    dirs = glob.glob('./raw_data/*')
    file_list = []
    for dir in dirs:
        file_list.append([os.path.join(dir, i) for i in os.listdir(dir)])
    file_list = sorted(file_list)
    batch_size = int(len(dirs) / thread_num)
    split = int(np.ceil(len(dirs) / batch_size))
    for i in range(split):
        threading.Thread(target=resolve,
                         args=(file_list[i * batch_size:(i + 1) * batch_size],)).start()

压差计算

df = util.get_dataset(car_id)
# 解析时间格式字段
df["上报时间"] = pd.to_datetime(df["上报时间"])
df['压差'] = df['电池单体电压最高值'] - df['电池单体电压最低值']

区间频率统计

统计不同区间内的数据量

import pandas as pd
import numpy as np
pd.set_option('display.max_rows', None)

df = pd.read_csv(r"xxx.csv")
df["上报时间"] = pd.to_datetime(df["上报时间"])
df.set_index(df["上报时间"], inplace=True)
df['压差'] = (df['电池单体电压最高值'] - df['电池单体电压最低值'])*1000

bins = [-1e-10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 120, 140, 160, 180, 200, np.inf]
result = pd.cut(df['压差'], bins=bins,right=True)
# 全量数据的区间统计
result.value_counts()
# 按月区间统计
d = pd.cut(data['压差'], bins=bins,right=True).groupby([data.index.year, data.index.month]).value_counts()
d.index.names = ['year', 'month', 'group']
d = pd.DataFrame(d)
# 查询2022年6月的数据情况
d.loc[2022].loc[6]['压差']

其他

绘制图表,控制显示或者保存

import os
import plotly
import plotly.graph_objects as go

def plot_fig(data, x="上报时间", y=None, y2=None, title='', mode='lines', to_html=False, carid=''):
    fig = go.Figure()
    # fig = px.line(data, x=x, y=y)
    if type(y) == str:
        fig.add_trace(go.Scatter(x=data[x], y=data[y], name=y, mode=mode))
        if y == '压差':
            fig.update_layout(yaxis=dict(range=[data[y].min(), 0.5]))
    else:
        for col in y:
            fig.add_trace(go.Scatter(x=data[x], y=data[col], name=col, mode=mode))
    if y2 is not None:
        # 设置双y轴
        fig.add_trace(go.Scatter(x=data[x], y=data[y2], xaxis="x", yaxis="y2", name=y2))
        fig.update_layout(yaxis2=dict(anchor='x', overlaying='y', side='right'))

    fig.update_xaxes(
        rangeslider_visible=True,
        rangeselector=dict(
            buttons=list(
                [
                    dict(count=1, label="1月", step="month", stepmode="backward"),
                    dict(count=6, label="半年", step="month", stepmode="backward"),
                    dict(count=1, label="今年", step="year", stepmode="todate"),
                    dict(count=1, label="1年", step="year", stepmode="backward"),
                    dict(step="all"),
                ]
            )
        ),
        type="date",
        tickformat='%Y-%m-%d %H:%M:%S'
    )
    fig.update_layout(width=1500, height=600, title=carid + "-" + title)

    if to_html:
        parent_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'output', carid)
        print(parent_dir)
        if not os.path.exists(parent_dir):
            os.mkdir(parent_dir)
        filename = parent_dir + '/' + title + '.html'
        # plotly.io.write_html(fig, filename)
        # 关闭自动用浏览器打开,真坑爹,浏览器莫名其妙打开
        plotly.offline.plot(fig, filename=filename, auto_open=False)
    else:
        fig.show()

结语

好像快一年没写文章了。。。最近整理下过去一年工作和科研方面的成果和经验与大家分享。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

IO密集型和CPU密集型程序-概念与实现 的相关文章

  • 使用 django-rest-framework 设置对象级权限

    尝试使用 django rest framework 最干净 最规范地管理 django guardian 对象级权限 我想将对象的读取权限 module view object 分配给在执行 POST 时发出请求的用户 我的基于阶级的观点
  • 多处理中的动态池大小?

    有没有办法动态调整multiprocessing Pool尺寸 我正在编写一个简单的服务器进程 它会产生工作人员来处理新任务 使用multiprocessing Process对于这种情况可能更适合 因为工作人员的数量不应该是固定的 但我需
  • Mypy 无法从文字列表推断项目的类型

    我有一个变量x和一个文字列表 例如 0 1 2 我想转换x这些文字之一 如果x在列表中 我将其退回 否则我返回一个后备值 from typing import Literal Set Foo Literal 0 1 2 foos Set F
  • 是否可以从 Julia 调用 Python 函数并返回其结果?

    我正在使用 Python 从网络上抓取数据 我想使用这些数据在 Julia 中运行计算 是否可以在 Julia 中调用该函数并返回其结果 或者我最好直接导出到 CSV 并以这种方式加载数据 绝对地 看PyCall jl https gith
  • 无法在 selenium 和 requests 之间传递 cookie,以便使用后者进行抓取

    我用 python 结合 selenium 编写了一个脚本来登录网站 然后从driver to requests这样我就可以继续使用requests进行进一步的活动 I used item soup select one div class
  • Series.sort() 和 Series.order() 有什么区别?

    s pd Series nr randint 0 10 5 index nr randint 0 10 5 s Output 1 3 7 6 2 0 9 7 1 6 order 按值排序并返回一个新系列 s order Output 2 0
  • Arcpy 模数在 Pycharm 中不显示

    如何将 Arcpy 集成到 Pycharm 中 我尝试通过导入模块但它没有显示 我确实知道该模块仅适用于 2 x python arcpy 在 PyPi Python 包索引 上不可用 因此无法通过 pip 安装 要使用 arcpy 您需要
  • 根据其他单元格值更改多个单元格值

    我想更改包含的单元格moving to movingToOpenor movingToClose基于下一个单元格中给出的状态 有时循环会被中断并且不会从open to close or close to open 这是我当前的数据框 Dat
  • 查找 Pandas DF 行中的最短日期并创建新列

    我有一个包含多个日期的表 有些日期将为 NaN 我需要找到最旧的日期 所以一行可能有 DATE MODIFIED WITHDRAWN DATE SOLD DATE STATUS DATE 等 因此 对于每一行 一个或多个字段中都会有一个日期
  • 给定一个排序数组,就地删除重复项,使每个元素仅出现一次并返回新长度

    完整的问题 我开始在线学习 python 但对这个标记为简单的问题有疑问 给定一个排序数组 就地删除重复项 使得每个 元素只出现一次并返回新的长度 不分配 另一个数组的额外空间 您必须通过修改输入来完成此操作 数组就地 具有 O 1 额外内
  • Airflow 1.9 - 无法将日志写入 s3

    我在 aws 的 kubernetes 中运行气流 1 9 我希望将日志发送到 s3 因为气流容器本身的寿命并不长 我已经阅读了描述该过程的各种线程和文档 但我仍然无法让它工作 首先是一个测试 向我证明 s3 配置和权限是有效的 这是在我们
  • 检测是否从psycopg2游标获取?

    假设我执行以下命令 insert into hello username values me 我跑起来就像 cursor fetchall 我收到以下错误 psycopg2 ProgrammingError no results to fe
  • 使用 PIL 在 Tkinter 中显示动画 GIF

    我正在尝试制作一个程序来使用 Tkinter 显示动画 GIF 这是我最初使用的代码 from future import division Just because division doesn t work right in 2 7 4
  • 如何在亚马逊 EC2 上调试 python 网站?

    我是网络开发新手 这可能是一个愚蠢的问题 但我找不到可以帮助我的确切答案或教程 我工作的公司的网站 用 python django 构建 托管在亚马逊 EC2 上 我想知道从哪里开始调试这个生产站点并检查存储在那里的日志和数据库 我有帐户信
  • 如何将带有参数的Python装饰器实现为类?

    我正在尝试实现一个接受一些参数的装饰器 通常带有参数的装饰器被实现为双重嵌套闭包 如下所示 def mydecorator param1 param2 do something with params def wrapper fn def
  • minizinc python 安装

    我通过 anaconda 提示符在 python 上安装了 minizinc 就像其他软件包一样 pip install minizinc 该软件包表示已成功安装 我可以导入该模块 但是 我正在遵循基本示例https minizinc py
  • Django 管理器链接

    我想知道是否有可能 如果可以的话 如何 将多个管理器链接在一起以生成受两个单独管理器影响的查询集 我将解释我正在研究的具体示例 我有多个抽象模型类 用于为其他模型提供小型的特定功能 其中两个模型是DeleteMixin 和GlobalMix
  • 带 Flask 的 RPI dht22:无法将第 4 行设置为输入 - 等待 PulseIn 消息超时

    我正在尝试制作一个 Raspberry Pi 3 REST API 使用 DHT22 提供温度和湿度 整个代码 from flask import Flask jsonify request from sds011 import SDS01
  • 如何从namedtuple实例列表创建pandas DataFrame(带有索引或多索引)?

    简单的例子 from collections import namedtuple import pandas Price namedtuple Price ticker date price a Price GE 2010 01 01 30
  • pandas 中数据帧中的随机/洗牌行

    我目前正在尝试找到一种方法来按行随机化数据框中的项目 我在 pandas 中按列洗牌 排列找到了这个线程 在 pandas 中对 DataFrame 进行改组 排列 https stackoverflow com questions 157

随机推荐

  • 网络证书有含金量吗?(转)

    网络证书有含金量吗 转 more 老板 含金量 市场需求 个人能力 说到证书的含金量 微软某培训经理曾经在网上有一个非常精辟的论述 如果两个人都是清华大学计算机系本科毕业的 两个人毕业时工资会一样么 如果发展三年以后 我想工资会相差很大 但
  • Java 终止线程的几种方式

    一 正常运行结束 所谓正常运行结束 就是程序正常运行结束 线程自动结束 public class ThreadTest extends Thread public void run do something 二 使用退出标志退出线程 一般r
  • JVM篇-堆空间(Heap)

    堆的核心概述 一个JVM实例只存在一个堆内存 堆也是Java内存管理的核心区域 Java堆区在JVM启动的时候既被创建 其空间到校也就确定了 是JVM管理的最大一块内存空间 堆内存可以调节 Java虚拟机规范 规范 对可以处于物理上不连续的
  • 关于SaaS平台中应对多租户模式的设计

    这几年 在公司尝试转型做产品 所以引入了很多的产品的理念 不管是对产品的定义 还是针对产品的管理 以及摸索产品的落地等等 我之前更多的是接触的ToB端 所以想必也猜到了是一个SaaS模式的产品 其实 现在回想并总结 之前所做的产品并不理想
  • Windows应急响应 - 敏感目录文件痕迹排查,最近打开的文件 Recent,临时目录Temp,预读取文件Prefetch,程序执行情况Amcache.hve,Windows文件访问时间不更新原理

    作者简介 CSDN top100 阿里云博客专家 华为云享专家 网络安全领域优质创作者 推荐专栏 对网络安全感兴趣的小伙伴可以关注专栏 网络安全入门到精通 敏感文件痕迹排查 一 根据时间查找 Forfiles 1 访问时间不更新问题 二 最
  • 电源管理芯片8个引脚说明

    1 脚 COMP 为误差放大器补偿脚 该脚与误差放大器反相输入端 VFB 之间应接入RC补偿网络 以改善误差放大器的性能 2 脚 VFB 为误差放大器的反相输入端 反馈电压接入该脚 与误差放大器同相输入端的基准电压比较 以便设定误差电压 3
  • 实现增删改查

    实现增删改查 1 UserMapper接口 要在UserMapper xml配置中的namespace中绑定这个接口 在mybatis config xml中绑定UserMapper xml这个配置文件
  • 阿里把中台变薄,背后逻辑是什么?

    颠覆式创新怎么做 中台适合做组合式创新 不适合做颠覆式创新 那么颠覆式创新如何做呢 各家巨头做法不太一样 腾讯使用 赛马机制 马化腾没有想到 10年前的那场 赛马 最后跑出来的是一个叫张小龙的人和他所带领的 微信 团队 而他们此前 从来没做
  • 关于接口自动化,你不能不知道的高级技巧——接口自动化神器apin进阶操作

    一 变量提取和引用 变量提取和引用主要是为了解决接口之间的参数依赖问题 使用场景 接口 A 的参数中需要使用接口 B 返回的某个数据 那么就要在请求 B 接口之后 提取数据保存 给请求 A 接口时使用 1 变量提取 在用例集或用例数据中 通
  • 简单的相似度计算

    相似度就是比较两个事物的相似性 一般通过计算事物的特征之间的距离 如果距离小 那么相似度大 如果距离大 那么相似度小 欧氏距离 计算两个点的空间距离 距离越小 越相似 二维平面 三维平面 n维向量 例 a 1 1 2 0 1 1 0 0 0
  • Opencv系列1_opencv对单张DCM文件的读取并显示

    实例1 opencv对单张DCM文件的读取并显示 include
  • react+antd中使用插件js-export-excel将table数据导出为excel文件

    首先在项目中安装js export excel插件 进入项目目录 执行以下命令 npm安装 npm install save js export excel yarn安装 yarn add js export excel 安装好之后 查看p
  • OpenBLAS简介及在Windows7 VS2013上源码的编译过程

    OpenBLAS Open Basic Linear Algebra Subprograms 是开源的基本线性代数子程序库 是一个优化的高性能多核BLAS库 主要包括矩阵与矩阵 矩阵与向量 向量与向量等操作 它的License是BSD 3
  • 【OpenCV】C++ OpenCV 快速入门案例Demo

    目录 1 开发环境配置 1 1 软件版本 1 2 具体配置 1 3 程序框架 2 程序代码 2 1 Demo头文件 QuickDemo h 2 2 Demo源代码 QuickDemo cpp 2 3 测试代码 test450 cpp 1 开
  • 没有计算机基础如何学习Java、Python或者前端?

    这是一个学习为主的时代 没有自我驱动能力很容易被淘汰 在大家的固有印象中就会觉得科班出身的就一定优秀 非科班相对来说就不专业 其实不尽然 一 非科班怎么学编程 科班在计算机基础方面 计算机基础原理 计算机网络 数据结构和算法 操作系统这些方
  • 作业:修改JSP注册页面

    恢复内容开始 表单 userLogin jsp
  • 一篇文章教你Pytest快速入门和基础讲解,一定要看!

    前言 目前有两种纯测试的测试框架 pytest和unittest unittest应该是广为人知 而且也是老框架了 很多人都用来做自动化 无论是UI还是接口 pytest是基于unittest开发的另一款更高级更好用的单元测试框架 出去面试
  • MySQL利用Navicat导出数据字典

    数据字典是一名DBA需要维护的重要内容 可以通过db直接查看 也可以导出到excel后进行查看 1 利用mysql的information schema中的COLUMNS表 和navicat中的导出功能实现快速导出数据字典 SQL sele
  • UnityWebGL调研(3) 项目打包

    之前单独打包项目中用到的插件的Demo场景都没有问题 开始尝试打包项目参加 结果有问题 打包能够成功 但是运行有问题 提示中找不到问题所在 然后这个是新项目 考虑用Unity2019 2 1f打包看看 和之前一样 逐个打包插件的Demo 都
  • IO密集型和CPU密集型程序-概念与实现

    欢迎关注笔者的微信公众号 概念 在计算机科学中 有两种不同类型的程序 IO 密集型和 CPU 密集型 这两种程序的主要差别在于它们在执行任务时瓶颈所在的地方 IO 密集型 这类程序主要通过读写磁盘文件 网络通信等外部设备来完成任务 因此它们