并行文件匹配,Python

2023-12-15

我正在尝试改进扫描文件中是否存在恶意代码的脚本。我们在文件中有一个正则表达式模式列表,每行一个模式。这些正则表达式适用于 grep,因为我们当前的实现基本上是 bash 脚本 find\grep 组合。 bash 脚本在我的基准目录上花费了 358 秒。我能够编写一个 Python 脚本,在 72 秒内完成此操作,但希望进一步改进。首先,我将发布基本代码,然后调整我尝试过的:

import os, sys, Queue, threading, re

fileList = []
rootDir = sys.argv[1]

class Recurser(threading.Thread):

    def __init__(self, queue, dir):
    self.queue = queue
    self.dir = dir
    threading.Thread.__init__(self)

    def run(self):
    self.addToQueue(self.dir)

    ## HELPER FUNCTION FOR INTERNAL USE ONLY
    def addToQueue(self,  rootDir):
      for root, subFolders, files in os.walk(rootDir):
    for file in files:
       self.queue.put(os.path.join(root,file))
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)

class Scanner(threading.Thread):

    def __init__(self, queue, patterns):
    self.queue = queue
    self.patterns = patterns
    threading.Thread.__init__(self)

    def run(self):
    nextFile = self.queue.get()
    while nextFile is not -1:
       #print "Trying " + nextFile
       self.scanFile(nextFile)
       nextFile = self.queue.get()


    #HELPER FUNCTION FOR INTERNAL UES ONLY
    def scanFile(self, file):
       fp = open(file)
       contents = fp.read()
       i=0
       #for patt in self.patterns:
       if self.patterns.search(contents):
      print "Match " + str(i) + " found in " + file

############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################


fileQueue = Queue.Queue()

#Get the shell scanner patterns
patterns = []
fPatt = open('/root/patterns')
giantRE = '('
for line in fPatt:
   #patterns.append(re.compile(line.rstrip(), re.IGNORECASE))
   giantRE = giantRE + line.rstrip() + '|'

giantRE = giantRE[:-1] + ')'
giantRE = re.compile(giantRE, re.IGNORECASE)

#start recursing the directories
recurser = Recurser(fileQueue,rootDir)
recurser.start()

print "starting scanner"
#start checking the files
for scanner in xrange(0,8):
   scanner = Scanner(fileQueue, giantRE)
   scanner.start()

这显然是调试\丑陋的代码,不要介意百万个queue.put(-1),我稍后会清理它。某些缩进未正确显示,特别是在 scanFile 中。

无论如何,我注意到了一些事情。使用 1 个、4 个甚至 8 个线程(对于 xrange(0,???) 中的扫描仪:) 没有什么区别。不管怎样,我仍然有大约 72 秒的时间。我认为这是由于 python 的 GIL 造成的。

与制作巨大的正则表达式相反,我尝试将每一行(模式)作为编译 RE 放在列表中,并在我的 scanfile 函数中迭代此列表。这导致执行时间更长。

为了避免 python 的 GIL,我尝试让每个线程 fork 到 grep,如下所示:

#HELPER FUNCTION FOR INTERNAL UES ONLY
def scanFile(self, file):
      s = subprocess.Popen(("grep", "-El", "--file=/root/patterns", file), stdout = subprocess.PIPE)
      output = s.communicate()[0]
      if output != '':
         print 'Matchfound in ' + file

这导致执行时间更长。

关于提高性能的任何建议。

:::::::::::::编辑::::::::

我无法发布我自己问题的答案,但以下是对所提出的几点的答案:

@David Nehme - 只是为了让人们知道我知道我有一百万个queue.put(-1)

@Blender - 标记队列的底部。我的扫描器线程不断出队,直到到达底部的 -1(而 nextFile 不是 -1:)。处理器核心为 8 个,但由于 GIL 使用 1 个线程、4 个线程或 8 个线程,因此没有区别。生成 8 个子进程会导致代码显着变慢(142 秒 vs 72 秒)

@ed - 是的,它和 find\grep 组合一样慢,实际上更慢,因为它不加区别地 grep 不需要的文件

@Ron - 无法升级,这必须是通用的。你认为这会加速 > 72 秒吗? bash grepper 执行 358 秒。我的 python 巨型 RE 方法需要 1-8 个线程执行 72 秒。 popen 方法有 8 个线程(8 个子进程),运行时间为 142 秒。到目前为止,巨型 RE python 方法显然是赢家

@intuted

这是我们当前的 find\grep 组合的核心内容(不是我的脚本)。这很简单。其中还有一些其他内容,例如 ls,但不会导致速度减慢 5 倍。即使 grep -r 的效率稍微高一点,5 倍也会造成巨大的减慢。

 find "${TARGET}" -type f -size "${SZLIMIT}" -exec grep -Eaq --file="${HOME}/patterns" "{}" \; -and -ls | tee -a "${HOME}/found.txt"

python代码效率更高,不知道为什么,但是我实验测试了一下。我更喜欢用 python 来做这件事。我已经用 python 实现了 5 倍的加速,我想让它加速更多。

::::::::::::::获胜者获胜者获胜者::::::::::::::::::

看来我们有赢家了。

intued 的 shell 脚本以 34 秒位居第二,但 @steveha 的 shell 脚本以 24 秒位居第一。由于我们的很多盒子没有 python2.6,我不得不 cx_freeze 它。我可以编写一个 shell 脚本包装器来获取 tar 并将其解压。不过,为了简单起见,我确实喜欢直觉。

感谢你们所有的帮助,我现在有了一个有效的系统管理工具


我认为,而不是使用threading模块,你应该使用multiprocessingPython 解决方案的模块。 Python 线程可能会与 GIL 发生冲突;如果您只是运行多个 Python 进程,那么 GIL 不是问题。

我认为对于你正在做的事情来说,工作进程池正是你想要的。默认情况下,池将默认为系统处理器中的每个核心分配一个进程。只需致电.map()带有要检查的文件名列表的方法以及执行检查的函数。

http://docs.python.org/library/multiprocessing.html

如果这不比你的快threading实施,那么我不认为 GIL 是你的问题。

编辑:好的,我正在添加一个可用的 Python 程序。这使用工作进程池来打开每个文件并搜索每个文件中的模式。当工作程序找到匹配的文件名时,它只是将其打印(到标准输出),以便您可以将此脚本的输出重定向到文件中,并且您将获得文件列表。

编辑:我认为这是一个更容易阅读的版本,更容易理解。

我计时了这一点,搜索了我计算机上 /usr/include 中的文件。它在大约半秒内完成搜索。使用find管道通过xargs运行尽可能少grep尽可能地处理,大约需要 0.05 秒,大约有 10 倍的加速。但我讨厌你必须使用巴洛克式的奇怪语言才能得到find可以正常工作,而且我喜欢 Python 版本。也许在真正大的目录上,差异会更小,因为 Python 的半秒的一部分一定是启动时间。对于大多数用途来说,也许半秒就足够快了!

import multiprocessing as mp
import os
import re
import sys

from stat import S_ISREG


# uncomment these if you really want a hard-coded $HOME/patterns file
#home = os.environ.get('HOME')
#patterns_file = os.path.join(home, 'patterns')

target = sys.argv[1]
size_limit = int(sys.argv[2])
assert size_limit >= 0
patterns_file = sys.argv[3]


# build s_pat as string like:  (?:foo|bar|baz)
# This will match any of the sub-patterns foo, bar, or baz
# but the '?:' means Python won't bother to build a "match group".
with open(patterns_file) as f:
    s_pat = r'(?:{})'.format('|'.join(line.strip() for line in f))

# pre-compile pattern for speed
pat = re.compile(s_pat)


def walk_files(topdir):
    """yield up full pathname for each file in tree under topdir"""
    for dirpath, dirnames, filenames in os.walk(topdir):
        for fname in filenames:
            pathname = os.path.join(dirpath, fname)
            yield pathname

def files_to_search(topdir):
    """yield up full pathname for only files we want to search"""
    for fname in walk_files(topdir):
        try:
            # if it is a regular file and big enough, we want to search it
            sr = os.stat(fname)
            if S_ISREG(sr.st_mode) and sr.st_size >= size_limit:
                yield fname
        except OSError:
            pass

def worker_search_fn(fname):
    with open(fname, 'rt') as f:
        # read one line at a time from file
        for line in f:
            if re.search(pat, line):
                # found a match! print filename to stdout
                print(fname)
                # stop reading file; just return
                return

mp.Pool().map(worker_search_fn, files_to_search(target))
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

并行文件匹配,Python 的相关文章

  • numpy:大量线段/点的快速规则间隔平均值

    我沿着一维线有许多 约 100 万个 不规则间隔的点 P 这些标记线段 这样 如果点是 0 x a x b x c x d 则线段从 0 gt x a x a gt x b x b gt x c x c gt x d 等 我还有每个段的 y
  • 如何成功地用 XML 中的批处理替换文本

    我尝试使用批处理在 XML 页面中替换字符串 但无法成功完全替换它 我有这个批处理代码 echo off setlocal EnableDelayedExpansion set search logLevel 3 set replace l
  • 信号处理程序有单独的堆栈吗?

    信号处理程序是否有单独的堆栈 就像每个线程都有单独的堆栈一样 这是在 Linux C 环境中 来自 Linux 手册页signal 7 http kernel org doc man pages online pages man7 sign
  • 在Python中以交互方式执行多行语句

    我是 Python 世界的新手 这是我用 Python 编写的第一个程序 我来自 R 世界 所以这对我来说有点不直观 当我执行时 In 15 import math import random random random math sqrt
  • 张量流和线程

    下面是来自 Tensorflow 网站的简单 mnist 教程 即单层 softmax 我尝试通过多线程训练步骤对其进行扩展 from tensorflow examples tutorials mnist import input dat
  • scikit-learn 和tensorflow 有什么区别?可以一起使用它们吗?

    对于这个问题我无法得到满意的答案 据我了解 TensorFlow是一个数值计算库 经常用于深度学习应用 而Scikit learn是一个通用机器学习框架 但它们之间的确切区别是什么 TensorFlow 的目的和功能是什么 我可以一起使用它
  • Tensorflow 不分配完整的 GPU 内存

    Tensorflow 默认分配所有 GPU 内存 但我的新设置实际上只有 9588 MiB 11264 MiB 我预计大约 11 000MiB 就像我的旧设置一样 张量流信息在这里 from tensorflow python client
  • Android 为什么这不会抛出错误的线程异常?

    我的印象是视图只能从主线程操作 但是 为什么这不会崩溃 public class MainActivity extends Activity TextView tv Override protected void onCreate Bund
  • 返回上个月的日期时间对象

    如果 timedelta 在它的构造函数中有一个月份参数就好了 那么最简单的方法是什么 EDIT 正如下面指出的那样 我并没有认真考虑这一点 我真正想要的是上个月的任何一天 因为最终我只会获取年份和月份 因此 给定一个日期时间对象 返回的最
  • 如何将类添加到 LinkML 中的 SchemaDefinition?

    中的图表https linkml io linkml model docs SchemaDefinition https linkml io linkml model docs SchemaDefinition and https link
  • 如何使用 Celery 多工作人员启用自动缩放?

    命令celery worker A proj autoscale 10 1 loglevel info启动具有自动缩放功能的工作人员 当创建多个工人时 me mypc projects x celery multi start mywork
  • 如何在字符串vba中包含引号

    我想存储以下文本 Test1 Monday Test Abcdef 全部在字符串中包含引号 我知道要在字符串中包含引号 我必须包含 之前 但在这里这不是一个很好的解决方案 因为我在文本中有太多这样的解决方案 知道如何一次完成这一切吗 您有两
  • 更改特定字符串的颜色

    有谁知道如果将特定单词输入文本区域 我如何更改它的颜色 例如 如果用户输入 你好我的朋友 它会动态地将 你好 更改为绿色 在google上花了很多时间 找不到任何相关的东西 谢谢 textareas 的设计目的不是选择性着色
  • 如何使用 django-pyodbc (ubuntu 16.04) 配置数据库设置 Django-MSSQL?

    我是 Django 新手 目前正在尝试使用另一个数据库来保存我的模型 即MS SQL 我的数据库部署在docker容器中 903876e64b67 microsoft mssql server linux bin sh c opt mssq
  • 线性同余生成器 - 如何选择种子和统计检验

    我需要做一个线性同余生成器 它将成功通过所选的统计测试 我的问题是 如何正确选择发电机的数字以及 我应该选择哪些统计检验 我想 均匀性的卡方频率测试 每代收集10 000个号码的方法 将 0 1 细分为10个相等的细分 柯尔莫哥洛夫 斯米尔
  • Django Rest Framework POST 更新(如果存在或创建)

    我是 DRF 的新手 我阅读了 API 文档 也许这是显而易见的 但我找不到一个方便的方法来做到这一点 我有一个Answer与 a 具有一对一关系的对象Question 在前端 我曾经使用 POST 方法来创建发送到的答案api answe
  • 在 scipy 中创建新的发行版

    我试图根据我拥有的一些数据创建一个分布 然后从该分布中随机抽取 这是我所拥有的 from scipy import stats import numpy def getDistribution data kernel stats gauss
  • 更新 SQLAlchemy 中的特定行

    我将 SQLAlchemy 与 python 一起使用 我想更新表中等于此查询的特定行 UPDATE User SET name user WHERE id 3 我通过 sql alchemy 编写了这段代码 但它不起作用 session
  • 沿轴 0 重复 scipy csr 稀疏矩阵

    我想重复 scipy csr 稀疏矩阵的行 但是当我尝试调用 numpy 的重复方法时 它只是将稀疏矩阵视为对象 并且只会将其作为 ndarray 中的对象重复 我浏览了文档 但找不到任何实用程序来重复 scipy csr 稀疏矩阵的行 我
  • 如何在 xslt 2.0 中解析字符串到日期

    是否可以像这样转换字符串30042013 2013 年 4 月 30 日 日期格式 所以我可以稍后在类似的函数中使用它format date 就像托马拉克说的 你可以使用substring and concat 要构建一个字符串 您可以将其

随机推荐