将 pandas 数据框索引到 Elasticsearch 中,无需使用 elasticsearch-py

2024-04-20

我想将一堆大型 Pandas 数据帧(大约数百万行和 50 列)索引到 Elasticsearch 中。

在寻找如何执行此操作的示例时,大多数人会使用elasticsearch-py 的批量辅助方法 https://elasticsearch-py.readthedocs.io/en/master/helpers.html#elasticsearch.helpers.bulk,传递一个实例Elasticsearch 类的 https://elasticsearch-py.readthedocs.io/en/master/api.html#elasticsearch它处理连接以及创建的字典列表使用 pandas 的 dataframe.to_dict(orient='records') 方法 http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.to_dict.html。元数据可以作为新列预先插入到数据框中,例如df['_index'] = 'my_index' etc.

但是,我有理由不使用 elasticsearch-py 库,并想与Elasticsearch 批量 API https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html直接,例如通过requests http://docs.python-requests.org/en/master/或另一个方便的 HTTP 库。除了,df.to_dict()不幸的是,在大型数据帧上速度非常慢,并且将数据帧转换为字典列表,然后由 elasticsearch-py 序列化为 JSON,当存在类似情况时,听起来像是不必要的开销dataframe.to_json() http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.to_json.html即使在大型数据帧上,这也相当快。

将 pandas 数据帧转换为批量 API 所需的格式的简单快捷方法是什么?我认为朝着正确方向迈出的一步是使用dataframe.to_json()如下:

import pandas as pd
df = pd.DataFrame.from_records([{'a': 1, 'b': 2}, {'a': 3, 'b': 4}, {'a': 5, 'b': 6}])
df
   a  b
0  1  2
1  3  4
2  5  6
df.to_json(orient='records', lines=True)
'{"a":1,"b":2}\n{"a":3,"b":4}\n{"a":5,"b":6}'

现在这是一个以换行符分隔的 JSON 字符串,但是,它仍然缺少元数据。将其放入其中的执行方式是什么?

edit:为了完整起见,元数据 JSON 文档如下所示:

{"index": {"_index": "my_index", "_type": "my_type"}}

因此,最终批量 API 期望的整个 JSON 看起来像 这(在最后一行之后有一个额外的换行符):

{"index": {"_index": "my_index", "_type": "my_type"}}
{"a":1,"b":2}
{"index": {"_index": "my_index", "_type": "my_type"}}
{"a":3,"b":4}
{"index": {"_index": "my_index", "_type": "my_type"}}
{"a":5,"b":6}

与此同时,我发现了多种可能性,如何以至少合理的速度做到这一点:

import json
import pandas as pd
import requests

# df is a dataframe or dataframe chunk coming from your reading logic
df['_id'] = df['column_1'] + '_' + df['column_2'] # or whatever makes your _id
df_as_json = df.to_json(orient='records', lines=True)

final_json_string = ''
for json_document in df_as_json.split('\n'):
    jdict = json.loads(json_document)
    metadata = json.dumps({'index': {'_id': jdict['_id']}})
    jdict.pop('_id')
    final_json_string += metadata + '\n' + json.dumps(jdict) + '\n'

headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
r = requests.post('http://elasticsearch.host:9200/my_index/my_type/_bulk', data=final_json_string, headers=headers, timeout=60) 

而不是使用熊猫'to_json()方法,也可以使用to_dict()如下。在我的测试中这稍微慢一些,但也慢不了多少:

dicts = df.to_dict(orient='records')
final_json_string = ''
for document in dicts:
    metadata = {"index": {"_id": document["_id"]}}
    document.pop('_id')
    final_json_string += json.dumps(metadata) + '\n' + json.dumps(document) + '\n'

在大型数据集上运行此程序时,可以通过替换 Python 的默认值来节省几分钟json图书馆与ujson https://pypi.python.org/pypi/ujson or 快速JSON https://github.com/kenrobbins/python-rapidjson通过安装它,然后import ujson as json or import rapidjson as json, 分别。

通过将步骤的顺序执行替换为并行执行,可以实现更大的加速,以便在请求等待 Elasticsearch 处理所有文档并返回响应时读取和转换不会停止。这可以通过线程、多处理、异步、任务队列等来完成,但这超出了这个问题的范围。

如果您碰巧找到一种更快地进行 json 转换的方法,请告诉我。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

将 pandas 数据框索引到 Elasticsearch 中,无需使用 elasticsearch-py 的相关文章

  • 缺少 python 配置

    我正在安装一个程序 需要安装 python config 唯一的问题是我目前没有 python config 而且我似乎不知道如何获取它 经过搜索后 我应该可以通过以下方式安装它 yum install python devel 然而 这样
  • 以类似字典的方式将新项目添加到某些结构化数组中

    我想扩展 numpy 中的结构化数组对象 以便我可以轻松添加新元素 例如 对于一个简单的结构化数组 gt gt gt import numpy as np gt gt gt x np ndarray 2 dtype names A B fo
  • 将新形状传递给“np.reshape”

    Within numpy ndarray reshape https docs scipy org doc numpy reference generated numpy ndarray reshape html the shape参数是一
  • 在 django 中构建动态表单

    我正在尝试根据存储在数据库中的字段及其定义动态构建一个表单 在我的数据库中 我定义了 1 个带有一些标签的复选框和 1 个带有一些标签的文本字段 如何根据数据库中的数据在我的视图中动态构建表单 Thanks 以下是我在 EuroDjango
  • 如何在cvxpy中编写多个约束?

    我想在 cvxpy 下的优化问题中添加许多约束 在 matlab 中 我可以通过添加一行 subject to 然后使用 for 循环来生成约束 我怎样才能在 cvxpy 中做同样的工作 因为 cvxpy 中没有 服从 概念 有什么建议吗
  • 使用 asyncore 读取网站

    我想异步阅读一个网站 据我所知 这是不可能的 urllib 现在我尝试使用普通套接字进行阅读 但是 HTTP 给我带来了麻烦 我遇到了各种时髦的编码 例如传输编码 分块 必须手动解析所有这些东西 我现在想编码 C 而不是 python 难道
  • python: X 服务器上的致命 IO 错误 11(资源暂时不可用):0.0

    我正在尝试读取一些图像 稍后打算对它们执行一些任务 同时将图像读入内存 我想显示动画 gif 图像 为此 我必须使用线程 现在它给出错误 python Fatal IO error 11 Resource temporarily unava
  • 使用 Pandas 滚动差异

    您好 我正在尝试使用 Pandas 滚动函数来计算下表中的滚动差异 我正在尝试生成 每月可用项目 列中的值 但没有得到任何结果 请帮忙 Item Adds Subtracts Month Monthly Available items A
  • 将图像转换为二进制流

    我的应用程序有两个方面 一方面我使用 C 来使用 Pleora 的 EBUS SDK 从相机读取帧 当第一次接收到该流时 在将缓冲区转换为图像之前 我能够一次读取 16 位流 以便对每个像素执行一些计算 即每个像素都存在一个 16 位数据块
  • 导入错误:无法导入名称 urandom

    我正在构建一个新的 Linux 环境 并在 Python 上看到以下错误 python c import random Traceback most recent call last File
  • python中remove方法的安全使用

    我从列表继承了一个 UserList 类并实现了以下方法来删除标记为已删除的条目 def purge deleted self for element in list iter self if ele mark deleted lt 1 s
  • 设置区域设置和字符串模块

    这个简单的脚本 from locale import LC ALL setlocale print setlocale LC ALL from string import letters print letters 给我这个输出 tr TR
  • Python 对象属性 - 访问方法

    假设我有一个具有某些属性的类 在 Pythonic OOP 中 如何访问这些属性是最好的 就像obj attr 或者也许编写 get 访问器 此类事物可接受的命名风格是什么 Edit 您能否详细说明使用单下划线或双前导下划线命名属性的最佳实
  • 为什么在 python 控制台中对 SparkSession.builder.getOrCreate() 的调用被视为命令行 Spark-submit?

    代替python console我正在尝试创建一个Spark Session 我没有使用pyspark以隔离依赖关系 为什么是spark submit命令行提示并生成错误 NOTE SPARK PREPEND CLASSES is set
  • 调试 python Web 服务

    我正在使用找到的说明here http www diveintopython net http web services user agent html 尝试检查发送到我的网络服务器的 HTTP 命令 但是 我没有看到按照教程中的建议在控制
  • 带回溯的 Dijkstra 算法?

    In a 相关主题 https stackoverflow com questions 28333756 finding most efficient path between two nodes in an interval graph
  • Tensorflow:提要字典错误:您必须为占位符张量提供值

    我有一个错误 我无法找出原因 这是代码 with tf Graph as default global step tf Variable 0 trainable False images tf placeholder tf float32
  • 是否可以使用 Python 中的密码安全地加密然后解密数据?

    我在 python 程序中有一些数据 我想在使用密码写入文件之前对其进行加密 然后在使用它之前读取并解密它 我正在寻找一些可以根据密码进行加密和解密的安全对称算法 这个问题 https stackoverflow com questions
  • elasticsearch查询字符串分析器针对不同字段使用不同的分析器

    当对具有不同分析器 stem ngram等 的多个字段执行查询时 elasticsearch是否对每个字段的查询字符串执行特定的分析 是的 除非您在查询中指定分析器 这请求参数 http www elasticsearch org guid
  • 用于获取有关 SVN 存储库信息的 Python 库?

    我正在寻找一个可以从 SVN 存储库中提取 至少 以下信息的库 not工作副本 修订号及其作者和提交消息 每个修订版中的更改 添加 删除 修改文件 有Python库可以做到这一点吗 对于作者和提交消息 我可以解析 db revprops 0

随机推荐