Python-ElasticSearch客户端的封装(聚合查询、统计查询、全量数据)

2023-11-12


ES Python客户端介绍

官方提供了两个客户端elasticsearch、elasticsearch-dsl

pip install elasticsearch
pip install elasticsearch-dsl

第二个是对第一个的封装,类似ORM操作数据库,可以.filter、.groupby,个人感觉很鸡肋,star数也不多。平时使用的时候一般会在kibana上测试,然后直接把query拷贝过来获取更多数据,所以这里做下第一个的封装。

封装代码

  1. 封装后依然暴露了es,方便有特殊情况下使用
  2. index一般很少改动,就直接放到对象中了,可以使用set_index修改
  3. 常用的应该是get_doc和get_doc_scroll来获取少量和全量数据

代码测试时使用的是7.17.12版本,大于此版本可能由于官方改动出异常

pip install elasticsearch==7.17.12

es.py

import random
import string
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from typing import List,Dict


class ESClient:

    def __init__(self, host="127.0.0.1",index="", http_auth = None):
        self.index = index
        if http_auth is None:
            self.es = Elasticsearch(hosts=host)
        else:
            self.es = Elasticsearch(hosts=host, http_auth=http_auth)
        print("success to connect " + host)

    def close(self):
        self.es.close()

    # 设置索引
    def set_index(self,index:str):
        self.index = index

    # 创建索引
    def create_index(self, index_name: str, mappings=None):
        res = self.es.indices.create(index=index_name, mappings=mappings)
        return res

    # 删除索引
    def delete_index(self, index_name: str):
        res = self.es.indices.delete(index=index_name)
        return res

    # 获取索引
    def get_index(self, index_name: str):
        res = self.es.indices.get(index=index_name)
        return res

    # 创建文档(单个)
    def create_doc(self,body, _id=''.join(random.sample(string.ascii_letters+string.ascii_uppercase+string.digits,20))):
        res = self.es.create(index=self.index, body=body, id=_id)
        return res

    # 创建文档(批量)
    def create_doc_bulk(self, docs: List[Dict]):
        actions = []
        for doc in docs:
            action = {
                "_index": self.index,
                "_op_type": "create",
                "_id": ''.join(random.sample(string.ascii_letters+string.ascii_uppercase+string.digits,20))
            }
            for k,v in doc.items():
                action[k] = v
            actions.append(action)
        res = bulk(client=self.es, actions=actions)
        return res

    # 删除文档
    def delete_doc(self, doc_id):
        res = self.es.delete(index=self.index, id=doc_id)
        return res

    # 更新文档
    def update_doc(self, doc_id, doc:Dict):
        body = {
            "doc" : doc
        }
        res = self.es.update(index=self.index, id=doc_id, body=body)
        return res

    # 分页获取超过100000的文档
    def get_doc_scroll(self,query:Dict):
        res = self.es.search(index=self.index,size=10000,body=query,search_type="query_then_fetch",scroll="5m")
        data_list = []
        hits = res.get("hits")
        scroll_id = res.get('_scroll_id')
        total_value = 0
        # total 可能为Dict或int
        if isinstance(hits.get('total'),Dict):
            total_value= hits.get('total').get('value')
        else:
            total_value = hits.get('total')

        if total_value>0:
            for data in hits.get('hits'):
                data_list.append(data.get('_source'))
        return scroll_id,data_list

    # 通过scroll_id分页获取后序文档
    def get_doc_by_scroll_id(self,scroll_id):
        page = self.es.scroll(scroll_id=scroll_id,scroll="5m")
        data_list = []
        scroll_id = page.get('_scroll_id')
        for data in page.get('hits').get('hits'):
            data_list.append(data)
        return scroll_id,data_list

    # 清空scroll_id,防止服务端不够用
    def clear_scroll(self,scroll_id):
        self.es.clear_scroll(scroll_id)

    # 获取索引的hits内容(一般用于获取文档id、总数)
    def get_doc_all(self):
        res = self.es.search(index=self.index)
        return res['hits']

    # 获取一个文档
    def get_doc_by_id(self, id_):
        res = self.es.get(index=self.index, id=id_)
        return res["_source"]

    # 获取所有文档的_source内容(小于100000)
    def get_doc(self,query:Dict,size:int=100000):
        query['size'] = size
        res = self.es.search(index=self.index,body=query)
        data_list = []
        hits = res.get("hits")
        total_value = 0
        # total 可能为Dict或int
        if isinstance(hits.get('total'), Dict):
            total_value = hits.get('total').get('value')
        else:
            total_value = hits.get('total')

        if total_value > 0:
            for data in hits.get('hits'):
                data_list.append(data.get('_source'))
        return data_list

    # 聚合查询(分组条件名为group_by,返回buckets)
    def get_doc_agg(self, query):
        res = self.es.search(index=self.index, body=query)
        return res['aggregations']['group_by'].get('buckets')

    # 统计查询(统计条件为stats_by,返回最值、平均值等)
    def get_doc_stats(self,query):
        res = self.es.search(index=self.index,body=query)
        return res['aggregations']["stats_by"]

测试代码

import unittest
from es import ESClient

cli = ESClient(host="http://10.28.144.3:9200",http_auth=["elastic","changeme"])
def test_create_index():
    res = cli.create_index(index_name="test")
    print(res)

def test_delete_index():
    res = cli.delete_index(index_name="test")
    print(res)

def test_get_index():
    res = cli.get_index(index_name="test")
    print(res)

def test_set_index():
    cli.set_index(index="test")

def test_create_doc():
    body = {
        "name": "lady_killer9",
        "age": 19
    }
    res = cli.create_doc(body=body)
    print(res)

def test_create_doc_bulk():
    from copy import deepcopy
    body = {
        "name": "lady_killer9"
    }
    users = []
    for i in range(100001):
        tmp = deepcopy(body)
        tmp["age"] = i
        users.append(tmp)
    res = cli.create_doc_bulk(docs=users)
    print(res)


def test_get_doc_all():
    res = cli.get_doc_all()
    print(res)


def test_get_doc_by_id():
    res = cli.get_doc_by_id("jHALXDQaENQZPM4C9EUt")
    print(res)

def test_get_doc():
    query = {
        "query": {
            "match_all": {

            }
        }
    }
    res = cli.get_doc(query=query,size=20)
    print(res)

def test_update_doc():
    body={
        "name": "lady_killer_after_update"
    }
    res = cli.update_doc(doc_id="jHALXDQaENQZPM4C9EUt",doc=body)
    print(res)


def test_delete_doc():
    res = cli.delete_doc(doc_id="jHALXDQaENQZPM4C9EUt")
    print(res)

def test_get_doc_agg():
    query = {
            "aggs": {
                "group_by": {
                    "terms": {
                        "field": "age"
                    }
                }
            }
    }
    res = cli.get_doc_agg(query=query)
    print(res)

def test_get_doc_stats():
    query = {
            "aggs": {
                "stats_by": {
                    "stats": {
                        "field": "age"
                    }
                }
            }
    }
    res = cli.get_doc_stats(query=query)
    print(res)

def test_get_doc_scroll():
    query = {
        "query": {
            "match_all": {}
        }
    }
    scroll_id,data_list = cli.get_doc_scroll(query=query)
    res = []
    while data_list:
        res.extend(data_list)
        scroll_id,data_list = cli.get_doc_by_scroll_id(scroll_id=scroll_id)
    print(len(res))


if __name__ == '__main__':
    # test_delete_index()
    test_create_index()
    test_get_index()
    # test_set_index()
    # test_create_doc()
    # test_create_doc_bulk()
    # test_get_doc_all()
    # test_update_doc()
    # test_get_doc_by_id()
    # test_get_doc()
    # test_delete_doc()
    # test_get_doc_agg()
    # test_get_doc_stats()
    # test_get_doc_scroll()
    cli.close()

测试截图
在这里插入图片描述
更多python相关内容:【python总结】python学习框架梳理

本人b站账号:一路狂飚的蜗牛

有问题请下方评论,转载请注明出处,并附有原文链接,谢谢!如有侵权,请及时联系。如果您感觉有所收获,自愿打赏,可选择支付宝18833895206(小于),您的支持是我不断更新的动力。

参考

github-elasticsearch
github-elasticsearch-dsl

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Python-ElasticSearch客户端的封装(聚合查询、统计查询、全量数据) 的相关文章

  • Python 函数句柄 ala Matlab

    在 MATLAB 中可以创建function handles http www mathworks co uk help techdoc ref function handle html与类似的东西 myfun arglist body 这
  • UnicodeDecodeError:“utf-8”编解码器无法解码位置 14 中的字节 0xb9:起始字节无效

    我正在使用 Django REST 进行文件上传测试 Python3 6 2Django1 11djangorest框架 3 6 4Excel OSX 15 38 170902 操作系统 10 12 6 过去使用普通照片文件可以成功完成此操
  • 从 Django 基于类的视图的 form_valid 方法调用特殊(非 HTTP)URL

    如果你这样做的话 有一个 HTML 技巧 a href New SMS Message a 点击新短信打开手机的本机短信应用程序并预 先填写To包含所提供号码的字段 在本例中为 1 408 555 1212 以及body与提供的消息 Hel
  • 如何使用 python http.server 运行 CGI“hello world”

    我使用的是 Windows 7 和 Python 3 4 3 我想在浏览器中运行这个简单的 helloworld py 文件 print Content Type text html print print print print h2 H
  • Colab 的使用限制持续多久?

    当我对同一帐户的两个笔记本同时使用两个 GPU 约半小时后 Colab 已 12 小时未运行 此消息不断弹出 由于 Colab 中的使用限制 您当前无法连接到 GPU 自从我上次使用 colab 以来已经过去了大约两个小时 但该消息仍然弹出
  • Django 说“id 可能不为 NULL”,但为什么会这样呢?

    我今天要疯了 我只是尝试插入一条新记录 但它返回了 post blogpost id 可能不为 NULL 错误 这是我的模型 class BlogPost models Model title models CharField max le
  • 在 Python 中使用类作为命名空间是个好主意吗

    我正在将一堆相关的东西放入一个类中 主要目的是将它们组织到命名空间中 class Direction north 0 east 1 south 2 west 3 staticmethod def turn right d return tu
  • 如何使用 ipywidgets 创建动态依赖下拉菜单?

    我创建了一个带有两个下拉菜单的表单 我的目标是使一个下拉列表依赖于另一个下拉列表 这张图说明了我的目标和目前的情况 https i stack imgur com o9k5G png 下面的示例代码可以在 Google Colab 或 Ju
  • 如何解决CDK CLI版本不匹配的问题

    我收到以下错误 此 CDK CLI 与您的应用程序使用的 CDK 库不兼容 请将CLI升级到最新版本 云程序集架构版本不匹配 支持的最大架构版本为 8 0 0 但发现为 9 0 0 发出后cdk diff命令 我确实跑了npm instal
  • 如何在每次运行 python 程序时添加新列

    我希望我的表的第一列作为卷号 第二列作为名称 每当我运行 python 程序时 我想在表中添加一列日期 在这个新列中 我想填充从 user list 获得的列表将包含值 P A P P 等 如何处理 我尝试首先通过 alter 命令添加一列
  • telethon 库:如何通过电话号码添加用户

    我正在研究 Telegram 的 Telethon 库 它可以使用 Telegram API 充当 Telegram 客户端 重要提示 这是电报客户端 API https core telegram org telegram api 而不是
  • 使用 Python 导入包含文本和数字数据的文件

    I have a txt file which has text data and numerical data The first two rows of the file have essential information in te
  • 安塞布尔 + 10.11.6

    我在 非常 干净地安装 10 11 6 时遇到了 Ansible 的奇怪问题 我已经安装了brew zsh oh my zsh Lil snitch 和1password 实际上没有安装其他任何东西 我安装了ansible brew ins
  • 列表中的“u”是什么意思?

    这是我第一次遇到这种情况 刚刚打印了一个列表 每个元素似乎都有一个u在它前面 即 u hello u hi u hey 它是什么意思 为什么列表的每个元素前面都会有这个 由于我不知道这种情况有多常见 如果您想了解我是如何遇到它的 我会很乐意
  • datetime strftime 不输出正确的时间戳

    下列 gt gt gt from dateutil parser import parse gt gt gt parse 2013 07 02 00 00 00 0000 datetime datetime 2013 7 2 0 0 tzi
  • python:xml.etree.ElementTree,删除“命名空间”

    我喜欢 ElementTree 解析 xml 的方式 特别是 Xpath 功能 我有一个带有嵌套标签的应用程序的 xml 输出 我想按名称访问此标签而不指定名称空间 这可能吗 例如 root findall molpro job 代替 ro
  • Matplotlib:检查空图

    我有一个循环加载并绘制一些数据 如下所示 import os import numpy as np import matplotlib pyplot as plt for filename in filenames plt figure i
  • 具有行业级约束的 SciPy 投资组合优化

    尝试在这里优化投资组合权重分配 通过限制风险来最大化我的回报函数 我可以毫无问题地通过简单的约束 所有权重之和等于 1 找到产生我的回报函数的优化权重 并做出另一个约束 即我的总风险低于目标风险 我的问题是 如何为每个组添加行业权重界限 我
  • Elasticsearch 如何使用通配符进行 OR 查询

    我很难尝试使用 elasticsearch 构建查询 我想查询类似的内容 WHERE field 1 is match string OR field 2 is wildcard match string OR field 3 is fuz
  • gnuplot:第 1 行:无效命令

    stackoverflow 上可爱的人们大家好 我正在尝试使用 gnuplot 绘制数据 我首先阅读表格并提取我想要的数据 我将此数据写入 dat 文件 截至目前 我只是尝试通过命令行绘制它 但会添加必要的代码以在 python 脚本工作后

随机推荐

  • 新版来啦|ShardingSphere 5.4.0 重磅发布

    Apache ShardingSphere 本周迎来了 5 4 0 版本的发布 该版本历时两个月 共合并了来自全球的团队和个人累计 1271 个 PR 新版本在功能 性能 测试 文档 示例等方面都进行了大量的优化 本次更新包含了不少能够提升
  • CUDA向量加法示例

    CUDA向量相加示例 贺志国 下面以向量加法为例 介绍CUDA实现代码 以下是具体代码vector add cu 我的版本除CUDA相关函数外 其他代码均以C 的方式实现 并且尽可能调用STL库的数据结构和算法 注意 CUDA核函数内部的日
  • delphi 获取有输入焦点的活动窗口信息

    var wintext array 0 MAXBYTE of Char WdChar array of Char focuswhd THandle processId Pointer threadid Cardinal GUITHREADI
  • c语言入门---调试技巧

    目录 什么是bug 调试是什么 调试的基本步骤是什么 调试是什么 调试的基本步骤是什么 Debug和release的区别 windows的调试介绍 调试的准备 调试的操作 1 F5 2 F9 3 F10 4 F11 调试的时候查看程序当前的
  • kali linux基本命令

    文章目录 shell 什么是shell 查看shell shell与终端的区别 VIM编辑器 Linux常用命令 shell 什么是shell 在计算机科学中 shell俗称外壳 能够接收用户的命令并翻译给操作系统执行 是用户与操作系统 内
  • CryptoPP的LC_RNG算法的使用

    随机数发生器是密码学的一个重要原语 密码学库CryptoPP中提供了一些随机数发生器算法 如下图所示 今天 介绍一些其中LC RNG算法的使用 该库中的LC RNG算法就是著名的线性同余发生器算法 该算法由于执行效率高而被广泛使用 C语言库
  • @Conditional 初学

    点击 Conditional Target ElementType TYPE ElementType METHOD Retention RetentionPolicy RUNTIME Documented public interface
  • win10安装Tensorflow1.14.0 CUP版

    安装cpu版本 python3 6 12 tensorflow1 14 0 numpy1 16 0 python tensorflow 和 numpy之间版本要相对应 这很重要 不然可能会装不上 这是尝试了4天后的可行搭配 目 录 预备备
  • 代码题-判断循环依赖

    interface Module name string imports Module const moduleC Module name moduleC const moduleB Module name moduleB imports
  • 【ORACLE性能分析和优化思路学习笔记02:什么时候需要对性能进行干预】

    背景 近期负责的一些单位 一些数据库节点总是出现宕机或者自动重启 之前简单接触过oracle RAC数据库的一些管理 但是对性能分析和优化研究不深 这次实在是没办法了 DBA协调不动 只能自己出马了 好在自己有一定的基础 上手很快 现在对学
  • pytorch常见问题

    1 pytorch 的 dataloader 在读取数据时 设置了较大的 batchsize 和 num workers 然后训练一段时间报错 RuntimeError Too many open files Communication w
  • LeetCode 414. 第三大的数-C语言

    LeetCode 414 第三大的数 C语言 题目描述 解题思路 1 设置数组max 3 用于保存前三大的值 初始化为LONG MIN意为最小值 2 遍历数组对前三大的值进行更新 3 判断max 2 是否存在 若不存在直接返回max 0 代
  • 笔记本电脑切换不到投影仪 问题 解决方法

    我的笔记本是ati显卡的 在某次切换到投影仪的时候 出现问题 无法正确应用您所选择的以下设置 请更改设置并重试 外部监视器或投影仪 电视机 分辨率 颜色质量 无法正确应用您所选择的以下设置 请更改设置并重试 显示配置 解决思路 公司还有一个
  • Neo-reGeorg正向代理配合kali使用

    Neo reGeorg正向代理配合kali使用 一 Neo reGeorg介绍 在了解Neo reGeorg之前 首先应该知道大名鼎鼎的项目 https github com sensepost reGeorg 其用于开启目标服务器到本地的
  • 数据存储的随想

    文章目录 数据分布的演变 数据的使用 总结 数据分布的演变 数据分布就是一个关于数据存放在哪里的问题 数据存储的地方不是固定的 随着应用规模的扩大 为了治理的方便 会适时地调整 其中就会包括数据存储的调整 数据与应用部署在同一台设备 在早期
  • ACCESS的VBA中如何打开文件对话框并获取选中文件的路径

    在 ACCESS 的 VBA 中 可以使用 FileDialog 对象的 Show 方法来打开文件对话框 并使用 SelectedItems 属性来获取选中文件的路径 例如 Dim fd As FileDialog Set fd Appli
  • C/C++ 报错提示 “表达式必须包含类类型” 与 “不可访问”

    今天给大家分享两个常见的错误 定义对象 调用函数 时提示 表达式必须包含类类型 的报错 对象调用函数时提示 不可访问 的报错 一 表达式必须包含类类型 这种报错会出现在两种情况 类没有数据成员时 使用类定义对象时带括号了 定义类时以指针方式
  • MySQL重装——Database initialization failed错误处理

    卸载MySQL 笔者由于跟着网上的教程将MySQL安装到了C盘 忘记了可以走更改路径这条路 在卸载MySQL的路上一去不复返 试过网上诸多重装方案 大体均为以下步骤 控制面板卸载MySQL 删除注册表 删除ProgramData Appli
  • 导出文件:window.open()

    导出文件 window open globalBus emit loading const Download http window location host DI activity orderExcel actId this actId
  • Python-ElasticSearch客户端的封装(聚合查询、统计查询、全量数据)

    目录 ES Python客户端介绍 封装代码 测试代码 参考 ES Python客户端介绍 官方提供了两个客户端elasticsearch elasticsearch dsl pip install elasticsearch pip in