我正在尝试通过队列使解析器成为多线程。它似乎有效,但我的队列挂起。如果有人能告诉我如何解决这个问题,我将不胜感激,因为我很少编写多线程代码。
此代码从 Q 中读取:
from silk import *
import json
import datetime
import pandas
import Queue
from threading import Thread
l = []
q = Queue.Queue()
def parse_record():
d = {}
while not q.empty():
rec = q.get()
d['timestamp'] = rec.stime.strftime("%Y-%m-%d %H:%M:%S")
# ... many ops like this
d['dport'] = rec.dport
l.append(d) # l is global
这填补了问题:
def parse_records():
ffile = '/tmp/query.rwf'
flows = SilkFile(ffile, READ)
numthreads = 2
# fill queue
for rec in flows:
q.put(rec)
# work on Queue
for i in range(numthreads):
t = Thread(target = parse_record)
t.daemon = True
t.start()
# blocking
q.join()
# never reached
data_df = pandas.DataFrame.from_records(l)
return data_df
我只打电话parse_records()
在我的主要。它永远不会终止。
The 队列.空文档 https://docs.python.org/3.6/library/queue.html#queue.Queue.empty says:
...如果empty()返回False,它不能保证对get()的后续调用不会阻塞。
至少你应该使用get_nowait
或面临数据丢失的风险。但更重要的是,只有当所有排队项目都被标记为完成时,连接才会释放队列.task_done https://docs.python.org/3.6/library/queue.html#queue.Queue.task_done call:
如果 join() 当前处于阻塞状态,它将在处理完所有项目后恢复(这意味着对于已 put() 到队列中的每个项目都会收到一个 task_done() 调用)。
作为旁注,l.append(d)
不是原子的,应该用锁来保护。
from silk import *
import json
import datetime
import pandas
import Queue
from threading import Thread, Lock
l = []
l_lock = Lock()
q = Queue.Queue()
def parse_record():
d = {}
while 1:
try:
rec = q.getnowait()
d['timestamp'] = rec.stime.strftime("%Y-%m-%d %H:%M:%S")
# ... many ops like this
d['dport'] = rec.dport
with l_lock():
l.append(d) # l is global
q.task_done()
except Queue.Empty:
return
通过使用标准库中的线程池,您可以大大缩短代码。
from silk import *
import json
import datetime
import pandas
import multiprocessing.pool
def parse_record(rec):
d = {}
d['timestamp'] = rec.stime.strftime("%Y-%m-%d %H:%M:%S")
# ... many ops like this
d['dport'] = rec.dport
return d
def parse_records():
ffile = '/tmp/query.rwf'
flows = SilkFile(ffile, READ)
pool = multiprocessing.pool.Pool(2)
data_df = pandas.DataFrame.from_records(pool.map(parse_record), flows)
pool.close()
return data_df
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)