在 Elasticsearch 中插入多个文档 - 批量文档格式化程序

2024-03-07

太长了;如何批量格式化 JSON 文件以摄取到 Elasticsearch?

我正在尝试将一些 NOAA 数据提取到 Elasticsearch 中并一直在利用NOAA Python SDK https://github.com/paulokuong/noaa.

我编写了以下 Python 脚本来加载数据并将其存储为 JSON 格式。

from noaa_sdk import noaa
import json

n = noaa.NOAA()
alerts = n.alerts()
f = open('nhc_alerts.json', 'w')
json.dump(alerts, f)
f.write('\n')

JSON 输出:

{"@context": ["https://raw.githubusercontent.com/geojson/geojson-ld/master/contexts/geojson-base.jsonld", {"wx": "https://api.weather.gov/ontology#", "@vocab": "https://api.weather.gov/ontology#"}], "type": "FeatureCollection", "features": [{"id": "https://api.weather.gov/alerts/NWS-IDP-PROD-KEEPALIVE-5246", "type": "Feature", "geometry": null, "properties": {"@id": "https://api.weather.gov/alerts/NWS-IDP-PROD-KEEPALIVE-5246", "@type": "wx:Alert", "id": "NWS-IDP-PROD-KEEPALIVE-5246", "areaDesc": "Montgomery", "geocode": {"UGC": ["MDC031"], "SAME": ["024031"]}, "affectedZones": ["https://api.weather.gov/zones/county/MDC031"], "references": [], "sent": "2020-04-25T19:21:03+00:00", "effective": "2020-04-25T19:21:03+00:00", "onset": null, "expires": "2020-04-25T19:31:03+00:00", "ends": null, "status": "Test", "messageType": "Alert", "category": "Met", "severity": "Unknown", "certainty": "Unknown", "urgency": "Unknown", "event": "Test Message", "sender": "[email protected] /cdn-cgi/l/email-protection", "senderName": "NWS", "headline": null, "description": "Monitoring message only. Please disregard.", "instruction": "Monitoring message only. Please disregard.", "response": "None", "parameters": {"PIL": ["NWSKEPWBC"], "BLOCKCHANNEL": ["CMAS", "EAS", "NWEM"]}}}, {"id": "https://api.weather.gov/alerts/NWS-IDP-PROD-4179499-3536427", "type": "Feature", "geometry": null, "properties": {"@id": "https://api.weather.gov/alerts/NWS-IDP-PROD-4179499-3536427", "@type": "wx:Alert", "id": "NWS-IDP-PROD-4179499-3536427", "areaDesc": "La Salle; Livingston", "geocode": {"UGC": ["ILZ019", "ILZ032"], "SAME": ["017099", "017105"]}, "affectedZones": ["https://api.weather.gov/zones/forecast/ILZ019", "https://api.weather.gov/zones/forecast/ILZ032"], "references": [{"@id": "https://api.weather.gov/alerts/NWS-IDP-PROD-4179245-3536278", "identifier": "NWS-IDP-PROD-4179245-3536278", "sender": "[email protected] /cdn-cgi/l/email-protection", "sent": "2020-04-25T10:02:00-05:00"}, {"@id": "https://api.weather.gov/alerts/NWS-IDP-PROD-4178935-3536074", "identifier": "NWS-IDP-PROD-4178935-3536074", "sender": "[email protected] /cdn-cgi/l/email-protection", "sent": "2020-04-25T03:09:00-05:00"}], "sent": "2020-04-25T14:21:00-05:00", "effective": "2020-04-25T14:21:00-05:00", "onset": "2020-04-25T14:21:00-05:00", "expires": "2020-04-25T22:30:00-05:00", "ends": "2020-04-26T01:00:00-05:00", "status": "Actual", "messageType": "Update", "category": "Met", "severity": "Severe", "certainty": "Possible", "urgency": "Future", "event": "Flood Watch", "sender": "[email protected] /cdn-cgi/l/email-protection", "senderName": "NWS Chicago IL", "headline": "Flood Watch issued April 25 at 2:21PM CDT until April 26 at 1:00AM CDT by NWS Chicago IL", "description": "The Flood Watch is now in effect for\n\n* Livingston and La Salle counties in north central Illinois\n\n* Until 1 AM CDT Sunday\n\n* WHAT...Steady rain. One to two inches of rain has already\nfallen. Additional rainfall amounts of one inch or locally more\nare possible which may lead to total rainfall amounts in excess\nof three inches.\n\n* IMPACTS...Rises in rivers and small streams will occur with\nflooding possible. This especially includes the Vermilion River\nand its tributary streams, and the Illinois River. Roadways,\nviaducts, ditches, agricultural land, and other poor drainage\nareas may become flooded.", "instruction": "A Flood Watch means there is a potential for flooding based on\ncurrent forecasts.\n\nYou should monitor later forecasts and be alert for possible\nFlood Warnings. Those living in areas prone to flooding should be\nprepared to take action should flooding develop.", "response": "Prepare", "parameters": {"NWSheadline": ["FLOOD WATCH NOW IN EFFECT UNTIL 1 AM CDT SUNDAY"], "VTEC": ["/O.EXT.KLOT.FA.A.0002.000000T0000Z-200426T0600Z/"], "EAS-ORG": ["WXR"], "PIL": ["LOTFFALOT"], "BLOCKCHANNEL": ["CMAS", "EAS", "NWEM"], "eventEndingTime": ["2020-04-26T01:00:00-05:00"]}}}, {"id": "https://api.weather.gov/alerts/NWS-IDP-PROD-4179497-3536425", "type": "Feature", "geometry": null, "properties": {"@id": "https://api.weather.gov/alerts/NWS-IDP-PROD-4179497-3536425", "@type": "wx:Alert", "id": "NWS-IDP-PROD-4179497-3536425", "areaDesc": "San Luis Obispo County Central Coast; Santa Barbara County Central Coast; Santa Ynez Valley", "geocode": {"UGC": ["CAZ034", "CAZ035", "CAZ036"], "SAME": ["006079", "006083"]}, "affectedZones": ["https://api.weather.gov/zones/forecast/CAZ034", "https://api.weather.gov/zones/forecast/CAZ035", "https://api.weather.gov/zones/forecast/CAZ036"], "references": [{"@id": "https://api.weather.gov/alerts/NWS-IDP-PROD-4177692-3535278", "identifier": "NWS-IDP-PROD-4177692-3535278", "sender": "[email protected] /cdn-cgi/l/email-protection", "sent": "2020-04-24T08:54:00-07:00"}, {"@id": "https://api.weather.gov/alerts/NWS-IDP-PROD-4178774-3535999", "identifier": "NWS-IDP-PROD-4178774-3535999", "sender": "[email protected] /cdn-cgi/l/email-protection", "sent": "2020-04-24T21:37:00-07:00"}, {"@id": "https://api.weather.gov/alerts/NWS-IDP-PROD-4179040-3536147", "identifier": "NWS-IDP-PROD-4179040-3536147", "sender": "[email protected] /cdn-cgi/l/email-protection", "sent": 

这个脚本解决了我遇到的一些格式问题,我的下一个障碍是尝试对其进行格式化,以便我可以利用elasticsearch中的批量导入功能。我偶然发现一个答案 https://stackoverflow.com/a/33981143这在一定程度上有效,我遇到的问题是它将插入适当的索引字符串,但它是在每个字符之后执行的。

批量转换脚本:

import json


JSON_FILE_IN = "nhc_alerts.json"
JSON_FILE_OUT = "nhc_bulk.json"


out = open(JSON_FILE_OUT, 'w')
with open(JSON_FILE_IN, 'r') as json_in:
    docs = json.dumps(json_in.read())
    for doc in docs:
        out.write('%s\n' % json.dumps({'index': {}}));
        out.write('%s\n' % json.dumps(doc, indent=0).replace('\n', ''))

批量脚本的输出:

{"index": {}}
"\""
{"index": {}}
"{"
{"index": {}}
"\\"
{"index": {}}
"\""
{"index": {}}
"@"
{"index": {}}
"c"
{"index": {}}
"o"
{"index": {}}
"n"
{"index": {}}
"t"
{"index": {}}
"e"
{"index": {}}
"x"
{"index": {}}
"t"
{"index": {}}
"\\"
{"index": {}}
"\""
{"index": {}}
":"
{"index": {}}
" "
{"index": {}}
"["
{"index": {}}
"\\"
{"index": {}}
"\""
{"index": {}}
"h"
{"index": {}}
"t"
{"index": {}}
"t"
{"index": {}}
"p"
{"index": {}}
"s"
{"index": {}}
":"
{"index": {}}
"/"
{"index": {}}
"/"
{"index": {}}
"r"
{"index": {}}
"a"
{"index": {}}
"w"
{"index": {}}
"."
{"index": {}}
"g"
{"index": {}}
"i"
{"index": {}}
"t"
{"index": {}}
"h"
{"index": {}}
"u"
{"index": {}}
"b"
{"index": {}}
"u"
{"index": {}}
"s"
{"index": {}}
"e"
{"index": {}}
"r"
{"index": {}}
"c"
{"index": {}}
"o"
{"index": {}}
"n"
{"index": {}}

理想情况下,我想将这两个脚本合并为一个,但此时,如果可以完成工作,我将运行两个单独的脚本。


您可以利用bulk官方python包的方法:

import json

from noaa_sdk import noaa
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk


noaa_client = noaa.NOAA()
alerts = noaa_client.alerts()['features']

es = Elasticsearch()


def save_alerts():
    with open('nhc_alerts.json', 'w') as f:
        f.write(json.dumps(alerts))


def bulk_sync():
    actions = [
        {
            "_index": "my_noaa_index",
            "_source": alert
        } for alert in alerts
    ]

    bulk(es, actions)


save_alerts()
bulk_sync()

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 Elasticsearch 中插入多个文档 - 批量文档格式化程序 的相关文章

  • ImportError:尝试在没有已知父包的情况下进行相对导入[重复]

    这个问题在这里已经有答案了 我正在学习使用 python 编程 并且在从包中的模块导入时遇到问题 我正在使用 Visual Studio 代码和 Python 3 8 2 64 位 我的项目目录 https i stack imgur co
  • 在 Django 中处理 subprocess.call()

    我正在开发的应用程序的简单想法是用户给出 Linux 命令 Linux 命令的结果将显示在网络浏览器中 这是我的观点 py from django shortcuts import render to response from djang
  • 使用 QtDesigner 的 pyQt 信号/槽

    我正在尝试编写一个与 QGraphicsView 交互的程序 我想在 QGraphicsView 中发生事件时收集鼠标和键盘事件 例如 如果用户单击 QGraphicsView 小部件 我将获得鼠标位置 类似的东西 我可以很容易地对其进行硬
  • 如何从 __subclasses__ 中删除类?

    当从类继承时 子类可以通过父类访问 subclasses method class BaseClass pass class SubClass BaseClass pass BaseClass subclasses
  • AMD plaidml 与 CPU Tensorflow - 意外结果

    我目前正在运行一个简单的脚本来训练mnist数据集 通过 Tensorflow 通过我的 CPU 运行训练给了我49us sample和使用以下代码的 3e 纪元 CPU import tensorflow as tf mnist tf k
  • 使用 Pyodbc + UnixODBC + FreeTDS 设置连接设置

    我使用 Pyodbc UnixODBC 和 FreeTDS 进行了设置 但在其中的某个地方设置了一些选项 但我不知道在哪里 根据 SQL Server Management Studio 我的程序在打开连接时发送一些设置 set quote
  • Pytorch CUDA 错误:没有内核映像可用于在带有 cuda 11.1 的 RTX 3090 设备上执行

    如果我运行以下命令 import torch import sys print A sys version print B torch version print C torch cuda is available print D torc
  • 读取Excel文件时的Pandas数据框和字符编码

    我正在阅读一个包含多个数值和分类数据的 Excel 文件 name string 列包含外语字符 当我尝试查看 name string 列的内容时 我得到了我想要的结果 但外来字符 在 Excel 电子表格中正确显示 以错误的编码显示 这是
  • pip 安装与本地包具有相同命名空间的包

    我使用的是 Python 3 6 5 通过 miniconda 安装 我的问题是由于我正在安装一个与本地包具有相同命名空间的包 pip 安装此包后 我无法再从本地包导入 我收到一个ModuleNotFoundError错误 如果可能的话 命
  • 如何更新 certifi 的根证书?

    我正在使用 certifi python 模块来验证 ssl 连接 我查看了 certifi python2 7 site packages certifi cacert pem 中包含的根证书 其中一些证书已过期 我如何更新这些证书 我尝
  • python中根据变量类型处理数据子集

    我将以下数据存储在 csv df sample csv 中 我将列名放在名为 cols list 的列表中 df 数据 样本 df data sample pd DataFrame new video BASE SHIVER PREFER
  • 使用 JsonWriter 时,WriteStartConstructor 的用途是什么?

    标题说明了一切 我看到它 及其相应的结尾 吐出以下内容 new Foo 但我不明白什么new实际上是在反序列化时执行的 文档只是说它编写了一个 Json 构造函数 但没有说 Json 构造函数是什么is 此方法是作为增强功能的一部分引入的
  • 将数据导入 Django 的好方法

    我想定期将数据导入 Django 项目 我需要告诉我的数据提供者我想要以什么格式接收数据 我应该以 Json XML CSV 格式请求吗 在 Django 中通常如何处理这个问题 Django 有一个用于导入数据的完整框架 称为 Fixtu
  • 如何在关心 NaN 值的同时获取数据框中值的百分比变化?

    我有以下数据框 Date A 2015 01 01 10 2015 01 02 14 2015 01 05 NaN 2015 01 06 NaN 2015 01 07 5 2015 01 10 1 2015 01 11 NaN 2015 0
  • 在 python matplotlib 中格式化损坏的 y 轴

    我正在 matplotlib 中处理一个 相当复杂的 条形图 它包含来自多个源的摘要数据 每个源都沿 x 轴标记 y 轴上有一系列结果 许多结果都是异常值 我尝试使用断开的 y 轴来显示这些结果 而不会使用以下组合来扭曲整个图表这个方法 h
  • 带有多表查询的 SQL Join 版本的 Djangoviews.py

    需要一些有关 Django 版本的 SQL 多表查询的帮助 该查询使用 3 个表来检索餐厅名称 地址Restaurants table和美食类型来自Cuisinetypes table 所有这些都基于通过 URL 传递的菜品名称 菜品 ID
  • 在matplotlib中绘制曲线连接点

    所以我试图绘制曲线来连接点 这是我正在使用的代码 def hanging line point1 point2 a point2 1 point1 1 np cosh point2 0 np cosh point1 0 b point1 1
  • Python:计算非整数的阶乘

    我想知道是否有一种快速的 Pythonic 的方法来计算非整数的阶乘 例如 3 4 当然 内置的factorial 函数在Math模块可用 但它仅适用于积分 我不关心这里的负数 你想用math gamma x http docs pytho
  • 优化Gson反序列化

    优化反序列化的最佳方法是什么 我目前正在使用标准 Gson toJson 和 Gson fromJson 方法来序列化和反序列化一些复杂对象 我希望尽可能减少反序列化时间 如果重要的话 我的最复杂的对象包含 43 个变量 如果你想使用 Gs
  • Python TDD 目录结构

    Python 中是否有用于 TDD 的特定目录结构 教程讨论测试的内容 但不讨论测试的位置 通过研究 Python Koans 怀疑它是这样的 project main program py This has main method sta

随机推荐