elk笔记23--定期清理索引

2023-10-26

1 介绍

在生产环境中,如果日志量过大,就会导致集群持续产生很多索引,占用很多存储空间。因此需要定期清理索引,确保磁盘处于一个比较低的水位。本文使用mysql 存储索引基础信息,基于mysql 中所以保存的时间,定期清理超过指定时间 rollover 生成的索引。

2 方案&代码

2.1 方案介绍

每次新建索引的时候将索引基础信息存放到mysql中,具体包括索引名称、别名、存放时间和ilp信息。
每次清理索引的时候从db中查看基础索引信息,然后获取集群中所有索引的信息,当索引rollover的生成的数量大于2 且生成时间大于存放时间的时候,就删除超时的索引。
每次清理的时候还需要统计出在数据库中没有记录和不符合rollover规则的索引,以便于后续整改或者过滤。

本文 rollover 方式参考 elk笔记8–index -> 1. index 创建的几种方式 -> 1.3 创建带有rollover功能的索引

2.2 代码

sql db 信息:

use bigdata_sre_log;
CREATE TABLE `es_indices` (
  `number` int(11) NOT NULL AUTO_INCREMENT,
  `cluster` char(50) NOT NULL,
  `index_name` char(50) NOT NULL,
  `save_days` int(4) DEFAULT NULL,
  `datetime` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `ilp_name` char(50) DEFAULT '',
  `alias` char(50) DEFAULT NULL,
  PRIMARY KEY (`number`)
) ENGINE=InnoDB AUTO_INCREMENT=25 DEFAULT CHARSET=utf8;

源码信息:
app.py

#!/usr/bin/python3
# -*- coding:utf-8 -*-

import datetime
import json

import elasticsearch
import curator

from config import *
from db_helper import MysqlHelper


def get_indices_from_db():
    sql = MysqlHelper(SQL_HOST, SQL_USER, SQL_PWD, SQL_DBNAME)
    sql.show_version()
    tag, ret = sql.get_indices_list()
    indices = {}
    if tag:
        for index in ret:
            indices[index['index_name']] = {'alias': index['alias'], 'save_days': index['save_days']}
    return indices


def indices_ready_to_delete(rollover_index_list):
    """
    从rollover索引中找到可以删除的索引,
    :param rollover_index_list: [{index_name, ymd_number}, {}]
    :return: { 'index1': [], 'index2': []}
    """
    index_info = get_indices_from_db()
    index_delete = {}
    # print(f'indices_ready_to_delete, {rollover_index_list}')
    for index in rollover_index_list:
        if index['index_name'] not in index_delete.keys():
            index_delete[index['index_name']] = [index['ymd_number']]
        else:
            index_delete[index['index_name']].append(index['ymd_number'])
    for index in index_delete.keys():
        index_list = index_delete[index]
        index_list.sort()
        ymd_cur = datetime.datetime.now()
        ymd_cur_offset = datetime.timedelta(days=-index_info[index]['save_days'])
        ymd_old = ymd_cur + ymd_cur_offset
        ymd_old_str = ymd_old.strftime("%Y%m%d")
        if len(index_list) <= 2:
            index_delete[index] = []
        else:
            index_list_can_delete = []
            for i in range(0, len(index_list)-2):  # at least save the last 2 indices
                if int(ymd_old_str) > int(index_list[i][:4] + index_list[i][5:7] + index_list[i][8:10]):
                    # print(index, int(ymd_old_str), int(index_list[i][:4] + index_list[i][5:7] + index_list[i][8:10]))
                    index_list_can_delete.append(index + '-' + index_list[i])
            index_delete[index] = index_list_can_delete
    return index_delete


def judge_rollover_type(index_name):
    """
    判断索引是否为rollover类型, 判断格式 {index_name}-{yyyy}.{mm}.{dd}-{number} , number 为6位数
    :param index_name:
    :return: {'index_name': 'xxx', 'ymd_number': 'xxx'}
    """
    if len(index_name) > 18:
        ymd = index_name[len(index_name) - 17: len(index_name) - 7]
        number = index_name[len(index_name) - 6:]
        # print(index_name[:len(index_name) - 18], ymd[:4], ymd[5:7], ymd[8:10], number)
        ymd_list = [ymd[:4], ymd[5:7], ymd[8:10]]
        for char in ymd_list:
            if char < '0' or char > '9':
                return False, {'index_name': index_name, 'ymd_number': None}
    else:
        return False, {'index_name': index_name, 'ymd_number': None}
    return True, {'index_name': index_name[:len(index_name) - 18], 'ymd_number': index_name[len(index_name) - 17:]}


class ElasticsearchClean(object):
    def __init__(self):
        self.client = elasticsearch.Elasticsearch([ES_HOST], http_auth=(ES_USER, ES_PWD), timeout=30, max_retries=5)
        self.indices = curator.get_indices(self.client)
        self.dry_run = True

    def print_index(self):
        print(type(self.indices), len(self.indices))
        print(self.indices[0], type(self.indices[0]))

    def classify_index(self):
        """
        1) 判断rollover 类型
        2) rollover 类型index 放在 rollover 中, 非 rollover 类型的index 放在 no_rollover 中
        :return: {'rollover': [], 'rollover_index_name': [], 'no_rollover': []}
        """
        ret_index = {'rollover': [], 'rollover_index_name': [], 'no_rollover': []}
        for index in self.indices:
            tag, index_dict = judge_rollover_type(index)
            if tag:
                ret_index['rollover'].append(index_dict)
                if index_dict['index_name'] in ret_index['rollover_index_name']:
                    pass
                else:
                    ret_index['rollover_index_name'].append(index_dict['index_name'])
            else:
                ret_index['no_rollover'].append(index_dict)
        return ret_index

    def delete_index(self, index_list):
        if self.dry_run:
            print(f"dry_run delete_index:\n{index_list}")
        else:
            for index_name in index_list:
                try:
                    if self.client.indices.exists(index=index_name):
                        self.client.indices.delete(index=index_name)
                    else:
                        print(f"{index_name} not exists")
                except Exception as e:
                    print(f'delete_index error: {e}')

    def do_delete_index(self):
        indices_can_delete = indices_ready_to_delete(self.classify_index()['rollover'])
        for index_key in indices_can_delete.keys():
            if len(indices_can_delete[index_key]) > 0:
                print(f"start to clean {index_key}: {indices_can_delete[index_key]}")
                self.delete_index(indices_can_delete[index_key])
        print(f'no_rollover: {json.dumps(self.classify_index()["no_rollover"])}')


if __name__ == '__main__':
    esc = ElasticsearchClean()
    esc .dry_run = False
    esc.do_delete_index()

db_helper.py

#!/usr/bin/python3
# -*- coding:utf-8 -*-

import pymysql
import traceback


class MysqlHelper:
    def __init__(self, sql_host, sql_user, sql_pwd, db_name):
        self.host = sql_host
        self.usr = sql_user
        self.pwd = sql_pwd
        self.dbname = db_name
        self.port = 3306
        self.charset = 'utf8'
        self.db = pymysql.connect(host=self.host, user=self.usr, passwd=self.pwd, db=self.dbname, charset=self.charset)

    def close_db(self):
        self.db.close()

    def show_version(self):
        # 使用 cursor() 方法创建一个游标对象 cursor
        cursor = self.db.cursor()
        # 使用 execute()  方法执行 SQL 查询
        cursor.execute("SELECT VERSION()")
        # 使用 fetchone() 方法获取单条数据.
        data = cursor.fetchone()
        print(f"Database version : {data}")

    def get_indices_list(self):
        ret_list = []
        instance = None
        sql = f"select number, cluster, index_name, save_days, datetime, ilp_name, alias from es_indices"
        try:
            cursor = self.db.cursor()
            cursor.execute(sql)
            ret = cursor.fetchall()
            if ret is None:
                return []
            for item in ret:
                instance = {'number': item[0], 'cluster': item[1], 'index_name': item[2], 'save_days': item[3],
                            'datetime': item[4].strftime("%Y-%m-%d %H:%M:%S"), 'ilp_name': item[5], 'alias': item[6]}
                ret_list.append(instance)
            print(f"get_indices_list succeed!")
            return True, ret_list
        except Exception as e:
            self.db.rollback()
            print(f"get_indices_list error:\n{e}, {traceback.print_exc()}")
            return False, f"get_indices_list error:\n{e}, {traceback.print_exc()}"

    def get_indice_instance(self, cluster, index_name):
        sql = f"select number, cluster, index_name, save_days, datetime, ilp_name, alias from es_indices " \
              f"where cluster='{cluster}' and index_name='{index_name}'"
        try:
            cursor = self.db.cursor()
            cursor.execute(sql)
            item = cursor.fetchone()
            if item is None:
                return False, f"no {cluster}/{index_name}"
            instance = {'number': item[0], 'cluster': item[1], 'index_name': item[2], 'save_days': item[3],
                        'datetime': item[4].strftime("%Y-%m-%d %H:%M:%S"), 'ilp_name': item[5], 'alias': item[6]}
            print(f"get_indice_instance succeed!")
            return True, instance
        except Exception as e:
            print(f"get_indice_instance error:\n{e}, {traceback.print_exc()}")
        return False, f"get_indice_instance error:\n{e}, {traceback.print_exc()}"


def test_case():
    from config import SQL_USER, SQL_PWD, SQL_HOST, SQL_DBNAME
    sql_host = SQL_HOST
    sql_user = SQL_USER
    sql_pwd = SQL_PWD
    sql_dbname = SQL_DBNAME

    sql = MysqlHelper(sql_host, sql_user, sql_pwd, sql_dbname)
    sql.show_version()

    tag, ret = sql.get_indices_list()
    if tag:
        for i in range(0, len(ret)):
            print(i, ret[i])
    else:
        print(ret)
    tag, ret = sql.get_indice_instance('sre-elk', 'test-elk')
    print(tag, ret)
    sql.close_db()


if __name__ == "__main__":
    test_case()

config.py

#!/usr/bin/python3
# -*- coding:utf-8 -*-

SQL_HOST = '127.0.0.1'
SQL_USER = 'root'
SQL_PWD = '111111'
SQL_DBNAME = 'bigdata_sre_log'

ES_HOST = '127.0.0.1:9200'
ES_USER = 'elastic'
ES_PWD = 'elastic'

2.3 测试

db 数据:
在这里插入图片描述
es 索引:
在这里插入图片描述
执行输出:
在这里插入图片描述

3 注意事项

  1. 当前还没有统计符合要求且在db中没有记录的索引,后续有时间更新第二版的时候再补充。
  2. 数据库中存放的index_name 是rollover后生成索引的前缀, 例如 test-elk-2021.08.xx-0000xx 系列rollover后的索引对应数据库的 test-elk。

4 说明

  1. 软件环境:
    pip 包
    pip3 install elasticsearch==7.13.2 -i http://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com
    pip3 install elasticsearch-curator==5.8.4 -i http://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com
    pip3 install PyMySQL==1.0.2 -i http://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com
    
    2 elk 5.10
    3 mysql 8.0
    4 python 3.8
    5 ubuntu 2020.04 Desktop
  2. 参考文档:
    1 curator.readthedocs.io
    2 www.elastic.co/guide/en/elasticsearch/client/curator
    3 elk笔记8–index
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

elk笔记23--定期清理索引 的相关文章

  • Neo4j 关系索引 - 搜索关系属性

    我有一个具有以下结构的 neo4j 图 账户 交易 账户 Transaction是neo4j关系 Account是节点 每笔交易都设置了各种属性 例如交易 ID 金额 日期和各种其他银行信息 我可以按帐户 ID 运行搜索 结果返回正常 但是
  • Flume的Spool Dir可以在远程机器上吗?

    每当新文件到达特定文件夹时 我就尝试将文件从远程计算机获取到我的 hdfs 我在flume中遇到了spool dir的概念 如果spool dir位于运行flume代理的同一台机器上 那么它工作得很好 有什么方法可以在远程计算机中配置假脱机
  • Pandas 根据列的值有效地分块读取大型面板 CSV

    我有一个很大的 CSV 文件 磁盘上约 50 GB 但无法立即将其完全读入内存 数据集本身是面板数据 看起来像 ID Time Col 1 Col N 1 2000 1 1 1 2000 1 2 2 2000 1 1 我加载这些数据的想法是
  • 在 MATLAB 中处理大型 CSV 文件

    我必须处理一个最大 2GB 的大 CSV 文件 更具体地说 我必须将所有这些数据上传到 mySQL 数据库 但在我必须对此进行一些计算之前 所以我需要在 MATLAB 中完成所有这些操作 我的主管也想在 MATLAB 中完成 因为他熟悉MA
  • 大稀疏矩阵到三角矩阵 R

    我在 R 中有一个非常大的 大约 9100 万个非零条目 sparseMatrix 如下所示 gt myMatrix a b c a 1 2 b 1 c 2 我想将其转换为三角矩阵 上或下 但是当我尝试 myMatrix myMatrix
  • 有效地广播具有多个变量的大型数据集

    我在尝试着dcast大型数据集 数百万行 我有一行用于到达时间和出发地 另一行用于出发时间和目的地 有一个id识别这两种情况下的单位 它看起来类似于 id time movement origin dest 1 10 06 2011 15
  • 全新安装时的 HDFS 空间使用情况

    我刚刚安装了 HDFS 并启动了该服务 并且已使用空间已经超过800MB 它代表什么 hdfs dfs df h Filesystem Size Used Available Use hdfs quickstart cloudera 802
  • 分段读取 CSV 文件的策略?

    我的计算机上有一个中等大小的文件 4GB CSV 但没有足够的 RAM 来读取该文件 64 位 Windows 上为 8GB 在过去 我只是将其加载到集群节点上并将其读入 但我的新集群似乎任意将进程限制为 4GB RAM 尽管每台机器的硬件
  • Flink 中的水印和触发器有什么区别?

    我读到 排序运算符必须缓冲它接收到的所有元素 然后 当它接收到水印时 它可以对时间戳低于水印的所有元素进行排序 并按排序顺序发出它们 这是正确 因为水印表明不能有更多元素到达并与已排序元素混合 https cwiki apache org
  • Logstash 的流行为不一致 - ELK

    我有一个包含几个平面字段和几个嵌套字段的索引 我正在尝试通过 Logstash 将 SQL Server 中的信息通过特定 ID 流式传输到嵌套字段中 当我流式传输数据时only one然后它就完全成功地通过了 没有任何问题 另一方面 当我
  • Pig - 如何迭代一袋地图

    让我解释一下这个问题 我有这行代码 u FOREACH persons GENERATE FLATTEN 0 experiences as j dump u 产生以下输出 id 1 date begin 12 2012 descriptio
  • 将 pandas 数据框中的行和上一行与数百万行进行比较的最快方法

    我正在寻找解决方案来加速我编写的函数 以循环遍历 pandas 数据帧并比较当前行和前一行之间的列值 例如 这是我的问题的简化版本 User Time Col1 newcol1 newcol2 newcol3 newcol4 0 1 6 c
  • HDFS 作为 cloudera 快速入门 docker 中的卷

    我对 hadoop 和 docker 都很陌生 我一直致力于扩展 cloudera quickstart docker 镜像 docker 文件 并希望从主机挂载一个目录并将其映射到 hdfs 位置 以便提高性能并将数据保存在本地 当我在任
  • 如何在 Elasticsearch 中或在 Lucene 级别进行联接

    在 Elasticsearch 中执行相当于 SQL 连接的最佳方法是什么 我有一个包含两个大表的 SQL 设置 Persons 和 Items 一个人可以拥有many项目 人员和项目行都可以更改 即更新 我必须运行根据人和物品的各个方面进
  • 使用 Kinesis Analytics 构建实时会话

    是否有某个地方的示例 或者有人可以解释如何使用 Kinesis Analytics 构建实时会话 即会话化 这里提到这可能 https aws amazon com blogs aws amazon kinesis analytics pr
  • Postgresql - 在大数据库中使用数组的性能

    假设我们有一个包含 600 万条记录的表 有 16 个整数列和少量文本列 它是只读表 因此每个整数列都有一个索引 每条记录大约 50 60 字节 表名称为 项目 服务器为 12 GB RAM 1 5 TB SATA 4 核 所有 postg
  • 了解 Azure 事件中心分区使用者模式

    Azure 事件中心使用分区使用者模式中描述的docs https learn microsoft com en us azure event hubs event hubs features 当涉及到现实世界场景时 我在理解该模型的消费者
  • Hive ParseException - 无法识别“结束”“字符串”附近的输入

    尝试从现有 DynamoDB 表创建 Hive 表时出现以下错误 NoViableAltException 88 at org apache hadoop hive ql parse HiveParser IdentifiersParser
  • 使用大矩阵操作

    我必须使用 big matrix 对象 并且无法计算某些函数 让我们考虑以下大矩阵 create big matrix object x lt as big matrix matrix sample 1 10 20 replace TRUE
  • 使用 mapWithState Spark Streaming 过滤部分重复项

    我们有一个DStream 比如 val ssc new StreamingContext sc Seconds 1 val kS KafkaUtils createDirectStream String TMapRecord ssc Pre

随机推荐

  • C# InvokeRequired和Invoke

    一 简介 WinForm 关于InvokeRequired与Invoke Jlins的博客 CSDN博客 invokerequired c 是禁止夸线程直接访问控件 InvokeRequired是为了解决这个问题而产生的 当一个控件的Inv
  • cpu与gpu实现矩阵相乘对比

    1 完成矩阵相乘的并行程序的实现 要求 实现2个矩阵 1024 1024 的相乘 数据类型设置为float 1 使用CPU计算 include
  • 开源的一些基础介绍

    国内 淘宝 百度 南航 网易等 国外 新浪 搜狐 facebook ebay google等 成功后的企业也在不断为开源添加新能量 如 taobao和google等 因为他们不但被开源的魅力深深吸引住同时也愿意通过开源提升自我 现在更多的企
  • Wrappers.<实体>lambdaQuery的使用

    文章目录 MP 配置 Service CURD接口 Mapper CURD接口 insert delete update select 条件构造器 LambdaUpdateWrapper Wrappers lt 实体 gt lambdaUp
  • 机器人教育是一种科学的探究方式

    创新是推动经济社会发展的核心驱动力 当前 我国已经深刻认识到世界新科技革命带来的机遇和挑战 以高度的历史责任感 强烈的忧患意识和宽广的世界眼光 把创新作为推动经济社会发展的驱动力量 机器人技术的进步将会对科学与技术的发展产生重要影响 只有开
  • 算法(C)(两数之和)

    题目 两数之和 题目描述 给定一个整数数组 nums 和一个整数目标值 target 请你在该数组中找出 和为目标值 target 的那 两个 整数 并返回它们的数组下标 你可以假设每种输入只会对应一个答案 但是 数组中同一个元素在答案里不
  • JSON使用的一些总结

    http sx666 blogspot com 2007 11 json html JSON JavaScript Object Notation 是一种轻量级的数据交换格式 它采用完全独立于语言的文本格式 可以用来在客户端和服务器端传输数
  • innerText和innerHTML区别

    innerText和innerHTML区别 尽管DOM带来了动态修改文档的能力 但对开发人员来说 这还不够 IE4 0为所有的元素引入了两个特性 以更方便的进行文档操作 这两个特性是innerText和innerHTML 其中innerTe
  • Oracle:重复数据去重,只取其中一条(最新时间/其他字段排序规则)数据

    一 问题 一个会话id代表一个聊天室 返回该聊天室最新的一条数据显示在会话列表 二 解决思路 使用row number over 分组排序功能 来解决该问题 1 语法格式 row number over partition by 分组列 o
  • TMOD、SCON、PCON寄存器的配置

    TMOD控制寄存器 TMOD是定时器 计数器模式控制寄存器 它是一个逐位定义的8为寄存器 但只能使用字节寻址 其各位是 由上图我们就可以看出 这个寄存器控制了两个定时器 计数器 寄存器的高四位控制定时器1 低四位控制定时器0 GATE 门控
  • 数据分析毕业设计 二手房数据爬取与分析可视化系统 -python

    1 前言 这两年开始毕业设计和毕业答辩的要求和难度不断提升 传统的毕设题目缺少创新和亮点 往往达不到毕业答辩的要求 这两年不断有学弟学妹告诉学长自己做的项目系统达不到老师的要求 为了大家能够顺利以及最少的精力通过毕设 学长分享优质毕业设计项
  • Air700E开发板

    文章目录 基础资料 概述 主要功能 外设分布 PinOut 管脚定义 管脚功能说明 固件升级 正常开机模式 下载模式 IO 电平选择 基础资料 Air700E文档中心 概述 EVB Air700E 开发板是合宙通信推出的基于 Air700E
  • 去除VsCode代码前面的小点点

    去除VsCode代码前面的小点点 去除图片中的点 步骤 File gt Preferences gt Setting 搜索RenderWhitespace 将Text Editor下的Editor Render Whitespace改为no
  • peewee-async使用描述

    1 peewee async是一个为peewee ORM 提供由asyncio支持的异步io库 在单独使用peewee连接池连接时 同时使用到了async和await协程 这样操作会阻塞整个进程 因为tornado是单进程 必须数据库也使用
  • 数据库的简介与类型 #CSDN博文精选# #IT技术# #数据库#

    大家好 小C将继续与你们见面 带来精选的CSDN博文 又到周一啦 上周的系统化学习专栏已经结束 我们总共一起学习了20篇文章 这周将开启全新专栏 放假不停学 全栈工程师养成记 在这里 你将收获 将系统化学习理论运用于实践 系统学习IT技术
  • 高通 AR Unity 虚拟按钮

    1 虚拟按钮是图像上的目标 用户可以在现实世界中触摸 以触发一个动作的 热点 现有的图像目标的一个实例的VirtualButton预制拖动到场景中添加虚拟按键 平移和缩放按钮 以匹配所需的位置 并给它一个名字 虚拟的按钮添加这样写入到con
  • 计算机视觉概述

    关注公众号 CV算法恩仇录 本文介绍了计算机视觉的主要任务及应用 全文大约 3500 字 阅读时间 10 分钟 人们或许没有意识到自己的视觉系统是如此的强大 婴儿在出生几个小时后能识别出母亲的容貌 在大雾的天气 学生看见朦胧的身体形态 能辨
  • v-viewer 插件图片点击放大预览的几种使用方法

    官网git地址 https github com mirari v viewer 最终效果如下 ps 按钮样式都是可以根据自己需求调整的 第一种使用方法 支持UMD用法 建议把v viewer相关的js和css文件下载到本地引用 可以到上面
  • set容器、map容器

    set multiset 容器 set基本概念 简介 所有元素都会在插入时自动被排序 本质 set multiset属于关联式容器 底层结构是用二叉树实现 set和multiset区别 set不允许容器中有重复的元素 multiset允许容
  • elk笔记23--定期清理索引

    elk笔记23 定期清理索引 1 介绍 2 方案 代码 2 1 方案介绍 2 2 代码 2 3 测试 3 注意事项 4 说明 1 介绍 在生产环境中 如果日志量过大 就会导致集群持续产生很多索引 占用很多存储空间 因此需要定期清理索引 确保