I've read several StackOverflow posts on this problem, along with most of the top ten Google results. What sets my question apart is that the JSON file is created by one of my own Python scripts, and a second script run less than ten minutes later can't read it back.

A bit of background: I generate leads for my online business and am learning Python so I can analyze them better. I'm working through two years' worth of leads, keeping the useful data, stripping out any personal information (email addresses, names, etc.), and saving the 30,000+ leads into a few dozen files for easier access.

So my first script opens each of the 30,000+ individual lead files, determines the date it was captured from a timestamp in the file, and saves the data under the matching key in a dict. Once everything has been aggregated into that dictionary, it's written out to text files with json.dumps.

The dictionary is structured like this:
addData['lead']['July_2013'] = { ... }
where the top-level key can be 'lead', 'partial', and a few others, and the 'July_2013' key is obviously date-based: any combination of a full month name and 2013 or 2014, going back to 'February_2013'.
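To make that concrete, here's a hypothetical miniature of the structure (the inner field names are made up; only the nesting matters):

import json

addData = {
    'lead': {                    # one top-level key per lead type
        'July_2013': {           # one key per capture month
            1374102000.0: {'field1': 'a', 'field2': 'b'},  # leads keyed by timestamp
        },
    },
    'partial': {
        'February_2013': {},
    },
}

print json.dumps(addData['lead']['July_2013'])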
The full error is:
ValueError: Unterminated string starting at: line 1 column 9997847 (char 9997846)
But I inspected the file by hand, and my IDE says there are only 76,655 characters in it. So how did the parser get to char 9,997,846?
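One thing I can at least check programmatically is whether the file on disk really is as short as the IDE claims, since some editors silently truncate the display of very large files. Something like this (the filename here is a placeholder for whichever file fails):

import os

path = 'D://Server Data//eagle805//emmetrics//forms//leads//SOME_FAILING_FILE.cd.lead.agg'  # placeholder

print "Size on disk: {} bytes".format(os.path.getsize(path))

with open(path, 'rb') as fh:
    raw = fh.read()
print "Characters read: {}".format(len(raw))

# Peek at the neighborhood of the offset the parser complained about
if len(raw) > 9997846:
    print repr(raw[9997800:9997900])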
The failing file is the 8th one read; the other 7, and every file read in via json.loads after it, load fine.

Python says there's an unterminated string, so I looked at the end of the JSON in the failing file and it appears fine. I've seen mentions of \n newlines inside JSON, but this string is all one line. I've seen mentions of \ vs \\ escaping, but skimming the whole file I didn't spot any backslashes at all; other files do contain backslashes and they read in fine. And all of these files were created by json.dumps in the first place.
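For reference, json.dumps escapes a backslash to \\ on output and json.loads undoes it on input, so a clean dumps/loads round trip shouldn't trip over them:

import json

s = json.dumps({'path': 'C:\\temp\\leads'})
print s                      # {"path": "C:\\temp\\leads"}
print json.loads(s)['path']  # C:\temp\leads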
I can't post the file since it still contains personal information, and manually validating the JSON of a 76,000-character file isn't really feasible.

Any thoughts on how to debug this would be appreciated. In the meantime I'm going to try rebuilding the file and see whether this was just a one-off error, but that will take a while.
- Python 2.7 via Spyder and Anaconda
- Windows 7 Professional

--- EDIT ---

As requested, here is the code that does the writing:
from p2p.basic import files as f
from p2p.adv import strTools as st
from p2p.basic import strTools as s

import os
import json
import copy
from datetime import datetime
import time

global leadDir
global archiveDir
global aggLeads


def aggregate_individual_lead_files():
    """
    """
    # Get the aggLead global and
    global aggLeads

    # Get all the Files with a 'lead' extension & aggregate them
    exts = [
        'lead',
        'partial',
        'inp',
        'err',
        'nobuyer',
        'prospect',
        'sent'
    ]
    for srchExt in exts:
        agg = {}
        leads = f.recursiveGlob(leadDir, '*.cd.' + srchExt)
        print "There are {} {} files to process".format(len(leads), srchExt)

        for lead in leads:
            # Get the Base Filename
            fname = f.basename(lead)
            #uniqID = st.fetchBefore('.', fname)

            #print "File: ", lead

            # Get Lead Data
            leadData = json.loads(f.file_get_contents(lead))

            agg = agg_data(leadData, agg, fname)

        aggLeads[srchExt] = copy.deepcopy(agg)

    print "Aggregate Top Lvl Keys: ", aggLeads.keys()
    print "Aggregate Next Lvl Keys: "
    for key in aggLeads:
        print "{}: ".format(key)

        for arcDate in aggLeads[key].keys():
            print "{}: {}".format(arcDate, len(aggLeads[key][arcDate]))

        # raw_input("Press Enter to continue...")


def agg_data(leadData, agg, fname=None):
    """
    """
    #print "Lead: ", leadData

    # Get the timestamp of the lead
    try:
        ts = leadData['timeStamp']
        leadData.pop('timeStamp')
    except KeyError:
        return agg

    leadDate = datetime.fromtimestamp(ts)

    arcDate = leadDate.strftime("%B_%Y")

    #print "Archive Date: ", arcDate

    try:
        agg[arcDate][ts] = leadData
    except KeyError:
        agg[arcDate] = {}
        agg[arcDate][ts] = leadData
    except TypeError:
        print "Timestamp: ", ts
        print "Lead: ", leadData
        print "Archive Date: ", arcDate
        return agg

    """
    if fname is not None:
        archive_lead(fname, arcDate)
    """
    #print "File: {} added to {}".format(fname, arcDate)
    return agg


def archive_lead(fname, arcDate):
    # Archive Path
    newArcPath = archiveDir + arcDate + '//'
    if not os.path.exists(newArcPath):
        os.makedirs(newArcPath)

    # Move the file to the archive
    os.rename(leadDir + fname, newArcPath + fname)


def reformat_old_agg_data():
    """
    """
    # Get the aggLead global and
    global aggLeads
    aggComplete = {}
    aggPartial = {}

    oldAggFiles = f.recursiveGlob(leadDir, '*.cd.agg')
    print "There are {} old aggregate files to process".format(len(oldAggFiles))

    for agg in oldAggFiles:
        tmp = json.loads(f.file_get_contents(agg))
        for uniqId in tmp:
            leadData = tmp[uniqId]
            if leadData['isPartial'] == True:
                aggPartial = agg_data(leadData, aggPartial)
            else:
                aggComplete = agg_data(leadData, aggComplete)

    arcData = dict(aggLeads['lead'].items() + aggComplete.items())
    aggLeads['lead'] = arcData

    arcData = dict(aggLeads['partial'].items() + aggPartial.items())
    aggLeads['partial'] = arcData


def output_agg_files():
    for ext in aggLeads:
        for arcDate in aggLeads[ext]:
            arcFile = leadDir + arcDate + '.cd.' + ext + '.agg'

            if f.file_exists(arcFile):
                tmp = json.loads(f.file_get_contents(arcFile))
            else:
                tmp = {}

            arcData = dict(tmp.items() + aggLeads[ext][arcDate].items())

            f.file_put_contents(arcFile, json.dumps(arcData))


def main():
    global leadDir
    global archiveDir
    global aggLeads

    leadDir = 'D://Server Data//eagle805//emmetrics//forms//leads//'
    archiveDir = leadDir + 'archive//'
    aggLeads = {}

    # Aggregate all the old individual files
    aggregate_individual_lead_files()

    # Reformat the old aggregate files
    reformat_old_agg_data()

    # Write it all out to an aggregate file
    output_agg_files()


if __name__ == "__main__":
    main()
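One guard I'm considering, on the assumption that the aggregate file was only partially written (which would explain an unterminated string): write to a temp file first and only rename it into place once the write completes. A rough sketch, where atomic_json_write is my own hypothetical helper:

import json
import os
import tempfile

def atomic_json_write(path, data):
    # Dump to a temp file in the same directory, then rename it into
    # place; a crash mid-write then can't leave behind a truncated file.
    fd, tmpPath = tempfile.mkstemp(dir=os.path.dirname(path))
    try:
        os.write(fd, json.dumps(data))
    finally:
        os.close(fd)
    if os.path.exists(path):
        os.remove(path)  # os.rename() won't overwrite an existing file on Windows
    os.rename(tmpPath, path)

output_agg_files() above would then call atomic_json_write(arcFile, arcData) instead of f.file_put_contents(arcFile, json.dumps(arcData)).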
And here is the code that does the reading:
from p2p.basic import files as f
from p2p.adv import strTools as st
from p2p.basic import strTools as s

import os
import json
import copy
from datetime import datetime
import time

global leadDir
global fields
global fieldTimes
global versions


def parse_agg_file(aggFile):
    global leadDir
    global fields
    global fieldTimes

    try:
        tmp = json.loads(f.file_get_contents(aggFile))
    except ValueError:
        print "{} failed the JSON load".format(aggFile)
        return False

    print "Opening: ", aggFile
    for ts in tmp:
        try:
            tmpTs = float(ts)
        except:
            print "Timestamp: ", ts
            continue

        leadData = tmp[ts]
        for field in leadData:
            if field not in fields:
                fields[field] = []
            fields[field].append(float(ts))


def determine_form_versions():
    global fieldTimes
    global versions

    # Determine all the fields and their start and stop times
    times = []
    for field in fields:
        minTs = min(fields[field])
        fieldTimes[field] = [minTs, max(fields[field])]
        times.append(minTs)
        print 'Min ts: {}'.format(minTs)

    times = set(sorted(times))
    print "Times: ", times
    print "Fields: ", fieldTimes

    versions = {}
    for ts in times:
        d = datetime.fromtimestamp(ts)
        ver = d.strftime("%d_%B_%Y")
        print "Version: ", ver
        versions[ver] = []
        for field in fields:
            if ts in fields[field]:
                versions[ver].append(field)


def main():
    global leadDir
    global fields
    global fieldTimes

    leadDir = 'D://Server Data//eagle805//emmetrics//forms//leads//'
    fields = {}
    fieldTimes = {}

    aggFiles = f.glob(leadDir + '*.lead.agg')
    for aggFile in aggFiles:
        parse_agg_file(aggFile)

    determine_form_versions()

    print "Versions: ", versions


if __name__ == "__main__":
    main()