1 介绍
在生产环境中,如果日志量过大,集群会持续产生大量索引,占用很多存储空间。因此需要定期清理索引,确保磁盘使用率处于较低的水位。本文使用 mysql 存储索引基础信息,并根据 mysql 中保存的保留天数,定期清理由 rollover 生成且超过指定保留时间的索引。
2 方案&代码
2.1 方案介绍
每次新建索引时,将索引基础信息存放到 mysql 中,具体包括索引名称、别名、保留天数和 ilp 信息。
每次清理索引时,先从 db 中读取索引基础信息,再获取集群中所有索引的信息;当某个索引通过 rollover 生成的索引数量大于 2,且索引的生成时间已超过保留天数时,就删除这些超时的索引(始终保留最新的 2 个)。
每次清理的时候还需要统计出在数据库中没有记录和不符合rollover规则的索引,以便于后续整改或者过滤。
本文 rollover 方式参考 elk笔记8–index -> 1. index 创建的几种方式 -> 1.3 创建带有rollover功能的索引
2.2 代码
sql db 信息:
-- Index metadata read by app.py (get_indices_from_db): one row per rollover index prefix.
use bigdata_sre_log;
CREATE TABLE `es_indices` (
-- auto-increment primary key
`number` int(11) NOT NULL AUTO_INCREMENT,
-- cluster name; db_helper.get_indice_instance filters on it, but app.py keys rows by index_name only
`cluster` char(50) NOT NULL,
-- rollover prefix, e.g. test-elk for the test-elk-2021.08.xx-0000xx series
`index_name` char(50) NOT NULL,
-- retention in days used by app.py when selecting indices to delete
-- NOTE(review): NULL is allowed here; confirm every managed prefix sets a value
`save_days` int(4) DEFAULT NULL,
-- set on insert and refreshed whenever the row is updated
`datetime` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
-- ilp name (presumably the lifecycle policy of the index, confirm); not read by app.py
`ilp_name` char(50) DEFAULT '',
-- index alias; loaded by app.py but not used for deletion decisions
`alias` char(50) DEFAULT NULL,
PRIMARY KEY (`number`)
) ENGINE=InnoDB AUTO_INCREMENT=25 DEFAULT CHARSET=utf8;
源码信息:
app.py
#!/usr/bin/python3
# -*- coding:utf-8 -*-
import datetime
import json
import re

import curator
import elasticsearch

from config import *
from db_helper import MysqlHelper
def get_indices_from_db():
    """Load the retention settings of every managed index prefix from MySQL.

    :return: ``{index_name: {'alias': ..., 'save_days': ...}}`` keyed by the
        rollover prefix stored in ``es_indices.index_name``; an empty dict when
        the query reports a failure.
    """
    sql = MysqlHelper(SQL_HOST, SQL_USER, SQL_PWD, SQL_DBNAME)
    try:
        sql.show_version()
        tag, ret = sql.get_indices_list()
    finally:
        # Release the connection once the rows are read (it was never closed before).
        sql.close_db()
    if not tag:
        # On failure get_indices_list returns (False, <error message>).
        return {}
    return {
        index['index_name']: {'alias': index['alias'], 'save_days': index['save_days']}
        for index in ret
    }
def indices_ready_to_delete(rollover_index_list, index_info=None, now=None):
    """Find the rollover indices whose retention period has expired.

    Indices are grouped by prefix and sorted by their ``yyyy.mm.dd-nnnnnn``
    suffix. The two newest indices of each prefix are always kept. Any older
    index whose date is strictly before ``now - save_days`` is selected.

    :param rollover_index_list: [{'index_name': prefix, 'ymd_number': 'yyyy.mm.dd-nnnnnn'}, ...]
        as produced by ``judge_rollover_type``.
    :param index_info: optional ``{prefix: {'save_days': int, ...}}``; loaded with
        ``get_indices_from_db()`` when omitted.
    :param now: optional reference ``datetime.datetime``; defaults to the current local time.
    :return: {'prefix1': ['prefix1-yyyy.mm.dd-nnnnnn', ...], 'prefix2': []}. A prefix
        with no database record, or with a NULL ``save_days``, maps to an empty list.
    """
    if index_info is None:
        index_info = get_indices_from_db()
    if now is None:
        now = datetime.datetime.now()

    grouped = {}
    for index in rollover_index_list:
        grouped.setdefault(index['index_name'], []).append(index['ymd_number'])

    index_delete = {}
    for prefix, suffixes in grouped.items():
        suffixes.sort()
        save_days = index_info.get(prefix, {}).get('save_days')
        if save_days is None:
            # Previously a KeyError (prefix not in db) or TypeError (NULL save_days)
            # aborted the whole cleanup; keep these indices and report them instead.
            print(f"skip {prefix}: no save_days record in db")
            index_delete[prefix] = []
            continue
        cutoff = int((now - datetime.timedelta(days=save_days)).strftime("%Y%m%d"))
        # At least save the last 2 indices. NOTE(review): the date in the name is
        # presumably the creation date, so an index may still hold newer documents
        # until the next rollover; keeping the newest two limits that risk.
        index_delete[prefix] = [
            f"{prefix}-{suffix}"
            for suffix in suffixes[:-2]
            if cutoff > int(suffix[:4] + suffix[5:7] + suffix[8:10])
        ]
    return index_delete
def judge_rollover_type(index_name):
    """Decide whether *index_name* follows the rollover naming scheme.

    The expected format is ``{index_name}-{yyyy}.{mm}.{dd}-{number}``, where
    ``number`` is exactly 6 digits and the prefix is non-empty.

    :param index_name: name of an index in the cluster.
    :return: ``(True, {'index_name': prefix, 'ymd_number': 'yyyy.mm.dd-nnnnnn'})`` for a
        rollover index, otherwise ``(False, {'index_name': index_name, 'ymd_number': None})``.
    """
    # The old check compared multi-character slices against '0'/'9' and never looked
    # at the separators or the 6-digit counter, so e.g. 'filebeat-7.13.2-2021.08.20'
    # was misclassified. Match the whole suffix explicitly; [0-9] (not \d) keeps it ASCII.
    match = re.fullmatch(r'(.+)-([0-9]{4}\.[0-9]{2}\.[0-9]{2}-[0-9]{6})', index_name)
    if match is None:
        return False, {'index_name': index_name, 'ymd_number': None}
    return True, {'index_name': match.group(1), 'ymd_number': match.group(2)}
class ElasticsearchClean(object):
    """Delete expired rollover indices from one Elasticsearch cluster.

    The index list is fetched once, in ``__init__``, with ``curator.get_indices``.
    ``dry_run`` starts as True; in that mode ``delete_index`` only prints the
    indices it would remove.
    """

    def __init__(self):
        self.client = elasticsearch.Elasticsearch([ES_HOST], http_auth=(ES_USER, ES_PWD), timeout=30, max_retries=5)
        self.indices = curator.get_indices(self.client)
        self.dry_run = True

    def print_index(self):
        """Debug helper: print how many indices were fetched and the first one."""
        print(type(self.indices), len(self.indices))
        if self.indices:
            # Guard: indexing [0] on an empty index list raised IndexError.
            print(self.indices[0], type(self.indices[0]))

    def classify_index(self):
        """Split the fetched indices into rollover and non-rollover ones.

        :return: {'rollover': [{'index_name', 'ymd_number'}, ...],
                  'rollover_index_name': [unique prefixes, first-seen order],
                  'no_rollover': [{'index_name', 'ymd_number': None}, ...]}
        """
        ret_index = {'rollover': [], 'rollover_index_name': [], 'no_rollover': []}
        seen_prefixes = set()  # O(1) de-duplication instead of scanning the list
        for index in self.indices:
            tag, index_dict = judge_rollover_type(index)
            if not tag:
                ret_index['no_rollover'].append(index_dict)
                continue
            ret_index['rollover'].append(index_dict)
            prefix = index_dict['index_name']
            if prefix not in seen_prefixes:
                seen_prefixes.add(prefix)
                ret_index['rollover_index_name'].append(prefix)
        return ret_index

    def delete_index(self, index_list):
        """Delete each index in *index_list*, or only print the list when dry_run is set.

        Errors are printed per index, so one failure does not stop the others.
        """
        if self.dry_run:
            print(f"dry_run delete_index:\n{index_list}")
            return
        for index_name in index_list:
            try:
                if self.client.indices.exists(index=index_name):
                    self.client.indices.delete(index=index_name)
                else:
                    print(f"{index_name} not exists")
            except Exception as e:
                print(f'delete_index error: {e}')

    def do_delete_index(self):
        """Delete all expired rollover indices, then print the non-rollover ones."""
        # Classify once; the original ran classify_index() twice over the same list.
        classified = self.classify_index()
        indices_can_delete = indices_ready_to_delete(classified['rollover'])
        for index_key, to_delete in indices_can_delete.items():
            if to_delete:
                print(f"start to clean {index_key}: {to_delete}")
                self.delete_index(to_delete)
        print(f'no_rollover: {json.dumps(classified["no_rollover"])}')
if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(description='Delete expired Elasticsearch rollover indices.')
    # Without --dry-run the script deletes for real, exactly as before.
    parser.add_argument('--dry-run', action='store_true',
                        help='only print the indices that would be deleted')
    args = parser.parse_args()
    esc = ElasticsearchClean()
    esc.dry_run = args.dry_run
    esc.do_delete_index()
db_helper.py
#!/usr/bin/python3
# -*- coding:utf-8 -*-
import pymysql
import traceback
class MysqlHelper:
    """Thin wrapper around one PyMySQL connection for reading the ``es_indices`` table."""

    _COLUMNS = "number, cluster, index_name, save_days, datetime, ilp_name, alias"

    def __init__(self, sql_host, sql_user, sql_pwd, db_name):
        self.host = sql_host
        self.usr = sql_user
        self.pwd = sql_pwd
        self.dbname = db_name
        self.port = 3306
        self.charset = 'utf8'
        # self.port used to be ignored. 'password'/'database' replace the
        # deprecated 'passwd'/'db' aliases.
        self.db = pymysql.connect(host=self.host, port=self.port, user=self.usr, password=self.pwd,
                                  database=self.dbname, charset=self.charset)

    def close_db(self):
        """Close the underlying connection."""
        self.db.close()

    def _fetch(self, sql, args=None, one=False):
        """Execute *sql* (with bound *args* if given) and return fetchone()/fetchall().

        The cursor is always closed, even if execution fails.
        """
        cursor = self.db.cursor()
        try:
            if args is None:
                cursor.execute(sql)
            else:
                cursor.execute(sql, args)
            return cursor.fetchone() if one else cursor.fetchall()
        finally:
            cursor.close()

    @staticmethod
    def _to_instance(item):
        """Convert one es_indices row tuple to a dict; a NULL datetime stays None."""
        created = item[4]
        return {'number': item[0], 'cluster': item[1], 'index_name': item[2], 'save_days': item[3],
                'datetime': created.strftime("%Y-%m-%d %H:%M:%S") if created is not None else None,
                'ilp_name': item[5], 'alias': item[6]}

    def show_version(self):
        """Print the server version reported by ``SELECT VERSION()``."""
        data = self._fetch("SELECT VERSION()", one=True)
        print(f"Database version : {data}")

    def get_indices_list(self):
        """Read every row of es_indices.

        :return: (True, [row_dict, ...]) on success, (False, error_message) on failure.
        """
        try:
            rows = self._fetch(f"select {self._COLUMNS} from es_indices")
            ret_list = [self._to_instance(item) for item in (rows or ())]
        except Exception as e:
            self.db.rollback()
            # format_exc() returns the traceback text; print_exc() returned None,
            # so "None" used to end up in the message.
            tb = traceback.format_exc()
            print(f"get_indices_list error:\n{e}, {tb}")
            return False, f"get_indices_list error:\n{e}, {tb}"
        print("get_indices_list succeed!")
        return True, ret_list

    def get_indice_instance(self, cluster, index_name):
        """Look up the row for *cluster*/*index_name*.

        :return: (True, row_dict) when found; (False, message) when missing or on error.
        """
        # Bound parameters instead of f-string interpolation (SQL injection).
        sql = f"select {self._COLUMNS} from es_indices where cluster=%s and index_name=%s"
        try:
            item = self._fetch(sql, (cluster, index_name), one=True)
            if item is None:
                return False, f"no {cluster}/{index_name}"
            instance = self._to_instance(item)
        except Exception as e:
            tb = traceback.format_exc()
            print(f"get_indice_instance error:\n{e}, {tb}")
            return False, f"get_indice_instance error:\n{e}, {tb}"
        print("get_indice_instance succeed!")
        return True, instance
def test_case():
    """Manual smoke test: print all es_indices rows and look up one sample index."""
    from config import SQL_USER, SQL_PWD, SQL_HOST, SQL_DBNAME
    sql = MysqlHelper(SQL_HOST, SQL_USER, SQL_PWD, SQL_DBNAME)
    try:
        sql.show_version()
        tag, ret = sql.get_indices_list()
        if tag:
            for i, row in enumerate(ret):
                print(i, row)
        else:
            print(ret)
        tag, ret = sql.get_indice_instance('sre-elk', 'test-elk')
        print(tag, ret)
    finally:
        # Close the connection even when one of the queries raises.
        sql.close_db()


if __name__ == "__main__":
    test_case()
config.py
#!/usr/bin/python3
# -*- coding:utf-8 -*-
import os

# Connection settings. Each value can be overridden by an environment variable
# of the same name, so real credentials need not be committed to source control.
# The literals are the local-development defaults used when no variable is set.
SQL_HOST = os.environ.get('SQL_HOST', '127.0.0.1')
SQL_USER = os.environ.get('SQL_USER', 'root')
SQL_PWD = os.environ.get('SQL_PWD', '111111')
SQL_DBNAME = os.environ.get('SQL_DBNAME', 'bigdata_sre_log')
ES_HOST = os.environ.get('ES_HOST', '127.0.0.1:9200')
ES_USER = os.environ.get('ES_USER', 'elastic')
ES_PWD = os.environ.get('ES_PWD', 'elastic')
2.3 测试
db 数据:
es 索引:
执行输出:
3 注意事项
- 当前还没有统计符合要求且在db中没有记录的索引,后续有时间更新第二版的时候再补充。
- 数据库中存放的index_name 是rollover后生成索引的前缀, 例如 test-elk-2021.08.xx-0000xx 系列rollover后的索引对应数据库的 test-elk。
4 说明
- 软件环境:
1 pip 包:
pip3 install elasticsearch==7.13.2 -i http://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com
pip3 install elasticsearch-curator==5.8.4 -i http://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com
pip3 install PyMySQL==1.0.2 -i http://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com
2 elk 5.10
3 mysql 8.0
4 python 3.8
5 ubuntu 20.04 Desktop
- 参考文档:
1 curator.readthedocs.io
2 www.elastic.co/guide/en/elasticsearch/client/curator
3 elk笔记8–index